Unverified Commit 753ed58f authored by tswstarplanet's avatar tswstarplanet Committed by GitHub
Browse files

[Bug-3140]fix the deadlock between start and stop of ZKServer (#3141)



* [Bug-3140]fix the deadlock between start and stop of ZKServer

* use Log framework to print information

* fix code smells; add path prefix of embedded zk server

* Update dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java

Co-authored-by: default avatarYichao Yang <1048262223@qq.com>

* optimize the code

Co-authored-by: default avatarYichao Yang <1048262223@qq.com>
parent 6f9970b1
Loading
Loading
Loading
Loading
+71 −48
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@
 */
package org.apache.dolphinscheduler.service.zk;

import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
@@ -34,44 +35,62 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ZKServer {
    private static final Logger logger = LoggerFactory.getLogger(ZKServer.class);

    private static volatile PublicZooKeeperServerMain zkServer = null;

    public static final int DEFAULT_ZK_TEST_PORT = 2181;

    private static String dataDir = null;
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    private PublicZooKeeperServerMain zooKeeperServerMain = null;

    private int port;

    private static final AtomicBoolean isStarted = new AtomicBoolean(false);
    private String dataDir = null;

    private String prefix;

    public static void main(String[] args) {
        if(!isStarted()){
            ZKServer.start();
        ZKServer zkServer;
        if (args.length == 0) {
            zkServer = new ZKServer();
        } else if (args.length == 1){
            zkServer = new ZKServer(Integer.valueOf(args[0]), "");
        } else {
            zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]);
        }
        zkServer.registerHook();
        zkServer.start();
    }

    public ZKServer() {
        this(DEFAULT_ZK_TEST_PORT, "");
    }

    public ZKServer(int port, String prefix) {
        this.port = port;
        if (prefix != null && prefix.contains("/")) {
            throw new IllegalArgumentException("The prefix of path may not have '/'");
        }
        this.prefix = (prefix == null ? null : prefix.trim());
    }

    private void registerHook() {
        /**
         *  register hooks, which are called before the process exits
         */
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    stop();
                }
            }));
        }else{
            logger.info("zk server aleady started");
        }
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    /**
     * start service
     */
    public static void start() {
    public void start() {
        try {
            startLocalZkServer(DEFAULT_ZK_TEST_PORT);
            startLocalZkServer(port);
        } catch (Exception e) {
            logger.error("Failed to start ZK: " + e);
            logger.error("Failed to start ZK ", e);
        }
    }

    public static boolean isStarted(){
    public boolean isStarted(){
        return isStarted.get();
    }

@@ -94,8 +113,12 @@ public class ZKServer {
     *
     * @param port The port to listen on
     */
    public static void startLocalZkServer(final int port) {
        String zkDataDir = System.getProperty("user.dir") +"/zookeeper_data";
    public void startLocalZkServer(final int port) {
        String zkDataDir = System.getProperty("user.dir") + (StringUtils.isEmpty(prefix) ? StringUtils.EMPTY : ("/" + prefix)) + "/zookeeper_data";
        File file = new File(zkDataDir);
        if (file.exists()) {
            logger.warn("The path of zk server exists");
        }
        logger.info("zk server starting, data dir path:{}" , zkDataDir);
        startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60");
    }
@@ -108,11 +131,9 @@ public class ZKServer {
     * @param tickTime    zk tick time
     * @param maxClientCnxns    zk max client connections
     */
    private static synchronized void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) {
        if (zkServer != null) {
            throw new RuntimeException("Zookeeper server is already started!");
        }
        zkServer = new PublicZooKeeperServerMain();
    private void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) {
        if (isStarted.compareAndSet(false, true)) {
            zooKeeperServerMain = new PublicZooKeeperServerMain();
            logger.info("Zookeeper data path : {} ", dataDirPath);
            dataDir = dataDirPath;
            final String[] args = new String[]{Integer.toString(port), dataDirPath, Integer.toString(tickTime), maxClientCnxns};
@@ -121,18 +142,18 @@ public class ZKServer {
                logger.info("Zookeeper server started ");
                isStarted.compareAndSet(false, true);

            zkServer.initializeAndRun(args);
        } catch (QuorumPeerConfig.ConfigException e) {
            logger.warn("Caught exception while starting ZK", e);
        } catch (IOException e) {
                zooKeeperServerMain.initializeAndRun(args);
            } catch (QuorumPeerConfig.ConfigException | IOException e) {
                logger.warn("Caught exception while starting ZK", e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Stops a local Zk instance, deleting its data directory
     */
    public static void stop() {
    public void stop() {
        try {
            stopLocalZkServer(true);
            logger.info("zk server stopped");
@@ -147,15 +168,17 @@ public class ZKServer {
     *
     * @param deleteDataDir Whether or not to delete the data directory
     */
    private static synchronized void stopLocalZkServer(final boolean deleteDataDir) {
        if (zkServer != null) {
    private void stopLocalZkServer(final boolean deleteDataDir) {
        if (isStarted.compareAndSet(true, false)) {
            try {
                zkServer.shutdown();
                zkServer = null;
                if (zooKeeperServerMain == null) {
                    return;
                }
                zooKeeperServerMain.shutdown();
                zooKeeperServerMain = null;
                if (deleteDataDir) {
                    org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
                }
                isStarted.compareAndSet(true, false);
            } catch (Exception e) {
                logger.warn("Caught exception while stopping ZK server", e);
                throw new RuntimeException(e);
+32 −6
Original line number Diff line number Diff line
@@ -18,18 +18,44 @@ package org.apache.dolphinscheduler.service.zk;

import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// ZKServer is a process, can't unit test
public class ZKServerTest {
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ZKServerTest {
    private static final Logger log = LoggerFactory.getLogger(ZKServerTest.class);

    @Test
    public void isStarted() {
        Assert.assertEquals(false, ZKServer.isStarted());
    public void testRunWithDefaultPort() {
        AtomicReference<ZKServer> zkServer = new AtomicReference<>();
        new Thread(() -> {
            zkServer.set(new ZKServer());
            zkServer.get().start();
        }).start();
        try {
            TimeUnit.SECONDS.sleep(5);
            Assert.assertEquals(true, zkServer.get().isStarted());
        } catch (InterruptedException e) {
            log.error("Thread interrupted", e);
        }
        zkServer.get().stop();
    }

    @Test
    public void stop() {
        ZKServer.stop();
    public void testRunWithCustomPort() {
        AtomicReference<ZKServer> zkServer = new AtomicReference<>();
        new Thread(() -> {
            zkServer.set(new ZKServer(2183, null));
            zkServer.get().start();
        }).start();
        try {
            TimeUnit.SECONDS.sleep(5);
            Assert.assertEquals(true, zkServer.get().isStarted());
        } catch (InterruptedException e) {
            log.error("Thread interrupted", e);
        }
        zkServer.get().stop();
    }
}
 No newline at end of file