Commit 883f7207 authored by 吴晟's avatar 吴晟
Browse files

Finish the most codes of collector service metric collector. Sample like this, @peng-yongsheng

```
17:45:48.486 [pool-2-thread-1] ERROR org.skywalking.apm.collector.core.module.instrument.MetricCollector -
##################################################################################################################
#                                             Collector Service Report                                                                                                                                            #
##################################################################################################################
BaseB:
 P-B:
BaseA:
 P-A:
  public void org.skywalking.apm.collector.core.module.ModuleABusiness1Impl.print():
   Avg=1585 (nano), Success Rate=100%
```
parent 7bbe0693
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -96,7 +96,7 @@ public abstract class ModuleProvider {
        Service service) throws ServiceNotProvidedException {
        if (serviceType.isInstance(service)) {
            if (manager.isServiceInstrument()) {
                service = ServiceInstrumentation.INSTANCE.buildServiceUnderMonitor(service);
                service = ServiceInstrumentation.INSTANCE.buildServiceUnderMonitor(module.name(), name(), service);
            }
            this.services.put(serviceType, service);
        } else {
+144 −1
Original line number Diff line number Diff line
@@ -18,11 +18,154 @@

package org.skywalking.apm.collector.core.module.instrument;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The <code>MetricCollector</code> collects the service metrics by Module/Provider/Service structure.
 */
public enum MetricCollector {
public enum MetricCollector implements Runnable {
    INSTANCE;

    private final Logger logger = LoggerFactory.getLogger(MetricCollector.class);
    private HashMap<String, ModuleMetric> modules = new HashMap<>();

    MetricCollector() {
        ScheduledExecutorService service = Executors
            .newSingleThreadScheduledExecutor();
        service.scheduleAtFixedRate(this, 10, 60, TimeUnit.SECONDS);
    }

    @Override
    public void run() {
        if (!logger.isDebugEnabled()) {
            return;
        }
        StringBuilder report = new StringBuilder();
        report.append("\n");
        report.append("##################################################################################################################\n");
        report.append("#                                             Collector Service Report                                           #\n");
        report.append("##################################################################################################################\n");
        modules.forEach((moduleName, moduleMetric) -> {
            report.append(moduleName).append(":\n");
            moduleMetric.providers.forEach((providerName, providerMetric) -> {
                report.append("\t").append(providerName).append(":\n");
                providerMetric.services.forEach((serviceName, serviceMetric) -> {
                    serviceMetric.methodMetrics.forEach((method, metric) -> {
                        report.append("\t\t").append(method).append(":\n");
                        report.append("\t\t\t").append(metric).append("\n");
                        serviceMetric.methodMetrics.put(method, new ServiceMethodMetric());
                    });
                });
            });
        });

        logger.debug(report.toString());

    }

    ServiceMetric registerService(String module, String provider, String service) {
        return initIfAbsent(module).initIfAbsent(provider).initIfAbsent(service);
    }

    private ModuleMetric initIfAbsent(String moduleName) {
        if (!modules.containsKey(moduleName)) {
            ModuleMetric metric = new ModuleMetric(moduleName);
            modules.put(moduleName, metric);
            return metric;
        }
        return modules.get(moduleName);
    }

    private class ModuleMetric {
        private String moduleName;
        private HashMap<String, ProviderMetric> providers = new HashMap<>();

        public ModuleMetric(String moduleName) {
            this.moduleName = moduleName;
        }

        private ProviderMetric initIfAbsent(String providerName) {
            if (!providers.containsKey(providerName)) {
                ProviderMetric metric = new ProviderMetric(providerName);
                providers.put(providerName, metric);
                return metric;
            }
            return providers.get(providerName);
        }
    }

    private class ProviderMetric {
        private String providerName;
        private HashMap<String, ServiceMetric> services = new HashMap<>();

        public ProviderMetric(String providerName) {
            this.providerName = providerName;
        }

        private ServiceMetric initIfAbsent(String serviceName) {
            if (!services.containsKey(serviceName)) {
                ServiceMetric metric = new ServiceMetric(serviceName);
                services.put(serviceName, metric);
                return metric;
            }
            return services.get(serviceName);
        }
    }

    class ServiceMetric {
        private String serviceName;
        private ConcurrentHashMap<Method, ServiceMethodMetric> methodMetrics = new ConcurrentHashMap<>();

        public ServiceMetric(String serviceName) {
            this.serviceName = serviceName;
        }

        void trace(Method method, long nano, boolean occurException) {
            if (logger.isDebugEnabled()) {
                ServiceMethodMetric metric = methodMetrics.get(method);
                if (metric == null) {
                    ServiceMethodMetric methodMetric = new ServiceMethodMetric();
                    methodMetrics.putIfAbsent(method, methodMetric);
                    metric = methodMetrics.get(method);
                }
                metric.add(nano, occurException);
            }
        }
    }

    private class ServiceMethodMetric {
        private AtomicLong totalTimeNano;
        private AtomicLong counter;
        private AtomicLong errorCounter;

        public ServiceMethodMetric() {
            totalTimeNano = new AtomicLong(0);
            counter = new AtomicLong(0);
            errorCounter = new AtomicLong(0);
        }

        private void add(long nano, boolean occurException) {
            totalTimeNano.addAndGet(nano);
            counter.incrementAndGet();
            if (occurException)
                errorCounter.incrementAndGet();
        }

        @Override public String toString() {
            if (counter.longValue() == 0) {
                return "Avg=N/A";
            }
            return "Avg=" + (totalTimeNano.longValue() / counter.longValue()) + " (nano)" +
                ", Success Rate=" + (counter.longValue() - errorCounter.longValue()) * 100 / counter.longValue() +
                "%";
        }
    }
}
+2 −2
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ public enum ServiceInstrumentation {
    private final Logger logger = LoggerFactory.getLogger(ServiceInstrumentation.class);
    private ElementMatcher<? super MethodDescription> excludeObjectMethodsMatcher;

    public Service buildServiceUnderMonitor(Service implementation) {
    public Service buildServiceUnderMonitor(String moduleName, String providerName, Service implementation) {
        if (implementation instanceof TracedService) {
            // Duplicate service instrument, ignore.
            return implementation;
@@ -51,7 +51,7 @@ public enum ServiceInstrumentation {
            return new ByteBuddy().subclass(implementation.getClass())
                .implement(TracedService.class)
                .method(getDefaultMatcher()).intercept(
                    MethodDelegation.withDefaultConfiguration().to(new ServiceMetricCollector())
                    MethodDelegation.withDefaultConfiguration().to(new ServiceMetricTracing(moduleName, providerName, implementation.getClass().getName()))
                ).make().load(getClass().getClassLoader()
                ).getLoaded().newInstance();
        } catch (InstantiationException e) {
+18 −2
Original line number Diff line number Diff line
@@ -29,7 +29,12 @@ import net.bytebuddy.implementation.bind.annotation.This;
/**
 * @author wu-sheng
 */
public class ServiceMetricCollector {
public class ServiceMetricTracing {
    private MetricCollector.ServiceMetric serviceMetric;

    public ServiceMetricTracing(String module, String provider, String service) {
        serviceMetric = MetricCollector.INSTANCE.registerService(module, provider, service);
    }

    @RuntimeType
    public Object intercept(@This Object obj,
@@ -37,6 +42,17 @@ public class ServiceMetricCollector {
        @SuperCall Callable<?> zuper,
        @Origin Method method
    ) throws Throwable {
        boolean occurError = false;
        long startNano = System.nanoTime();
        long endNano;
        try {
            return zuper.call();
        } catch (Throwable t) {
            occurError = true;
            throw t;
        } finally {
            endNano = System.nanoTime();
            serviceMetric.trace(method, endNano - startNano, occurError);
        }
    }
}
+18 −0
Original line number Diff line number Diff line
@@ -54,5 +54,23 @@ public class ModuleManagerTest {
        BaseModuleA.ServiceABusiness1 serviceABusiness1 = manager.find("BaseA").getService(BaseModuleA.ServiceABusiness1.class);

        Assert.assertTrue(serviceABusiness1 instanceof TracedService);

//        for (int i = 0; i < 10000; i++)
//            serviceABusiness1.print();
//
//        try {
//            Thread.sleep(60 * 1000L);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//
//        for (int i = 0; i < 10000; i++)
//            serviceABusiness1.print();
//
//        try {
//            Thread.sleep(120 * 1000L);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
    }
}