Loading escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java +33 −1 Original line number Diff line number Diff line Loading @@ -68,7 +68,39 @@ public class TaskRecordController extends BaseController{ try{ logger.info("query task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}", taskName, state, taskDate, startTime, endTime); Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(false, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); return returnDataListPaging(result); }catch (Exception e){ logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e); return error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getCode(), QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg()); } } /** * query history task record list paging * * @param loginUser * @return */ @GetMapping("/history-list-paging") @ResponseStatus(HttpStatus.OK) public Result queryHistoryTaskRecordListPaging(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "taskName", required = false) String taskName, @RequestParam(value = "state", required = false) String state, @RequestParam(value = "sourceTable", required = false) String sourceTable, @RequestParam(value = "destTable", required = false) String destTable, @RequestParam(value = "taskDate", required = false) String taskDate, @RequestParam(value = "startDate", required = false) String startTime, @RequestParam(value = "endDate", required = false) String endTime, @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize ){ try{ logger.info("query hisotry task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}", taskName, state, taskDate, startTime, endTime); Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(true, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); return returnDataListPaging(result); }catch (Exception e){ logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e); Loading escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java +6 −3 Original line number Diff line number Diff line Loading @@ -29,6 +29,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import static cn.escheduler.common.Constants.*; /** * task record service */ Loading @@ -51,7 +53,7 @@ public class TaskRecordService extends BaseService{ * @param pageSize * @return */ public Map<String,Object> queryTaskRecordListPaging(String taskName, String startDate, public Map<String,Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, String taskDate, String sourceTable, String destTable, String endDate, String state, Integer pageNo, Integer pageSize) { Loading @@ -69,8 +71,9 @@ public class TaskRecordService extends BaseService{ map.put("offset", pageInfo.getStart().toString()); map.put("pageSize", pageInfo.getPageSize().toString()); int count = TaskRecordDao.countTaskRecord(map); List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map); String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG; int count = TaskRecordDao.countTaskRecord(map, table); List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map, table); pageInfo.setTotalCount(count); pageInfo.setLists(recordList); result.put(Constants.DATA_LIST, pageInfo); Loading escheduler-common/src/main/java/cn/escheduler/common/Constants.java +9 −0 Original line number Diff line number Diff line Loading @@ -463,6 +463,10 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; public static final String STATUS = "status"; Loading Loading @@ -826,4 +830,9 @@ public final class Constants { public static final String CONTENT = "content"; public static final String DEPENDENT_SPLIT = ":||"; public static final String DEPENDENT_ALL = "ALL"; /** * */ } escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +45 −29 Original line number Diff line number Diff line Loading @@ -120,30 +120,51 @@ public class ProcessDao extends AbstractBaseDao { * find one command from command queue, construct process instance * @param logger * @param host * @param vaildThreadNum * @param validThreadNum * @return */ @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){ public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ ProcessInstance processInstance = null; Command command = findOneCommand(); if (command == null) { return null; } logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ processInstance = constructProcessInstance(command, host); //cannot construct process instance, return null; if(processInstance == null){ logger.error("scan command, command parameter is error: %s", command.toString()); }else{ // check thread number enough for this command, if not, change state to waiting thread. int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); if(vaildThreadNum < commandThreadCount){ delCommandByid(command.getId()); return null; }else if(!checkThreadNum(command, validThreadNum)){ logger.info("there is not enough thread for this command: {}",command.toString() ); return setWaitingThreadProcess(command, processInstance); }else{ processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); delCommandByid(command.getId()); return processInstance; } }catch (Exception e){ logger.error("scan command error ", e); delCommandByid(command.getId()); } return null; } /** * set process waiting thread * @param command * @param processInstance * @return */ private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) { processInstance.setState(ExecutionStatus.WAITTING_THREAD); if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ processInstance.addHistoryCmd(command.getCommandType()); Loading @@ -152,16 +173,11 @@ public class ProcessDao extends AbstractBaseDao { this.setSubProcessParam(processInstance); createRecoveryWaitingThreadCommand(command, processInstance); return null; }else{ processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); } } // delete command delCommandByid(command.getId()); return processInstance; private boolean checkThreadNum(Command command, int validThreadNum) { int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); return validThreadNum >= commandThreadCount; } /** Loading Loading @@ -669,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao { paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); this.updateProcessInstance(processInstance); this.saveProcessInstance(processInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); Loading @@ -677,7 +693,7 @@ public class ProcessDao extends AbstractBaseDao { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); this.updateProcessInstance(processInstance); this.saveProcessInstance(processInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); } Loading escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +6 −4 Original line number Diff line number Diff line Loading @@ -40,6 +40,8 @@ public class TaskRecordDao { private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName()); /** * 加载配置文件 */ Loading Loading @@ -134,7 +136,7 @@ public class TaskRecordDao { * @param filterMap * @return */ public static int countTaskRecord(Map<String, String> filterMap){ public static int countTaskRecord(Map<String, String> filterMap, String table){ int count = 0; Connection conn = null; Loading @@ -143,7 +145,7 @@ public class TaskRecordDao { if(conn == null){ return count; } String sql = "select count(1) as count from eamp_hive_log_hd"; String sql = String.format("select count(1) as count from %s", table); sql += getWhereString(filterMap); PreparedStatement pstmt; pstmt = conn.prepareStatement(sql); Loading Loading @@ -171,9 +173,9 @@ public class TaskRecordDao { * @param filterMap * @return */ public static List<TaskRecord> queryAllTaskRecord(Map<String,String> filterMap ) { public static List<TaskRecord> queryAllTaskRecord(Map<String,String> filterMap , String table) { String sql = "select * from eamp_hive_log_hd "; String sql = String.format("select * from %s", table); sql += getWhereString(filterMap); int offset = Integer.parseInt(filterMap.get("offset")); Loading Loading
escheduler-api/src/main/java/cn/escheduler/api/controller/TaskRecordController.java +33 −1 Original line number Diff line number Diff line Loading @@ -68,7 +68,39 @@ public class TaskRecordController extends BaseController{ try{ logger.info("query task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}", taskName, state, taskDate, startTime, endTime); Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(false, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); return returnDataListPaging(result); }catch (Exception e){ logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e); return error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getCode(), QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg()); } } /** * query history task record list paging * * @param loginUser * @return */ @GetMapping("/history-list-paging") @ResponseStatus(HttpStatus.OK) public Result queryHistoryTaskRecordListPaging(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "taskName", required = false) String taskName, @RequestParam(value = "state", required = false) String state, @RequestParam(value = "sourceTable", required = false) String sourceTable, @RequestParam(value = "destTable", required = false) String destTable, @RequestParam(value = "taskDate", required = false) String taskDate, @RequestParam(value = "startDate", required = false) String startTime, @RequestParam(value = "endDate", required = false) String endTime, @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize ){ try{ logger.info("query hisotry task record list, task name:{}, state :{}, taskDate: {}, start:{}, end:{}", taskName, state, taskDate, startTime, endTime); Map<String, Object> result = taskRecordService.queryTaskRecordListPaging(true, taskName, startTime, taskDate, sourceTable, destTable, endTime,state, pageNo, pageSize); return returnDataListPaging(result); }catch (Exception e){ logger.error(QUERY_TASK_RECORD_LIST_PAGING_ERROR.getMsg(),e); Loading
escheduler-api/src/main/java/cn/escheduler/api/service/TaskRecordService.java +6 −3 Original line number Diff line number Diff line Loading @@ -29,6 +29,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import static cn.escheduler.common.Constants.*; /** * task record service */ Loading @@ -51,7 +53,7 @@ public class TaskRecordService extends BaseService{ * @param pageSize * @return */ public Map<String,Object> queryTaskRecordListPaging(String taskName, String startDate, public Map<String,Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, String taskDate, String sourceTable, String destTable, String endDate, String state, Integer pageNo, Integer pageSize) { Loading @@ -69,8 +71,9 @@ public class TaskRecordService extends BaseService{ map.put("offset", pageInfo.getStart().toString()); map.put("pageSize", pageInfo.getPageSize().toString()); int count = TaskRecordDao.countTaskRecord(map); List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map); String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG; int count = TaskRecordDao.countTaskRecord(map, table); List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map, table); pageInfo.setTotalCount(count); pageInfo.setLists(recordList); result.put(Constants.DATA_LIST, pageInfo); Loading
escheduler-common/src/main/java/cn/escheduler/common/Constants.java +9 −0 Original line number Diff line number Diff line Loading @@ -463,6 +463,10 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; public static final String STATUS = "status"; Loading Loading @@ -826,4 +830,9 @@ public final class Constants { public static final String CONTENT = "content"; public static final String DEPENDENT_SPLIT = ":||"; public static final String DEPENDENT_ALL = "ALL"; /** * */ }
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +45 −29 Original line number Diff line number Diff line Loading @@ -120,30 +120,51 @@ public class ProcessDao extends AbstractBaseDao { * find one command from command queue, construct process instance * @param logger * @param host * @param vaildThreadNum * @param validThreadNum * @return */ @Transactional(value = "TransactionManager",rollbackFor = Exception.class) public ProcessInstance scanCommand(Logger logger, String host, int vaildThreadNum){ public ProcessInstance scanCommand(Logger logger, String host, int validThreadNum){ ProcessInstance processInstance = null; Command command = findOneCommand(); if (command == null) { return null; } logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ processInstance = constructProcessInstance(command, host); //cannot construct process instance, return null; if(processInstance == null){ logger.error("scan command, command parameter is error: %s", command.toString()); }else{ // check thread number enough for this command, if not, change state to waiting thread. int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); if(vaildThreadNum < commandThreadCount){ delCommandByid(command.getId()); return null; }else if(!checkThreadNum(command, validThreadNum)){ logger.info("there is not enough thread for this command: {}",command.toString() ); return setWaitingThreadProcess(command, processInstance); }else{ processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); delCommandByid(command.getId()); return processInstance; } }catch (Exception e){ logger.error("scan command error ", e); delCommandByid(command.getId()); } return null; } /** * set process waiting thread * @param command * @param processInstance * @return */ private ProcessInstance setWaitingThreadProcess(Command command, ProcessInstance processInstance) { processInstance.setState(ExecutionStatus.WAITTING_THREAD); if(command.getCommandType() != CommandType.RECOVER_WAITTING_THREAD){ processInstance.addHistoryCmd(command.getCommandType()); Loading @@ -152,16 +173,11 @@ public class ProcessDao extends AbstractBaseDao { this.setSubProcessParam(processInstance); createRecoveryWaitingThreadCommand(command, processInstance); return null; }else{ processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); } } // delete command delCommandByid(command.getId()); return processInstance; private boolean checkThreadNum(Command command, int validThreadNum) { int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionId()); return validThreadNum >= commandThreadCount; } /** Loading Loading @@ -669,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao { paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId())); processInstance.setCommandParam(JSONUtils.toJson(paramMap)); processInstance.setIsSubProcess(Flag.YES); this.updateProcessInstance(processInstance); this.saveProcessInstance(processInstance); } // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID); Loading @@ -677,7 +693,7 @@ public class ProcessDao extends AbstractBaseDao { ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); if(parentInstance != null){ processInstance.setGlobalParams(parentInstance.getGlobalParams()); this.updateProcessInstance(processInstance); this.saveProcessInstance(processInstance); }else{ logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam); } Loading
escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +6 −4 Original line number Diff line number Diff line Loading @@ -40,6 +40,8 @@ public class TaskRecordDao { private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName()); /** * 加载配置文件 */ Loading Loading @@ -134,7 +136,7 @@ public class TaskRecordDao { * @param filterMap * @return */ public static int countTaskRecord(Map<String, String> filterMap){ public static int countTaskRecord(Map<String, String> filterMap, String table){ int count = 0; Connection conn = null; Loading @@ -143,7 +145,7 @@ public class TaskRecordDao { if(conn == null){ return count; } String sql = "select count(1) as count from eamp_hive_log_hd"; String sql = String.format("select count(1) as count from %s", table); sql += getWhereString(filterMap); PreparedStatement pstmt; pstmt = conn.prepareStatement(sql); Loading Loading @@ -171,9 +173,9 @@ public class TaskRecordDao { * @param filterMap * @return */ public static List<TaskRecord> queryAllTaskRecord(Map<String,String> filterMap ) { public static List<TaskRecord> queryAllTaskRecord(Map<String,String> filterMap , String table) { String sql = "select * from eamp_hive_log_hd "; String sql = String.format("select * from %s", table); sql += getWhereString(filterMap); int offset = Integer.parseInt(filterMap.get("offset")); Loading