Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java +7 −0 Original line number Diff line number Diff line Loading @@ -30,6 +30,13 @@ public interface ITaskQueue { */ List<String> getAllTasks(String key); /** * check if has a task * @param key queue name * @return true if has; false if not */ boolean hasTask(String key); /** * check task exists in the task queue or not * Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +15 −0 Original line number Diff line number Diff line Loading @@ -75,6 +75,21 @@ public class TaskQueueZkImpl implements ITaskQueue { return new ArrayList<>(); } /** * check if has a task * @param key queue name * @return true if has; false if not */ @Override public boolean hasTask(String key) { try { return zookeeperOperator.hasChildren(key); } catch (Exception e) { logger.error("check has task in tasks queue exception",e); } return false; } /** * check task exists in the task queue or not * Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java +11 −0 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; Loading Loading @@ -139,6 +140,16 @@ public class ZookeeperOperator implements InitializingBean { } } public boolean hasChildren(final String key){ Stat stat ; try { stat = zkClient.checkExists().forPath(key); return stat.getNumChildren() >= 1; } catch (Exception ex) { throw new IllegalStateException(ex); } } public boolean isExisted(final String key) { try { return zkClient.checkExists().forPath(key) != null; Loading dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java +10 −1 Original line number Diff line number Diff line Loading @@ -64,7 +64,16 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest { allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertEquals(allTasks.size(),0); } @Test public void hasTask(){ init(); boolean hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertTrue(hasTask); //delete all tasksQueue.delete(); hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertFalse(hasTask); } /** * test check task exists in the task queue or not */ Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +3 −2 Original line number Diff line number Diff line Loading @@ -150,8 +150,9 @@ public class FetchTaskThread implements Runnable{ } //whether have tasks, if no tasks , no need lock //get all tasks List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); if (CollectionUtils.isEmpty(tasksQueueList)){ boolean hasTask = taskQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); if (!hasTask){ Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } Loading Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java +7 −0 Original line number Diff line number Diff line Loading @@ -30,6 +30,13 @@ public interface ITaskQueue { */ List<String> getAllTasks(String key); /** * check if has a task * @param key queue name * @return true if has; false if not */ boolean hasTask(String key); /** * check task exists in the task queue or not * Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +15 −0 Original line number Diff line number Diff line Loading @@ -75,6 +75,21 @@ public class TaskQueueZkImpl implements ITaskQueue { return new ArrayList<>(); } /** * check if has a task * @param key queue name * @return true if has; false if not */ @Override public boolean hasTask(String key) { try { return zookeeperOperator.hasChildren(key); } catch (Exception e) { logger.error("check has task in tasks queue exception",e); } return false; } /** * check task exists in the task queue or not * Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java +11 −0 Original line number Diff line number Diff line Loading @@ -27,6 +27,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; Loading Loading @@ -139,6 +140,16 @@ public class ZookeeperOperator implements InitializingBean { } } public boolean hasChildren(final String key){ Stat stat ; try { stat = zkClient.checkExists().forPath(key); return stat.getNumChildren() >= 1; } catch (Exception ex) { throw new IllegalStateException(ex); } } public boolean isExisted(final String key) { try { return zkClient.checkExists().forPath(key) != null; Loading
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java +10 −1 Original line number Diff line number Diff line Loading @@ -64,7 +64,16 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest { allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertEquals(allTasks.size(),0); } @Test public void hasTask(){ init(); boolean hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertTrue(hasTask); //delete all tasksQueue.delete(); hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertFalse(hasTask); } /** * test check task exists in the task queue or not */ Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +3 −2 Original line number Diff line number Diff line Loading @@ -150,8 +150,9 @@ public class FetchTaskThread implements Runnable{ } //whether have tasks, if no tasks , no need lock //get all tasks List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); if (CollectionUtils.isEmpty(tasksQueueList)){ boolean hasTask = taskQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); if (!hasTask){ Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } Loading