Commit 8b503431 authored by 彭勇升 pengys's avatar 彭勇升 pengys Committed by 吴晟
Browse files

Improve buffer reader speed. (#2188)

* 1. Sleep 500 milliseconds after a batch re-call finish.
2. Re-create register lock index when the system property named debug is setting.
3. Get the register inventory before lock to avoid increment the sequence but not use.
4. Return the exchange flag after all the references and spans in one segment parsed to reduce the number of segment parse.
5. Put the not exchanged segment into a collection then try to exchange no more than 10 times because of the exchange is asynchronous.
6. Cache the segment object to avoid repeated deserialization.

#2185

* #2185
1. Sleep 500 milliseconds after a batch re-call finish.
2. Re-create register lock index when the system property named debug is setting.
3. Get the register inventory before lock to avoid increment the sequence but not use.
parent 9c08c3e2
Loading
Loading
Loading
Loading
+26 −15
Original line number Diff line number Diff line
@@ -70,10 +70,17 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {

        if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) {
            sources.values().forEach(source -> {
                try {
                    RegisterSource dbSource = registerDAO.get(modelName, source.id());
                    if (Objects.nonNull(dbSource)) {
                        if (dbSource.combine(source)) {
                            registerDAO.forceUpdate(modelName, dbSource);
                        }
                    } else {
                        int sequence;
                        if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) {
                            try {
                        RegisterSource dbSource = registerDAO.get(modelName, source.id());
                                dbSource = registerDAO.get(modelName, source.id());
                                if (Objects.nonNull(dbSource)) {
                                    if (dbSource.combine(source)) {
                                        registerDAO.forceUpdate(modelName, dbSource);
@@ -90,6 +97,10 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
                        } else {
                            logger.info("{} inventory register try lock and increment sequence failure.", scope.name());
                        }
                    }
                } catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                }
            });
            sources.clear();
        }
+38 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.library.buffer;

import com.google.protobuf.GeneratedMessageV3;
import lombok.*;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;

/**
 * @author peng-yongsheng
 */
@Getter
public class BufferData<MESSAGE_TYPE extends GeneratedMessageV3> {
    private MESSAGE_TYPE messageType;
    @Setter private TraceSegmentObject v1Segment;
    @Setter private SegmentObject v2Segment;

    public BufferData(MESSAGE_TYPE messageType) {
        this.messageType = messageType;
    }
}
+57 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.library.buffer;

import com.google.protobuf.GeneratedMessageV3;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author peng-yongsheng
 */
public class BufferDataCollection<MESSAGE_TYPE extends GeneratedMessageV3> {

    private AtomicInteger index = new AtomicInteger(0);
    private final List<BufferData<MESSAGE_TYPE>> bufferDataList;

    public BufferDataCollection(int size) {
        this.bufferDataList = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            bufferDataList.add(null);
        }
    }

    public void add(BufferData<MESSAGE_TYPE> bufferData) {
        bufferDataList.set(index.getAndIncrement(), bufferData);

    }

    public int size() {
        return index.get();
    }

    public synchronized List<BufferData<MESSAGE_TYPE>> export() {
        List<BufferData<MESSAGE_TYPE>> exportData = new ArrayList<>(index.get());
        for (int i = 0; i < index.get(); i++) {
            exportData.add(bufferDataList.get(i));
        }
        index.set(0);
        return exportData;
    }
}
+2 −1
Original line number Diff line number Diff line
@@ -121,7 +121,8 @@ public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
            return this;
        }

        public Builder<MESSAGE_TYPE> callBack(DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
        public Builder<MESSAGE_TYPE> callBack(
            DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
            this.callBack = callBack;
            return this;
        }
+51 −17
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.library.buffer;

import com.google.protobuf.*;
import java.io.*;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter;
@@ -38,6 +38,8 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
    private final Offset.ReadOffset readOffset;
    private final Parser<MESSAGE_TYPE> parser;
    private final CallBack<MESSAGE_TYPE> callBack;
    private final int collectionSize = 100;
    private final BufferDataCollection<MESSAGE_TYPE> bufferDataCollection;
    private File readingFile;
    private InputStream inputStream;

@@ -47,6 +49,7 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
        this.readOffset = readOffset;
        this.parser = parser;
        this.callBack = callBack;
        this.bufferDataCollection = new BufferDataCollection<>(collectionSize);
    }

    void initialize() {
@@ -114,25 +117,32 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
            }

            while (readOffset.getOffset() < readingFile.length()) {
                BufferData<MESSAGE_TYPE> bufferData = new BufferData<>(parser.parseDelimitedFrom(inputStream));

                MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
                if (messageType != null) {
                    int i = 0;
                    while (!callBack.call(messageType)) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            logger.error(e.getMessage());
                if (bufferData.getMessageType() != null) {
                    boolean isComplete = callBack.call(bufferData);
                    final int serialized = bufferData.getMessageType().getSerializedSize();
                    final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
                    readOffset.setOffset(readOffset.getOffset() + offset);

                    if (!isComplete) {
                        if (bufferDataCollection.size() == collectionSize) {
                            reCall();
                        }
                        bufferDataCollection.add(bufferData);
                    }

                        i++;
                        if (i == 10) {
                            break;
                    if (logger.isDebugEnabled()) {
                        logger.debug("collection size: {}, max size: {}", bufferDataCollection.size(), collectionSize);
                    }
                } else if (bufferDataCollection.size() > 0) {
                    reCall();
                } else {
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage(), e);
                    }
                    final int serialized = messageType.getSerializedSize();
                    final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
                    readOffset.setOffset(readOffset.getOffset() + offset);
                }
            }
        } catch (IOException e) {
@@ -140,7 +150,31 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
        }
    }

    private void reCall() {
        int maxCycle = 10;
        for (int i = 1; i <= maxCycle; i++) {
            if (bufferDataCollection.size() > 0) {
                List<BufferData<MESSAGE_TYPE>> bufferDataList = bufferDataCollection.export();
                for (BufferData<MESSAGE_TYPE> data : bufferDataList) {
                    if (!callBack.call(data)) {
                        if (i != maxCycle) {
                            bufferDataCollection.add(data);
                        }
                    }
                }

                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            } else {
                break;
            }
        }
    }

    public interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
        boolean call(MESSAGE_TYPE message);
        boolean call(BufferData<MESSAGE_TYPE> bufferData);
    }
}
Loading