Commit 71d27b18 authored by peng-yongsheng's avatar peng-yongsheng
Browse files

Segment standardization

parent f74a5a9c
Loading
Loading
Loading
Loading
+27 −0
Original line number Diff line number Diff line
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.collector.agent.stream.buffer;

/**
 * @author peng-yongsheng
 */
public class BufferFileConfig {
    public static int BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
    public static int BUFFER_SEGMENT_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
+99 −0
Original line number Diff line number Diff line
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.collector.agent.stream.buffer;

/**
 * @author peng-yongsheng
 */
public class Offset {

    private static final String SPLIT_CHARACTER = ",";
    private ReadOffset readOffset;
    private WriteOffset writeOffset;

    public Offset() {
        readOffset = new ReadOffset();
        writeOffset = new WriteOffset();
    }

    public String serialize() {
        return readOffset.getReadFileName() + SPLIT_CHARACTER + String.valueOf(readOffset.getReadFileOffset())
            + SPLIT_CHARACTER + writeOffset.getWriteFileName() + SPLIT_CHARACTER + String.valueOf(writeOffset.getWriteFileOffset());
    }

    public void deserialize(String value) {
        String[] values = value.split(SPLIT_CHARACTER);
        if (values.length == 4) {
            this.readOffset.readFileName = values[0];
            this.readOffset.readFileOffset = Long.parseLong(values[1]);
            this.writeOffset.writeFileName = values[2];
            this.writeOffset.writeFileOffset = Long.parseLong(values[3]);
        }
    }

    public ReadOffset getReadOffset() {
        return readOffset;
    }

    public WriteOffset getWriteOffset() {
        return writeOffset;
    }

    public static class ReadOffset {
        private String readFileName;
        private long readFileOffset = 0;

        public String getReadFileName() {
            return readFileName;
        }

        public long getReadFileOffset() {
            return readFileOffset;
        }

        public void setReadFileName(String readFileName) {
            this.readFileName = readFileName;
        }

        public void setReadFileOffset(long readFileOffset) {
            this.readFileOffset = readFileOffset;
        }
    }

    public static class WriteOffset {
        private String writeFileName;
        private long writeFileOffset = 0;

        public String getWriteFileName() {
            return writeFileName;
        }

        public long getWriteFileOffset() {
            return writeFileOffset;
        }

        public void setWriteFileName(String writeFileName) {
            this.writeFileName = writeFileName;
        }

        public void setWriteFileOffset(long writeFileOffset) {
            this.writeFileOffset = writeFileOffset;
        }
    }
}
+147 −0
Original line number Diff line number Diff line
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.collector.agent.stream.buffer;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.agent.stream.util.FileUtils;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author peng-yongsheng
 */
public enum OffsetManager {
    INSTANCE;

    private final Logger logger = LoggerFactory.getLogger(OffsetManager.class);

    private static final String OFFSET_FILE_PREFIX = "offset";
    private File offsetFile;
    private Offset offset;
    private boolean initialized = false;
    private RandomAccessFile randomAccessFile = null;
    private String lastOffsetRecord = Const.EMPTY_STRING;

    public synchronized void initialize() throws IOException {
        if (!initialized) {
            this.offset = new Offset();
            File dataPath = new File(SegmentBufferConfig.BUFFER_PATH);
            if (dataPath.mkdirs()) {
                createOffsetFile();
            } else {
                File[] offsetFiles = dataPath.listFiles(new PrefixFileNameFilter());
                if (CollectionUtils.isNotEmpty(offsetFiles) && offsetFiles.length > 0) {
                    for (int i = 0; i < offsetFiles.length; i++) {
                        if (i != offsetFiles.length - 1) {
                            offsetFiles[i].delete();
                        } else {
                            offsetFile = offsetFiles[i];
                        }
                    }
                } else {
                    createOffsetFile();
                }
            }
            String offsetRecord = FileUtils.INSTANCE.readLastLine(offsetFile);
            offset.deserialize(offsetRecord);
            initialized = true;

            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> flush(), 10, 3, TimeUnit.SECONDS);
        }
    }

    private void createOffsetFile() throws IOException {
        String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
        String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
        offsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
        this.offset.getWriteOffset().setWriteFileName(Const.EMPTY_STRING);
        this.offset.getWriteOffset().setWriteFileOffset(0);
        this.offset.getReadOffset().setReadFileName(Const.EMPTY_STRING);
        this.offset.getReadOffset().setReadFileOffset(0);
        this.flush();
    }

    public void flush() {
        String offsetRecord = offset.serialize();
        if (!lastOffsetRecord.equals(offsetRecord)) {
            if (offsetFile.length() >= BufferFileConfig.BUFFER_OFFSET_MAX_FILE_SIZE) {
                nextFile();
            }
            FileUtils.INSTANCE.writeAppendToLast(offsetFile, randomAccessFile, offsetRecord);
            lastOffsetRecord = offsetRecord;
        }
    }

    private void nextFile() {
        String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
        String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
        File newOffsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
        offsetFile.delete();
        offsetFile = newOffsetFile;
        this.flush();
    }

    public String getReadFileName() {
        return offset.getReadOffset().getReadFileName();
    }

    public long getReadFileOffset() {
        return offset.getReadOffset().getReadFileOffset();
    }

    public void setReadOffset(long readFileOffset) {
        offset.getReadOffset().setReadFileOffset(readFileOffset);
    }

    public void setReadOffset(String readFileName, long readFileOffset) {
        offset.getReadOffset().setReadFileName(readFileName);
        offset.getReadOffset().setReadFileOffset(readFileOffset);
    }

    public String getWriteFileName() {
        return offset.getWriteOffset().getWriteFileName();
    }

    public long getWriteFileOffset() {
        return offset.getWriteOffset().getWriteFileOffset();
    }

    public void setWriteOffset(String writeFileName, long writeFileOffset) {
        offset.getWriteOffset().setWriteFileName(writeFileName);
        offset.getWriteOffset().setWriteFileOffset(writeFileOffset);
    }

    public void setWriteOffset(long writeFileOffset) {
        offset.getWriteOffset().setWriteFileOffset(writeFileOffset);
    }

    class PrefixFileNameFilter implements FilenameFilter {
        @Override public boolean accept(File dir, String name) {
            return name.startsWith(OFFSET_FILE_PREFIX);
        }
    }
}
+28 −0
Original line number Diff line number Diff line
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.collector.agent.stream.buffer;

import org.skywalking.apm.collector.core.config.SystemConfig;

/**
 * @author peng-yongsheng
 */
public class SegmentBufferConfig {
    public static String BUFFER_PATH = SystemConfig.DATA_PATH + "/buffer/";
}
+102 −0
Original line number Diff line number Diff line
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.collector.agent.stream.buffer;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author peng-yongsheng
 */
public enum SegmentBufferManager {
    INSTANCE;

    private final Logger logger = LoggerFactory.getLogger(SegmentBufferManager.class);

    public static final String DATA_FILE_PREFIX = "data";
    private FileOutputStream outputStream;

    public synchronized void initialize() {
        logger.info("segment buffer initialize");
        try {
            OffsetManager.INSTANCE.initialize();
            if (new File(SegmentBufferConfig.BUFFER_PATH).mkdirs()) {
                newDataFile();
            } else {
                String writeFileName = OffsetManager.INSTANCE.getWriteFileName();
                if (StringUtils.isNotEmpty(writeFileName)) {
                    File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
                    if (dataFile.exists()) {
                        outputStream = new FileOutputStream(new File(SegmentBufferConfig.BUFFER_PATH + writeFileName), true);
                    } else {
                        newDataFile();
                    }
                } else {
                    newDataFile();
                }
            }
            SegmentBufferReader.INSTANCE.initialize();
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    public synchronized void writeBuffer(UpstreamSegment segment) {
        try {
            segment.writeDelimitedTo(outputStream);
            long position = outputStream.getChannel().position();
            if (position > BufferFileConfig.BUFFER_SEGMENT_MAX_FILE_SIZE) {
                newDataFile();
            } else {
                OffsetManager.INSTANCE.setWriteOffset(position);
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    private void newDataFile() throws IOException {
        logger.debug("create new segment buffer file");
        String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
        String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
        File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
        dataFile.createNewFile();
        OffsetManager.INSTANCE.setWriteOffset(writeFileName, 0);
        try {
            if (outputStream != null) {
                outputStream.close();
            }
            outputStream = new FileOutputStream(dataFile);
            outputStream.getChannel().position(0);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    public synchronized void flush() {

    }
}
Loading