Commit 63f396b6 authored by DK.Pino's avatar DK.Pino Committed by qiaozhanwei
Browse files

refactor zk tree cache (#1577)

* refactor zk tree cache

* refactor zk tree cache
parent 597d6859
Loading
Loading
Loading
Loading
+0 −35
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.common.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

public abstract class AbstractListener implements TreeCacheListener {

    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        String path = null == event.getData() ? "" : event.getData().getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(client, event, path);
    }

    protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path);
}
+27 −21
Original line number Diff line number Diff line
@@ -16,8 +16,10 @@
 */
package org.apache.dolphinscheduler.common.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,30 +36,37 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {

    private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class);

    //kay is zk path, value is TreeCache
    private ConcurrentHashMap<String, TreeCache> allCaches = new ConcurrentHashMap<>();

    TreeCache treeCache;
    /**
     * @param cachePath zk path
     * @param listener  operator
     * register a unified listener of /${dsRoot},
     */
    public void registerListener(final String cachePath, final TreeCacheListener listener) {
        TreeCache newCache = new TreeCache(zkClient, cachePath);
        logger.info("add listener to zk path: {}", cachePath);
    @Override
    protected void registerListener() {
        treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot());
        logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
        try {
            newCache.start();
            treeCache.start();
        } catch (Exception e) {
            logger.error("add listener to zk path: {} failed", cachePath);
            logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
            throw new RuntimeException(e);
        }

        newCache.getListenable().addListener(listener);
        treeCache.getListenable().addListener((client, event) -> {
            String path = null == event.getData() ? "" : event.getData().getPath();
            if (path.isEmpty()) {
                return;
            }
            dataChanged(client, event, path);
        });

        allCaches.put(cachePath, newCache);
    }

    //for sub class
    protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){}

    public String getFromCache(final String cachePath, final String key) {
        ChildData resultInCache = allCaches.get(checkNotNull(cachePath)).getCurrentData(key);
        ChildData resultInCache = treeCache.getCurrentData(key);
        if (null != resultInCache) {
            return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8);
        }
@@ -65,18 +74,15 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
    }

    public TreeCache getTreeCache(final String cachePath) {
        return allCaches.get(checkNotNull(cachePath));
        return treeCache;
    }

    public void close() {

        allCaches.forEach((path, cache) -> {
            cache.close();
        treeCache.close();
        try {
            Thread.sleep(500);
        } catch (InterruptedException ignore) {
        }
        });
        super.close();
    }
}
+5 −3
Original line number Diff line number Diff line
@@ -57,11 +57,13 @@ public class ZookeeperOperator implements InitializingBean {
    public void afterPropertiesSet() throws Exception {
        this.zkClient = buildClient();
        initStateLister();
        //init();
        registerListener();
    }

    //for subclass
    //protected void init(){}
    /**
     * this method is for sub class,
     */
    protected void registerListener(){}

    public void initStateLister() {
        checkNotNull(zkClient);
+46 −50
Original line number Diff line number Diff line
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.zk.AbstractListener;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
@@ -31,9 +30,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
@@ -101,12 +97,6 @@ public class ZKMasterClient extends AbstractZKClient {
			// init system znode
			this.initSystemZNode();

			// monitor master
			this.listenerMaster();

			// monitor worker
			this.listenerWorker();

			// register master
			this.registerMaster();

@@ -158,30 +148,21 @@ public class ZKMasterClient extends AbstractZKClient {
		}
	}


	/**
	 *  monitor master
	 * handle path events that this class cares about
	 * @param client   zkClient
	 * @param event	   path event
	 * @param path     zk path
	 */
	public void listenerMaster(){
		registerListener(getZNodeParentPath(ZKNodeType.MASTER), new AbstractListener() {
	@Override
	protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
				switch (event.getType()) {
					case NODE_ADDED:
						logger.info("master node added : {}", path);
						break;
					case NODE_REMOVED:
						String serverHost = getHostByEventDataPath(path);
						if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
							return;
						}
						removeZKNodePath(path, ZKNodeType.MASTER, true);
						break;
					default:
						break;
				}
		if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){  //monitor master
			handleMasterEvent(event,path);

		}else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){  //monitor worker
			handleWorkerEvent(event,path);
		}
		});
		//other path event, ignore
	}

	/**
@@ -272,13 +253,30 @@ public class ZKMasterClient extends AbstractZKClient {
		}
	}

	/**
	 * monitor master
	 */
	public void handleMasterEvent(TreeCacheEvent event, String path){
		switch (event.getType()) {
			case NODE_ADDED:
				logger.info("master node added : {}", path);
				break;
			case NODE_REMOVED:
				String serverHost = getHostByEventDataPath(path);
				if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
					return;
				}
				removeZKNodePath(path, ZKNodeType.MASTER, true);
				break;
			default:
				break;
		}
	}

	/**
	 * monitor worker
	 */
	public void listenerWorker(){
		registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() {
			@Override
			protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
	public void handleWorkerEvent(TreeCacheEvent event, String path){
		switch (event.getType()) {
			case NODE_ADDED:
				logger.info("worker node added : {}", path);
@@ -291,8 +289,6 @@ public class ZKMasterClient extends AbstractZKClient {
				break;
		}
	}
		});
	}


	/**
+29 −32
Original line number Diff line number Diff line
@@ -19,20 +19,13 @@ package org.apache.dolphinscheduler.server.zk;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.zk.AbstractListener;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadFactory;


/**
 *  zookeeper worker client
@@ -61,9 +54,6 @@ public class ZKWorkerClient extends AbstractZKClient {
		// init system znode
		this.initSystemZNode();

		// monitor worker
		this.listenerWorker();

		// register worker
		this.registWorker();
	}
@@ -85,12 +75,22 @@ public class ZKWorkerClient extends AbstractZKClient {
	}

	/**
	 *  monitor worker
	 * handle path events that this class cares about
	 * @param client   zkClient
	 * @param event	   path event
	 * @param path     zk path
	 */
	private void listenerWorker(){
		registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() {
	@Override
	protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
		if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
			handleWorkerEvent(event,path);
		}
	}

	/**
	 * monitor worker
	 */
	public void handleWorkerEvent(TreeCacheEvent event, String path){
		switch (event.getType()) {
			case NODE_ADDED:
				logger.info("worker node added : {}", path);
@@ -106,9 +106,6 @@ public class ZKWorkerClient extends AbstractZKClient {
				break;
		}
	}
		});

	}

	/**
	 * get worker znode