Commit 8feaf766 authored by peng-yongsheng's avatar peng-yongsheng
Browse files

Trace stream aggregation test success.

parent c10b7b18
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -99,6 +99,6 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
        gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
        gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
        gRPCServer.addHandler(new JVMMetricsServiceHandler());
        gRPCServer.addHandler(new TraceSegmentServiceHandler());
        gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager()));
    }
}
+10 −2
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@
package org.skywalking.apm.collector.agent.grpc.handler;

import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
@@ -33,12 +35,18 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg

    private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);

    private final ModuleManager moduleManager;

    public TraceSegmentServiceHandler(ModuleManager moduleManager) {
        this.moduleManager = moduleManager;
    }

    @Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
        return new StreamObserver<UpstreamSegment>() {
            @Override public void onNext(UpstreamSegment segment) {
                logger.debug("receive segment");
//                SegmentParse segmentParse = new SegmentParse();
//                segmentParse.parse(segment, SegmentParse.Source.Agent);
                SegmentParse segmentParse = new SegmentParse(moduleManager);
                segmentParse.parse(segment, SegmentParse.Source.Agent);
            }

            @Override public void onError(Throwable throwable) {
+1 −0
Original line number Diff line number Diff line
@@ -56,6 +56,7 @@ public class AgentStreamSingleton {

        PersistenceTimer timer = new PersistenceTimer();
        timer.start(moduleManager, workerCreateListener.getPersistenceWorkers());

    }

    private void createJVMGraph() {
+5 −10
Original line number Diff line number Diff line
@@ -55,47 +55,42 @@ public class JvmMetricStreamGraph {
    }

    @SuppressWarnings("unchecked")
    public Graph<GCMetric> createGcMetricGraph() {
    public void createGcMetricGraph() {
        QueueCreatorService<GCMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

        Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class);
        graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
        return graph;
    }

    @SuppressWarnings("unchecked")
    public Graph<CpuMetric> createCpuMetricGraph() {
    public void createCpuMetricGraph() {
        QueueCreatorService<CpuMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

        Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class);
        graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
        return graph;
    }

    @SuppressWarnings("unchecked")
    public Graph<MemoryMetric> createMemoryMetricGraph() {
    public void createMemoryMetricGraph() {
        QueueCreatorService<MemoryMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

        Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
        graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
        return graph;
    }

    @SuppressWarnings("unchecked")
    public Graph<MemoryPoolMetric> createMemoryPoolMetricGraph() {
    public void createMemoryPoolMetricGraph() {
        QueueCreatorService<MemoryPoolMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

        Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
        graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
        return graph;
    }

    @SuppressWarnings("unchecked")
    public Graph<Instance> createHeartBeatGraph() {
    public void createHeartBeatGraph() {
        QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

        Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class);
        graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
        return graph;
    }
}
+3 −6
Original line number Diff line number Diff line
@@ -54,7 +54,7 @@ public class RegisterStreamGraph {
    }

    @SuppressWarnings("unchecked")
    public Graph<Application> createApplicationRegisterGraph() {
    public void createApplicationRegisterGraph() {
        RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

        QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
@@ -62,11 +62,10 @@ public class RegisterStreamGraph {
        Graph<Application> graph = GraphManager.INSTANCE.createIfAbsent(APPLICATION_REGISTER_GRAPH_ID, Application.class);
        graph.addNode(new ApplicationRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, APPLICATION_REGISTER_GRAPH_ID).create(workerCreateListener))
            .addNext(new ApplicationRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
        return graph;
    }

    @SuppressWarnings("unchecked")
    public Graph<Instance> createInstanceRegisterGraph() {
    public void createInstanceRegisterGraph() {
        RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

        QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
@@ -74,11 +73,10 @@ public class RegisterStreamGraph {
        Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INSTANCE_REGISTER_GRAPH_ID, Instance.class);
        graph.addNode(new InstanceRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, INSTANCE_REGISTER_GRAPH_ID).create(workerCreateListener))
            .addNext(new InstanceRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
        return graph;
    }

    @SuppressWarnings("unchecked")
    public Graph<ServiceName> createServiceNameRegisterGraph() {
    public void createServiceNameRegisterGraph() {
        RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

        QueueCreatorService<ServiceName> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
@@ -86,6 +84,5 @@ public class RegisterStreamGraph {
        Graph<ServiceName> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class);
        graph.addNode(new ServiceNameRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_NAME_REGISTER_GRAPH_ID).create(workerCreateListener))
            .addNext(new ServiceNameRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
        return graph;
    }
}
Loading