Loading core/src/cache/Cache.inl +5 −5 Original line number Diff line number Diff line Loading @@ -176,14 +176,14 @@ Cache<ItemObj>::print() { { std::lock_guard<std::mutex> lock(mutex_); cache_count = lru_.size(); } SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; #if 1 #if 0 for (auto it = lru_.begin(); it != lru_.end(); ++it) { SERVER_LOG_DEBUG << it->first; } #endif } SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; SERVER_LOG_DEBUG << "[Cache usage]: " << usage_ << " bytes"; SERVER_LOG_DEBUG << "[Cache capacity]: " << capacity_ << " bytes"; } Loading core/src/db/DBImpl.cpp +6 −8 Original line number Diff line number Diff line Loading @@ -722,7 +722,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m status = meta_ptr_->UpdateTableFile(table_file); ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; ENGINE_LOG_ERROR << "ERROR: failed to persist merged file: " << table_file.location_ ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_ << ", possible out of disk space or memory"; return status; Loading Loading @@ -803,6 +803,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) { if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { ttl = meta::H_SEC; } meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_); } Loading Loading @@ -839,14 +840,13 @@ DBImpl::StartBuildIndexTask(bool force) { void DBImpl::BackgroundBuildIndex() { // ENGINE_LOG_TRACE << "Background build index thread start"; std::unique_lock<std::mutex> lock(build_index_mutex_); meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { ENGINE_LOG_DEBUG << "Background build index thread begin"; status = ongoing_files_checker_.MarkOngoingFiles(to_index_files); // step 2: put build index task to scheduler Loading @@ -870,17 +870,15 @@ DBImpl::BackgroundBuildIndex() { index_failed_checker_.MarkFailedIndexFile(file_schema); } else { index_failed_checker_.MarkSucceedIndexFile(file_schema); ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed."; index_failed_checker_.MarkSucceedIndexFile(file_schema); } status = ongoing_files_checker_.UnmarkOngoingFile(file_schema); } status = ongoing_files_checker_.UnmarkOngoingFiles(to_index_files); ENGINE_LOG_DEBUG << "Background build index thread finished"; } // ENGINE_LOG_TRACE << "Background build index thread exit"; } Status Loading core/src/db/engine/ExecutionEngineImpl.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -463,7 +463,7 @@ ExecutionEngineImpl::Merge(const std::string& location) { if (auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge)) { auto status = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds()); if (!status.ok()) { ENGINE_LOG_ERROR << "Merge: Add Error"; ENGINE_LOG_ERROR << "Failed to merge: " << location << " to: " << location_; } return status; } else { Loading core/src/db/meta/MySQLMetaImpl.cpp +19 −9 Original line number Diff line number Diff line Loading @@ -1800,7 +1800,9 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { mysqlpp::Query query = connectionPtr->query(); query << "SELECT id, table_id, file_id, date" << " FROM " << META_TABLEFILES << " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " FROM " << META_TABLEFILES << " WHERE file_type IN (" << std::to_string(TableFileSchema::TO_DELETE) << "," << std::to_string(TableFileSchema::BACKUP) << ")" << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); Loading @@ -1810,11 +1812,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { TableFileSchema table_file; std::vector<std::string> idsToDelete; int64_t clean_files = 0; for (auto& resRow : res) { table_file.id_ = resRow["id"]; // implicit conversion resRow["table_id"].to_string(table_file.table_id_); resRow["file_id"].to_string(table_file.file_id_); table_file.date_ = resRow["date"]; table_file.file_type_ = resRow["file_type"]; // check if the file can be deleted if (filter && filter->IsIgnored(table_file)) { Loading @@ -1823,17 +1827,23 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { continue; // ignore this file, don't delete it } // erase file data from cache // because GetTableFilePath won't able to generate file path after the file is deleted utils::GetTableFilePath(options_, table_file); server::CommonUtil::EraseFromCache(table_file.location_); if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) { // delete file from disk storage utils::DeleteTableFilePath(options_, table_file); ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_; // erase file data from cache server::CommonUtil::EraseFromCache(table_file.location_); idsToDelete.emplace_back(std::to_string(table_file.id_)); table_ids.insert(table_file.table_id_); } clean_files++; } // delete file from meta if (!idsToDelete.empty()) { std::stringstream idsToDeleteSS; Loading @@ -1852,8 +1862,8 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { } } if (res.size() > 0) { ENGINE_LOG_DEBUG << "Clean " << res.size() << " files deleted in " << seconds << " seconds"; if (clean_files > 0) { ENGINE_LOG_DEBUG << "Clean " << clean_files << " files deleted in " << seconds << " seconds"; } } // Scoped Connection } catch (std::exception& e) { Loading core/src/db/meta/SqliteMetaImpl.cpp +26 −13 Original line number Diff line number Diff line Loading @@ -1302,6 +1302,11 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { try { server::MetricCollector metric; std::vector<int> file_types = { (int)TableFileSchema::TO_DELETE, (int)TableFileSchema::BACKUP, }; // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard<std::mutex> meta_lock(meta_mutex_); Loading @@ -1309,21 +1314,23 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::date_), where( c(&TableFileSchema::file_type_) == (int)TableFileSchema::TO_DELETE in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::updated_time_) < now - seconds * US_PS)); int64_t clean_files = 0; auto commited = ConnectorPtr->transaction([&]() mutable { TableFileSchema table_file; for (auto& file : files) { table_file.id_ = std::get<0>(file); table_file.table_id_ = std::get<1>(file); table_file.file_id_ = std::get<2>(file); table_file.date_ = std::get<3>(file); table_file.file_type_ = std::get<3>(file); table_file.date_ = std::get<4>(file); // check if the file can be deleted if (filter && filter->IsIgnored(table_file)) { Loading @@ -1332,18 +1339,24 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { continue; // ignore this file, don't delete it } // erase from cache, must do this before file deleted, // because GetTableFilePath won't able to generate file path after the file is deleted utils::GetTableFilePath(options_, table_file); server::CommonUtil::EraseFromCache(table_file.location_); if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) { // delete file from meta ConnectorPtr->remove<TableFileSchema>(table_file.id_); // delete file from disk storage utils::DeleteTableFilePath(options_, table_file); // erase from cache server::CommonUtil::EraseFromCache(table_file.location_); ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_; table_ids.insert(table_file.table_id_); } clean_files++; } return true; }); Loading @@ -1351,8 +1364,8 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed"); } if (files.size() > 0) { ENGINE_LOG_DEBUG << "Clean " << files.size() << " files deleted in " << seconds << " seconds"; if (clean_files > 0) { ENGINE_LOG_DEBUG << "Clean " << clean_files << " files deleted in " << seconds << " seconds"; } } catch (std::exception& e) { return HandleException("Encounter exception when clean table files", e.what()); Loading Loading
core/src/cache/Cache.inl +5 −5 Original line number Diff line number Diff line Loading @@ -176,14 +176,14 @@ Cache<ItemObj>::print() { { std::lock_guard<std::mutex> lock(mutex_); cache_count = lru_.size(); } SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; #if 1 #if 0 for (auto it = lru_.begin(); it != lru_.end(); ++it) { SERVER_LOG_DEBUG << it->first; } #endif } SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; SERVER_LOG_DEBUG << "[Cache usage]: " << usage_ << " bytes"; SERVER_LOG_DEBUG << "[Cache capacity]: " << capacity_ << " bytes"; } Loading
core/src/db/DBImpl.cpp +6 −8 Original line number Diff line number Diff line Loading @@ -722,7 +722,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m status = meta_ptr_->UpdateTableFile(table_file); ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; ENGINE_LOG_ERROR << "ERROR: failed to persist merged file: " << table_file.location_ ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_ << ", possible out of disk space or memory"; return status; Loading Loading @@ -803,6 +803,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) { if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { ttl = meta::H_SEC; } meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_); } Loading Loading @@ -839,14 +840,13 @@ DBImpl::StartBuildIndexTask(bool force) { void DBImpl::BackgroundBuildIndex() { // ENGINE_LOG_TRACE << "Background build index thread start"; std::unique_lock<std::mutex> lock(build_index_mutex_); meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { ENGINE_LOG_DEBUG << "Background build index thread begin"; status = ongoing_files_checker_.MarkOngoingFiles(to_index_files); // step 2: put build index task to scheduler Loading @@ -870,17 +870,15 @@ DBImpl::BackgroundBuildIndex() { index_failed_checker_.MarkFailedIndexFile(file_schema); } else { index_failed_checker_.MarkSucceedIndexFile(file_schema); ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed."; index_failed_checker_.MarkSucceedIndexFile(file_schema); } status = ongoing_files_checker_.UnmarkOngoingFile(file_schema); } status = ongoing_files_checker_.UnmarkOngoingFiles(to_index_files); ENGINE_LOG_DEBUG << "Background build index thread finished"; } // ENGINE_LOG_TRACE << "Background build index thread exit"; } Status Loading
core/src/db/engine/ExecutionEngineImpl.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -463,7 +463,7 @@ ExecutionEngineImpl::Merge(const std::string& location) { if (auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge)) { auto status = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds()); if (!status.ok()) { ENGINE_LOG_ERROR << "Merge: Add Error"; ENGINE_LOG_ERROR << "Failed to merge: " << location << " to: " << location_; } return status; } else { Loading
core/src/db/meta/MySQLMetaImpl.cpp +19 −9 Original line number Diff line number Diff line Loading @@ -1800,7 +1800,9 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { mysqlpp::Query query = connectionPtr->query(); query << "SELECT id, table_id, file_id, date" << " FROM " << META_TABLEFILES << " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " FROM " << META_TABLEFILES << " WHERE file_type IN (" << std::to_string(TableFileSchema::TO_DELETE) << "," << std::to_string(TableFileSchema::BACKUP) << ")" << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); Loading @@ -1810,11 +1812,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { TableFileSchema table_file; std::vector<std::string> idsToDelete; int64_t clean_files = 0; for (auto& resRow : res) { table_file.id_ = resRow["id"]; // implicit conversion resRow["table_id"].to_string(table_file.table_id_); resRow["file_id"].to_string(table_file.file_id_); table_file.date_ = resRow["date"]; table_file.file_type_ = resRow["file_type"]; // check if the file can be deleted if (filter && filter->IsIgnored(table_file)) { Loading @@ -1823,17 +1827,23 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { continue; // ignore this file, don't delete it } // erase file data from cache // because GetTableFilePath won't able to generate file path after the file is deleted utils::GetTableFilePath(options_, table_file); server::CommonUtil::EraseFromCache(table_file.location_); if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) { // delete file from disk storage utils::DeleteTableFilePath(options_, table_file); ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_; // erase file data from cache server::CommonUtil::EraseFromCache(table_file.location_); idsToDelete.emplace_back(std::to_string(table_file.id_)); table_ids.insert(table_file.table_id_); } clean_files++; } // delete file from meta if (!idsToDelete.empty()) { std::stringstream idsToDeleteSS; Loading @@ -1852,8 +1862,8 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { } } if (res.size() > 0) { ENGINE_LOG_DEBUG << "Clean " << res.size() << " files deleted in " << seconds << " seconds"; if (clean_files > 0) { ENGINE_LOG_DEBUG << "Clean " << clean_files << " files deleted in " << seconds << " seconds"; } } // Scoped Connection } catch (std::exception& e) { Loading
core/src/db/meta/SqliteMetaImpl.cpp +26 −13 Original line number Diff line number Diff line Loading @@ -1302,6 +1302,11 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { try { server::MetricCollector metric; std::vector<int> file_types = { (int)TableFileSchema::TO_DELETE, (int)TableFileSchema::BACKUP, }; // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard<std::mutex> meta_lock(meta_mutex_); Loading @@ -1309,21 +1314,23 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_, &TableFileSchema::file_type_, &TableFileSchema::date_), where( c(&TableFileSchema::file_type_) == (int)TableFileSchema::TO_DELETE in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::updated_time_) < now - seconds * US_PS)); int64_t clean_files = 0; auto commited = ConnectorPtr->transaction([&]() mutable { TableFileSchema table_file; for (auto& file : files) { table_file.id_ = std::get<0>(file); table_file.table_id_ = std::get<1>(file); table_file.file_id_ = std::get<2>(file); table_file.date_ = std::get<3>(file); table_file.file_type_ = std::get<3>(file); table_file.date_ = std::get<4>(file); // check if the file can be deleted if (filter && filter->IsIgnored(table_file)) { Loading @@ -1332,18 +1339,24 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { continue; // ignore this file, don't delete it } // erase from cache, must do this before file deleted, // because GetTableFilePath won't able to generate file path after the file is deleted utils::GetTableFilePath(options_, table_file); server::CommonUtil::EraseFromCache(table_file.location_); if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) { // delete file from meta ConnectorPtr->remove<TableFileSchema>(table_file.id_); // delete file from disk storage utils::DeleteTableFilePath(options_, table_file); // erase from cache server::CommonUtil::EraseFromCache(table_file.location_); ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_; table_ids.insert(table_file.table_id_); } clean_files++; } return true; }); Loading @@ -1351,8 +1364,8 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed"); } if (files.size() > 0) { ENGINE_LOG_DEBUG << "Clean " << files.size() << " files deleted in " << seconds << " seconds"; if (clean_files > 0) { ENGINE_LOG_DEBUG << "Clean " << clean_files << " files deleted in " << seconds << " seconds"; } } catch (std::exception& e) { return HandleException("Encounter exception when clean table files", e.what()); Loading