Unverified Commit 9074366f authored by groot's avatar groot Committed by GitHub
Browse files

fix potential hang bug (#1884)



* test

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>

* test

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>

* test

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>

* unittest

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>

* format code

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>

* fix crash

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>

* merge master

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>
parent f4626a78
Loading
Loading
Loading
Loading
+39 −48
Original line number Diff line number Diff line
@@ -52,7 +52,7 @@ XBuildIndexTask::XBuildIndexTask(SegmentSchemaPtr file, TaskLabelPtr label)

void
XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
    TimeRecorder rc("");
    TimeRecorder rc("XBuildIndexTask::Load");
    Status stat = Status::OK();
    std::string error_msg;
    std::string type_str;
@@ -101,7 +101,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
        std::string info = "Build index task load file id:" + std::to_string(file_->id_) + " " + type_str +
                           " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
                           " bytes from location: " + file_->location_ + " totally cost";
        double span = rc.ElapseFromBegin(info);
        rc.ElapseFromBegin(info);

        to_index_id_ = file_->id_;
        to_index_type_ = file_->file_type_;
@@ -110,19 +110,21 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {

void
XBuildIndexTask::Execute() {
    TimeRecorderAuto rc("XBuildIndexTask::Execute " + std::to_string(to_index_id_));

    if (auto job = job_.lock()) {
        auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
        if (to_index_engine_ == nullptr) {
            build_index_job->BuildIndexDone(to_index_id_);
            build_index_job->GetStatus() = Status(DB_ERROR, "source index is null");
            return;
        }

    TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_));

    if (auto job = job_.lock()) {
        auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
        std::string location = file_->location_;
        EngineType engine_type = (EngineType)file_->engine_type_;
        std::shared_ptr<engine::ExecutionEngine> index;

        // step 2: create collection file
        // step 1: create collection file
        engine::meta::SegmentSchema table_file;
        table_file.collection_id_ = file_->collection_id_;
        table_file.segment_id_ = file_->file_id_;
@@ -131,6 +133,7 @@ XBuildIndexTask::Execute() {

        engine::meta::MetaPtr meta_ptr = build_index_job->meta();
        Status status = meta_ptr->CreateCollectionFile(table_file);

        fiu_do_on("XBuildIndexTask.Execute.create_table_success", status = Status::OK());
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create collection file: " << status.ToString();
@@ -140,73 +143,63 @@ XBuildIndexTask::Execute() {
            return;
        }

        // step 3: build index
        auto failed_build_index = [&](std::string log_msg, std::string err_msg) {
            table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE;
            status = meta_ptr->UpdateCollectionFile(table_file);
            ENGINE_LOG_ERROR << log_msg;

            build_index_job->BuildIndexDone(to_index_id_);
            build_index_job->GetStatus() = Status(DB_ERROR, err_msg);
            to_index_engine_ = nullptr;
        };

        // step 2: build index
        try {
            ENGINE_LOG_DEBUG << "Begin build index for file:" + table_file.location_;
            index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
            fiu_do_on("XBuildIndexTask.Execute.build_index_fail", index = nullptr);
            if (index == nullptr) {
                throw Exception(DB_ERROR, "index NULL");
                std::string log_msg = "Failed to build index " + table_file.file_id_ + ", reason: source index is null";
                failed_build_index(log_msg, "source index is null");
                return;
            }
        } catch (std::exception& ex) {
            std::string msg = "Build index exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;

            table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE;
            status = meta_ptr->UpdateCollectionFile(table_file);
            ENGINE_LOG_DEBUG << "Build index fail, mark file: " << table_file.file_id_ << " to to_delete";

            build_index_job->BuildIndexDone(to_index_id_);
            build_index_job->GetStatus() = Status(DB_ERROR, msg);
            to_index_engine_ = nullptr;
            std::string msg = "Failed to build index " + table_file.file_id_ + ", reason: " + std::string(ex.what());
            failed_build_index(msg, ex.what());
            return;
        }

        // step 4: if collection has been deleted, dont save index file
        // step 3: if collection has been deleted, dont save index file
        bool has_collection = false;
        meta_ptr->HasCollection(file_->collection_id_, has_collection);
        fiu_do_on("XBuildIndexTask.Execute.has_collection", has_collection = true);

        if (!has_collection) {
            meta_ptr->DeleteTableFiles(file_->collection_id_);

            build_index_job->BuildIndexDone(to_index_id_);
            build_index_job->GetStatus() = Status(DB_ERROR, "Collection has been deleted, discard index file.");
            to_index_engine_ = nullptr;
            std::string msg = "Failed to build index " + table_file.file_id_ + ", reason: collection has been deleted";
            failed_build_index(msg, "Collection has been deleted");
            return;
        }

        // step 5: save index file
        // step 4: save index file
        try {
            fiu_do_on("XBuildIndexTask.Execute.throw_std_exception", throw std::exception());
            status = index->Serialize();
            if (!status.ok()) {
                ENGINE_LOG_ERROR << status.message();
                std::string msg =
                    "Failed to persist index file: " + table_file.location_ + ", reason: " + status.message();
                failed_build_index(msg, status.message());
                return;
            }
        } catch (std::exception& ex) {
            std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
            ENGINE_LOG_ERROR << msg;
            status = Status(DB_ERROR, msg);
        }

        fiu_do_on("XBuildIndexTask.Execute.save_index_file_success", status = Status::OK());
        if (!status.ok()) {
            // if failed to serialize index file to disk
            // typical error: out of disk space, out of memory or permition denied
            table_file.file_type_ = engine::meta::SegmentSchema::TO_DELETE;
            status = meta_ptr->UpdateCollectionFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            ENGINE_LOG_ERROR << "Failed to persist index file: " << table_file.location_
                             << ", possible out of disk space or memory";

            build_index_job->BuildIndexDone(to_index_id_);
            build_index_job->GetStatus() = status;
            to_index_engine_ = nullptr;
            std::string msg =
                "Failed to persist index file:" + table_file.location_ + ", exception:" + std::string(ex.what());
            failed_build_index(msg, ex.what());
            return;
        }

        // step 6: update meta
        // step 5: update meta
        table_file.file_type_ = engine::meta::SegmentSchema::INDEX;
        table_file.file_size_ = server::CommonUtil::GetFileSize(table_file.location_);
        table_file.row_count_ = file_->row_count_;  // index->Count();
@@ -243,8 +236,6 @@ XBuildIndexTask::Execute() {
        build_index_job->BuildIndexDone(to_index_id_);
    }

    rc.ElapseFromBegin("totally cost");

    to_index_engine_ = nullptr;
}

+7 −8
Original line number Diff line number Diff line
@@ -188,10 +188,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
    std::string info = "Search task load file id:" + std::to_string(file_->id_) + " " + type_str +
                       " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
                       " bytes from location: " + file_->location_ + " totally cost";
    double span = rc.ElapseFromBegin(info);
    //    for (auto &context : search_contexts_) {
    //        context->AccumLoadCost(span);
    //    }
    rc.ElapseFromBegin(info);

    CollectFileMetrics(file_->file_type_, file_size);

@@ -205,10 +202,6 @@ void
XSearchTask::Execute() {
    milvus::server::ContextFollower tracer(context_, "XSearchTask::Execute " + std::to_string(index_id_));

    if (index_engine_ == nullptr) {
        return;
    }

    //    ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_ << " with "
    //                     << search_contexts_.size() << " tasks";

@@ -222,6 +215,12 @@ XSearchTask::Execute() {

    if (auto job = job_.lock()) {
        auto search_job = std::static_pointer_cast<scheduler::SearchJob>(job);

        if (index_engine_ == nullptr) {
            search_job->SearchDone(index_id_);
            return;
        }

        // step 1: allocate memory
        uint64_t nq = search_job->nq();
        uint64_t topk = search_job->topk();
+0 −7
Original line number Diff line number Diff line
@@ -123,13 +123,6 @@ TEST(TaskTest, TEST_TASK) {
    build_index_task.Execute();
    fiu_disable("XBuildIndexTask.Execute.throw_std_exception");

    // always enable 'save_index_file_success'
    fiu_enable("XBuildIndexTask.Execute.save_index_file_success", 1, NULL, 0);
    build_index_task.to_index_engine_ =
        EngineFactory::Build(file->dimension_, file->location_, (EngineType)file->engine_type_,
                             (MetricType)file->metric_type_, json);
    build_index_task.Execute();

    fiu_enable("XBuildIndexTask.Execute.update_table_file_fail", 1, NULL, 0);
    build_index_task.to_index_engine_ =
        EngineFactory::Build(file->dimension_, file->location_, (EngineType)file->engine_type_,