Loading dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java +2 −0 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.alert.utils; import com.fasterxml.jackson.databind.JsonNode; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; Loading Loading @@ -109,4 +110,5 @@ public class JSONUtilsTest { } } dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +212 −193 Original line number Diff line number Diff line Loading @@ -17,11 +17,12 @@ package org.apache.dolphinscheduler.api.service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.dto.treeview.Instance; import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; Loading Loading @@ -73,6 +74,11 @@ public class ProcessDefinitionService extends BaseDAGService { private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionService.class); private static final String PROCESSDEFINITIONID = "processDefinitionId"; private static final String RELEASESTATE = "releaseState"; private static final String TASKS = "tasks"; @Autowired private ProjectMapper projectMapper; Loading Loading @@ -169,6 +175,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get resource ids * * @param processData process data * @return resource ids */ Loading Loading @@ -513,7 +520,7 @@ public class ProcessDefinitionService extends BaseDAGService { // check state if (null == state) { putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState"); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); return result; } Loading @@ -525,12 +532,12 @@ public class ProcessDefinitionService extends BaseDAGService { String resourceIds = processDefinition.getResourceIds(); if (StringUtils.isNotBlank(resourceIds)) { Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); PermissionCheck<Integer> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resourceIdArray,loginUser.getId(),logger); PermissionCheck<Integer> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); try { permissionCheck.checkPermission(); } catch (Exception e) { logger.error(e.getMessage(), e); putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, "releaseState"); putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, RELEASESTATE); return result; } } Loading @@ -554,7 +561,7 @@ public class ProcessDefinitionService extends BaseDAGService { } break; default: putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState"); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); return result; } Loading @@ -564,6 +571,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * batch export process definition by ids * * @param loginUser * @param projectName * @param processDefinitionIds Loading Loading @@ -596,6 +604,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get process definition list by ids * * @param processDefinitionIds * @return */ Loading @@ -616,6 +625,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * download the process definition file * * @param response * @param processDefinitionList */ Loading Loading @@ -651,6 +661,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get export process metadata string * * @param processDefinitionId process definition id * @param processDefinition process definition * @return export process metadata string Loading @@ -662,6 +673,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get export process metadata string * * @param processDefinitionId process definition id * @param processDefinition process definition * @return export process metadata string Loading Loading @@ -699,17 +711,18 @@ public class ProcessDefinitionService extends BaseDAGService { /** * correct task param which has datasource or dependent * * @param processDefinitionJson processDefinitionJson * @return correct processDefinitionJson */ public String addExportTaskNodeSpecialParam(String processDefinitionJson) { JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS); for (int i = 0; i < jsonArray.size(); i++) { JSONObject taskNode = jsonArray.getJSONObject(i); if (StringUtils.isNotEmpty(taskNode.getString("type"))) { String taskType = taskNode.getString("type"); JsonNode taskNode = jsonArray.path(i); if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { Loading @@ -717,12 +730,13 @@ public class ProcessDefinitionService extends BaseDAGService { } } } jsonObject.put("tasks", jsonArray); jsonObject.set(TASKS, jsonArray); return jsonObject.toString(); } /** * check task if has sub process * * @param taskType task type * @return if task has sub process return true else false */ Loading @@ -732,6 +746,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * import process definition * * @param loginUser login user * @param file process metadata json file * @param currentProjectName current project name Loading Loading @@ -761,6 +776,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check and import process definition * * @param loginUser * @param currentProjectName * @param result Loading Loading @@ -807,8 +823,8 @@ public class ProcessDefinitionService extends BaseDAGService { //create process definition Integer processDefinitionId = Objects.isNull(createProcessResult.get("processDefinitionId"))? null:Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); Objects.isNull(createProcessResult.get(PROCESSDEFINITIONID)) ? null : Integer.parseInt(createProcessResult.get(PROCESSDEFINITIONID).toString()); //scheduler param return getImportProcessScheduleResult(loginUser, Loading @@ -822,6 +838,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get create process result * * @param loginUser * @param currentProjectName * @param result Loading Loading @@ -856,6 +873,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get import process schedule result * * @param loginUser * @param currentProjectName * @param result Loading Loading @@ -887,6 +905,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check importance params * * @param processMeta * @param result * @return Loading @@ -910,18 +929,19 @@ public class ProcessDefinitionService extends BaseDAGService { /** * import process add special task param * * @param loginUser login user * @param processDefinitionJson process definition json * @param targetProject target project * @return import process param */ private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) { JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS); //add sql and dependent param for (int i = 0; i < jsonArray.size(); i++) { JSONObject taskNode = jsonArray.getJSONObject(i); String taskType = taskNode.getString("type"); JsonNode taskNode = jsonArray.path(i); String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { addTaskParam.addImportSpecialParam(taskNode); Loading @@ -931,20 +951,21 @@ public class ProcessDefinitionService extends BaseDAGService { //recursive sub-process parameter correction map key for old process id value for new process id Map<Integer, Integer> subProcessIdMap = new HashMap<>(20); List<Object> subProcessList = jsonArray.stream() .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type"))) List<Object> subProcessList = StreamUtils.asStream(jsonArray.elements()) .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(subProcessList)) { importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); } jsonObject.put("tasks", jsonArray); jsonObject.set(TASKS, jsonArray); return jsonObject.toString(); } /** * import process schedule * * @param loginUser login user * @param currentProjectName current project name * @param processMeta process meta data Loading Loading @@ -998,32 +1019,37 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check import process has sub process * recursion create sub process * * @param loginUser login user * @param targetProject target project * @param jsonArray process task array * @param subProcessIdMap correct sub process id map */ public void importSubProcess(User loginUser, Project targetProject, JSONArray jsonArray, Map<Integer, Integer> subProcessIdMap) { public void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map<Integer, Integer> subProcessIdMap) { for (int i = 0; i < jsonArray.size(); i++) { JSONObject taskNode = jsonArray.getJSONObject(i); String taskType = taskNode.getString("type"); ObjectNode taskNode = (ObjectNode) jsonArray.path(i); String taskType = taskNode.path("type").asText(); if (checkTaskHasSubProcess(taskType)) { if (!checkTaskHasSubProcess(taskType)) { continue; } //get sub process info JSONObject subParams = JSONUtils.parseObject(taskNode.getString("params")); Integer subProcessId = subParams.getInteger("processDefinitionId"); ObjectNode subParams = (ObjectNode) taskNode.path("params"); Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); //check is sub process exist in db if (null != subProcess) { if (null == subProcess) { continue; } String subProcessJson = subProcess.getProcessDefinitionJson(); //check current project has sub process ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); if (null == currentProjectSubProcess) { JSONArray subJsonArray = (JSONArray) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks"); ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); List<Object> subProcessList = subJsonArray.stream() .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type"))) List<Object> subProcessList = StreamUtils.asStream(subJsonArray.elements()) .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText())) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(subProcessList)) { Loading Loading @@ -1071,10 +1097,8 @@ public class ProcessDefinitionService extends BaseDAGService { if (null != newSubProcessDefine) { subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); subParams.put("processDefinitionId", newSubProcessDefine.getId()); taskNode.put("params", subParams); } } subParams.put(PROCESSDEFINITIONID, newSubProcessDefine.getId()); taskNode.set("params", subParams); } } } Loading Loading @@ -1138,9 +1162,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param defineId define id * @return task node list * @throws Exception exception */ public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) throws Exception { public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) { Map<String, Object> result = new HashMap<>(); ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); Loading Loading @@ -1176,9 +1199,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param defineIdList define id list * @return task node list * @throws Exception exception */ public Map<String, Object> getTaskNodeListByDefinitionIdList(String defineIdList) throws Exception { public Map<String, Object> getTaskNodeListByDefinitionIdList(String defineIdList) { Map<String, Object> result = new HashMap<>(); Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>(); Loading Loading @@ -1364,9 +1386,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param processDefinition process definition * @return dag graph * @throws Exception if exception happens */ private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception { private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) { String processDefinitionJson = processDefinition.getProcessDefinitionJson(); Loading @@ -1386,8 +1407,6 @@ public class ProcessDefinitionService extends BaseDAGService { } /** * whether the graph has a ring * Loading dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +2 −3 Original line number Diff line number Diff line Loading @@ -504,9 +504,8 @@ public class ProcessInstanceService extends BaseDAGService { * * @param processInstanceId process instance id * @return variables data * @throws Exception exception */ public Map<String, Object> viewVariables( Integer processInstanceId) throws Exception { public Map<String, Object> viewVariables(Integer processInstanceId) { Map<String, Object> result = new HashMap<>(5); ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); Loading Loading @@ -537,7 +536,7 @@ public class ProcessInstanceService extends BaseDAGService { List<TaskNode> taskNodeList = workflowData.getTasks(); // global param string String globalParamStr = JSON.toJSONString(globalParams); String globalParamStr = JSONUtils.toJson(globalParams); globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams); globalParams = JSON.parseArray(globalParamStr, Property.class); for (Property property : globalParams) { Loading dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java +11 −10 Original line number Diff line number Diff line Loading @@ -16,9 +16,9 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.springframework.beans.factory.InitializingBean; Loading @@ -33,6 +33,7 @@ import java.util.List; @Service public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { private static final String PARAMS = "params"; @Autowired private DataSourceMapper dataSourceMapper; Loading @@ -42,14 +43,14 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override public JSONObject addExportSpecialParam(JSONObject taskNode) { public JsonNode addExportSpecialParam(JsonNode taskNode) { // add sqlParameters JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource")); ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt()); if (null != dataSource) { sqlParameters.put("datasourceName", dataSource.getName()); } taskNode.put("params", sqlParameters); ((ObjectNode)taskNode).set(PARAMS, sqlParameters); return taskNode; } Loading @@ -60,14 +61,14 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override public JSONObject addImportSpecialParam(JSONObject taskNode) { JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); public JsonNode addImportSpecialParam(JsonNode taskNode) { ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText()); if (!dataSources.isEmpty()) { DataSource dataSource = dataSources.get(0); sqlParameters.put("datasource", dataSource.getId()); } taskNode.put("params", sqlParameters); ((ObjectNode)taskNode).set(PARAMS, sqlParameters); return taskNode; } Loading dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java +21 −19 Original line number Diff line number Diff line Loading @@ -16,8 +16,9 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; Loading @@ -34,6 +35,7 @@ import org.springframework.stereotype.Service; @Service public class DependentParam implements ProcessAddTaskParam, InitializingBean { private static final String DEPENDENCE = "dependence"; @Autowired ProcessDefinitionMapper processDefineMapper; Loading @@ -47,18 +49,18 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override public JSONObject addExportSpecialParam(JSONObject taskNode) { public JsonNode addExportSpecialParam(JsonNode taskNode) { // add dependent param JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); if (null != dependentParameters) { JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList"); for (int j = 0; j < dependTaskList.size(); j++) { JSONObject dependentTaskModel = dependTaskList.getJSONObject(j); JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); JsonNode dependentTaskModel = dependTaskList.path(j); ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); for (int k = 0; k < dependItemList.size(); k++) { JSONObject dependentItem = dependItemList.getJSONObject(k); int definitionId = dependentItem.getInteger("definitionId"); ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); int definitionId = dependentItem.path("definitionId").asInt(); ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId); if (null != definition) { dependentItem.put("projectName", definition.getProjectName()); Loading @@ -66,7 +68,7 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { } } } taskNode.put("dependence", dependentParameters); ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); } return taskNode; Loading @@ -78,18 +80,18 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { * @return */ @Override public JSONObject addImportSpecialParam(JSONObject taskNode) { JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); public JsonNode addImportSpecialParam(JsonNode taskNode) { ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); if(dependentParameters != null){ JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList"); for (int h = 0; h < dependTaskList.size(); h++) { JSONObject dependentTaskModel = dependTaskList.getJSONObject(h); JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h); ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); for (int k = 0; k < dependItemList.size(); k++) { JSONObject dependentItem = dependItemList.getJSONObject(k); Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName")); ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText()); if(dependentItemProject != null){ ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName")); ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.path("definitionName").asText()); if(definition != null){ dependentItem.put("projectId",dependentItemProject.getId()); dependentItem.put("definitionId",definition.getId()); Loading @@ -97,7 +99,7 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { } } } taskNode.put("dependence", dependentParameters); ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); } return taskNode; } Loading Loading
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java +2 −0 Original line number Diff line number Diff line Loading @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.alert.utils; import com.fasterxml.jackson.databind.JsonNode; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; Loading Loading @@ -109,4 +110,5 @@ public class JSONUtilsTest { } }
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +212 −193 Original line number Diff line number Diff line Loading @@ -17,11 +17,12 @@ package org.apache.dolphinscheduler.api.service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.dto.treeview.Instance; import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; Loading Loading @@ -73,6 +74,11 @@ public class ProcessDefinitionService extends BaseDAGService { private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionService.class); private static final String PROCESSDEFINITIONID = "processDefinitionId"; private static final String RELEASESTATE = "releaseState"; private static final String TASKS = "tasks"; @Autowired private ProjectMapper projectMapper; Loading Loading @@ -169,6 +175,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get resource ids * * @param processData process data * @return resource ids */ Loading Loading @@ -513,7 +520,7 @@ public class ProcessDefinitionService extends BaseDAGService { // check state if (null == state) { putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState"); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); return result; } Loading @@ -525,12 +532,12 @@ public class ProcessDefinitionService extends BaseDAGService { String resourceIds = processDefinition.getResourceIds(); if (StringUtils.isNotBlank(resourceIds)) { Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); PermissionCheck<Integer> permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resourceIdArray,loginUser.getId(),logger); PermissionCheck<Integer> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); try { permissionCheck.checkPermission(); } catch (Exception e) { logger.error(e.getMessage(), e); putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, "releaseState"); putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, RELEASESTATE); return result; } } Loading @@ -554,7 +561,7 @@ public class ProcessDefinitionService extends BaseDAGService { } break; default: putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState"); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); return result; } Loading @@ -564,6 +571,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * batch export process definition by ids * * @param loginUser * @param projectName * @param processDefinitionIds Loading Loading @@ -596,6 +604,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get process definition list by ids * * @param processDefinitionIds * @return */ Loading @@ -616,6 +625,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * download the process definition file * * @param response * @param processDefinitionList */ Loading Loading @@ -651,6 +661,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get export process metadata string * * @param processDefinitionId process definition id * @param processDefinition process definition * @return export process metadata string Loading @@ -662,6 +673,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get export process metadata string * * @param processDefinitionId process definition id * @param processDefinition process definition * @return export process metadata string Loading Loading @@ -699,17 +711,18 @@ public class ProcessDefinitionService extends BaseDAGService { /** * correct task param which has datasource or dependent * * @param processDefinitionJson processDefinitionJson * @return correct processDefinitionJson */ public String addExportTaskNodeSpecialParam(String processDefinitionJson) { JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS); for (int i = 0; i < jsonArray.size(); i++) { JSONObject taskNode = jsonArray.getJSONObject(i); if (StringUtils.isNotEmpty(taskNode.getString("type"))) { String taskType = taskNode.getString("type"); JsonNode taskNode = jsonArray.path(i); if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { Loading @@ -717,12 +730,13 @@ public class ProcessDefinitionService extends BaseDAGService { } } } jsonObject.put("tasks", jsonArray); jsonObject.set(TASKS, jsonArray); return jsonObject.toString(); } /** * check task if has sub process * * @param taskType task type * @return if task has sub process return true else false */ Loading @@ -732,6 +746,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * import process definition * * @param loginUser login user * @param file process metadata json file * @param currentProjectName current project name Loading Loading @@ -761,6 +776,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check and import process definition * * @param loginUser * @param currentProjectName * @param result Loading Loading @@ -807,8 +823,8 @@ public class ProcessDefinitionService extends BaseDAGService { //create process definition Integer processDefinitionId = Objects.isNull(createProcessResult.get("processDefinitionId"))? null:Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); Objects.isNull(createProcessResult.get(PROCESSDEFINITIONID)) ? null : Integer.parseInt(createProcessResult.get(PROCESSDEFINITIONID).toString()); //scheduler param return getImportProcessScheduleResult(loginUser, Loading @@ -822,6 +838,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get create process result * * @param loginUser * @param currentProjectName * @param result Loading Loading @@ -856,6 +873,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get import process schedule result * * @param loginUser * @param currentProjectName * @param result Loading Loading @@ -887,6 +905,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check importance params * * @param processMeta * @param result * @return Loading @@ -910,18 +929,19 @@ public class ProcessDefinitionService extends BaseDAGService { /** * import process add special task param * * @param loginUser login user * @param processDefinitionJson process definition json * @param targetProject target project * @return import process param */ private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) { JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS); //add sql and dependent param for (int i = 0; i < jsonArray.size(); i++) { JSONObject taskNode = jsonArray.getJSONObject(i); String taskType = taskNode.getString("type"); JsonNode taskNode = jsonArray.path(i); String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { addTaskParam.addImportSpecialParam(taskNode); Loading @@ -931,20 +951,21 @@ public class ProcessDefinitionService extends BaseDAGService { //recursive sub-process parameter correction map key for old process id value for new process id Map<Integer, Integer> subProcessIdMap = new HashMap<>(20); List<Object> subProcessList = jsonArray.stream() .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type"))) List<Object> subProcessList = StreamUtils.asStream(jsonArray.elements()) .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(subProcessList)) { importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); } jsonObject.put("tasks", jsonArray); jsonObject.set(TASKS, jsonArray); return jsonObject.toString(); } /** * import process schedule * * @param loginUser login user * @param currentProjectName current project name * @param processMeta process meta data Loading Loading @@ -998,32 +1019,37 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check import process has sub process * recursion create sub process * * @param loginUser login user * @param targetProject target project * @param jsonArray process task array * @param subProcessIdMap correct sub process id map */ public void importSubProcess(User loginUser, Project targetProject, JSONArray jsonArray, Map<Integer, Integer> subProcessIdMap) { public void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map<Integer, Integer> subProcessIdMap) { for (int i = 0; i < jsonArray.size(); i++) { JSONObject taskNode = jsonArray.getJSONObject(i); String taskType = taskNode.getString("type"); ObjectNode taskNode = (ObjectNode) jsonArray.path(i); String taskType = taskNode.path("type").asText(); if (checkTaskHasSubProcess(taskType)) { if (!checkTaskHasSubProcess(taskType)) { continue; } //get sub process info JSONObject subParams = JSONUtils.parseObject(taskNode.getString("params")); Integer subProcessId = subParams.getInteger("processDefinitionId"); ObjectNode subParams = (ObjectNode) taskNode.path("params"); Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); //check is sub process exist in db if (null != subProcess) { if (null == subProcess) { continue; } String subProcessJson = subProcess.getProcessDefinitionJson(); //check current project has sub process ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); if (null == currentProjectSubProcess) { JSONArray subJsonArray = (JSONArray) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks"); ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); List<Object> subProcessList = subJsonArray.stream() .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type"))) List<Object> subProcessList = StreamUtils.asStream(subJsonArray.elements()) .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText())) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(subProcessList)) { Loading Loading @@ -1071,10 +1097,8 @@ public class ProcessDefinitionService extends BaseDAGService { if (null != newSubProcessDefine) { subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); subParams.put("processDefinitionId", newSubProcessDefine.getId()); taskNode.put("params", subParams); } } subParams.put(PROCESSDEFINITIONID, newSubProcessDefine.getId()); taskNode.set("params", subParams); } } } Loading Loading @@ -1138,9 +1162,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param defineId define id * @return task node list * @throws Exception exception */ public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) throws Exception { public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) { Map<String, Object> result = new HashMap<>(); ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); Loading Loading @@ -1176,9 +1199,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param defineIdList define id list * @return task node list * @throws Exception exception */ public Map<String, Object> getTaskNodeListByDefinitionIdList(String defineIdList) throws Exception { public Map<String, Object> getTaskNodeListByDefinitionIdList(String defineIdList) { Map<String, Object> result = new HashMap<>(); Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>(); Loading Loading @@ -1364,9 +1386,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param processDefinition process definition * @return dag graph * @throws Exception if exception happens */ private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception { private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) { String processDefinitionJson = processDefinition.getProcessDefinitionJson(); Loading @@ -1386,8 +1407,6 @@ public class ProcessDefinitionService extends BaseDAGService { } /** * whether the graph has a ring * Loading
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +2 −3 Original line number Diff line number Diff line Loading @@ -504,9 +504,8 @@ public class ProcessInstanceService extends BaseDAGService { * * @param processInstanceId process instance id * @return variables data * @throws Exception exception */ public Map<String, Object> viewVariables( Integer processInstanceId) throws Exception { public Map<String, Object> viewVariables(Integer processInstanceId) { Map<String, Object> result = new HashMap<>(5); ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); Loading Loading @@ -537,7 +536,7 @@ public class ProcessInstanceService extends BaseDAGService { List<TaskNode> taskNodeList = workflowData.getTasks(); // global param string String globalParamStr = JSON.toJSONString(globalParams); String globalParamStr = JSONUtils.toJson(globalParams); globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams); globalParams = JSON.parseArray(globalParamStr, Property.class); for (Property property : globalParams) { Loading
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java +11 −10 Original line number Diff line number Diff line Loading @@ -16,9 +16,9 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.springframework.beans.factory.InitializingBean; Loading @@ -33,6 +33,7 @@ import java.util.List; @Service public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { private static final String PARAMS = "params"; @Autowired private DataSourceMapper dataSourceMapper; Loading @@ -42,14 +43,14 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override public JSONObject addExportSpecialParam(JSONObject taskNode) { public JsonNode addExportSpecialParam(JsonNode taskNode) { // add sqlParameters JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource")); ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt()); if (null != dataSource) { sqlParameters.put("datasourceName", dataSource.getName()); } taskNode.put("params", sqlParameters); ((ObjectNode)taskNode).set(PARAMS, sqlParameters); return taskNode; } Loading @@ -60,14 +61,14 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override public JSONObject addImportSpecialParam(JSONObject taskNode) { JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); public JsonNode addImportSpecialParam(JsonNode taskNode) { ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText()); if (!dataSources.isEmpty()) { DataSource dataSource = dataSources.get(0); sqlParameters.put("datasource", dataSource.getId()); } taskNode.put("params", sqlParameters); ((ObjectNode)taskNode).set(PARAMS, sqlParameters); return taskNode; } Loading
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java +21 −19 Original line number Diff line number Diff line Loading @@ -16,8 +16,9 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; Loading @@ -34,6 +35,7 @@ import org.springframework.stereotype.Service; @Service public class DependentParam implements ProcessAddTaskParam, InitializingBean { private static final String DEPENDENCE = "dependence"; @Autowired ProcessDefinitionMapper processDefineMapper; Loading @@ -47,18 +49,18 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override public JSONObject addExportSpecialParam(JSONObject taskNode) { public JsonNode addExportSpecialParam(JsonNode taskNode) { // add dependent param JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); if (null != dependentParameters) { JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList"); for (int j = 0; j < dependTaskList.size(); j++) { JSONObject dependentTaskModel = dependTaskList.getJSONObject(j); JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); JsonNode dependentTaskModel = dependTaskList.path(j); ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); for (int k = 0; k < dependItemList.size(); k++) { JSONObject dependentItem = dependItemList.getJSONObject(k); int definitionId = dependentItem.getInteger("definitionId"); ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); int definitionId = dependentItem.path("definitionId").asInt(); ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId); if (null != definition) { dependentItem.put("projectName", definition.getProjectName()); Loading @@ -66,7 +68,7 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { } } } taskNode.put("dependence", dependentParameters); ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); } return taskNode; Loading @@ -78,18 +80,18 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { * @return */ @Override public JSONObject addImportSpecialParam(JSONObject taskNode) { JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); public JsonNode addImportSpecialParam(JsonNode taskNode) { ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); if(dependentParameters != null){ JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList"); for (int h = 0; h < dependTaskList.size(); h++) { JSONObject dependentTaskModel = dependTaskList.getJSONObject(h); JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h); ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); for (int k = 0; k < dependItemList.size(); k++) { JSONObject dependentItem = dependItemList.getJSONObject(k); Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName")); ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText()); if(dependentItemProject != null){ ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName")); ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.path("definitionName").asText()); if(definition != null){ dependentItem.put("projectId",dependentItemProject.getId()); dependentItem.put("definitionId",definition.getId()); Loading @@ -97,7 +99,7 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { } } } taskNode.put("dependence", dependentParameters); ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); } return taskNode; } Loading