Loading CHANGELOG.md +2 −1 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#458 - Index data is not compatible between 0.5 and 0.6 - \#465 - Server hang caused by searching with nsg index - \#486 - gpu no usage during index building - \#497 - CPU-version search performance decreased - \#504 - The code coverage rate of core/src/scheduler/optimizer is too low - \#509 - IVF_PQ index build trapped into dead loop caused by invalid params - \#513 - Unittest DELETE_BY_RANGE sometimes failed Loading @@ -31,7 +32,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#532 - assigin value to `table_name` from confest shell - \#533 - NSG build failed with MetricType Inner Product - \#543 - client raise exception in shards when search results is empty - \#497 - CPU-version search performance decreased - \#545 - Avoid dead circle of build index thread when error occurs ## Feature - \#12 - Pure CPU version for Milvus Loading core/src/db/DBImpl.cpp +124 −8 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ #include <iostream> #include <set> #include <thread> #include <utility> namespace milvus { namespace engine { Loading @@ -51,6 +52,8 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1; constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1; static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!"); void Loading Loading @@ -361,6 +364,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { WaitMergeFileFinish(); // step 4: wait and build index status = CleanFailedIndexFileOfTable(table_id); status = BuildTableIndexRecursively(table_id, index); return status; Loading Loading @@ -828,24 +832,37 @@ DBImpl::BackgroundBuildIndex() { std::unique_lock<std::mutex> lock(build_index_mutex_); meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); Status status; Status status = IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_); // step 2: put build index task to scheduler std::map<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr> job2file_map; for (auto& file : to_index_files) { scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_); scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file); job->AddToIndexFiles(file_ptr); } scheduler::JobMgrInst::GetInstance()->Put(job); job2file_map.insert(std::make_pair(job, file_ptr)); } for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) { scheduler::BuildIndexJobPtr job = iter->first; meta::TableFileSchema& file_schema = *(iter->second.get()); job->WaitBuildIndexFinish(); if (!job->GetStatus().ok()) { Status status = job->GetStatus(); ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString(); MarkFailedIndexFile(file_schema); } else { MarkSucceedIndexFile(file_schema); ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed."; } } ENGINE_LOG_DEBUG << "Background build index thread finished"; } // ENGINE_LOG_TRACE << "Background build index thread exit"; } Loading Loading @@ -911,6 +928,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da if (dates.empty()) { status = mem_mgr_->EraseMemVector(table_id); // not allow insert status = meta_ptr_->DropTable(table_id); // soft delete table CleanFailedIndexFileOfTable(table_id); // scheduler will determine when to delete table files auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource(); Loading Loading @@ -989,6 +1007,8 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100))); GetFilesToBuildIndex(table_id, file_types, table_files); times++; IgnoreFailedIndexFiles(table_files); } // build index for partition Loading @@ -1001,12 +1021,27 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex } } // failed to build index for some files, return error std::vector<std::string> failed_files; GetFailedIndexFileOfTable(table_id, failed_files); if (!failed_files.empty()) { std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) + ((failed_files.size() == 1) ? " file" : " files"); #ifdef MILVUS_CPU_VERSION msg += ", please double check index parameters."; #else msg += ", file size is too large or gpu memory is not enough."; #endif return Status(DB_ERROR, msg); } return Status::OK(); } Status DBImpl::DropTableIndexRecursively(const std::string& table_id) { ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; CleanFailedIndexFileOfTable(table_id); auto status = meta_ptr_->DropTableIndex(table_id); if (!status.ok()) { return status; Loading Loading @@ -1049,5 +1084,86 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c return Status::OK(); } Status DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) { std::lock_guard<std::mutex> lck(index_failed_mutex_); index_failed_files_.erase(table_id); // rebuild failed index files for this table return Status::OK(); } Status DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) { failed_files.clear(); std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(table_id); if (iter != index_failed_files_.end()) { FileID2FailedTimes& failed_map = iter->second; for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { failed_files.push_back(it_file->first); } } return Status::OK(); } Status DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) { std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(file.table_id_); if (iter == index_failed_files_.end()) { FileID2FailedTimes failed_files; failed_files.insert(std::make_pair(file.file_id_, 1)); index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); } else { auto it_failed_files = iter->second.find(file.file_id_); if (it_failed_files != iter->second.end()) { it_failed_files->second++; } else { iter->second.insert(std::make_pair(file.file_id_, 1)); } } return Status::OK(); } Status DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) { std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(file.table_id_); if (iter != index_failed_files_.end()) { iter->second.erase(file.file_id_); } return Status::OK(); } Status DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) { std::lock_guard<std::mutex> lck(index_failed_mutex_); // there could be some failed files belong to different table. // some files may has failed for several times, no need to build index for these files. // thus we can avoid dead circle for build index operation for (auto it_file = table_files.begin(); it_file != table_files.end();) { auto it_failed_files = index_failed_files_.find((*it_file).table_id_); if (it_failed_files != index_failed_files_.end()) { auto it_failed_file = it_failed_files->second.find((*it_file).file_id_); if (it_failed_file != it_failed_files->second.end()) { if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) { it_file = table_files.erase(it_file); continue; } } } ++it_file; } return Status::OK(); } } // namespace engine } // namespace milvus core/src/db/DBImpl.h +21 −3 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ #include <atomic> #include <condition_variable> #include <list> #include <map> #include <memory> #include <mutex> #include <set> Loading @@ -35,8 +36,6 @@ namespace milvus { namespace engine { class Env; namespace meta { class Meta; } Loading Loading @@ -179,6 +178,21 @@ class DBImpl : public DB { Status GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count); Status CleanFailedIndexFileOfTable(const std::string& table_id); Status GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files); Status MarkFailedIndexFile(const meta::TableFileSchema& file); Status MarkSucceedIndexFile(const meta::TableFileSchema& file); Status IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files); private: const DBOptions options_; Loading @@ -200,6 +214,10 @@ class DBImpl : public DB { std::list<std::future<void>> index_thread_results_; std::mutex build_index_mutex_; std::mutex index_failed_mutex_; using FileID2FailedTimes = std::map<std::string, uint64_t>; using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>; Table2FailedFiles index_failed_files_; // file id mapping to failed times }; // DBImpl } // namespace engine Loading core/src/db/Utils.cpp +3 −1 Original line number Diff line number Diff line Loading @@ -154,7 +154,9 @@ GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file } std::string msg = "Table file doesn't exist: " + file_path; if (table_file.file_size_ > 0) { // no need to pop error for empty file ENGINE_LOG_ERROR << msg << " in path: " << options.path_ << " for table: " << table_file.table_id_; } return Status(DB_ERROR, msg); } Loading core/src/db/meta/MySQLMetaImpl.cpp +29 −4 Original line number Diff line number Diff line Loading @@ -1610,10 +1610,35 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& } } ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count << " new files:" << new_count << " new_merge files:" << new_merge_count << " new_index files:" << new_index_count << " to_index files:" << to_index_count << " index files:" << index_count << " backup files:" << backup_count; std::string msg = "Get table files by type. "; for (int file_type : file_types) { switch (file_type) { case (int)TableFileSchema::RAW: msg = msg + "raw files:" + std::to_string(raw_count); break; case (int)TableFileSchema::NEW: msg = msg + "new files:" + std::to_string(raw_count); break; case (int)TableFileSchema::NEW_MERGE: msg = msg + "new_merge files:" + std::to_string(raw_count); break; case (int)TableFileSchema::NEW_INDEX: msg = msg + "new_index files:" + std::to_string(raw_count); break; case (int)TableFileSchema::TO_INDEX: msg = msg + "to_index files:" + std::to_string(raw_count); break; case (int)TableFileSchema::INDEX: msg = msg + "index files:" + std::to_string(raw_count); break; case (int)TableFileSchema::BACKUP: msg = msg + "backup files:" + std::to_string(raw_count); break; default: break; } } ENGINE_LOG_DEBUG << msg; } } catch (std::exception& e) { return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what()); Loading Loading
CHANGELOG.md +2 −1 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#458 - Index data is not compatible between 0.5 and 0.6 - \#465 - Server hang caused by searching with nsg index - \#486 - gpu no usage during index building - \#497 - CPU-version search performance decreased - \#504 - The code coverage rate of core/src/scheduler/optimizer is too low - \#509 - IVF_PQ index build trapped into dead loop caused by invalid params - \#513 - Unittest DELETE_BY_RANGE sometimes failed Loading @@ -31,7 +32,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#532 - assigin value to `table_name` from confest shell - \#533 - NSG build failed with MetricType Inner Product - \#543 - client raise exception in shards when search results is empty - \#497 - CPU-version search performance decreased - \#545 - Avoid dead circle of build index thread when error occurs ## Feature - \#12 - Pure CPU version for Milvus Loading
core/src/db/DBImpl.cpp +124 −8 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ #include <iostream> #include <set> #include <thread> #include <utility> namespace milvus { namespace engine { Loading @@ -51,6 +52,8 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1; constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1; static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!"); void Loading Loading @@ -361,6 +364,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { WaitMergeFileFinish(); // step 4: wait and build index status = CleanFailedIndexFileOfTable(table_id); status = BuildTableIndexRecursively(table_id, index); return status; Loading Loading @@ -828,24 +832,37 @@ DBImpl::BackgroundBuildIndex() { std::unique_lock<std::mutex> lock(build_index_mutex_); meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); Status status; Status status = IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_); // step 2: put build index task to scheduler std::map<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr> job2file_map; for (auto& file : to_index_files) { scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(meta_ptr_, options_); scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file); job->AddToIndexFiles(file_ptr); } scheduler::JobMgrInst::GetInstance()->Put(job); job2file_map.insert(std::make_pair(job, file_ptr)); } for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) { scheduler::BuildIndexJobPtr job = iter->first; meta::TableFileSchema& file_schema = *(iter->second.get()); job->WaitBuildIndexFinish(); if (!job->GetStatus().ok()) { Status status = job->GetStatus(); ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString(); MarkFailedIndexFile(file_schema); } else { MarkSucceedIndexFile(file_schema); ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed."; } } ENGINE_LOG_DEBUG << "Background build index thread finished"; } // ENGINE_LOG_TRACE << "Background build index thread exit"; } Loading Loading @@ -911,6 +928,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da if (dates.empty()) { status = mem_mgr_->EraseMemVector(table_id); // not allow insert status = meta_ptr_->DropTable(table_id); // soft delete table CleanFailedIndexFileOfTable(table_id); // scheduler will determine when to delete table files auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource(); Loading Loading @@ -989,6 +1007,8 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10 * 1000, times * 100))); GetFilesToBuildIndex(table_id, file_types, table_files); times++; IgnoreFailedIndexFiles(table_files); } // build index for partition Loading @@ -1001,12 +1021,27 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex } } // failed to build index for some files, return error std::vector<std::string> failed_files; GetFailedIndexFileOfTable(table_id, failed_files); if (!failed_files.empty()) { std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) + ((failed_files.size() == 1) ? " file" : " files"); #ifdef MILVUS_CPU_VERSION msg += ", please double check index parameters."; #else msg += ", file size is too large or gpu memory is not enough."; #endif return Status(DB_ERROR, msg); } return Status::OK(); } Status DBImpl::DropTableIndexRecursively(const std::string& table_id) { ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; CleanFailedIndexFileOfTable(table_id); auto status = meta_ptr_->DropTableIndex(table_id); if (!status.ok()) { return status; Loading Loading @@ -1049,5 +1084,86 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c return Status::OK(); } Status DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) { std::lock_guard<std::mutex> lck(index_failed_mutex_); index_failed_files_.erase(table_id); // rebuild failed index files for this table return Status::OK(); } Status DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) { failed_files.clear(); std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(table_id); if (iter != index_failed_files_.end()) { FileID2FailedTimes& failed_map = iter->second; for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { failed_files.push_back(it_file->first); } } return Status::OK(); } Status DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) { std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(file.table_id_); if (iter == index_failed_files_.end()) { FileID2FailedTimes failed_files; failed_files.insert(std::make_pair(file.file_id_, 1)); index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); } else { auto it_failed_files = iter->second.find(file.file_id_); if (it_failed_files != iter->second.end()) { it_failed_files->second++; } else { iter->second.insert(std::make_pair(file.file_id_, 1)); } } return Status::OK(); } Status DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) { std::lock_guard<std::mutex> lck(index_failed_mutex_); auto iter = index_failed_files_.find(file.table_id_); if (iter != index_failed_files_.end()) { iter->second.erase(file.file_id_); } return Status::OK(); } Status DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) { std::lock_guard<std::mutex> lck(index_failed_mutex_); // there could be some failed files belong to different table. // some files may has failed for several times, no need to build index for these files. // thus we can avoid dead circle for build index operation for (auto it_file = table_files.begin(); it_file != table_files.end();) { auto it_failed_files = index_failed_files_.find((*it_file).table_id_); if (it_failed_files != index_failed_files_.end()) { auto it_failed_file = it_failed_files->second.find((*it_file).file_id_); if (it_failed_file != it_failed_files->second.end()) { if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) { it_file = table_files.erase(it_file); continue; } } } ++it_file; } return Status::OK(); } } // namespace engine } // namespace milvus
core/src/db/DBImpl.h +21 −3 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ #include <atomic> #include <condition_variable> #include <list> #include <map> #include <memory> #include <mutex> #include <set> Loading @@ -35,8 +36,6 @@ namespace milvus { namespace engine { class Env; namespace meta { class Meta; } Loading Loading @@ -179,6 +178,21 @@ class DBImpl : public DB { Status GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count); Status CleanFailedIndexFileOfTable(const std::string& table_id); Status GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files); Status MarkFailedIndexFile(const meta::TableFileSchema& file); Status MarkSucceedIndexFile(const meta::TableFileSchema& file); Status IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files); private: const DBOptions options_; Loading @@ -200,6 +214,10 @@ class DBImpl : public DB { std::list<std::future<void>> index_thread_results_; std::mutex build_index_mutex_; std::mutex index_failed_mutex_; using FileID2FailedTimes = std::map<std::string, uint64_t>; using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>; Table2FailedFiles index_failed_files_; // file id mapping to failed times }; // DBImpl } // namespace engine Loading
core/src/db/Utils.cpp +3 −1 Original line number Diff line number Diff line Loading @@ -154,7 +154,9 @@ GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file } std::string msg = "Table file doesn't exist: " + file_path; if (table_file.file_size_ > 0) { // no need to pop error for empty file ENGINE_LOG_ERROR << msg << " in path: " << options.path_ << " for table: " << table_file.table_id_; } return Status(DB_ERROR, msg); } Loading
core/src/db/meta/MySQLMetaImpl.cpp +29 −4 Original line number Diff line number Diff line Loading @@ -1610,10 +1610,35 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>& } } ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count << " new files:" << new_count << " new_merge files:" << new_merge_count << " new_index files:" << new_index_count << " to_index files:" << to_index_count << " index files:" << index_count << " backup files:" << backup_count; std::string msg = "Get table files by type. "; for (int file_type : file_types) { switch (file_type) { case (int)TableFileSchema::RAW: msg = msg + "raw files:" + std::to_string(raw_count); break; case (int)TableFileSchema::NEW: msg = msg + "new files:" + std::to_string(raw_count); break; case (int)TableFileSchema::NEW_MERGE: msg = msg + "new_merge files:" + std::to_string(raw_count); break; case (int)TableFileSchema::NEW_INDEX: msg = msg + "new_index files:" + std::to_string(raw_count); break; case (int)TableFileSchema::TO_INDEX: msg = msg + "to_index files:" + std::to_string(raw_count); break; case (int)TableFileSchema::INDEX: msg = msg + "index files:" + std::to_string(raw_count); break; case (int)TableFileSchema::BACKUP: msg = msg + "backup files:" + std::to_string(raw_count); break; default: break; } } ENGINE_LOG_DEBUG << msg; } } catch (std::exception& e) { return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what()); Loading