Loading pom.xml +0 −47 Original line number Diff line number Diff line Loading @@ -67,10 +67,6 @@ <atomikos.version>4.0.6</atomikos.version> <curator.version>2.10.0</curator.version> <grpc.version>1.7.0</grpc.version> <protobuf.version>3.4.0</protobuf.version> <guava-retrying.version>2.0.0</guava-retrying.version> <opentracing.version>0.30.0</opentracing.version> <lombok.version>1.16.4</lombok.version> Loading Loading @@ -119,7 +115,6 @@ <jdepend-maven-plugin.version>2.0</jdepend-maven-plugin.version> <taglist-maven-plugin.version>2.4</taglist-maven-plugin.version> <maven-gpg-plugin.version>1.6</maven-gpg-plugin.version> <protobuf-maven-plugin.version>0.5.0</protobuf-maven-plugin.version> <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version> <docker-maven-plugin.version>0.4.14</docker-maven-plugin.version> <apache-rat-plugin.version>0.12</apache-rat-plugin.version> Loading Loading @@ -222,26 +217,6 @@ <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>com.github.rholder</groupId> <artifactId>guava-retrying</artifactId> <version>${guava-retrying.version}</version> </dependency> <dependency> <groupId>io.opentracing</groupId> Loading Loading @@ -685,24 +660,6 @@ </execution> </executions> </plugin> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>${protobuf-maven-plugin.version}</version> <configuration> <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.eluder.coveralls</groupId> <artifactId>coveralls-maven-plugin</artifactId> Loading Loading @@ -733,11 +690,7 @@ </ignoreMethodAnnotations> <excludes> <exclude>org/apache/shardingsphere/core/parse/antlr/autogen/*.class</exclude> <exclude>authpb/*.class</exclude> <exclude>mvccpb/*.class</exclude> <exclude>etcdserverpb/*.class</exclude> <exclude>org/apache/shardingsphere/orchestration/reg/zookeeper/*.class</exclude> <exclude>org/apache/shardingsphere/orchestration/reg/etcd/*.class</exclude> <exclude>org/apache/shardingsphere/**/*Test.class</exclude> <exclude>org/apache/shardingsphere/**/Test*.class</exclude> </excludes> Loading sharding-orchestration/sharding-orchestration-reg/pom.xml +0 −1 Original line number Diff line number Diff line Loading @@ -30,6 +30,5 @@ <modules> <module>sharding-orchestration-reg-api</module> <module>sharding-orchestration-reg-zookeeper-curator</module> <module>sharding-orchestration-reg-etcd</module> </modules> </project> sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-etcd/pom.xmldeleted 100644 → 0 +0 −53 Original line number Diff line number Diff line <?xml version="1.0" encoding="UTF-8"?> <!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more ~ contributor license agreements. See the NOTICE file distributed with ~ this work for additional information regarding copyright ownership. ~ The ASF licenses this file to You under the Apache License, Version 2.0 ~ (the "License"); you may not use this file except in compliance with ~ the License. You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-orchestration-reg</artifactId> <version>4.0.0-RC1-SNAPSHOT</version> </parent> <artifactId>sharding-orchestration-reg-etcd</artifactId> <name>${project.artifactId}</name> <dependencies> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-orchestration-reg-api</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> </dependency> <dependency> <groupId>com.github.rholder</groupId> <artifactId>guava-retrying</artifactId> </dependency> </dependencies> </project> sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-etcd/src/main/java/org/apache/shardingsphere/orchestration/reg/etcd/EtcdRegistryCenter.javadeleted 100644 → 0 +0 −206 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.shardingsphere.orchestration.reg.etcd; import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.protobuf.ByteString; import etcdserverpb.KVGrpc; import etcdserverpb.KVGrpc.KVFutureStub; import etcdserverpb.LeaseGrpc; import etcdserverpb.LeaseGrpc.LeaseFutureStub; import etcdserverpb.Rpc.LeaseGrantRequest; import etcdserverpb.Rpc.PutRequest; import etcdserverpb.Rpc.RangeRequest; import etcdserverpb.Rpc.RangeResponse; import etcdserverpb.Rpc.WatchCreateRequest; import etcdserverpb.Rpc.WatchRequest; import etcdserverpb.WatchGrpc; import etcdserverpb.WatchGrpc.WatchStub; import io.grpc.Channel; import mvccpb.Kv.KeyValue; import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter; import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration; import org.apache.shardingsphere.orchestration.reg.etcd.internal.channel.EtcdChannelFactory; import org.apache.shardingsphere.orchestration.reg.etcd.internal.keepalive.KeepAlive; import org.apache.shardingsphere.orchestration.reg.etcd.internal.retry.EtcdRetryEngine; import org.apache.shardingsphere.orchestration.reg.etcd.internal.watcher.EtcdWatchStreamObserver; import org.apache.shardingsphere.orchestration.reg.exception.RegistryCenterException; import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Etcd based registry center. * * @author junxiong */ public final class EtcdRegistryCenter implements RegistryCenter { private RegistryCenterConfiguration config; private EtcdRetryEngine etcdRetryEngine; private KVFutureStub kvStub; private LeaseFutureStub leaseStub; private WatchStub watchStub; private KeepAlive keepAlive; @Override public void init(final RegistryCenterConfiguration config) { this.config = config; etcdRetryEngine = new EtcdRetryEngine(config); Channel channel = EtcdChannelFactory.getInstance(Splitter.on(',').trimResults().splitToList(config.getServerLists())); kvStub = KVGrpc.newFutureStub(channel); leaseStub = LeaseGrpc.newFutureStub(channel); watchStub = WatchGrpc.newStub(channel); keepAlive = new KeepAlive(channel, config.getTimeToLiveSeconds()); } @Override public String get(final String key) { final RangeRequest request = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(key)).build(); return etcdRetryEngine.execute(new Callable<String>() { @Override public String call() throws InterruptedException, ExecutionException, TimeoutException { RangeResponse response = kvStub.range(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS); return response.getKvsCount() > 0 ? response.getKvs(0).getValue().toStringUtf8() : null; } }).orNull(); } @Override public String getDirectly(final String key) { return get(key); } @Override public boolean isExisted(final String key) { return null != get(key); } @Override public List<String> getChildrenKeys(final String key) { String path = key + "/"; final RangeRequest request = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(path)).setRangeEnd(getRangeEnd(path)).build(); Optional<List<String>> result = etcdRetryEngine.execute(new Callable<List<String>>() { @Override public List<String> call() throws InterruptedException, ExecutionException, TimeoutException { RangeResponse response = kvStub.range(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS); List<String> result = new ArrayList<>(); for (KeyValue each : response.getKvsList()) { String childFullPath = each.getKey().toStringUtf8(); result.add(childFullPath.substring(childFullPath.lastIndexOf("/") + 1)); } return result; } }); return result.isPresent() ? result.get() : Collections.<String>emptyList(); } @Override public void persist(final String key, final String value) { final PutRequest request = PutRequest.newBuilder().setPrevKv(true).setKey(ByteString.copyFromUtf8(key)).setValue(ByteString.copyFromUtf8(value)).build(); etcdRetryEngine.execute(new Callable<Void>() { @Override public Void call() throws InterruptedException, ExecutionException, TimeoutException { kvStub.put(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS); return null; } }); } @Override public void update(final String key, final String value) { persist(key, value); } @Override public void persistEphemeral(final String key, final String value) { final Optional<Long> leaseId = lease(); if (!leaseId.isPresent()) { throw new RegistryCenterException("Unable to set up heat beat for key %s", key); } final PutRequest request = PutRequest.newBuilder().setPrevKv(true).setLease(leaseId.get()).setKey(ByteString.copyFromUtf8(key)).setValue(ByteString.copyFromUtf8(value)).build(); etcdRetryEngine.execute(new Callable<Void>() { @Override public Void call() throws InterruptedException, ExecutionException, TimeoutException { kvStub.put(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS); return null; } }); } private Optional<Long> lease() { final LeaseGrantRequest request = LeaseGrantRequest.newBuilder().setTTL(config.getTimeToLiveSeconds()).build(); return etcdRetryEngine.execute(new Callable<Long>() { @Override public Long call() throws InterruptedException, ExecutionException, TimeoutException { long leaseId = leaseStub.leaseGrant(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS).getID(); keepAlive.heartbeat(leaseId); return leaseId; } }); } @Override public void watch(final String key, final DataChangedEventListener dataChangedEventListener) { WatchCreateRequest createWatchRequest = WatchCreateRequest.newBuilder().setKey(ByteString.copyFromUtf8(key)).setRangeEnd(getRangeEnd(key)).build(); final WatchRequest request = WatchRequest.newBuilder().setCreateRequest(createWatchRequest).build(); etcdRetryEngine.execute(new Callable<Void>() { @Override public Void call() { watchStub.watch(new EtcdWatchStreamObserver(dataChangedEventListener)).onNext(request); return null; } }); } @Override public void close() { keepAlive.close(); } private ByteString getRangeEnd(final String key) { byte[] noPrefix = {0}; byte[] endKey = key.getBytes().clone(); for (int i = endKey.length - 1; i >= 0; i--) { if (endKey[i] < 0xff) { endKey[i] = (byte) (endKey[i] + 1); return ByteString.copyFrom(Arrays.copyOf(endKey, i + 1)); } } return ByteString.copyFrom(noPrefix); } } sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-etcd/src/main/java/org/apache/shardingsphere/orchestration/reg/etcd/internal/channel/EtcdChannelFactory.javadeleted 100644 → 0 +0 −59 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.shardingsphere.orchestration.reg.etcd.internal.channel; import io.grpc.Channel; import io.grpc.netty.NettyChannelBuilder; import io.grpc.util.RoundRobinLoadBalancerFactory; import lombok.AccessLevel; import lombok.NoArgsConstructor; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * Etcd channel factory. * * @author zhangliang */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class EtcdChannelFactory { private static final String TARGET = "etcd"; private static ConcurrentHashMap<List<String>, Channel> etcdChannels = new ConcurrentHashMap<>(); /** * Get etcd channel instance. * * @param endpoints etcd endpoints * @return etcd channel */ public static Channel getInstance(final List<String> endpoints) { if (etcdChannels.containsKey(endpoints)) { return etcdChannels.get(endpoints); } Channel channel = NettyChannelBuilder.forTarget(TARGET) .usePlaintext(true) .nameResolverFactory(new EtcdNameSolverFactory(TARGET, endpoints)) .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) .build(); Channel result = etcdChannels.putIfAbsent(endpoints, channel); return null == result ? channel : result; } } Loading
pom.xml +0 −47 Original line number Diff line number Diff line Loading @@ -67,10 +67,6 @@ <atomikos.version>4.0.6</atomikos.version> <curator.version>2.10.0</curator.version> <grpc.version>1.7.0</grpc.version> <protobuf.version>3.4.0</protobuf.version> <guava-retrying.version>2.0.0</guava-retrying.version> <opentracing.version>0.30.0</opentracing.version> <lombok.version>1.16.4</lombok.version> Loading Loading @@ -119,7 +115,6 @@ <jdepend-maven-plugin.version>2.0</jdepend-maven-plugin.version> <taglist-maven-plugin.version>2.4</taglist-maven-plugin.version> <maven-gpg-plugin.version>1.6</maven-gpg-plugin.version> <protobuf-maven-plugin.version>0.5.0</protobuf-maven-plugin.version> <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version> <docker-maven-plugin.version>0.4.14</docker-maven-plugin.version> <apache-rat-plugin.version>0.12</apache-rat-plugin.version> Loading Loading @@ -222,26 +217,6 @@ <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>com.github.rholder</groupId> <artifactId>guava-retrying</artifactId> <version>${guava-retrying.version}</version> </dependency> <dependency> <groupId>io.opentracing</groupId> Loading Loading @@ -685,24 +660,6 @@ </execution> </executions> </plugin> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>${protobuf-maven-plugin.version}</version> <configuration> <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.eluder.coveralls</groupId> <artifactId>coveralls-maven-plugin</artifactId> Loading Loading @@ -733,11 +690,7 @@ </ignoreMethodAnnotations> <excludes> <exclude>org/apache/shardingsphere/core/parse/antlr/autogen/*.class</exclude> <exclude>authpb/*.class</exclude> <exclude>mvccpb/*.class</exclude> <exclude>etcdserverpb/*.class</exclude> <exclude>org/apache/shardingsphere/orchestration/reg/zookeeper/*.class</exclude> <exclude>org/apache/shardingsphere/orchestration/reg/etcd/*.class</exclude> <exclude>org/apache/shardingsphere/**/*Test.class</exclude> <exclude>org/apache/shardingsphere/**/Test*.class</exclude> </excludes> Loading
sharding-orchestration/sharding-orchestration-reg/pom.xml +0 −1 Original line number Diff line number Diff line Loading @@ -30,6 +30,5 @@ <modules> <module>sharding-orchestration-reg-api</module> <module>sharding-orchestration-reg-zookeeper-curator</module> <module>sharding-orchestration-reg-etcd</module> </modules> </project>
sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-etcd/pom.xmldeleted 100644 → 0 +0 −53 Original line number Diff line number Diff line <?xml version="1.0" encoding="UTF-8"?> <!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more ~ contributor license agreements. See the NOTICE file distributed with ~ this work for additional information regarding copyright ownership. ~ The ASF licenses this file to You under the Apache License, Version 2.0 ~ (the "License"); you may not use this file except in compliance with ~ the License. You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by applicable law or agreed to in writing, software ~ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-orchestration-reg</artifactId> <version>4.0.0-RC1-SNAPSHOT</version> </parent> <artifactId>sharding-orchestration-reg-etcd</artifactId> <name>${project.artifactId}</name> <dependencies> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-orchestration-reg-api</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> </dependency> <dependency> <groupId>com.github.rholder</groupId> <artifactId>guava-retrying</artifactId> </dependency> </dependencies> </project>
sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-etcd/src/main/java/org/apache/shardingsphere/orchestration/reg/etcd/EtcdRegistryCenter.javadeleted 100644 → 0 +0 −206 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.shardingsphere.orchestration.reg.etcd; import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.protobuf.ByteString; import etcdserverpb.KVGrpc; import etcdserverpb.KVGrpc.KVFutureStub; import etcdserverpb.LeaseGrpc; import etcdserverpb.LeaseGrpc.LeaseFutureStub; import etcdserverpb.Rpc.LeaseGrantRequest; import etcdserverpb.Rpc.PutRequest; import etcdserverpb.Rpc.RangeRequest; import etcdserverpb.Rpc.RangeResponse; import etcdserverpb.Rpc.WatchCreateRequest; import etcdserverpb.Rpc.WatchRequest; import etcdserverpb.WatchGrpc; import etcdserverpb.WatchGrpc.WatchStub; import io.grpc.Channel; import mvccpb.Kv.KeyValue; import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter; import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration; import org.apache.shardingsphere.orchestration.reg.etcd.internal.channel.EtcdChannelFactory; import org.apache.shardingsphere.orchestration.reg.etcd.internal.keepalive.KeepAlive; import org.apache.shardingsphere.orchestration.reg.etcd.internal.retry.EtcdRetryEngine; import org.apache.shardingsphere.orchestration.reg.etcd.internal.watcher.EtcdWatchStreamObserver; import org.apache.shardingsphere.orchestration.reg.exception.RegistryCenterException; import org.apache.shardingsphere.orchestration.reg.listener.DataChangedEventListener; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Etcd based registry center. * * @author junxiong */ public final class EtcdRegistryCenter implements RegistryCenter { private RegistryCenterConfiguration config; private EtcdRetryEngine etcdRetryEngine; private KVFutureStub kvStub; private LeaseFutureStub leaseStub; private WatchStub watchStub; private KeepAlive keepAlive; @Override public void init(final RegistryCenterConfiguration config) { this.config = config; etcdRetryEngine = new EtcdRetryEngine(config); Channel channel = EtcdChannelFactory.getInstance(Splitter.on(',').trimResults().splitToList(config.getServerLists())); kvStub = KVGrpc.newFutureStub(channel); leaseStub = LeaseGrpc.newFutureStub(channel); watchStub = WatchGrpc.newStub(channel); keepAlive = new KeepAlive(channel, config.getTimeToLiveSeconds()); } @Override public String get(final String key) { final RangeRequest request = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(key)).build(); return etcdRetryEngine.execute(new Callable<String>() { @Override public String call() throws InterruptedException, ExecutionException, TimeoutException { RangeResponse response = kvStub.range(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS); return response.getKvsCount() > 0 ? response.getKvs(0).getValue().toStringUtf8() : null; } }).orNull(); } @Override public String getDirectly(final String key) { return get(key); } @Override public boolean isExisted(final String key) { return null != get(key); } @Override public List<String> getChildrenKeys(final String key) { String path = key + "/"; final RangeRequest request = RangeRequest.newBuilder().setKey(ByteString.copyFromUtf8(path)).setRangeEnd(getRangeEnd(path)).build(); Optional<List<String>> result = etcdRetryEngine.execute(new Callable<List<String>>() { @Override public List<String> call() throws InterruptedException, ExecutionException, TimeoutException { RangeResponse response = kvStub.range(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS); List<String> result = new ArrayList<>(); for (KeyValue each : response.getKvsList()) { String childFullPath = each.getKey().toStringUtf8(); result.add(childFullPath.substring(childFullPath.lastIndexOf("/") + 1)); } return result; } }); return result.isPresent() ? result.get() : Collections.<String>emptyList(); } @Override public void persist(final String key, final String value) { final PutRequest request = PutRequest.newBuilder().setPrevKv(true).setKey(ByteString.copyFromUtf8(key)).setValue(ByteString.copyFromUtf8(value)).build(); etcdRetryEngine.execute(new Callable<Void>() { @Override public Void call() throws InterruptedException, ExecutionException, TimeoutException { kvStub.put(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS); return null; } }); } @Override public void update(final String key, final String value) { persist(key, value); } @Override public void persistEphemeral(final String key, final String value) { final Optional<Long> leaseId = lease(); if (!leaseId.isPresent()) { throw new RegistryCenterException("Unable to set up heat beat for key %s", key); } final PutRequest request = PutRequest.newBuilder().setPrevKv(true).setLease(leaseId.get()).setKey(ByteString.copyFromUtf8(key)).setValue(ByteString.copyFromUtf8(value)).build(); etcdRetryEngine.execute(new Callable<Void>() { @Override public Void call() throws InterruptedException, ExecutionException, TimeoutException { kvStub.put(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS); return null; } }); } private Optional<Long> lease() { final LeaseGrantRequest request = LeaseGrantRequest.newBuilder().setTTL(config.getTimeToLiveSeconds()).build(); return etcdRetryEngine.execute(new Callable<Long>() { @Override public Long call() throws InterruptedException, ExecutionException, TimeoutException { long leaseId = leaseStub.leaseGrant(request).get(config.getOperationTimeoutMilliseconds(), TimeUnit.MILLISECONDS).getID(); keepAlive.heartbeat(leaseId); return leaseId; } }); } @Override public void watch(final String key, final DataChangedEventListener dataChangedEventListener) { WatchCreateRequest createWatchRequest = WatchCreateRequest.newBuilder().setKey(ByteString.copyFromUtf8(key)).setRangeEnd(getRangeEnd(key)).build(); final WatchRequest request = WatchRequest.newBuilder().setCreateRequest(createWatchRequest).build(); etcdRetryEngine.execute(new Callable<Void>() { @Override public Void call() { watchStub.watch(new EtcdWatchStreamObserver(dataChangedEventListener)).onNext(request); return null; } }); } @Override public void close() { keepAlive.close(); } private ByteString getRangeEnd(final String key) { byte[] noPrefix = {0}; byte[] endKey = key.getBytes().clone(); for (int i = endKey.length - 1; i >= 0; i--) { if (endKey[i] < 0xff) { endKey[i] = (byte) (endKey[i] + 1); return ByteString.copyFrom(Arrays.copyOf(endKey, i + 1)); } } return ByteString.copyFrom(noPrefix); } }
sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-etcd/src/main/java/org/apache/shardingsphere/orchestration/reg/etcd/internal/channel/EtcdChannelFactory.javadeleted 100644 → 0 +0 −59 Original line number Diff line number Diff line /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.shardingsphere.orchestration.reg.etcd.internal.channel; import io.grpc.Channel; import io.grpc.netty.NettyChannelBuilder; import io.grpc.util.RoundRobinLoadBalancerFactory; import lombok.AccessLevel; import lombok.NoArgsConstructor; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * Etcd channel factory. * * @author zhangliang */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class EtcdChannelFactory { private static final String TARGET = "etcd"; private static ConcurrentHashMap<List<String>, Channel> etcdChannels = new ConcurrentHashMap<>(); /** * Get etcd channel instance. * * @param endpoints etcd endpoints * @return etcd channel */ public static Channel getInstance(final List<String> endpoints) { if (etcdChannels.containsKey(endpoints)) { return etcdChannels.get(endpoints); } Channel channel = NettyChannelBuilder.forTarget(TARGET) .usePlaintext(true) .nameResolverFactory(new EtcdNameSolverFactory(TARGET, endpoints)) .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) .build(); Channel result = etcdChannels.putIfAbsent(endpoints, channel); return null == result ? channel : result; } }