Commit 2a0bf434 authored by BossZou's avatar BossZou Committed by 李盛俊
Browse files

New operation named MultiSegmentsOperation support multi-segments during flush (#3733)



* Add multiSegmentsOpera

Signed-off-by: default avataryinghao.zou <yinghao.zou@zilliz.com>

* rish test

Signed-off-by: default avataryinghao.zou <yinghao.zou@zilliz.com>

* Add new operation MultiSegmentsOperation

Signed-off-by: default avataryinghao.zou <yinghao.zou@zilliz.com>

* Recoveray ut itls

Signed-off-by: default avataryinghao.zou <yinghao.zou@zilliz.com>

* fix judeg bug

Signed-off-by: default avataryinghao.zou <yinghao.zou@zilliz.com>
Signed-off-by: default avatarshengjun.li <shengjun.li@zilliz.com>
parent ade8bb3f
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -58,6 +58,7 @@ Please mark all changes in change log and use the issue from GitHub
-   \#3132 Refine the implementation of hnsw in faiss and add support for hnsw-flat, hnsw-pq and hnsw-sq8
-   \#3463 Restrict the content of partition_tag
-   \#3502 Normalize http method in web sever
-   \#3732 Add new operation supporting multi-segments

## Improvement
-   \#2543 Remove secondary_path related code
+1 −1
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ struct MetaQueryContext {

struct MetaApplyContext {
    std::string table_;
    MetaContextOp op_;
    MetaContextOp op_ = oAdd;
    int64_t id_ = 0;
    std::unordered_map<std::string, std::string> attrs_;
    std::unordered_map<std::string, std::string> filter_attrs_;
+136 −0
Original line number Diff line number Diff line
@@ -28,6 +28,142 @@ namespace milvus {
namespace engine {
namespace snapshot {

MultiSegmentsOperation::MultiSegmentsOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
    : BaseT(context, prev_ss) {
}

Status
MultiSegmentsOperation::DoExecute(StorePtr store) {
    if (context_.new_segments.empty()) {
        return Status(SS_INVALID_CONTEX_ERROR, "Nothing to do");
    }

    for (auto iter : context_.new_segment_file_map) {
        for (auto& new_file : iter.second) {
            auto update_ctx = ResourceContextBuilder<SegmentFile>().SetOp(meta::oUpdate).CreatePtr();
            update_ctx->AddAttr(SizeField::Name);
            AddStepWithLsn(*new_file, context_.lsn, update_ctx);
        }
    }

    std::map<ID_TYPE, SegmentCommit::VecT> new_segment_commits;
    for (auto iter : new_segments_) {
        for (auto& new_segment : iter.second) {
            OperationContext context;
            context.new_segment = new_segment;
            // TODO(yhz): Why here get adjusted ss
            context.new_segment_files = context_.new_segment_file_map[new_segment->GetID()];
            auto sc_op = SegmentCommitOperation(context, GetAdjustedSS());
            STATUS_CHECK(sc_op(store));
            SegmentCommit::Ptr sc;
            STATUS_CHECK(sc_op.GetResource(sc));

            sc->SetRowCount(new_segment_counts_[new_segment->GetID()]);

            if (new_segment_commits.find(new_segment->GetPartitionId()) == new_segment_commits.end()) {
                new_segment_commits[new_segment->GetPartitionId()] = SegmentCommit::VecT();
            }
            new_segment_commits[new_segment->GetPartitionId()].push_back(sc);
            auto sc_ctx_p = ResourceContextBuilder<SegmentCommit>().SetOp(meta::oUpdate).CreatePtr();
            sc_ctx_p->AddAttr(RowCountField::Name);
            AddStepWithLsn(*sc, context.lsn, sc_ctx_p);
        }
    }

    for (auto iter : new_segment_commits) {
        auto& partition_id = iter.first;
        auto context = context_;
        context.new_segment_commits = iter.second;
        PartitionCommitOperation pc_op(context, GetAdjustedSS());
        STATUS_CHECK(pc_op(store));
        STATUS_CHECK(pc_op.GetResource(context.new_partition_commit));
        auto pc_ctx_p = ResourceContextBuilder<PartitionCommit>().SetOp(meta::oUpdate).CreatePtr();
        AddStepWithLsn(*context.new_partition_commit, context.lsn, pc_ctx_p);
        context_.new_partition_commits.push_back(context.new_partition_commit);
    }

    CollectionCommitOperation cc_op(context_, GetAdjustedSS());
    STATUS_CHECK(cc_op(store));
    STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
    auto cc_ctx_p = ResourceContextBuilder<CollectionCommit>().SetOp(meta::oUpdate).CreatePtr();
    AddStepWithLsn(*context_.new_collection_commit, context_.lsn, cc_ctx_p);

    return Status::OK();
}

Status
MultiSegmentsOperation::CommitNewSegment(const OperationContext& context, SegmentPtr& created) {
    //    if (context_.new_segment) {
    //        return Status(SS_DUPLICATED_ERROR, "Only one new segment could be created");
    //    }
    //    GetAdjustedSS()->GetPa
    if (context.prev_partition == nullptr) {
        return Status(SS_INVALID_CONTEX_ERROR, "Unknown corresponding partition");
    }

    auto partition = GetStartedSS()->GetResource<Partition>(context.prev_partition->GetID());
    if (partition == nullptr || !partition->IsActive()) {
        return Status(SS_STALE_ERROR, "partition of segment has been staled");
    }

    auto op = std::make_shared<SegmentOperation>(context, GetStartedSS());
    STATUS_CHECK(op->Push());
    STATUS_CHECK(op->GetResource(created));
    new_segments_[context.prev_partition->GetID()].push_back(created);
    auto s_ctx_p = ResourceContextBuilder<Segment>().SetOp(meta::oUpdate).CreatePtr();
    AddStepWithLsn(*created, context_.lsn, s_ctx_p);

    context_.new_segments.push_back(created);
    context_.new_segment_file_map[created->GetID()] = SegmentFile::VecT();

    return Status::OK();
}

Status
MultiSegmentsOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
    auto segment = GetStartedSS()->GetResource<Segment>(context.segment_id);
    // TODO(yhz): May not depend on context_.new_segment
    if (!segment) {
        for (auto& seg : context_.new_segments) {
            if (seg->GetID() == context.segment_id) {
                segment = seg;
                break;
            }
        }
    }

    if (!segment || segment->GetID() != context.segment_id) {
        std::stringstream emsg;
        emsg << GetRepr() << ". Invalid segment " << context.segment_id << " in context";
        return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
    }

    auto ctx = context;
    ctx.partition_id = segment->GetPartitionId();
    auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
    STATUS_CHECK(new_sf_op->Push());
    STATUS_CHECK(new_sf_op->GetResource(created));
    context_.new_segment_file_map[created->GetSegmentId()].push_back(created);
    auto sf_ctx_p = ResourceContextBuilder<SegmentFile>().SetOp(meta::oUpdate).CreatePtr();
    AddStepWithLsn(*created, context_.lsn, sf_ctx_p);

    return Status::OK();
}

Status
MultiSegmentsOperation::CommitRowCount(ID_TYPE segment_id, SIZE_TYPE delta) {
    // TODO(yhz): may need check if segment exists
    for (auto& seg : context_.new_segments) {
        if (seg->GetID() == segment_id) {
            new_segment_counts_[segment_id] = delta;
            return Status::OK();
        }
    }

    std::string err = "Invalid segment id " + std::to_string(segment_id) + ": segment not created";
    return Status(SS_NOT_FOUND_ERROR, err);
}

CompoundSegmentsOperation::CompoundSegmentsOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
    : BaseT(context, prev_ss) {
    for (auto& stale_segment_file : context_.stale_segment_files) {
+23 −0
Original line number Diff line number Diff line
@@ -84,6 +84,29 @@ class ChangeSegmentFileOperation : public CompoundBaseOperation<ChangeSegmentFil
    bool sub_;
};

class MultiSegmentsOperation : public CompoundBaseOperation<MultiSegmentsOperation> {
 public:
    using BaseT = CompoundBaseOperation<MultiSegmentsOperation>;
    static constexpr const char* Name = "MS";

    MultiSegmentsOperation(const OperationContext& context, ScopedSnapshotT prev_ss);

    Status DoExecute(StorePtr) override;

    Status
    CommitNewSegment(const OperationContext& context, SegmentPtr&);

    Status
    CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created);

    Status
    CommitRowCount(ID_TYPE segment_id, SIZE_TYPE delta);

 protected:
    std::map<ID_TYPE, Segment::VecT> new_segments_;
    std::map<ID_TYPE, SIZE_TYPE> new_segment_counts_;
};

class CompoundSegmentsOperation : public CompoundBaseOperation<CompoundSegmentsOperation> {
 public:
    using BaseT = CompoundBaseOperation<CompoundSegmentsOperation>;
+2 −0
Original line number Diff line number Diff line
@@ -55,6 +55,8 @@ struct OperationContext {
    ScopedSnapshotT latest_ss;
    ScopedSnapshotT prev_ss;
    SegmentPtr new_segment = nullptr;
    Segment::VecT new_segments;
    std::map<ID_TYPE, SegmentFile::VecT> new_segment_file_map;
    SegmentCommitPtr new_segment_commit = nullptr;
    std::vector<SegmentCommitPtr> new_segment_commits;
    PartitionPtr new_partition = nullptr;
Loading