Unverified Commit a9629951 authored by 余昆, committed by GitHub

Improve ut coverage (#2516) (#2522)



* Improve ut coverage

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Delete unused code

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Add fiu in HybridSearchRequest

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* Update helm config

Signed-off-by: JinHai-CN <hai.jin@zilliz.com>

* Change BinaryQuery validation check

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* code format

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* code format

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

* code format

Signed-off-by: fishpenguin <kun.yu@zilliz.com>

Co-authored-by: JinHai-CN <hai.jin@zilliz.com>
parent 551be85d
codecs/default/DefaultAttrsFormat.cpp  +13 −4
@@ -18,6 +18,7 @@
 #include "codecs/default/DefaultAttrsFormat.h"

 #include <fcntl.h>
+#include <fiu-local.h>
 #include <unistd.h>
 #include <algorithm>
 #include <memory>
@@ -34,7 +35,9 @@ namespace codec {
 void
 DefaultAttrsFormat::read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, off_t offset,
                                          size_t num, std::vector<uint8_t>& raw_attrs, size_t& nbytes) {
-    if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
+    auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
+    fiu_do_on("read_attrs_internal_open_file_fail", open_res = false);
+    if (!open_res) {
         std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
@@ -56,7 +59,9 @@ DefaultAttrsFormat::read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, con
 void
 DefaultAttrsFormat::read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path,
                                         std::vector<int64_t>& uids) {
-    if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
+    auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
+    fiu_do_on("read_uids_internal_open_file_fail", open_res = false);
+    if (!open_res) {
         std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
@@ -76,7 +81,9 @@ DefaultAttrsFormat::read(const milvus::storage::FSHandlerPtr& fs_ptr, milvus::se
     const std::lock_guard<std::mutex> lock(mutex_);

     std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
-    if (!boost::filesystem::is_directory(dir_path)) {
+    auto is_directory = boost::filesystem::is_directory(dir_path);
+    fiu_do_on("read_id_directory_false", is_directory = false);
+    if (!is_directory) {
         std::string err_msg = "Directory: " + dir_path + "does not exist";
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
@@ -190,7 +197,9 @@ DefaultAttrsFormat::read_uids(const milvus::storage::FSHandlerPtr& fs_ptr, std::
     const std::lock_guard<std::mutex> lock(mutex_);

     std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
-    if (!boost::filesystem::is_directory(dir_path)) {
+    auto is_directory = boost::filesystem::is_directory(dir_path);
+    fiu_do_on("is_directory_false", is_directory = false);
+    if (!is_directory) {
         std::string err_msg = "Directory: " + dir_path + "does not exist";
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
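
Every hunk above follows the same libfiu pattern: the result of the real call is captured in a local variable, and a named fault point may overwrite it when a test arms that point. A minimal, self-contained sketch of the pattern (the function and the fault-point name "example.open_fail" below are illustrative, not from this commit):

// Build with -DFIU_ENABLE and link against -lfiu. Without FIU_ENABLE,
// fiu_do_on() expands to nothing, so release builds pay no cost.
#define FIU_ENABLE
#include <fiu-local.h>

#include <cstdio>

bool
OpenResource(const char* path) {
    std::FILE* fp = std::fopen(path, "rb");
    bool open_res = (fp != nullptr);                   // real result
    fiu_do_on("example.open_fail", open_res = false);  // test may force failure
    if (fp != nullptr) {
        std::fclose(fp);
    }
    return open_res;
}

The override fires only while a test has armed "example.open_fail"; otherwise the captured result is returned unchanged.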
codecs/default/DefaultIdBloomFilterFormat.cpp  +2 −0
@@ -17,6 +17,7 @@

 #include "codecs/default/DefaultIdBloomFilterFormat.h"

+#include <fiu-local.h>
 #include <memory>
 #include <string>

@@ -37,6 +38,7 @@ DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::I
     const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
     scaling_bloom_t* bloom_filter =
         new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
+    fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
     if (bloom_filter == nullptr) {
         std::string err_msg =
             "Failed to read bloom filter from file: " + bloom_filter_file_path + ". " + std::strerror(errno);
db/DBImpl.cpp  +101 −95
@@ -168,6 +168,7 @@ DBImpl::Start() {
     }

     // background metric thread
+    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
     if (options_.metric_enable_) {
         bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
     }
@@ -1042,6 +1043,7 @@ DBImpl::Flush() {
     LOG_ENGINE_DEBUG_ << "Begin flush all collections";

     Status status;
+    fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);
     if (options_.wal_enable_) {
         LOG_ENGINE_DEBUG_ << "WAL flush";
         auto lsn = wal_mgr_->Flush();
@@ -1401,7 +1403,10 @@ DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector<engine::Vect
         engine::utils::GetParentPath(file.location_, segment_dir);
         segment::SegmentReader segment_reader(segment_dir);
         segment::IdBloomFilterPtr id_bloom_filter_ptr;
-        segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
+        auto status = segment_reader.LoadBloomFilter(id_bloom_filter_ptr);
+        if (!status.ok()) {
+            return status;
+        }

         for (IDNumbers::iterator it = temp_ids.begin(); it != temp_ids.end();) {
             int64_t vector_id = *it;
@@ -2090,100 +2095,101 @@ DBImpl::StartMergeTask(const std::set<std::string>& merge_collection_ids, bool f
     // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
 }

-Status
-DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
-    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
-
-    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
-
-    // step 1: create table file
-    meta::SegmentSchema table_file;
-    table_file.collection_id_ = collection_id;
-    table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
-    Status status = meta_ptr_->CreateHybridCollectionFile(table_file);
-
-    if (!status.ok()) {
-        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
-        return status;
-    }
-
-    // step 2: merge files
-    /*
-    ExecutionEnginePtr index =
-        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
-                             (MetricType)table_file.metric_type_, table_file.nlist_);
-*/
-    meta::SegmentsSchema updated;
-
-    std::string new_segment_dir;
-    utils::GetParentPath(table_file.location_, new_segment_dir);
-    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
-
-    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
-    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
-    for (auto& file : files) {
-        server::CollectMergeFilesMetrics metrics;
-        std::string segment_dir_to_merge;
-        utils::GetParentPath(file.location_, segment_dir_to_merge);
-        segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
-
-        files_holder.UnmarkFile(file);
-
-        auto file_schema = file;
-        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
-        updated.push_back(file_schema);
-        int64_t size = segment_writer_ptr->Size();
-        if (size >= file_schema.index_file_size_) {
-            break;
-        }
-    }
-
-    // step 3: serialize to disk
-    try {
-        status = segment_writer_ptr->Serialize();
-        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
-        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
-    } catch (std::exception& ex) {
-        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
-        LOG_ENGINE_ERROR_ << msg;
-        status = Status(DB_ERROR, msg);
-    }
-
-    if (!status.ok()) {
-        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();
-
-        // if failed to serialize merge file to disk
-        // typical error: out of disk space, out of memory or permission denied
-        table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
-        status = meta_ptr_->UpdateCollectionFile(table_file);
-        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
-
-        return status;
-    }
-
-    // step 4: update table files state
-    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
-    // else set file type to RAW, no need to build index
-    if (!utils::IsRawIndexType(table_file.engine_type_)) {
-        table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
-                                    ? meta::SegmentSchema::TO_INDEX
-                                    : meta::SegmentSchema::RAW;
-    } else {
-        table_file.file_type_ = meta::SegmentSchema::RAW;
-    }
-    table_file.file_size_ = segment_writer_ptr->Size();
-    table_file.row_count_ = segment_writer_ptr->VectorCount();
-    updated.push_back(table_file);
-    status = meta_ptr_->UpdateCollectionFiles(updated);
-    LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
-                      << " bytes";
-
-    if (options_.insert_cache_immediately_) {
-        segment_writer_ptr->Cache();
-    }
-
-    return status;
-}
+// Status
+// DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
+//    // const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
+//
+//    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
+//
+//    // step 1: create table file
+//    meta::SegmentSchema table_file;
+//    table_file.collection_id_ = collection_id;
+//    table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
+//    Status status = meta_ptr_->CreateHybridCollectionFile(table_file);
+//
+//    if (!status.ok()) {
+//        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
+//        return status;
+//    }
+//
+//    // step 2: merge files
+//    /*
+//    ExecutionEnginePtr index =
+//        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
+//                             (MetricType)table_file.metric_type_, table_file.nlist_);
+//*/
+//    meta::SegmentsSchema updated;
+//
+//    std::string new_segment_dir;
+//    utils::GetParentPath(table_file.location_, new_segment_dir);
+//    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
+//
+//    // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
+//    milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
+//    for (auto& file : files) {
+//        server::CollectMergeFilesMetrics metrics;
+//        std::string segment_dir_to_merge;
+//        utils::GetParentPath(file.location_, segment_dir_to_merge);
+//        segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
+//
+//        files_holder.UnmarkFile(file);
+//
+//        auto file_schema = file;
+//        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
+//        updated.push_back(file_schema);
+//        int64_t size = segment_writer_ptr->Size();
+//        if (size >= file_schema.index_file_size_) {
+//            break;
+//        }
+//    }
+//
+//    // step 3: serialize to disk
+//    try {
+//        status = segment_writer_ptr->Serialize();
+//        fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
+//        fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
+//    } catch (std::exception& ex) {
+//        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
+//        LOG_ENGINE_ERROR_ << msg;
+//        status = Status(DB_ERROR, msg);
+//    }
+//
+//    if (!status.ok()) {
+//        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " <<
+//        status.message();
+//
+//        // if failed to serialize merge file to disk
+//        // typical error: out of disk space, out of memory or permission denied
+//        table_file.file_type_ = meta::SegmentSchema::TO_DELETE;
+//        status = meta_ptr_->UpdateCollectionFile(table_file);
+//        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
+//
+//        return status;
+//    }
+//
+//    // step 4: update table files state
+//    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
+//    // else set file type to RAW, no need to build index
+//    if (!utils::IsRawIndexType(table_file.engine_type_)) {
+//        table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_))
+//                                    ? meta::SegmentSchema::TO_INDEX
+//                                    : meta::SegmentSchema::RAW;
+//    } else {
+//        table_file.file_type_ = meta::SegmentSchema::RAW;
+//    }
+//    table_file.file_size_ = segment_writer_ptr->Size();
+//    table_file.row_count_ = segment_writer_ptr->VectorCount();
+//    updated.push_back(table_file);
+//    status = meta_ptr_->UpdateCollectionFiles(updated);
+//    LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size()
+//                      << " bytes";
+//
+//    if (options_.insert_cache_immediately_) {
+//        segment_writer_ptr->Cache();
+//    }
+//
+//    return status;
+//}

 void
 DBImpl::BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all) {
db/DBImpl.h  +2 −2
@@ -238,8 +238,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
     void
     BackgroundMerge(std::set<std::string> collection_ids, bool force_merge_all);

-    Status
-    MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
+    //    Status
+    //    MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);

     void
     StartBuildIndexTask();
db/insert/MemManagerImpl.cpp  +2 −1
@@ -11,6 +11,7 @@

 #include "db/insert/MemManagerImpl.h"

+#include <fiu-local.h>
 #include <thread>

 #include "VectorSource.h"
@@ -36,9 +37,9 @@ MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length,
                               const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
     flushed_tables.clear();
     if (GetCurrentMem() > options_.insert_buffer_size_) {
         LOG_ENGINE_DEBUG_ << "Insert buffer size exceeds limit. Performing force flush";
         // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge
         auto status = Flush(flushed_tables, false);
+        fiu_do_on("MemManagerImpl::InsertVectors_flush_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
         if (!status.ok()) {
             return status;
         }
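
Tests that arm points like "MemManagerImpl::InsertVectors_flush_fail" must also disarm them, or the injected failure leaks into later tests. A hypothetical RAII guard (not part of this commit) that pairs fiu_enable with fiu_disable:

#include <fiu.h>
#include <fiu-control.h>

#include <string>

// Arms a named fault point for the lifetime of the guard object and
// disarms it on scope exit, even if an assertion throws in between.
class FiuGuard {
 public:
    explicit FiuGuard(std::string name) : name_(std::move(name)) {
        fiu_enable(name_.c_str(), 1, nullptr, 0);
    }
    ~FiuGuard() {
        fiu_disable(name_.c_str());
    }
    FiuGuard(const FiuGuard&) = delete;
    FiuGuard& operator=(const FiuGuard&) = delete;

 private:
    std::string name_;
};

int
main() {
    fiu_init(0);
    {
        FiuGuard guard("MemManagerImpl::InsertVectors_flush_fail");
        // ... call MemManagerImpl::InsertVectors here and expect the
        //     injected SERVER_UNEXPECTED_ERROR status ...
    }  // fault point disarmed here
    return 0;
}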