Loading escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +22 −14 Original line number Diff line number Diff line Loading @@ -25,15 +25,14 @@ import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.HadoopUtils; import cn.escheduler.common.utils.TaskParametersUtils; import cn.escheduler.common.task.shell.ShellParameters; import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; Loading Loading @@ -144,6 +143,7 @@ public class TaskScheduleThread implements Callable<Boolean> { TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); List<String> projectRes = createProjectResFiles(taskNode); // copy hdfs file to local Loading Loading @@ -205,19 +205,27 @@ public class TaskScheduleThread implements Callable<Boolean> { // task recor flat : if true , start up qianfan if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ Date scheduleTime = processInstance.getScheduleTime(); if(scheduleTime == null){ scheduleTime = processInstance.getStartTime(); } // process exec time : yyyyMMdd format String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); AbstractParameters params = JSONUtils.parseObject(taskProps.getTaskParams(), AbstractParameters.class); // replace placeholder Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), taskProps.getDefinedParams(), params.getLocalParametersMap(), processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime()); if (paramsMap != null && !paramsMap.isEmpty() && paramsMap.containsKey("v_proc_date")){ String vProcDate = paramsMap.get("v_proc_date").getValue(); if (!StringUtils.isEmpty(vProcDate)){ TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); logger.info("task record status : {}",taskRecordState); if (taskRecordState == TaskRecordStatus.FAILURE){ status = ExecutionStatus.FAILURE; } } } } }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ status = ExecutionStatus.KILL; Loading Loading
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +22 −14 Original line number Diff line number Diff line Loading @@ -25,15 +25,14 @@ import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.HadoopUtils; import cn.escheduler.common.utils.TaskParametersUtils; import cn.escheduler.common.task.shell.ShellParameters; import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; Loading Loading @@ -144,6 +143,7 @@ public class TaskScheduleThread implements Callable<Boolean> { TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); List<String> projectRes = createProjectResFiles(taskNode); // copy hdfs file to local Loading Loading @@ -205,19 +205,27 @@ public class TaskScheduleThread implements Callable<Boolean> { // task recor flat : if true , start up qianfan if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ Date scheduleTime = processInstance.getScheduleTime(); if(scheduleTime == null){ scheduleTime = processInstance.getStartTime(); } // process exec time : yyyyMMdd format String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); AbstractParameters params = JSONUtils.parseObject(taskProps.getTaskParams(), AbstractParameters.class); // replace placeholder Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), taskProps.getDefinedParams(), params.getLocalParametersMap(), processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime()); if (paramsMap != null && !paramsMap.isEmpty() && paramsMap.containsKey("v_proc_date")){ String vProcDate = paramsMap.get("v_proc_date").getValue(); if (!StringUtils.isEmpty(vProcDate)){ TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); logger.info("task record status : {}",taskRecordState); if (taskRecordState == TaskRecordStatus.FAILURE){ status = ExecutionStatus.FAILURE; } } } } }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ status = ExecutionStatus.KILL; Loading