Unverified Commit a33fddc2 authored by bao liang's avatar bao liang Committed by GitHub
Browse files

Merge pull request #314 from lenboo/branch-1.0.2

fix bugs:global parameters,unit calculate error,tolerance tasks...
parents 5956409a faf993e6
Loading
Loading
Loading
Loading
+7 −5
Original line number Diff line number Diff line
@@ -701,17 +701,19 @@ public class ResourcesService extends BaseService {
        if (checkAdmin(loginUser, result)) {
            return result;
        }

        List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
        Set<Resource> resourceSet = null;
        List<Object> list ;
        if (resourceList != null && resourceList.size() > 0) {
            resourceSet = new HashSet<>(resourceList);

            Set<Resource> resourceSet = new HashSet<>(resourceList);
            List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);

            getAuthorizedResourceList(resourceSet, authedResourceList);
            list = new ArrayList<>(resourceSet);
        }else {
            list = new ArrayList<>(0);
        }
        result.put(Constants.DATA_LIST, new ArrayList<>(resourceSet));

        result.put(Constants.DATA_LIST, list);
        putMsg(result,Status.SUCCESS);
        return result;
    }
+40 −13
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.*;
import cn.escheduler.common.model.DateInterval;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.task.subprocess.SubProcessParameters;
@@ -41,6 +42,7 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.stream.Collectors;

import static cn.escheduler.common.Constants.*;
import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper;
@@ -689,41 +691,62 @@ public class ProcessDao extends AbstractBaseDao {
     * handle sub work process instance, update relation table and command parameters
     * set sub work process flag, extends parent work process command parameters.
     */
    public ProcessInstance setSubProcessParam(ProcessInstance processInstance){
        String cmdParam = processInstance.getCommandParam();
    public ProcessInstance setSubProcessParam(ProcessInstance subProcessInstance){
        String cmdParam = subProcessInstance.getCommandParam();
        if(StringUtils.isEmpty(cmdParam)){
            return processInstance;
            return subProcessInstance;
        }
        Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
        // write sub process id into cmd param.
        if(paramMap.containsKey(CMDPARAM_SUB_PROCESS)
                && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))){
            paramMap.remove(CMDPARAM_SUB_PROCESS);
            paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId()));
            processInstance.setCommandParam(JSONUtils.toJson(paramMap));
            processInstance.setIsSubProcess(Flag.YES);
            this.saveProcessInstance(processInstance);
            paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
            subProcessInstance.setCommandParam(JSONUtils.toJson(paramMap));
            subProcessInstance.setIsSubProcess(Flag.YES);
            this.saveProcessInstance(subProcessInstance);
        }
        // copy parent instance user def params to sub process..
        String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
        if(StringUtils.isNotEmpty(parentInstanceId)){
            ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
            if(parentInstance != null){
                processInstance.setGlobalParams(parentInstance.getGlobalParams());
                this.saveProcessInstance(processInstance);
                subProcessInstance.setGlobalParams(
                        joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
                this.saveProcessInstance(subProcessInstance);
            }else{
                logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
            }
        }
        ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class);
        if(processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0){
            return processInstance;
            return subProcessInstance;
        }
        // update sub process id to process map table
        processInstanceMap.setProcessInstanceId(processInstance.getId());
        processInstanceMap.setProcessInstanceId(subProcessInstance.getId());

        this.updateWorkProcessInstanceMap(processInstanceMap);
        return processInstance;
        return subProcessInstance;
    }

    /**
     * join parent global params into sub process.
     *  only the keys doesn't in sub process global would be joined.
     * @param parentGlobalParams
     * @param subGlobalParams
     * @return
     */
    private String joinGlobalParams(String parentGlobalParams, String subGlobalParams){
        List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class);
        List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class);
        Map<String,String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));

        for(Property parent : parentPropertyList){
            if(!subMap.containsKey(parent.getProp())){
                subPropertyList.add(parent);
            }
        }
        return JSONUtils.toJson(subPropertyList);
    }

    /**
@@ -898,7 +921,11 @@ public class ProcessDao extends AbstractBaseDao {
                    taskInstance.setFlag(Flag.NO);
                    updateTaskInstance(taskInstance);
                    // crate new task instance
                    if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){
                        taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
                    }
                    taskInstance.setEndTime(null);
                    taskInstance.setStartTime(new Date());
                    taskInstance.setFlag(Flag.YES);
                    taskInstance.setHost(null);
                    taskInstance.setId(0);
+5 −1
Original line number Diff line number Diff line
@@ -422,9 +422,13 @@ public class TaskInstance {
        if(this.isSubProcess()){
            return false;
        }
        if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
            return true;
        }else {
            return (this.getState().typeIsFailure()
                && this.getRetryTimes() < this.getMaxRetryTimes());
        }
    }

    public void setDependency(String dependency) {
        this.dependency = dependency;
+1 −1
Original line number Diff line number Diff line
@@ -869,7 +869,7 @@ public class MasterExecThread implements Runnable {
        }

        Date now = new Date();
        long runningTime =  DateUtils.differMs(now, processInstance.getStartTime());
        long runningTime =  DateUtils.diffMin(now, processInstance.getStartTime());

        if(runningTime > processInstance.getTimeout()){
            return true;
+2 −2
Original line number Diff line number Diff line
@@ -130,7 +130,7 @@ export default {
  'Please enter database name': '请输入数据库名',
  'jdbc connect parameters': 'jdbc连接参数',
  'Test Connect': '测试连接',
  'Please enter resource name': '请输入源名称',
  'Please enter resource name': '请输入数据源名称',
  'Please enter IP/hostname': '请输入IP/主机名',
  'jdbc connection parameters is not a correct JSON format': 'jdbc连接参数不是一个正确的JSON格式',
  '#': '编号',
@@ -419,7 +419,7 @@ export default {
  'Create token': '创建令牌',
  'Edit token': '编辑令牌',
  'Please enter the IP address separated by commas': '请输入IP地址多个用逗号隔开',
  'Note: Multiple IP addresses have been comma separated': '注意:多个IP地址逗号分割',
  'Note: Multiple IP addresses have been comma separated': '注意:多个IP地址逗号分割',
  'Failure time': '失效时间',
  'User': '用户',
  'Please enter token': '请输入令牌',