Commit a7a4a316 authored by baoliang's avatar baoliang
Browse files

update process time out

parent bb9012ec
Loading
Loading
Loading
Loading
+33 −2
Original line number Diff line number Diff line
@@ -23,6 +23,8 @@ import cn.escheduler.dao.datasource.ConnectionFactory;
import cn.escheduler.dao.mapper.AlertMapper;
import cn.escheduler.dao.mapper.UserAlertGroupMapper;
import cn.escheduler.dao.model.Alert;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.User;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -83,8 +85,9 @@ public class AlertDao extends AbstractBaseDao {
     */
    public void sendServerStopedAlert(int alertgroupId,String host,String serverType){
        Alert alert = new Alert();
        String content = String.format("[{'type':'%s','host':'%s','event':'服务挂掉','警告级别':'严重'}]",serverType,host);
        alert.setTitle("容错告警");
        String content = String.format("[{'type':'%s','host':'%s','event':'server down','warning level':'serious'}]",
                serverType, host);
        alert.setTitle("Fault tolerance warning");
        alert.setShowType(ShowType.TABLE);
        alert.setContent(content);
        alert.setAlertType(AlertType.EMAIL);
@@ -94,6 +97,34 @@ public class AlertDao extends AbstractBaseDao {
        alertMapper.insert(alert);
    }

    /**
     * Creates and persists an e-mail alert reporting that a process instance timed out.
     *
     * <p>The alert is addressed to the warning group configured on the process instance.
     * Receivers and CC receivers are copied from the process definition when they are
     * non-empty. When the definition is {@code null} the alert is still stored, addressed
     * to the warning group only, instead of failing with a {@link NullPointerException}.
     *
     * @param processInstance   the timed-out process instance; supplies the warning group id
     *                          and the id and name written into the alert content
     * @param processDefinition the definition of the process; supplies the optional receiver
     *                          lists, may be {@code null}
     */
    public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition){
        int alertgroupId = processInstance.getWarningGroupId();

        // The caller passes the result of a lookup by id straight through; presumably that
        // lookup can come back empty (e.g. definition removed) - do not let that abort the alert.
        String receivers = processDefinition == null ? null : processDefinition.getReceivers();
        String receiversCc = processDefinition == null ? null : processDefinition.getReceiversCc();

        String content = String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]",
                processInstance.getId(), processInstance.getName());

        Alert alert = new Alert();
        alert.setTitle("Process Timeout Warn");
        alert.setShowType(ShowType.TABLE);
        alert.setContent(content);
        alert.setAlertType(AlertType.EMAIL);
        alert.setAlertGroupId(alertgroupId);
        if (StringUtils.isNotEmpty(receivers)) {
            alert.setReceivers(receivers);
        }
        if (StringUtils.isNotEmpty(receiversCc)) {
            alert.setReceiversCc(receiversCc);
        }

        // A freshly created alert has identical creation and update timestamps.
        Date now = new Date();
        alert.setCreateTime(now);
        alert.setUpdateTime(now);
        alertMapper.insert(alert);
    }

    /**
     * task timeout warn
     */
+29 −3
Original line number Diff line number Diff line
@@ -258,7 +258,7 @@ public class MasterExecThread implements Runnable {
            processDao.createRecoveryWaitingThreadCommand(null, processInstance);
        }
        List<TaskInstance> taskInstances = processDao.findValidTaskListByProcessId(processInstance.getId());
        alertManager.sendWarnningOfProcessInstance(processInstance, taskInstances);
        alertManager.sendAlertProcessInstance(processInstance, taskInstances);
    }


@@ -775,8 +775,15 @@ public class MasterExecThread implements Runnable {
    private void runProcess(){
        // submit start node
        submitPostNode(null);
        // submitStandByTask();
        boolean sendTimeWarning = false;
        while(!processInstance.IsProcessInstanceStop()){

            // send warning email if process time out.
            if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){
                alertManager.sendProcessTimeoutAlert(processInstance,
                        processDao.findProcessDefineById(processInstance.getProcessDefinitionId()));
                sendTimeWarning = true;
            }
            Set<MasterBaseTaskExecThread> keys = activeTaskNode.keySet();
            for (MasterBaseTaskExecThread taskExecThread : keys) {
                Future<Boolean> future = activeTaskNode.get(taskExecThread);
@@ -821,7 +828,7 @@ public class MasterExecThread implements Runnable {
            }
            // send alert
            if(this.recoverToleranceFaultTaskList.size() > 0){
                alertManager.sendWarnningWorkerleranceFault(processInstance, recoverToleranceFaultTaskList);
                alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
                this.recoverToleranceFaultTaskList.clear();
            }
            // updateProcessInstance completed task status
@@ -851,6 +858,25 @@ public class MasterExecThread implements Runnable {
        logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
    }

    /**
     * Tells whether the given process instance has been running longer than its timeout.
     *
     * <p>A timeout of {@code 0} disables the check: the method returns {@code false}
     * without looking at the start time.
     *
     * <p>NOTE(review): the elapsed time is taken from {@code DateUtils.differMs}, whose name
     * suggests milliseconds, and is compared directly with {@code getTimeout()}. Confirm
     * that the configured timeout uses the same unit.
     *
     * @param processInstance the process instance whose start time and timeout are inspected
     * @return {@code true} if a non-zero timeout is set and the elapsed running time
     *         exceeds it, {@code false} otherwise
     */
    private boolean checkProcessTimeOut(ProcessInstance processInstance) {
        final boolean timeoutDisabled = processInstance.getTimeout() == 0;
        if (timeoutDisabled) {
            return false;
        }

        long elapsed = DateUtils.differMs(new Date(), processInstance.getStartTime());
        return elapsed > processInstance.getTimeout();
    }

    /**
     * Reports whether a task may be submitted to the queue right now.
     *
     * @return the result of {@code OSUtils.checkResource(conf, true)}
     */
    private boolean canSubmitTaskToQueue() {
        final boolean resourceCheckPassed = OSUtils.checkResource(conf, true);
        return resourceCheckPassed;
    }
+31 −27
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.model.Alert;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import org.slf4j.Logger;
@@ -54,27 +55,27 @@ public class AlertManager {
    private String getCommandCnName(CommandType commandType) {
        switch (commandType) {
            case RECOVER_TOLERANCE_FAULT_PROCESS:
                return "恢复容错";
                return "recover tolerance fault process";
            case RECOVER_SUSPENDED_PROCESS:
                return "恢复暂停流程";
                return "recover suspended process";
            case START_CURRENT_TASK_PROCESS:
                return "从当前节点开始执行";
                return "start current task process";
            case START_FAILURE_TASK_PROCESS:
                return "从失败节点开始执行";
                return "start failure task process";
            case START_PROCESS:
                return "启动工作流";
                return "start process";
            case REPEAT_RUNNING:
                return "重跑";
                return "repeat running";
            case SCHEDULER:
                return "定时执行";
                return "scheduler";
            case COMPLEMENT_DATA:
                return "补数";
                return "complement data";
            case PAUSE:
                return "暂停工作流";
                return "pause";
            case STOP:
                return "停止工作流";
                return "stop";
            default:
                return "未知的命令类型";
                return "unknown type";
        }
    }

@@ -124,14 +125,14 @@ public class AlertManager {
                    continue;
                }
                LinkedHashMap<String, String> failedTaskMap = new LinkedHashMap();
                failedTaskMap.put("任务id", String.valueOf(task.getId()));
                failedTaskMap.put("任务名称", task.getName());
                failedTaskMap.put("任务类型", task.getTaskType());
                failedTaskMap.put("任务状态", task.getState().toString());
                failedTaskMap.put("任务开始时间", DateUtils.dateToString(task.getStartTime()));
                failedTaskMap.put("任务结束时间", DateUtils.dateToString(task.getEndTime()));
                failedTaskMap.put("task id", String.valueOf(task.getId()));
                failedTaskMap.put("task name", task.getName());
                failedTaskMap.put("task type", task.getTaskType());
                failedTaskMap.put("task state", task.getState().toString());
                failedTaskMap.put("task start time", DateUtils.dateToString(task.getStartTime()));
                failedTaskMap.put("task end time", DateUtils.dateToString(task.getEndTime()));
                failedTaskMap.put("host", task.getHost());
                failedTaskMap.put("日志路径", task.getLogPath());
                failedTaskMap.put("log path", task.getLogPath());
                failedTaskList.add(failedTaskMap);
            }
            res = JSONUtils.toJson(failedTaskList);
@@ -152,10 +153,10 @@ public class AlertManager {

        for(TaskInstance taskInstance: toleranceTaskList){
            LinkedHashMap<String, String> toleranceWorkerContentMap = new LinkedHashMap();
            toleranceWorkerContentMap.put("工作流程名称", processInstance.getName());
            toleranceWorkerContentMap.put("容错任务名称", taskInstance.getName());
            toleranceWorkerContentMap.put("容错机器IP", taskInstance.getHost());
            toleranceWorkerContentMap.put("任务失败次数", String.valueOf(taskInstance.getRetryTimes()));
            toleranceWorkerContentMap.put("process name", processInstance.getName());
            toleranceWorkerContentMap.put("task name", taskInstance.getName());
            toleranceWorkerContentMap.put("host", taskInstance.getHost());
            toleranceWorkerContentMap.put("task retry times", String.valueOf(taskInstance.getRetryTimes()));
            toleranceTaskInstanceList.add(toleranceWorkerContentMap);
        }
        return JSONUtils.toJson(toleranceTaskInstanceList);
@@ -166,9 +167,9 @@ public class AlertManager {
     * @param processInstance
     * @param toleranceTaskList
     */
    public void sendWarnningWorkerleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
    public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
        Alert alert = new Alert();
        alert.setTitle("worker容错报警");
        alert.setTitle("worker fault tolerance");
        alert.setShowType(ShowType.TABLE);
        String content = getWorkerToleranceContent(processInstance, toleranceTaskList);
        alert.setContent(content);
@@ -187,7 +188,7 @@ public class AlertManager {
     * send process instance alert
     * @param processInstance
     */
    public void sendWarnningOfProcessInstance(ProcessInstance processInstance,
    public void sendAlertProcessInstance(ProcessInstance processInstance,
                                         List<TaskInstance> taskInstances){

        boolean sendWarnning = false;
@@ -217,7 +218,7 @@ public class AlertManager {


        String cmdName = getCommandCnName(processInstance.getCommandType());
        String success = processInstance.getState().typeIsSuccess() ? "成功" :"失败";
        String success = processInstance.getState().typeIsSuccess() ? "success" :"failed";
        alert.setTitle(cmdName + success);
        ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE;
        alert.setShowType(showType);
@@ -233,4 +234,7 @@ public class AlertManager {
        logger.info("add alert to db , alert: {}", alert.toString());
    }

    /**
     * Sends a process timeout alert by delegating to the alert DAO.
     *
     * @param processInstance   the process instance the alert is about
     * @param processDefinition the definition belonging to that process instance
     */
    public void sendProcessTimeoutAlert(ProcessInstance processInstance,
                                        ProcessDefinition processDefinition) {
        this.alertDao.sendProcessTimeoutAlert(processInstance, processDefinition);
    }
}
+2 −2
Original line number Diff line number Diff line
@@ -76,7 +76,7 @@ public class AlertManagerTest {
        toleranceTaskList.add(toleranceTask1);
        toleranceTaskList.add(toleranceTask2);

        alertManager.sendWarnningWorkerleranceFault(processInstance, toleranceTaskList);
        alertManager.sendAlertWorkerToleranceFault(processInstance, toleranceTaskList);
    }


@@ -103,7 +103,7 @@ public class AlertManagerTest {
        toleranceTaskList.add(toleranceTask1);
        toleranceTaskList.add(toleranceTask2);

        alertManager.sendWarnningOfProcessInstance(processInstance, toleranceTaskList);
        alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList);
    }

}