Commit 13421da7 authored by ascrutae's avatar ascrutae
Browse files

1. 完成统计部分

2. 删除无用代码
parent e024a329
Loading
Loading
Loading
Loading
+5 −5
Original line number Diff line number Diff line
@@ -5,6 +5,9 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildMapper;
import com.ai.cloud.skywalking.analysis.chainbuild.ChainBuildReducer;
import com.ai.cloud.skywalking.analysis.chainbuild.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.HBaseTableMetaData;

import org.apache.hadoop.conf.Configuration;
@@ -20,9 +23,6 @@ import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainMapper;
import com.ai.cloud.skywalking.analysis.categorize2chain.Categorize2ChainReducer;
import com.ai.cloud.skywalking.analysis.categorize2chain.po.ChainInfo;
import com.ai.cloud.skywalking.analysis.config.Config;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;

@@ -52,10 +52,10 @@ public class AnalysisServerDriver extends Configured implements Tool {
        job.setJarByClass(AnalysisServerDriver.class);
        Scan scan = buildHBaseScan(args);

        TableMapReduceUtil.initTableMapperJob(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME, scan, Categorize2ChainMapper.class,
        TableMapReduceUtil.initTableMapperJob(HBaseTableMetaData.TABLE_CALL_CHAIN.TABLE_NAME, scan, ChainBuildMapper.class,
                Text.class, ChainInfo.class, job);

        job.setReducerClass(Categorize2ChainReducer.class);
        job.setReducerClass(ChainBuildReducer.class);
        job.setNumReduceTasks(Config.Reducer.REDUCER_NUMBER);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job.waitForCompletion(true) ? 0 : 1;
+0 −43
Original line number Diff line number Diff line
package com.ai.cloud.skywalking.analysis.chain2summary;

import com.ai.cloud.skywalking.analysis.chain2summary.po.ChainSpecificTimeSummary;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class Chain2SummaryMapper extends TableMapper<Text, ChainSpecificTimeSummary> {

    // Static final per SLF4J convention (one logger per class, not per instance).
    private static final Logger LOGGER = LoggerFactory.getLogger(Chain2SummaryMapper.class);

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // Load analysis configuration once before any rows are mapped.
        ConfigInitializer.initialize();
    }

    /**
     * Converts one HBase call-chain row into a {@link ChainSpecificTimeSummary}
     * and emits it keyed by the summary's mapper key. Each cell value of the
     * row is decoded as a chain-node summary result and folded into the summary.
     * Failures are logged per-row and do not abort the job.
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Decode only the valid sub-range: ImmutableBytesWritable may wrap a
        // larger backing array, so Bytes.toString(key.get()) alone could
        // include bytes outside [offset, offset + length).
        String rowKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());
        try {
            ChainSpecificTimeSummary summary = new ChainSpecificTimeSummary(rowKey);
            for (Cell cell : value.rawCells()) {
                summary.addChainNodeSummaryResult(Bytes.toString(cell.getValueArray(),
                        cell.getValueOffset(), cell.getValueLength()));
            }
            context.write(new Text(summary.buildMapperKey()), summary);
        } catch (Exception e) {
            // Log the readable row key; ImmutableBytesWritable.toString()
            // would print a raw byte dump.
            LOGGER.error("Failed to mapper call chain[" + rowKey + "]", e);
        }
    }
}
+0 −49
Original line number Diff line number Diff line
package com.ai.cloud.skywalking.analysis.chain2summary;

import com.ai.cloud.skywalking.analysis.chain2summary.po.ChainSpecificTimeSummary;
import com.ai.cloud.skywalking.analysis.config.ConfigInitializer;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;

public class Chain2SummaryReducer extends Reducer<Text, ChainSpecificTimeSummary, Text, IntWritable> {

    // Static final per SLF4J convention (one logger per class, not per instance).
    private static final Logger LOGGER = LoggerFactory.getLogger(Chain2SummaryReducer.class);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load analysis configuration once before any keys are reduced.
        ConfigInitializer.initialize();
    }

    @Override
    protected void reduce(Text key, Iterable<ChainSpecificTimeSummary> values, Context context) throws IOException, InterruptedException {
        // Text.getBytes() exposes the whole backing array, which may contain
        // stale bytes past getLength(); toString() decodes only the valid range.
        doReduceAction(key.toString(), values.iterator());
    }

    /**
     * Aggregates every time-specific summary for one mapper key into a single
     * {@link Summary} and persists it to HBase. The chain-relationship map for
     * the key is loaded first so each summary can be folded under its
     * canonical chain ID. Per-element failures are logged and skipped so one
     * bad summary does not lose the rest of the key's data.
     */
    public void doReduceAction(String key, Iterator<ChainSpecificTimeSummary> summaryIterator) {
        try {
            // The key is already a String; the previous
            // Bytes.toString(key.getBytes()) round-trip encoded with the
            // platform charset and decoded as UTF-8 — a latent charset bug.
            ChainRelationship4Search chainRelationship = ChainRelationship4Search.load(key);
            Summary summary = new Summary();
            while (summaryIterator.hasNext()) {
                try {
                    ChainSpecificTimeSummary timeSummary = summaryIterator.next();
                    summary.summary(timeSummary, chainRelationship);
                } catch (Exception e) {
                    LOGGER.error("Failed to reduce", e);
                }
            }

            summary.saveToHBase();
        } catch (Exception e) {
            // printStackTrace() removed — the logger already records the stack trace.
            LOGGER.error("Failed to reduce key=" + key, e);
        }
    }
}
+0 −32
Original line number Diff line number Diff line
package com.ai.cloud.skywalking.analysis.chain2summary;

import com.ai.cloud.skywalking.analysis.categorize2chain.util.HBaseUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class ChainRelationship4Search {

    // Chain-ID relationship lookup table:
    //   normal chain ID        -> normal chain ID
    //   abnormal chain ID      -> normal chain ID
    //   uncategorized chain ID -> itself
    // NOTE(review): addRelationship(normalCID, abnormalCID) stores the normal
    // ID as the key and the abnormal ID as the value, which looks inverted
    // relative to the mapping described above — confirm against callers.
    private Map<String, String> chainRelationshipMap = new HashMap<String, String>();

    /** Loads the relationship table for the given row key from HBase. */
    public static ChainRelationship4Search load(String rowkey) throws IOException {
        return HBaseUtil.queryChainRelationship(rowkey);
    }

    /** Registers a chain ID that maps to itself (uncategorized chain). */
    public void addRelationship(String cid) {
        addRelationship(cid, cid);
    }

    /** Registers a normal/abnormal chain-ID pair. */
    public void addRelationship(String normalCID, String abnormalCID) {
        chainRelationshipMap.put(normalCID, abnormalCID);
    }

    /** Returns the chain ID mapped to {@code cid}, or {@code null} if unmapped. */
    public String searchRelationship(String cid) {
        return chainRelationshipMap.get(cid);
    }
}
+0 −36
Original line number Diff line number Diff line
package com.ai.cloud.skywalking.analysis.chain2summary;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.ai.cloud.skywalking.analysis.chain2summary.entity.ChainSummaryWithRelationship;
import com.ai.cloud.skywalking.analysis.chain2summary.po.ChainSpecificTimeSummary;

public class Summary {

    // Aggregated per-chain summaries keyed by canonical chain ID.
    private Map<String, ChainSummaryWithRelationship> summaryWithRelationshipMap;

    public Summary() {
        summaryWithRelationshipMap = new ConcurrentHashMap<String, ChainSummaryWithRelationship>();
    }

    /**
     * Folds one time-specific summary into the aggregate for its canonical
     * chain ID. The canonical ID is resolved via the relationship map; when no
     * mapping exists the summary's own chain ID is used as-is.
     *
     * @param timeSummary       the summary to fold in
     * @param chainRelationship lookup table from chain ID to canonical chain ID
     * @throws IOException if the underlying summary aggregation fails
     */
    public void summary(ChainSpecificTimeSummary timeSummary, ChainRelationship4Search chainRelationship) throws IOException {
        String cid = chainRelationship.searchRelationship(timeSummary.getcId());
        if (cid == null || cid.isEmpty()) {
            cid = timeSummary.getcId();
        }

        // Single lookup instead of containsKey + put + get (three traversals).
        ChainSummaryWithRelationship chainSummaryWithRelationship = summaryWithRelationshipMap.get(cid);
        if (chainSummaryWithRelationship == null) {
            chainSummaryWithRelationship = new ChainSummaryWithRelationship(cid);
            summaryWithRelationshipMap.put(cid, chainSummaryWithRelationship);
        }
        chainSummaryWithRelationship.summary(timeSummary);
    }

    /** Persists every aggregated chain summary to HBase. */
    public void saveToHBase() throws IOException, InterruptedException {
        for (Map.Entry<String, ChainSummaryWithRelationship> entry : summaryWithRelationshipMap.entrySet()) {
            entry.getValue().saveToHBase();
        }
    }
}
Loading