Unverified Commit 2589e6f4 authored by bao liang's avatar bao liang Committed by GitHub
Browse files

Merge pull request #78 from lenboo/dev-20190415

add history command
parents dd1be123 94738929
Loading
Loading
Loading
Loading
+20 −1
Original line number Diff line number Diff line
@@ -11,3 +11,22 @@ CREATE TABLE `t_escheduler_access_token` (
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

CREATE TABLE `escheduler`.`无标题`  (
  `id` int(11) NOT NULL COMMENT '主键',
  `command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程 4 从失败节点开始执行',
  `executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者',
  `process_definition_id` int(11) NULL DEFAULT NULL COMMENT '流程定义id',
  `command_param` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '命令的参数(json格式)',
  `task_depend_type` tinyint(4) NULL DEFAULT NULL COMMENT '节点依赖类型',
  `failure_strategy` tinyint(4) NULL DEFAULT 0 COMMENT '失败策略:0结束,1继续',
  `warning_type` tinyint(4) NULL DEFAULT 0 COMMENT '告警类型',
  `warning_group_id` int(11) NULL DEFAULT NULL COMMENT '告警组',
  `schedule_time` datetime(0) NULL DEFAULT NULL COMMENT '预期运行时间',
  `start_time` datetime(0) NULL DEFAULT NULL COMMENT '开始时间',
  `update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
  `dependence` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '依赖字段',
  `process_instance_priority` int(11) NULL DEFAULT NULL COMMENT '流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
  `message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
+11 −0
Original line number Diff line number Diff line
@@ -88,6 +88,9 @@ public class ProcessDao extends AbstractBaseDao {
    @Autowired
    private ResourceMapper resourceMapper;

    @Autowired
    private ErrorCommandMapper errorCommandMapper;

    /**
     * task queue impl
     */
@@ -139,6 +142,7 @@ public class ProcessDao extends AbstractBaseDao {
            if(processInstance == null){
                logger.error("scan command, command parameter is error: %s", command.toString());
                delCommandByid(command.getId());
                saveErrorCommand(command, "process instance is null");
                return null;
            }else if(!checkThreadNum(command, validThreadNum)){
                    logger.info("there is not enough thread for this command: {}",command.toString() );
@@ -153,11 +157,18 @@ public class ProcessDao extends AbstractBaseDao {
            }
        }catch (Exception e){
            logger.error("scan command error ", e);
            saveErrorCommand(command, e.toString());
            delCommandByid(command.getId());
        }
        return null;
    }

    private void saveErrorCommand(Command command, String message) {

        ErrorCommand errorCommand = new ErrorCommand(command, message);
        this.errorCommandMapper.insert(errorCommand);
    }

    /**
     * set process waiting thread
     * @param command
+45 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.dao.mapper;

import cn.escheduler.common.enums.*;
import cn.escheduler.dao.model.Command;
import cn.escheduler.dao.model.ErrorCommand;
import org.apache.ibatis.annotations.*;
import org.apache.ibatis.type.EnumOrdinalTypeHandler;
import org.apache.ibatis.type.JdbcType;

import java.sql.Timestamp;
import java.util.List;

/**
 * command mapper
 */
public interface ErrorCommandMapper {

    /**
     * inert error command
     * @param errorCommand
     * @return
     */
    @InsertProvider(type = ErrorCommandMapperProvider.class, method = "insert")
    @Options(useGeneratedKeys = true,keyProperty = "errorCommand.id")
    @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "errorCommand.id", before = false, resultType = int.class)
    int insert(@Param("errorCommand") ErrorCommand errorCommand);


}
+41 −0
Original line number Diff line number Diff line
package cn.escheduler.dao.mapper;

import cn.escheduler.common.enums.*;
import cn.escheduler.common.utils.EnumFieldUtil;
import org.apache.ibatis.jdbc.SQL;

import java.util.Map;

public class ErrorCommandMapperProvider {


    private static final String TABLE_NAME = "t_escheduler_error_command";


    /**
     * inert command
     *
     * @param parameter
     * @return
     */
    public String insert(Map<String, Object> parameter) {
        return new SQL() {
            {
                INSERT_INTO(TABLE_NAME);
                VALUES("`command_type`", EnumFieldUtil.genFieldStr("errorCommand.commandType", CommandType.class));
                VALUES("`process_definition_id`", "#{errorCommand.processDefinitionId}");
                VALUES("`executor_id`", "#{errorCommand.executorId}");
                VALUES("`command_param`", "#{errorCommand.commandParam}");
                VALUES("`task_depend_type`", EnumFieldUtil.genFieldStr("errorCommand.taskDependType", TaskDependType.class));
                VALUES("`failure_strategy`", EnumFieldUtil.genFieldStr("errorCommand.failureStrategy", FailureStrategy.class));
                VALUES("`warning_type`", EnumFieldUtil.genFieldStr("errorCommand.warningType", WarningType.class));
                VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("errorCommand.processInstancePriority", Priority.class));
                VALUES("`warning_group_id`", "#{errorCommand.warningGroupId}");
                VALUES("`schedule_time`", "#{errorCommand.scheduleTime}");
                VALUES("`update_time`", "#{errorCommand.updateTime}");
                VALUES("`start_time`", "#{errorCommand.startTime}");
                VALUES("`message`", "#{errorCommand.message}");
            }
        }.toString();
    }
}
+275 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.dao.model;

import cn.escheduler.common.enums.*;

import java.util.Date;

/**
 * command
 */
public class ErrorCommand {

    /**
     * id
     */
    private int id;

    /**
     * command type
     */
    private CommandType commandType;

    /**
     * process definition id
     */
    private int processDefinitionId;

    /**
     * executor id
     */
    private int executorId;

    /**
     * command parameter, format json
     */
    private String commandParam;

    /**
     * task depend type
     */
    private TaskDependType taskDependType;

    /**
     * failure strategy
     */
    private FailureStrategy failureStrategy;

    /**
     *  warning type
     */
    private WarningType warningType;

    /**
     * warning group id
     */
    private Integer warningGroupId;

    /**
     * schedule time
     */
    private Date scheduleTime;

    /**
     * start time
     */
    private Date startTime;

    /**
     * process instance priority
     */
    private Priority processInstancePriority;

    /**
     * update time
     */
    private Date updateTime;

    /**
     * 执行信息
     */
    private String message;


    public ErrorCommand(Command command, String message){
        this.commandType = command.getCommandType();
        this.executorId = command.getExecutorId();
        this.processDefinitionId = command.getProcessDefinitionId();
        this.commandParam = command.getCommandParam();
        this.warningType = command.getWarningType();
        this.warningGroupId = command.getWarningGroupId();
        this.scheduleTime = command.getScheduleTime();
        this.taskDependType = command.getTaskDependType();
        this.failureStrategy = command.getFailureStrategy();
        this.startTime = command.getStartTime();
        this.updateTime = command.getUpdateTime();
        this.processInstancePriority = command.getProcessInstancePriority();
        this.message = message;
    }

    public ErrorCommand(
            CommandType commandType,
            TaskDependType taskDependType,
            FailureStrategy failureStrategy,
            int executorId,
            int processDefinitionId,
            String commandParam,
            WarningType warningType,
            int warningGroupId,
            Date scheduleTime,
            Priority processInstancePriority,
            String message){
        this.commandType = commandType;
        this.executorId = executorId;
        this.processDefinitionId = processDefinitionId;
        this.commandParam = commandParam;
        this.warningType = warningType;
        this.warningGroupId = warningGroupId;
        this.scheduleTime = scheduleTime;
        this.taskDependType = taskDependType;
        this.failureStrategy = failureStrategy;
        this.startTime = new Date();
        this.updateTime = new Date();
        this.processInstancePriority = processInstancePriority;
        this.message = message;
    }


    public TaskDependType getTaskDependType() {
        return taskDependType;
    }

    public void setTaskDependType(TaskDependType taskDependType) {
        this.taskDependType = taskDependType;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public CommandType getCommandType() {
        return commandType;
    }

    public void setCommandType(CommandType commandType) {
        this.commandType = commandType;
    }

    public int getProcessDefinitionId() {
        return processDefinitionId;
    }

    public void setProcessDefinitionId(int processDefinitionId) {
        this.processDefinitionId = processDefinitionId;
    }


    public FailureStrategy getFailureStrategy() {
        return failureStrategy;
    }

    public void setFailureStrategy(FailureStrategy failureStrategy) {
        this.failureStrategy = failureStrategy;
    }

    public void setCommandParam(String commandParam) {
        this.commandParam = commandParam;
    }

    public String getCommandParam() {
        return commandParam;
    }

    public WarningType getWarningType() {
        return warningType;
    }

    public void setWarningType(WarningType warningType) {
        this.warningType = warningType;
    }

    public Integer getWarningGroupId() {
        return warningGroupId;
    }

    public void setWarningGroupId(Integer warningGroupId) {
        this.warningGroupId = warningGroupId;
    }

    public Date getScheduleTime() {
        return scheduleTime;
    }

    public void setScheduleTime(Date scheduleTime) {
        this.scheduleTime = scheduleTime;
    }

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public int getExecutorId() {
        return executorId;
    }

    public void setExecutorId(int executorId) {
        this.executorId = executorId;
    }

    public Priority getProcessInstancePriority() {
        return processInstancePriority;
    }

    public void setProcessInstancePriority(Priority processInstancePriority) {
        this.processInstancePriority = processInstancePriority;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    @Override
    public String toString() {
        return "Command{" +
                "id=" + id +
                ", commandType=" + commandType +
                ", processDefinitionId=" + processDefinitionId +
                ", executorId=" + executorId +
                ", commandParam='" + commandParam + '\'' +
                ", taskDependType=" + taskDependType +
                ", failureStrategy=" + failureStrategy +
                ", warningType=" + warningType +
                ", warningGroupId=" + warningGroupId +
                ", scheduleTime=" + scheduleTime +
                ", startTime=" + startTime +
                ", processInstancePriority=" + processInstancePriority +
                ", updateTime=" + updateTime +
                ", message=" + message +
                '}';
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}