Loading escheduler-common/src/main/java/cn/escheduler/common/Constants.java +5 −0 Original line number Diff line number Diff line Loading @@ -252,6 +252,11 @@ public final class Constants { */ public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; /** * date format of yyyyMMdd */ public static final String YYYYMMDD = "yyyyMMdd"; /** * date format of yyyyMMddHHmmss */ Loading escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java 0 → 100644 +35 −0 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cn.escheduler.common.enums; /** * task record status * */ public enum TaskRecordStatus { /** * status: * 0 sucess * 1 failure * 2 exception */ SUCCESS,FAILURE,EXCEPTION } escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java +7 −1 Original line number Diff line number Diff line Loading @@ -30,5 +30,11 @@ public enum TaskType { * 6 PYTHON * 7 DEPENDENT */ SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT; public static boolean typeIsNormalTask(String typeName) { TaskType taskType = TaskType.valueOf(typeName); return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT); } } escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +46 −1 Original line number Diff line number Diff line Loading @@ -17,6 +17,8 @@ package cn.escheduler.dao; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.TaskRecordStatus; import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.dao.model.TaskRecord; import org.apache.commons.configuration.Configuration; Loading @@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.sql.*; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; Loading @@ -43,7 +46,7 @@ public class TaskRecordDao { /** * 加载配置文件 * load conf file */ private static Configuration conf; Loading @@ -56,6 +59,14 @@ public class TaskRecordDao { } } /** * get task record flag * @return */ public static boolean getTaskRecordFlag(){ return conf.getBoolean(Constants.TASK_RECORD_FLAG); } /** * create connection * @return Loading Loading @@ -253,4 +264,38 @@ public class TaskRecordDao { } return recordList; } /** * according to procname and procdate query task record * @param procName * @param procDate * @return */ public static TaskRecordStatus getTaskRecordState(String procName,String procDate){ String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE='%s'" ,procName,procDate); List<TaskRecord> taskRecordList = getQueryResult(sql); // contains no record and sql exception if (CollectionUtils.isEmpty(taskRecordList)){ // exception return TaskRecordStatus.EXCEPTION; }else if (taskRecordList.size() > 1){ return TaskRecordStatus.EXCEPTION; }else { TaskRecord taskRecord = taskRecordList.get(0); if (taskRecord == null){ return TaskRecordStatus.EXCEPTION; } Long targetRowCount = taskRecord.getTargetRowCount(); if (targetRowCount <= 0){ return TaskRecordStatus.FAILURE; }else { return TaskRecordStatus.SUCCESS; } } } } escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +20 −0 Original line number Diff line number Diff line Loading @@ -19,15 +19,18 @@ package cn.escheduler.server.worker.runner; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.TaskRecordStatus; import cn.escheduler.common.enums.TaskType; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.HadoopUtils; import cn.escheduler.common.utils.TaskParametersUtils; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; Loading Loading @@ -199,6 +202,23 @@ public class TaskScheduleThread implements Callable<Boolean> { if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ status = ExecutionStatus.SUCCESS; // task recor flat : if true , start up qianfan if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ Date scheduleTime = processInstance.getScheduleTime(); if(scheduleTime == null){ scheduleTime = processInstance.getStartTime(); } // process exec time : yyyyMMdd format String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); logger.info("task record status : {}",taskRecordState); if (taskRecordState == TaskRecordStatus.FAILURE){ status = ExecutionStatus.FAILURE; } } }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ status = ExecutionStatus.KILL; }else { Loading Loading
escheduler-common/src/main/java/cn/escheduler/common/Constants.java +5 −0 Original line number Diff line number Diff line Loading @@ -252,6 +252,11 @@ public final class Constants { */ public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; /** * date format of yyyyMMdd */ public static final String YYYYMMDD = "yyyyMMdd"; /** * date format of yyyyMMddHHmmss */ Loading
escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java 0 → 100644 +35 −0 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cn.escheduler.common.enums; /** * task record status * */ public enum TaskRecordStatus { /** * status: * 0 sucess * 1 failure * 2 exception */ SUCCESS,FAILURE,EXCEPTION }
escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java +7 −1 Original line number Diff line number Diff line Loading @@ -30,5 +30,11 @@ public enum TaskType { * 6 PYTHON * 7 DEPENDENT */ SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT; public static boolean typeIsNormalTask(String typeName) { TaskType taskType = TaskType.valueOf(typeName); return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT); } }
escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +46 −1 Original line number Diff line number Diff line Loading @@ -17,6 +17,8 @@ package cn.escheduler.dao; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.TaskRecordStatus; import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.dao.model.TaskRecord; import org.apache.commons.configuration.Configuration; Loading @@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.sql.*; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; Loading @@ -43,7 +46,7 @@ public class TaskRecordDao { /** * 加载配置文件 * load conf file */ private static Configuration conf; Loading @@ -56,6 +59,14 @@ public class TaskRecordDao { } } /** * get task record flag * @return */ public static boolean getTaskRecordFlag(){ return conf.getBoolean(Constants.TASK_RECORD_FLAG); } /** * create connection * @return Loading Loading @@ -253,4 +264,38 @@ public class TaskRecordDao { } return recordList; } /** * according to procname and procdate query task record * @param procName * @param procDate * @return */ public static TaskRecordStatus getTaskRecordState(String procName,String procDate){ String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE='%s'" ,procName,procDate); List<TaskRecord> taskRecordList = getQueryResult(sql); // contains no record and sql exception if (CollectionUtils.isEmpty(taskRecordList)){ // exception return TaskRecordStatus.EXCEPTION; }else if (taskRecordList.size() > 1){ return TaskRecordStatus.EXCEPTION; }else { TaskRecord taskRecord = taskRecordList.get(0); if (taskRecord == null){ return TaskRecordStatus.EXCEPTION; } Long targetRowCount = taskRecord.getTargetRowCount(); if (targetRowCount <= 0){ return TaskRecordStatus.FAILURE; }else { return TaskRecordStatus.SUCCESS; } } } }
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +20 −0 Original line number Diff line number Diff line Loading @@ -19,15 +19,18 @@ package cn.escheduler.server.worker.runner; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; import cn.escheduler.common.enums.TaskRecordStatus; import cn.escheduler.common.enums.TaskType; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.HadoopUtils; import cn.escheduler.common.utils.TaskParametersUtils; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; Loading Loading @@ -199,6 +202,23 @@ public class TaskScheduleThread implements Callable<Boolean> { if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ status = ExecutionStatus.SUCCESS; // task recor flat : if true , start up qianfan if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ Date scheduleTime = processInstance.getScheduleTime(); if(scheduleTime == null){ scheduleTime = processInstance.getStartTime(); } // process exec time : yyyyMMdd format String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); logger.info("task record status : {}",taskRecordState); if (taskRecordState == TaskRecordStatus.FAILURE){ status = ExecutionStatus.FAILURE; } } }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ status = ExecutionStatus.KILL; }else { Loading