Commit 8d234a91 authored by 彭勇升 pengys's avatar 彭勇升 pengys Committed by 吴晟
Browse files

Feature of database session in OAP server. (#3147)

* Feature of database session

* Make it configurable.

* Make the OAP server can't startup.
parent bf504ab7
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@ public class CoreModuleConfig extends ModuleConfig {
    @Setter private int gRPCPort;
    @Setter private int maxConcurrentCallsPerConnection;
    @Setter private int maxMessageSize;
    @Setter private boolean enableDatabaseSession;
    private final List<String> downsampling;
    /**
     * The period of doing data persistence.
+18 −57
Original line number Diff line number Diff line
@@ -20,65 +20,25 @@ package org.apache.skywalking.oap.server.core;

import java.io.IOException;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoader;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.LogQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricQueryService;
import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.oal.rt.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
@@ -96,7 +56,6 @@ public class CoreModuleProvider extends ModuleProvider {
    private final AnnotationScan annotationScan;
    private final StorageModels storageModels;
    private final SourceReceiverImpl receiver;
    private StreamAnnotationListener streamAnnotationListener;
    private OALEngine oalEngine;

    public CoreModuleProvider() {
@@ -120,7 +79,7 @@ public class CoreModuleProvider extends ModuleProvider {
    }

    @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        streamAnnotationListener = new StreamAnnotationListener(getManager());
        StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(getManager());

        AnnotationScan scopeScan = new AnnotationScan();
        scopeScan.registerListener(new DefaultScopeDefine.Listener());
@@ -200,6 +159,8 @@ public class CoreModuleProvider extends ModuleProvider {

        this.remoteClientManager = new RemoteClientManager(getManager());
        this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);

        MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
    }

    @Override public void start() throws ModuleStartException {
+1 −0
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@ public abstract class Metrics extends StreamData implements StorageData {
    public static final String ENTITY_ID = "entity_id";

    @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
    @Getter @Setter private long survivalTime = 0L;

    public abstract String id();

+1 −1
Original line number Diff line number Diff line
@@ -43,7 +43,7 @@ public abstract class PxxMetrics extends Metrics implements IntValueHolder {
    @Getter @Setter @Column(columnName = DETAIL_GROUP) private IntKeyLongValueArray detailGroup;

    private final int percentileRank;
    private Map<Integer, IntKeyLongValue> detailIndex;
    @Getter private Map<Integer, IntKeyLongValue> detailIndex;

    public PxxMetrics(int percentileRank) {
        this.percentileRank = percentileRank;
+61 −20
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@

package org.apache.skywalking.oap.server.core.analysis.worker;

import java.io.IOException;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
@@ -40,16 +41,20 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
    private static final Logger logger = LoggerFactory.getLogger(MetricsPersistentWorker.class);

    private final Model model;
    private final Map<Metrics, Metrics> databaseSession;
    private final MergeDataCache<Metrics> mergeDataCache;
    private final IMetricsDAO metricsDAO;
    private final AbstractWorker<Metrics> nextAlarmWorker;
    private final AbstractWorker<ExportEvent> nextExportWorker;
    private final DataCarrier<Metrics> dataCarrier;
    private final boolean enableDatabaseSession;

    MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
        AbstractWorker<ExportEvent> nextExportWorker) {
        AbstractWorker<ExportEvent> nextExportWorker, boolean enableDatabaseSession) {
        super(moduleDefineHolder);
        this.model = model;
        this.databaseSession = new HashMap<>(100);
        this.enableDatabaseSession = enableDatabaseSession;
        this.mergeDataCache = new MergeDataCache<>();
        this.metricsDAO = metricsDAO;
        this.nextAlarmWorker = nextAlarmWorker;
@@ -83,23 +88,21 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
        return mergeDataCache;
    }

    @Override public void prepareBatch(MergeDataCache<Metrics> cache, List<PrepareRequest> prepareRequests) {
    @Override public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
        long start = System.currentTimeMillis();

        Collection<Metrics> collection = cache.getLast().collection();

        int i = 0;
        int batchGetSize = 2000;
        Metrics[] metrics = null;
        for (Metrics data : collection) {
        for (Metrics data : lastCollection) {
            if (Objects.nonNull(nextExportWorker)) {
                ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
                nextExportWorker.in(event);
            }

            int batchGetSize = 2000;
            int mod = i % batchGetSize;
            if (mod == 0) {
                int residual = collection.size() - i;
                int residual = lastCollection.size() - i;
                if (residual >= batchGetSize) {
                    metrics = new Metrics[batchGetSize];
                } else {
@@ -110,23 +113,18 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat

            if (mod == metrics.length - 1) {
                try {
                    Map<String, Metrics> dbMetricsMap = metricsDAO.get(model, metrics);
                    syncStorageToCache(metrics);

                    for (Metrics metric : metrics) {
                        if (dbMetricsMap.containsKey(metric.id())) {
                            metric.combine(dbMetricsMap.get(metric.id()));
                            metric.calculate();
                            prepareRequests.add(metricsDAO.prepareBatchUpdate(model, metric));
                        Metrics cacheMetric = databaseSession.get(metric);
                        if (cacheMetric != null) {
                            cacheMetric.combine(metric);
                            cacheMetric.calculate();
                            prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cacheMetric));
                            nextWorker(cacheMetric);
                        } else {
                            prepareRequests.add(metricsDAO.prepareBatchInsert(model, metric));
                        }

                        if (Objects.nonNull(nextAlarmWorker)) {
                            nextAlarmWorker.in(metric);
                        }
                        if (Objects.nonNull(nextExportWorker)) {
                            ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
                            nextExportWorker.in(event);
                            nextWorker(metric);
                        }
                    }
                } catch (Throwable t) {
@@ -142,6 +140,16 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
        }
    }

    private void nextWorker(Metrics metric) {
        if (Objects.nonNull(nextAlarmWorker)) {
            nextAlarmWorker.in(metric);
        }
        if (Objects.nonNull(nextExportWorker)) {
            ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
            nextExportWorker.in(event);
        }
    }

    @Override public void cacheData(Metrics input) {
        mergeDataCache.writing();
        if (mergeDataCache.containsKey(input)) {
@@ -156,6 +164,39 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
        mergeDataCache.finishWriting();
    }

    private void syncStorageToCache(Metrics[] metrics) throws IOException {
        if (!enableDatabaseSession) {
            databaseSession.clear();
        }

        List<String> notInCacheIds = new ArrayList<>();
        for (Metrics metric : metrics) {
            if (!databaseSession.containsKey(metric)) {
                notInCacheIds.add(metric.id());
            }
        }

        if (notInCacheIds.size() > 0) {
            List<Metrics> metricsList = metricsDAO.multiGet(model, notInCacheIds);
            for (Metrics metric : metricsList) {
                databaseSession.put(metric, metric);
            }
        }
    }

    @Override public void endOfRound(long tookTime) {
        if (enableDatabaseSession) {
            Iterator<Metrics> iterator = databaseSession.values().iterator();
            while (iterator.hasNext()) {
                Metrics metrics = iterator.next();
                metrics.setSurvivalTime(tookTime + metrics.getSurvivalTime());
                if (metrics.getSurvivalTime() > 70000) {
                    iterator.remove();
                }
            }
        }
    }

    private class PersistentConsumer implements IConsumer<Metrics> {

        private final MetricsPersistentWorker persistent;
Loading