Loading apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataMonitor.java 0 → 100644 +71 −0 Original line number Diff line number Diff line package org.skywalking.apm.collector.cluster.standalone; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import org.skywalking.apm.collector.client.h2.H2Client; import org.skywalking.apm.collector.client.zookeeper.util.PathUtils; import org.skywalking.apm.collector.core.CollectorException; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterDataListener; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author pengys5 */ public class ClusterStandaloneDataMonitor implements DataMonitor { private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneDataMonitor.class); private H2Client client; private Map<String, ClusterDataListener> listeners; private Map<String, ModuleRegistration> registrations; public ClusterStandaloneDataMonitor() { listeners = new LinkedHashMap<>(); registrations = new LinkedHashMap<>(); } @Override public void setClient(Client client) { this.client = (H2Client)client; } @Override public void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException { String path = PathUtils.convertKey2Path(listener.path()); logger.info("listener path: {}", path); listeners.put(path, listener); registrations.put(path, registration); } @Override public ClusterDataListener getListener(String path) { path = PathUtils.convertKey2Path(path); return listeners.get(path); } @Override public void createPath(String path) throws ClientException { } @Override public void setData(String path, String value) throws ClientException { if (listeners.containsKey(path)) { listeners.get(path).addAddress(value); listeners.get(path).serverJoinNotify(value); } } @Override public void start() throws CollectorException { Iterator<Map.Entry<String, ModuleRegistration>> entryIterator = registrations.entrySet().iterator(); while (entryIterator.hasNext()) { Map.Entry<String, ModuleRegistration> next = entryIterator.next(); ModuleRegistration.Value value = next.getValue().buildValue(); String contextPath = value.getContextPath() == null ? "" : value.getContextPath(); setData(next.getKey(), value.getHostPort() + contextPath); } } } apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java +8 −2 Original line number Diff line number Diff line Loading @@ -15,6 +15,12 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { public static final String MODULE_NAME = "standalone"; private final ClusterStandaloneDataMonitor dataMonitor; public ClusterStandaloneModuleDefine() { this.dataMonitor = new ClusterStandaloneDataMonitor(); } @Override public String group() { return ClusterModuleGroupDefine.GROUP_NAME; } Loading @@ -32,7 +38,7 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { } @Override public DataMonitor dataMonitor() { return null; return dataMonitor; } @Override protected Client createClient() { Loading @@ -40,6 +46,6 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { } @Override public ClusterModuleRegistrationReader registrationReader() { return null; return new ClusterStandaloneModuleRegistrationReader(dataMonitor); } } Loading
apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneDataMonitor.java 0 → 100644 +71 −0 Original line number Diff line number Diff line package org.skywalking.apm.collector.cluster.standalone; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import org.skywalking.apm.collector.client.h2.H2Client; import org.skywalking.apm.collector.client.zookeeper.util.PathUtils; import org.skywalking.apm.collector.core.CollectorException; import org.skywalking.apm.collector.core.client.Client; import org.skywalking.apm.collector.core.client.ClientException; import org.skywalking.apm.collector.core.client.DataMonitor; import org.skywalking.apm.collector.core.cluster.ClusterDataListener; import org.skywalking.apm.collector.core.module.ModuleRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author pengys5 */ public class ClusterStandaloneDataMonitor implements DataMonitor { private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneDataMonitor.class); private H2Client client; private Map<String, ClusterDataListener> listeners; private Map<String, ModuleRegistration> registrations; public ClusterStandaloneDataMonitor() { listeners = new LinkedHashMap<>(); registrations = new LinkedHashMap<>(); } @Override public void setClient(Client client) { this.client = (H2Client)client; } @Override public void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException { String path = PathUtils.convertKey2Path(listener.path()); logger.info("listener path: {}", path); listeners.put(path, listener); registrations.put(path, registration); } @Override public ClusterDataListener getListener(String path) { path = PathUtils.convertKey2Path(path); return listeners.get(path); } @Override public void createPath(String path) throws ClientException { } @Override public void setData(String path, String value) throws ClientException { if (listeners.containsKey(path)) { listeners.get(path).addAddress(value); listeners.get(path).serverJoinNotify(value); } } @Override public void start() throws CollectorException { Iterator<Map.Entry<String, ModuleRegistration>> entryIterator = registrations.entrySet().iterator(); while (entryIterator.hasNext()) { Map.Entry<String, ModuleRegistration> next = entryIterator.next(); ModuleRegistration.Value value = next.getValue().buildValue(); String contextPath = value.getContextPath() == null ? "" : value.getContextPath(); setData(next.getKey(), value.getHostPort() + contextPath); } } }
apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/standalone/ClusterStandaloneModuleDefine.java +8 −2 Original line number Diff line number Diff line Loading @@ -15,6 +15,12 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { public static final String MODULE_NAME = "standalone"; private final ClusterStandaloneDataMonitor dataMonitor; public ClusterStandaloneModuleDefine() { this.dataMonitor = new ClusterStandaloneDataMonitor(); } @Override public String group() { return ClusterModuleGroupDefine.GROUP_NAME; } Loading @@ -32,7 +38,7 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { } @Override public DataMonitor dataMonitor() { return null; return dataMonitor; } @Override protected Client createClient() { Loading @@ -40,6 +46,6 @@ public class ClusterStandaloneModuleDefine extends ClusterModuleDefine { } @Override public ClusterModuleRegistrationReader registrationReader() { return null; return new ClusterStandaloneModuleRegistrationReader(dataMonitor); } }