Commit 4196d4e0 authored by terrymanu's avatar terrymanu
Browse files

refactor SamplingService

parent 526846fa
Loading
Loading
Loading
Loading
+8 −2
Original line number Diff line number Diff line
@@ -65,12 +65,18 @@ public final class ShardingTracer {
        init(tracer, 0);
    }
    
    private static void init(final Tracer tracer, final int sampleNumPM) {
    /**
     * Initialize tracer from another one.
     *
     * @param tracer that is delegated
     * @param samplingRatePerMinute sampling rate per minute
     */
    public static void init(final Tracer tracer, final int samplingRatePerMinute) {
        if (GlobalTracer.isRegistered()) {
            return;
        }
        GlobalTracer.register(tracer);
        SamplingService.getInstance().init(sampleNumPM);
        SamplingService.getInstance().init(samplingRatePerMinute);
        new OverallExecuteEventListener().register();
        new DQLExecuteEventListener().register();
        new DMLExecuteEventListener().register();
+1 −1
Original line number Diff line number Diff line
@@ -27,11 +27,11 @@ import lombok.Getter;
 * @author gaohongtao
 * @author wangkai
 */
@Getter
public final class ConfigurationLoader {
    
    private static final ConfigurationParser[] PARSERS = new ConfigurationParser[]{new OpentracingConfigurationParser()};
    
    @Getter
    private final String tracerClassName;
    
    @Getter
+3 −3
Original line number Diff line number Diff line
@@ -66,11 +66,11 @@ public abstract class TracingListener<T extends ShardingEvent> {
    
    protected abstract void tracingFailure(T event);
    
    protected final Map<String, ?> log(final Throwable t) {
    protected final Map<String, ?> log(final Throwable cause) {
        Map<String, String> result = new HashMap<>(3, 1);
        result.put("event", "error");
        result.put("error.kind", t.getClass().getName());
        result.put("message", t.getMessage());
        result.put("error.kind", cause.getClass().getName());
        result.put("message", cause.getMessage());
        return result;
    }
}
+16 −35
Original line number Diff line number Diff line
@@ -17,15 +17,15 @@

package io.shardingsphere.opentracing.sampling;

import io.shardingsphere.core.executor.ShardingThreadFactoryBuilder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sampling control service.
@@ -37,11 +37,9 @@ public final class SamplingService {
    
    private static final SamplingService INSTANCE = new SamplingService();
    
    private int sampleNumPM;
    
    private volatile boolean on;
    private int samplingRatePerMinute;
    
    private volatile AtomicInteger samplingCount;
    private volatile AtomicInteger samplingCount = new AtomicInteger(0);
    
    private volatile ScheduledFuture<?> scheduledFuture;
    
@@ -57,57 +55,40 @@ public final class SamplingService {
    /**
     * sampling service init.
     *
     * @param sampleNumPM sampling num in one minutes
     * @param samplingRatePerMinute sampling rate in one minute
     */
    public void init(final int sampleNumPM) {
        this.sampleNumPM = sampleNumPM;
        if (scheduledFuture != null) {
    public void init(final int samplingRatePerMinute) {
        this.samplingRatePerMinute = samplingRatePerMinute;
        if (null != scheduledFuture) {
            scheduledFuture.cancel(true);
        }
        if (this.sampleNumPM > 0) {
            on = true;
            this.resetSamplingFactor();
            ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                
                private final AtomicInteger threadIndex = new AtomicInteger(0);
                
                @Override
                public Thread newThread(final Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("Sharding-opentracing-sampling" + threadIndex.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
            });
            scheduledFuture = service.scheduleAtFixedRate(new Runnable() {
        if (samplingRatePerMinute > 0) {
            ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ShardingThreadFactoryBuilder.build("Opentracing-Sampling-Cleaner"));
            scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                
                @Override
                public void run() {
                    resetSamplingFactor();
                    samplingCount.set(0);
                }
            }, 0, 1, TimeUnit.MINUTES);
        }
    }
    
    /**
     * Is sampling allowed.
     * Judge sampling is allowed or not.
     *
     * @return true, if sampling mechanism is on.
     * @return sampling is allowed or not
     */
    public boolean trySampling() {
        return !on || samplingCount.get() <= sampleNumPM;
        return samplingCount.get() < samplingRatePerMinute;
    }
    
    /**
     * Increase sampling count.
     */
    public void increaseSampling() {
        if (on) {
        if (samplingRatePerMinute > 0) {
            samplingCount.getAndIncrement();
        }
    }
    
    private void resetSamplingFactor() {
        samplingCount = new AtomicInteger(0);
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -61,7 +61,7 @@ public final class ExecuteEventListenerTest {
    
    @BeforeClass
    public static void init() {
        ShardingTracer.init(TRACER);
        ShardingTracer.init(TRACER, 40);
    }
    
    @AfterClass
Loading