Loading dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +12 −12 Original line number Diff line number Diff line Loading @@ -449,30 +449,30 @@ public class ProcessDefinitionController extends BaseController { } /** * export process definition by id * batch export process definition by ids * * @param loginUser login user * @param projectName project name * @param processDefinitionId process definition id * @param processDefinitionIds process definition ids * @param response response */ @ApiOperation(value = "exportProcessDefinitionById", notes= "EXPORT_PROCESS_DEFINITION_BY_ID_NOTES") @ApiOperation(value = "batchExportProcessDefinitionByIds", notes= "BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") @ApiImplicitParam(name = "processDefinitionIds", value = "PROCESS_DEFINITION_ID", required = true, dataType = "String") }) @GetMapping(value = "/export") @ResponseBody public void exportProcessDefinitionById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @PathVariable String projectName, @RequestParam("processDefinitionId") Integer processDefinitionId, public void batchExportProcessDefinitionByIds(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam("processDefinitionIds") String processDefinitionIds, HttpServletResponse response) { try { logger.info("export process definition by id, login user:{}, project name:{}, process definition id:{}", loginUser.getUserName(), projectName, processDefinitionId); processDefinitionService.exportProcessDefinitionById(loginUser, projectName, processDefinitionId, response); logger.info("batch export process definition by ids, login user:{}, project name:{}, process definition ids:{}", loginUser.getUserName(), projectName, processDefinitionIds); processDefinitionService.batchExportProcessDefinitionByIds(loginUser, projectName, processDefinitionIds, response); } catch (Exception e) { logger.error(Status.EXPORT_PROCESS_DEFINE_BY_ID_ERROR.getMsg(), e); logger.error(Status.BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR.getMsg(), e); } } Loading dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +3 −2 Original line number Diff line number Diff line Loading @@ -214,8 +214,8 @@ public enum Status { EXECUTE_PROCESS_INSTANCE_ERROR(50015,"execute process instance error", "操作工作流实例错误"), CHECK_PROCESS_DEFINITION_ERROR(50016,"check process definition error", "检查工作流实例错误"), QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017,"query recipients and copyers by process definition error", "查询收件人和抄送人错误"), DATA_IS_NOT_VALID(50017,"data %s not valid", "数据[%s]无效"), DATA_IS_NULL(50018,"data %s is null", "数据[%s]不能为空"), DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"), DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"), PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"), PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"), PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"), Loading @@ -226,6 +226,7 @@ public enum Status { BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026,"batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), TENANT_NOT_SUITABLE(50027,"there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028,"export process definition by id error", "导出工作流定义错误"), BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029,"import process definition error", "导入工作流定义错误"), HDFS_NOT_STARTUP(60001,"hdfs not startup", "hdfs未启用"), Loading dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +193 −70 Original line number Diff line number Diff line Loading @@ -563,14 +563,18 @@ public class ProcessDefinitionService extends BaseDAGService { } /** * export process definition by id * * @param loginUser login user * @param projectName project name * @param processDefinitionId process definition id * @param response response * batch export process definition by ids * @param loginUser * @param projectName * @param processDefinitionIds * @param response */ public void exportProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId, HttpServletResponse response) { public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response){ if(StringUtils.isEmpty(processDefinitionIds)){ return; } //export project info Project project = projectMapper.queryByName(projectName); Loading @@ -578,20 +582,51 @@ public class ProcessDefinitionService extends BaseDAGService { Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus == Status.SUCCESS) { if(resultStatus != Status.SUCCESS){ return; } List<ProcessMeta> processDefinitionList = getProcessDefinitionList(processDefinitionIds); if(CollectionUtils.isNotEmpty(processDefinitionList)){ downloadProcessDefinitionFile(response, processDefinitionList); } } /** * get process definition list by ids * @param processDefinitionIds * @return */ private List<ProcessMeta> getProcessDefinitionList(String processDefinitionIds){ List<ProcessMeta> processDefinitionList = new ArrayList<>(); String[] processDefinitionIdArray = processDefinitionIds.split(","); for (String strProcessDefinitionId : processDefinitionIdArray) { //get workflow info int processDefinitionId = Integer.parseInt(strProcessDefinitionId); ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); if (null != processDefinition) { String exportProcessJson = exportProcessMetaDataStr(processDefinitionId, processDefinition); processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); } } return processDefinitionList; } /** * download the process definition file * @param response * @param processDefinitionList */ private void downloadProcessDefinitionFile(HttpServletResponse response, List<ProcessMeta> processDefinitionList) { response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); response.setHeader("Content-Disposition", "attachment;filename="+processDefinition.getName()+".json"); BufferedOutputStream buff = null; ServletOutputStream out = null; try { out = response.getOutputStream(); buff = new BufferedOutputStream(out); buff.write(exportProcessJson.getBytes(StandardCharsets.UTF_8)); buff.write(JSON.toJSONString(processDefinitionList).getBytes(StandardCharsets.UTF_8)); buff.flush(); buff.close(); } catch (IOException e) { Loading @@ -613,8 +648,6 @@ public class ProcessDefinitionService extends BaseDAGService { } } } } } /** * get export process metadata string Loading @@ -623,6 +656,17 @@ public class ProcessDefinitionService extends BaseDAGService { * @return export process metadata string */ public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { //create workflow json file return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId,processDefinition)); } /** * get export process metadata string * @param processDefinitionId process definition id * @param processDefinition process definition * @return export process metadata string */ public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { //correct task param which has data source or dependent param String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); Loading @@ -639,14 +683,6 @@ public class ProcessDefinitionService extends BaseDAGService { List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); if (!schedules.isEmpty()) { Schedule schedule = schedules.get(0); /*WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId()); if (null == workerGroup && schedule.getWorkerGroupId() == -1) { workerGroup = new WorkerGroup(); workerGroup.setId(-1); workerGroup.setName(""); }*/ exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId()); exportProcessMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime())); Loading @@ -658,7 +694,7 @@ public class ProcessDefinitionService extends BaseDAGService { exportProcessMeta.setScheduleWorkerGroupName(schedule.getWorkerGroup()); } //create workflow json file return JSONUtils.toJsonString(exportProcessMeta); return exportProcessMeta; } /** Loading Loading @@ -705,26 +741,38 @@ public class ProcessDefinitionService extends BaseDAGService { public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { Map<String, Object> result = new HashMap<>(5); String processMetaJson = FileUtils.file2String(file); ProcessMeta processMeta = JSONUtils.parseObject(processMetaJson, ProcessMeta.class); List<ProcessMeta> processMetaList = JSON.parseArray(processMetaJson,ProcessMeta.class); //check file content if (null == processMeta) { if (CollectionUtils.isEmpty(processMetaList)) { putMsg(result, Status.DATA_IS_NULL, "fileContent"); return result; } if (StringUtils.isEmpty(processMeta.getProjectName())) { putMsg(result, Status.DATA_IS_NULL, "projectName"); for(ProcessMeta processMeta:processMetaList){ if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)){ return result; } if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); return result; } if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); return result; } /** * check and import process definition * @param loginUser * @param currentProjectName * @param result * @param processMeta * @return */ private boolean checkAndImportProcessDefinition(User loginUser, String currentProjectName, Map<String, Object> result, ProcessMeta processMeta) { if(!checkImportanceParams(processMeta,result)){ return false; } //deal with process name String processDefinitionName = processMeta.getProcessDefinitionName(); //use currentProjectName to query Loading @@ -734,31 +782,84 @@ public class ProcessDefinitionService extends BaseDAGService { processDefinitionName, 1); } //add special task param String importProcessParam = addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject); // get create process result Map<String, Object> createProcessResult = getCreateProcessResult(loginUser, currentProjectName, result, processMeta, processDefinitionName, addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject)); if(createProcessResult == null){ return false; } //create process definition Integer processDefinitionId = Objects.isNull(createProcessResult.get("processDefinitionId"))? null:Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); //scheduler param return getImportProcessScheduleResult(loginUser, currentProjectName, result, processMeta, processDefinitionName, processDefinitionId); } Map<String, Object> createProcessResult; /** * get create process result * @param loginUser * @param currentProjectName * @param result * @param processMeta * @param processDefinitionName * @param importProcessParam * @return */ private Map<String, Object> getCreateProcessResult(User loginUser, String currentProjectName, Map<String, Object> result, ProcessMeta processMeta, String processDefinitionName, String importProcessParam){ Map<String, Object> createProcessResult = null; try { createProcessResult = createProcessDefinition(loginUser ,currentProjectName, processDefinitionName, processDefinitionName+"_import_"+System.currentTimeMillis(), importProcessParam, processMeta.getProcessDefinitionDescription(), processMeta.getProcessDefinitionLocations(), processMeta.getProcessDefinitionConnects()); putMsg(result, Status.SUCCESS); } catch (JsonProcessingException e) { logger.error("import process meta json data: {}", e.getMessage(), e); putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); return result; } putMsg(result, Status.SUCCESS); //create process definition Integer processDefinitionId = null; if (null != createProcessResult && Objects.nonNull(createProcessResult.get("processDefinitionId"))) { processDefinitionId = Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); return createProcessResult; } //scheduler param /** * get import process schedule result * @param loginUser * @param currentProjectName * @param result * @param processMeta * @param processDefinitionName * @param processDefinitionId * @return */ private boolean getImportProcessScheduleResult(User loginUser, String currentProjectName, Map<String, Object> result, ProcessMeta processMeta, String processDefinitionName, Integer processDefinitionId) { if (null != processMeta.getScheduleCrontab() && null != processDefinitionId) { int scheduleInsert = importProcessSchedule(loginUser, currentProjectName, Loading @@ -768,11 +869,33 @@ public class ProcessDefinitionService extends BaseDAGService { if (0 == scheduleInsert) { putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); return result; return false; } } return true; } return result; /** * check importance params * @param processMeta * @param result * @return */ private boolean checkImportanceParams(ProcessMeta processMeta,Map<String, Object> result){ if (StringUtils.isEmpty(processMeta.getProjectName())) { putMsg(result, Status.DATA_IS_NULL, "projectName"); return false; } if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); return false; } if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); return false; } return true; } /** Loading dolphinscheduler-api/src/main/resources/i18n/messages.properties +2 −0 Original line number Diff line number Diff line Loading @@ -252,3 +252,5 @@ UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source AUTHORIZED_DATA_SOURCE_NOTES=authorized data source DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES= batch export process definition by ids No newline at end of file dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +2 −0 Original line number Diff line number Diff line Loading @@ -252,3 +252,5 @@ UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source AUTHORIZED_DATA_SOURCE_NOTES=authorized data source DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES= batch export process definition by ids Loading
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +12 −12 Original line number Diff line number Diff line Loading @@ -449,30 +449,30 @@ public class ProcessDefinitionController extends BaseController { } /** * export process definition by id * batch export process definition by ids * * @param loginUser login user * @param projectName project name * @param processDefinitionId process definition id * @param processDefinitionIds process definition ids * @param response response */ @ApiOperation(value = "exportProcessDefinitionById", notes= "EXPORT_PROCESS_DEFINITION_BY_ID_NOTES") @ApiOperation(value = "batchExportProcessDefinitionByIds", notes= "BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") @ApiImplicitParam(name = "processDefinitionIds", value = "PROCESS_DEFINITION_ID", required = true, dataType = "String") }) @GetMapping(value = "/export") @ResponseBody public void exportProcessDefinitionById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @PathVariable String projectName, @RequestParam("processDefinitionId") Integer processDefinitionId, public void batchExportProcessDefinitionByIds(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam("processDefinitionIds") String processDefinitionIds, HttpServletResponse response) { try { logger.info("export process definition by id, login user:{}, project name:{}, process definition id:{}", loginUser.getUserName(), projectName, processDefinitionId); processDefinitionService.exportProcessDefinitionById(loginUser, projectName, processDefinitionId, response); logger.info("batch export process definition by ids, login user:{}, project name:{}, process definition ids:{}", loginUser.getUserName(), projectName, processDefinitionIds); processDefinitionService.batchExportProcessDefinitionByIds(loginUser, projectName, processDefinitionIds, response); } catch (Exception e) { logger.error(Status.EXPORT_PROCESS_DEFINE_BY_ID_ERROR.getMsg(), e); logger.error(Status.BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR.getMsg(), e); } } Loading
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +3 −2 Original line number Diff line number Diff line Loading @@ -214,8 +214,8 @@ public enum Status { EXECUTE_PROCESS_INSTANCE_ERROR(50015,"execute process instance error", "操作工作流实例错误"), CHECK_PROCESS_DEFINITION_ERROR(50016,"check process definition error", "检查工作流实例错误"), QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017,"query recipients and copyers by process definition error", "查询收件人和抄送人错误"), DATA_IS_NOT_VALID(50017,"data %s not valid", "数据[%s]无效"), DATA_IS_NULL(50018,"data %s is null", "数据[%s]不能为空"), DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"), DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"), PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"), PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"), PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"), Loading @@ -226,6 +226,7 @@ public enum Status { BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026,"batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), TENANT_NOT_SUITABLE(50027,"there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028,"export process definition by id error", "导出工作流定义错误"), BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029,"import process definition error", "导入工作流定义错误"), HDFS_NOT_STARTUP(60001,"hdfs not startup", "hdfs未启用"), Loading
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +193 −70 Original line number Diff line number Diff line Loading @@ -563,14 +563,18 @@ public class ProcessDefinitionService extends BaseDAGService { } /** * export process definition by id * * @param loginUser login user * @param projectName project name * @param processDefinitionId process definition id * @param response response * batch export process definition by ids * @param loginUser * @param projectName * @param processDefinitionIds * @param response */ public void exportProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId, HttpServletResponse response) { public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response){ if(StringUtils.isEmpty(processDefinitionIds)){ return; } //export project info Project project = projectMapper.queryByName(projectName); Loading @@ -578,20 +582,51 @@ public class ProcessDefinitionService extends BaseDAGService { Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus == Status.SUCCESS) { if(resultStatus != Status.SUCCESS){ return; } List<ProcessMeta> processDefinitionList = getProcessDefinitionList(processDefinitionIds); if(CollectionUtils.isNotEmpty(processDefinitionList)){ downloadProcessDefinitionFile(response, processDefinitionList); } } /** * get process definition list by ids * @param processDefinitionIds * @return */ private List<ProcessMeta> getProcessDefinitionList(String processDefinitionIds){ List<ProcessMeta> processDefinitionList = new ArrayList<>(); String[] processDefinitionIdArray = processDefinitionIds.split(","); for (String strProcessDefinitionId : processDefinitionIdArray) { //get workflow info int processDefinitionId = Integer.parseInt(strProcessDefinitionId); ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); if (null != processDefinition) { String exportProcessJson = exportProcessMetaDataStr(processDefinitionId, processDefinition); processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); } } return processDefinitionList; } /** * download the process definition file * @param response * @param processDefinitionList */ private void downloadProcessDefinitionFile(HttpServletResponse response, List<ProcessMeta> processDefinitionList) { response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); response.setHeader("Content-Disposition", "attachment;filename="+processDefinition.getName()+".json"); BufferedOutputStream buff = null; ServletOutputStream out = null; try { out = response.getOutputStream(); buff = new BufferedOutputStream(out); buff.write(exportProcessJson.getBytes(StandardCharsets.UTF_8)); buff.write(JSON.toJSONString(processDefinitionList).getBytes(StandardCharsets.UTF_8)); buff.flush(); buff.close(); } catch (IOException e) { Loading @@ -613,8 +648,6 @@ public class ProcessDefinitionService extends BaseDAGService { } } } } } /** * get export process metadata string Loading @@ -623,6 +656,17 @@ public class ProcessDefinitionService extends BaseDAGService { * @return export process metadata string */ public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { //create workflow json file return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId,processDefinition)); } /** * get export process metadata string * @param processDefinitionId process definition id * @param processDefinition process definition * @return export process metadata string */ public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { //correct task param which has data source or dependent param String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); Loading @@ -639,14 +683,6 @@ public class ProcessDefinitionService extends BaseDAGService { List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); if (!schedules.isEmpty()) { Schedule schedule = schedules.get(0); /*WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId()); if (null == workerGroup && schedule.getWorkerGroupId() == -1) { workerGroup = new WorkerGroup(); workerGroup.setId(-1); workerGroup.setName(""); }*/ exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId()); exportProcessMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime())); Loading @@ -658,7 +694,7 @@ public class ProcessDefinitionService extends BaseDAGService { exportProcessMeta.setScheduleWorkerGroupName(schedule.getWorkerGroup()); } //create workflow json file return JSONUtils.toJsonString(exportProcessMeta); return exportProcessMeta; } /** Loading Loading @@ -705,26 +741,38 @@ public class ProcessDefinitionService extends BaseDAGService { public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { Map<String, Object> result = new HashMap<>(5); String processMetaJson = FileUtils.file2String(file); ProcessMeta processMeta = JSONUtils.parseObject(processMetaJson, ProcessMeta.class); List<ProcessMeta> processMetaList = JSON.parseArray(processMetaJson,ProcessMeta.class); //check file content if (null == processMeta) { if (CollectionUtils.isEmpty(processMetaList)) { putMsg(result, Status.DATA_IS_NULL, "fileContent"); return result; } if (StringUtils.isEmpty(processMeta.getProjectName())) { putMsg(result, Status.DATA_IS_NULL, "projectName"); for(ProcessMeta processMeta:processMetaList){ if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)){ return result; } if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); return result; } if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); return result; } /** * check and import process definition * @param loginUser * @param currentProjectName * @param result * @param processMeta * @return */ private boolean checkAndImportProcessDefinition(User loginUser, String currentProjectName, Map<String, Object> result, ProcessMeta processMeta) { if(!checkImportanceParams(processMeta,result)){ return false; } //deal with process name String processDefinitionName = processMeta.getProcessDefinitionName(); //use currentProjectName to query Loading @@ -734,31 +782,84 @@ public class ProcessDefinitionService extends BaseDAGService { processDefinitionName, 1); } //add special task param String importProcessParam = addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject); // get create process result Map<String, Object> createProcessResult = getCreateProcessResult(loginUser, currentProjectName, result, processMeta, processDefinitionName, addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject)); if(createProcessResult == null){ return false; } //create process definition Integer processDefinitionId = Objects.isNull(createProcessResult.get("processDefinitionId"))? null:Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); //scheduler param return getImportProcessScheduleResult(loginUser, currentProjectName, result, processMeta, processDefinitionName, processDefinitionId); } Map<String, Object> createProcessResult; /** * get create process result * @param loginUser * @param currentProjectName * @param result * @param processMeta * @param processDefinitionName * @param importProcessParam * @return */ private Map<String, Object> getCreateProcessResult(User loginUser, String currentProjectName, Map<String, Object> result, ProcessMeta processMeta, String processDefinitionName, String importProcessParam){ Map<String, Object> createProcessResult = null; try { createProcessResult = createProcessDefinition(loginUser ,currentProjectName, processDefinitionName, processDefinitionName+"_import_"+System.currentTimeMillis(), importProcessParam, processMeta.getProcessDefinitionDescription(), processMeta.getProcessDefinitionLocations(), processMeta.getProcessDefinitionConnects()); putMsg(result, Status.SUCCESS); } catch (JsonProcessingException e) { logger.error("import process meta json data: {}", e.getMessage(), e); putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); return result; } putMsg(result, Status.SUCCESS); //create process definition Integer processDefinitionId = null; if (null != createProcessResult && Objects.nonNull(createProcessResult.get("processDefinitionId"))) { processDefinitionId = Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); return createProcessResult; } //scheduler param /** * get import process schedule result * @param loginUser * @param currentProjectName * @param result * @param processMeta * @param processDefinitionName * @param processDefinitionId * @return */ private boolean getImportProcessScheduleResult(User loginUser, String currentProjectName, Map<String, Object> result, ProcessMeta processMeta, String processDefinitionName, Integer processDefinitionId) { if (null != processMeta.getScheduleCrontab() && null != processDefinitionId) { int scheduleInsert = importProcessSchedule(loginUser, currentProjectName, Loading @@ -768,11 +869,33 @@ public class ProcessDefinitionService extends BaseDAGService { if (0 == scheduleInsert) { putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); return result; return false; } } return true; } return result; /** * check importance params * @param processMeta * @param result * @return */ private boolean checkImportanceParams(ProcessMeta processMeta,Map<String, Object> result){ if (StringUtils.isEmpty(processMeta.getProjectName())) { putMsg(result, Status.DATA_IS_NULL, "projectName"); return false; } if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); return false; } if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); return false; } return true; } /** Loading
dolphinscheduler-api/src/main/resources/i18n/messages.properties +2 −0 Original line number Diff line number Diff line Loading @@ -252,3 +252,5 @@ UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source AUTHORIZED_DATA_SOURCE_NOTES=authorized data source DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES= batch export process definition by ids No newline at end of file
dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +2 −0 Original line number Diff line number Diff line Loading @@ -252,3 +252,5 @@ UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source AUTHORIZED_DATA_SOURCE_NOTES=authorized data source DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES= batch export process definition by ids