Commit 99c11b5c authored by ascrutae's avatar ascrutae
Browse files

add exception handler for disruptorQueue

parent 8472f5c7
Loading
Loading
Loading
Loading
+19 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@

package org.skywalking.apm.collector.queue.disruptor;

import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.skywalking.apm.collector.core.queue.DaemonThreadFactory;
@@ -25,12 +26,16 @@ import org.skywalking.apm.collector.core.queue.MessageHolder;
import org.skywalking.apm.collector.core.queue.QueueCreator;
import org.skywalking.apm.collector.core.queue.QueueEventHandler;
import org.skywalking.apm.collector.core.queue.QueueExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author peng-yongsheng
 */
public class DisruptorQueueCreator implements QueueCreator {

    private final Logger logger = LoggerFactory.getLogger(DisruptorQueueCreator.class);

    @Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
        // Specify the size of the ring buffer, must be power of 2.
        if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) {
@@ -40,6 +45,20 @@ public class DisruptorQueueCreator implements QueueCreator {
        // Construct the Disruptor
        Disruptor<MessageHolder> disruptor = new Disruptor(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE);

        disruptor.setDefaultExceptionHandler(new ExceptionHandler<MessageHolder>() {
            @Override public void handleEventException(Throwable ex, long sequence, MessageHolder event) {
                logger.error("handler message error! message: {}.", event.getMessage(), ex);
            }

            @Override public void handleOnStartException(Throwable ex) {
                logger.error("create disruptor failed!", ex);
            }

            @Override public void handleOnShutdownException(Throwable ex) {
                logger.error("shutdown disruptor failed!", ex);
            }
        });

        RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
        DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor);