Commit 2db52c1f authored by pengys5's avatar pengys5
Browse files

Trace dag web ui test success.

#354
parent 7f5248c5
Loading
Loading
Loading
Loading
+39 −32
Original line number Diff line number Diff line
@@ -2,15 +2,13 @@ package org.skywalking.apm.collector.agentstream.worker.node.component;

import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
@@ -22,48 +20,59 @@ import org.slf4j.LoggerFactory;
/**
 * @author pengys5
 */
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, LocalSpanListener {
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener {

    private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class);

    private List<String> nodeComponents = new ArrayList<>();
    private List<NodeComponentDataDefine.NodeComponent> nodeComponents = new ArrayList<>();
    private long timeBucket;

    @Override
    public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
        String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
        NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
        nodeComponent.setComponentId(spanObject.getComponentId());

        String id;
        if (spanObject.getComponentId() == 0) {
            componentName = spanObject.getComponent();
            nodeComponent.setComponentName(spanObject.getComponent());
            id = nodeComponent.getComponentName();
        } else {
            nodeComponent.setComponentName(Const.EMPTY_STRING);
            id = String.valueOf(nodeComponent.getComponentId());
        }
        String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());

        nodeComponent.setPeerId(spanObject.getPeerId());
        if (spanObject.getPeerId() == 0) {
            peer = spanObject.getPeer();
            nodeComponent.setPeer(spanObject.getPeer());
            id = id + Const.ID_SPLIT + nodeComponent.getPeer();
        } else {
            nodeComponent.setPeer(Const.EMPTY_STRING);
            id = id + Const.ID_SPLIT + nodeComponent.getPeerId();
        }

        String agg = peer + Const.ID_SPLIT + componentName;
        nodeComponents.add(agg);
        nodeComponent.setId(id);
        nodeComponents.add(nodeComponent);
    }

    @Override
    public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
        buildEntryOrLocal(spanObject, applicationId);
    }

    @Override
    public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
        buildEntryOrLocal(spanObject, applicationId);
    }

    private void buildEntryOrLocal(SpanObject spanObject, int applicationId) {
        String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
        NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
        nodeComponent.setComponentId(spanObject.getComponentId());

        String id;
        if (spanObject.getComponentId() == 0) {
            componentName = spanObject.getComponent();
            nodeComponent.setComponentName(spanObject.getComponent());
            id = nodeComponent.getComponentName();
        } else {
            id = String.valueOf(nodeComponent.getComponentId());
            nodeComponent.setComponentName(Const.EMPTY_STRING);
        }

        String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
        String agg = peer + Const.ID_SPLIT + componentName;
        nodeComponents.add(agg);
        nodeComponent.setPeerId(applicationId);
        nodeComponent.setPeer(Const.EMPTY_STRING);
        id = id + Const.ID_SPLIT + String.valueOf(applicationId);
        nodeComponent.setId(id);

        nodeComponents.add(nodeComponent);
    }

    @Override
@@ -74,10 +83,8 @@ public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanLis
    @Override public void build() {
        StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);

        nodeComponents.forEach(agg -> {
            NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
            nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
            nodeComponent.setAgg(agg);
        nodeComponents.forEach(nodeComponent -> {
            nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId());
            nodeComponent.setTimeBucket(timeBucket);

            try {
+14 −5
Original line number Diff line number Diff line
@@ -5,11 +5,11 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;

/**
 * @author pengys5
@@ -21,7 +21,10 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPer
        if (getResponse.isExists()) {
            Data data = dataDefine.build(id);
            Map<String, Object> source = getResponse.getSource();
            data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_AGG));
            data.setDataInteger(0, ((Number)source.get(NodeComponentTable.COLUMN_COMPONENT_ID)).intValue());
            data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_COMPONENT_NAME));
            data.setDataInteger(1, ((Number)source.get(NodeComponentTable.COLUMN_PEER_ID)).intValue());
            data.setDataString(2, (String)source.get(NodeComponentTable.COLUMN_PEER));
            data.setDataLong(0, (Long)source.get(NodeComponentTable.COLUMN_TIME_BUCKET));
            return data;
        } else {
@@ -31,7 +34,10 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPer

    @Override public IndexRequestBuilder prepareBatchInsert(Data data) {
        Map<String, Object> source = new HashMap<>();
        source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
        source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(0));
        source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
        source.put(NodeComponentTable.COLUMN_PEER_ID, data.getDataInteger(1));
        source.put(NodeComponentTable.COLUMN_PEER, data.getDataString(2));
        source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));

        return getClient().prepareIndex(NodeComponentTable.TABLE, data.getDataString(0)).setSource(source);
@@ -39,7 +45,10 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPer

    @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
        Map<String, Object> source = new HashMap<>();
        source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
        source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(0));
        source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
        source.put(NodeComponentTable.COLUMN_PEER_ID, data.getDataInteger(1));
        source.put(NodeComponentTable.COLUMN_PEER, data.getDataString(2));
        source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));

        return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getDataString(0)).setDoc(source);
+5 −2
Original line number Diff line number Diff line
package org.skywalking.apm.collector.agentstream.worker.node.component.define;

import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;

/**
 * @author pengys5
@@ -26,7 +26,10 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
    }

    @Override public void initialize() {
        addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
        addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
        addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
        addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_PEER_ID, ElasticSearchColumnDefine.Type.Integer.name()));
        addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_PEER, ElasticSearchColumnDefine.Type.Keyword.name()));
        addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
    }
}
+5 −2
Original line number Diff line number Diff line
package org.skywalking.apm.collector.agentstream.worker.node.component.define;

import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;

/**
 * @author pengys5
@@ -15,7 +15,10 @@ public class NodeComponentH2TableDefine extends H2TableDefine {

    @Override public void initialize() {
        addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
        addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
        addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, H2ColumnDefine.Type.Int.name()));
        addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, H2ColumnDefine.Type.Varchar.name()));
        addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_PEER_ID, H2ColumnDefine.Type.Int.name()));
        addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_PEER, H2ColumnDefine.Type.Varchar.name()));
        addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
    }
}
+19 −14
Original line number Diff line number Diff line
@@ -2,13 +2,12 @@ package org.skywalking.apm.collector.agentstream.worker.node.mapping;

import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.node.NodeMappingDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.node.NodeMappingDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
@@ -25,19 +24,27 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener

    private final Logger logger = LoggerFactory.getLogger(NodeMappingSpanListener.class);

    private List<String> nodeMappings = new ArrayList<>();
    private List<NodeMappingDataDefine.NodeMapping> nodeMappings = new ArrayList<>();
    private long timeBucket;

    @Override public void parseRef(TraceSegmentReference reference, int applicationId, int applicationInstanceId,
        String segmentId) {
        logger.debug("node mapping listener parse reference");
        String peers = reference.getNetworkAddress();
        NodeMappingDataDefine.NodeMapping nodeMapping = new NodeMappingDataDefine.NodeMapping();
        nodeMapping.setApplicationId(applicationId);
        nodeMapping.setAddressId(reference.getNetworkAddressId());

        String id = String.valueOf(applicationId);
        if (reference.getNetworkAddressId() != 0) {
            peers = ExchangeMarkUtils.INSTANCE.buildMarkedID(reference.getNetworkAddressId());
            nodeMapping.setAddress(Const.EMPTY_STRING);
            id = id + Const.ID_SPLIT + String.valueOf(nodeMapping.getAddressId());
        } else {
            id = id + Const.ID_SPLIT + reference.getNetworkAddress();
            nodeMapping.setAddress(reference.getNetworkAddress());
        }

        String agg = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId) + Const.ID_SPLIT + peers;
        nodeMappings.add(agg);
        nodeMapping.setId(id);
        nodeMappings.add(nodeMapping);
    }

    @Override
@@ -48,13 +55,11 @@ public class NodeMappingSpanListener implements RefsListener, FirstSpanListener
    @Override public void build() {
        logger.debug("node mapping listener build");
        StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
        for (String agg : nodeMappings) {
            NodeMappingDataDefine.NodeMapping nodeMapping = new NodeMappingDataDefine.NodeMapping();
            nodeMapping.setId(timeBucket + Const.ID_SPLIT + agg);
            nodeMapping.setAgg(agg);
            nodeMapping.setTimeBucket(timeBucket);

        for (NodeMappingDataDefine.NodeMapping nodeMapping : nodeMappings) {
            try {
                nodeMapping.setId(timeBucket + Const.ID_SPLIT + nodeMapping.getId());
                nodeMapping.setTimeBucket(timeBucket);
                logger.debug("send to node mapping aggregation worker, id: {}", nodeMapping.getId());
                context.getClusterWorkerContext().lookup(NodeMappingAggregationWorker.WorkerRole.INSTANCE).tell(nodeMapping.toData());
            } catch (WorkerInvokeException | WorkerNotFoundException e) {
Loading