Loading apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/handler/JVMMetricsServiceHandler.java +3 −3 Original line number Diff line number Diff line Loading @@ -44,7 +44,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); request.getMetricsList().forEach(metric -> { long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime()); senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, time); senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, metric.getTime()); sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu()); sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList()); sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList()); Loading @@ -59,8 +59,8 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe long heartBeatTime) { InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat(); heartBeat.setId(String.valueOf(applicationInstanceId)); heartBeat.setHeartbeatTime(heartBeatTime); heartBeat.setApplicationInstanceId(applicationInstanceId); heartBeat.setHeartBeatTime(heartBeatTime); heartBeat.setInstanceId(applicationInstanceId); try { logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId()); context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData()); Loading apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/InstHeartBeatPersistenceWorker.java +3 −3 Original line number Diff line number Diff line package org.skywalking.apm.collector.agentjvm.worker.heartbeat; import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO; import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.IInstanceHeartBeatDAO; import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine; import org.skywalking.apm.collector.storage.dao.DAOContainer; import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; Loading @@ -27,11 +27,11 @@ public class InstHeartBeatPersistenceWorker extends PersistenceWorker { } @Override protected boolean needMergeDBData() { return false; return true; } @Override protected IPersistenceDAO persistenceDAO() { return (IPersistenceDAO)DAOContainer.INSTANCE.get(InstanceHeartBeatEsDAO.class.getName()); return (IPersistenceDAO)DAOContainer.INSTANCE.get(IInstanceHeartBeatDAO.class.getName()); } public static class Factory extends AbstractLocalAsyncWorkerProvider<InstHeartBeatPersistenceWorker> { Loading apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatEsDAO.java +20 −3 Original line number Diff line number Diff line Loading @@ -2,30 +2,47 @@ package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.skywalking.apm.collector.core.framework.UnexpectedException; import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; import org.skywalking.apm.collector.storage.table.register.InstanceTable; import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author pengys5 */ public class InstanceHeartBeatEsDAO extends EsDAO implements IInstanceHeartBeatDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> { private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatEsDAO.class); @Override public Data get(String id, DataDefine dataDefine) { GetResponse getResponse = getClient().prepareGet(InstanceTable.TABLE, id).get(); if (getResponse.isExists()) { Data data = dataDefine.build(id); Map<String, Object> source = getResponse.getSource(); data.setDataInteger(0, (Integer)source.get(InstanceTable.COLUMN_INSTANCE_ID)); data.setDataLong(0, (Long)source.get(InstanceTable.COLUMN_HEARTBEAT_TIME)); logger.debug("id: {} is exists", id); return data; } else { logger.debug("id: {} is not exists", id); return null; } } @Override public IndexRequestBuilder prepareBatchInsert(Data data) { return null; throw new UnexpectedException("There is no need to merge stream data with database data."); } @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) { Map<String, Object> source = new HashMap<>(); source.put(InstanceTable.COLUMN_REGISTER_TIME, data.getDataLong(0)); source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getDataLong(0)); return getClient().prepareUpdate(InstanceTable.TABLE, data.getDataString(0)).setDoc(source); } } apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/define/InstanceHeartBeatDataDefine.java +32 −25 Original line number Diff line number Diff line package org.skywalking.apm.collector.agentjvm.worker.heartbeat.define; import org.skywalking.apm.collector.core.framework.UnexpectedException; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.storage.table.register.InstanceTable; import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; Loading @@ -22,27 +21,35 @@ public class InstanceHeartBeatDataDefine extends DataDefine { @Override protected void attributeDefine() { addAttribute(0, new Attribute(InstanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation())); addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new NonOperation())); addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation())); addAttribute(2, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation())); } @Override public Object deserialize(RemoteData remoteData) { throw new UnexpectedException("instance heart beat data did not need send to remote worker."); String id = remoteData.getDataStrings(0); int instanceId = remoteData.getDataIntegers(0); long heartBeatTime = remoteData.getDataLongs(0); return new InstanceHeartBeat(id, heartBeatTime, instanceId); } @Override public RemoteData serialize(Object object) { throw new UnexpectedException("instance heart beat data did not need send to remote worker."); InstanceHeartBeat instanceHeartBeat = (InstanceHeartBeat)object; RemoteData.Builder builder = RemoteData.newBuilder(); builder.addDataStrings(instanceHeartBeat.getId()); builder.addDataIntegers(instanceHeartBeat.getInstanceId()); builder.addDataLongs(instanceHeartBeat.getHeartBeatTime()); return builder.build(); } public static class InstanceHeartBeat implements Transform<InstanceHeartBeat> { private String id; private int applicationInstanceId; private long heartbeatTime; private long heartBeatTime; private int instanceId; public InstanceHeartBeat(String id, int applicationInstanceId, long heartbeatTime) { public InstanceHeartBeat(String id, long heartBeatTime, int instanceId) { this.id = id; this.applicationInstanceId = applicationInstanceId; this.heartbeatTime = heartbeatTime; this.heartBeatTime = heartBeatTime; this.instanceId = instanceId; } public InstanceHeartBeat() { Loading @@ -52,40 +59,40 @@ public class InstanceHeartBeatDataDefine extends DataDefine { InstanceHeartBeatDataDefine define = new InstanceHeartBeatDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); data.setDataInteger(0, this.applicationInstanceId); data.setDataLong(0, this.heartbeatTime); data.setDataInteger(0, this.instanceId); data.setDataLong(0, this.heartBeatTime); return data; } @Override public InstanceHeartBeat toSelf(Data data) { this.id = data.getDataString(0); this.applicationInstanceId = data.getDataInteger(0); this.heartbeatTime = data.getDataLong(0); this.instanceId = data.getDataInteger(0); this.heartBeatTime = data.getDataLong(0); return this; } public void setId(String id) { this.id = id; public String getId() { return id; } public void setApplicationInstanceId(int applicationInstanceId) { this.applicationInstanceId = applicationInstanceId; public void setId(String id) { this.id = id; } public String getId() { return id; public long getHeartBeatTime() { return heartBeatTime; } public int getApplicationInstanceId() { return applicationInstanceId; public void setHeartBeatTime(long heartBeatTime) { this.heartBeatTime = heartBeatTime; } public long getHeartbeatTime() { return heartbeatTime; public int getInstanceId() { return instanceId; } public void setHeartbeatTime(long heartbeatTime) { this.heartbeatTime = heartbeatTime; public void setInstanceId(int instanceId) { this.instanceId = instanceId; } } } apm-collector/apm-collector-agentjvm/src/main/resources/META-INF/defines/es_dao.define +2 −1 Original line number Diff line number Diff line Loading @@ -2,3 +2,4 @@ org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricEsDAO org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricEsDAO org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricEsDAO org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricEsDAO org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO No newline at end of file Loading
apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/handler/JVMMetricsServiceHandler.java +3 −3 Original line number Diff line number Diff line Loading @@ -44,7 +44,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); request.getMetricsList().forEach(metric -> { long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime()); senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, time); senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, metric.getTime()); sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu()); sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList()); sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList()); Loading @@ -59,8 +59,8 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe long heartBeatTime) { InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat(); heartBeat.setId(String.valueOf(applicationInstanceId)); heartBeat.setHeartbeatTime(heartBeatTime); heartBeat.setApplicationInstanceId(applicationInstanceId); heartBeat.setHeartBeatTime(heartBeatTime); heartBeat.setInstanceId(applicationInstanceId); try { logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId()); context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData()); Loading
apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/InstHeartBeatPersistenceWorker.java +3 −3 Original line number Diff line number Diff line package org.skywalking.apm.collector.agentjvm.worker.heartbeat; import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO; import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.IInstanceHeartBeatDAO; import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine; import org.skywalking.apm.collector.storage.dao.DAOContainer; import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; Loading @@ -27,11 +27,11 @@ public class InstHeartBeatPersistenceWorker extends PersistenceWorker { } @Override protected boolean needMergeDBData() { return false; return true; } @Override protected IPersistenceDAO persistenceDAO() { return (IPersistenceDAO)DAOContainer.INSTANCE.get(InstanceHeartBeatEsDAO.class.getName()); return (IPersistenceDAO)DAOContainer.INSTANCE.get(IInstanceHeartBeatDAO.class.getName()); } public static class Factory extends AbstractLocalAsyncWorkerProvider<InstHeartBeatPersistenceWorker> { Loading
apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatEsDAO.java +20 −3 Original line number Diff line number Diff line Loading @@ -2,30 +2,47 @@ package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.skywalking.apm.collector.core.framework.UnexpectedException; import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; import org.skywalking.apm.collector.storage.table.register.InstanceTable; import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO; import org.skywalking.apm.collector.stream.worker.impl.data.Data; import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author pengys5 */ public class InstanceHeartBeatEsDAO extends EsDAO implements IInstanceHeartBeatDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> { private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatEsDAO.class); @Override public Data get(String id, DataDefine dataDefine) { GetResponse getResponse = getClient().prepareGet(InstanceTable.TABLE, id).get(); if (getResponse.isExists()) { Data data = dataDefine.build(id); Map<String, Object> source = getResponse.getSource(); data.setDataInteger(0, (Integer)source.get(InstanceTable.COLUMN_INSTANCE_ID)); data.setDataLong(0, (Long)source.get(InstanceTable.COLUMN_HEARTBEAT_TIME)); logger.debug("id: {} is exists", id); return data; } else { logger.debug("id: {} is not exists", id); return null; } } @Override public IndexRequestBuilder prepareBatchInsert(Data data) { return null; throw new UnexpectedException("There is no need to merge stream data with database data."); } @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) { Map<String, Object> source = new HashMap<>(); source.put(InstanceTable.COLUMN_REGISTER_TIME, data.getDataLong(0)); source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getDataLong(0)); return getClient().prepareUpdate(InstanceTable.TABLE, data.getDataString(0)).setDoc(source); } }
apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/define/InstanceHeartBeatDataDefine.java +32 −25 Original line number Diff line number Diff line package org.skywalking.apm.collector.agentjvm.worker.heartbeat.define; import org.skywalking.apm.collector.core.framework.UnexpectedException; import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; import org.skywalking.apm.collector.storage.table.register.InstanceTable; import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; Loading @@ -22,27 +21,35 @@ public class InstanceHeartBeatDataDefine extends DataDefine { @Override protected void attributeDefine() { addAttribute(0, new Attribute(InstanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation())); addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new NonOperation())); addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation())); addAttribute(2, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation())); } @Override public Object deserialize(RemoteData remoteData) { throw new UnexpectedException("instance heart beat data did not need send to remote worker."); String id = remoteData.getDataStrings(0); int instanceId = remoteData.getDataIntegers(0); long heartBeatTime = remoteData.getDataLongs(0); return new InstanceHeartBeat(id, heartBeatTime, instanceId); } @Override public RemoteData serialize(Object object) { throw new UnexpectedException("instance heart beat data did not need send to remote worker."); InstanceHeartBeat instanceHeartBeat = (InstanceHeartBeat)object; RemoteData.Builder builder = RemoteData.newBuilder(); builder.addDataStrings(instanceHeartBeat.getId()); builder.addDataIntegers(instanceHeartBeat.getInstanceId()); builder.addDataLongs(instanceHeartBeat.getHeartBeatTime()); return builder.build(); } public static class InstanceHeartBeat implements Transform<InstanceHeartBeat> { private String id; private int applicationInstanceId; private long heartbeatTime; private long heartBeatTime; private int instanceId; public InstanceHeartBeat(String id, int applicationInstanceId, long heartbeatTime) { public InstanceHeartBeat(String id, long heartBeatTime, int instanceId) { this.id = id; this.applicationInstanceId = applicationInstanceId; this.heartbeatTime = heartbeatTime; this.heartBeatTime = heartBeatTime; this.instanceId = instanceId; } public InstanceHeartBeat() { Loading @@ -52,40 +59,40 @@ public class InstanceHeartBeatDataDefine extends DataDefine { InstanceHeartBeatDataDefine define = new InstanceHeartBeatDataDefine(); Data data = define.build(id); data.setDataString(0, this.id); data.setDataInteger(0, this.applicationInstanceId); data.setDataLong(0, this.heartbeatTime); data.setDataInteger(0, this.instanceId); data.setDataLong(0, this.heartBeatTime); return data; } @Override public InstanceHeartBeat toSelf(Data data) { this.id = data.getDataString(0); this.applicationInstanceId = data.getDataInteger(0); this.heartbeatTime = data.getDataLong(0); this.instanceId = data.getDataInteger(0); this.heartBeatTime = data.getDataLong(0); return this; } public void setId(String id) { this.id = id; public String getId() { return id; } public void setApplicationInstanceId(int applicationInstanceId) { this.applicationInstanceId = applicationInstanceId; public void setId(String id) { this.id = id; } public String getId() { return id; public long getHeartBeatTime() { return heartBeatTime; } public int getApplicationInstanceId() { return applicationInstanceId; public void setHeartBeatTime(long heartBeatTime) { this.heartBeatTime = heartBeatTime; } public long getHeartbeatTime() { return heartbeatTime; public int getInstanceId() { return instanceId; } public void setHeartbeatTime(long heartbeatTime) { this.heartbeatTime = heartbeatTime; public void setInstanceId(int instanceId) { this.instanceId = instanceId; } } }
apm-collector/apm-collector-agentjvm/src/main/resources/META-INF/defines/es_dao.define +2 −1 Original line number Diff line number Diff line Loading @@ -2,3 +2,4 @@ org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricEsDAO org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricEsDAO org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricEsDAO org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricEsDAO org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO No newline at end of file