Commit 7dfa9976 authored by Yelli's avatar Yelli Committed by qiaozhanwei
Browse files

refactor export process (#1772)

* refactor process export

* comment & null check for export process

* add License

* add javadoc

* add DataSourceParamTest and DependentParamTest

* refactor process export & add exportProcessMetaDataStr UT
parent b6d6bb64
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -256,9 +256,9 @@ public class ProjectController extends BaseController {

    /**
     * import process definition
     *
     * @param loginUser login user
     * @param file resource file
     * @param projectName project name
     * @return import result code
     */
    @ApiOperation(value = "importProcessDefinition", notes= "EXPORT_PROCCESS_DEFINITION_NOTES")
+247 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.dto;

/**
 * ProcessMeta
 */
public class ProcessMeta {

    /**
     * project name
     */
    private String projectName;

    /**
     * process definition name
     */
    private String processDefinitionName;

    /**
     * processs definition json
     */
    private String processDefinitionJson;

    /**
     * process definition desc
     */
    private String processDefinitionDescription;

    /**
     * process definition locations
     */
    private String processDefinitionLocations;

    /**
     * process definition connects
     */
    private String processDefinitionConnects;

    /**
     * warning type
     */
    private String scheduleWarningType;

    /**
     * warning group id
     */
    private int scheduleWarningGroupId;

    /**
     * warning group name
     */
    private String scheduleWarningGroupName;

    /**
     * start time
     */
    private String scheduleStartTime;

    /**
     * end time
     */
    private String scheduleEndTime;

    /**
     * crontab
     */
    private String scheduleCrontab;

    /**
     * failure strategy
     */
    private String scheduleFailureStrategy;

    /**
     * release state
     */
    private String scheduleReleaseState;

    /**
     * process instance priority
     */
    private String scheduleProcessInstancePriority;

    /**
     * worker group id
     */
    private int scheduleWorkerGroupId;

    /**
     * worker group name
     */
    private String scheduleWorkerGroupName;

    public ProcessMeta() {
    }

    public String getProjectName() {
        return projectName;
    }

    public void setProjectName(String projectName) {
        this.projectName = projectName;
    }

    public String getProcessDefinitionName() {
        return processDefinitionName;
    }

    public void setProcessDefinitionName(String processDefinitionName) {
        this.processDefinitionName = processDefinitionName;
    }

    public String getProcessDefinitionJson() {
        return processDefinitionJson;
    }

    public void setProcessDefinitionJson(String processDefinitionJson) {
        this.processDefinitionJson = processDefinitionJson;
    }

    public String getProcessDefinitionDescription() {
        return processDefinitionDescription;
    }

    public void setProcessDefinitionDescription(String processDefinitionDescription) {
        this.processDefinitionDescription = processDefinitionDescription;
    }

    public String getProcessDefinitionLocations() {
        return processDefinitionLocations;
    }

    public void setProcessDefinitionLocations(String processDefinitionLocations) {
        this.processDefinitionLocations = processDefinitionLocations;
    }

    public String getProcessDefinitionConnects() {
        return processDefinitionConnects;
    }

    public void setProcessDefinitionConnects(String processDefinitionConnects) {
        this.processDefinitionConnects = processDefinitionConnects;
    }

    public String getScheduleWarningType() {
        return scheduleWarningType;
    }

    public void setScheduleWarningType(String scheduleWarningType) {
        this.scheduleWarningType = scheduleWarningType;
    }

    public int getScheduleWarningGroupId() {
        return scheduleWarningGroupId;
    }

    public void setScheduleWarningGroupId(int scheduleWarningGroupId) {
        this.scheduleWarningGroupId = scheduleWarningGroupId;
    }

    public String getScheduleWarningGroupName() {
        return scheduleWarningGroupName;
    }

    public void setScheduleWarningGroupName(String scheduleWarningGroupName) {
        this.scheduleWarningGroupName = scheduleWarningGroupName;
    }

    public String getScheduleStartTime() {
        return scheduleStartTime;
    }

    public void setScheduleStartTime(String scheduleStartTime) {
        this.scheduleStartTime = scheduleStartTime;
    }

    public String getScheduleEndTime() {
        return scheduleEndTime;
    }

    public void setScheduleEndTime(String scheduleEndTime) {
        this.scheduleEndTime = scheduleEndTime;
    }

    public String getScheduleCrontab() {
        return scheduleCrontab;
    }

    public void setScheduleCrontab(String scheduleCrontab) {
        this.scheduleCrontab = scheduleCrontab;
    }

    public String getScheduleFailureStrategy() {
        return scheduleFailureStrategy;
    }

    public void setScheduleFailureStrategy(String scheduleFailureStrategy) {
        this.scheduleFailureStrategy = scheduleFailureStrategy;
    }

    public String getScheduleReleaseState() {
        return scheduleReleaseState;
    }

    public void setScheduleReleaseState(String scheduleReleaseState) {
        this.scheduleReleaseState = scheduleReleaseState;
    }

    public String getScheduleProcessInstancePriority() {
        return scheduleProcessInstancePriority;
    }

    public void setScheduleProcessInstancePriority(String scheduleProcessInstancePriority) {
        this.scheduleProcessInstancePriority = scheduleProcessInstancePriority;
    }

    public int getScheduleWorkerGroupId() {
        return scheduleWorkerGroupId;
    }

    public void setScheduleWorkerGroupId(int scheduleWorkerGroupId) {
        this.scheduleWorkerGroupId = scheduleWorkerGroupId;
    }

    public String getScheduleWorkerGroupName() {
        return scheduleWorkerGroupName;
    }

    public void setScheduleWorkerGroupName(String scheduleWorkerGroupName) {
        this.scheduleWorkerGroupName = scheduleWorkerGroupName;
    }
}
+60 −69
Original line number Diff line number Diff line
@@ -22,11 +22,14 @@ import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
import org.apache.dolphinscheduler.api.utils.exportprocess.exportProcessAddTaskParam;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -496,43 +499,7 @@ public class ProcessDefinitionService extends BaseDAGService {
            ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);

            if (null != processDefinition) {
                //correct task param which has data source or dependent param
                String correctProcessDefinitionJson = addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
                processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);

                Map<String, Object> row = new LinkedHashMap<>();
                row.put("projectName", processDefinition.getProjectName());
                row.put("processDefinitionName", processDefinition.getName());
                row.put("processDefinitionJson", processDefinition.getProcessDefinitionJson());
                row.put("processDefinitionDescription", processDefinition.getDescription());
                row.put("processDefinitionLocations", processDefinition.getLocations());
                row.put("processDefinitionConnects", processDefinition.getConnects());

                //schedule info
                List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
                if (!schedules.isEmpty()) {
                    Schedule schedule = schedules.get(0);
                    row.put("scheduleWarningType", schedule.getWarningType());
                    row.put("scheduleWarningGroupId", schedule.getWarningGroupId());
                    row.put("scheduleStartTime", DateUtils.dateToString(schedule.getStartTime()));
                    row.put("scheduleEndTime", DateUtils.dateToString(schedule.getEndTime()));
                    row.put("scheduleCrontab", schedule.getCrontab());
                    row.put("scheduleFailureStrategy", schedule.getFailureStrategy());
                    row.put("scheduleReleaseState", ReleaseState.OFFLINE);
                    row.put("scheduleProcessInstancePriority", schedule.getProcessInstancePriority());
                    if(schedule.getId() == -1){
                        row.put("scheduleWorkerGroupId", -1);
                    }else{
                        WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId());
                        if(workerGroup != null){
                            row.put("scheduleWorkerGroupName", workerGroup.getName());
                        }
                    }

                }

                //create workflow json file
                String rowsJson = JSONUtils.toJsonString(row);
                String exportProcessJson = exportProcessMetaDataStr(processDefinitionId, processDefinition);
                response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
                response.setHeader("Content-Disposition", "attachment;filename="+processDefinition.getName()+".json");
                BufferedOutputStream buff = null;
@@ -540,7 +507,7 @@ public class ProcessDefinitionService extends BaseDAGService {
                try {
                    out = response.getOutputStream();
                    buff = new BufferedOutputStream(out);
                    buff.write(rowsJson.getBytes(StandardCharsets.UTF_8));
                    buff.write(exportProcessJson.getBytes(StandardCharsets.UTF_8));
                    buff.flush();
                    buff.close();
                } catch (IOException e) {
@@ -560,10 +527,58 @@ public class ProcessDefinitionService extends BaseDAGService {
                            logger.warn("export process output stream not close", e);
                        }
                    }
                }
            }
        }
    }

    /**
     * get export process metadata string
     * @param processDefinitionId process definition id
     * @param processDefinition process definition
     * @return export process metadata string
     */
    public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) {
        //correct task param which has data source or dependent param
        String correctProcessDefinitionJson = addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
        processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);

        //export process metadata
        ProcessMeta exportProcessMeta = new ProcessMeta();
        exportProcessMeta.setProjectName(processDefinition.getProjectName());
        exportProcessMeta.setProcessDefinitionName(processDefinition.getName());
        exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson());
        exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations());
        exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects());

        //schedule info
        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
        if (!schedules.isEmpty()) {
            Schedule schedule = schedules.get(0);
            WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId());

            if (null == workerGroup && schedule.getWorkerGroupId() == -1) {
                workerGroup = new WorkerGroup();
                workerGroup.setId(-1);
                workerGroup.setName("");
            }

            exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString());
            exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId());
            exportProcessMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime()));
            exportProcessMeta.setScheduleEndTime(DateUtils.dateToString(schedule.getEndTime()));
            exportProcessMeta.setScheduleCrontab(schedule.getCrontab());
            exportProcessMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy()));
            exportProcessMeta.setScheduleReleaseState(String.valueOf(ReleaseState.OFFLINE));
            exportProcessMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority()));

            if (null != workerGroup) {
                exportProcessMeta.setScheduleWorkerGroupId(workerGroup.getId());
                exportProcessMeta.setScheduleWorkerGroupName(workerGroup.getName());
            }
        }
        //create workflow json file
        return JSONUtils.toJsonString(exportProcessMeta);
    }

    /**
@@ -580,35 +595,9 @@ public class ProcessDefinitionService extends BaseDAGService {
            if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
                String taskType = taskNode.getString("type");

                if(checkTaskHasDataSource(taskType)){
                    // add sqlParameters
                    JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
                    DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
                    if (null != dataSource) {
                        sqlParameters.put("datasourceName", dataSource.getName());
                    }
                    taskNode.put("params", sqlParameters);
                }else if(checkTaskHasDependent(taskType)){
                    // add dependent param
                    JSONObject dependentParameters =  JSONUtils.parseObject(taskNode.getString("dependence"));

                    if(null != dependentParameters){
                        JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
                        for (int j = 0; j < dependTaskList.size(); j++) {
                            JSONObject dependentTaskModel = dependTaskList.getJSONObject(j);
                            JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
                            for (int k = 0; k < dependItemList.size(); k++) {
                                JSONObject dependentItem = dependItemList.getJSONObject(k);
                                int definitionId = dependentItem.getInteger("definitionId");
                                ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
                                if(null != definition){
                                    dependentItem.put("projectName",definition.getProjectName());
                                    dependentItem.put("definitionName",definition.getName());
                                }
                            }
                        }
                        taskNode.put("dependence", dependentParameters);
                    }
                exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
                if (null != addTaskParam) {
                    addTaskParam.addSpecialParam(taskNode);
                }
            }
        }
@@ -648,7 +637,7 @@ public class ProcessDefinitionService extends BaseDAGService {
     * @param loginUser login user
     * @param file process metadata json file
     * @param currentProjectName current project name
     * @return
     * @return import process
     */
    @Transactional(rollbackFor = Exception.class)
    public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) {
@@ -860,6 +849,8 @@ public class ProcessDefinitionService extends BaseDAGService {
     * recursion create sub process
     * @param loginUser login user
     * @param targetProject target project
     * @param jsonArray process task array
     * @param subProcessIdMap correct sub process id map
     */
    public void importSubProcess(User loginUser, Project targetProject, JSONArray jsonArray, Map<Integer, Integer> subProcessIdMap) {
        for (int i = 0; i < jsonArray.size(); i++) {
+64 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.utils.exportprocess;

import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * task node add datasource param strategy
 */
@Service
public class DataSourceParam implements exportProcessAddTaskParam, InitializingBean {

    @Autowired
    private DataSourceMapper dataSourceMapper;

    /**
     * add datasource params
     * @param taskNode task node json object
     * @return task node json object
     */
    @Override
    public JSONObject addSpecialParam(JSONObject taskNode) {
        // add sqlParameters
        JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
        DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
        if (null != dataSource) {
            sqlParameters.put("datasourceName", dataSource.getName());
        }
        taskNode.put("params", sqlParameters);

        return taskNode;
    }


    /**
     * put datasource strategy
     */
    @Override
    public void afterPropertiesSet() {
        TaskNodeParamFactory.register(TaskType.SQL.name(), this);
        TaskNodeParamFactory.register(TaskType.PROCEDURE.name(), this);
    }
}
 No newline at end of file
+77 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.api.utils.exportprocess;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * task node add dependent param strategy
 */
@Service
public class DependentParam implements exportProcessAddTaskParam, InitializingBean {


    @Autowired
    ProcessDefinitionMapper processDefineMapper;

    /**
     * add dependent param
     * @param taskNode task node json object
     * @return task node json object
     */
    @Override
    public JSONObject addSpecialParam(JSONObject taskNode) {
        // add dependent param
        JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));

        if (null != dependentParameters) {
            JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
            for (int j = 0; j < dependTaskList.size(); j++) {
                JSONObject dependentTaskModel = dependTaskList.getJSONObject(j);
                JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
                for (int k = 0; k < dependItemList.size(); k++) {
                    JSONObject dependentItem = dependItemList.getJSONObject(k);
                    int definitionId = dependentItem.getInteger("definitionId");
                    ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
                    if (null != definition) {
                        dependentItem.put("projectName", definition.getProjectName());
                        dependentItem.put("definitionName", definition.getName());
                    }
                }
            }
            taskNode.put("dependence", dependentParameters);
        }

        return taskNode;
    }

    /**
     * put dependent strategy
     */
    @Override
    public void afterPropertiesSet() {
        TaskNodeParamFactory.register(TaskType.DEPENDENT.name(), this);
    }
}
Loading