Commit 3ec54e3f authored by pengys5's avatar pengys5
Browse files

add async member use Disruptor queue

parent 082bcbc2
Loading
Loading
Loading
Loading
+3 −4
Original line number Diff line number Diff line
@@ -13,7 +13,6 @@
    <packaging>jar</packaging>

    <properties>
        <project.spring.version>4.1.6.RELEASE</project.spring.version>
    </properties>

    <dependencies>
@@ -23,9 +22,9 @@
            <version>1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${project.spring.version}</version>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>com.a.eye</groupId>
+61 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.DaemonThreadFactory;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.lang.reflect.Constructor;

/**
 * @author pengys5
 */
public abstract class AbstractASyncMemberProvider<T extends EventHandler> {

    private RingBuffer<MessageHolder> ringBuffer;

    public abstract Class memberClass();

    public T createWorker(EventFactory eventFactory, ActorRef actorRef) throws Exception {
        if (memberClass() == null) {
            throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
        }

        Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class});
        memberConstructor.setAccessible(true);
        T member = (T) memberConstructor.newInstance(actorRef);

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        // Construct the Disruptor
        Disruptor<MessageHolder> disruptor = new Disruptor<MessageHolder>(eventFactory, bufferSize, DaemonThreadFactory.INSTANCE);
        // Connect the handler
        disruptor.handleEventsWith(member);
        // Start the Disruptor, starts all threads running
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing.
        ringBuffer = disruptor.getRingBuffer();
        return member;
    }

    public void onData(MessageHolder message) {
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).setMessage(message);
        } finally {
            ringBuffer.publish(sequence);
        }
    }

    /**
     * Use {@link #memberClass()} method returned class's simple name as a role name.
     *
     * @return is role of Worker
     */
    protected String roleName() {
        return memberClass().getSimpleName();
    }
}
+18 −15
Original line number Diff line number Diff line
@@ -3,41 +3,43 @@ package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.EventHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
 * @author pengys5
 */
public abstract class AbstractMember<T> {
public abstract class AbstractMember<T> implements EventHandler<MessageHolder<T>> {

    private MemberSystem memberSystem;
    private Logger logger = LogManager.getFormatterLogger(AbstractMember.class);

    private ActorRef actorRef;

    public MemberSystem memberContext() {
        return memberSystem;
    }

    public ActorRef getSelf() {
        return actorRef;
    }

    public AbstractMember(MemberSystem memberSystem, ActorRef actorRef) {
        this.memberSystem = memberSystem;
    public AbstractMember(ActorRef actorRef) {
        this.actorRef = actorRef;
    }


    public abstract void preStart() throws Exception;

    /**
     * Receive the message to analyse.
     *
     * @param message is the data send from the forward worker
     * @throws Throwable is the exception thrown by that worker implementation processing
     * @throws Exception is the exception thrown by that worker implementation processing
     */
    public abstract void receive(Object message) throws Throwable;
    public abstract void receive(Object message) throws Exception;

    public void onEvent(MessageHolder<T> event, long sequence, boolean endOfBatch) throws Exception {
        T message = event.getMessage();
        event.reset();
        receive(message);
    }

    /**
     * Send analysed data to next Worker.
@@ -45,9 +47,10 @@ public abstract class AbstractMember<T> {
     * @param targetWorkerProvider is the worker provider to create worker instance.
     * @param selector             is the selector to select a same role worker instance form cluster.
     * @param message              is the data used to send to next worker.
     * @throws Throwable
     * @throws Exception
     */
    public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Throwable {
    public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Exception {
        logger.debug("worker provider: %s ,role name: %s", targetWorkerProvider.getClass().getName(), targetWorkerProvider.roleName());
        List<WorkerRef> availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName());
        selector.select(availableWorks, message).tell(message, getSelf());
    }
+6 −6
Original line number Diff line number Diff line
@@ -7,19 +7,19 @@ import java.lang.reflect.Constructor;
/**
 * @author pengys5
 */
public abstract class AbstractMemberProvider {
public abstract class AbstractSyncMemberProvider<T> {

    public abstract Class memberClass();

    public void createWorker(MemberSystem system, ActorRef actorRef) throws Exception {
    public T createWorker(ActorRef actorRef) throws Exception {
        if (memberClass() == null) {
            throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
        }

        Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{MemberSystem.class, ActorRef.class});
        Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class});
        memberConstructor.setAccessible(true);
        AbstractMember member = (AbstractMember) memberConstructor.newInstance(system, actorRef);
        member.preStart();
        system.memberOf(member, roleName());
        T member = (T) memberConstructor.newInstance(actorRef);
        return member;
    }

    /**
+10 −16
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
@@ -11,7 +11,6 @@ import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import scala.Option;

import java.util.List;

@@ -43,18 +42,17 @@ public abstract class AbstractWorker<T> extends UntypedActor {

    private Logger logger = LogManager.getFormatterLogger(AbstractWorker.class);

    private MemberSystem memberSystem = new MemberSystem();
    private Cluster cluster = Cluster.get(getContext().system());

    @Override
    public void preStart() throws Exception {
        super.preStart();
        cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
        register();
    }

    @Override
    public void preRestart(Throwable reason, Option<Object> message) throws Exception {
        super.preRestart(reason, message);
        register();
    public void postStop() throws Exception {
        cluster.unsubscribe(getSelf());
    }

    /**
@@ -81,11 +79,11 @@ public abstract class AbstractWorker<T> extends UntypedActor {
                }
            }
        } else if (message instanceof ClusterEvent.MemberUp) {
            logger.info("receive ClusterEvent.MemberUp message");
            ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
            logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
            register(memberUp.member());
        } else {
            logger.info("message class: %s", message.getClass().getName());
            logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
            receive(message);
        }
    }
@@ -110,9 +108,9 @@ public abstract class AbstractWorker<T> extends UntypedActor {
     * @param member is the new created or restart worker
     */
    void register(Member member) {
        System.out.println("register");
        if (member.getRoles().equals(WorkersListener.WorkName)) {
        if (member.hasRole(WorkersListener.WorkName)) {
            WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName());
            logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
            getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
        }
    }
@@ -121,8 +119,4 @@ public abstract class AbstractWorker<T> extends UntypedActor {
        WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName());
        getContext().actorSelection("/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
    }

    public MemberSystem memberContext() {
        return memberSystem;
    }
}
Loading