Commit 6b1014c3 authored by 彭勇升 pengys's avatar 彭勇升 pengys Committed by 吴晟
Browse files

Fixed the collector OOM bug. (#1862)

* Fixed the bug where the remote client was not blocked when it had not yet received the onCompleted message, which could lead to an out-of-memory exception.

* Sleep for 10ms each time, rather than sleeping for at most 10ms.

* No more than 10 stream observers are allowed to send remote messages at the same time; otherwise, the remote queue is blocked.

* no message
parent 4d17c45e
Loading
Loading
Loading
Loading
+23 −48
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.remote.client;

import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
@@ -40,6 +41,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
    private final GRPCClient client;
    private final DataCarrier<RemoteMessage> carrier;
    private final StreamDataClassGetter streamDataClassGetter;
    private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);

    public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
        int bufferSize) {
@@ -67,6 +69,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie

        @Override public void consume(List<RemoteMessage> remoteMessages) {
            StreamObserver<RemoteMessage> streamObserver = createStreamObserver();

            for (RemoteMessage remoteMessage : remoteMessages) {
                streamObserver.onNext(remoteMessage);
            }
@@ -84,65 +87,37 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
    private StreamObserver<RemoteMessage> createStreamObserver() {
        RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());

        StreamStatus status = new StreamStatus(false);
        return stub.call(new StreamObserver<Empty>() {
            @Override public void onNext(Empty empty) {
            }
        int sleepTotalMillis = 0;
        int sleepMillis = 10;
        while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
            concurrentStreamObserverNumber.addAndGet(-1);

            @Override public void onError(Throwable throwable) {
                logger.error(throwable.getMessage(), throwable);
            }

            @Override public void onCompleted() {
                status.finished();
            }
        });
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }

    class StreamStatus {

        private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
            sleepTotalMillis += sleepMillis;

        private volatile boolean status;

        StreamStatus(boolean status) {
            this.status = status;
            if (sleepTotalMillis > 60000) {
                logger.warn("Remote client block times over 60 seconds.");
            }

        public boolean isFinish() {
            return status;
        }

        void finished() {
            this.status = true;
        return stub.call(new StreamObserver<Empty>() {
            @Override public void onNext(Empty empty) {
            }

        /**
         * @param maxTimeout max wait time, milliseconds.
         */
        public void wait4Finish(long maxTimeout) {
            long time = 0;
            while (!status) {
                if (time > maxTimeout) {
                    break;
                }
                try2Sleep(5);
                time += 5;
            }
            @Override public void onError(Throwable throwable) {
                concurrentStreamObserverNumber.addAndGet(-1);
                logger.error(throwable.getMessage(), throwable);
            }

        /**
         * Try to sleep, and ignore the {@link InterruptedException}
         *
         * @param millis the length of time to sleep in milliseconds
         */
        private void try2Sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
            @Override public void onCompleted() {
                concurrentStreamObserverNumber.addAndGet(-1);
            }
        });
    }

    @Override public int compareTo(GRPCRemoteClient o) {