Unverified Commit 672a4825 authored by Tboy's avatar Tboy Committed by GitHub
Browse files

Merge pull request #1700 from fordeal-smalldata/fix-task-zknode-miss

Fixed tasks_queue and tasks_kill did not exist in zookeeper #1696
parents 88423748 cae656c1
Loading
Loading
Loading
Loading
+27 −8
Original line number Diff line number Diff line
@@ -37,8 +37,27 @@ public class TaskQueueZkImpl implements ITaskQueue {

    private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);

    private final ZookeeperOperator zookeeperOperator;

    @Autowired
    private ZookeeperOperator zookeeperOperator;
    public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) {
        this.zookeeperOperator = zookeeperOperator;

        try {
            String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
            String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);

            for (String key : new String[]{tasksQueuePath,tasksKillPath}){
                if (!zookeeperOperator.isExisted(key)){
                    zookeeperOperator.persist(key, "");
                    logger.info("create tasks queue parent node success : {}", key);
                }
            }
        } catch (Exception e) {
            logger.error("create tasks queue parent node failure", e);
        }
    }


    /**
     * get all tasks from tasks queue
@@ -321,14 +340,14 @@ public class TaskQueueZkImpl implements ITaskQueue {
    public void delete(){
        try {
            String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
            String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
            String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);

            for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){
                if(zookeeperOperator.isExisted(taskQueuePath)){
                    List<String> list = zookeeperOperator.getChildrenKeys(taskQueuePath);
            for (String key : new String[]{tasksQueuePath,tasksKillPath}){
                if (zookeeperOperator.isExisted(key)){
                    List<String> list = zookeeperOperator.getChildrenKeys(key);
                    for (String task : list) {
                        zookeeperOperator.remove(taskQueuePath + Constants.SINGLE_SLASH + task);
                        logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task);
                        zookeeperOperator.remove(key + Constants.SINGLE_SLASH + task);
                        logger.info("delete task from tasks queue : {}/{} ", key, task);
                    }
                }
            }