Loading escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java +7 −5 Original line number Diff line number Diff line Loading @@ -701,17 +701,19 @@ public class ResourcesService extends BaseService { if (checkAdmin(loginUser, result)) { return result; } List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId); Set<Resource> resourceSet = null; List<Object> list ; if (resourceList != null && resourceList.size() > 0) { resourceSet = new HashSet<>(resourceList); Set<Resource> resourceSet = new HashSet<>(resourceList); List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId); getAuthorizedResourceList(resourceSet, authedResourceList); list = new ArrayList<>(resourceSet); }else { list = new ArrayList<>(0); } result.put(Constants.DATA_LIST, new ArrayList<>(resourceSet)); result.put(Constants.DATA_LIST, list); putMsg(result,Status.SUCCESS); return result; } Loading escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +48 −13 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.enums.*; import cn.escheduler.common.model.DateInterval; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.task.subprocess.SubProcessParameters; Loading @@ -41,6 +42,7 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; import static cn.escheduler.common.Constants.*; import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper; Loading Loading @@ -689,41 +691,62 @@ public class ProcessDao extends AbstractBaseDao { * handle sub work process instance, update relation table and command parameters * set sub work process flag, extends parent work process command parameters. */ public ProcessInstance setSubProcessParam(ProcessInstance processInstance){ String cmdParam = processInstance.getCommandParam(); public ProcessInstance setSubProcessParam(ProcessInstance subProcessInstance){ String cmdParam = subProcessInstance.getCommandParam(); if(StringUtils.isEmpty(cmdParam)){ return processInstance; return subProcessInstance; } Map<String, String> paramMap = JSONUtils.toMap(cmdParam); // write sub process id into cmd param. if(paramMap.containsKey(CMDPARAM_SUB_PROCESS) && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))){ paramMap.remove(CMDPARAM_SUB_PROCESS); paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); this.saveProcessInstance(processInstance); paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId())); subProcessInstance.setCommandParam(JSONUtils.toJson(paramMap)); subProcessInstance.setIsSubProcess(Flag.YES); this.saveProcessInstance(subProcessInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); if(StringUtils.isNotEmpty(parentInstanceId)){ ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); this.saveProcessInstance(processInstance); subProcessInstance.setGlobalParams( joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); this.saveProcessInstance(subProcessInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); } } ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class); if(processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0){ return processInstance; return subProcessInstance; } // update sub process id to process map table processInstanceMap.setProcessInstanceId(processInstance.getId()); processInstanceMap.setProcessInstanceId(subProcessInstance.getId()); this.updateWorkProcessInstanceMap(processInstanceMap); return processInstance; return subProcessInstance; } /** * join parent global params into sub process. * only the keys doesn't in sub process global would be joined. * @param parentGlobalParams * @param subGlobalParams * @return */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams){ List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class); List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class); Map<String,String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); for(Property parent : parentPropertyList){ if(!subMap.containsKey(parent.getProp())){ subPropertyList.add(parent); } } return JSONUtils.toJson(subPropertyList); } /** Loading Loading @@ -898,7 +921,11 @@ public class ProcessDao extends AbstractBaseDao { taskInstance.setFlag(Flag.NO); updateTaskInstance(taskInstance); // crate new task instance if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){ taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 ); } taskInstance.setEndTime(null); taskInstance.setStartTime(new Date()); taskInstance.setFlag(Flag.YES); taskInstance.setHost(null); taskInstance.setId(0); Loading Loading @@ -1526,6 +1553,14 @@ public class ProcessDao extends AbstractBaseDao { } public void selfFaultTolerant(int ... states){ List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(states); for (ProcessInstance processInstance:processInstanceList){ selfFaultTolerant(processInstance); } } @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public void selfFaultTolerant(ProcessInstance processInstance){ Loading escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java +5 −1 Original line number Diff line number Diff line Loading @@ -422,9 +422,13 @@ public class TaskInstance { if(this.isSubProcess()){ return false; } if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){ return true; }else { return (this.getState().typeIsFailure() && this.getRetryTimes() < this.getMaxRetryTimes()); } } public void setDependency(String dependency) { this.dependency = dependency; Loading escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java +1 −1 Original line number Diff line number Diff line Loading @@ -869,7 +869,7 @@ public class MasterExecThread implements Runnable { } Date now = new Date(); long runningTime = DateUtils.differMs(now, processInstance.getStartTime()); long runningTime = DateUtils.diffMin(now, processInstance.getStartTime()); if(runningTime > processInstance.getTimeout()){ return true; Loading escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java +1 −1 Original line number Diff line number Diff line Loading @@ -175,7 +175,7 @@ public class AlertManager { alert.setContent(content); alert.setAlertType(AlertType.EMAIL); alert.setCreateTime(new Date()); alert.setAlertGroupId(processInstance.getWarningGroupId()); alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId()); alert.setReceivers(processInstance.getProcessDefinition().getReceivers()); alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc()); Loading Loading
escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java +7 −5 Original line number Diff line number Diff line Loading @@ -701,17 +701,19 @@ public class ResourcesService extends BaseService { if (checkAdmin(loginUser, result)) { return result; } List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId); Set<Resource> resourceSet = null; List<Object> list ; if (resourceList != null && resourceList.size() > 0) { resourceSet = new HashSet<>(resourceList); Set<Resource> resourceSet = new HashSet<>(resourceList); List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId); getAuthorizedResourceList(resourceSet, authedResourceList); list = new ArrayList<>(resourceSet); }else { list = new ArrayList<>(0); } result.put(Constants.DATA_LIST, new ArrayList<>(resourceSet)); result.put(Constants.DATA_LIST, list); putMsg(result,Status.SUCCESS); return result; } Loading
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +48 −13 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.enums.*; import cn.escheduler.common.model.DateInterval; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.task.subprocess.SubProcessParameters; Loading @@ -41,6 +42,7 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; import static cn.escheduler.common.Constants.*; import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper; Loading Loading @@ -689,41 +691,62 @@ public class ProcessDao extends AbstractBaseDao { * handle sub work process instance, update relation table and command parameters * set sub work process flag, extends parent work process command parameters. */ public ProcessInstance setSubProcessParam(ProcessInstance processInstance){ String cmdParam = processInstance.getCommandParam(); public ProcessInstance setSubProcessParam(ProcessInstance subProcessInstance){ String cmdParam = subProcessInstance.getCommandParam(); if(StringUtils.isEmpty(cmdParam)){ return processInstance; return subProcessInstance; } Map<String, String> paramMap = JSONUtils.toMap(cmdParam); // write sub process id into cmd param. if(paramMap.containsKey(CMDPARAM_SUB_PROCESS) && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))){ paramMap.remove(CMDPARAM_SUB_PROCESS); paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); this.saveProcessInstance(processInstance); paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId())); subProcessInstance.setCommandParam(JSONUtils.toJson(paramMap)); subProcessInstance.setIsSubProcess(Flag.YES); this.saveProcessInstance(subProcessInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); if(StringUtils.isNotEmpty(parentInstanceId)){ ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); this.saveProcessInstance(processInstance); subProcessInstance.setGlobalParams( joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); this.saveProcessInstance(subProcessInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); } } ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class); if(processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0){ return processInstance; return subProcessInstance; } // update sub process id to process map table processInstanceMap.setProcessInstanceId(processInstance.getId()); processInstanceMap.setProcessInstanceId(subProcessInstance.getId()); this.updateWorkProcessInstanceMap(processInstanceMap); return processInstance; return subProcessInstance; } /** * join parent global params into sub process. * only the keys doesn't in sub process global would be joined. * @param parentGlobalParams * @param subGlobalParams * @return */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams){ List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class); List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class); Map<String,String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); for(Property parent : parentPropertyList){ if(!subMap.containsKey(parent.getProp())){ subPropertyList.add(parent); } } return JSONUtils.toJson(subPropertyList); } /** Loading Loading @@ -898,7 +921,11 @@ public class ProcessDao extends AbstractBaseDao { taskInstance.setFlag(Flag.NO); updateTaskInstance(taskInstance); // crate new task instance if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){ taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 ); } taskInstance.setEndTime(null); taskInstance.setStartTime(new Date()); taskInstance.setFlag(Flag.YES); taskInstance.setHost(null); taskInstance.setId(0); Loading Loading @@ -1526,6 +1553,14 @@ public class ProcessDao extends AbstractBaseDao { } public void selfFaultTolerant(int ... states){ List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(states); for (ProcessInstance processInstance:processInstanceList){ selfFaultTolerant(processInstance); } } @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public void selfFaultTolerant(ProcessInstance processInstance){ Loading
escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java +5 −1 Original line number Diff line number Diff line Loading @@ -422,9 +422,13 @@ public class TaskInstance { if(this.isSubProcess()){ return false; } if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){ return true; }else { return (this.getState().typeIsFailure() && this.getRetryTimes() < this.getMaxRetryTimes()); } } public void setDependency(String dependency) { this.dependency = dependency; Loading
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java +1 −1 Original line number Diff line number Diff line Loading @@ -869,7 +869,7 @@ public class MasterExecThread implements Runnable { } Date now = new Date(); long runningTime = DateUtils.differMs(now, processInstance.getStartTime()); long runningTime = DateUtils.diffMin(now, processInstance.getStartTime()); if(runningTime > processInstance.getTimeout()){ return true; Loading
escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java +1 −1 Original line number Diff line number Diff line Loading @@ -175,7 +175,7 @@ public class AlertManager { alert.setContent(content); alert.setAlertType(AlertType.EMAIL); alert.setCreateTime(new Date()); alert.setAlertGroupId(processInstance.getWarningGroupId()); alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId()); alert.setReceivers(processInstance.getProcessDefinition().getReceivers()); alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc()); Loading