Unverified Commit 7ea2ee47 authored by groot's avatar groot Committed by GitHub
Browse files

fix lsn meta bug (#1813)



* fix lsn meta bug

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>

* fix mysql lsn meta bug

Signed-off-by: default avatargroot <yihua.mo@zilliz.com>
parent f211d42e
Loading
Loading
Loading
Loading
+123 −6
Original line number Diff line number Diff line
@@ -135,6 +135,11 @@ class MetaSchema {
    MetaFields fields_;
};

// Environment schema
static const MetaSchema ENVIRONMENT_SCHEMA(META_ENVIRONMENT, {
                                                                 MetaField("global_lsn", "BIGINT", "NOT NULL"),
                                                             });

// Tables schema
static const MetaSchema TABLES_SCHEMA(META_TABLES, {
                                                       MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
@@ -240,6 +245,11 @@ MySQLMetaImpl::ValidateMetaSchema() {
        return schema.IsEqual(exist_fields);
    };

    // verify Environment
    if (!validate_func(ENVIRONMENT_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta Environment schema is created by Milvus old version");
    }

    // verify Tables
    if (!validate_func(TABLES_SCHEMA)) {
        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
@@ -348,6 +358,19 @@ MySQLMetaImpl::Initialize() {
        throw Exception(DB_META_TRANSACTION_FAILED, msg);
    }

    // step 9: create meta table Environment
    InitializeQuery << "CREATE TABLE IF NOT EXISTS " << ENVIRONMENT_SCHEMA.name() << " ("
                    << ENVIRONMENT_SCHEMA.ToString() + ");";

    ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();

    initialize_query_exec = InitializeQuery.exec();
    if (!initialize_query_exec) {
        std::string msg = "Failed to create meta table 'Environment' in MySQL";
        ENGINE_LOG_ERROR << msg;
        throw Exception(DB_META_TRANSACTION_FAILED, msg);
    }

    return Status::OK();
}

@@ -962,14 +985,14 @@ MySQLMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_l
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query updateTableFlagQuery = connectionPtr->query();
            updateTableFlagQuery << "UPDATE " << META_TABLES << " SET flush_lsn = " << flush_lsn
            mysqlpp::Query statement = connectionPtr->query();
            statement << "UPDATE " << META_TABLES << " SET flush_lsn = " << flush_lsn
                      << " WHERE table_id = " << mysqlpp::quote << table_id << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlushLSN: " << updateTableFlagQuery.str();
            ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlushLSN: " << statement.str();

            if (!updateTableFlagQuery.exec()) {
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLUSH_LSN", updateTableFlagQuery.error());
            if (!statement.exec()) {
                return HandleException("QUERY ERROR WHEN UPDATING TABLE FLUSH_LSN", statement.error());
            }
        }  // Scoped Connection

@@ -983,6 +1006,32 @@ MySQLMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_l

Status
MySQLMetaImpl::GetTableFlushLSN(const std::string& table_id, uint64_t& flush_lsn) {
    try {
        server::MetricCollector metric;

        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query statement = connectionPtr->query();
            statement << "SELECT flush_lsn FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote << table_id
                      << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFlushLSN: " << statement.str();

            res = statement.store();
        }  // Scoped Connection

        if (!res.empty()) {
            flush_lsn = res[0]["flush_lsn"];
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN GET TABLE FLUSH_LSN", e.what());
    }

    return Status::OK();
}

@@ -2532,11 +2581,79 @@ MySQLMetaImpl::DiscardFiles(int64_t to_discard_size) {

Status
MySQLMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
    try {
        server::MetricCollector metric;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            bool first_create = false;
            {
                mysqlpp::StoreQueryResult res;
                mysqlpp::Query statement = connectionPtr->query();
                statement << "SELECT global_lsn FROM " << META_ENVIRONMENT << ";";
                res = statement.store();
                if (res.num_rows() == 0) {
                    first_create = true;
                }
            }

            if (first_create) {  // first time to get global lsn
                mysqlpp::Query statement = connectionPtr->query();
                statement << "INSERT INTO " << META_ENVIRONMENT << " VALUES(" << lsn << ");";
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::SetGlobalLastLSN: " << statement.str();

                if (!statement.exec()) {
                    return HandleException("QUERY ERROR WHEN SET GLOBAL LSN", statement.error());
                }
            } else {
                mysqlpp::Query statement = connectionPtr->query();
                statement << "UPDATE " << META_ENVIRONMENT << " SET global_lsn = " << lsn << ";";
                ENGINE_LOG_DEBUG << "MySQLMetaImpl::SetGlobalLastLSN: " << statement.str();

                if (!statement.exec()) {
                    return HandleException("QUERY ERROR WHEN SET GLOBAL LSN", statement.error());
                }
            }
        }  // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully update global_lsn = " << lsn;
    } catch (std::exception& e) {
        return HandleException("QUERY ERROR WHEN SET GLOBAL LSN", e.what());
    }

    return Status::OK();
}

Status
MySQLMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
    try {
        server::MetricCollector metric;

        mysqlpp::StoreQueryResult res;
        {
            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
            if (connectionPtr == nullptr) {
                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
            }

            mysqlpp::Query statement = connectionPtr->query();
            statement << "SELECT global_lsn FROM " << META_ENVIRONMENT << ";";

            ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetGlobalLastLSN: " << statement.str();

            res = statement.store();
        }  // Scoped Connection

        if (!res.empty()) {
            lsn = res[0]["global_lsn"];
        }
    } catch (std::exception& e) {
        return HandleException("GENERAL ERROR WHEN GET GLOBAL LSN", e.what());
    }

    return Status::OK();
}

+3 −2
Original line number Diff line number Diff line
@@ -520,7 +520,7 @@ SqliteMetaImpl::UpdateTableFlushLSN(const std::string& table_id, uint64_t flush_

        ConnectorPtr->update_all(set(c(&TableSchema::flush_lsn_) = flush_lsn),
                                 where(c(&TableSchema::table_id_) == table_id));
        ENGINE_LOG_DEBUG << "Successfully update table flush_lsn, table id = " << table_id;
        ENGINE_LOG_DEBUG << "Update table flush_lsn, table_id = " << table_id << " flush_lsn = " << flush_lsn;
    } catch (std::exception& e) {
        std::string msg = "Encounter exception when update table lsn: table_id = " + table_id;
        return HandleException(msg, e.what());
@@ -720,7 +720,7 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex&
        auto tables = ConnectorPtr->select(
            columns(&TableSchema::id_, &TableSchema::state_, &TableSchema::dimension_, &TableSchema::created_on_,
                    &TableSchema::flag_, &TableSchema::index_file_size_, &TableSchema::owner_table_,
                    &TableSchema::partition_tag_, &TableSchema::version_),
                    &TableSchema::partition_tag_, &TableSchema::version_, &TableSchema::flush_lsn_),
            where(c(&TableSchema::table_id_) == table_id and c(&TableSchema::state_) != (int)TableSchema::TO_DELETE));

        if (tables.size() > 0) {
@@ -735,6 +735,7 @@ SqliteMetaImpl::UpdateTableIndex(const std::string& table_id, const TableIndex&
            table_schema.owner_table_ = std::get<6>(tables[0]);
            table_schema.partition_tag_ = std::get<7>(tables[0]);
            table_schema.version_ = std::get<8>(tables[0]);
            table_schema.flush_lsn_ = std::get<9>(tables[0]);
            table_schema.engine_type_ = index.engine_type_;
            table_schema.index_params_ = index.extra_params_.dump();
            table_schema.metric_type_ = index.metric_type_;