Commit 5150647e authored by 蔡宇东's avatar 蔡宇东
Browse files

#168 fix reduce bug


Former-commit-id: 5c6f2e25cbf9efe4a58bf89757fc4983ebda4f35
parent f49d23ba
Loading
Loading
Loading
Loading
+1 −7
Original line number Diff line number Diff line
@@ -243,19 +243,13 @@ XSearchTask::Execute() {
}

void
XSearchTask::MergeTopkToResultSet(const std::vector<int64_t>& src_ids, const std::vector<float>& src_distances,
XSearchTask::MergeTopkToResultSet(const scheduler::ResultIds& src_ids, const scheduler::ResultDistances& src_distances,
                                  size_t src_k, size_t nq, size_t topk, bool ascending, scheduler::ResultIds& tar_ids,
                                  scheduler::ResultDistances& tar_distances) {
    if (src_ids.empty()) {
        return;
    }

    if (tar_ids.empty()) {
        tar_ids = src_ids;
        tar_distances = src_distances;
        return;
    }

    size_t tar_k = tar_ids.size() / nq;
    size_t buf_k = std::min(topk, src_k + tar_k);

+2 −2
Original line number Diff line number Diff line
@@ -39,8 +39,8 @@ class XSearchTask : public Task {

 public:
    static void
    MergeTopkToResultSet(const std::vector<int64_t>& src_ids, const std::vector<float>& src_distances, uint64_t src_k,
                         uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultIds& tar_ids,
    MergeTopkToResultSet(const scheduler::ResultIds& src_ids, const scheduler::ResultDistances& src_distances,
                         size_t src_k, size_t nq, size_t topk, bool ascending, scheduler::ResultIds& tar_ids,
                         scheduler::ResultDistances& tar_distances);

    //    static void
+74 −69
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
#include <cmath>
#include <vector>

#include "scheduler/job/SearchJob.h"
#include "scheduler/task/SearchTask.h"
#include "utils/TimeRecorder.h"
#include "utils/ThreadPool.h"
@@ -28,76 +29,80 @@ namespace {
namespace ms = milvus::scheduler;

void
BuildResult(std::vector<int64_t>& output_ids,
            std::vector<float>& output_distance,
            uint64_t input_k,
            uint64_t topk,
            uint64_t nq,
BuildResult(ms::ResultIds& output_ids,
            ms::ResultDistances & output_distances,
            size_t input_k,
            size_t topk,
            size_t nq,
            bool ascending) {
    output_ids.clear();
    output_ids.resize(nq * topk);
    output_distance.clear();
    output_distance.resize(nq * topk);
    output_distances.clear();
    output_distances.resize(nq * topk);

    for (uint64_t i = 0; i < nq; i++) {
    for (size_t i = 0; i < nq; i++) {
        //insert valid items
        for (uint64_t j = 0; j < input_k; j++) {
        for (size_t j = 0; j < input_k; j++) {
            output_ids[i * topk + j] = (int64_t)(drand48() * 100000);
            output_distance[i * topk + j] = ascending ? (j + drand48()) : ((input_k - j) + drand48());
            output_distances[i * topk + j] = ascending ? (j + drand48()) : ((input_k - j) + drand48());
        }
        //insert invalid items
        for (uint64_t j = input_k; j < topk; j++) {
        for (size_t j = input_k; j < topk; j++) {
            output_ids[i * topk + j] = -1;
            output_distance[i * topk + j] = -1.0;
            output_distances[i * topk + j] = -1.0;
        }
    }
}

void
CopyResult(std::vector<int64_t>& output_ids,
           std::vector<float>& output_distance,
           uint64_t output_topk,
           std::vector<int64_t>& input_ids,
           std::vector<float>& input_distance,
           uint64_t input_topk,
           uint64_t nq) {
CopyResult(ms::ResultIds& output_ids,
           ms::ResultDistances& output_distances,
           size_t output_topk,
           ms::ResultIds& input_ids,
           ms::ResultDistances& input_distances,
           size_t input_topk,
           size_t nq) {
    ASSERT_TRUE(input_ids.size() >= nq * input_topk);
    ASSERT_TRUE(input_distance.size() >= nq * input_topk);
    ASSERT_TRUE(input_distances.size() >= nq * input_topk);
    ASSERT_TRUE(output_topk <= input_topk);
    output_ids.clear();
    output_ids.resize(nq * output_topk);
    output_distance.clear();
    output_distance.resize(nq * output_topk);
    output_distances.clear();
    output_distances.resize(nq * output_topk);

    for (uint64_t i = 0; i < nq; i++) {
        for (uint64_t j = 0; j < output_topk; j++) {
    for (size_t i = 0; i < nq; i++) {
        for (size_t j = 0; j < output_topk; j++) {
            output_ids[i * output_topk + j] = input_ids[i * input_topk + j];
            output_distance[i * output_topk + j] = input_distance[i * input_topk + j];
            output_distances[i * output_topk + j] = input_distances[i * input_topk + j];
        }
    }
}

void
CheckTopkResult(const std::vector<int64_t>& input_ids_1,
                const std::vector<float>& input_distance_1,
                const std::vector<int64_t>& input_ids_2,
                const std::vector<float>& input_distance_2,
                uint64_t topk,
                uint64_t nq,
CheckTopkResult(const ms::ResultIds& input_ids_1,
                const ms::ResultDistances& input_distances_1,
                size_t input_k_1,
                const ms::ResultIds& input_ids_2,
                const ms::ResultDistances& input_distances_2,
                size_t input_k_2,
                size_t topk,
                size_t nq,
                bool ascending,
                const ms::ResultIds& result_ids,
                const ms::ResultDistances& result_distances) {
    ASSERT_EQ(result_ids.size(), nq * topk);
    ASSERT_EQ(result_distances.size(), nq * topk);
    ASSERT_EQ(input_ids_1.size(), input_distance_1.size());
    ASSERT_EQ(input_ids_2.size(), input_distance_2.size());
    ASSERT_EQ(result_ids.size(), result_distances.size());
    ASSERT_EQ(input_ids_1.size(), input_distances_1.size());
    ASSERT_EQ(input_ids_2.size(), input_distances_2.size());

    for (int64_t i = 0; i < nq; i++) {
    size_t result_k = result_distances.size() / nq;
    ASSERT_EQ(result_k, std::min(topk, input_k_1 + input_k_2));

    for (size_t i = 0; i < nq; i++) {
        std::vector<float>
            src_vec(input_distance_1.begin() + i * topk, input_distance_1.begin() + (i + 1) * topk);
            src_vec(input_distances_1.begin() + i * topk, input_distances_1.begin() + (i + 1) * topk);
        src_vec.insert(src_vec.end(),
                       input_distance_2.begin() + i * topk,
                       input_distance_2.begin() + (i + 1) * topk);
                       input_distances_2.begin() + i * topk,
                       input_distances_2.begin() + (i + 1) * topk);
        if (ascending) {
            std::sort(src_vec.begin(), src_vec.end());
        } else {
@@ -113,9 +118,9 @@ CheckTopkResult(const std::vector<int64_t>& input_ids_1,
                ++iter;
        }

        uint64_t n = std::min(topk, result_ids.size() / nq);
        for (uint64_t j = 0; j < n; j++) {
            uint64_t idx = i * n + j;
        size_t n = std::min(topk, result_ids.size() / nq);
        for (size_t j = 0; j < n; j++) {
            size_t idx = i * n + j;
            if (result_ids[idx] < 0) {
                continue;
            }
@@ -130,21 +135,21 @@ CheckTopkResult(const std::vector<int64_t>& input_ids_1,
} // namespace

void
MergeTopkToResultSetTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uint64_t topk, bool ascending) {
    std::vector<int64_t> ids1, ids2;
    std::vector<float> dist1, dist2;
MergeTopkToResultSetTest(size_t topk_1, size_t topk_2, size_t nq, size_t topk, bool ascending) {
    ms::ResultIds ids1, ids2;
    ms::ResultDistances dist1, dist2;
    ms::ResultIds result_ids;
    ms::ResultDistances result_distances;
    BuildResult(ids1, dist1, topk_1, topk, nq, ascending);
    BuildResult(ids2, dist2, topk_2, topk, nq, ascending);
    ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, topk_1, nq, topk, ascending, result_ids, result_distances);
    ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, topk_2, nq, topk, ascending, result_ids, result_distances);
    CheckTopkResult(ids1, dist1, ids2, dist2, topk, nq, ascending, result_ids, result_distances);
    CheckTopkResult(ids1, dist1, topk_1, ids2, dist2, topk_2, topk, nq, ascending, result_ids, result_distances);
}

TEST(DBSearchTest, MERGE_RESULT_SET_TEST) {
    uint64_t NQ = 15;
    uint64_t TOP_K = 64;
    size_t NQ = 15;
    size_t TOP_K = 64;

    /* test1, id1/dist1 valid, id2/dist2 empty */
    MergeTopkToResultSetTest(TOP_K, 0, NQ, TOP_K, true);
@@ -163,21 +168,21 @@ TEST(DBSearchTest, MERGE_RESULT_SET_TEST) {
    MergeTopkToResultSetTest(TOP_K / 2, TOP_K / 3, NQ, TOP_K, false);
}

//void MergeTopkArrayTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uint64_t topk, bool ascending) {
//void MergeTopkArrayTest(size_t topk_1, size_t topk_2, size_t nq, size_t topk, bool ascending) {
//    std::vector<int64_t> ids1, ids2;
//    std::vector<float> dist1, dist2;
//    ms::ResultSet result;
//    BuildResult(ids1, dist1, topk_1, topk, nq, ascending);
//    BuildResult(ids2, dist2, topk_2, topk, nq, ascending);
//    uint64_t result_topk = std::min(topk, topk_1 + topk_2);
//    size_t result_topk = std::min(topk, topk_1 + topk_2);
//    ms::XSearchTask::MergeTopkArray(ids1, dist1, topk_1, ids2, dist2, topk_2, nq, topk, ascending);
//    if (ids1.size() != result_topk * nq) {
//        std::cout << ids1.size() << " " << result_topk * nq << std::endl;
//    }
//    ASSERT_TRUE(ids1.size() == result_topk * nq);
//    ASSERT_TRUE(dist1.size() == result_topk * nq);
//    for (uint64_t i = 0; i < nq; i++) {
//        for (uint64_t k = 1; k < result_topk; k++) {
//    for (size_t i = 0; i < nq; i++) {
//        for (size_t k = 1; k < result_topk; k++) {
//            float f0 = dist1[i * topk + k - 1];
//            float f1 = dist1[i * topk + k];
//            if (ascending) {
@@ -196,8 +201,8 @@ TEST(DBSearchTest, MERGE_RESULT_SET_TEST) {
//}

//TEST(DBSearchTest, MERGE_ARRAY_TEST) {
//    uint64_t NQ = 15;
//    uint64_t TOP_K = 64;
//    size_t NQ = 15;
//    size_t TOP_K = 64;
//
//    /* test1, id1/dist1 valid, id2/dist2 empty */
//    MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, true);
@@ -226,23 +231,23 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) {
    int32_t index_file_num = 478;   /* sift1B dataset, index files num */
    bool ascending = true;

    std::vector<int32_t> thread_vec = {4};
    std::vector<int32_t> nq_vec = {1000};
    std::vector<int32_t> topk_vec = {64};
    int32_t NQ = nq_vec[nq_vec.size() - 1];
    int32_t TOPK = topk_vec[topk_vec.size() - 1];
    std::vector<size_t> thread_vec = {4};
    std::vector<size_t> nq_vec = {1000};
    std::vector<size_t> topk_vec = {64};
    size_t NQ = nq_vec[nq_vec.size() - 1];
    size_t TOPK = topk_vec[topk_vec.size() - 1];

    std::vector<std::vector<int64_t>> id_vec;
    std::vector<std::vector<float>> dist_vec;
    std::vector<int64_t> input_ids;
    std::vector<float> input_distance;
    std::vector<ms::ResultIds> id_vec;
    std::vector<ms::ResultDistances> dist_vec;
    ms::ResultIds input_ids;
    ms::ResultDistances input_distances;
    int32_t i, k, step;

    /* generate testing data */
    for (i = 0; i < index_file_num; i++) {
        BuildResult(input_ids, input_distance, TOPK, TOPK, NQ, ascending);
        BuildResult(input_ids, input_distances, TOPK, TOPK, NQ, ascending);
        id_vec.push_back(input_ids);
        dist_vec.push_back(input_distance);
        dist_vec.push_back(input_distances);
    }

    for (int32_t max_thread_num : thread_vec) {
@@ -254,8 +259,8 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) {
                ms::ResultIds final_result_ids, final_result_ids_2, final_result_ids_3;
                ms::ResultDistances final_result_distances, final_result_distances_2, final_result_distances_3;

                std::vector<std::vector<int64_t>> id_vec_1(index_file_num);
                std::vector<std::vector<float>> dist_vec_1(index_file_num);
                std::vector<ms::ResultIds> id_vec_1(index_file_num);
                std::vector<ms::ResultDistances> dist_vec_1(index_file_num);
                for (i = 0; i < index_file_num; i++) {
                    CopyResult(id_vec_1[i], dist_vec_1[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
                }
@@ -285,7 +290,7 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) {
//                /* method-2 */
//                std::vector<std::vector<int64_t>> id_vec_2(index_file_num);
//                std::vector<std::vector<float>> dist_vec_2(index_file_num);
//                std::vector<uint64_t> k_vec_2(index_file_num);
//                std::vector<size_t> k_vec_2(index_file_num);
//                for (i = 0; i < index_file_num; i++) {
//                    CopyResult(id_vec_2[i], dist_vec_2[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
//                    k_vec_2[i] = top_k;
@@ -328,7 +333,7 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) {
//                /* method-3 parallel */
//                std::vector<std::vector<int64_t>> id_vec_3(index_file_num);
//                std::vector<std::vector<float>> dist_vec_3(index_file_num);
//                std::vector<uint64_t> k_vec_3(index_file_num);
//                std::vector<size_t> k_vec_3(index_file_num);
//                for (i = 0; i < index_file_num; i++) {
//                    CopyResult(id_vec_3[i], dist_vec_3[i], top_k, id_vec[i], dist_vec[i], TOPK, nq);
//                    k_vec_3[i] = top_k;