Commit baf542a7 authored by pengys5's avatar pengys5
Browse files

refactor ClusterContext and LocalContext implements Lookup interface that let...

refactor ClusterContext and LocalContext implements Lookup interface that let the user of context just use lookup or findprovider method.
parent 934aa6bb
Loading
Loading
Loading
Loading
+7 −6
Original line number Diff line number Diff line
@@ -21,16 +21,15 @@ public class CollectorSystem {

    private ClusterWorkerContext clusterContext;

    public ClusterWorkerContext getClusterContext() {
    public LookUp getClusterContext() {
        return clusterContext;
    }

    public void boot() throws Exception {
    public void boot() throws UsedRoleNameException, ProviderNotFountException {
        createAkkaSystem();
        createListener();
        loadLocalProviders();

        createClusterWorker();
        createClusterWorkers();
    }

    public void terminate() {
@@ -55,12 +54,13 @@ public class CollectorSystem {
        clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WorkName);
    }

    private void createClusterWorker() throws Exception {
    private void createClusterWorkers() throws ProviderNotFountException {
        ServiceLoader<AbstractClusterWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class);
        for (AbstractClusterWorkerProvider provider : clusterServiceLoader) {
            logger.info("create {%s} worker using java service loader", provider.workerNum());
            provider.setClusterContext(clusterContext);
            for (int i = 1; i <= provider.workerNum(); i++) {
                provider.create(clusterContext, new LocalWorkerContext());
                provider.create(AbstractWorker.noOwner());
            }
        }
    }
@@ -68,6 +68,7 @@ public class CollectorSystem {
    private void loadLocalProviders() throws UsedRoleNameException {
        ServiceLoader<AbstractLocalWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class);
        for (AbstractLocalWorkerProvider provider : clusterServiceLoader) {
            provider.setClusterContext(clusterContext);
            clusterContext.putProvider(provider);
        }
    }
+4 −4
Original line number Diff line number Diff line
@@ -11,16 +11,16 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
    public abstract int workerNum();

    @Override
    final public WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
    final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
        int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role());

        T clusterWorker = (T) workerInstance(clusterContext);
        T clusterWorker = (T) workerInstance(getClusterContext());
        clusterWorker.preStart();

        ActorRef actorRef = clusterContext.getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role() + "_" + num);
        ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role() + "_" + num);

        ClusterWorkerRef workerRef = new ClusterWorkerRef(actorRef, role());
        clusterContext.put(workerRef);
        getClusterContext().put(workerRef);
        return workerRef;
    }
}
+7 −3
Original line number Diff line number Diff line
@@ -14,8 +14,8 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
    public abstract int queueSize();

    @Override
    final public WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
        T localAsyncWorker = (T) workerInstance(clusterContext);
    final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
        T localAsyncWorker = (T) workerInstance(getClusterContext());
        localAsyncWorker.preStart();

        // Specify the size of the ring buffer, must be power of 2.
@@ -37,7 +37,11 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
        disruptor.start();

        LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), disruptorWorker);

        if (localContext != null) {
            localContext.put(workerRef);
        }

        return workerRef;
    }
}
+6 −0
Original line number Diff line number Diff line
@@ -7,4 +7,10 @@ public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker {
    public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    @Override
    final public void work(Object message) throws Exception {
    }

    public abstract Object onWork(Object message) throws Exception;
}
+6 −3
Original line number Diff line number Diff line
@@ -6,12 +6,15 @@ package com.a.eye.skywalking.collector.actor;
public abstract class AbstractLocalSyncWorkerProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalWorkerProvider<T> {

    @Override
    final public WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
        T localSyncWorker = (T) workerInstance(clusterContext);
    final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
        T localSyncWorker = (T) workerInstance(getClusterContext());
        localSyncWorker.preStart();

        LocalSyncWorkerRef workerRef = new LocalSyncWorkerRef(role(), localSyncWorker);

        if (localContext != null) {
            localContext.put(workerRef);
        }
        return workerRef;
    }
}
Loading