Commit 8b2b5ba6 authored by Tboy's avatar Tboy Committed by qiaozhanwei
Browse files

refactor TaskQueueZkImpl (#1591)

* fix #1515

* sleep when resource in not satisfy. fix #1522

* add sleep 1s for no command

* fix MasterBaseTaskExecThread submit method bug

* updates

* add log

* delete lombok

* remove duplicate code

* refactor TaskQueueZkImpl

* ignore First , we have to rewrite

* updates
parent aadf0aae
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.queue;

import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,7 +44,7 @@ public class TaskQueueFactory {
    String queueImplValue = CommonUtils.getQueueImplValue();
    if (StringUtils.isNotBlank(queueImplValue)) {
        logger.info("task queue impl use zookeeper ");
        return TaskQueueZkImpl.getInstance();
        return SpringApplicationContext.getBean(TaskQueueZkImpl.class);
    }else{
      logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
      System.exit(-1);
+23 −140
Original line number Diff line number Diff line
@@ -17,22 +17,14 @@
package org.apache.dolphinscheduler.common.queue;


import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.Bytes;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.DefaultEnsembleProvider;
import org.apache.dolphinscheduler.common.zk.ZookeeperConfig;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.*;

@@ -40,35 +32,13 @@ import java.util.*;
 * A singleton of a task queue implemented with zookeeper
 * tasks queue implemention
 */
@Service
public class TaskQueueZkImpl implements ITaskQueue {

    private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);

    private static volatile TaskQueueZkImpl instance;

    private CuratorFramework zkClient;

    private ZookeeperConfig zookeeperConfig;

    private CuratorFramework getZkClient() {
        return zkClient;
    }

    private TaskQueueZkImpl(){
        init();
    }

    public static TaskQueueZkImpl getInstance(){
        if (null == instance) {
            synchronized (TaskQueueZkImpl.class) {
                if(null == instance) {
                    instance = new TaskQueueZkImpl();
                }
            }
        }
        return instance;
    }

    @Autowired
    private ZookeeperOperator zookeeperOperator;

    /**
     * get all tasks from tasks queue
@@ -78,14 +48,12 @@ public class TaskQueueZkImpl implements ITaskQueue {
    @Override
    public List<String> getAllTasks(String key) {
        try {
            List<String> list = getZkClient().getChildren().forPath(getTasksPath(key));

            List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
            return list;
        } catch (Exception e) {
            logger.error("get all tasks from tasks queue exception",e);
        }

        return new ArrayList<String>();
        return new ArrayList<>();
    }

    /**
@@ -99,22 +67,8 @@ public class TaskQueueZkImpl implements ITaskQueue {
    public boolean checkTaskExists(String key, String task) {
        String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task;

        try {
            Stat stat = zkClient.checkExists().forPath(taskPath);
        return zookeeperOperator.isExisted(taskPath);

            if(null == stat){
                logger.info("check task:{} not exist in task queue",task);
                return false;
            }else{
                logger.info("check task {} exists in task queue ",task);
                return true;
            }

        } catch (Exception e) {
            logger.info(String.format("task {} check exists in task queue exception ", task), e);
        }

        return false;
    }


@@ -128,9 +82,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
    public boolean add(String key, String value){
        try {
            String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
            String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));

            logger.info("add task : {} to tasks queue , result success",result);
            zookeeperOperator.persist(taskIdPath, value);
            return true;
        } catch (Exception e) {
            logger.error("add task to tasks queue exception",e);
@@ -153,8 +105,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
    @Override
    public List<String> poll(String key, int tasksNum) {
        try{
            CuratorFramework zk = getZkClient();
            List<String> list = zk.getChildren().forPath(getTasksPath(key));
            List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));

            if(list != null && list.size() > 0){

@@ -277,15 +228,12 @@ public class TaskQueueZkImpl implements ITaskQueue {
    @Override
    public void removeNode(String key, String nodeValue){

        CuratorFramework zk = getZkClient();
        String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
        String taskIdPath = tasksQueuePath + nodeValue;
        logger.info("consume task {}", taskIdPath);
        logger.info("removeNode task {}", taskIdPath);
        try{
            Stat stat = zk.checkExists().forPath(taskIdPath);
            if(stat != null){
                zk.delete().forPath(taskIdPath);
            }
            zookeeperOperator.remove(taskIdPath);

        }catch(Exception e){
            logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e);
        }
@@ -307,12 +255,9 @@ public class TaskQueueZkImpl implements ITaskQueue {

            if(value != null && value.trim().length() > 0){
                String path = getTasksPath(key) + Constants.SINGLE_SLASH;
                CuratorFramework zk = getZkClient();
                Stat stat = zk.checkExists().forPath(path + value);

                if(null == stat){
                    String result = zk.create().withMode(CreateMode.PERSISTENT).forPath(path + value,Bytes.toBytes(value));
                    logger.info("add task:{} to tasks set result:{} ",value,result);
                if(!zookeeperOperator.isExisted(path + value)){
                    zookeeperOperator.persist(path + value,value);
                    logger.info("add task:{} to tasks set ",value);
                } else{
                    logger.info("task {} exists in tasks set ",value);
                }
@@ -336,15 +281,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
    public void srem(String key, String value) {
        try{
            String path = getTasksPath(key) + Constants.SINGLE_SLASH;
            CuratorFramework zk = getZkClient();
            Stat stat = zk.checkExists().forPath(path + value);

            if(null != stat){
                zk.delete().forPath(path + value);
                logger.info("delete task:{} from tasks set ",value);
            }else{
                logger.info("delete task:{} from tasks set fail, there is no this task",value);
            }
            zookeeperOperator.remove(path + value);

        }catch(Exception e){
            logger.error(String.format("delete task:" + value + " exception"),e);
@@ -363,7 +300,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
        Set<String> tasksSet = new HashSet<>();

        try {
            List<String> list = getZkClient().getChildren().forPath(getTasksPath(key));
            List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));

            for (String task : list) {
                tasksSet.add(task);
@@ -377,56 +314,6 @@ public class TaskQueueZkImpl implements ITaskQueue {
        return tasksSet;
    }



    /**
     * Init the task queue of zookeeper node
     */
    private void init(){
        initZkClient();
        try {
            String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
            String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);

            for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){
                if(zkClient.checkExists().forPath(taskQueuePath) == null){
                    // create a persistent parent node
                    zkClient.create().creatingParentContainersIfNeeded()
                            .withMode(CreateMode.PERSISTENT).forPath(taskQueuePath);
                    logger.info("create tasks queue parent node success : {} ",taskQueuePath);
                }
            }

        } catch (Exception e) {
            logger.error("create zk node failure",e);
        }
    }

    private void initZkClient() {

        Configuration conf = null;
        try {
            conf = new PropertiesConfiguration(Constants.ZOOKEEPER_PROPERTIES_PATH);
        } catch (ConfigurationException ex) {
            logger.error("load zookeeper properties file failed, system exit");
            System.exit(-1);
        }

        zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(conf.getString("zookeeper.quorum")))
                .retryPolicy(new ExponentialBackoffRetry(conf.getInt("zookeeper.retry.base.sleep"), conf.getInt("zookeeper.retry.maxtime"), conf.getInt("zookeeper.retry.max.sleep")))
                .sessionTimeoutMs(conf.getInt("zookeeper.session.timeout"))
                .connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout"))
                .build();

        zkClient.start();
        try {
            zkClient.blockUntilConnected();
        } catch (final Exception ex) {
            throw new RuntimeException(ex);
        }
    }


    /**
     * Clear the task queue of zookeeper node
     */
@@ -437,16 +324,12 @@ public class TaskQueueZkImpl implements ITaskQueue {
            String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);

            for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){
                if(zkClient.checkExists().forPath(taskQueuePath) != null){

                    List<String> list = zkClient.getChildren().forPath(taskQueuePath);

                if(zookeeperOperator.isExisted(taskQueuePath)){
                    List<String> list = zookeeperOperator.getChildrenKeys(taskQueuePath);
                    for (String task : list) {
                        zkClient.delete().forPath(taskQueuePath + Constants.SINGLE_SLASH + task);
                        zookeeperOperator.remove(taskQueuePath + Constants.SINGLE_SLASH + task);
                        logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task);

                    }

                }
            }

+1 −1
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.common.utils;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
+0 −3
Original line number Diff line number Diff line
@@ -127,9 +127,6 @@ public class ZookeeperOperator implements InitializingBean {
        List<String> values;
        try {
            values = zkClient.getChildren().forPath(key);
            if (CollectionUtils.isEmpty(values)) {
                logger.warn("getChildrenKeys key : {} is empty", key);
            }
            return values;
        } catch (InterruptedException ex) {
            logger.error("getChildrenKeys key : {} InterruptedException", key);
+1 −3
Original line number Diff line number Diff line
@@ -32,11 +32,9 @@ import static org.junit.Assert.*;
/**
 * task queue test
 */
@Ignore
public class TaskQueueZKImplTest extends BaseTaskQueueTest  {




    @Before
    public void before(){

Loading