Unverified Commit e80f4d15 authored by 吴晟's avatar 吴晟 Committed by GitHub
Browse files

Merge pull request #571 from ascrutae/feature/support-rocketMQ-plugin

support rocket mq plugin
parents 986700ea 1fa81783
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
@@ -73,6 +73,9 @@ public class ComponentsDefine {

    public static final OfficialComponent ELASTIC_JOB = new OfficialComponent(24, "ElasticJob");
  
    public static final OfficialComponent ROCKET_MQ = new OfficialComponent(25, "RocketMQ");


    private static ComponentsDefine instance = new ComponentsDefine();

    private String[] components;
@@ -82,7 +85,7 @@ public class ComponentsDefine {
    }

    public ComponentsDefine() {
        components = new String[25];
        components = new String[26];
        addComponent(TOMCAT);
        addComponent(HTTPCLIENT);
        addComponent(DUBBO);
@@ -107,6 +110,7 @@ public class ComponentsDefine {
        addComponent(POSTGRESQL);
        addComponent(GRPC);
        addComponent(ELASTIC_JOB);
        addComponent(ROCKET_MQ);
    }

    private void addComponent(OfficialComponent component) {
+1 −0
Original line number Diff line number Diff line
@@ -52,6 +52,7 @@
        <module>h2-1.x-plugin</module>
        <module>postgresql-8.x-plugin</module>
        <module>oracle-10.x-plugin</module>
        <module>rocketMQ-4.x-plugin</module>
        <module>elastic-job-2.x-plugin</module>
        <module>mongodb-2.x-plugin</module>
    </modules>
+68 −0
Original line number Diff line number Diff line
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Copyright 2017, OpenSkywalking Organization All rights reserved.
  ~
  ~ Licensed under the Apache License, Version 2.0 (the "License");
  ~ you may not use this file except in compliance with the License.
  ~ You may obtain a copy of the License at
  ~
  ~     http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  ~
  ~ Project repository: https://github.com/OpenSkywalking/skywalking
  -->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>apm-sdk-plugin</artifactId>
        <groupId>org.skywalking</groupId>
        <version>3.3.0-2017</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>apm-rocketmq-4.x-plugin</artifactId>
    <name>rocketMQ-4.x-plugin</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
            </plugin>
            <plugin>
                <!-- 源码插件 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <!-- 发布时自动将源码同时发布的配置 -->
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
+78 −0
Original line number Diff line number Diff line
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.plugin.rocketMQ.v4;

import java.lang.reflect.Method;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.skywalking.apm.agent.core.context.CarrierItem;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.skywalking.apm.network.trace.component.ComponentsDefine;

/**
 * {@link AbstractMessageConsumeInterceptor} create entry span when the <code>consumeMessage</code> in the {@link
 * org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently} and {@link
 * org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly} class.
 *
 * @author zhangxin
 */
public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor {

    public static final String COMSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";

    @Override
    public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
        List<MessageExt> msgs = (List<MessageExt>)allArguments[0];

        ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
        AbstractSpan span = ContextManager.createEntrySpan(COMSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier);

        span.setComponent(ComponentsDefine.ROCKET_MQ);
        span.setLayer(SpanLayer.MQ);
        for (int i = 1; i < msgs.size(); i++) {
            ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
        }

    }

    @Override public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }

    private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
        ContextCarrier contextCarrier = new ContextCarrier();

        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            next.setHeadValue(message.getUserProperty(next.getHeadKey()));
        }

        return contextCarrier;
    }
}
+50 −0
Original line number Diff line number Diff line
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.plugin.rocketMQ.v4;

import java.lang.reflect.Method;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.tag.Tags;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;

/**
 * {@link MessageConcurrentlyConsumeInterceptor} set the process status after the {@link
 * org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
 * org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method execute.
 *
 * @author zhang xin
 */
public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
        ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret;
        if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
            AbstractSpan activeSpan = ContextManager.activeSpan();
            activeSpan.errorOccurred();
            Tags.STATUS_CODE.set(activeSpan, status.name());
        }
        ContextManager.stopSpan();
        return ret;
    }
}
Loading