Unverified Commit 1b83b344 authored by 吴晟's avatar 吴晟 Committed by GitHub
Browse files

Relied on grpc response for certain, to avoid OOM when collector is overloaded. (#1323)

parent 0f0b56db
Loading
Loading
Loading
Loading
+23 −1
Original line number Diff line number Diff line
@@ -16,13 +16,16 @@
 *
 */


package org.apache.skywalking.apm.agent.core.remote;

import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;

/**
 * @author wusheng
 */
public class GRPCStreamServiceStatus {
    private static final ILog logger = LogManager.getLogger(GRPCStreamServiceStatus.class);
    private volatile boolean status;

    public GRPCStreamServiceStatus(boolean status) {
@@ -52,6 +55,25 @@ public class GRPCStreamServiceStatus {
        return status;
    }

    /**
     * Wait until success status reported.
     */
    public void wait4Finish() {
        long recheckCycle = 5;
        long hasWaited = 0L;
        long maxCycle = 30 * 1000L;// 30 seconds max.
        while (!status) {
            try2Sleep(recheckCycle);
            hasWaited += recheckCycle;

            if (recheckCycle >= maxCycle) {
                logger.warn("Collector traceSegment service doesn't response in {} seconds.", hasWaited);
            } else {
                recheckCycle = recheckCycle * 2 > maxCycle ? maxCycle : recheckCycle * 2;
            }
        }
    }

    /**
     * Try to sleep, and ignore the {@link InterruptedException}
     *
+3 −6
Original line number Diff line number Diff line
@@ -16,11 +16,11 @@
 *
 */


package org.apache.skywalking.apm.agent.core.remote;

import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
@@ -36,8 +36,6 @@ import org.apache.skywalking.apm.network.proto.Downstream;
import org.apache.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;

import java.util.List;

import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
@@ -122,9 +120,8 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
            }
            upstreamSegmentStreamObserver.onCompleted();

            if (status.wait4Finish(TIMEOUT)) {
            status.wait4Finish();
            segmentUplinkedCounter += data.size();
            }
        } else {
            segmentAbandonedCounter += data.size();
        }