Commit c5b6b57f authored by pengys5's avatar pengys5
Browse files

no message

parent 3ec54e3f
Loading
Loading
Loading
Loading
+33 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;

/**
 * @author pengys5
 */
public abstract class AbstractAsyncMember extends AbstractMember {

    private RingBuffer<MessageHolder> ringBuffer;

    public AbstractAsyncMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
        super(actorRef);
        this.ringBuffer = ringBuffer;
    }

    public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
        Object message = event.getMessage();
        event.reset();
        receive(message);
    }

    public void beTold(Object message) throws Exception {
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).setMessage(message);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
+41 −0
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.DaemonThreadFactory;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.EventFactory;
import com.a.eye.skywalking.collector.queue.MessageHolderFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
@@ -13,49 +13,29 @@ import java.lang.reflect.Constructor;
/**
 * @author pengys5
 */
public abstract class AbstractASyncMemberProvider<T extends EventHandler> {
public abstract class AbstractAsyncMemberProvider<T extends EventHandler> extends AbstractMemberProvider<T> {

    private RingBuffer<MessageHolder> ringBuffer;
    public abstract int queueSize();

    public abstract Class memberClass();

    public T createWorker(EventFactory eventFactory, ActorRef actorRef) throws Exception {
    @Override
    public T createWorker(ActorRef actorRef) throws Exception {
        if (memberClass() == null) {
            throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
        }

        Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class});
        memberConstructor.setAccessible(true);
        T member = (T) memberConstructor.newInstance(actorRef);

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;
        int bufferSize = queueSize();
        // Construct the Disruptor
        Disruptor<MessageHolder> disruptor = new Disruptor<MessageHolder>(eventFactory, bufferSize, DaemonThreadFactory.INSTANCE);
        // Connect the handler
        disruptor.handleEventsWith(member);
        Disruptor<MessageHolder> disruptor = new Disruptor<MessageHolder>(MessageHolderFactory.INSTANCE, bufferSize, DaemonThreadFactory.INSTANCE);
        // Start the Disruptor, starts all threads running
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing.
        ringBuffer = disruptor.getRingBuffer();
        return member;
    }
        RingBuffer<MessageHolder> ringBuffer = disruptor.start();

    public void onData(MessageHolder message) {
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).setMessage(message);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
        Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class<?>[]{RingBuffer.class, ActorRef.class});
        memberConstructor.setAccessible(true);
        T member = (T) memberConstructor.newInstance(ringBuffer, actorRef);

    /**
     * Use {@link #memberClass()} method returned class's simple name as a role name.
     *
     * @return is role of Worker
     */
    protected String roleName() {
        return memberClass().getSimpleName();
        // Connect the handler
        disruptor.handleEventsWith(member);
        return member;
    }
}
+5 −9
Original line number Diff line number Diff line
@@ -13,13 +13,13 @@ import java.util.List;
/**
 * @author pengys5
 */
public abstract class AbstractMember<T> implements EventHandler<MessageHolder<T>> {
public abstract class AbstractMember implements EventHandler<MessageHolder> {

    private Logger logger = LogManager.getFormatterLogger(AbstractMember.class);

    private ActorRef actorRef;

    public ActorRef getSelf() {
    private ActorRef getSelf() {
        return actorRef;
    }

@@ -27,6 +27,8 @@ public abstract class AbstractMember<T> implements EventHandler<MessageHolder<T>
        this.actorRef = actorRef;
    }

    public abstract void beTold(Object message) throws Exception;

    /**
     * Receive the message to analyse.
     *
@@ -35,12 +37,6 @@ public abstract class AbstractMember<T> implements EventHandler<MessageHolder<T>
     */
    public abstract void receive(Object message) throws Exception;

    public void onEvent(MessageHolder<T> event, long sequence, boolean endOfBatch) throws Exception {
        T message = event.getMessage();
        event.reset();
        receive(message);
    }

    /**
     * Send analysed data to next Worker.
     *
@@ -49,7 +45,7 @@ public abstract class AbstractMember<T> implements EventHandler<MessageHolder<T>
     * @param message              is the data used to send to next worker.
     * @throws Exception
     */
    public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Exception {
    public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Exception {
        logger.debug("worker provider: %s ,role name: %s", targetWorkerProvider.getClass().getName(), targetWorkerProvider.roleName());
        List<WorkerRef> availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName());
        selector.select(availableWorks, message).tell(message, getSelf());
+13 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.ActorRef;

/**
 * @author pengys5
 */
public abstract class AbstractMemberProvider<T> {

    public abstract Class memberClass();

    public abstract T createWorker(ActorRef actorRef) throws Exception;
}
+23 −0
Original line number Diff line number Diff line
package com.a.eye.skywalking.collector.actor;

import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;

/**
 * @author pengys5
 */
public abstract class AbstractSyncMember extends AbstractMember {

    public AbstractSyncMember(ActorRef actorRef) {
        super(actorRef);
    }

    @Override
    public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
    }

    @Override
    public void beTold(Object message) throws Exception {
        receive(message);
    }
}
Loading