Commit 4a8e175e authored by terrymanu's avatar terrymanu
Browse files

refactor listener for sharding-opentracing

parent 71920933
Loading
Loading
Loading
Loading
+0 −12
Original line number Diff line number Diff line
@@ -17,8 +17,6 @@

package io.shardingsphere.core.event;

import com.google.common.base.Optional;
import lombok.AccessLevel;
import lombok.Getter;

import java.util.UUID;
@@ -35,7 +33,6 @@ public class ShardingEvent {
    
    private ShardingEventType eventType = ShardingEventType.BEFORE_EXECUTE;
    
    @Getter(AccessLevel.NONE)
    private Exception exception;
    
    /**
@@ -54,13 +51,4 @@ public class ShardingEvent {
        eventType = ShardingEventType.EXECUTE_FAILURE;
        exception = cause;
    }
    
    /**
     * Get exception.
     * 
     * @return exception
     */
    public final Optional<? extends Exception> getException() {
        return Optional.fromNullable(exception);
    }
}
+1 −3
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@

package io.shardingsphere.core.executor.fixture;

import com.google.common.base.Preconditions;
import io.shardingsphere.core.event.ShardingEventType;
import io.shardingsphere.core.executor.event.ExecutionEvent;
import io.shardingsphere.core.executor.event.OverallExecutionEvent;
@@ -49,9 +48,8 @@ public final class ExecutorTestUtil {
            eventCaller.verifySQLType(((OverallExecutionEvent) event).getSqlType());
            eventCaller.verifyIsParallelExecute(((OverallExecutionEvent) event).isParallelExecute());
        }
        Preconditions.checkState((ShardingEventType.EXECUTE_FAILURE == event.getEventType()) == event.getException().isPresent());
        if (ShardingEventType.EXECUTE_FAILURE == event.getEventType()) {
            eventCaller.verifyException(event.getException().get());
            eventCaller.verifyException(event.getException());
        }
    }
    
+14 −18
Original line number Diff line number Diff line
@@ -41,20 +41,20 @@ public abstract class ExecuteEventListener extends TracingListener<SQLExecutionE

    private static final String OPERATION_NAME_PREFIX = "/SHARDING-SPHERE/EXECUTE/";

    private final ThreadLocal<ActiveSpan> trunkContainer = new ThreadLocal<>();
    private final ThreadLocal<ActiveSpan> trunkSpan = new ThreadLocal<>();
    
    private final ThreadLocal<Span> branchContainer = new ThreadLocal<>();
    private final ThreadLocal<Span> branchSpan = new ThreadLocal<>();
    
    private final ThreadLocal<ActiveSpan> trunkInBranchContainer = new ThreadLocal<>();
    private final ThreadLocal<ActiveSpan> trunkInBranchSpan = new ThreadLocal<>();
    
    @Override
    protected final void beforeExecute(final SQLExecutionEvent event) {
        Tracer tracer = ShardingTracer.get();
        if (ExecutorDataMap.getDataMap().containsKey(SNAPSHOT_DATA_KEY) && null == trunkContainer.get() && null == branchContainer.get()) {
            trunkInBranchContainer.set(((ActiveSpan.Continuation) ExecutorDataMap.getDataMap().get(SNAPSHOT_DATA_KEY)).activate());
        if (ExecutorDataMap.getDataMap().containsKey(SNAPSHOT_DATA_KEY) && null == trunkSpan.get() && null == branchSpan.get()) {
            trunkInBranchSpan.set(((ActiveSpan.Continuation) ExecutorDataMap.getDataMap().get(SNAPSHOT_DATA_KEY)).activate());
        }
        if (null == branchContainer.get()) {
            branchContainer.set(tracer.buildSpan(OPERATION_NAME_PREFIX + getOperation()).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
        if (null == branchSpan.get()) {
            branchSpan.set(tracer.buildSpan(OPERATION_NAME_PREFIX + getOperation()).withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
                    .withTag(Tags.PEER_HOSTNAME.getKey(), event.getDataSource()).withTag(Tags.COMPONENT.getKey(), LocalTags.COMPONENT_NAME)
                    .withTag(Tags.DB_INSTANCE.getKey(), event.getDataSource()).withTag(Tags.DB_TYPE.getKey(), "sql")
                    .withTag(LocalTags.DB_BIND_VARIABLES.getKey(), event.getParameters().isEmpty() ? "" : Joiner.on(",").join(event.getParameters()))
@@ -64,25 +64,21 @@ public abstract class ExecuteEventListener extends TracingListener<SQLExecutionE
    
    @Override
    protected final void tracingFinish() {
        if (null == branchContainer.get()) {
        if (null == branchSpan.get()) {
            return;
        }
        branchContainer.get().finish();
        branchContainer.remove();
        if (null == trunkInBranchContainer.get()) {
        branchSpan.get().finish();
        branchSpan.remove();
        if (null == trunkInBranchSpan.get()) {
            return;
        }
        trunkInBranchContainer.get().deactivate();
        trunkInBranchContainer.remove();
        trunkInBranchSpan.get().deactivate();
        trunkInBranchSpan.remove();
    }
    
    @Override
    protected final void tracingFailure(final SQLExecutionEvent event) {
        Span span = branchContainer.get();
        span.setTag(Tags.ERROR.getKey(), true);
        if (event.getException().isPresent()) {
            span.log(System.currentTimeMillis(), log(event.getException().get()));
        }
        branchSpan.get().setTag(Tags.ERROR.getKey(), true).log(System.currentTimeMillis(), log(event.getException()));
    }
    
    protected abstract String getOperation();
+6 −12
Original line number Diff line number Diff line
@@ -20,7 +20,6 @@ package io.shardingsphere.opentracing.listener.execution;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import io.opentracing.ActiveSpan;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.shardingsphere.core.executor.event.OverallExecutionEvent;
import io.shardingsphere.core.executor.threadlocal.ExecutorDataMap;
@@ -41,7 +40,7 @@ public final class OverallExecuteEventListener extends TracingListener<OverallEx

    private static final String OPERATION_NAME_PREFIX = "/SHARDING-SPHERE/EXECUTE/";

    private final ThreadLocal<ActiveSpan> trunkContainer = new ThreadLocal<>();
    private final ThreadLocal<ActiveSpan> span = new ThreadLocal<>();
    
    /**
     * Listen overall sql execution event.
@@ -56,9 +55,8 @@ public final class OverallExecuteEventListener extends TracingListener<OverallEx
    
    @Override
    protected void beforeExecute(final OverallExecutionEvent event) {
        Tracer tracer = ShardingTracer.get();
        ActiveSpan activeSpan = tracer.buildSpan(OPERATION_NAME_PREFIX + event.getSqlType().name()).withTag(Tags.COMPONENT.getKey(), LocalTags.COMPONENT_NAME).startActive();
        trunkContainer.set(activeSpan);
        ActiveSpan activeSpan = ShardingTracer.get().buildSpan(OPERATION_NAME_PREFIX + event.getSqlType().name()).withTag(Tags.COMPONENT.getKey(), LocalTags.COMPONENT_NAME).startActive();
        span.set(activeSpan);
        if (event.isParallelExecute()) {
            ExecutorDataMap.getDataMap().put(SNAPSHOT_DATA_KEY, activeSpan.capture());
        }
@@ -66,16 +64,12 @@ public final class OverallExecuteEventListener extends TracingListener<OverallEx
    
    @Override
    protected void tracingFinish() {
        trunkContainer.get().deactivate();
        trunkContainer.remove();
        span.get().deactivate();
        span.remove();
    }
    
    @Override
    protected void tracingFailure(final OverallExecutionEvent event) {
        ActiveSpan activeSpan = trunkContainer.get();
        activeSpan.setTag(Tags.ERROR.getKey(), true);
        if (event.getException().isPresent()) {
            activeSpan.log(System.currentTimeMillis(), log(event.getException().get()));
        }
        span.get().setTag(Tags.ERROR.getKey(), true).log(System.currentTimeMillis(), log(event.getException()));
    }
}
+5 −14
Original line number Diff line number Diff line
@@ -20,7 +20,6 @@ package io.shardingsphere.opentracing.listener.merger;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import io.opentracing.ActiveSpan;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.shardingsphere.core.merger.event.MergeEvent;
import io.shardingsphere.opentracing.ShardingTracer;
@@ -36,7 +35,7 @@ public final class MergeEventListener extends TracingListener<MergeEvent> {
    
    private static final String OPERATION_NAME_PREFIX = "/SHARDING-SPHERE/MERGE/";
    
    private final ThreadLocal<ActiveSpan> spanContainer = new ThreadLocal<>();
    private final ThreadLocal<ActiveSpan> span = new ThreadLocal<>();
    
    /**
     * Listen result set merge event.
@@ -51,25 +50,17 @@ public final class MergeEventListener extends TracingListener<MergeEvent> {
    
    @Override
    protected void beforeExecute(final MergeEvent event) {
        Tracer tracer = ShardingTracer.get();
        ActiveSpan activeSpan = tracer.buildSpan(OPERATION_NAME_PREFIX)
                .withTag(Tags.COMPONENT.getKey(), LocalTags.COMPONENT_NAME)
                .startActive();
        spanContainer.set(activeSpan);
        span.set(ShardingTracer.get().buildSpan(OPERATION_NAME_PREFIX).withTag(Tags.COMPONENT.getKey(), LocalTags.COMPONENT_NAME).startActive());
    }
    
    @Override
    protected void tracingFinish() {
        spanContainer.get().deactivate();
        spanContainer.remove();
        span.get().deactivate();
        span.remove();
    }
    
    @Override
    protected void tracingFailure(final MergeEvent event) {
        ActiveSpan activeSpan = spanContainer.get();
        activeSpan.setTag(Tags.ERROR.getKey(), true);
        if (event.getException().isPresent()) {
            activeSpan.log(System.currentTimeMillis(), log(event.getException().get()));
        }
        span.get().setTag(Tags.ERROR.getKey(), true).log(System.currentTimeMillis(), log(event.getException()));
    }
}
Loading