Unverified Commit 21498332 authored by qiaozhanwei's avatar qiaozhanwei Committed by GitHub
Browse files

fix #1775 bug,delete process definition when process instance is running (#1790)

* fix #1775 bug,delete process definition when process instance is running

* revert CONTRIBUTING.md
parent 1490c705
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -455,6 +455,11 @@ public class ProcessDao {
        if(tenantId >= 0){
            tenant = tenantMapper.queryById(tenantId);
        }

        if (userId == 0){
            return null;
        }

        if(tenant == null){
            User user = userMapper.selectById(userId);
            tenant = tenantMapper.queryById(user.getTenantId());
+32 −5
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -143,6 +144,7 @@ public class FetchTaskThread implements Runnable{
        logger.info("worker start fetch tasks...");
        while (Stopper.isRunning()){
            InterProcessMutex mutex = null;
            String currentTaskQueueStr = null;
            try {
                ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
                //check memory and cpu usage and threads
@@ -168,6 +170,9 @@ public class FetchTaskThread implements Runnable{
                List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);

                for(String taskQueueStr : taskQueueStrArr){

                    currentTaskQueueStr = taskQueueStr;

                    if (StringUtils.isEmpty(taskQueueStr)) {
                        continue;
                    }
@@ -191,13 +196,16 @@ public class FetchTaskThread implements Runnable{
                        continue;
                    }

                    Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
                            taskInstance.getProcessDefine().getUserId());
                    // if process definition is null ,process definition already deleted
                    int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
                    Tenant tenant = processDao.getTenantForProcess(
                            taskInstance.getProcessInstance().getTenantId(),
                            userId);

                    // verify tenant is null
                    if (verifyTenantIsNull(tenant)) {
                        logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
                        removeNodeFromTaskQueue(taskQueueStr);
                        processErrorTask(taskQueueStr);
                        continue;
                    }

@@ -236,6 +244,7 @@ public class FetchTaskThread implements Runnable{
                }

            }catch (Exception e){
                processErrorTask(currentTaskQueueStr);
                logger.error("fetch task thread failure" ,e);
            }finally {
                AbstractZKClient.releaseMutex(mutex);
@@ -243,6 +252,25 @@ public class FetchTaskThread implements Runnable{
        }
    }

    /**
     * process error task
     *
     * @param taskQueueStr task queue str
     */
    private void processErrorTask(String taskQueueStr){
        // remove from zk
        removeNodeFromTaskQueue(taskQueueStr);

        if (taskInstance != null){
            processDao.changeTaskState(ExecutionStatus.FAILURE,
                    taskInstance.getStartTime(),
                    taskInstance.getHost(),
                    null,
                    null,
                    taskInstId);
        }

    }
    /**
     * remove node from task queue
     *
@@ -273,8 +301,7 @@ public class FetchTaskThread implements Runnable{
     */
    private boolean verifyTenantIsNull(Tenant tenant) {
        if(tenant == null){
            logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}",
                    taskInstance.getProcessDefine().getId(),
            logger.error("tenant not exists,process instance id : {},task instance id : {}",
                    taskInstance.getProcessInstance().getId(),
                    taskInstance.getId());
            return true;