Commit 7dc7348c authored by ligang's avatar ligang
Browse files

Merge remote-tracking branch 'remotes/upstream/branch-1.0.2' into branch-1.0.2

parents 55d53144 e171234f
Loading
Loading
Loading
Loading
+7 −5
Original line number Diff line number Diff line
@@ -701,17 +701,19 @@ public class ResourcesService extends BaseService {
        if (checkAdmin(loginUser, result)) {
            return result;
        }

        List<Resource> resourceList = resourcesMapper.queryResourceExceptUserId(userId);
        Set<Resource> resourceSet = null;
        List<Object> list ;
        if (resourceList != null && resourceList.size() > 0) {
            resourceSet = new HashSet<>(resourceList);

            Set<Resource> resourceSet = new HashSet<>(resourceList);
            List<Resource> authedResourceList = resourcesMapper.queryAuthorizedResourceList(userId);

            getAuthorizedResourceList(resourceSet, authedResourceList);
            list = new ArrayList<>(resourceSet);
        }else {
            list = new ArrayList<>(0);
        }
        result.put(Constants.DATA_LIST, new ArrayList<>(resourceSet));

        result.put(Constants.DATA_LIST, list);
        putMsg(result,Status.SUCCESS);
        return result;
    }
+40 −13
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.*;
import cn.escheduler.common.model.DateInterval;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.task.subprocess.SubProcessParameters;
@@ -41,6 +42,7 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.stream.Collectors;

import static cn.escheduler.common.Constants.*;
import static cn.escheduler.dao.datasource.ConnectionFactory.getMapper;
@@ -689,41 +691,62 @@ public class ProcessDao extends AbstractBaseDao {
     * handle sub work process instance, update relation table and command parameters
     * set sub work process flag, extends parent work process command parameters.
     */
    public ProcessInstance setSubProcessParam(ProcessInstance processInstance){
        String cmdParam = processInstance.getCommandParam();
    public ProcessInstance setSubProcessParam(ProcessInstance subProcessInstance){
        String cmdParam = subProcessInstance.getCommandParam();
        if(StringUtils.isEmpty(cmdParam)){
            return processInstance;
            return subProcessInstance;
        }
        Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
        // write sub process id into cmd param.
        if(paramMap.containsKey(CMDPARAM_SUB_PROCESS)
                && CMDPARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMDPARAM_SUB_PROCESS))){
            paramMap.remove(CMDPARAM_SUB_PROCESS);
            paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(processInstance.getId()));
            processInstance.setCommandParam(JSONUtils.toJson(paramMap));
            processInstance.setIsSubProcess(Flag.YES);
            this.saveProcessInstance(processInstance);
            paramMap.put(CMDPARAM_SUB_PROCESS, String.valueOf(subProcessInstance.getId()));
            subProcessInstance.setCommandParam(JSONUtils.toJson(paramMap));
            subProcessInstance.setIsSubProcess(Flag.YES);
            this.saveProcessInstance(subProcessInstance);
        }
        // copy parent instance user def params to sub process..
        String parentInstanceId = paramMap.get(CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
        if(StringUtils.isNotEmpty(parentInstanceId)){
            ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
            if(parentInstance != null){
                processInstance.setGlobalParams(parentInstance.getGlobalParams());
                this.saveProcessInstance(processInstance);
                subProcessInstance.setGlobalParams(
                        joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
                this.saveProcessInstance(subProcessInstance);
            }else{
                logger.error("sub process command params error, cannot find parent instance: {} ", cmdParam);
            }
        }
        ProcessInstanceMap processInstanceMap = JSONUtils.parseObject(cmdParam, ProcessInstanceMap.class);
        if(processInstanceMap == null || processInstanceMap.getParentProcessInstanceId() == 0){
            return processInstance;
            return subProcessInstance;
        }
        // update sub process id to process map table
        processInstanceMap.setProcessInstanceId(processInstance.getId());
        processInstanceMap.setProcessInstanceId(subProcessInstance.getId());

        this.updateWorkProcessInstanceMap(processInstanceMap);
        return processInstance;
        return subProcessInstance;
    }

    /**
     * join parent global params into sub process.
     *  only the keys doesn't in sub process global would be joined.
     * @param parentGlobalParams
     * @param subGlobalParams
     * @return
     */
    private String joinGlobalParams(String parentGlobalParams, String subGlobalParams){
        List<Property> parentPropertyList = JSONUtils.toList(parentGlobalParams, Property.class);
        List<Property> subPropertyList = JSONUtils.toList(subGlobalParams, Property.class);
        Map<String,String> subMap = subPropertyList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));

        for(Property parent : parentPropertyList){
            if(!subMap.containsKey(parent.getProp())){
                subPropertyList.add(parent);
            }
        }
        return JSONUtils.toJson(subPropertyList);
    }

    /**
@@ -898,7 +921,11 @@ public class ProcessDao extends AbstractBaseDao {
                    taskInstance.setFlag(Flag.NO);
                    updateTaskInstance(taskInstance);
                    // crate new task instance
                    if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){
                        taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
                    }
                    taskInstance.setEndTime(null);
                    taskInstance.setStartTime(new Date());
                    taskInstance.setFlag(Flag.YES);
                    taskInstance.setHost(null);
                    taskInstance.setId(0);
+5 −1
Original line number Diff line number Diff line
@@ -422,9 +422,13 @@ public class TaskInstance {
        if(this.isSubProcess()){
            return false;
        }
        if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
            return true;
        }else {
            return (this.getState().typeIsFailure()
                && this.getRetryTimes() < this.getMaxRetryTimes());
        }
    }

    public void setDependency(String dependency) {
        this.dependency = dependency;
+1 −1
Original line number Diff line number Diff line
@@ -869,7 +869,7 @@ public class MasterExecThread implements Runnable {
        }

        Date now = new Date();
        long runningTime =  DateUtils.differMs(now, processInstance.getStartTime());
        long runningTime =  DateUtils.diffMin(now, processInstance.getStartTime());

        if(runningTime > processInstance.getTimeout()){
            return true;
+4 −4
Original line number Diff line number Diff line
@@ -88,9 +88,9 @@ eschedulerConf(){
            proxy_set_header remote_addr $E_remote_addr;
            proxy_set_header X-Forwarded-For $E_proxy_add_x_forwarded_for;
            proxy_http_version 1.1;
            proxy_connect_timeout 4s;
            proxy_read_timeout 30s;
            proxy_send_timeout 12s;
            proxy_connect_timeout 300s;
            proxy_read_timeout 300s;
            proxy_send_timeout 300s;
            proxy_set_header Upgrade $E_http_upgrade;
            proxy_set_header Connection "upgrade";
            }
Loading