Loading skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java +1 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ public abstract class AbstractMember implements EventHandler<MessageHolder> { this.actorRef = actorRef; } public abstract void beTold(Object message) throws Exception; protected abstract void beTold(Object message) throws Exception; /** * Receive the message to analyse. Loading skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java +12 −0 Original line number Diff line number Diff line Loading @@ -82,6 +82,14 @@ public abstract class AbstractWorker extends UntypedActor { ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message; logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString()); register(memberUp.member()); } else if (message instanceof ClusterEvent.MemberEvent) { System.out.println("other event: " + message.getClass().getSimpleName()); } else if (message instanceof ClusterEvent.UnreachableMember) { System.out.println("other event: " + message.getClass().getSimpleName()); } else if (message instanceof ClusterEvent.MemberJoined) { System.out.println("other event: " + message.getClass().getSimpleName()); } else if (message instanceof ClusterEvent.ReachableMember) { System.out.println("other event: " + message.getClass().getSimpleName()); } else { logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName()); receive(message); Loading @@ -101,6 +109,10 @@ public abstract class AbstractWorker extends UntypedActor { selector.select(availableWorks, message).tell(message, getSelf()); } public void tell(AbstractMember targetMember, Object message) throws Exception { targetMember.beTold(message); } /** * When member role is {@link WorkersListener#WorkName} then Select actor from context * and send register message to {@link WorkersListener} Loading skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractHashMessage.java→skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/AbstractHashMessage.java +3 −3 Original line number Diff line number Diff line package com.a.eye.skywalking.collector.actor; package com.a.eye.skywalking.collector.actor.selector; /** * @author pengys5 Loading @@ -6,11 +6,11 @@ package com.a.eye.skywalking.collector.actor; public abstract class AbstractHashMessage { private int hashCode; public void setHashCode(String key) { public AbstractHashMessage(String key) { this.hashCode = key.hashCode(); } public int getHashCode() { protected int getHashCode() { return hashCode; } } skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/HashCodeSelector.java +0 −1 Original line number Diff line number Diff line package com.a.eye.skywalking.collector.actor.selector; import com.a.eye.skywalking.collector.actor.AbstractHashMessage; import com.a.eye.skywalking.collector.actor.AbstractWorker; import com.a.eye.skywalking.collector.actor.WorkerRef; Loading skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java +1 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ public enum WorkersRefCenter { private Map<ActorRef, WorkerRef> actorRefToWorkerRef = new ConcurrentHashMap<>(); public void register(ActorRef newActorRef, String workerRole) { System.out.println("register: " + workerRole); if (!roleToWorkerRef.containsKey(workerRole)) { List<WorkerRef> actorList = Collections.synchronizedList(new ArrayList<WorkerRef>()); roleToWorkerRef.putIfAbsent(workerRole, actorList); Loading Loading
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java +1 −1 Original line number Diff line number Diff line Loading @@ -27,7 +27,7 @@ public abstract class AbstractMember implements EventHandler<MessageHolder> { this.actorRef = actorRef; } public abstract void beTold(Object message) throws Exception; protected abstract void beTold(Object message) throws Exception; /** * Receive the message to analyse. Loading
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java +12 −0 Original line number Diff line number Diff line Loading @@ -82,6 +82,14 @@ public abstract class AbstractWorker extends UntypedActor { ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message; logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString()); register(memberUp.member()); } else if (message instanceof ClusterEvent.MemberEvent) { System.out.println("other event: " + message.getClass().getSimpleName()); } else if (message instanceof ClusterEvent.UnreachableMember) { System.out.println("other event: " + message.getClass().getSimpleName()); } else if (message instanceof ClusterEvent.MemberJoined) { System.out.println("other event: " + message.getClass().getSimpleName()); } else if (message instanceof ClusterEvent.ReachableMember) { System.out.println("other event: " + message.getClass().getSimpleName()); } else { logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName()); receive(message); Loading @@ -101,6 +109,10 @@ public abstract class AbstractWorker extends UntypedActor { selector.select(availableWorks, message).tell(message, getSelf()); } public void tell(AbstractMember targetMember, Object message) throws Exception { targetMember.beTold(message); } /** * When member role is {@link WorkersListener#WorkName} then Select actor from context * and send register message to {@link WorkersListener} Loading
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractHashMessage.java→skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/AbstractHashMessage.java +3 −3 Original line number Diff line number Diff line package com.a.eye.skywalking.collector.actor; package com.a.eye.skywalking.collector.actor.selector; /** * @author pengys5 Loading @@ -6,11 +6,11 @@ package com.a.eye.skywalking.collector.actor; public abstract class AbstractHashMessage { private int hashCode; public void setHashCode(String key) { public AbstractHashMessage(String key) { this.hashCode = key.hashCode(); } public int getHashCode() { protected int getHashCode() { return hashCode; } }
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/HashCodeSelector.java +0 −1 Original line number Diff line number Diff line package com.a.eye.skywalking.collector.actor.selector; import com.a.eye.skywalking.collector.actor.AbstractHashMessage; import com.a.eye.skywalking.collector.actor.AbstractWorker; import com.a.eye.skywalking.collector.actor.WorkerRef; Loading
skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersRefCenter.java +1 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ public enum WorkersRefCenter { private Map<ActorRef, WorkerRef> actorRefToWorkerRef = new ConcurrentHashMap<>(); public void register(ActorRef newActorRef, String workerRole) { System.out.println("register: " + workerRole); if (!roleToWorkerRef.containsKey(workerRole)) { List<WorkerRef> actorList = Collections.synchronizedList(new ArrayList<WorkerRef>()); roleToWorkerRef.putIfAbsent(workerRole, actorList); Loading