Loading dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +10 −1 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; Loading Loading @@ -59,7 +60,7 @@ public class ExecutorService extends BaseService{ private ProcessDefinitionMapper processDefinitionMapper; @Autowired private ProcessDefinitionService processDefinitionService; private MonitorService monitorService; @Autowired Loading Loading @@ -123,6 +124,14 @@ public class ExecutorService extends BaseService{ return result; } // check master server exists List<Server> masterServers = monitorService.getServerListFromZK(true); if (masterServers.size() == 0) { putMsg(result, Status.MASTER_NOT_EXISTS); return result; } /** * create command */ Loading dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +1 −0 Original line number Diff line number Diff line Loading @@ -365,6 +365,7 @@ public class SchedulerService extends BaseService { if (masterServers.size() == 0) { putMsg(result, Status.MASTER_NOT_EXISTS); return result; } // set status Loading dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +38 −5 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; Loading Loading @@ -63,6 +64,9 @@ public class ExecutorService2Test { @Mock private ProjectService projectService; @Mock private MonitorService monitorService; private int processDefinitionId = 1; private int tenantId = 1; Loading Loading @@ -102,6 +106,7 @@ public class ExecutorService2Test { Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition); Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList()); } /** Loading @@ -121,7 +126,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } Loading @@ -142,7 +146,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } Loading @@ -163,7 +166,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } Loading @@ -184,7 +186,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } Loading @@ -205,10 +206,42 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(15)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } @Test public void testNoMsterServers() throws ParseException{ Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList<Server>()); Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName, processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, null, null, null, null, 0, "", "", RunMode.RUN_MODE_PARALLEL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Assert.assertEquals(result.get(Constants.STATUS),Status.MASTER_NOT_EXISTS); } private List<Server> getMasterServersList(){ List<Server> masterServerList = new ArrayList<>(); Server masterServer1 = new Server(); masterServer1.setId(1); masterServer1.setHost("192.168.220.188"); masterServer1.setPort(1121); masterServerList.add(masterServer1); Server masterServer2 = new Server(); masterServer2.setId(2); masterServer2.setHost("192.168.220.189"); masterServer2.setPort(1122); masterServerList.add(masterServer2); return masterServerList; } private List<Schedule> zeroSchedulerList(){ return Collections.EMPTY_LIST; } Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +14 −2 Original line number Diff line number Diff line Loading @@ -126,13 +126,25 @@ public class TaskPriorityQueueConsumer extends Thread{ ThreadUtils.sleep(SLEEP_TIME_MILLIS); } if (result){ if (result || taskInstanceIsFinalState(taskInstanceId)){ break; } } return result; } /** * taskInstance is final state * success,failure,kill,stop,pause,threadwaiting is final state * @param taskInstanceId taskInstanceId * @return taskInstance is final state */ public Boolean taskInstanceIsFinalState(int taskInstanceId){ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); return taskInstance.getState().typeIsFinished(); } /** * get TaskExecutionContext * @param taskInstanceId taskInstanceId Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +45 −1 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; Loading @@ -35,9 +36,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; import java.util.Set; /** Loading @@ -53,6 +57,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { private NettyExecutorManager nettyExecutorManager; /** * zookeeper register center */ private ZookeeperRegistryCenter zookeeperRegistryCenter; /** * constructor of MasterTaskExecThread * @param taskInstance task instance Loading @@ -61,6 +71,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { super(taskInstance); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); this.zookeeperRegistryCenter = SpringApplicationContext.getBean(ZookeeperRegistryCenter.class); } /** Loading Loading @@ -175,6 +186,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } alreadyKilled = true; String taskInstanceWorkerGroup = taskInstance.getWorkerGroup(); // not exists if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){ taskInstance.setState(ExecutionStatus.KILL); taskInstance.setEndTime(new Date()); processService.updateTaskInstance(taskInstance); return; } TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); killCommand.setTaskInstanceId(taskInstance.getId()); Loading @@ -185,10 +206,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { nettyExecutorManager.executeDirectly(executionContext); logger.info("master add kill task :{} id:{} to kill queue", logger.info("master kill taskInstance name :{} taskInstance id:{}", taskInstance.getName(), taskInstance.getId() ); } /** * whether exists valid worker group * @param taskInstanceWorkerGroup taskInstanceWorkerGroup * @return whether exists */ public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup){ Set<String> workerGroups = zookeeperRegistryCenter.getWorkerGroupDirectly(); // not worker group if (CollectionUtils.isEmpty(workerGroups)){ return false; } // has worker group , but not taskInstance assigned worker group if (!workerGroups.contains(taskInstanceWorkerGroup)){ return false; } Set<String> workers = zookeeperRegistryCenter.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup); if (CollectionUtils.isEmpty(workers)) { return false; } return true; } /** * get task timeout parameter * @return TaskTimeoutParameter Loading Loading
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +10 −1 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; Loading Loading @@ -59,7 +60,7 @@ public class ExecutorService extends BaseService{ private ProcessDefinitionMapper processDefinitionMapper; @Autowired private ProcessDefinitionService processDefinitionService; private MonitorService monitorService; @Autowired Loading Loading @@ -123,6 +124,14 @@ public class ExecutorService extends BaseService{ return result; } // check master server exists List<Server> masterServers = monitorService.getServerListFromZK(true); if (masterServers.size() == 0) { putMsg(result, Status.MASTER_NOT_EXISTS); return result; } /** * create command */ Loading
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +1 −0 Original line number Diff line number Diff line Loading @@ -365,6 +365,7 @@ public class SchedulerService extends BaseService { if (masterServers.size() == 0) { putMsg(result, Status.MASTER_NOT_EXISTS); return result; } // set status Loading
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +38 −5 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; Loading Loading @@ -63,6 +64,9 @@ public class ExecutorService2Test { @Mock private ProjectService projectService; @Mock private MonitorService monitorService; private int processDefinitionId = 1; private int tenantId = 1; Loading Loading @@ -102,6 +106,7 @@ public class ExecutorService2Test { Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition); Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList()); } /** Loading @@ -121,7 +126,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } Loading @@ -142,7 +146,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } Loading @@ -163,7 +166,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } Loading @@ -184,7 +186,6 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } Loading @@ -205,10 +206,42 @@ public class ExecutorService2Test { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(15)).createCommand(any(Command.class)); }catch (Exception e){ Assert.assertTrue(false); } } @Test public void testNoMsterServers() throws ParseException{ Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList<Server>()); Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName, processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, null, null, null, null, 0, "", "", RunMode.RUN_MODE_PARALLEL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Assert.assertEquals(result.get(Constants.STATUS),Status.MASTER_NOT_EXISTS); } private List<Server> getMasterServersList(){ List<Server> masterServerList = new ArrayList<>(); Server masterServer1 = new Server(); masterServer1.setId(1); masterServer1.setHost("192.168.220.188"); masterServer1.setPort(1121); masterServerList.add(masterServer1); Server masterServer2 = new Server(); masterServer2.setId(2); masterServer2.setHost("192.168.220.189"); masterServer2.setPort(1122); masterServerList.add(masterServer2); return masterServerList; } private List<Schedule> zeroSchedulerList(){ return Collections.EMPTY_LIST; } Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +14 −2 Original line number Diff line number Diff line Loading @@ -126,13 +126,25 @@ public class TaskPriorityQueueConsumer extends Thread{ ThreadUtils.sleep(SLEEP_TIME_MILLIS); } if (result){ if (result || taskInstanceIsFinalState(taskInstanceId)){ break; } } return result; } /** * taskInstance is final state * success,failure,kill,stop,pause,threadwaiting is final state * @param taskInstanceId taskInstanceId * @return taskInstance is final state */ public Boolean taskInstanceIsFinalState(int taskInstanceId){ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); return taskInstance.getState().typeIsFinished(); } /** * get TaskExecutionContext * @param taskInstanceId taskInstanceId Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +45 −1 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; Loading @@ -35,9 +36,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; import java.util.Set; /** Loading @@ -53,6 +57,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { private NettyExecutorManager nettyExecutorManager; /** * zookeeper register center */ private ZookeeperRegistryCenter zookeeperRegistryCenter; /** * constructor of MasterTaskExecThread * @param taskInstance task instance Loading @@ -61,6 +71,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { super(taskInstance); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); this.zookeeperRegistryCenter = SpringApplicationContext.getBean(ZookeeperRegistryCenter.class); } /** Loading Loading @@ -175,6 +186,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } alreadyKilled = true; String taskInstanceWorkerGroup = taskInstance.getWorkerGroup(); // not exists if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){ taskInstance.setState(ExecutionStatus.KILL); taskInstance.setEndTime(new Date()); processService.updateTaskInstance(taskInstance); return; } TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); killCommand.setTaskInstanceId(taskInstance.getId()); Loading @@ -185,10 +206,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { nettyExecutorManager.executeDirectly(executionContext); logger.info("master add kill task :{} id:{} to kill queue", logger.info("master kill taskInstance name :{} taskInstance id:{}", taskInstance.getName(), taskInstance.getId() ); } /** * whether exists valid worker group * @param taskInstanceWorkerGroup taskInstanceWorkerGroup * @return whether exists */ public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup){ Set<String> workerGroups = zookeeperRegistryCenter.getWorkerGroupDirectly(); // not worker group if (CollectionUtils.isEmpty(workerGroups)){ return false; } // has worker group , but not taskInstance assigned worker group if (!workerGroups.contains(taskInstanceWorkerGroup)){ return false; } Set<String> workers = zookeeperRegistryCenter.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup); if (CollectionUtils.isEmpty(workers)) { return false; } return true; } /** * get task timeout parameter * @return TaskTimeoutParameter Loading