Unverified Commit 372e58d7 authored by bao liang's avatar bao liang Committed by GitHub
Browse files

#1813 remove "_001" after the master/server register path in zookeepe (#1824)

* donot submit task to queue if sub process

* [feature] #1813 remove "_001" after the master/server register path in zookeeper (#1820)

* change master/worker register path.

* remove "_" from register path.
parent d7b55abd
Loading
Loading
Loading
Loading
+21 −34
Original line number Diff line number Diff line
@@ -125,20 +125,18 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
        }
	}


	/**
	 * create zookeeper path according the zk node type.
	 * @param zkNodeType zookeeper node type
	 * @return  register zookeeper path
	 * @throws Exception
	 */
	private String createZNodePath(ZKNodeType zkNodeType) throws Exception {
	private String createZNodePath(ZKNodeType zkNodeType, String host) throws Exception {
		// specify the format of stored data in ZK nodes
		String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
		// create temporary sequence nodes for master znode
		String parentPath = getZNodeParentPath(zkNodeType);
		String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
    String registerPath = serverPathPrefix + UNDERLINE;
		String registerPath= getZNodeParentPath(zkNodeType) + SINGLE_SLASH + host;

    	super.persistEphemeral(registerPath, heartbeatZKInfo);
		logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
		return registerPath;
@@ -158,7 +156,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
					zkNodeType.toString(), host);
			return registerPath;
		}
		registerPath = createZNodePath(zkNodeType);
		registerPath = createZNodePath(zkNodeType, host);

    // handle dead server
		handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
@@ -176,25 +174,19 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
	 * @throws Exception errors
	 */
	public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
		//ip_sequenceno
		String[] zNodesPath = zNode.split("\\/");
		String ipSeqNo = zNodesPath[zNodesPath.length - 1];

		String host = getHostByEventDataPath(zNode);
		String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;


		//check server restart, if restart , dead server path in zk should be delete
		if(opType.equals(DELETE_ZK_OP)){
			String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE);
			String ip = ipAndSeqNo[0];
			removeDeadServerByHost(ip, type);
			removeDeadServerByHost(host, type);

		}else if(opType.equals(ADD_ZK_OP)){
			String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
			String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
			if(!super.isExisted(deadServerPath)){
				//add dead server info to zk dead server path : /dead-servers/

				super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo));
				super.persist(deadServerPath,(type + UNDERLINE + host));

				logger.info("{} server dead , and {} added to zk dead server path success" ,
						zkNodeType.toString(), zNode);
@@ -295,21 +287,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
		}
		Map<String, String> serverMaps = getServerMaps(zkNodeType);
		for(String hostKey : serverMaps.keySet()){
			if(hostKey.startsWith(host + UNDERLINE)){
			if(hostKey.startsWith(host)){
				return true;
			}
		}
		return false;
	}

	/**
	 *  get zkclient
	 * @return zookeeper client
	 */
	public  CuratorFramework getZkClient() {
		return zkClient;
	}

	/**
	 *
	 * @return get worker node parent path
@@ -446,19 +430,22 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
	}

	/**
	 *  get host ip, string format: masterParentPath/ip_000001/value
	 *  get host ip, string format: masterParentPath/ip
	 * @param path path
	 * @return host ip, string format: masterParentPath/ip_000001/value
	 * @return host ip, string format: masterParentPath/ip
	 */
	protected String getHostByEventDataPath(String path) {
		int  startIndex = path.lastIndexOf("/")+1;
		int endIndex = 	path.lastIndexOf("_");

		if(startIndex >= endIndex){
			logger.error("parse ip error");
		if(StringUtils.isEmpty(path)){
		    logger.error("empty path!");
			return "";
		}
		String[] pathArray = path.split(SINGLE_SLASH);
		if(pathArray.length < 1){
			logger.error("parse ip error: {}", path);
			return "";
		}
		return path.substring(startIndex, endIndex);
		return pathArray[pathArray.length - 1];

	}
	/**
	 * acquire zk lock
+1 −0
Original line number Diff line number Diff line
@@ -77,6 +77,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
        return treeCache;
    }

    @Override
    public void close() {
        treeCache.close();
        try {