Unverified Commit 8605f4cc authored by 彭勇升 pengys's avatar 彭勇升 pengys Committed by GitHub
Browse files

Change the metrics process flow. (#3157)

* Feature of database session

* Make it configurable.

* Change the metrics process flow.
before: metrics entrance -> aggregate worker -> remote worker -> trans worker -> minute, hour, day, month persistence worker -> storage
after: metrics entrance -> aggregate worker -> remote worker -> minute persistence worker ->  trans worker -> hour, day, month persistence worker -> storage
parent 924f25fe
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -47,10 +47,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
    private final AbstractWorker<Metrics> nextAlarmWorker;
    private final AbstractWorker<ExportEvent> nextExportWorker;
    private final DataCarrier<Metrics> dataCarrier;
    private final MetricsTransWorker transWorker;
    private final boolean enableDatabaseSession;

    MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
        AbstractWorker<ExportEvent> nextExportWorker, boolean enableDatabaseSession) {
        AbstractWorker<ExportEvent> nextExportWorker, MetricsTransWorker transWorker, boolean enableDatabaseSession) {
        super(moduleDefineHolder);
        this.model = model;
        this.databaseSession = new HashMap<>(100);
@@ -59,6 +60,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
        this.metricsDAO = metricsDAO;
        this.nextAlarmWorker = nextAlarmWorker;
        this.nextExportWorker = nextExportWorker;
        this.transWorker = transWorker;

        String name = "METRICS_L2_AGGREGATION";
        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
@@ -99,6 +101,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
                ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
                nextExportWorker.in(event);
            }
            if (Objects.nonNull(transWorker)) {
                transWorker.in(data);
            }

            int mod = i % batchGetSize;
            if (mod == 0) {
+7 −7
Original line number Diff line number Diff line
@@ -86,14 +86,14 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
            monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
        }

        Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
        MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
        MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);

        MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
        Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
        MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model, transWorker);

        String remoteReceiverWorkerName = stream.name() + "_rec";
        IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceSetter.class);
        workerInstanceSetter.put(remoteReceiverWorkerName, transWorker, metricsClass);
        workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);

        MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
        MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
@@ -101,18 +101,18 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
        entryWorkers.put(metricsClass, aggregateWorker);
    }

    private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
    private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model, MetricsTransWorker transWorker) {
        AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
        ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);

        MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, enableDatabaseSession);
        MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession);
        persistentWorkers.add(minutePersistentWorker);

        return minutePersistentWorker;
    }

    private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
        MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, enableDatabaseSession);
        MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null, null, enableDatabaseSession);
        persistentWorkers.add(persistentWorker);

        return persistentWorker;
+2 −20
Original line number Diff line number Diff line
@@ -23,11 +23,8 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;

/**
 * @author peng-yongsheng
@@ -36,30 +33,24 @@ public class MetricsTransWorker extends AbstractWorker<Metrics> {

    private static final Logger logger = LoggerFactory.getLogger(MetricsTransWorker.class);

    private final MetricsPersistentWorker minutePersistenceWorker;
    private final MetricsPersistentWorker hourPersistenceWorker;
    private final MetricsPersistentWorker dayPersistenceWorker;
    private final MetricsPersistentWorker monthPersistenceWorker;

    private final CounterMetrics aggregationMinCounter;
    private final CounterMetrics aggregationHourCounter;
    private final CounterMetrics aggregationDayCounter;
    private final CounterMetrics aggregationMonthCounter;

    public MetricsTransWorker(ModuleDefineHolder moduleDefineHolder, String modelName,
        MetricsPersistentWorker minutePersistenceWorker,
        MetricsPersistentWorker hourPersistenceWorker,
        MetricsPersistentWorker dayPersistenceWorker,
        MetricsPersistentWorker monthPersistenceWorker) {
        super(moduleDefineHolder);
        this.minutePersistenceWorker = minutePersistenceWorker;
        this.hourPersistenceWorker = hourPersistenceWorker;
        this.dayPersistenceWorker = dayPersistenceWorker;
        this.monthPersistenceWorker = monthPersistenceWorker;

        MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
        aggregationMinCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation",
            new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "2", "min"));
        aggregationHourCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation",
            new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "2", "hour"));
        aggregationDayCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation",
@@ -81,14 +72,5 @@ public class MetricsTransWorker extends AbstractWorker<Metrics> {
            aggregationHourCounter.inc();
            monthPersistenceWorker.in(metrics.toMonth());
        }

        /*
         * Minute persistent must be at the end of all time dimensionalities
         * Because #toHour, #toDay, #toMonth include clone inside, which could avoid concurrency situation.
         */
        if (Objects.nonNull(minutePersistenceWorker)) {
            aggregationMinCounter.inc();
            minutePersistenceWorker.in(metrics);
        }
    }
}