Commit c440bdf5 authored by pengys5's avatar pengys5
Browse files

refactor cluster module, modify the way use to tell cluster worker

parent 50193bee
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@
    <modules>
        <module>skywalking-collector-cluster</module>
        <module>skywalking-collector-worker</module>
        <module>skywalking-collector-recevier</module>
        <module>skywalking-collector-role</module>
    </modules>
    <parent>
        <artifactId>skywalking</artifactId>
+79 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector;

import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.actor.AbstractClusterWorkerProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.DuplicateProviderException;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ServiceLoader;

/**
 * @author pengys5
 */
public enum CollectorSystem {
    INSTANCE;

    private Logger logger = LogManager.getFormatterLogger(CollectorSystem.class);

    private ClusterWorkerContext clusterContext;

    public ClusterWorkerContext getClusterContext() {
        return clusterContext;
    }

    public void boot() throws Exception {
        createAkkaSystem();
        createListener();
        createLocalProvider();

        createClusterWorker();
    }

    public void terminate() {
        clusterContext.getAkkaSystem().terminate();
    }

    private void createAkkaSystem() {
        ClusterConfigInitializer.initialize("collector.config");

        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
                withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
                withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
                withFallback(ConfigFactory.parseString("akka.actor.provider=" + ClusterConfig.Cluster.provider)).
                withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.nodes)).
                withFallback(ConfigFactory.load("application.conf"));
        ActorSystem akkaSystem = ActorSystem.create("ClusterSystem", config);

        clusterContext = new ClusterWorkerContext(akkaSystem);
    }

    private void createListener() {
        clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WorkName);
    }

    private void createClusterWorker() throws Exception {
        ServiceLoader<AbstractClusterWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class);
        for (AbstractClusterWorkerProvider provider : clusterServiceLoader) {
            logger.info("create {%s} worker {%s} using java service loader", provider.workerNum(), provider.workerClass().getName());
            for (int i = 1; i <= provider.workerNum(); i++) {
                provider.create(clusterContext, null);
            }
        }
    }

    private void createLocalProvider() throws DuplicateProviderException {
        ServiceLoader<AbstractLocalWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class);
        for (AbstractLocalWorkerProvider provider : clusterServiceLoader) {
            clusterContext.putProvider(provider);
        }
    }
}
+0 −37
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;

/**
 * @author pengys5
 */
public abstract class AbstractAsyncMember extends AbstractMember {

    private RingBuffer<MessageHolder> ringBuffer;

    public AbstractAsyncMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
        super(actorRef);
        this.ringBuffer = ringBuffer;
    }

    public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
        Object message = event.getMessage();
        event.reset();
        receive(message);
        if (endOfBatch) {
            receive(new EndOfBatchCommand());
        }
    }

    public void beTold(Object message) throws Exception {
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).setMessage(message);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
+82 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * @author pengys5
 */
public abstract class AbstractClusterWorker extends AbstractWorker {

    public AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext) throws Exception {
        super(role, clusterContext);
    }

    static class WorkerWithAkka extends UntypedActor {

        private Logger logger = LogManager.getFormatterLogger(WorkerWithAkka.class);

        private Cluster cluster = Cluster.get(getContext().system());

        private final AbstractClusterWorker ownerWorker;

        public WorkerWithAkka(AbstractClusterWorker ownerWorker) {
            this.ownerWorker = ownerWorker;
        }

        @Override
        public void preStart() throws Exception {
            cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
        }

        @Override
        public void postStop() throws Exception {
            cluster.unsubscribe(getSelf());
        }

        /**
         * Listening {@link ClusterEvent.MemberUp} and {@link ClusterEvent.CurrentClusterState}
         * cluster event, when event send from the member of {@link WorkersListener} then tell
         * the sender to register self.
         */
        @Override
        public void onReceive(Object message) throws Throwable {
            if (message instanceof ClusterEvent.CurrentClusterState) {
                ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
                for (Member member : state.getMembers()) {
                    if (member.status().equals(MemberStatus.up())) {
                        register(member);
                    }
                }
            } else if (message instanceof ClusterEvent.MemberUp) {
                ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
                logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
                register(memberUp.member());
            } else {
                logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
                ownerWorker.work(message);
            }
        }

        /**
         * When member role is {@link WorkersListener#WorkName} then Select actor from context
         * and send register message to {@link WorkersListener}
         *
         * @param member is the new created or restart worker
         */
        void register(Member member) {
            if (member.hasRole(WorkersListener.WorkName)) {
                WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(ownerWorker.getRole());
                logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
                getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
            }
        }
    }
}
+30 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.ActorRef;
import akka.actor.Props;

import java.lang.reflect.Constructor;

/**
 * @author pengys5
 */
public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWorker> extends AbstractWorkerProvider<T> {

    public abstract int workerNum();

    @Override
    final public WorkerRef create(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws Exception {
        int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role());

        Constructor workerConstructor = workerClass().getDeclaredConstructor(new Class<?>[]{Role.class, ClusterWorkerContext.class});
        workerConstructor.setAccessible(true);
        T clusterWorker = (T) workerConstructor.newInstance(role(), clusterContext);
        clusterWorker.preStart();

        ActorRef actorRef = clusterContext.getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role() + "_" + num);

        ClusterWorkerRef workerRef = new ClusterWorkerRef(actorRef, role());
        clusterContext.put(workerRef);
        return workerRef;
    }
}
Loading