Commit b29954cd authored by groot's avatar groot Committed by 李盛俊
Browse files

avoid dead circle when merge failed (#3787)



* avoid dead circle when merge failed

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>

* typo

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>
Signed-off-by: default avatarshengjun.li <shengjun.li@zilliz.com>
parent 35a56316
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -151,7 +151,7 @@ GetAdaptiveGroups(const SegmentInfoList& info_list, int64_t row_per_segment, Seg
    while (!best_group.empty()) {
        groups.emplace_back(best_group);

        for (SegmentInfoList::iterator iter = infos.begin(); iter != infos.end();) {
        for (auto iter = infos.begin(); iter != infos.end();) {
            auto found = std::find(best_group.begin(), best_group.end(), (*iter).id_);
            if (found != best_group.end()) {
                iter = infos.erase(iter);
+29 −1
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@
#include "utils/Log.h"

#include <map>
#include <unordered_set>
#include <utility>

namespace milvus {
@@ -96,10 +97,15 @@ MergeManagerImpl::MergeSegments(int64_t collection_id, MergeStrategyType type) {
        return status;
    }

    // to avoid dead-circle, ignore failed segments
    std::unordered_set<snapshot::ID_TYPE> failed_segments;
    while (true) {
        snapshot::ScopedSnapshotT latest_ss;
        STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(latest_ss, collection_id));

        // segment must meet two conditions for merging:
        // 1. the segment's row count is less than segment_row_count
        // 2. the segment has no index(for any field)
        snapshot::IDS_TYPE segment_ids;
        SnapshotVisitor ss_visitor(latest_ss);
        ss_visitor.SegmentsToMerge(segment_ids);
@@ -107,6 +113,18 @@ MergeManagerImpl::MergeSegments(int64_t collection_id, MergeStrategyType type) {
            break;  // nothing to merge
        }

        // ignore failed segments in last round
        if (!failed_segments.empty()) {
            for (auto iter = segment_ids.begin(); iter != segment_ids.end();) {
                if (failed_segments.find(*iter) != failed_segments.end()) {
                    iter = segment_ids.erase(iter);
                } else {
                    ++iter;
                }
            }
            failed_segments.clear();
        }

        // collect segments info
        Partition2SegmentsMap part2seg;
        for (auto& segment_id : segment_ids) {
@@ -150,7 +168,17 @@ MergeManagerImpl::MergeSegments(int64_t collection_id, MergeStrategyType type) {
        // do merge
        for (auto& segments : segment_groups) {
            MergeTask task(options_, latest_ss, segments);
            task.Execute();
            status = task.Execute();
            if (!status.ok()) {
                // merge failed, these segments will not take part in next round
                std::string msg;
                for (auto& id : segments) {
                    failed_segments.insert(id);
                    msg += std::to_string(id);
                    msg += ",";
                }
                LOG_ENGINE_ERROR_ << "Failed to merge segments: " << msg << " reason: " << status.message();
            }
        }
    }