Commit b671d634 authored by pengys5's avatar pengys5
Browse files

Update instance heart beat time that receive from jam metric service.

#358
parent 2169b112
Loading
Loading
Loading
Loading
+18 −1
Original line number Diff line number Diff line
@@ -6,17 +6,19 @@ import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWork
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
@@ -42,6 +44,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
        StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
        request.getMetricsList().forEach(metric -> {
            long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
            senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, time);
            sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu());
            sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList());
            sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList());
@@ -52,6 +55,20 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
        responseObserver.onCompleted();
    }

    private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
        long heartBeatTime) {
        InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat();
        heartBeat.setId(String.valueOf(applicationInstanceId));
        heartBeat.setHeartbeatTime(heartBeatTime);
        heartBeat.setApplicationInstanceId(applicationInstanceId);
        try {
            logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId());
            context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData());
        } catch (WorkerInvokeException | WorkerNotFoundException e) {
            logger.error(e.getMessage(), e);
        }
    }

    private void sendToCpuMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
        long timeBucket, CPU cpu) {
        CpuMetricDataDefine.CpuMetric cpuMetric = new CpuMetricDataDefine.CpuMetric();
+71 −0
Original line number Diff line number Diff line
package org.skywalking.apm.collector.agentjvm.worker.heartbeat;

import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;

/**
 * @author pengys5
 */
public class InstHeartBeatPersistenceWorker extends PersistenceWorker {

    public InstHeartBeatPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
        super(role, clusterContext);
    }

    @Override public void preStart() throws ProviderNotFoundException {
        super.preStart();
    }

    @Override protected boolean needMergeDBData() {
        return false;
    }

    @Override protected IPersistenceDAO persistenceDAO() {
        return (IPersistenceDAO)DAOContainer.INSTANCE.get(InstanceHeartBeatEsDAO.class.getName());
    }

    public static class Factory extends AbstractLocalAsyncWorkerProvider<InstHeartBeatPersistenceWorker> {
        @Override
        public Role role() {
            return WorkerRole.INSTANCE;
        }

        @Override
        public InstHeartBeatPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
            return new InstHeartBeatPersistenceWorker(role(), clusterContext);
        }

        @Override
        public int queueSize() {
            return 1024;
        }
    }

    public enum WorkerRole implements Role {
        INSTANCE;

        @Override
        public String roleName() {
            return InstHeartBeatPersistenceWorker.class.getSimpleName();
        }

        @Override
        public WorkerSelector workerSelector() {
            return new RollingSelector();
        }

        @Override public DataDefine dataDefine() {
            return new InstanceHeartBeatDataDefine();
        }
    }
}
+7 −0
Original line number Diff line number Diff line
package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao;

/**
 * @author pengys5
 */
public interface IInstanceHeartBeatDAO {
}
 No newline at end of file
+31 −0
Original line number Diff line number Diff line
package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao;

import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;

/**
 * @author pengys5
 */
public class InstanceHeartBeatEsDAO extends EsDAO implements IInstanceHeartBeatDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {

    @Override public Data get(String id, DataDefine dataDefine) {
        return null;
    }

    @Override public IndexRequestBuilder prepareBatchInsert(Data data) {
        return null;
    }

    @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
        Map<String, Object> source = new HashMap<>();
        source.put(InstanceTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
        return getClient().prepareUpdate(InstanceTable.TABLE, data.getDataString(0)).setDoc(source);
    }
}
+9 −0
Original line number Diff line number Diff line
package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao;

import org.skywalking.apm.collector.storage.h2.dao.H2DAO;

/**
 * @author pengys5
 */
public class InstanceHeartBeatH2DAO extends H2DAO implements IInstanceHeartBeatDAO {
}
Loading