Loading escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessInstanceController.java +5 −1 Original line number Diff line number Diff line Loading @@ -22,6 +22,8 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.Flag; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.model.User; import org.slf4j.Logger; Loading Loading @@ -189,7 +191,9 @@ public class ProcessInstanceController extends BaseController{ try{ logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}", loginUser.getUserName(), projectName, processInstanceId); Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId); // task queue ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue); return returnDataList(result); }catch (Exception e){ logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e); Loading escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +30 −9 Original line number Diff line number Diff line Loading @@ -30,6 +30,8 @@ import cn.escheduler.common.graph.DAG; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.model.TaskNodeRelation; import cn.escheduler.common.process.Property; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.JSONUtils; Loading Loading @@ -446,13 +448,13 @@ public class ProcessInstanceService extends BaseDAGService { /** * delete process instance by id, at the same time,delete task instance and their mapping relation data * * @param loginUser * @param projectName * @param workflowId * @param processInstanceId * @param tasksQueue * @return */ public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer workflowId) { public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId,ITaskQueue tasksQueue) { Map<String, Object> result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); Loading @@ -462,17 +464,34 @@ public class ProcessInstanceService extends BaseDAGService { if (resultEnum != Status.SUCCESS) { return checkResult; } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(workflowId); ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId); List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId); //process instance priority int processInstancePriority = processInstance.getProcessInstancePriority().ordinal(); if (processInstance == null) { putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, workflowId); putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } int delete = processDao.deleteWorkProcessInstanceById(workflowId); processDao.deleteAllSubWorkProcessByParentId(workflowId); processDao.deleteWorkProcessMapByParentId(workflowId); int delete = processDao.deleteWorkProcessInstanceById(processInstanceId); processDao.deleteAllSubWorkProcessByParentId(processInstanceId); processDao.deleteWorkProcessMapByParentId(processInstanceId); if (delete > 0) { if (CollectionUtils.isNotEmpty(taskInstanceList)){ for (TaskInstance taskInstance : taskInstanceList){ // task instance priority int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal(); String nodeValue=processInstancePriority + "_" + processInstanceId + "_" +taskInstancePriority + "_" + taskInstance.getId(); try { logger.info("delete task queue node : {}",nodeValue); tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue); }catch (Exception e){ logger.error("delete task queue node : {}", nodeValue); } } } putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); Loading @@ -489,6 +508,8 @@ public class ProcessInstanceService extends BaseDAGService { * @return */ public Map<String, Object> batchDeleteProcessInstanceByIds(User loginUser, String projectName, String processInstanceIds) { // task queue ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); Map<String, Object> result = new HashMap<>(5); List<Integer> deleteFailedIdList = new ArrayList<Integer>(); Loading @@ -507,7 +528,7 @@ public class ProcessInstanceService extends BaseDAGService { for (String strProcessInstanceId:processInstanceIdArray) { int processInstanceId = Integer.parseInt(strProcessInstanceId); try { deleteProcessInstanceById(loginUser, projectName, processInstanceId); deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue); } catch (Exception e) { deleteFailedIdList.add(processInstanceId); } Loading escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +8 −0 Original line number Diff line number Diff line Loading @@ -1526,6 +1526,14 @@ public class ProcessDao extends AbstractBaseDao { } public void selfFaultTolerant(int ... states){ List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(states); for (ProcessInstance processInstance:processInstanceList){ selfFaultTolerant(processInstance); } } @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public void selfFaultTolerant(ProcessInstance processInstance){ Loading escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java +1 −1 Original line number Diff line number Diff line Loading @@ -175,7 +175,7 @@ public class AlertManager { alert.setContent(content); alert.setAlertType(AlertType.EMAIL); alert.setCreateTime(new Date()); alert.setAlertGroupId(processInstance.getWarningGroupId()); alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId()); alert.setReceivers(processInstance.getProcessDefinition().getReceivers()); alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc()); Loading escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +2 −2 Original line number Diff line number Diff line Loading @@ -123,9 +123,9 @@ public class ZKMasterClient extends AbstractZKClient { // register master this.registMaster(); // check if fault tolerance is required // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal()); processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()); } } Loading Loading
escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessInstanceController.java +5 −1 Original line number Diff line number Diff line Loading @@ -22,6 +22,8 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.Flag; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.model.User; import org.slf4j.Logger; Loading Loading @@ -189,7 +191,9 @@ public class ProcessInstanceController extends BaseController{ try{ logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}", loginUser.getUserName(), projectName, processInstanceId); Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId); // task queue ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue); return returnDataList(result); }catch (Exception e){ logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e); Loading
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +30 −9 Original line number Diff line number Diff line Loading @@ -30,6 +30,8 @@ import cn.escheduler.common.graph.DAG; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.model.TaskNodeRelation; import cn.escheduler.common.process.Property; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.JSONUtils; Loading Loading @@ -446,13 +448,13 @@ public class ProcessInstanceService extends BaseDAGService { /** * delete process instance by id, at the same time,delete task instance and their mapping relation data * * @param loginUser * @param projectName * @param workflowId * @param processInstanceId * @param tasksQueue * @return */ public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer workflowId) { public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId,ITaskQueue tasksQueue) { Map<String, Object> result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); Loading @@ -462,17 +464,34 @@ public class ProcessInstanceService extends BaseDAGService { if (resultEnum != Status.SUCCESS) { return checkResult; } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(workflowId); ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId); List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId); //process instance priority int processInstancePriority = processInstance.getProcessInstancePriority().ordinal(); if (processInstance == null) { putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, workflowId); putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } int delete = processDao.deleteWorkProcessInstanceById(workflowId); processDao.deleteAllSubWorkProcessByParentId(workflowId); processDao.deleteWorkProcessMapByParentId(workflowId); int delete = processDao.deleteWorkProcessInstanceById(processInstanceId); processDao.deleteAllSubWorkProcessByParentId(processInstanceId); processDao.deleteWorkProcessMapByParentId(processInstanceId); if (delete > 0) { if (CollectionUtils.isNotEmpty(taskInstanceList)){ for (TaskInstance taskInstance : taskInstanceList){ // task instance priority int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal(); String nodeValue=processInstancePriority + "_" + processInstanceId + "_" +taskInstancePriority + "_" + taskInstance.getId(); try { logger.info("delete task queue node : {}",nodeValue); tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue); }catch (Exception e){ logger.error("delete task queue node : {}", nodeValue); } } } putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); Loading @@ -489,6 +508,8 @@ public class ProcessInstanceService extends BaseDAGService { * @return */ public Map<String, Object> batchDeleteProcessInstanceByIds(User loginUser, String projectName, String processInstanceIds) { // task queue ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); Map<String, Object> result = new HashMap<>(5); List<Integer> deleteFailedIdList = new ArrayList<Integer>(); Loading @@ -507,7 +528,7 @@ public class ProcessInstanceService extends BaseDAGService { for (String strProcessInstanceId:processInstanceIdArray) { int processInstanceId = Integer.parseInt(strProcessInstanceId); try { deleteProcessInstanceById(loginUser, projectName, processInstanceId); deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue); } catch (Exception e) { deleteFailedIdList.add(processInstanceId); } Loading
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +8 −0 Original line number Diff line number Diff line Loading @@ -1526,6 +1526,14 @@ public class ProcessDao extends AbstractBaseDao { } public void selfFaultTolerant(int ... states){ List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(states); for (ProcessInstance processInstance:processInstanceList){ selfFaultTolerant(processInstance); } } @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public void selfFaultTolerant(ProcessInstance processInstance){ Loading
escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java +1 −1 Original line number Diff line number Diff line Loading @@ -175,7 +175,7 @@ public class AlertManager { alert.setContent(content); alert.setAlertType(AlertType.EMAIL); alert.setCreateTime(new Date()); alert.setAlertGroupId(processInstance.getWarningGroupId()); alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId()); alert.setReceivers(processInstance.getProcessDefinition().getReceivers()); alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc()); Loading
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +2 −2 Original line number Diff line number Diff line Loading @@ -123,9 +123,9 @@ public class ZKMasterClient extends AbstractZKClient { // register master this.registMaster(); // check if fault tolerance is required // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal()); processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()); } } Loading