Commit 7970dce9 authored by pengys5's avatar pengys5
Browse files

refactor the way how to create worker instance

parent c440bdf5
Loading
Loading
Loading
Loading
+6 −11
Original line number Diff line number Diff line
@@ -2,10 +2,7 @@ package com.a.eye.skywalking.collector;

import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.actor.AbstractClusterWorkerProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.DuplicateProviderException;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
@@ -19,9 +16,7 @@ import java.util.ServiceLoader;
/**
 * @author pengys5
 */
public enum CollectorSystem {
    INSTANCE;

public class CollectorSystem {
    private Logger logger = LogManager.getFormatterLogger(CollectorSystem.class);

    private ClusterWorkerContext clusterContext;
@@ -33,7 +28,7 @@ public enum CollectorSystem {
    public void boot() throws Exception {
        createAkkaSystem();
        createListener();
        createLocalProvider();
        loadLocalProviders();

        createClusterWorker();
    }
@@ -63,14 +58,14 @@ public enum CollectorSystem {
    private void createClusterWorker() throws Exception {
        ServiceLoader<AbstractClusterWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class);
        for (AbstractClusterWorkerProvider provider : clusterServiceLoader) {
            logger.info("create {%s} worker {%s} using java service loader", provider.workerNum(), provider.workerClass().getName());
            logger.info("create {%s} worker using java service loader", provider.workerNum());
            for (int i = 1; i <= provider.workerNum(); i++) {
                provider.create(clusterContext, null);
                provider.create(clusterContext, new LocalWorkerContext());
            }
        }
    }

    private void createLocalProvider() throws DuplicateProviderException {
    private void loadLocalProviders() throws UsedRoleNameException {
        ServiceLoader<AbstractLocalWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class);
        for (AbstractLocalWorkerProvider provider : clusterServiceLoader) {
            clusterContext.putProvider(provider);
+2 −2
Original line number Diff line number Diff line
@@ -15,8 +15,8 @@ import org.apache.logging.log4j.Logger;
 */
public abstract class AbstractClusterWorker extends AbstractWorker {

    public AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext) throws Exception {
        super(role, clusterContext);
    public AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    static class WorkerWithAkka extends UntypedActor {
+2 −6
Original line number Diff line number Diff line
@@ -3,8 +3,6 @@ package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import akka.actor.Props;

import java.lang.reflect.Constructor;

/**
 * @author pengys5
 */
@@ -13,12 +11,10 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
    public abstract int workerNum();

    @Override
    final public WorkerRef create(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws Exception {
    final public WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
        int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role());

        Constructor workerConstructor = workerClass().getDeclaredConstructor(new Class<?>[]{Role.class, ClusterWorkerContext.class});
        workerConstructor.setAccessible(true);
        T clusterWorker = (T) workerConstructor.newInstance(role(), clusterContext);
        T clusterWorker = (T) workerInstance(clusterContext);
        clusterWorker.preStart();

        ActorRef actorRef = clusterContext.getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role() + "_" + num);
+13 −9
Original line number Diff line number Diff line
@@ -10,8 +10,8 @@ import com.lmax.disruptor.RingBuffer;
 */
public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {

    public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext) throws Exception {
        super(role, clusterContext);
    public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    static class WorkerWithDisruptor implements EventHandler<MessageHolder> {
@@ -19,18 +19,22 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
        private RingBuffer<MessageHolder> ringBuffer;
        private AbstractLocalAsyncWorker asyncWorker;

        private WorkerWithDisruptor(RingBuffer<MessageHolder> ringBuffer, AbstractLocalAsyncWorker asyncWorker) {
        public WorkerWithDisruptor(RingBuffer<MessageHolder> ringBuffer, AbstractLocalAsyncWorker asyncWorker) {
            this.ringBuffer = ringBuffer;
            this.asyncWorker = asyncWorker;
        }

        public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
        public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
            try {
                Object message = event.getMessage();
                event.reset();
                asyncWorker.work(message);
                if (endOfBatch) {
                    asyncWorker.work(new EndOfBatchCommand());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void tell(Object message) throws Exception {
+7 −12
Original line number Diff line number Diff line
@@ -6,8 +6,6 @@ import com.a.eye.skywalking.collector.queue.MessageHolderFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.lang.reflect.Constructor;

/**
 * @author pengys5
 */
@@ -16,24 +14,21 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
    public abstract int queueSize();

    @Override
    final public WorkerRef create(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws Exception {
        validate();

        Constructor workerConstructor = workerClass().getDeclaredConstructor(new Class<?>[]{Role.class, ClusterWorkerContext.class});
        workerConstructor.setAccessible(true);
        T localAsyncWorker = (T) workerConstructor.newInstance(role(), clusterContext);
    final public WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
        T localAsyncWorker = (T) workerInstance(clusterContext);
        localAsyncWorker.preStart();

        Constructor memberConstructor = AbstractLocalAsyncWorker.WorkerWithDisruptor.class.getDeclaredConstructor(new Class<?>[]{RingBuffer.class, AbstractLocalAsyncWorker.class});
        memberConstructor.setAccessible(true);

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = queueSize();
        if (!((((bufferSize - 1) & bufferSize) == 0) && bufferSize != 0)) {
            throw new IllegalArgumentException("queue size must be power of 2");
        }

        // Construct the Disruptor
        Disruptor<MessageHolder> disruptor = new Disruptor<MessageHolder>(MessageHolderFactory.INSTANCE, bufferSize, DaemonThreadFactory.INSTANCE);

        RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
        T.WorkerWithDisruptor disruptorWorker = (T.WorkerWithDisruptor) memberConstructor.newInstance(ringBuffer, localAsyncWorker);
        T.WorkerWithDisruptor disruptorWorker = new T.WorkerWithDisruptor(ringBuffer, localAsyncWorker);

        // Connect the handler
        disruptor.handleEventsWith(disruptorWorker);
Loading