Commit 35a56316 authored by groot's avatar groot Committed by 李盛俊
Browse files

add adaptive merge strategy (#3780)

parent 6c314348
Loading
Loading
Loading
Loading
+12 −3
Original line number Diff line number Diff line
@@ -363,10 +363,19 @@ DBImpl::CreateIndex(const std::shared_ptr<server::Context>& context, const std::
        }
    }

    // step 3: merge segments before create index, since there could be some small segments just flushed
    {
        snapshot::ScopedSnapshotT latest_ss;
        STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name));
        std::set<int64_t> collection_ids = {latest_ss->GetCollectionId()};
        StartMergeTask(collection_ids, true);  // start force-merge task
        WaitMergeFileFinish();                 // let force-merge file thread finish
    }

    // clear index failed retry map of this collection
    ClearIndexFailedRecord(collection_name);

    // step 3: iterate segments need to be build index, wait until all segments are built
    // step 4: iterate segments need to be build index, wait until all segments are built
    while (true) {
        // start background build index thread
        std::vector<std::string> collection_names = {collection_name};
@@ -906,7 +915,7 @@ DBImpl::InternalFlush(const std::string& collection_name, bool merge) {
    }

    if (merge) {
        StartMergeTask(flushed_collection_ids);
        StartMergeTask(flushed_collection_ids, false);
    }
}

@@ -1117,7 +1126,7 @@ DBImpl::BackgroundMerge(std::set<int64_t> collection_ids, bool force_merge_all)
    for (auto& collection_id : collection_ids) {
        const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);

        MergeStrategyType type = force_merge_all ? MergeStrategyType::SIMPLE : MergeStrategyType::LAYERED;
        MergeStrategyType type = force_merge_all ? MergeStrategyType::ADAPTIVE : MergeStrategyType::LAYERED;
        auto status = merge_mgr_ptr_->MergeSegments(collection_id, type);
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to get merge files for collection id: " << collection_id
+1 −1
Original line number Diff line number Diff line
@@ -152,7 +152,7 @@ class DBImpl : public DB, public ConfigObserver {
    WaitBuildIndexFinish();

    void
    StartMergeTask(const std::set<int64_t>& collection_ids, bool force_merge_all = false);
    StartMergeTask(const std::set<int64_t>& collection_ids, bool force_merge_all);

    void
    BackgroundMerge(std::set<int64_t> collection_ids, bool force_merge_all);
+188 −0
Original line number Diff line number Diff line
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/merge/MergeAdaptiveStrategy.h"
#include "utils/Log.h"

#include <algorithm>
#include <map>
#include <utility>

namespace milvus {
namespace engine {

namespace {

// this method is to get all possible combinations of a number list
// for example, source = [1, 2, 3, 4]
// the result will be: [1,2],[1,3],[1,4],[2,3],[2,4],[3,4],[1,2,3],[1,2,4],[1,3,4],[2,3,4],[1,2,3,4]
template <typename T>
void
GetCombination(const std::vector<T>& source, size_t m, std::vector<std::vector<T>>& result) {
    if (source.empty() || m > source.size()) {
        return;
    }

    std::vector<int8_t> bitset;
    bitset.resize(source.size());
    for (int8_t i = 0; i < m; ++i) {
        bitset[i] = 1;
    }

    do {
        std::vector<T> new_combination;
        for (size_t i = 0; i < bitset.size(); ++i) {
            if (bitset[i]) {
                new_combination.push_back(source[i]);
            }
        }
        result.push_back(new_combination);
    } while (std::prev_permutation(bitset.begin(), bitset.end()));
}

template <typename T>
void
GetAllCombination(const std::vector<T>& source, std::vector<std::vector<T>>& result) {
    size_t m = source.size();
    for (size_t i = 2; i <= m; ++i) {
        GetCombination(source, i, result);
    }
}

using ID2Segment = std::map<snapshot::ID_TYPE, SegmentInfo>;
using RowSum2Segments = std::map<int64_t, snapshot::IDS_TYPE>;

void
AllPossibleRowSum(const SegmentInfoList& info_list, int64_t row_per_segment, RowSum2Segments& row_sum_ge,
                  RowSum2Segments& row_sum_lt) {
    row_sum_ge.clear();
    row_sum_lt.clear();

    snapshot::IDS_TYPE ids;
    ID2Segment id2segment;
    for (const SegmentInfo& segment_info : info_list) {
        if (segment_info.row_count_ <= 0 || segment_info.row_count_ >= row_per_segment) {
            continue;  // empty segment or full segment
        }

        ids.push_back((segment_info.id_));
        id2segment.insert(std::make_pair(segment_info.id_, segment_info));
    }

    std::vector<std::vector<snapshot::ID_TYPE>> combinations;
    GetAllCombination<snapshot::ID_TYPE>(ids, combinations);

    for (auto& ids : combinations) {
        int64_t sum = 0;
        snapshot::IDS_TYPE temp_ids;
        for (auto& id : ids) {
            auto iter = id2segment.find(id);
            if (iter == id2segment.end()) {
                sum = 0;
                break;
            }

            sum += iter->second.row_count_;
            temp_ids.push_back(id);
        }

        if (sum == 0) {
            continue;
        }

        int64_t gap = sum - row_per_segment;
        if (gap >= 0) {
            row_sum_ge.insert(std::make_pair(sum, temp_ids));
        } else {
            row_sum_lt.insert(std::make_pair(sum, temp_ids));
        }
    }
}

// this method is to get all possible combinations and calculate each combination row sum,
// compare row sum to get a best one
// fox example, there are 3 segments in info_list: [id=1, row=20000], [id=2, row=90000], [id=3, row=50000]
// assume row_per_segment = 100000
// then there will be 4 merge combinations: (id: 1,2), (id: 2,3), (id: 1,3), (id: 1,2,3)
// the row sum of the 4 combinations is: 110000, 140000, 70000, 160000
// the 110000 is closest to 100000, so (id: 1,2) is the best group
//
// another case, there are 3 segments in info_list: [id=1, row=20000], [id=2, row=30000], [id=3, row=40000]
// then there will be 4 merge combinations: (id: 1,2), (id: 2,3), (id: 1,3), (id: 1,2,3)
// the row sum of the 4 combinations is: 50000, 70000, 60000, 90000
// the 90000 is closest to 100000, so (id: 1,2,3) is the best group
void
GetBestGroup(const SegmentInfoList& info_list, int64_t row_per_segment, snapshot::IDS_TYPE& best_group) {
    best_group.clear();
    RowSum2Segments row_sum_ge, row_sum_lt;
    AllPossibleRowSum(info_list, row_per_segment, row_sum_ge, row_sum_lt);

    if (row_sum_ge.empty()) {
        if (!row_sum_lt.empty()) {
            best_group = row_sum_lt.rbegin()->second;
        }
    } else {
        best_group = row_sum_ge.begin()->second;
    }
}

// this method is to distrbute segment into reasonable groups
// fox example, there are 4 segments in info_list:
// [id=1, row=20000], [id=2, row=90000], [id=3, row=50000], [id=4, row=5000]
// assume row_per_segment = 100000
// they are distrbuted to 2 groups: (id: 1, 2), (id: 3, 4)
// the row sum of the 2 groups is: 110000, 55000
void
GetAdaptiveGroups(const SegmentInfoList& info_list, int64_t row_per_segment, SegmentGroups& groups) {
    // this is a copy since the info_list is const, and the infos will be changed later
    SegmentInfoList infos = info_list;

    snapshot::IDS_TYPE best_group;
    GetBestGroup(infos, row_per_segment, best_group);

    while (!best_group.empty()) {
        groups.emplace_back(best_group);

        for (SegmentInfoList::iterator iter = infos.begin(); iter != infos.end();) {
            auto found = std::find(best_group.begin(), best_group.end(), (*iter).id_);
            if (found != best_group.end()) {
                iter = infos.erase(iter);
            } else {
                ++iter;
            }
        }

        if (infos.size() <= 1) {
            break;
        }

        GetBestGroup(infos, row_per_segment, best_group);
    }
}

}  // namespace

Status
MergeAdaptiveStrategy::RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment,
                                       SegmentGroups& groups) {
    for (auto& kv : part2segment) {
        if (kv.second.size() <= 1) {
            continue;  // no segment or only one segment, no need to merge
        }

        GetAdaptiveGroups(kv.second, row_per_segment, groups);
    }

    return Status::OK();
}

}  // namespace engine
}  // namespace milvus
+30 −0
Original line number Diff line number Diff line
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#pragma once

#include <string>
#include <vector>

#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"

namespace milvus {
namespace engine {

class MergeAdaptiveStrategy : public MergeStrategy {
 public:
    Status
    RegroupSegments(const Partition2SegmentsMap& part2segment, int64_t row_per_segment, SegmentGroups& groups) override;
};  // MergeSimpleStrategy

}  // namespace engine
}  // namespace milvus
+16 −8
Original line number Diff line number Diff line
@@ -26,19 +26,27 @@ namespace engine {
// 1. SIMPLE
//    merge in old way, merge segment one by one, stop merge until segment row count exceed segment_row_count
// 2. LAYERED
//    distribute segments to several groups according to segment row count
//    firstly, define layers by row count: 4KB, 16KB, 64KB, 256KB, 1024KB
//    if segment row count between 0KB~4KB, put it into layer "4096"
//    if segment row count between 4KB~16KB, put it into layer "16384"
//    if segment row count between 16KB~64KB, put it into layer "65536"
//    if segment row count between 64KB~256KB, put it into layer "262144"
//    if segment row count between 256KB~1024KB, put it into layer "1048576"
//    file row count greater than 1024KB, put into layer MAX_SEGMENT_ROW_COUNT
//    distribute segments to several groups according to segment_row_count
//    assume segment_row_count = 100000
//    firstly, define layers by row count: 100000, 20000, 4000
//    if segment row count between 0~4000, put it into layer "4000"
//    if segment row count between 4000~20000, put it into layer "20000"
//    if segment row count between 20000~100000, put it into layer "100000"
//    segment row count greater than 100000 will be ignored
//    secondly, merge segments for each group
//    third, if some segment's create time is 30 seconds ago, and it still un-merged, force merge with upper layer
// 3. ADAPTIVE
//    merge segments to fit the segment_row_count
//    assume segment_row_count = 100000
//    segment_1 row count = 80000
//    segment_2 row count = 60000
//    segment_3 row count = 30000
//    segment_1 and segment_3 will be merged, since there row sum = 110000,
//    that is much closer than row sum of segment_1 and segment_2, 140000
enum class MergeStrategyType {
    SIMPLE = 1,
    LAYERED = 2,
    ADAPTIVE = 3,
};

class MergeManager {
Loading