Unverified Commit f27c62a5 authored by LiemLin's avatar LiemLin Committed by GitHub
Browse files

workflow lineage (#2421)



* workflow lineage

* workflow lineage unit test

* stash

* add apache license

* Update .env

* code optimize

* add unit test

* add apache license

* add apache license

* optimized code

* add postgresql support and optimized code

* Update pom.xml

* Update en_US.js

* Update zh_CN.js

* remove mock code

* import i18n

* Update pom.xml

* Delete .env

* add env file

Co-authored-by: linhaiqiang <haiqiang.lin@five-star.cn>
Co-authored-by: dailidong <dailidong66@gmail.com>
Co-authored-by: becarchal <yangbin_57@sina.com>
parent 3a0d4da2
Loading
Loading
Loading
Loading
+81 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.controller;

import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import io.swagger.annotations.ApiParam;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKFLOW_LINEAGE_ERROR;

@RestController
@RequestMapping("lineages/{projectId}")
public class WorkFlowLineageController extends BaseController {
    private static final Logger logger = LoggerFactory.getLogger(WorkFlowLineageController.class);

    @Autowired
    private WorkFlowLineageService workFlowLineageService;

    @GetMapping(value="/list-name")
    @ResponseStatus(HttpStatus.OK)
    public Result<List<WorkFlowLineage>> queryWorkFlowLineageByName(@ApiIgnore @RequestParam(value = "searchVal", required = false) String searchVal, @ApiParam(name = "projectId", value = "PROJECT_ID", required = true) @PathVariable int projectId) {
        try {
            searchVal = ParameterUtils.handleEscapes(searchVal);
            Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByName(searchVal,projectId);
            return returnDataList(result);
        } catch (Exception e){
            logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(),e);
            return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
        }
    }

    @GetMapping(value="/list-ids")
    @ResponseStatus(HttpStatus.OK)
    public Result<Map<String, Object>> queryWorkFlowLineageByIds(@ApiIgnore @RequestParam(value = "ids", required = false) String ids,@ApiParam(name = "projectId", value = "PROJECT_ID", required = true) @PathVariable int projectId) {

        try {
            ids = ParameterUtils.handleEscapes(ids);
            Set<Integer> idsSet = new HashSet<>();
            if(ids != null) {
                String[] idsStr = ids.split(",");
                for (String id : idsStr)
                {
                    idsSet.add(Integer.parseInt(id));
                }
            }

            Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByIds(idsSet, projectId);
            return returnDataList(result);
        } catch (Exception e){
            logger.error(QUERY_WORKFLOW_LINEAGE_ERROR.getMsg(),e);
            return error(QUERY_WORKFLOW_LINEAGE_ERROR.getCode(), QUERY_WORKFLOW_LINEAGE_ERROR.getMsg());
        }
    }
}
+7 −7
Original line number Diff line number Diff line
@@ -168,15 +168,15 @@ public enum Status {
    PREVIEW_SCHEDULE_ERROR(10139,"preview schedule error", "预览调度配置错误"),
    PARSE_TO_CRON_EXPRESSION_ERROR(10140,"parse cron to cron expression error", "解析调度表达式错误"),
    SCHEDULE_START_TIME_END_TIME_SAME(10141,"The start time must not be the same as the end", "开始时间不能和结束时间一样"),
    DELETE_TENANT_BY_ID_FAIL(10142,"delete tenant by id fail, for there are {0} process instances in executing using it", "删除租户失败,有[{0}]个运行中的工作流实例正在使用"),
    DELETE_TENANT_BY_ID_FAIL_DEFINES(10143,"delete tenant by id fail, for there are {0} process definitions using it", "删除租户失败,有[{0}]个工作流定义正在使用"),
    DELETE_TENANT_BY_ID_FAIL_USERS(10144,"delete tenant by id fail, for there are {0} users using it", "删除租户失败,有[{0}]个用户正在使用"),
    DELETE_WORKER_GROUP_BY_ID_FAIL(10145,"delete worker group by id fail, for there are {0} process instances in executing using it", "删除Worker分组失败,有[{0}]个运行中的工作流实例正在使用"),
    QUERY_WORKER_GROUP_FAIL(10146,"query worker group fail ", "查询worker分组失败"),
    DELETE_WORKER_GROUP_FAIL(10147,"delete worker group fail ", "删除worker分组失败"),
    DELETE_TENANT_BY_ID_FAIL(100142,"delete tenant by id fail, for there are {0} process instances in executing using it", "删除租户失败,有[{0}]个运行中的工作流实例正在使用"),
    DELETE_TENANT_BY_ID_FAIL_DEFINES(100143,"delete tenant by id fail, for there are {0} process definitions using it", "删除租户失败,有[{0}]个工作流定义正在使用"),
    DELETE_TENANT_BY_ID_FAIL_USERS(100144,"delete tenant by id fail, for there are {0} users using it", "删除租户失败,有[{0}]个用户正在使用"),
    DELETE_WORKER_GROUP_BY_ID_FAIL(100145,"delete worker group by id fail, for there are {0} process instances in executing using it", "删除Worker分组失败,有[{0}]个运行中的工作流实例正在使用"),
    QUERY_WORKER_GROUP_FAIL(100146,"query worker group fail ", "查询worker分组失败"),
    DELETE_WORKER_GROUP_FAIL(100147,"delete worker group fail ", "删除worker分组失败"),
    QUERY_WORKFLOW_LINEAGE_ERROR(10143,"query workflow lineage error", "查询血缘失败"),
    COPY_PROCESS_DEFINITION_ERROR(10148,"copy process definition error", "复制工作流错误"),
    USER_DISABLED(10149,"The current user is disabled", "当前用户已停用"),

    UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
    UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
    RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),
+97 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.*;

@Service
public class WorkFlowLineageService extends BaseService {

    @Autowired
    private WorkFlowLineageMapper workFlowLineageMapper;

    /**
     * Query workflow lineage entries by workflow name within one project.
     *
     * @param workFlowName name filter passed to the mapper (may be null/empty)
     * @param projectId    id of the project to search in
     * @return result map with {@code Constants.DATA_LIST} holding the lineage list
     *         and a SUCCESS status
     */
    public Map<String, Object> queryWorkFlowLineageByName(String workFlowName, int projectId) {
        Map<String, Object> result = new HashMap<>(5);
        List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByName(workFlowName, projectId);
        result.put(Constants.DATA_LIST, workFlowLineageList);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * Recursively collect every source->target relation reachable from {@code ids}.
     *
     * @param ids               workflow ids to expand at this recursion level
     * @param workFlowRelations accumulator for all discovered relations
     * @param sourceIds         visited-id set; prevents re-expanding a workflow and
     *                          guards against infinite recursion on cyclic relations
     * @return the (same) accumulated relation list, for convenience
     */
    private List<WorkFlowRelation> getWorkFlowRelationRecursion(Set<Integer> ids, List<WorkFlowRelation> workFlowRelations, Set<Integer> sourceIds) {
        // Mark this whole level as visited once, instead of re-adding the same set
        // on every loop iteration as the previous implementation did.
        sourceIds.addAll(ids);
        for (int id : ids) {
            List<WorkFlowRelation> workFlowRelationsTmp = workFlowLineageMapper.querySourceTarget(id);
            if (workFlowRelationsTmp != null && !workFlowRelationsTmp.isEmpty()) {
                // Only recurse into targets we have not seen yet.
                Set<Integer> idsTmp = new HashSet<>();
                for (WorkFlowRelation workFlowRelation : workFlowRelationsTmp) {
                    if (!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())) {
                        idsTmp.add(workFlowRelation.getTargetWorkFlowId());
                    }
                }
                workFlowRelations.addAll(workFlowRelationsTmp);
                getWorkFlowRelationRecursion(idsTmp, workFlowRelations, sourceIds);
            }
        }
        return workFlowRelations;
    }

    /**
     * Query lineage for a set of workflow ids: the workflows themselves plus all
     * downstream relations reachable from them.
     *
     * @param ids       workflow ids to start from; when null/empty, every lineage
     *                  entry of the project is used as the starting set
     * @param projectId id of the project to search in
     * @return result map whose {@code Constants.DATA_LIST} holds a map with keys
     *         {@code "workFlowList"} and {@code "workFlowRelationList"}
     */
    public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, int projectId) {
        Map<String, Object> result = new HashMap<>(5);
        List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByIds(ids, projectId);
        Map<String, Object> workFlowLists = new HashMap<>(5);
        Set<Integer> idsV = new HashSet<>();
        if (ids == null || ids.isEmpty()) {
            // No explicit starting set: expand from every workflow of the project.
            for (WorkFlowLineage workFlowLineage : workFlowLineageList) {
                idsV.add(workFlowLineage.getWorkFlowId());
            }
        } else {
            idsV = ids;
        }
        List<WorkFlowRelation> workFlowRelations = new ArrayList<>();
        Set<Integer> sourceIds = new HashSet<>();
        getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds);

        Set<Integer> idSet = new HashSet<>();
        // If the incoming parameter is not empty, fetch detail attributes for the
        // downstream workflows discovered by the recursion (excluding the inputs).
        if (ids != null && !ids.isEmpty()) {
            for (WorkFlowRelation workFlowRelation : workFlowRelations) {
                idSet.add(workFlowRelation.getTargetWorkFlowId());
            }
            idSet.removeAll(ids);
            if (!idSet.isEmpty()) {
                workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet, projectId));
            }
        }

        workFlowLists.put("workFlowList", workFlowLineageList);
        workFlowLists.put("workFlowRelationList", workFlowRelations);
        result.put(Constants.DATA_LIST, workFlowLists);
        putMsg(result, Status.SUCCESS);
        return result;
    }
}
+69 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.controller;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

public class WorkFlowLineageControllerTest extends AbstractControllerTest {
    private static Logger logger = LoggerFactory.getLogger(WorkFlowLineageControllerTest.class);

    /** GET /lineages/{projectId}/list-name should answer 200 with a SUCCESS payload. */
    @Test
    public void testQueryWorkFlowLineageByName() throws Exception {
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
        params.add("searchVal", "test");

        MvcResult mvcResult = mockMvc.perform(get("/lineages/1/list-name")
                .header("sessionId", sessionId)
                .params(params))
                .andExpect(status().isOk())
                .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
                .andReturn();

        String body = mvcResult.getResponse().getContentAsString();
        Result result = JSONUtils.parseObject(body, Result.class);
        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
        logger.info(body);
    }

    /** GET /lineages/{projectId}/list-ids should answer 200 with a SUCCESS payload. */
    @Test
    public void testQueryWorkFlowLineageByIds() throws Exception {
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
        params.add("ids", "1");

        MvcResult mvcResult = mockMvc.perform(get("/lineages/1/list-ids")
                .header("sessionId", sessionId)
                .params(params))
                .andExpect(status().isOk())
                .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
                .andReturn();

        String body = mvcResult.getResponse().getContentAsString();
        Result result = JSONUtils.parseObject(body, Result.class);
        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
        logger.info(body);
    }

}
+88 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.*;

import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.class)
public class WorkFlowLineageServiceTest {

    @InjectMocks
    private WorkFlowLineageService workFlowLineageService;

    @Mock
    private WorkFlowLineageMapper workFlowLineageMapper;

    /** Querying by name should surface the mapper's lineage list under DATA_LIST. */
    @Test
    public void testQueryWorkFlowLineageByName() {
        String searchVal = "test";
        when(workFlowLineageMapper.queryByName(searchVal, 1)).thenReturn(stubLineageList());

        Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByName(searchVal, 1);

        List<WorkFlowLineage> lineages = (List<WorkFlowLineage>) result.get(Constants.DATA_LIST);
        Assert.assertTrue(lineages.size() > 0);
    }

    /** Querying by ids should return both the workflow list and its relations. */
    @Test
    public void testQueryWorkFlowLineageByIds() {
        Set<Integer> ids = new HashSet<>();
        ids.add(1);
        ids.add(2);

        when(workFlowLineageMapper.queryByIds(ids, 1)).thenReturn(stubLineageList());
        when(workFlowLineageMapper.querySourceTarget(1)).thenReturn(stubRelationList());

        Map<String, Object> result = workFlowLineageService.queryWorkFlowLineageByIds(ids, 1);

        Map<String, Object> data = (Map<String, Object>) result.get(Constants.DATA_LIST);
        List<WorkFlowLineage> lineages = (List<WorkFlowLineage>) data.get("workFlowList");
        List<WorkFlowRelation> relations = (List<WorkFlowRelation>) data.get("workFlowRelationList");
        Assert.assertTrue(lineages.size() > 0);
        Assert.assertTrue(relations.size() > 0);
    }

    /** Fixture: one lineage entry (id 1, name "testdag"). */
    private List<WorkFlowLineage> stubLineageList() {
        WorkFlowLineage lineage = new WorkFlowLineage();
        lineage.setWorkFlowId(1);
        lineage.setWorkFlowName("testdag");
        List<WorkFlowLineage> lineages = new ArrayList<>();
        lineages.add(lineage);
        return lineages;
    }

    /** Fixture: one relation from workflow 1 to workflow 2. */
    private List<WorkFlowRelation> stubRelationList() {
        WorkFlowRelation relation = new WorkFlowRelation();
        relation.setSourceWorkFlowId(1);
        relation.setTargetWorkFlowId(2);
        List<WorkFlowRelation> relations = new ArrayList<>();
        relations.add(relation);
        return relations;
    }

}
Loading