Loading CHANGELOG.md +7 −0 Original line number Diff line number Diff line Loading @@ -36,12 +36,17 @@ Please mark all change in change log and use the ticket from JIRA. - \#543 - client raise exception in shards when search results is empty - \#545 - Avoid dead circle of build index thread when error occurs - \#547 - NSG build failed using GPU-edition if set gpu_enable false - \#548 - NSG search accuracy is too low - \#552 - Server down during building index_type: IVF_PQ using GPU-edition - \#561 - Milvus server should report exception/error message or terminate on mysql metadata backend error - \#579 - Build index hang in GPU version when gpu_resources disabled - \#596 - Frequently insert operation cost too much disk space - \#599 - Build index log is incorrect - \#602 - Optimizer specify wrong gpu_id - \#606 - No log generated during building index with CPU - \#631 - FAISS isn't compiled with O3 option - \#649 - Typo "partiton" should be "partition" - \#654 - Random crash when frequently insert vector one by one ## Feature - \#12 - Pure CPU version for Milvus Loading @@ -55,6 +60,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#502 - C++ SDK support IVFPQ and SPTAG - \#560 - Add version in server config file - \#605 - Print more messages when server start - \#644 - Add a new rpc command to get milvus build version whether cpu or gpu ## Improvement - \#255 - Add ivfsq8 test report detailed version Loading @@ -76,6 +82,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#470 - Small raw files should not be build index - \#584 - Intergrate internal FAISS - \#611 - Remove MILVUS_CPU_VERSION - \#634 - FAISS GPU version is compiled with O0 ## Task Loading core/src/cache/Cache.inl +5 −0 Original line number Diff line number Diff line Loading @@ -176,6 +176,11 @@ Cache<ItemObj>::print() { { std::lock_guard<std::mutex> lock(mutex_); cache_count = lru_.size(); #if 0 for (auto it = lru_.begin(); it != lru_.end(); ++it) { SERVER_LOG_DEBUG << it->first; } #endif } SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; Loading core/src/db/DB.h +1 −1 Original line number Diff line number Diff line Loading @@ -80,7 +80,7 @@ class DB { DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) = 0; virtual Status ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) = 0; ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) = 0; virtual Status InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors, Loading core/src/db/DBImpl.cpp +68 −139 Original line number Diff line number Diff line Loading @@ -52,9 +52,7 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1; constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1; static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!"); static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!"); void TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::TableFilesSchema& files_array) { Loading Loading @@ -192,9 +190,9 @@ DBImpl::PreloadTable(const std::string& table_id) { } // step 2: get files from partition tables std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = GetFilesToSearch(schema.table_id_, ids, dates, files_array); } Loading Loading @@ -298,12 +296,12 @@ DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& parti } Status DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) { DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) { if (shutting_down_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } return meta_ptr_->ShowPartitions(table_id, partiton_schema_array); return meta_ptr_->ShowPartitions(table_id, partition_schema_array); } Status Loading Loading @@ -370,7 +368,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { WaitMergeFileFinish(); // step 4: wait and build index status = CleanFailedIndexFileOfTable(table_id); status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id); status = BuildTableIndexRecursively(table_id, index); return status; Loading Loading @@ -429,9 +427,9 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& parti return status; } std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = GetFilesToSearch(schema.table_id_, ids, dates, files_array); } } else { Loading Loading @@ -504,7 +502,9 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi TimeRecorder rc(""); // step 1: get files to search // step 1: construct search job auto status = ongoing_files_checker_.MarkOngoingFiles(files); ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size(); scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors); for (auto& file : files) { Loading @@ -512,9 +512,11 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi job->AddIndexFile(file_ptr); } // step 2: put search task to scheduler // step 2: put search job to scheduler and wait result scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitResult(); status = ongoing_files_checker_.UnmarkOngoingFiles(files); if (!job->GetStatus().ok()) { return job->GetStatus(); } Loading Loading @@ -693,7 +695,6 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m auto file_schema = file; file_schema.file_type_ = meta::TableFileSchema::TO_DELETE; updated.push_back(file_schema); ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_; index_size = index->Size(); if (index_size >= file_schema.index_file_size_) { Loading @@ -703,20 +704,27 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m // step 3: serialize to disk try { index->Serialize(); status = index->Serialize(); if (!status.ok()) { ENGINE_LOG_ERROR << status.message(); } } catch (std::exception& ex) { // typical error: out of disk space or permition denied std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; status = Status(DB_ERROR, msg); } if (!status.ok()) { // if failed to serialize merge file to disk // typical error: out of disk space, out of memory or permition denied table_file.file_type_ = meta::TableFileSchema::TO_DELETE; status = meta_ptr_->UpdateTableFile(table_file); ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; std::cout << "ERROR: failed to persist merged index file: " << table_file.location_ << ", possible out of disk space" << std::endl; ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_ << ", possible out of disk space or memory"; return Status(DB_ERROR, msg); return status; } // step 4: update table files state Loading Loading @@ -751,13 +759,15 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) { } for (auto& kv : raw_files) { auto files = kv.second; meta::TableFilesSchema& files = kv.second; if (files.size() < options_.merge_trigger_number_) { ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action"; continue; } status = ongoing_files_checker_.MarkOngoingFiles(files); MergeFiles(table_id, kv.first, kv.second); status = ongoing_files_checker_.UnmarkOngoingFiles(files); if (shutting_down_.load(std::memory_order_acquire)) { ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id; Loading Loading @@ -788,16 +798,12 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) { meta_ptr_->Archive(); { uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds meta_ptr_->CleanUpCacheWithTTL(ttl); } { uint64_t ttl = 5 * meta::MINUTE; // default: file will be deleted after few minutes uint64_t ttl = 10 * meta::SECOND; // default: file will be hard-deleted few seconds after soft-deleted if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { ttl = meta::DAY; ttl = meta::HOUR; } meta_ptr_->CleanUpFilesWithTTL(ttl); meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_); } // ENGINE_LOG_TRACE << " Background compaction thread exit"; Loading Loading @@ -833,14 +839,15 @@ DBImpl::StartBuildIndexTask(bool force) { void DBImpl::BackgroundBuildIndex() { // ENGINE_LOG_TRACE << "Background build index thread start"; std::unique_lock<std::mutex> lock(build_index_mutex_); meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); Status status = IgnoreFailedIndexFiles(to_index_files); Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { ENGINE_LOG_DEBUG << "Background build index thread begin"; status = ongoing_files_checker_.MarkOngoingFiles(to_index_files); // step 2: put build index task to scheduler std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr>> job2file_map; for (auto& file : to_index_files) { Loading @@ -851,6 +858,7 @@ DBImpl::BackgroundBuildIndex() { job2file_map.push_back(std::make_pair(job, file_ptr)); } // step 3: wait build index finished and mark failed files for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) { scheduler::BuildIndexJobPtr job = iter->first; meta::TableFileSchema& file_schema = *(iter->second.get()); Loading @@ -859,17 +867,17 @@ DBImpl::BackgroundBuildIndex() { Status status = job->GetStatus(); ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString(); MarkFailedIndexFile(file_schema); index_failed_checker_.MarkFailedIndexFile(file_schema); } else { MarkSucceedIndexFile(file_schema); ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed."; index_failed_checker_.MarkSucceedIndexFile(file_schema); } status = ongoing_files_checker_.UnmarkOngoingFile(file_schema); } ENGINE_LOG_DEBUG << "Background build index thread finished"; } // ENGINE_LOG_TRACE << "Background build index thread exit"; } Status Loading @@ -894,6 +902,8 @@ DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int> Status DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates, meta::TableFilesSchema& files) { ENGINE_LOG_DEBUG << "Collect files from table: " << table_id; meta::DatePartionedTableFilesSchema date_files; auto status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, date_files); if (!status.ok()) { Loading @@ -907,15 +917,15 @@ DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& Status DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::string>& partition_tags, std::set<std::string>& partition_name_array) { std::vector<meta::TableSchema> partiton_array; auto status = meta_ptr_->ShowPartitions(table_id, partiton_array); std::vector<meta::TableSchema> partition_array; auto status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& tag : partition_tags) { // trim side-blank of tag, only compare valid characters // for example: " ab cd " is treated as "ab cd" std::string valid_tag = tag; server::StringHelpFunctions::TrimStringBlank(valid_tag); for (auto& schema : partiton_array) { for (auto& schema : partition_array) { if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) { partition_name_array.insert(schema.table_id_); } Loading @@ -934,7 +944,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da if (dates.empty()) { status = mem_mgr_->EraseMemVector(table_id); // not allow insert status = meta_ptr_->DropTable(table_id); // soft delete table CleanFailedIndexFileOfTable(table_id); index_failed_checker_.CleanFailedIndexFileOfTable(table_id); // scheduler will determine when to delete table files auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource(); Loading @@ -945,9 +955,9 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da status = meta_ptr_->DropDataByDate(table_id, dates); } std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = DropTableRecursively(schema.table_id_, dates); if (!status.ok()) { return status; Loading @@ -967,9 +977,9 @@ DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableInde return status; } std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = UpdateTableIndexRecursively(schema.table_id_, index); if (!status.ok()) { return status; Loading Loading @@ -1014,13 +1024,13 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex GetFilesToBuildIndex(table_id, file_types, table_files); times++; IgnoreFailedIndexFiles(table_files); index_failed_checker_.IgnoreFailedIndexFiles(table_files); } // build index for partition std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = BuildTableIndexRecursively(schema.table_id_, index); if (!status.ok()) { return status; Loading @@ -1029,7 +1039,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex // failed to build index for some files, return error std::vector<std::string> failed_files; GetFailedIndexFileOfTable(table_id, failed_files); index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files); if (!failed_files.empty()) { std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) + ((failed_files.size() == 1) ? " file" : " files"); Loading @@ -1043,16 +1053,16 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex Status DBImpl::DropTableIndexRecursively(const std::string& table_id) { ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; CleanFailedIndexFileOfTable(table_id); index_failed_checker_.CleanFailedIndexFileOfTable(table_id); auto status = meta_ptr_->DropTableIndex(table_id); if (!status.ok()) { return status; } // drop partition index std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = DropTableIndexRecursively(schema.table_id_); if (!status.ok()) { return status; Loading @@ -1071,9 +1081,9 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c } // get partition row count std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { uint64_t partition_row_count = 0; status = GetTableRowCountRecursively(schema.table_id_, partition_row_count); if (!status.ok()) { Loading @@ -1086,86 +1096,5 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c return Status::OK(); } Status DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) { std::lock_guard<std::mutex> lck(index_failed_mutex_); index_failed_files_.erase(table_id); // rebuild failed index files for this table return Status::OK(); } Status DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) { failed_files.clear(); std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(table_id); if (iter != index_failed_files_.end()) { FileID2FailedTimes& failed_map = iter->second; for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { failed_files.push_back(it_file->first); } } return Status::OK(); } Status DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) { std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(file.table_id_); if (iter == index_failed_files_.end()) { FileID2FailedTimes failed_files; failed_files.insert(std::make_pair(file.file_id_, 1)); index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); } else { auto it_failed_files = iter->second.find(file.file_id_); if (it_failed_files != iter->second.end()) { it_failed_files->second++; } else { iter->second.insert(std::make_pair(file.file_id_, 1)); } } return Status::OK(); } Status DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) { std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(file.table_id_); if (iter != index_failed_files_.end()) { iter->second.erase(file.file_id_); } return Status::OK(); } Status DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) { std::lock_guard<std::mutex> lck(index_failed_mutex_); // there could be some failed files belong to different table. // some files may has failed for several times, no need to build index for these files. // thus we can avoid dead circle for build index operation for (auto it_file = table_files.begin(); it_file != table_files.end();) { auto it_failed_files = index_failed_files_.find((*it_file).table_id_); if (it_failed_files != index_failed_files_.end()) { auto it_failed_file = it_failed_files->second.find((*it_file).file_id_); if (it_failed_file != it_failed_files->second.end()) { if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) { it_file = table_files.erase(it_file); continue; } } } ++it_file; } return Status::OK(); } } // namespace engine } // namespace milvus core/src/db/DBImpl.h +9 −23 Original line number Diff line number Diff line Loading @@ -18,8 +18,10 @@ #pragma once #include "DB.h" #include "Types.h" #include "src/db/insert/MemManager.h" #include "db/IndexFailedChecker.h" #include "db/OngoingFileChecker.h" #include "db/Types.h" #include "db/insert/MemManager.h" #include "utils/ThreadPool.h" #include <atomic> Loading Loading @@ -87,7 +89,7 @@ class DBImpl : public DB { DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) override; Status ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) override; ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) override; Status InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors, Loading Loading @@ -178,21 +180,6 @@ class DBImpl : public DB { Status GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count); Status CleanFailedIndexFileOfTable(const std::string& table_id); Status GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files); Status MarkFailedIndexFile(const meta::TableFileSchema& file); Status MarkSucceedIndexFile(const meta::TableFileSchema& file); Status IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files); private: const DBOptions options_; Loading @@ -214,10 +201,9 @@ class DBImpl : public DB { std::list<std::future<void>> index_thread_results_; std::mutex build_index_mutex_; std::mutex index_failed_mutex_; using FileID2FailedTimes = std::map<std::string, uint64_t>; using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>; Table2FailedFiles index_failed_files_; // file id mapping to failed times IndexFailedChecker index_failed_checker_; OngoingFileChecker ongoing_files_checker_; }; // DBImpl } // namespace engine Loading Loading
CHANGELOG.md +7 −0 Original line number Diff line number Diff line Loading @@ -36,12 +36,17 @@ Please mark all change in change log and use the ticket from JIRA. - \#543 - client raise exception in shards when search results is empty - \#545 - Avoid dead circle of build index thread when error occurs - \#547 - NSG build failed using GPU-edition if set gpu_enable false - \#548 - NSG search accuracy is too low - \#552 - Server down during building index_type: IVF_PQ using GPU-edition - \#561 - Milvus server should report exception/error message or terminate on mysql metadata backend error - \#579 - Build index hang in GPU version when gpu_resources disabled - \#596 - Frequently insert operation cost too much disk space - \#599 - Build index log is incorrect - \#602 - Optimizer specify wrong gpu_id - \#606 - No log generated during building index with CPU - \#631 - FAISS isn't compiled with O3 option - \#649 - Typo "partiton" should be "partition" - \#654 - Random crash when frequently insert vector one by one ## Feature - \#12 - Pure CPU version for Milvus Loading @@ -55,6 +60,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#502 - C++ SDK support IVFPQ and SPTAG - \#560 - Add version in server config file - \#605 - Print more messages when server start - \#644 - Add a new rpc command to get milvus build version whether cpu or gpu ## Improvement - \#255 - Add ivfsq8 test report detailed version Loading @@ -76,6 +82,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#470 - Small raw files should not be build index - \#584 - Intergrate internal FAISS - \#611 - Remove MILVUS_CPU_VERSION - \#634 - FAISS GPU version is compiled with O0 ## Task Loading
core/src/cache/Cache.inl +5 −0 Original line number Diff line number Diff line Loading @@ -176,6 +176,11 @@ Cache<ItemObj>::print() { { std::lock_guard<std::mutex> lock(mutex_); cache_count = lru_.size(); #if 0 for (auto it = lru_.begin(); it != lru_.end(); ++it) { SERVER_LOG_DEBUG << it->first; } #endif } SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; Loading
core/src/db/DB.h +1 −1 Original line number Diff line number Diff line Loading @@ -80,7 +80,7 @@ class DB { DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) = 0; virtual Status ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) = 0; ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) = 0; virtual Status InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors, Loading
core/src/db/DBImpl.cpp +68 −139 Original line number Diff line number Diff line Loading @@ -52,9 +52,7 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1; constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1; static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!"); static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!"); void TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::TableFilesSchema& files_array) { Loading Loading @@ -192,9 +190,9 @@ DBImpl::PreloadTable(const std::string& table_id) { } // step 2: get files from partition tables std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = GetFilesToSearch(schema.table_id_, ids, dates, files_array); } Loading Loading @@ -298,12 +296,12 @@ DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& parti } Status DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) { DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) { if (shutting_down_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } return meta_ptr_->ShowPartitions(table_id, partiton_schema_array); return meta_ptr_->ShowPartitions(table_id, partition_schema_array); } Status Loading Loading @@ -370,7 +368,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { WaitMergeFileFinish(); // step 4: wait and build index status = CleanFailedIndexFileOfTable(table_id); status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id); status = BuildTableIndexRecursively(table_id, index); return status; Loading Loading @@ -429,9 +427,9 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& parti return status; } std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = GetFilesToSearch(schema.table_id_, ids, dates, files_array); } } else { Loading Loading @@ -504,7 +502,9 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi TimeRecorder rc(""); // step 1: get files to search // step 1: construct search job auto status = ongoing_files_checker_.MarkOngoingFiles(files); ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size(); scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors); for (auto& file : files) { Loading @@ -512,9 +512,11 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi job->AddIndexFile(file_ptr); } // step 2: put search task to scheduler // step 2: put search job to scheduler and wait result scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitResult(); status = ongoing_files_checker_.UnmarkOngoingFiles(files); if (!job->GetStatus().ok()) { return job->GetStatus(); } Loading Loading @@ -693,7 +695,6 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m auto file_schema = file; file_schema.file_type_ = meta::TableFileSchema::TO_DELETE; updated.push_back(file_schema); ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_; index_size = index->Size(); if (index_size >= file_schema.index_file_size_) { Loading @@ -703,20 +704,27 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m // step 3: serialize to disk try { index->Serialize(); status = index->Serialize(); if (!status.ok()) { ENGINE_LOG_ERROR << status.message(); } } catch (std::exception& ex) { // typical error: out of disk space or permition denied std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; status = Status(DB_ERROR, msg); } if (!status.ok()) { // if failed to serialize merge file to disk // typical error: out of disk space, out of memory or permition denied table_file.file_type_ = meta::TableFileSchema::TO_DELETE; status = meta_ptr_->UpdateTableFile(table_file); ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; std::cout << "ERROR: failed to persist merged index file: " << table_file.location_ << ", possible out of disk space" << std::endl; ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_ << ", possible out of disk space or memory"; return Status(DB_ERROR, msg); return status; } // step 4: update table files state Loading Loading @@ -751,13 +759,15 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) { } for (auto& kv : raw_files) { auto files = kv.second; meta::TableFilesSchema& files = kv.second; if (files.size() < options_.merge_trigger_number_) { ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action"; continue; } status = ongoing_files_checker_.MarkOngoingFiles(files); MergeFiles(table_id, kv.first, kv.second); status = ongoing_files_checker_.UnmarkOngoingFiles(files); if (shutting_down_.load(std::memory_order_acquire)) { ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id; Loading Loading @@ -788,16 +798,12 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) { meta_ptr_->Archive(); { uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds meta_ptr_->CleanUpCacheWithTTL(ttl); } { uint64_t ttl = 5 * meta::MINUTE; // default: file will be deleted after few minutes uint64_t ttl = 10 * meta::SECOND; // default: file will be hard-deleted few seconds after soft-deleted if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { ttl = meta::DAY; ttl = meta::HOUR; } meta_ptr_->CleanUpFilesWithTTL(ttl); meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_); } // ENGINE_LOG_TRACE << " Background compaction thread exit"; Loading Loading @@ -833,14 +839,15 @@ DBImpl::StartBuildIndexTask(bool force) { void DBImpl::BackgroundBuildIndex() { // ENGINE_LOG_TRACE << "Background build index thread start"; std::unique_lock<std::mutex> lock(build_index_mutex_); meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); Status status = IgnoreFailedIndexFiles(to_index_files); Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { ENGINE_LOG_DEBUG << "Background build index thread begin"; status = ongoing_files_checker_.MarkOngoingFiles(to_index_files); // step 2: put build index task to scheduler std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr>> job2file_map; for (auto& file : to_index_files) { Loading @@ -851,6 +858,7 @@ DBImpl::BackgroundBuildIndex() { job2file_map.push_back(std::make_pair(job, file_ptr)); } // step 3: wait build index finished and mark failed files for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) { scheduler::BuildIndexJobPtr job = iter->first; meta::TableFileSchema& file_schema = *(iter->second.get()); Loading @@ -859,17 +867,17 @@ DBImpl::BackgroundBuildIndex() { Status status = job->GetStatus(); ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString(); MarkFailedIndexFile(file_schema); index_failed_checker_.MarkFailedIndexFile(file_schema); } else { MarkSucceedIndexFile(file_schema); ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed."; index_failed_checker_.MarkSucceedIndexFile(file_schema); } status = ongoing_files_checker_.UnmarkOngoingFile(file_schema); } ENGINE_LOG_DEBUG << "Background build index thread finished"; } // ENGINE_LOG_TRACE << "Background build index thread exit"; } Status Loading @@ -894,6 +902,8 @@ DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int> Status DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates, meta::TableFilesSchema& files) { ENGINE_LOG_DEBUG << "Collect files from table: " << table_id; meta::DatePartionedTableFilesSchema date_files; auto status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, date_files); if (!status.ok()) { Loading @@ -907,15 +917,15 @@ DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& Status DBImpl::GetPartitionsByTags(const std::string& table_id, const std::vector<std::string>& partition_tags, std::set<std::string>& partition_name_array) { std::vector<meta::TableSchema> partiton_array; auto status = meta_ptr_->ShowPartitions(table_id, partiton_array); std::vector<meta::TableSchema> partition_array; auto status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& tag : partition_tags) { // trim side-blank of tag, only compare valid characters // for example: " ab cd " is treated as "ab cd" std::string valid_tag = tag; server::StringHelpFunctions::TrimStringBlank(valid_tag); for (auto& schema : partiton_array) { for (auto& schema : partition_array) { if (server::StringHelpFunctions::IsRegexMatch(schema.partition_tag_, valid_tag)) { partition_name_array.insert(schema.table_id_); } Loading @@ -934,7 +944,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da if (dates.empty()) { status = mem_mgr_->EraseMemVector(table_id); // not allow insert status = meta_ptr_->DropTable(table_id); // soft delete table CleanFailedIndexFileOfTable(table_id); index_failed_checker_.CleanFailedIndexFileOfTable(table_id); // scheduler will determine when to delete table files auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource(); Loading @@ -945,9 +955,9 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da status = meta_ptr_->DropDataByDate(table_id, dates); } std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = DropTableRecursively(schema.table_id_, dates); if (!status.ok()) { return status; Loading @@ -967,9 +977,9 @@ DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableInde return status; } std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = UpdateTableIndexRecursively(schema.table_id_, index); if (!status.ok()) { return status; Loading Loading @@ -1014,13 +1024,13 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex GetFilesToBuildIndex(table_id, file_types, table_files); times++; IgnoreFailedIndexFiles(table_files); index_failed_checker_.IgnoreFailedIndexFiles(table_files); } // build index for partition std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = BuildTableIndexRecursively(schema.table_id_, index); if (!status.ok()) { return status; Loading @@ -1029,7 +1039,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex // failed to build index for some files, return error std::vector<std::string> failed_files; GetFailedIndexFileOfTable(table_id, failed_files); index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files); if (!failed_files.empty()) { std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) + ((failed_files.size() == 1) ? " file" : " files"); Loading @@ -1043,16 +1053,16 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex Status DBImpl::DropTableIndexRecursively(const std::string& table_id) { ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; CleanFailedIndexFileOfTable(table_id); index_failed_checker_.CleanFailedIndexFileOfTable(table_id); auto status = meta_ptr_->DropTableIndex(table_id); if (!status.ok()) { return status; } // drop partition index std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = DropTableIndexRecursively(schema.table_id_); if (!status.ok()) { return status; Loading @@ -1071,9 +1081,9 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c } // get partition row count std::vector<meta::TableSchema> partiton_array; status = meta_ptr_->ShowPartitions(table_id, partiton_array); for (auto& schema : partiton_array) { std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { uint64_t partition_row_count = 0; status = GetTableRowCountRecursively(schema.table_id_, partition_row_count); if (!status.ok()) { Loading @@ -1086,86 +1096,5 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c return Status::OK(); } Status DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) { std::lock_guard<std::mutex> lck(index_failed_mutex_); index_failed_files_.erase(table_id); // rebuild failed index files for this table return Status::OK(); } Status DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) { failed_files.clear(); std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(table_id); if (iter != index_failed_files_.end()) { FileID2FailedTimes& failed_map = iter->second; for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { failed_files.push_back(it_file->first); } } return Status::OK(); } Status DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) { std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(file.table_id_); if (iter == index_failed_files_.end()) { FileID2FailedTimes failed_files; failed_files.insert(std::make_pair(file.file_id_, 1)); index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); } else { auto it_failed_files = iter->second.find(file.file_id_); if (it_failed_files != iter->second.end()) { it_failed_files->second++; } else { iter->second.insert(std::make_pair(file.file_id_, 1)); } } return Status::OK(); } Status DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) { std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(file.table_id_); if (iter != index_failed_files_.end()) { iter->second.erase(file.file_id_); } return Status::OK(); } Status DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) { std::lock_guard<std::mutex> lck(index_failed_mutex_); // there could be some failed files belong to different table. // some files may has failed for several times, no need to build index for these files. // thus we can avoid dead circle for build index operation for (auto it_file = table_files.begin(); it_file != table_files.end();) { auto it_failed_files = index_failed_files_.find((*it_file).table_id_); if (it_failed_files != index_failed_files_.end()) { auto it_failed_file = it_failed_files->second.find((*it_file).file_id_); if (it_failed_file != it_failed_files->second.end()) { if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) { it_file = table_files.erase(it_file); continue; } } } ++it_file; } return Status::OK(); } } // namespace engine } // namespace milvus
core/src/db/DBImpl.h +9 −23 Original line number Diff line number Diff line Loading @@ -18,8 +18,10 @@ #pragma once #include "DB.h" #include "Types.h" #include "src/db/insert/MemManager.h" #include "db/IndexFailedChecker.h" #include "db/OngoingFileChecker.h" #include "db/Types.h" #include "db/insert/MemManager.h" #include "utils/ThreadPool.h" #include <atomic> Loading Loading @@ -87,7 +89,7 @@ class DBImpl : public DB { DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) override; Status ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partiton_schema_array) override; ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) override; Status InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors, Loading Loading @@ -178,21 +180,6 @@ class DBImpl : public DB { Status GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count); Status CleanFailedIndexFileOfTable(const std::string& table_id); Status GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files); Status MarkFailedIndexFile(const meta::TableFileSchema& file); Status MarkSucceedIndexFile(const meta::TableFileSchema& file); Status IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files); private: const DBOptions options_; Loading @@ -214,10 +201,9 @@ class DBImpl : public DB { std::list<std::future<void>> index_thread_results_; std::mutex build_index_mutex_; std::mutex index_failed_mutex_; using FileID2FailedTimes = std::map<std::string, uint64_t>; using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>; Table2FailedFiles index_failed_files_; // file id mapping to failed times IndexFailedChecker index_failed_checker_; OngoingFileChecker ongoing_files_checker_; }; // DBImpl } // namespace engine Loading