Commit 532f4303 authored by Andy Ai's avatar Andy Ai Committed by 吴晟
Browse files

Fixed java agent cannot trace gRPC stream service (#3145)

* Fixed java agent cannot trace gRPC stream service
parent 6e837153
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.f
public class AsyncUnaryRequestCallCallInterceptor implements StaticMethodsAroundInterceptor {
    @Override public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
        MethodInterceptResult result) {
        BlockingCallClientInterceptor originClientCall = (BlockingCallClientInterceptor)allArguments[0];
        CallClientInterceptor originClientCall = (CallClientInterceptor)allArguments[0];
        Channel channel = originClientCall.getChannel();
        MethodDescriptor methodDescriptor = originClientCall.getMethodDescriptor();

+1 −1
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@ import org.apache.skywalking.apm.agent.core.context.ContextManager;
/**
 * @author zhang xin
 */
public class BlockingCallClientInterceptor extends ForwardingClientCall.SimpleForwardingClientCall {
public class BlockingCallClientInterceptor extends ForwardingClientCall.SimpleForwardingClientCall implements CallClientInterceptor {

    private final MethodDescriptor methodDescriptor;
    private final Channel channel;
+34 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.apm.plugin.grpc.v1;

import io.grpc.Channel;
import io.grpc.MethodDescriptor;

/**
 * @author AI
 * 2019-07-22
 */
public interface CallClientInterceptor {

    public Channel getChannel();

    public MethodDescriptor getMethodDescriptor();

}
+24 −6
Original line number Diff line number Diff line
@@ -43,14 +43,18 @@ import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.f
/**
 * @author zhangxin
 */
public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForwardingClientCall {
public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForwardingClientCall implements CallClientInterceptor {

    private final String serviceName;
    private final String remotePeer;
    private final String operationPrefix;
    private final Channel channel;
    private final MethodDescriptor methodDescriptor;

    protected StreamCallClientInterceptor(ClientCall delegate, MethodDescriptor method, Channel channel) {
        super(delegate);
        this.channel = channel;
        this.methodDescriptor = method;
        this.serviceName = formatOperationName(method);
        this.remotePeer = channel.authority();
        this.operationPrefix = OperationNameFormatUtil.formatOperationName(method) + CLIENT;
@@ -72,6 +76,16 @@ public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForw
        ContextManager.stopSpan();
    }

    @Override
    public Channel getChannel() {
        return channel;
    }

    @Override
    public MethodDescriptor getMethodDescriptor() {
        return methodDescriptor;
    }

    private class CallListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener {

        private final ContextSnapshot contextSnapshot;
@@ -81,15 +95,18 @@ public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForw
            this.contextSnapshot = contextSnapshot;
        }

        @Override public void onReady() {
        @Override
        public void onReady() {
            delegate().onReady();
        }

        @Override public void onHeaders(Metadata headers) {
        @Override
        public void onHeaders(Metadata headers) {
            delegate().onHeaders(headers);
        }

        @Override public void onMessage(Object message) {
        @Override
        public void onMessage(Object message) {
            try {
                ContextManager.createLocalSpan(operationPrefix + STREAM_RESPONSE_OBSERVER_ON_NEXT_OPERATION_NAME);
                ContextManager.continued(contextSnapshot);
@@ -101,11 +118,12 @@ public class StreamCallClientInterceptor extends ForwardingClientCall.SimpleForw
            }
        }

        @Override public void onClose(Status status, Metadata trailers) {
        @Override
        public void onClose(Status status, Metadata trailers) {
            try {
                if (!status.isOk()) {
                    AbstractSpan abstractSpan = ContextManager.createLocalSpan(operationPrefix + STREAM_RESPONSE_OBSERVER_ON_ERROR_OPERATION_NAME);
                    abstractSpan.errorOccurred().log(status.getCause());
                    abstractSpan.errorOccurred().log(status.asRuntimeException());
                    Tags.STATUS_CODE.set(abstractSpan, status.getCode().name());
                } else {
                    AbstractSpan abstractSpan = ContextManager.createLocalSpan(operationPrefix + STREAM_RESPONSE_OBSERVER_ON_COMPLETE_OPERATION_NAME);