Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +2 −2 Original line number Diff line number Diff line Loading @@ -123,8 +123,8 @@ public class MasterServer { // self tolerant this.zkMasterClient.start(); // masterSchedulerService.start(); // scheduler start this.masterSchedulerService.start(); // start QuartzExecutors // what system should do if exception Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +9 −5 Original line number Diff line number Diff line Loading @@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.slf4j.Logger; Loading Loading @@ -123,7 +122,7 @@ public class MasterExecThread implements Runnable { /** * alert manager */ private AlertManager alertManager = new AlertManager(); private AlertManager alertManager; /** * the object of DAG Loading Loading @@ -151,15 +150,20 @@ public class MasterExecThread implements Runnable { * @param processService processService * @param nettyRemotingClient nettyRemotingClient */ public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){ public MasterExecThread(ProcessInstance processInstance , ProcessService processService , NettyRemotingClient nettyRemotingClient , AlertManager alertManager , MasterConfig masterConfig) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); this.masterConfig = masterConfig; int masterTaskExecNum = masterConfig.getMasterExecTaskNum(); this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", masterTaskExecNum); this.nettyRemotingClient = nettyRemotingClient; this.alertManager = alertManager; } Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +18 −5 Original line number Diff line number Diff line Loading @@ -16,6 +16,11 @@ */ package org.apache.dolphinscheduler.server.master.runner; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.dolphinscheduler.common.Constants; Loading @@ -28,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; Loading @@ -35,10 +41,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * master scheduler thread */ Loading Loading @@ -68,6 +70,11 @@ public class MasterSchedulerService extends Thread { @Autowired private MasterConfig masterConfig; /** * alert manager */ private AlertManager alertManager = new AlertManager(); /** * netty remoting client */ Loading Loading @@ -139,7 +146,13 @@ public class MasterSchedulerService extends Thread { this.masterConfig.getMasterExecThreads() - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient)); masterExecService.execute( new MasterExecThread( processInstance , processService , nettyRemotingClient , alertManager , masterConfig)); } }catch (Exception e){ logger.error("scan command error ", e); Loading dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +11 −10 Original line number Diff line number Diff line Loading @@ -72,8 +72,6 @@ public class MasterExecThreadTest { applicationContext = mock(ApplicationContext.class); config = new MasterConfig(); config.setMasterExecTaskNum(1); SpringApplicationContext springApplicationContext = new SpringApplicationContext(); springApplicationContext.setApplicationContext(applicationContext); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); processInstance = mock(ProcessInstance.class); Loading @@ -84,14 +82,17 @@ public class MasterExecThreadTest { Mockito.when(processInstance.getScheduleTime()).thenReturn(DateUtils.stringToDate("2020-01-01 00:00:00")); Map<String, String> cmdParam = new HashMap<>(); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-31 23:00:00"); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00"); Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam)); ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setGlobalParamMap(Collections.EMPTY_MAP); processDefinition.setGlobalParamList(Collections.EMPTY_LIST); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService,null)); masterExecThread = PowerMockito.spy(new MasterExecThread( processInstance , processService ,null, null, config)); // prepareProcess init dag Field dag = MasterExecThread.class.getDeclaredField("dag"); dag.setAccessible(true); Loading @@ -114,11 +115,11 @@ public class MasterExecThreadTest { Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); method.setAccessible(true); method.invoke(masterExecThread); // one create save, and 1-30 for next save, and last day 31 no save verify(processService, times(31)).saveProcessInstance(processInstance); // one create save, and 1-30 for next save, and last day 20 no save verify(processService, times(20)).saveProcessInstance(processInstance); }catch (Exception e){ e.printStackTrace(); Assert.assertTrue(false); Assert.fail(); } } Loading @@ -133,10 +134,10 @@ public class MasterExecThreadTest { Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); method.setAccessible(true); method.invoke(masterExecThread); // one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save verify(processService, times(15)).saveProcessInstance(processInstance); // one create save, and 9(1 to 20 step 2) for next save, and last day 31 no save verify(processService, times(9)).saveProcessInstance(processInstance); }catch (Exception e){ Assert.assertTrue(false); Assert.fail(); } } Loading dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +15 −3 Original line number Diff line number Diff line Loading @@ -17,11 +17,18 @@ package org.apache.dolphinscheduler.server.master.consumer; import java.util.Date; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; Loading @@ -34,6 +41,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; Loading @@ -42,8 +50,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.Date; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, Loading Loading @@ -250,5 +256,11 @@ public class TaskPriorityQueueConsumerTest { taskPriorityQueueConsumer.taskInstanceIsFinalState(1); } @After public void close() { Stopper.stop(); } } Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +2 −2 Original line number Diff line number Diff line Loading @@ -123,8 +123,8 @@ public class MasterServer { // self tolerant this.zkMasterClient.start(); // masterSchedulerService.start(); // scheduler start this.masterSchedulerService.start(); // start QuartzExecutors // what system should do if exception Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +9 −5 Original line number Diff line number Diff line Loading @@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.slf4j.Logger; Loading Loading @@ -123,7 +122,7 @@ public class MasterExecThread implements Runnable { /** * alert manager */ private AlertManager alertManager = new AlertManager(); private AlertManager alertManager; /** * the object of DAG Loading Loading @@ -151,15 +150,20 @@ public class MasterExecThread implements Runnable { * @param processService processService * @param nettyRemotingClient nettyRemotingClient */ public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){ public MasterExecThread(ProcessInstance processInstance , ProcessService processService , NettyRemotingClient nettyRemotingClient , AlertManager alertManager , MasterConfig masterConfig) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); this.masterConfig = masterConfig; int masterTaskExecNum = masterConfig.getMasterExecTaskNum(); this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", masterTaskExecNum); this.nettyRemotingClient = nettyRemotingClient; this.alertManager = alertManager; } Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +18 −5 Original line number Diff line number Diff line Loading @@ -16,6 +16,11 @@ */ package org.apache.dolphinscheduler.server.master.runner; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.dolphinscheduler.common.Constants; Loading @@ -28,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; Loading @@ -35,10 +41,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * master scheduler thread */ Loading Loading @@ -68,6 +70,11 @@ public class MasterSchedulerService extends Thread { @Autowired private MasterConfig masterConfig; /** * alert manager */ private AlertManager alertManager = new AlertManager(); /** * netty remoting client */ Loading Loading @@ -139,7 +146,13 @@ public class MasterSchedulerService extends Thread { this.masterConfig.getMasterExecThreads() - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient)); masterExecService.execute( new MasterExecThread( processInstance , processService , nettyRemotingClient , alertManager , masterConfig)); } }catch (Exception e){ logger.error("scan command error ", e); Loading
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +11 −10 Original line number Diff line number Diff line Loading @@ -72,8 +72,6 @@ public class MasterExecThreadTest { applicationContext = mock(ApplicationContext.class); config = new MasterConfig(); config.setMasterExecTaskNum(1); SpringApplicationContext springApplicationContext = new SpringApplicationContext(); springApplicationContext.setApplicationContext(applicationContext); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); processInstance = mock(ProcessInstance.class); Loading @@ -84,14 +82,17 @@ public class MasterExecThreadTest { Mockito.when(processInstance.getScheduleTime()).thenReturn(DateUtils.stringToDate("2020-01-01 00:00:00")); Map<String, String> cmdParam = new HashMap<>(); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-31 23:00:00"); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00"); Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam)); ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setGlobalParamMap(Collections.EMPTY_MAP); processDefinition.setGlobalParamList(Collections.EMPTY_LIST); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService,null)); masterExecThread = PowerMockito.spy(new MasterExecThread( processInstance , processService ,null, null, config)); // prepareProcess init dag Field dag = MasterExecThread.class.getDeclaredField("dag"); dag.setAccessible(true); Loading @@ -114,11 +115,11 @@ public class MasterExecThreadTest { Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); method.setAccessible(true); method.invoke(masterExecThread); // one create save, and 1-30 for next save, and last day 31 no save verify(processService, times(31)).saveProcessInstance(processInstance); // one create save, and 1-30 for next save, and last day 20 no save verify(processService, times(20)).saveProcessInstance(processInstance); }catch (Exception e){ e.printStackTrace(); Assert.assertTrue(false); Assert.fail(); } } Loading @@ -133,10 +134,10 @@ public class MasterExecThreadTest { Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); method.setAccessible(true); method.invoke(masterExecThread); // one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save verify(processService, times(15)).saveProcessInstance(processInstance); // one create save, and 9(1 to 20 step 2) for next save, and last day 31 no save verify(processService, times(9)).saveProcessInstance(processInstance); }catch (Exception e){ Assert.assertTrue(false); Assert.fail(); } } Loading
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +15 −3 Original line number Diff line number Diff line Loading @@ -17,11 +17,18 @@ package org.apache.dolphinscheduler.server.master.consumer; import java.util.Date; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; Loading @@ -34,6 +41,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; Loading @@ -42,8 +50,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.Date; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, Loading Loading @@ -250,5 +256,11 @@ public class TaskPriorityQueueConsumerTest { taskPriorityQueueConsumer.taskInstanceIsFinalState(1); } @After public void close() { Stopper.stop(); } }