Commit b0533224 authored by Technoboy-'s avatar Technoboy-
Browse files

refactor AbstractZKClient

parent 98a7daa6
Loading
Loading
Loading
Loading
+36 −60
Original line number Diff line number Diff line
@@ -16,6 +16,22 @@
 */
package org.apache.dolphinscheduler.common.zk;

import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -23,26 +39,9 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static org.apache.dolphinscheduler.common.Constants.*;


/**
 * abstract zookeeper client
@@ -70,8 +69,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
				return;
			}

			byte[] bytes = zkClient.getData().forPath(znode);
			String resInfoStr = new String(bytes);
			String resInfoStr = super.get(znode);
			String[] splits = resInfoStr.split(Constants.COMMA);
			if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
				return;
@@ -107,8 +105,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
		String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
		String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;

		if(zkClient.checkExists().forPath(zNode) == null ||
				zkClient.checkExists().forPath(deadServerPath) != null ){
		if(!isExisted(zNode) || isExisted(deadServerPath)){
			return true;
		}

@@ -118,17 +115,15 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{


	public void removeDeadServerByHost(String host, String serverType) throws Exception {
        List<String> deadServers = zkClient.getChildren().forPath(getDeadZNodeParentPath());
        List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
        for(String serverPath : deadServers){
            if(serverPath.startsWith(serverType+UNDERLINE+host)){
							String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
				if(zkClient.checkExists().forPath(server) != null){
					zkClient.delete().forPath(server);
              super.remove(server);
							logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
            }
        }
	}
	}


	/**
@@ -143,8 +138,8 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
		// create temporary sequence nodes for master znode
		String parentPath = getZNodeParentPath(zkNodeType);
		String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
		String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
				serverPathPrefix + UNDERLINE, heartbeatZKInfo.getBytes());
    String registerPath = serverPathPrefix + UNDERLINE;
    super.persistEphemeral(registerPath, heartbeatZKInfo);
		logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
		return registerPath;
	}
@@ -196,10 +191,10 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{

		}else if(opType.equals(ADD_ZK_OP)){
			String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
			if(zkClient.checkExists().forPath(deadServerPath) == null){
			if(!super.isExisted(deadServerPath)){
				//add dead server info to zk dead server path : /dead-servers/

				zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes());
				super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo));

				logger.info("{} server dead , and {} added to zk dead server path success" ,
						zkNodeType.toString(), zNode);
@@ -226,20 +221,14 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
		List<String> childrenList = new ArrayList<>();
		try {
			// read master node parent path from conf
			if(zkClient.checkExists().forPath(getZNodeParentPath(ZKNodeType.MASTER)) != null){
				childrenList = zkClient.getChildren().forPath(getZNodeParentPath(ZKNodeType.MASTER));
			if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
				childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
			}
		} catch (Exception e) {
			if(e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
				logger.error("zookeeper service not started",e);
			}else{
				logger.error(e.getMessage(),e);
			logger.error("getActiveMasterNum error",e);
		}

		}finally {
		return childrenList.size();
	}
	}

	/**
	 *
@@ -280,10 +269,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
		Map<String, String> masterMap = new HashMap<>();
		try {
			String path =  getZNodeParentPath(zkNodeType);
			List<String> serverList  = getZkClient().getChildren().forPath(path);
			List<String> serverList  = super.getChildrenKeys(path);
			for(String server : serverList){
				byte[] bytes  = getZkClient().getData().forPath(path + "/" + server);
				masterMap.putIfAbsent(server, new String(bytes));
				masterMap.putIfAbsent(server, super.get(path + "/" + server));
			}
		} catch (Exception e) {
			logger.error("get server list failed : " + e.getMessage(), e);
@@ -430,27 +418,15 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
	 */
	protected void initSystemZNode(){
		try {
			createNodePath(getMasterZNodeParentPath());
			createNodePath(getWorkerZNodeParentPath());
			createNodePath(getDeadZNodeParentPath());
			persist(getMasterZNodeParentPath(), "");
			persist(getWorkerZNodeParentPath(), "");
			persist(getDeadZNodeParentPath(), "");

		} catch (Exception e) {
			logger.error("init system znode failed : " + e.getMessage(),e);
		}
	}

	/**
	 * create zookeeper node path if not exists
	 * @param zNodeParentPath zookeeper parent path
	 * @throws Exception errors
	 */
	private void createNodePath(String zNodeParentPath) throws Exception {
	    if(null == zkClient.checkExists().forPath(zNodeParentPath)){
	        zkClient.create().creatingParentContainersIfNeeded()
					.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
		}
	}

	/**
	 * server self dead, stop all threads
	 * @param serverHost server host