Commit 65cbff70 authored by groot's avatar groot
Browse files

MS-587 Count get wrong result after adding vectors and index built immediately


Former-commit-id: 9a634a66f20f72c8e9b71d877176dd1483b41c29
parent d48161d4
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-568 - Fix gpuresource free error
- MS-572 - Milvus crash when get SIGINT
- MS-577 - Unittest Query randomly hung
- MS-587 - Count get wrong result after adding vectors and index built immediately

## Improvement
- MS-552 - Add and change the easylogging library
+46 −20
Original line number Diff line number Diff line
@@ -275,7 +275,11 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
        }
    }

    //step 3: wait and build index
    //step 3: let merge file thread finish
    //to avoid duplicate data bug
    WaitMergeFileFinish();

    //step 4: wait and build index
    //for IDMAP type, only wait all NEW file converted to RAW file
    //for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
    std::vector<int> file_types;
@@ -470,12 +474,8 @@ void DBImpl::BackgroundTimerTask() {
    server::SystemInfo::GetInstance().Init();
    while (true) {
        if (shutting_down_.load(std::memory_order_acquire)){
            for(auto& iter : compact_thread_results_) {
                iter.wait();
            }
            for(auto& iter : index_thread_results_) {
                iter.wait();
            }
            WaitMergeFileFinish();
            WaitBuildIndexFinish();

            ENGINE_LOG_DEBUG << "DB background thread exit";
            break;
@@ -489,6 +489,20 @@ void DBImpl::BackgroundTimerTask() {
    }
}

void DBImpl::WaitMergeFileFinish() {
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
    for(auto& iter : compact_thread_results_) {
        iter.wait();
    }
}

void DBImpl::WaitBuildIndexFinish() {
    std::lock_guard<std::mutex> lck(index_result_mutex_);
    for(auto& iter : index_thread_results_) {
        iter.wait();
    }
}

void DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
@@ -545,20 +559,26 @@ void DBImpl::StartCompactionTask() {
    MemSerialize();

    //compactiong has been finished?
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (!compact_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (compact_thread_results_.back().wait_for(span) == std::future_status::ready) {
                compact_thread_results_.pop_back();
            }
        }
    }

    //add new compaction task
    {
        std::lock_guard<std::mutex> lck(compact_result_mutex_);
        if (compact_thread_results_.empty()) {
            compact_thread_results_.push_back(
                compact_thread_pool_.enqueue(&DBImpl::BackgroundCompaction, this, compact_table_ids_));
            compact_table_ids_.clear();
        }
    }
}

Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
        const meta::TableFilesSchema& files) {
@@ -700,19 +720,25 @@ void DBImpl::StartBuildIndexTask(bool force) {
    }

    //build index has been finished?
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (!index_thread_results_.empty()) {
            std::chrono::milliseconds span(10);
            if (index_thread_results_.back().wait_for(span) == std::future_status::ready) {
                index_thread_results_.pop_back();
            }
        }
    }

    //add new build index task
    {
        std::lock_guard<std::mutex> lck(index_result_mutex_);
        if (index_thread_results_.empty()) {
            index_thread_results_.push_back(
                index_thread_pool_.enqueue(&DBImpl::BackgroundBuildIndex, this));
        }
    }
}

Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    ExecutionEnginePtr to_index =
+4 −0
Original line number Diff line number Diff line
@@ -111,6 +111,8 @@ class DBImpl : public DB {
                      QueryResults &results);

    void BackgroundTimerTask();
    void WaitMergeFileFinish();
    void WaitBuildIndexFinish();

    void StartMetricTask();

@@ -140,10 +142,12 @@ class DBImpl : public DB {
    std::mutex mem_serialize_mutex_;

    ThreadPool compact_thread_pool_;
    std::mutex compact_result_mutex_;
    std::list<std::future<void>> compact_thread_results_;
    std::set<std::string> compact_table_ids_;

    ThreadPool index_thread_pool_;
    std::mutex index_result_mutex_;
    std::list<std::future<void>> index_thread_results_;

    std::mutex build_index_mutex_;
+1 −1
Original line number Diff line number Diff line
@@ -35,7 +35,7 @@ EngineFactory::Build(uint16_t dimension,
        return nullptr;
    }

    ENGINE_LOG_DEBUG << "EngineFactory EngineTypee: " << (int)index_type;
    ENGINE_LOG_DEBUG << "EngineFactory index type: " << (int)index_type;
    ExecutionEnginePtr execution_engine_ptr =
            std::make_shared<ExecutionEngineImpl>(dimension, location, index_type, metric_type, nlist);

+54 −2
Original line number Diff line number Diff line
@@ -373,6 +373,9 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
                return HandleException("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", dropPartitionsByDatesQuery.error());
            }
        } //Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully drop partitions, table id = " << table_schema.table_id_;

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
    }
@@ -443,6 +446,7 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
            }
        } //Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully create table: " << table_schema.table_id_;
        return utils::CreateTablePath(options_, table_schema.table_id_);

    } catch (std::exception &e) {
@@ -589,6 +593,8 @@ Status MySQLMetaImpl::UpdateTableIndex(const std::string &table_id, const TableI

        } //Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully update table index, table id = " << table_id;

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
    }
@@ -621,6 +627,8 @@ Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag)

        } //Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully update table flag, table id = " << table_id;

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
    }
@@ -725,6 +733,8 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {

        } //Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully drop table index, table id = " << table_id;

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
    }
@@ -762,6 +772,8 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
            DeleteTableFiles(table_id);
        }

        ENGINE_LOG_DEBUG << "Successfully delete table, table id = " << table_id;

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
    }
@@ -795,6 +807,9 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
                return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", deleteTableFilesQuery.error());
            }
        } //Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully delete table files, table id = " << table_id;

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
    }
@@ -1001,6 +1016,7 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
            }
        } // Scoped Connection

        ENGINE_LOG_DEBUG << "Successfully create table file, file id = " << file_schema.file_id_;
        return utils::CreateTableFilePath(options_, file_schema);

    } catch (std::exception &e) {
@@ -1082,6 +1098,9 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
            files.push_back(table_file);
        }

        if(res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-index files";
        }
        return ret;

    } catch (std::exception &e) {
@@ -1195,6 +1214,9 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
            files[table_file.date_].push_back(table_file);
        }

        if(res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-search files";
        }
        return ret;
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
@@ -1285,6 +1307,9 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
            files[table_file.date_].push_back(table_file);
        }

        if(res.size() > 0) {
            ENGINE_LOG_DEBUG << "Collect " << res.size() << " to-merge files";
        }
        return ret;

    } catch (std::exception &e) {
@@ -1369,6 +1394,7 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
            table_files.emplace_back(file_schema);
        }

        ENGINE_LOG_DEBUG << "Get table files by id";
        return ret;

    } catch (std::exception &e) {
@@ -1386,7 +1412,7 @@ Status MySQLMetaImpl::Archive() {
    for (auto &kv : criterias) {
        auto &criteria = kv.first;
        auto &limit = kv.second;
        if (criteria == "days") {
        if (criteria == engine::ARCHIVE_CONF_DAYS) {
            size_t usecs = limit * D_SEC * US_PS;
            long now = utils::GetMicroSecTimeStamp();

@@ -1410,16 +1436,20 @@ Status MySQLMetaImpl::Archive() {
                    return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
                }

                ENGINE_LOG_DEBUG << "Archive old files";

            } catch (std::exception &e) {
                return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
            }
        }
        if (criteria == "disk") {
        if (criteria == engine::ARCHIVE_CONF_DISK) {
            uint64_t sum = 0;
            Size(sum);

            auto to_delete = (sum - limit * G);
            DiscardFiles(to_delete);

            ENGINE_LOG_DEBUG << "Archive files to free disk";
        }
    }

@@ -1596,6 +1626,8 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
            }
        } //Scoped Connection

        ENGINE_LOG_DEBUG << "Update single table file, file id = " << file_schema.file_id_;

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
    }
@@ -1625,6 +1657,8 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
            return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX", updateTableFilesToIndexQuery.error());
        }

        ENGINE_LOG_DEBUG << "Update files to to_index, table id = " << table_id;

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
    }
@@ -1705,6 +1739,8 @@ Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
            }
        } //Scoped Connection

        ENGINE_LOG_DEBUG << "Update " << files.size() << " table files";

    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
    }
@@ -1782,6 +1818,11 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
                    return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", cleanUpFilesWithTTLQuery.error());
                }
            }

            if(res.size() > 0) {
                ENGINE_LOG_DEBUG << "Clean " << res.size() << " files deleted in " << seconds << " seconds";
            }

        } //Scoped Connection

    } catch (std::exception &e) {
@@ -1832,6 +1873,10 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
                    return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", cleanUpFilesWithTTLQuery.error());
                }
            }

            if(res.size() > 0) {
                ENGINE_LOG_DEBUG << "Remove " << res.size() << " tables from meta";
            }
        } //Scoped Connection

    } catch (std::exception &e) {
@@ -1864,6 +1909,10 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
                    utils::DeleteTablePath(options_, table_id);
                }
            }

            if(table_ids.size() > 0) {
                ENGINE_LOG_DEBUG << "Remove " << table_ids.size() << " tables folder";
            }
        }
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
@@ -1904,6 +1953,9 @@ Status MySQLMetaImpl::CleanUp() {
            }
        }

        if(res.size() > 0) {
            ENGINE_LOG_DEBUG << "Clean " << res.size() << " files";
        }
    } catch (std::exception &e) {
        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
    }
Loading