Commit c2a1ef4d authored by pengys5's avatar pengys5
Browse files

add ClusterEvent.UnreachableMember in WorkersListener to unregister from WorkersRefCenter

parent fce707e7
Loading
Loading
Loading
Loading
+0 −15
Original line number Diff line number Diff line
@@ -47,7 +47,6 @@ public abstract class AbstractWorker extends UntypedActor {
    @Override
    public void preStart() throws Exception {
        cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
        register();
    }

    @Override
@@ -71,7 +70,6 @@ public abstract class AbstractWorker extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Throwable {
        if (message instanceof ClusterEvent.CurrentClusterState) {
            logger.info("receive ClusterEvent.CurrentClusterState message");
            ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
            for (Member member : state.getMembers()) {
                if (member.status().equals(MemberStatus.up())) {
@@ -82,14 +80,6 @@ public abstract class AbstractWorker extends UntypedActor {
            ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
            logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
            register(memberUp.member());
        } else if (message instanceof ClusterEvent.MemberEvent) {
            System.out.println("other event: " + message.getClass().getSimpleName());
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            System.out.println("other event: " + message.getClass().getSimpleName());
        } else if (message instanceof ClusterEvent.MemberJoined) {
            System.out.println("other event: " + message.getClass().getSimpleName());
        } else if (message instanceof ClusterEvent.ReachableMember) {
            System.out.println("other event: " + message.getClass().getSimpleName());
        } else {
            logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
            receive(message);
@@ -126,9 +116,4 @@ public abstract class AbstractWorker extends UntypedActor {
            getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
        }
    }

    void register() {
        WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName());
        getContext().actorSelection("/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
    }
}
+5 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.ActorPath;
import akka.actor.ActorRef;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -26,6 +27,10 @@ public class WorkerRef {
        actorRef.tell(message, sender);
    }

    public ActorPath path() {
        return actorRef.path();
    }

    public String getWorkerRole() {
        return workerRole;
    }
+13 −1
Original line number Diff line number Diff line
@@ -3,6 +3,8 @@ package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -24,12 +26,19 @@ public class WorkersListener extends UntypedActor {

    public static final String WorkName = "WorkersListener";

    private Cluster cluster = Cluster.get(getContext().system());

    @Override
    public void preStart() throws Exception {
        cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        if (message instanceof WorkerListenerMessage.RegisterMessage) {
            WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
            ActorRef sender = getSender();
            getContext().watch(sender);
//            getContext().watch(sender);

            logger.info("register worker of role: %s, path: %s", register.getWorkRole(), sender.toString());

@@ -37,6 +46,9 @@ public class WorkersListener extends UntypedActor {
        } else if (message instanceof Terminated) {
            Terminated terminated = (Terminated) message;
            WorkersRefCenter.INSTANCE.unregister(terminated.getActor());
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember) message;
            WorkersRefCenter.INSTANCE.unregister(unreachableMember.member().address());
        } else {
            unhandled(message);
        }
+24 −5
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.cluster;

import akka.actor.ActorRef;
import akka.actor.Address;
import com.a.eye.skywalking.collector.actor.WorkerRef;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
@@ -24,13 +22,13 @@ public enum WorkersRefCenter {
    private Map<ActorRef, WorkerRef> actorRefToWorkerRef = new ConcurrentHashMap<>();

    public void register(ActorRef newActorRef, String workerRole) {
        System.out.println("register: " + workerRole);
        if (!roleToWorkerRef.containsKey(workerRole)) {
            List<WorkerRef> actorList = Collections.synchronizedList(new ArrayList<WorkerRef>());
            roleToWorkerRef.putIfAbsent(workerRole, actorList);
        }

        WorkerRef newWorkerRef = new WorkerRef(newActorRef, workerRole);

        roleToWorkerRef.get(workerRole).add(newWorkerRef);
        actorRefToWorkerRef.put(newActorRef, newWorkerRef);
    }
@@ -41,6 +39,27 @@ public enum WorkersRefCenter {
        actorRefToWorkerRef.remove(oldActorRef);
    }

    public void unregister(Address address) {
        Iterator<ActorRef> actorRefToWorkerRefIterator = actorRefToWorkerRef.keySet().iterator();
        while (actorRefToWorkerRefIterator.hasNext()) {
            if (address.equals(actorRefToWorkerRefIterator.next().path().address())) {
                actorRefToWorkerRefIterator.remove();
            }
        }

        Iterator<Map.Entry<String, List<WorkerRef>>> roleToWorkerRefIterator = roleToWorkerRef.entrySet().iterator();
        while (roleToWorkerRefIterator.hasNext()) {
            List<WorkerRef> workerRefList = roleToWorkerRefIterator.next().getValue();

            Iterator<WorkerRef> workerRefIterator = workerRefList.iterator();
            while (workerRefIterator.hasNext()) {
                if (workerRefIterator.next().path().address().equals(address)) {
                    workerRefIterator.remove();
                }
            }
        }
    }

    /**
     * Get all available {@link WorkerRef} list, by the given worker role.
     *
+16 −2
Original line number Diff line number Diff line
@@ -20,4 +20,18 @@ akka {
    //    serialize-messages = on
    warn-about-java-serializer-usage = on
  }

  remote {
    log-remote-lifecycle-events = off

    netty.tcp {
      hostname = "127.0.0.1"
      port = 1000
    }
  }

  cluster {
    auto-down-unreachable-after = off
    metrics.enabled = off
  }
}
 No newline at end of file
Loading