Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +1 −5 Original line number Diff line number Diff line Loading @@ -145,7 +145,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { public List<String> poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; List<String> list = zk.getChildren().forPath(getTasksPath(key)); if(list != null && list.size() > 0){ Loading @@ -155,7 +154,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { int size = list.size(); Set<String> taskTreeSet = new TreeSet<>(new Comparator<String>() { @Override public int compare(String o1, String o2) { Loading Loading @@ -183,7 +181,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); //forward compatibility 向前版本兼容 //forward compatibility if(taskDetailArrs.length >= 4){ //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} Loading @@ -201,9 +199,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { formatTask += Constants.UNDERLINE + taskDetailArrs[4]; } taskTreeSet.add(formatTask); } } List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); Loading dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java→dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java +228 −0 Original line number Diff line number Diff line Loading @@ -19,40 +19,35 @@ package org.apache.dolphinscheduler.common.queue; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.ZKServer; import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.*; /** * task queue test */ public class TaskQueueImplTest { public class TaskQueueZKImplTest { private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); ITaskQueue tasksQueue = null; private ITaskQueue tasksQueue = null; @Before public void before(){ ZKServer.start(); ZKServer.start(); tasksQueue = TaskQueueFactory.getTaskQueueInstance(); //clear all data tasksQueue.delete(); } @After public void after(){ //clear all data Loading @@ -61,9 +56,48 @@ public class TaskQueueImplTest { } /** * test take out all the elements */ @Test public void getAllTasks(){ //add init(); // get all List<String> allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertEquals(allTasks.size(),2); //delete all tasksQueue.delete(); allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertEquals(allTasks.size(),0); } /** * test check task exists in the task queue or not */ @Test public void testAdd(){ public void checkTaskExists(){ String task= "1_0_1_1_-1"; //add init(); // check Exist true boolean taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task); assertTrue(taskExists); //remove task tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); // check Exist false taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task); assertFalse(taskExists); } /** * test add element to the queue */ @Test public void add(){ //add tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1"); Loading @@ -79,10 +113,92 @@ public class TaskQueueImplTest { //pop String node1 = tasks.get(0); assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost())); } /** * test element pops out of the queue */ @Test public void poll(){ //add init(); List<String> taskList = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 2); assertEquals(taskList.size(),2); assertEquals(taskList.get(0),"0_1_1_1_-1"); assertEquals(taskList.get(1),"1_0_1_1_-1"); } /** * test remove element from queue */ @Test public void removeNode(){ String task = "1_0_1_1_-1"; //add init(); tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); assertFalse(tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task)); } /** * test add an element to the set */ @Test public void sadd(){ String task = "1_0_1_1_-1"; tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); //check size assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); } /** * test delete the value corresponding to the key in the set */ @Test public void srem(){ String task = "1_0_1_1_-1"; tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); //check size assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); //remove and get size tasksQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0); } /** * test gets all the elements of the set based on the key */ @Test public void smembers(){ //first init assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0); //add String task = "1_0_1_1_-1"; tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); //check size assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); //add task = "0_1_1_1_"; tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); //check size assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),2); } /** * init data */ private void init(){ //add tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1"); tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1"); } Loading pom.xml +1 −0 Original line number Diff line number Diff line Loading @@ -614,6 +614,7 @@ <includes> <include>**/common/utils/*.java</include> <include>**/common/graph/*.java</include> <include>**/common/queue/*.java</include> <include>**/api/utils/CheckUtilsTest.java</include> <include>**/api/utils/FileUtilsTest.java</include> </includes> Loading Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +1 −5 Original line number Diff line number Diff line Loading @@ -145,7 +145,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { public List<String> poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; List<String> list = zk.getChildren().forPath(getTasksPath(key)); if(list != null && list.size() > 0){ Loading @@ -155,7 +154,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { int size = list.size(); Set<String> taskTreeSet = new TreeSet<>(new Comparator<String>() { @Override public int compare(String o1, String o2) { Loading Loading @@ -183,7 +181,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); //forward compatibility 向前版本兼容 //forward compatibility if(taskDetailArrs.length >= 4){ //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} Loading @@ -201,9 +199,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { formatTask += Constants.UNDERLINE + taskDetailArrs[4]; } taskTreeSet.add(formatTask); } } List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); Loading
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java→dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java +228 −0 Original line number Diff line number Diff line Loading @@ -19,40 +19,35 @@ package org.apache.dolphinscheduler.common.queue; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.ZKServer; import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.*; /** * task queue test */ public class TaskQueueImplTest { public class TaskQueueZKImplTest { private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); ITaskQueue tasksQueue = null; private ITaskQueue tasksQueue = null; @Before public void before(){ ZKServer.start(); ZKServer.start(); tasksQueue = TaskQueueFactory.getTaskQueueInstance(); //clear all data tasksQueue.delete(); } @After public void after(){ //clear all data Loading @@ -61,9 +56,48 @@ public class TaskQueueImplTest { } /** * test take out all the elements */ @Test public void getAllTasks(){ //add init(); // get all List<String> allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertEquals(allTasks.size(),2); //delete all tasksQueue.delete(); allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertEquals(allTasks.size(),0); } /** * test check task exists in the task queue or not */ @Test public void testAdd(){ public void checkTaskExists(){ String task= "1_0_1_1_-1"; //add init(); // check Exist true boolean taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task); assertTrue(taskExists); //remove task tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); // check Exist false taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task); assertFalse(taskExists); } /** * test add element to the queue */ @Test public void add(){ //add tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1"); Loading @@ -79,10 +113,92 @@ public class TaskQueueImplTest { //pop String node1 = tasks.get(0); assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost())); } /** * test element pops out of the queue */ @Test public void poll(){ //add init(); List<String> taskList = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 2); assertEquals(taskList.size(),2); assertEquals(taskList.get(0),"0_1_1_1_-1"); assertEquals(taskList.get(1),"1_0_1_1_-1"); } /** * test remove element from queue */ @Test public void removeNode(){ String task = "1_0_1_1_-1"; //add init(); tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); assertFalse(tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task)); } /** * test add an element to the set */ @Test public void sadd(){ String task = "1_0_1_1_-1"; tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); //check size assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); } /** * test delete the value corresponding to the key in the set */ @Test public void srem(){ String task = "1_0_1_1_-1"; tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); //check size assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); //remove and get size tasksQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0); } /** * test gets all the elements of the set based on the key */ @Test public void smembers(){ //first init assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0); //add String task = "1_0_1_1_-1"; tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); //check size assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); //add task = "0_1_1_1_"; tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); //check size assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),2); } /** * init data */ private void init(){ //add tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1"); tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1"); } Loading
pom.xml +1 −0 Original line number Diff line number Diff line Loading @@ -614,6 +614,7 @@ <includes> <include>**/common/utils/*.java</include> <include>**/common/graph/*.java</include> <include>**/common/queue/*.java</include> <include>**/api/utils/CheckUtilsTest.java</include> <include>**/api/utils/FileUtilsTest.java</include> </includes> Loading