Commit fd250ff3 authored by 向偲彪's avatar 向偲彪
Browse files
parents 5c2e94e9 eef8bf4e
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -391,7 +391,7 @@ public class ProcessInstanceController extends BaseController{
                    }
                }
            }
            if(deleteFailedIdList.size() > 0){
            if(!deleteFailedIdList.isEmpty()){
                putMsg(result, Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR, String.join(",", deleteFailedIdList));
            }else{
                putMsg(result, Status.SUCCESS);
+0 −3
Original line number Diff line number Diff line
@@ -106,9 +106,6 @@ public class ProcessMeta {
     */
    private String scheduleWorkerGroupName;

    public ProcessMeta() {
    }

    public String getProjectName() {
        return projectName;
    }
+18 −18
Original line number Diff line number Diff line
@@ -43,36 +43,36 @@ public class TaskCountDto {
    }

    private void countTaskDtos(List<ExecuteStatusCount> taskInstanceStateCounts){
        int submitted_success = 0;
        int running_exeution = 0;
        int ready_pause = 0;
        int submittedSuccess = 0;
        int runningExeution = 0;
        int readyPause = 0;
        int pause = 0;
        int ready_stop = 0;
        int readyStop = 0;
        int stop = 0;
        int failure = 0;
        int success = 0;
        int need_fault_tolerance = 0;
        int needFaultTolerance = 0;
        int kill = 0;
        int waitting_thread = 0;
        int waittingThread = 0;

        for(ExecuteStatusCount taskInstanceStateCount : taskInstanceStateCounts){
            ExecutionStatus status = taskInstanceStateCount.getExecutionStatus();
            totalCount += taskInstanceStateCount.getCount();
            switch (status){
                case SUBMITTED_SUCCESS:
                    submitted_success += taskInstanceStateCount.getCount();
                    submittedSuccess += taskInstanceStateCount.getCount();
                    break;
                case RUNNING_EXEUTION:
                    running_exeution += taskInstanceStateCount.getCount();
                    runningExeution += taskInstanceStateCount.getCount();
                    break;
                case READY_PAUSE:
                    ready_pause += taskInstanceStateCount.getCount();
                    readyPause += taskInstanceStateCount.getCount();
                    break;
                case PAUSE:
                    pause += taskInstanceStateCount.getCount();
                    break;
                case READY_STOP:
                    ready_stop += taskInstanceStateCount.getCount();
                    readyStop += taskInstanceStateCount.getCount();
                    break;
                case STOP:
                    stop += taskInstanceStateCount.getCount();
@@ -84,13 +84,13 @@ public class TaskCountDto {
                    success += taskInstanceStateCount.getCount();
                    break;
                case NEED_FAULT_TOLERANCE:
                    need_fault_tolerance += taskInstanceStateCount.getCount();
                    needFaultTolerance += taskInstanceStateCount.getCount();
                    break;
                case KILL:
                    kill += taskInstanceStateCount.getCount();
                    break;
                case WAITTING_THREAD:
                    waitting_thread += taskInstanceStateCount.getCount();
                    waittingThread += taskInstanceStateCount.getCount();
                    break;

                    default:
@@ -98,17 +98,17 @@ public class TaskCountDto {
            }
        }
        this.taskCountDtos = new ArrayList<>();
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.SUBMITTED_SUCCESS, submitted_success));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.RUNNING_EXEUTION, running_exeution));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_PAUSE, ready_pause));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.SUBMITTED_SUCCESS, submittedSuccess));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.RUNNING_EXEUTION, runningExeution));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_PAUSE, readyPause));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.PAUSE, pause));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_STOP, ready_stop));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_STOP, readyStop));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.STOP, stop));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.FAILURE, failure));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.SUCCESS, success));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.NEED_FAULT_TOLERANCE, need_fault_tolerance));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.NEED_FAULT_TOLERANCE, needFaultTolerance));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.KILL, kill));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.WAITTING_THREAD, waitting_thread));
        this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.WAITTING_THREAD, waittingThread));
    }


+5 −8
Original line number Diff line number Diff line
@@ -148,7 +148,7 @@ public class ProcessDefinitionService extends BaseDAGService {

        //custom global params
        List<Property> globalParamsList = processData.getGlobalParams();
        if (globalParamsList != null && globalParamsList.size() > 0) {
        if (CollectionUtils.isNotEmpty(globalParamsList)) {
            Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
            globalParamsList = new ArrayList<>(globalParamsSet);
            processDefine.setGlobalParamList(globalParamsList);
@@ -314,7 +314,7 @@ public class ProcessDefinitionService extends BaseDAGService {

        //custom global params
        List<Property> globalParamsList = new ArrayList<>();
        if (processData.getGlobalParams() != null && processData.getGlobalParams().size() > 0) {
        if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) {
            Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
            globalParamsList = new ArrayList<>(userDefParamsSet);
        }
@@ -453,12 +453,11 @@ public class ProcessDefinitionService extends BaseDAGService {
        ProcessDefinition processDefinition = processDefineMapper.selectById(id);

        switch (state) {
            case ONLINE: {
            case ONLINE:
                processDefinition.setReleaseState(state);
                processDefineMapper.updateById(processDefinition);
                break;
            }
            case OFFLINE: {
            case OFFLINE:
                processDefinition.setReleaseState(state);
                processDefineMapper.updateById(processDefinition);
                List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray(
@@ -473,12 +472,10 @@ public class ProcessDefinitionService extends BaseDAGService {
                    SchedulerService.deleteSchedule(project.getId(), schedule.getId());
                }
                break;
            }
            default: {
            default:
                putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState");
                return result;
        }
        }

        putMsg(result, Status.SUCCESS);
        return result;
+7 −8
Original line number Diff line number Diff line
@@ -239,7 +239,7 @@ public class ProcessInstanceService extends BaseDAGService {
        }
        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
        List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
        AddDependResultForTaskList(taskInstanceList);
        addDependResultForTaskList(taskInstanceList);
        Map<String, Object> resultMap = new HashMap<>();
        resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
        resultMap.put(TASK_LIST, taskInstanceList);
@@ -253,9 +253,9 @@ public class ProcessInstanceService extends BaseDAGService {
     * add dependent result for dependent task
     * @param taskInstanceList
     */
    private void AddDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
    private void addDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
        for(TaskInstance taskInstance: taskInstanceList){
            if(taskInstance.getTaskType().toUpperCase().equals(TaskType.DEPENDENT.toString())){
            if(taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())){
                Result logResult = loggerService.queryLog(
                        taskInstance.getId(), 0, 4098);
                if(logResult.getCode() == Status.SUCCESS.ordinal()){
@@ -414,11 +414,10 @@ public class ProcessInstanceService extends BaseDAGService {
            processInstance.setProcessInstanceJson(processInstanceJson);
            processInstance.setGlobalParams(globalParams);
        }
//        int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
//                globalParams, schedule, flag, locations, connects);

        int update = processService.updateProcessInstance(processInstance);
        int updateDefine = 1;
        if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
        if (Boolean.TRUE.equals(syncDefine) && StringUtils.isNotEmpty(processInstanceJson)) {
            processDefinition.setProcessDefinitionJson(processInstanceJson);
            processDefinition.setGlobalParams(originDefParams);
            processDefinition.setLocations(locations);
@@ -544,7 +543,7 @@ public class ProcessInstanceService extends BaseDAGService {
                    nodeValueSb.append(ipSb);
                }

                logger.info("delete task queue node : {}",nodeValueSb.toString());
                logger.info("delete task queue node : {}",nodeValueSb);
                tasksQueue.removeNode(org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE, nodeValueSb.toString());

            }
@@ -621,7 +620,7 @@ public class ProcessInstanceService extends BaseDAGService {
                Map<String,Object> localParamsMap = new HashMap<>();
                localParamsMap.put("taskType",taskNode.getType());
                localParamsMap.put("localParamsList",localParamsList);
                if (localParamsList.size() > 0) {
                if (CollectionUtils.isNotEmpty(localParamsList)) {
                    localUserDefParams.put(taskNode.getName(), localParamsMap);
                }
            }
Loading