Commit 05d9c0a9 authored by Stalary's avatar Stalary Committed by 吴晟
Browse files

MOD: Modify compatibility of kafka plugin and expand operationName (#3390)

* ADD: add operationName length threshold

* MOD:move operationName threshold to Config, simplified code

* MOD:update agent set-up document, clean code

* MOD:add agent.operation_name_threshold conf prefix

* MOD: Modify compatibility of kafka plugin and expand operationName

* MOD: Delete localSpan layer, modify consumer operationName, delete producer key

* MOD: Modify operationName, reduce String operation

* FIX: Fix callback break, the new version of kafka callback is intercepted twice and needs to be judged

* FIX: Fix different problems with KafkaProducer constructor in old and new versions

* MOD: separate code
parent b0d35981
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -25,7 +25,7 @@
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>apm-kafka-v1-plugin</artifactId>
    <artifactId>apm-kafka-plugin</artifactId>

    <properties>
        <kafka-clients.version>0.11.0.0</kafka-clients.version>
+27 −13
Original line number Diff line number Diff line
@@ -16,25 +16,35 @@
 *
 */

package org.apache.skywalking.apm.plugin.kafka.v1;
package org.apache.skywalking.apm.plugin.kafka;

import java.lang.reflect.Method;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

import java.lang.reflect.Method;

/**
 * @author zhang xin, stalary
 **/
public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        AbstractSpan abstractSpan = ContextManager.createLocalSpan("Producer/Callback");

        //Get the SnapshotContext
        ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
        if (null != contextSnapshot) {
            RecordMetadata metadata = (RecordMetadata) allArguments[0];
            AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
            activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
            Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
            ContextManager.continued(contextSnapshot);
        }
    }
@@ -42,15 +52,19 @@ public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        ContextSnapshot contextSnapshot = (ContextSnapshot) objInst.getSkyWalkingDynamicField();
        if (null != contextSnapshot) {
            Exception exceptions = (Exception) allArguments[1];
            if (exceptions != null) {
                ContextManager.activeSpan().errorOccurred().log(exceptions);
            }
            ContextManager.stopSpan();
        }
        return ret;
    }

    @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }
+5 −1
Original line number Diff line number Diff line
@@ -16,12 +16,15 @@
 *
 */

package org.apache.skywalking.apm.plugin.kafka.v1;
package org.apache.skywalking.apm.plugin.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;

/**
 * @author zhang xin, stalary
 **/
public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {

    @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
@@ -29,6 +32,7 @@ public class ConsumerConstructorInterceptor implements InstanceConstructorInterc
        // set the bootstrap server address
        ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
        requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
        requiredInfo.setGroupId(config.getString("group.id"));
        objInst.setSkyWalkingDynamicField(requiredInfo);
    }
}
+10 −1
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@
 *
 */

package org.apache.skywalking.apm.plugin.kafka.v1;
package org.apache.skywalking.apm.plugin.kafka;

import java.util.Collection;
import java.util.List;
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.util.StringUtil;
public class ConsumerEnhanceRequiredInfo {
    private String brokerServers;
    private String topics;
    private String groupId;
    private long startTime;

    public void setBrokerServers(List<String> brokerServers) {
@@ -35,6 +36,10 @@ public class ConsumerEnhanceRequiredInfo {
        this.topics = StringUtil.join(';', topics.toArray(new String[0]));
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getBrokerServers() {
        return brokerServers;
    }
@@ -43,6 +48,10 @@ public class ConsumerEnhanceRequiredInfo {
        return topics;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setStartTime(long startTime) {
        this.startTime = startTime;
    }
+4 −4
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@
 *
 */

package org.apache.skywalking.apm.plugin.kafka.v1;
package org.apache.skywalking.apm.plugin.kafka;

import java.lang.reflect.Method;
import java.util.Iterator;
@@ -37,12 +37,12 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInt
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

/**
 * @author  zhang xin
 * @author  zhang xin, stalary
 */
public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {

    public static final String OPERATE_NAME_PREFIX = "Kafka/";
    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
    public static final String CONSUMER_OPERATE_NAME = "/Consumer/";

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
@@ -60,7 +60,7 @@ public class KafkaConsumerInterceptor implements InstanceMethodsAroundIntercepto
        //
        if (records.size() > 0) {
            ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
            AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME_SUFFIX, null).start(requiredInfo.getStartTime());
            AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId(), null).start(requiredInfo.getStartTime());

            activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
            SpanLayer.asMQ(activeSpan);
Loading