Loading dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +7 −19 Original line number Diff line number Diff line Loading @@ -17,26 +17,16 @@ package org.apache.dolphinscheduler.common.queue; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.Bytes; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.curator.framework.CuratorFramework; import org.apache.dolphinscheduler.common.zk.DefaultEnsembleProvider; import org.apache.dolphinscheduler.common.zk.ZookeeperConfig; import org.apache.zookeeper.CreateMode; Loading @@ -44,7 +34,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; import java.util.*; /** * A singleton of a task queue implemented with zookeeper Loading Loading @@ -421,12 +411,11 @@ public class TaskQueueZkImpl implements ITaskQueue { logger.error("load zookeeper properties file failed, system exit"); System.exit(-1); } zookeeperConfig = ZookeeperConfig.getFromConf(conf); zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(), "zookeeper quorum can't be null"))) .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())) .sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs()) .connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs()) zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(conf.getString("zookeeper.quorum"))) .retryPolicy(new ExponentialBackoffRetry(conf.getInt("zookeeper.retry.base.sleep"), conf.getInt("zookeeper.retry.maxtime"), conf.getInt("zookeeper.retry.max.sleep"))) .sessionTimeoutMs(conf.getInt("zookeeper.session.timeout")) .connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout")) .build(); zkClient.start(); Loading Loading @@ -472,8 +461,7 @@ public class TaskQueueZkImpl implements ITaskQueue { * @return */ public String getTasksPath(String key){ return zookeeperConfig.getDsRoot() + Constants.SINGLE_SLASH + key; return "/dolphinscheduler" + Constants.SINGLE_SLASH + key; } } dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java +62 −15 Original line number Diff line number Diff line Loading @@ -16,11 +16,6 @@ */ package org.apache.dolphinscheduler.common.zk; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.configuration.Configuration; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; Loading @@ -29,10 +24,6 @@ import org.springframework.stereotype.Component; * zookeeper conf */ @Component @Data @AllArgsConstructor @NoArgsConstructor @Builder @PropertySource("classpath:zookeeper.properties") public class ZookeeperConfig { Loading @@ -58,14 +49,70 @@ public class ZookeeperConfig { @Value("${zookeeper.connection.digest: }") private String digest; //ds scheduler install config @Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}") private String dsRoot; public static ZookeeperConfig getFromConf(Configuration conf){ return ZookeeperConfig.builder().serverList(conf.getString("zookeeper.quorum")).baseSleepTimeMs(conf.getInt("zookeeper.retry.base.sleep")) .maxSleepMs(conf.getInt("zookeeper.retry.max.sleep")).maxRetries(conf.getInt("zookeeper.retry.maxtime")) .sessionTimeoutMs(conf.getInt("zookeeper.session.timeout")).connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout")) .dsRoot(conf.getString("zookeeper.dolphinscheduler.root")).build(); public String getServerList() { return serverList; } public void setServerList(String serverList) { this.serverList = serverList; } public int getBaseSleepTimeMs() { return baseSleepTimeMs; } public void setBaseSleepTimeMs(int baseSleepTimeMs) { this.baseSleepTimeMs = baseSleepTimeMs; } public int getMaxSleepMs() { return maxSleepMs; } public void setMaxSleepMs(int maxSleepMs) { this.maxSleepMs = maxSleepMs; } public int getMaxRetries() { return maxRetries; } public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } public int getSessionTimeoutMs() { return sessionTimeoutMs; } public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public int getConnectionTimeoutMs() { return connectionTimeoutMs; } public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; } public String getDigest() { return digest; } public void setDigest(String digest) { this.digest = digest; } public String getDsRoot() { return dsRoot; } public void setDsRoot(String dsRoot) { this.dsRoot = dsRoot; } } No newline at end of file Loading
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +7 −19 Original line number Diff line number Diff line Loading @@ -17,26 +17,16 @@ package org.apache.dolphinscheduler.common.queue; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.Bytes; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.curator.framework.CuratorFramework; import org.apache.dolphinscheduler.common.zk.DefaultEnsembleProvider; import org.apache.dolphinscheduler.common.zk.ZookeeperConfig; import org.apache.zookeeper.CreateMode; Loading @@ -44,7 +34,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; import java.util.*; /** * A singleton of a task queue implemented with zookeeper Loading Loading @@ -421,12 +411,11 @@ public class TaskQueueZkImpl implements ITaskQueue { logger.error("load zookeeper properties file failed, system exit"); System.exit(-1); } zookeeperConfig = ZookeeperConfig.getFromConf(conf); zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(), "zookeeper quorum can't be null"))) .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())) .sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs()) .connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs()) zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(conf.getString("zookeeper.quorum"))) .retryPolicy(new ExponentialBackoffRetry(conf.getInt("zookeeper.retry.base.sleep"), conf.getInt("zookeeper.retry.maxtime"), conf.getInt("zookeeper.retry.max.sleep"))) .sessionTimeoutMs(conf.getInt("zookeeper.session.timeout")) .connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout")) .build(); zkClient.start(); Loading Loading @@ -472,8 +461,7 @@ public class TaskQueueZkImpl implements ITaskQueue { * @return */ public String getTasksPath(String key){ return zookeeperConfig.getDsRoot() + Constants.SINGLE_SLASH + key; return "/dolphinscheduler" + Constants.SINGLE_SLASH + key; } }
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java +62 −15 Original line number Diff line number Diff line Loading @@ -16,11 +16,6 @@ */ package org.apache.dolphinscheduler.common.zk; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.configuration.Configuration; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; Loading @@ -29,10 +24,6 @@ import org.springframework.stereotype.Component; * zookeeper conf */ @Component @Data @AllArgsConstructor @NoArgsConstructor @Builder @PropertySource("classpath:zookeeper.properties") public class ZookeeperConfig { Loading @@ -58,14 +49,70 @@ public class ZookeeperConfig { @Value("${zookeeper.connection.digest: }") private String digest; //ds scheduler install config @Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}") private String dsRoot; public static ZookeeperConfig getFromConf(Configuration conf){ return ZookeeperConfig.builder().serverList(conf.getString("zookeeper.quorum")).baseSleepTimeMs(conf.getInt("zookeeper.retry.base.sleep")) .maxSleepMs(conf.getInt("zookeeper.retry.max.sleep")).maxRetries(conf.getInt("zookeeper.retry.maxtime")) .sessionTimeoutMs(conf.getInt("zookeeper.session.timeout")).connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout")) .dsRoot(conf.getString("zookeeper.dolphinscheduler.root")).build(); public String getServerList() { return serverList; } public void setServerList(String serverList) { this.serverList = serverList; } public int getBaseSleepTimeMs() { return baseSleepTimeMs; } public void setBaseSleepTimeMs(int baseSleepTimeMs) { this.baseSleepTimeMs = baseSleepTimeMs; } public int getMaxSleepMs() { return maxSleepMs; } public void setMaxSleepMs(int maxSleepMs) { this.maxSleepMs = maxSleepMs; } public int getMaxRetries() { return maxRetries; } public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } public int getSessionTimeoutMs() { return sessionTimeoutMs; } public void setSessionTimeoutMs(int sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public int getConnectionTimeoutMs() { return connectionTimeoutMs; } public void setConnectionTimeoutMs(int connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; } public String getDigest() { return digest; } public void setDigest(String digest) { this.digest = digest; } public String getDsRoot() { return dsRoot; } public void setDsRoot(String dsRoot) { this.dsRoot = dsRoot; } } No newline at end of file