Commit 5967cf4b authored by peng-yongsheng's avatar peng-yongsheng
Browse files

Register graph finish.

parent d8ba62e9
Loading
Loading
Loading
Loading
+15 −32
Original line number Diff line number Diff line
@@ -29,13 +29,10 @@ import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHan
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingListener;
import org.skywalking.apm.collector.agent.stream.AgentStreamSingleton;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
@@ -45,8 +42,6 @@ import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;

/**
@@ -74,7 +69,6 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
        String host = config.getProperty(HOST);
        Integer port = (Integer)config.get(PORT);

        try {
        ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
        moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));

@@ -85,20 +79,11 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
        NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
        namingHandlerRegisterService.register(new AgentGRPCNamingHandler(namingListener));

            CacheServiceManager cacheServiceManager = new CacheServiceManager();
            cacheServiceManager.init(getManager());

            DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);

        GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
        Server gRPCServer = managerService.createIfAbsent(host, port);

            AgentStreamSingleton agentStreamSingleton = AgentStreamSingleton.getInstance(getManager(), cacheServiceManager, new WorkerCreateListener());

            addHandlers(daoService, gRPCServer, cacheServiceManager, agentStreamSingleton);
        } catch (ModuleNotFoundException e) {
            throw new ServiceNotProvidedException(e.getMessage());
        }
        AgentStreamSingleton.getInstance(getManager(), new WorkerCreateListener());
        addHandlers(gRPCServer);
    }

    @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@@ -109,12 +94,10 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
        return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME};
    }

    private void addHandlers(DAOService daoService, Server gRPCServer, CacheServiceManager cacheServiceManager,
        AgentStreamSingleton agentStreamSingleton) {
        Graph<Application> applicationRegisterGraph = agentStreamSingleton.getApplicationRegisterGraph();
        gRPCServer.addHandler(new ApplicationRegisterServiceHandler(cacheServiceManager, applicationRegisterGraph));
        gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(daoService, cacheServiceManager));
        gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(cacheServiceManager));
    private void addHandlers(Server gRPCServer) {
        gRPCServer.addHandler(new ApplicationRegisterServiceHandler(getManager()));
        gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
        gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
        gRPCServer.addHandler(new JVMMetricsServiceHandler());
        gRPCServer.addHandler(new TraceSegmentServiceHandler());
    }
+11 −6
Original line number Diff line number Diff line
@@ -21,8 +21,9 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
@@ -40,9 +41,8 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic

    private final ApplicationIDService applicationIDService;

    public ApplicationRegisterServiceHandler(CacheServiceManager cacheServiceManager,
        Graph<org.skywalking.apm.collector.storage.table.register.Application> applicationRegisterGraph) {
        applicationIDService = new ApplicationIDService(cacheServiceManager, applicationRegisterGraph);
    public ApplicationRegisterServiceHandler(ModuleManager moduleManager) {
        applicationIDService = new ApplicationIDService(moduleManager);
    }

    @Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
@@ -52,7 +52,12 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
        ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
        for (int i = 0; i < applicationCodes.size(); i++) {
            String applicationCode = applicationCodes.get(i);
            int applicationId = applicationIDService.getOrCreate(applicationCode);
            int applicationId = 0;
            try {
                applicationId = applicationIDService.getOrCreate(applicationCode);
            } catch (ModuleNotFoundException | ServiceNotProvidedException e) {
                logger.error(e.getMessage(), e);
            }

            if (applicationId != 0) {
                KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
+18 −8
Original line number Diff line number Diff line
@@ -22,10 +22,11 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
@@ -44,14 +45,19 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp

    private final InstanceIDService instanceIDService;

    public InstanceDiscoveryServiceHandler(DAOService daoService, CacheServiceManager cacheServiceManager) {
        this.instanceIDService = new InstanceIDService(daoService, cacheServiceManager);
    public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
        this.instanceIDService = new InstanceIDService(moduleManager);
    }

    @Override
    public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
        long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
        int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
        int instanceId = 0;
        try {
            instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
        } catch (ModuleNotFoundException | ServiceNotProvidedException e) {
            logger.error(e.getMessage(), e);
        }
        ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
        builder.setApplicationId(request.getApplicationId());
        builder.setApplicationInstanceId(instanceId);
@@ -62,7 +68,11 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
    @Override
    public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
        long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
        try {
            instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
        } catch (ModuleNotFoundException | ServiceNotProvidedException e) {
            logger.error(e.getMessage(), e);
        }
        responseObserver.onNext(Downstream.newBuilder().build());
        responseObserver.onCompleted();
    }
@@ -74,9 +84,9 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
        osInfoJson.addProperty("processId", osinfo.getProcessNo());

        JsonArray ipv4Array = new JsonArray();
        osinfo.getIpv4SList().forEach(ipv4 -> {
        for (String ipv4 : osinfo.getIpv4SList()) {
            ipv4Array.add(ipv4);
        });
        }
        osInfoJson.add("ipv4s", ipv4Array);
        return osInfoJson.toString();
    }
+3 −3
Original line number Diff line number Diff line
@@ -21,7 +21,7 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
@@ -40,8 +40,8 @@ public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServ

    private final ServiceNameService serviceNameService;

    public ServiceNameDiscoveryServiceHandler(CacheServiceManager cacheServiceManager) {
        this.serviceNameService = new ServiceNameService(cacheServiceManager);
    public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
        this.serviceNameService = new ServiceNameService(moduleManager);
    }

    @Override public void discovery(ServiceNameCollection request,
+11 −16
Original line number Diff line number Diff line
@@ -28,7 +28,6 @@ import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.jetty.manager.JettyManagerModule;
@@ -66,7 +65,6 @@ public class AgentModuleJettyProvider extends ModuleProvider {
        Integer port = (Integer)config.get(PORT);
        String contextPath = config.getProperty(CONTEXT_PATH);

        try {
        ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
        moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));

@@ -82,9 +80,6 @@ public class AgentModuleJettyProvider extends ModuleProvider {
        JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
        Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
        addHandlers(daoService, jettyServer);
        } catch (ModuleNotFoundException e) {
            throw new ServiceNotProvidedException(e.getMessage());
        }
    }

    @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
Loading