Loading core/src/db/DBImpl.cpp +5 −5 Original line number Diff line number Diff line Loading @@ -1058,7 +1058,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { // step 4: wait and build index status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id); status = BuildTableIndexRecursively(table_id, index); status = WaitTableIndexRecursively(table_id, index); return status; } Loading Loading @@ -1738,7 +1738,7 @@ DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableInde } Status DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) { DBImpl::WaitTableIndexRecursively(const std::string& table_id, const TableIndex& index) { // for IDMAP type, only wait all NEW file converted to RAW file // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files std::vector<int> file_types; Loading Loading @@ -1779,8 +1779,8 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = BuildTableIndexRecursively(schema.table_id_, index); fiu_do_on("DBImpl.BuildTableIndexRecursively.fail_build_table_Index_for_partition", status = WaitTableIndexRecursively(schema.table_id_, index); fiu_do_on("DBImpl.WaitTableIndexRecursively.fail_build_table_Index_for_partition", status = Status(DB_ERROR, "")); if (!status.ok()) { return status; Loading @@ -1790,7 +1790,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex // failed to build index for some files, return error std::string err_msg; index_failed_checker_.GetErrMsgForTable(table_id, err_msg); fiu_do_on("DBImpl.BuildTableIndexRecursively.not_empty_err_msg", err_msg.append("fiu")); fiu_do_on("DBImpl.WaitTableIndexRecursively.not_empty_err_msg", err_msg.append("fiu")); if (!err_msg.empty()) { return Status(DB_ERROR, err_msg); } Loading core/src/db/DBImpl.h +1 −1 Original line number Diff line number Diff line Loading @@ -216,7 +216,7 @@ class DBImpl : public DB, public server::CacheConfigHandler { UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index); Status BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index); WaitTableIndexRecursively(const std::string& table_id, const TableIndex& index); Status DropTableIndexRecursively(const std::string& table_id); Loading core/src/db/insert/MemTable.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -364,7 +364,7 @@ MemTable::ApplyDeletes() { } } status = meta_->UpdateTableFiles(table_files_to_update); status = meta_->UpdateTableFilesRowCount(table_files_to_update); if (!status.ok()) { std::string err_msg = "Failed to apply deletes: " + status.ToString(); Loading core/src/db/meta/Meta.h +3 −0 Original line number Diff line number Diff line Loading @@ -87,6 +87,9 @@ class Meta { virtual Status UpdateTableFiles(TableFilesSchema& files) = 0; virtual Status UpdateTableFilesRowCount(TableFilesSchema& files) = 0; virtual Status UpdateTableIndex(const std::string& table_id, const TableIndex& index) = 0; Loading core/src/db/meta/MySQLMetaImpl.cpp +40 −0 Original line number Diff line number Diff line Loading @@ -1240,6 +1240,46 @@ MySQLMetaImpl::UpdateTableFiles(TableFilesSchema& files) { return Status::OK(); } Status MySQLMetaImpl::UpdateTableFilesRowCount(TableFilesSchema& files) { try { server::MetricCollector metric; { mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); bool is_null_connection = (connectionPtr == nullptr); if (is_null_connection) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } mysqlpp::Query updateTableFilesQuery = connectionPtr->query(); for (auto& file : files) { std::string row_count = std::to_string(file.row_count_); std::string updated_time = std::to_string(utils::GetMicroSecTimeStamp()); updateTableFilesQuery << "UPDATE " << META_TABLEFILES << " SET row_count = " << row_count << " , updated_time = " << updated_time << " WHERE file_id = " << file.file_id_ << ";"; ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesRowCount: " << updateTableFilesQuery.str(); if (!updateTableFilesQuery.exec()) { return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error()); } ENGINE_LOG_DEBUG << "Update file " << file.file_id_ << " row count to " << file.row_count_; } } // Scoped Connection ENGINE_LOG_DEBUG << "Update " << files.size() << " table files"; } catch (std::exception& e) { return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES ROW COUNT", e.what()); } return Status::OK(); } Status MySQLMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) { try { Loading Loading
core/src/db/DBImpl.cpp +5 −5 Original line number Diff line number Diff line Loading @@ -1058,7 +1058,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { // step 4: wait and build index status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id); status = BuildTableIndexRecursively(table_id, index); status = WaitTableIndexRecursively(table_id, index); return status; } Loading Loading @@ -1738,7 +1738,7 @@ DBImpl::UpdateTableIndexRecursively(const std::string& table_id, const TableInde } Status DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index) { DBImpl::WaitTableIndexRecursively(const std::string& table_id, const TableIndex& index) { // for IDMAP type, only wait all NEW file converted to RAW file // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files std::vector<int> file_types; Loading Loading @@ -1779,8 +1779,8 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex std::vector<meta::TableSchema> partition_array; status = meta_ptr_->ShowPartitions(table_id, partition_array); for (auto& schema : partition_array) { status = BuildTableIndexRecursively(schema.table_id_, index); fiu_do_on("DBImpl.BuildTableIndexRecursively.fail_build_table_Index_for_partition", status = WaitTableIndexRecursively(schema.table_id_, index); fiu_do_on("DBImpl.WaitTableIndexRecursively.fail_build_table_Index_for_partition", status = Status(DB_ERROR, "")); if (!status.ok()) { return status; Loading @@ -1790,7 +1790,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex // failed to build index for some files, return error std::string err_msg; index_failed_checker_.GetErrMsgForTable(table_id, err_msg); fiu_do_on("DBImpl.BuildTableIndexRecursively.not_empty_err_msg", err_msg.append("fiu")); fiu_do_on("DBImpl.WaitTableIndexRecursively.not_empty_err_msg", err_msg.append("fiu")); if (!err_msg.empty()) { return Status(DB_ERROR, err_msg); } Loading
core/src/db/DBImpl.h +1 −1 Original line number Diff line number Diff line Loading @@ -216,7 +216,7 @@ class DBImpl : public DB, public server::CacheConfigHandler { UpdateTableIndexRecursively(const std::string& table_id, const TableIndex& index); Status BuildTableIndexRecursively(const std::string& table_id, const TableIndex& index); WaitTableIndexRecursively(const std::string& table_id, const TableIndex& index); Status DropTableIndexRecursively(const std::string& table_id); Loading
core/src/db/insert/MemTable.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -364,7 +364,7 @@ MemTable::ApplyDeletes() { } } status = meta_->UpdateTableFiles(table_files_to_update); status = meta_->UpdateTableFilesRowCount(table_files_to_update); if (!status.ok()) { std::string err_msg = "Failed to apply deletes: " + status.ToString(); Loading
core/src/db/meta/Meta.h +3 −0 Original line number Diff line number Diff line Loading @@ -87,6 +87,9 @@ class Meta { virtual Status UpdateTableFiles(TableFilesSchema& files) = 0; virtual Status UpdateTableFilesRowCount(TableFilesSchema& files) = 0; virtual Status UpdateTableIndex(const std::string& table_id, const TableIndex& index) = 0; Loading
core/src/db/meta/MySQLMetaImpl.cpp +40 −0 Original line number Diff line number Diff line Loading @@ -1240,6 +1240,46 @@ MySQLMetaImpl::UpdateTableFiles(TableFilesSchema& files) { return Status::OK(); } Status MySQLMetaImpl::UpdateTableFilesRowCount(TableFilesSchema& files) { try { server::MetricCollector metric; { mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); bool is_null_connection = (connectionPtr == nullptr); if (is_null_connection) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } mysqlpp::Query updateTableFilesQuery = connectionPtr->query(); for (auto& file : files) { std::string row_count = std::to_string(file.row_count_); std::string updated_time = std::to_string(utils::GetMicroSecTimeStamp()); updateTableFilesQuery << "UPDATE " << META_TABLEFILES << " SET row_count = " << row_count << " , updated_time = " << updated_time << " WHERE file_id = " << file.file_id_ << ";"; ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFilesRowCount: " << updateTableFilesQuery.str(); if (!updateTableFilesQuery.exec()) { return HandleException("QUERY ERROR WHEN UPDATING TABLE FILES", updateTableFilesQuery.error()); } ENGINE_LOG_DEBUG << "Update file " << file.file_id_ << " row count to " << file.row_count_; } } // Scoped Connection ENGINE_LOG_DEBUG << "Update " << files.size() << " table files"; } catch (std::exception& e) { return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES ROW COUNT", e.what()); } return Status::OK(); } Status MySQLMetaImpl::DescribeTableIndex(const std::string& table_id, TableIndex& index) { try { Loading