Loading core/src/db/DBImpl.cpp +4 −5 Original line number Diff line number Diff line Loading @@ -791,18 +791,17 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) { meta_ptr_->Archive(); meta::Table2FileIDs ignore_files = ongoing_files_checker_.GetOngoingFiles(); { uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds meta_ptr_->CleanUpCacheWithTTL(ttl, ignore_files); uint64_t ttl = 1 * meta::SECOND; // default: file data will be erase from cache after few seconds meta_ptr_->CleanUpCacheWithTTL(ttl, &ongoing_files_checker_); } { uint64_t ttl = 20 * meta::SECOND; // default: file will be deleted after few seconds uint64_t ttl = 1 * meta::SECOND; // default: file will be deleted after few seconds if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { ttl = meta::H_SEC; } meta_ptr_->CleanUpFilesWithTTL(ttl, ignore_files); meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_); } // ENGINE_LOG_TRACE << " Background compaction thread exit"; Loading core/src/db/IndexFailedChecker.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -38,7 +38,7 @@ IndexFailedChecker::GetFailedIndexFileOfTable(const std::string& table_id, std:: std::lock_guard<std::mutex> lck(mutex_); auto iter = index_failed_files_.find(table_id); if (iter != index_failed_files_.end()) { FileID2FailedTimes& failed_map = iter->second; meta::File2RefCount& failed_map = iter->second; for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { failed_files.push_back(it_file->first); } Loading @@ -53,7 +53,7 @@ IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file) { auto iter = index_failed_files_.find(file.table_id_); if (iter == index_failed_files_.end()) { FileID2FailedTimes failed_files; meta::File2RefCount failed_files; failed_files.insert(std::make_pair(file.file_id_, 1)); index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); } else { Loading core/src/db/IndexFailedChecker.h +1 −3 Original line number Diff line number Diff line Loading @@ -47,9 +47,7 @@ class IndexFailedChecker { private: std::mutex mutex_; using FileID2FailedTimes = std::map<std::string, uint64_t>; using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>; Table2FailedFiles index_failed_files_; // file id mapping to failed times meta::Table2Files index_failed_files_; // table id mapping to (file id mapping to failed times) }; } // namespace engine Loading core/src/db/OngoingFileChecker.cpp +31 −8 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ // under the License. #include "db/OngoingFileChecker.h" #include "utils/Log.h" #include <utility> Loading Loading @@ -56,11 +57,21 @@ OngoingFileChecker::UnmarkOngoingFiles(const meta::TableFilesSchema& table_files return Status::OK(); } meta::Table2FileIDs OngoingFileChecker::GetOngoingFiles() { // return copy // don't return reference(avoid multi-threads conflict) return ongoing_files_; bool OngoingFileChecker::IsIgnored(const meta::TableFileSchema& schema) { std::lock_guard<std::mutex> lck(mutex_); auto iter = ongoing_files_.find(schema.table_id_); if (iter == ongoing_files_.end()) { return false; } else { auto it_file = iter->second.find(schema.file_id_); if (it_file == iter->second.end()) { return false; } else { return (it_file->second > 0); } } } Status Loading @@ -71,12 +82,21 @@ OngoingFileChecker::MarkOngoingFileNoLock(const meta::TableFileSchema& table_fil auto iter = ongoing_files_.find(table_file.table_id_); if (iter == ongoing_files_.end()) { meta::FileIDArray file_ids = {table_file.file_id_}; ongoing_files_.insert(std::make_pair(table_file.table_id_, file_ids)); meta::File2RefCount files_refcount; files_refcount.insert(std::make_pair(table_file.file_id_, 1)); ongoing_files_.insert(std::make_pair(table_file.table_id_, files_refcount)); } else { auto it_file = iter->second.find(table_file.file_id_); if (it_file == iter->second.end()) { iter->second[table_file.file_id_] = 1; } else { iter->second.insert(table_file.file_id_); it_file->second++; } } ENGINE_LOG_DEBUG << "Mark ongoing file:" << table_file.file_id_ << " refcount:" << ongoing_files_[table_file.table_id_][table_file.file_id_]; return Status::OK(); } Loading @@ -94,6 +114,9 @@ OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_f } } ENGINE_LOG_DEBUG << "Mark ongoing file:" << table_file.file_id_ << " refcount:" << ongoing_files_[table_file.table_id_][table_file.file_id_]; return Status::OK(); } Loading core/src/db/OngoingFileChecker.h +4 −4 Original line number Diff line number Diff line Loading @@ -28,7 +28,7 @@ namespace milvus { namespace engine { class OngoingFileChecker { class OngoingFileChecker : public meta::Meta::CleanUpFilter { public: Status MarkOngoingFile(const meta::TableFileSchema& table_file); Loading @@ -42,8 +42,8 @@ class OngoingFileChecker { Status UnmarkOngoingFiles(const meta::TableFilesSchema& table_files); meta::Table2FileIDs GetOngoingFiles(); bool IsIgnored(const meta::TableFileSchema& schema) override; private: Status Loading @@ -54,7 +54,7 @@ class OngoingFileChecker { private: std::mutex mutex_; meta::Table2FileIDs ongoing_files_; meta::Table2Files ongoing_files_; // table id mapping to (file id mapping to ongoing ref-count) }; } // namespace engine Loading Loading
core/src/db/DBImpl.cpp +4 −5 Original line number Diff line number Diff line Loading @@ -791,18 +791,17 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) { meta_ptr_->Archive(); meta::Table2FileIDs ignore_files = ongoing_files_checker_.GetOngoingFiles(); { uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds meta_ptr_->CleanUpCacheWithTTL(ttl, ignore_files); uint64_t ttl = 1 * meta::SECOND; // default: file data will be erase from cache after few seconds meta_ptr_->CleanUpCacheWithTTL(ttl, &ongoing_files_checker_); } { uint64_t ttl = 20 * meta::SECOND; // default: file will be deleted after few seconds uint64_t ttl = 1 * meta::SECOND; // default: file will be deleted after few seconds if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { ttl = meta::H_SEC; } meta_ptr_->CleanUpFilesWithTTL(ttl, ignore_files); meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_); } // ENGINE_LOG_TRACE << " Background compaction thread exit"; Loading
core/src/db/IndexFailedChecker.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -38,7 +38,7 @@ IndexFailedChecker::GetFailedIndexFileOfTable(const std::string& table_id, std:: std::lock_guard<std::mutex> lck(mutex_); auto iter = index_failed_files_.find(table_id); if (iter != index_failed_files_.end()) { FileID2FailedTimes& failed_map = iter->second; meta::File2RefCount& failed_map = iter->second; for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { failed_files.push_back(it_file->first); } Loading @@ -53,7 +53,7 @@ IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file) { auto iter = index_failed_files_.find(file.table_id_); if (iter == index_failed_files_.end()) { FileID2FailedTimes failed_files; meta::File2RefCount failed_files; failed_files.insert(std::make_pair(file.file_id_, 1)); index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); } else { Loading
core/src/db/IndexFailedChecker.h +1 −3 Original line number Diff line number Diff line Loading @@ -47,9 +47,7 @@ class IndexFailedChecker { private: std::mutex mutex_; using FileID2FailedTimes = std::map<std::string, uint64_t>; using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>; Table2FailedFiles index_failed_files_; // file id mapping to failed times meta::Table2Files index_failed_files_; // table id mapping to (file id mapping to failed times) }; } // namespace engine Loading
core/src/db/OngoingFileChecker.cpp +31 −8 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ // under the License. #include "db/OngoingFileChecker.h" #include "utils/Log.h" #include <utility> Loading Loading @@ -56,11 +57,21 @@ OngoingFileChecker::UnmarkOngoingFiles(const meta::TableFilesSchema& table_files return Status::OK(); } meta::Table2FileIDs OngoingFileChecker::GetOngoingFiles() { // return copy // don't return reference(avoid multi-threads conflict) return ongoing_files_; bool OngoingFileChecker::IsIgnored(const meta::TableFileSchema& schema) { std::lock_guard<std::mutex> lck(mutex_); auto iter = ongoing_files_.find(schema.table_id_); if (iter == ongoing_files_.end()) { return false; } else { auto it_file = iter->second.find(schema.file_id_); if (it_file == iter->second.end()) { return false; } else { return (it_file->second > 0); } } } Status Loading @@ -71,12 +82,21 @@ OngoingFileChecker::MarkOngoingFileNoLock(const meta::TableFileSchema& table_fil auto iter = ongoing_files_.find(table_file.table_id_); if (iter == ongoing_files_.end()) { meta::FileIDArray file_ids = {table_file.file_id_}; ongoing_files_.insert(std::make_pair(table_file.table_id_, file_ids)); meta::File2RefCount files_refcount; files_refcount.insert(std::make_pair(table_file.file_id_, 1)); ongoing_files_.insert(std::make_pair(table_file.table_id_, files_refcount)); } else { auto it_file = iter->second.find(table_file.file_id_); if (it_file == iter->second.end()) { iter->second[table_file.file_id_] = 1; } else { iter->second.insert(table_file.file_id_); it_file->second++; } } ENGINE_LOG_DEBUG << "Mark ongoing file:" << table_file.file_id_ << " refcount:" << ongoing_files_[table_file.table_id_][table_file.file_id_]; return Status::OK(); } Loading @@ -94,6 +114,9 @@ OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_f } } ENGINE_LOG_DEBUG << "Mark ongoing file:" << table_file.file_id_ << " refcount:" << ongoing_files_[table_file.table_id_][table_file.file_id_]; return Status::OK(); } Loading
core/src/db/OngoingFileChecker.h +4 −4 Original line number Diff line number Diff line Loading @@ -28,7 +28,7 @@ namespace milvus { namespace engine { class OngoingFileChecker { class OngoingFileChecker : public meta::Meta::CleanUpFilter { public: Status MarkOngoingFile(const meta::TableFileSchema& table_file); Loading @@ -42,8 +42,8 @@ class OngoingFileChecker { Status UnmarkOngoingFiles(const meta::TableFilesSchema& table_files); meta::Table2FileIDs GetOngoingFiles(); bool IsIgnored(const meta::TableFileSchema& schema) override; private: Status Loading @@ -54,7 +54,7 @@ class OngoingFileChecker { private: std::mutex mutex_; meta::Table2FileIDs ongoing_files_; meta::Table2Files ongoing_files_; // table id mapping to (file id mapping to ongoing ref-count) }; } // namespace engine Loading