Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueFactory.java +2 −1 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.queue; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Loading @@ -43,7 +44,7 @@ public class TaskQueueFactory { String queueImplValue = CommonUtils.getQueueImplValue(); if (StringUtils.isNotBlank(queueImplValue)) { logger.info("task queue impl use zookeeper "); return TaskQueueZkImpl.getInstance(); return SpringApplicationContext.getBean(TaskQueueZkImpl.class); }else{ logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit "); System.exit(-1); Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +23 −140 Original line number Diff line number Diff line Loading @@ -17,22 +17,14 @@ package org.apache.dolphinscheduler.common.queue; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.Bytes; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.DefaultEnsembleProvider; import org.apache.dolphinscheduler.common.zk.ZookeeperConfig; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.apache.dolphinscheduler.common.zk.ZookeeperOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; Loading @@ -40,35 +32,13 @@ import java.util.*; * A singleton of a task queue implemented with zookeeper * tasks queue implemention */ @Service public class TaskQueueZkImpl implements ITaskQueue { private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class); private static volatile TaskQueueZkImpl instance; private CuratorFramework zkClient; private ZookeeperConfig zookeeperConfig; private CuratorFramework getZkClient() { return zkClient; } private TaskQueueZkImpl(){ init(); } public static TaskQueueZkImpl getInstance(){ if (null == instance) { synchronized (TaskQueueZkImpl.class) { if(null == instance) { instance = new TaskQueueZkImpl(); } } } return instance; } @Autowired private ZookeeperOperator zookeeperOperator; /** * get all tasks from tasks queue Loading @@ -78,14 +48,12 @@ public class TaskQueueZkImpl implements ITaskQueue { @Override public List<String> getAllTasks(String key) { try { List<String> list = getZkClient().getChildren().forPath(getTasksPath(key)); List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key)); return list; } catch (Exception e) { logger.error("get all tasks from tasks queue exception",e); } return new ArrayList<String>(); return new ArrayList<>(); } /** Loading @@ -99,22 +67,8 @@ public class TaskQueueZkImpl implements ITaskQueue { public boolean checkTaskExists(String key, String task) { String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task; try { Stat stat = zkClient.checkExists().forPath(taskPath); return zookeeperOperator.isExisted(taskPath); if(null == stat){ logger.info("check task:{} not exist in task queue",task); return false; }else{ logger.info("check task {} exists in task queue ",task); return true; } } catch (Exception e) { logger.info(String.format("task {} check exists in task queue exception ", task), e); } return false; } Loading @@ -128,9 +82,7 @@ public class TaskQueueZkImpl implements ITaskQueue { public boolean add(String key, String value){ try { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); zookeeperOperator.persist(taskIdPath, value); return true; } catch (Exception e) { logger.error("add task to tasks queue exception",e); Loading @@ -153,8 +105,7 @@ public class TaskQueueZkImpl implements ITaskQueue { @Override public List<String> poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); List<String> list = zk.getChildren().forPath(getTasksPath(key)); List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key)); if(list != null && list.size() > 0){ Loading Loading @@ -277,15 +228,12 @@ public class TaskQueueZkImpl implements ITaskQueue { @Override public void removeNode(String key, String nodeValue){ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; String taskIdPath = tasksQueuePath + nodeValue; logger.info("consume task {}", taskIdPath); logger.info("removeNode task {}", taskIdPath); try{ Stat stat = zk.checkExists().forPath(taskIdPath); if(stat != null){ zk.delete().forPath(taskIdPath); } zookeeperOperator.remove(taskIdPath); }catch(Exception e){ logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e); } Loading @@ -307,12 +255,9 @@ public class TaskQueueZkImpl implements ITaskQueue { if(value != null && value.trim().length() > 0){ String path = getTasksPath(key) + Constants.SINGLE_SLASH; CuratorFramework zk = getZkClient(); Stat stat = zk.checkExists().forPath(path + value); if(null == stat){ String result = zk.create().withMode(CreateMode.PERSISTENT).forPath(path + value,Bytes.toBytes(value)); logger.info("add task:{} to tasks set result:{} ",value,result); if(!zookeeperOperator.isExisted(path + value)){ zookeeperOperator.persist(path + value,value); logger.info("add task:{} to tasks set ",value); } else{ logger.info("task {} exists in tasks set ",value); } Loading @@ -336,15 +281,7 @@ public class TaskQueueZkImpl implements ITaskQueue { public void srem(String key, String value) { try{ String path = getTasksPath(key) + Constants.SINGLE_SLASH; CuratorFramework zk = getZkClient(); Stat stat = zk.checkExists().forPath(path + value); if(null != stat){ zk.delete().forPath(path + value); logger.info("delete task:{} from tasks set ",value); }else{ logger.info("delete task:{} from tasks set fail, there is no this task",value); } zookeeperOperator.remove(path + value); }catch(Exception e){ logger.error(String.format("delete task:" + value + " exception"),e); Loading @@ -363,7 +300,7 @@ public class TaskQueueZkImpl implements ITaskQueue { Set<String> tasksSet = new HashSet<>(); try { List<String> list = getZkClient().getChildren().forPath(getTasksPath(key)); List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key)); for (String task : list) { tasksSet.add(task); Loading @@ -377,56 +314,6 @@ public class TaskQueueZkImpl implements ITaskQueue { return tasksSet; } /** * Init the task queue of zookeeper node */ private void init(){ initZkClient(); try { String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){ if(zkClient.checkExists().forPath(taskQueuePath) == null){ // create a persistent parent node zkClient.create().creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(taskQueuePath); logger.info("create tasks queue parent node success : {} ",taskQueuePath); } } } catch (Exception e) { logger.error("create zk node failure",e); } } private void initZkClient() { Configuration conf = null; try { conf = new PropertiesConfiguration(Constants.ZOOKEEPER_PROPERTIES_PATH); } catch (ConfigurationException ex) { logger.error("load zookeeper properties file failed, system exit"); System.exit(-1); } zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(conf.getString("zookeeper.quorum"))) .retryPolicy(new ExponentialBackoffRetry(conf.getInt("zookeeper.retry.base.sleep"), conf.getInt("zookeeper.retry.maxtime"), conf.getInt("zookeeper.retry.max.sleep"))) .sessionTimeoutMs(conf.getInt("zookeeper.session.timeout")) .connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout")) .build(); zkClient.start(); try { zkClient.blockUntilConnected(); } catch (final Exception ex) { throw new RuntimeException(ex); } } /** * Clear the task queue of zookeeper node */ Loading @@ -437,16 +324,12 @@ public class TaskQueueZkImpl implements ITaskQueue { String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){ if(zkClient.checkExists().forPath(taskQueuePath) != null){ List<String> list = zkClient.getChildren().forPath(taskQueuePath); if(zookeeperOperator.isExisted(taskQueuePath)){ List<String> list = zookeeperOperator.getChildrenKeys(taskQueuePath); for (String task : list) { zkClient.delete().forPath(taskQueuePath + Constants.SINGLE_SLASH + task); zookeeperOperator.remove(taskQueuePath + Constants.SINGLE_SLASH + task); logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task); } } } Loading dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplicationContext.java→dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SpringApplicationContext.java +1 −1 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.utils; package org.apache.dolphinscheduler.common.utils; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java +0 −3 Original line number Diff line number Diff line Loading @@ -127,9 +127,6 @@ public class ZookeeperOperator implements InitializingBean { List<String> values; try { values = zkClient.getChildren().forPath(key); if (CollectionUtils.isEmpty(values)) { logger.warn("getChildrenKeys key : {} is empty", key); } return values; } catch (InterruptedException ex) { logger.error("getChildrenKeys key : {} InterruptedException", key); Loading dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +2 −1 Original line number Diff line number Diff line Loading @@ -105,7 +105,8 @@ public class ProcessDao { /** * task queue impl */ protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance(); @Autowired private ITaskQueue taskQueue; /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @param logger logger Loading Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueFactory.java +2 −1 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.queue; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Loading @@ -43,7 +44,7 @@ public class TaskQueueFactory { String queueImplValue = CommonUtils.getQueueImplValue(); if (StringUtils.isNotBlank(queueImplValue)) { logger.info("task queue impl use zookeeper "); return TaskQueueZkImpl.getInstance(); return SpringApplicationContext.getBean(TaskQueueZkImpl.class); }else{ logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit "); System.exit(-1); Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +23 −140 Original line number Diff line number Diff line Loading @@ -17,22 +17,14 @@ package org.apache.dolphinscheduler.common.queue; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.Bytes; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.DefaultEnsembleProvider; import org.apache.dolphinscheduler.common.zk.ZookeeperConfig; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.apache.dolphinscheduler.common.zk.ZookeeperOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; Loading @@ -40,35 +32,13 @@ import java.util.*; * A singleton of a task queue implemented with zookeeper * tasks queue implemention */ @Service public class TaskQueueZkImpl implements ITaskQueue { private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class); private static volatile TaskQueueZkImpl instance; private CuratorFramework zkClient; private ZookeeperConfig zookeeperConfig; private CuratorFramework getZkClient() { return zkClient; } private TaskQueueZkImpl(){ init(); } public static TaskQueueZkImpl getInstance(){ if (null == instance) { synchronized (TaskQueueZkImpl.class) { if(null == instance) { instance = new TaskQueueZkImpl(); } } } return instance; } @Autowired private ZookeeperOperator zookeeperOperator; /** * get all tasks from tasks queue Loading @@ -78,14 +48,12 @@ public class TaskQueueZkImpl implements ITaskQueue { @Override public List<String> getAllTasks(String key) { try { List<String> list = getZkClient().getChildren().forPath(getTasksPath(key)); List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key)); return list; } catch (Exception e) { logger.error("get all tasks from tasks queue exception",e); } return new ArrayList<String>(); return new ArrayList<>(); } /** Loading @@ -99,22 +67,8 @@ public class TaskQueueZkImpl implements ITaskQueue { public boolean checkTaskExists(String key, String task) { String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task; try { Stat stat = zkClient.checkExists().forPath(taskPath); return zookeeperOperator.isExisted(taskPath); if(null == stat){ logger.info("check task:{} not exist in task queue",task); return false; }else{ logger.info("check task {} exists in task queue ",task); return true; } } catch (Exception e) { logger.info(String.format("task {} check exists in task queue exception ", task), e); } return false; } Loading @@ -128,9 +82,7 @@ public class TaskQueueZkImpl implements ITaskQueue { public boolean add(String key, String value){ try { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); zookeeperOperator.persist(taskIdPath, value); return true; } catch (Exception e) { logger.error("add task to tasks queue exception",e); Loading @@ -153,8 +105,7 @@ public class TaskQueueZkImpl implements ITaskQueue { @Override public List<String> poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); List<String> list = zk.getChildren().forPath(getTasksPath(key)); List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key)); if(list != null && list.size() > 0){ Loading Loading @@ -277,15 +228,12 @@ public class TaskQueueZkImpl implements ITaskQueue { @Override public void removeNode(String key, String nodeValue){ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; String taskIdPath = tasksQueuePath + nodeValue; logger.info("consume task {}", taskIdPath); logger.info("removeNode task {}", taskIdPath); try{ Stat stat = zk.checkExists().forPath(taskIdPath); if(stat != null){ zk.delete().forPath(taskIdPath); } zookeeperOperator.remove(taskIdPath); }catch(Exception e){ logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e); } Loading @@ -307,12 +255,9 @@ public class TaskQueueZkImpl implements ITaskQueue { if(value != null && value.trim().length() > 0){ String path = getTasksPath(key) + Constants.SINGLE_SLASH; CuratorFramework zk = getZkClient(); Stat stat = zk.checkExists().forPath(path + value); if(null == stat){ String result = zk.create().withMode(CreateMode.PERSISTENT).forPath(path + value,Bytes.toBytes(value)); logger.info("add task:{} to tasks set result:{} ",value,result); if(!zookeeperOperator.isExisted(path + value)){ zookeeperOperator.persist(path + value,value); logger.info("add task:{} to tasks set ",value); } else{ logger.info("task {} exists in tasks set ",value); } Loading @@ -336,15 +281,7 @@ public class TaskQueueZkImpl implements ITaskQueue { public void srem(String key, String value) { try{ String path = getTasksPath(key) + Constants.SINGLE_SLASH; CuratorFramework zk = getZkClient(); Stat stat = zk.checkExists().forPath(path + value); if(null != stat){ zk.delete().forPath(path + value); logger.info("delete task:{} from tasks set ",value); }else{ logger.info("delete task:{} from tasks set fail, there is no this task",value); } zookeeperOperator.remove(path + value); }catch(Exception e){ logger.error(String.format("delete task:" + value + " exception"),e); Loading @@ -363,7 +300,7 @@ public class TaskQueueZkImpl implements ITaskQueue { Set<String> tasksSet = new HashSet<>(); try { List<String> list = getZkClient().getChildren().forPath(getTasksPath(key)); List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key)); for (String task : list) { tasksSet.add(task); Loading @@ -377,56 +314,6 @@ public class TaskQueueZkImpl implements ITaskQueue { return tasksSet; } /** * Init the task queue of zookeeper node */ private void init(){ initZkClient(); try { String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){ if(zkClient.checkExists().forPath(taskQueuePath) == null){ // create a persistent parent node zkClient.create().creatingParentContainersIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(taskQueuePath); logger.info("create tasks queue parent node success : {} ",taskQueuePath); } } } catch (Exception e) { logger.error("create zk node failure",e); } } private void initZkClient() { Configuration conf = null; try { conf = new PropertiesConfiguration(Constants.ZOOKEEPER_PROPERTIES_PATH); } catch (ConfigurationException ex) { logger.error("load zookeeper properties file failed, system exit"); System.exit(-1); } zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(conf.getString("zookeeper.quorum"))) .retryPolicy(new ExponentialBackoffRetry(conf.getInt("zookeeper.retry.base.sleep"), conf.getInt("zookeeper.retry.maxtime"), conf.getInt("zookeeper.retry.max.sleep"))) .sessionTimeoutMs(conf.getInt("zookeeper.session.timeout")) .connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout")) .build(); zkClient.start(); try { zkClient.blockUntilConnected(); } catch (final Exception ex) { throw new RuntimeException(ex); } } /** * Clear the task queue of zookeeper node */ Loading @@ -437,16 +324,12 @@ public class TaskQueueZkImpl implements ITaskQueue { String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){ if(zkClient.checkExists().forPath(taskQueuePath) != null){ List<String> list = zkClient.getChildren().forPath(taskQueuePath); if(zookeeperOperator.isExisted(taskQueuePath)){ List<String> list = zookeeperOperator.getChildrenKeys(taskQueuePath); for (String task : list) { zkClient.delete().forPath(taskQueuePath + Constants.SINGLE_SLASH + task); zookeeperOperator.remove(taskQueuePath + Constants.SINGLE_SLASH + task); logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task); } } } Loading
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplicationContext.java→dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SpringApplicationContext.java +1 −1 Original line number Diff line number Diff line Loading @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.utils; package org.apache.dolphinscheduler.common.utils; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java +0 −3 Original line number Diff line number Diff line Loading @@ -127,9 +127,6 @@ public class ZookeeperOperator implements InitializingBean { List<String> values; try { values = zkClient.getChildren().forPath(key); if (CollectionUtils.isEmpty(values)) { logger.warn("getChildrenKeys key : {} is empty", key); } return values; } catch (InterruptedException ex) { logger.error("getChildrenKeys key : {} InterruptedException", key); Loading
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +2 −1 Original line number Diff line number Diff line Loading @@ -105,7 +105,8 @@ public class ProcessDao { /** * task queue impl */ protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance(); @Autowired private ITaskQueue taskQueue; /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @param logger logger Loading