Loading escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +45 −29 Original line number Diff line number Diff line Loading @@ -120,30 +120,51 @@ public class ProcessDao extends AbstractBaseDao { * find one command from command queue, construct process instance * @param logger * @param host * @param vaildThreadNum * @param validThreadNum * @return */ @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){ public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ ProcessInstance processInstance = null; Command command = findOneCommand(); if (command == null) { return null; } logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ processInstance = constructProcessInstance(command, host); //cannot construct process instance, return null; if(processInstance == null){ logger.error("scan command, command parameter is error: %s", command.toString()); }else{ // check thread number enough for this command, if not, change state to waiting thread. int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); if(vaildThreadNum < commandThreadCount){ delCommandByid(command.getId()); return null; }else if(!checkThreadNum(command, validThreadNum)){ logger.info("there is not enough thread for this command: {}",command.toString() ); return setWaitingThreadProcess(command, processInstance); }else{ processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); delCommandByid(command.getId()); return processInstance; } }catch (Exception e){ logger.error("scan command error ", e); delCommandByid(command.getId()); } return null; } /** * set process waiting thread * @param command * @param processInstance * @return */ private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) { processInstance.setState(ExecutionStatus.WAITTING_THREAD); if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ processInstance.addHistoryCmd(command.getCommandType()); Loading @@ -152,16 +173,11 @@ public class ProcessDao extends AbstractBaseDao { this.setSubProcessParam(processInstance); createRecoveryWaitingThreadCommand(command, processInstance); return null; }else{ processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); } } // delete command delCommandByid(command.getId()); return processInstance; private boolean checkThreadNum(Command command, int validThreadNum) { int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); return validThreadNum >= commandThreadCount; } /** Loading Loading @@ -669,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao { paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); this.updateProcessInstance(processInstance); this.saveProcessInstance(processInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); Loading @@ -677,7 +693,7 @@ public class ProcessDao extends AbstractBaseDao { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); this.updateProcessInstance(processInstance); this.saveProcessInstance(processInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); } Loading Loading
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +45 −29 Original line number Diff line number Diff line Loading @@ -120,30 +120,51 @@ public class ProcessDao extends AbstractBaseDao { * find one command from command queue, construct process instance * @param logger * @param host * @param vaildThreadNum * @param validThreadNum * @return */ @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){ public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ ProcessInstance processInstance = null; Command command = findOneCommand(); if (command == null) { return null; } logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ processInstance = constructProcessInstance(command, host); //cannot construct process instance, return null; if(processInstance == null){ logger.error("scan command, command parameter is error: %s", command.toString()); }else{ // check thread number enough for this command, if not, change state to waiting thread. int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); if(vaildThreadNum < commandThreadCount){ delCommandByid(command.getId()); return null; }else if(!checkThreadNum(command, validThreadNum)){ logger.info("there is not enough thread for this command: {}",command.toString() ); return setWaitingThreadProcess(command, processInstance); }else{ processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); delCommandByid(command.getId()); return processInstance; } }catch (Exception e){ logger.error("scan command error ", e); delCommandByid(command.getId()); } return null; } /** * set process waiting thread * @param command * @param processInstance * @return */ private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) { processInstance.setState(ExecutionStatus.WAITTING_THREAD); if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ processInstance.addHistoryCmd(command.getCommandType()); Loading @@ -152,16 +173,11 @@ public class ProcessDao extends AbstractBaseDao { this.setSubProcessParam(processInstance); createRecoveryWaitingThreadCommand(command, processInstance); return null; }else{ processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); } } // delete command delCommandByid(command.getId()); return processInstance; private boolean checkThreadNum(Command command, int validThreadNum) { int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); return validThreadNum >= commandThreadCount; } /** Loading Loading @@ -669,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao { paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); this.updateProcessInstance(processInstance); this.saveProcessInstance(processInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); Loading @@ -677,7 +693,7 @@ public class ProcessDao extends AbstractBaseDao { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); this.updateProcessInstance(processInstance); this.saveProcessInstance(processInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); } Loading