Commit ab9bc922 authored by 吴晟's avatar 吴晟 Committed by 彭勇升 pengys
Browse files

Fix H2 metrics read bug, can't read expected existing metrics. (#3132)

Make agent mock works as usual.
parent 594994ff
Loading
Loading
Loading
Loading
+8 −5
Original line number Diff line number Diff line
@@ -18,11 +18,14 @@

package org.apache.skywalking.oap.server.receiver.trace.mock;

import io.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.network.language.agent.*;
import org.joda.time.DateTime;
import org.apache.skywalking.apm.network.language.agent.Downstream;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentServiceGrpc;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;

/**
 * @author peng-yongsheng
@@ -39,8 +42,8 @@ public class AgentDataMock {
        StreamObserver<UpstreamSegment> streamObserver = createStreamObserver();

        UniqueId.Builder globalTraceId = UniqueIdBuilder.INSTANCE.create();
//        long startTimestamp = System.currentTimeMillis();
        long startTimestamp = new DateTime().minusDays(2).getMillis();
        long startTimestamp = System.currentTimeMillis();
        //long startTimestamp = new DateTime().minusDays(2).getMillis();

        // ServiceAMock
        ServiceAMock serviceAMock = new ServiceAMock(registerMock);
+11 −4
Original line number Diff line number Diff line
@@ -18,14 +18,16 @@

package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;

import java.sql.*;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author wusheng, peng-yongsheng
@@ -51,8 +53,13 @@ public class H2BatchDAO implements IBatchDAO {

        try (Connection connection = h2Client.getConnection()) {
            for (Object exe : collection) {
                try {
                    SQLExecutor sqlExecutor = (SQLExecutor)exe;
                    sqlExecutor.invoke(connection);
                } catch (SQLException e) {
                    // Just avoid one execution failure makes the rest of batch failure.
                    logger.error(e.getMessage(), e);
                }
            }
        } catch (SQLException | JDBCClientException e) {
            logger.error(e.getMessage(), e);
+28 −10
Original line number Diff line number Diff line
@@ -19,17 +19,27 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;

import java.io.IOException;
import java.sql.*;
import java.util.*;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.ArrayParamBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author wusheng, peng-yongsheng
@@ -49,9 +59,14 @@ public class H2SQLExecutor {

            try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE id in (" + param + ")")) {
                List<StorageData> storageDataList = new ArrayList<>();
                while (rs.next()) {
                    storageDataList.add(toStorageData(rs, modelName, storageBuilder));
                StorageData storageData;
                do {
                    storageData = toStorageData(rs, modelName, storageBuilder);
                    if (storageData != null) {
                        storageDataList.add(storageData);
                    }
                }
                while (storageData != null);
                return storageDataList;
            }
        } catch (SQLException | JDBCClientException e) {
@@ -81,7 +96,8 @@ public class H2SQLExecutor {
        }
    }

    protected StorageData toStorageData(ResultSet rs, String modelName, StorageBuilder storageBuilder) throws SQLException {
    protected StorageData toStorageData(ResultSet rs, String modelName,
        StorageBuilder storageBuilder) throws SQLException {
        if (rs.next()) {
            Map data = new HashMap();
            List<ModelColumn> columns = TableMetaInfo.get(modelName).getColumns();
@@ -106,7 +122,8 @@ public class H2SQLExecutor {
        return Const.NONE;
    }

    protected SQLExecutor getInsertExecutor(String modelName, StorageData metrics, StorageBuilder storageBuilder) throws IOException {
    protected SQLExecutor getInsertExecutor(String modelName, StorageData metrics,
        StorageBuilder storageBuilder) throws IOException {
        Map<String, Object> objectMap = storageBuilder.data2Map(metrics);

        SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + " VALUES");
@@ -133,7 +150,8 @@ public class H2SQLExecutor {
        return new SQLExecutor(sqlBuilder.toString(), param);
    }

    protected SQLExecutor getUpdateExecutor(String modelName, StorageData metrics, StorageBuilder storageBuilder) throws IOException {
    protected SQLExecutor getUpdateExecutor(String modelName, StorageData metrics,
        StorageBuilder storageBuilder) throws IOException {
        Map<String, Object> objectMap = storageBuilder.data2Map(metrics);

        SQLBuilder sqlBuilder = new SQLBuilder("UPDATE " + modelName + " SET ");