Commit a0e4d19e authored by starlord's avatar starlord
Browse files

format db code


Former-commit-id: 3b4781e4c000beea41baddf54635d7bbe7ea5f52
parent 6ed49d1d
Loading
Loading
Loading
Loading
+24 −24
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@

#include <string>
#include <memory>
#include <vector>

namespace zilliz {
namespace milvus {
@@ -72,7 +73,6 @@ public:
    virtual Status DropIndex(const std::string &table_id) = 0;

    virtual Status DropAll() = 0;

}; // DB

using DBPtr = std::shared_ptr<DB>;
+6 −4
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@
// under the License.


#include "DBFactory.h"
#include "db/DBFactory.h"
#include "DBImpl.h"
#include "utils/Exception.h"
#include "meta/MetaFactory.h"
@@ -33,14 +33,16 @@ namespace zilliz {
namespace milvus {
namespace engine {

DBOptions DBFactory::BuildOption() {
DBOptions
DBFactory::BuildOption() {
    auto meta = MetaFactory::BuildOption();
    DBOptions options;
    options.meta_ = meta;
    return options;
}

DBPtr DBFactory::Build(const DBOptions& options) {
DBPtr
DBFactory::Build(const DBOptions &options) {
    return std::make_shared<DBImpl>(options);
}

+5 −6
Original line number Diff line number Diff line
@@ -34,7 +34,6 @@ public:
    static DBPtr Build(const DBOptions &options);
};


}
}
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
+167 −127
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "DBImpl.h"
#include "db/DBImpl.h"
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "engine/EngineFactory.h"
@@ -36,6 +36,7 @@
#include <thread>
#include <iostream>
#include <cstring>
#include <algorithm>
#include <boost/filesystem.hpp>

namespace zilliz {
@@ -48,8 +49,7 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

}

} // namespace

DBImpl::DBImpl(const DBOptions &options)
    : options_(options),
@@ -68,7 +68,8 @@ DBImpl::~DBImpl() {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//external api
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status DBImpl::Start() {
Status
DBImpl::Start() {
    if (!shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }
@@ -85,7 +86,8 @@ Status DBImpl::Start() {
    return Status::OK();
}

Status DBImpl::Stop() {
Status
DBImpl::Stop() {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status::OK();
    }
@@ -106,11 +108,13 @@ Status DBImpl::Stop() {
    return Status::OK();
}

Status DBImpl::DropAll() {
Status
DBImpl::DropAll() {
    return meta_ptr_->DropAll();
}

Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
Status
DBImpl::CreateTable(meta::TableSchema &table_schema) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -120,7 +124,8 @@ Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
    return meta_ptr_->CreateTable(temp_schema);
}

Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
Status
DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -144,7 +149,8 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date
    return Status::OK();
}

Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
Status
DBImpl::DescribeTable(meta::TableSchema &table_schema) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -154,7 +160,8 @@ Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
    return stat;
}

Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
Status
DBImpl::HasTable(const std::string &table_id, bool &has_or_not) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -162,7 +169,8 @@ Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
    return meta_ptr_->HasTable(table_id, has_or_not);
}

Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
Status
DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -170,7 +178,8 @@ Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
    return meta_ptr_->AllTables(table_schema_array);
}

Status DBImpl::PreloadTable(const std::string &table_id) {
Status
DBImpl::PreloadTable(const std::string &table_id) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -191,7 +200,11 @@ Status DBImpl::PreloadTable(const std::string &table_id) {

    for (auto &day_files : files) {
        for (auto &file : day_files.second) {
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, (MetricType)file.metric_type_, file.nlist_);
            ExecutionEnginePtr engine = EngineFactory::Build(file.dimension_,
                                                             file.location_,
                                                             (EngineType) file.engine_type_,
                                                             (MetricType) file.metric_type_,
                                                             file.nlist_);
            if (engine == nullptr) {
                ENGINE_LOG_ERROR << "Invalid engine type";
                return Status(DB_ERROR, "Invalid engine type");
@@ -215,7 +228,8 @@ Status DBImpl::PreloadTable(const std::string &table_id) {
    return Status::OK();
}

Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
Status
DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -223,7 +237,8 @@ Status DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
    return meta_ptr_->UpdateTableFlag(table_id, flag);
}

Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
Status
DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -231,7 +246,8 @@ Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count
    return meta_ptr_->Count(table_id, row_count);
}

Status DBImpl::InsertVectors(const std::string& table_id_,
Status
DBImpl::InsertVectors(const std::string &table_id_,
                      uint64_t n, const float *vectors, IDNumbers &vector_ids_) {
//    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
    if (shutting_down_.load(std::memory_order_acquire)) {
@@ -241,7 +257,8 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
    Status status;
    zilliz::milvus::server::CollectInsertMetrics metrics(n, status);
    status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
//    std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    std::chrono::microseconds time_span =
//          std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
//    double average_time = double(time_span.count()) / n;

//    ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
@@ -249,7 +266,8 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
    return status;
}

Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
Status
DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
    {
        std::unique_lock<std::mutex> lock(build_index_mutex_);

@@ -316,16 +334,19 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
    return Status::OK();
}

Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
Status
DBImpl::DescribeIndex(const std::string &table_id, TableIndex &index) {
    return meta_ptr_->DescribeTableIndex(table_id, index);
}

Status DBImpl::DropIndex(const std::string& table_id) {
Status
DBImpl::DropIndex(const std::string &table_id) {
    ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
    return meta_ptr_->DropTableIndex(table_id);
}

Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
Status
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
              const float *vectors, QueryResults &results) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
@@ -337,7 +358,8 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint6
    return result;
}

Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
Status
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe,
              const float *vectors, const meta::DatesT &dates, QueryResults &results) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
@@ -364,7 +386,8 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint6
    return status;
}

Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
Status
DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ids,
              uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
              const meta::DatesT &dates, QueryResults &results) {
    if (shutting_down_.load(std::memory_order_acquire)) {
@@ -405,7 +428,8 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
    return status;
}

Status DBImpl::Size(uint64_t& result) {
Status
DBImpl::Size(uint64_t &result) {
    if (shutting_down_.load(std::memory_order_acquire)) {
        return Status(DB_ERROR, "Milsvus server is shutdown!");
    }
@@ -413,28 +437,28 @@ Status DBImpl::Size(uint64_t& result) {
    return meta_ptr_->Size(result);
}


///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
Status
DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files,
                   uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
                   const meta::DatesT &dates, QueryResults &results) {
    using namespace scheduler;
    server::CollectQueryMetrics metrics(nq);

    TimeRecorder rc("");

    //step 1: get files to search
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size();
    SearchJobPtr job = std::make_shared<SearchJob>(0, k, nq, nprobe, vectors);
    ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: "
                     << dates.size();
    scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
    for (auto &file : files) {
        TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        job->AddIndexFile(file_ptr);
    }

    //step 2: put search task to scheduler
    JobMgrInst::GetInstance()->Put(job);
    scheduler::JobMgrInst::GetInstance()->Put(job);
    job->WaitResult();
    if (!job->GetStatus().ok()) {
        return job->GetStatus();
@@ -453,9 +477,12 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
//        double search_percent = search_cost/total_cost;
//        double reduce_percent = reduce_cost/total_cost;
//
//        ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%";
//        ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%";
//        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%";
//        ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info
//          << " percent: " << load_percent*100 << "%";
//        ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info
//          << " percent: " << search_percent*100 << "%";
//        ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info
//          << " percent: " << reduce_percent*100 << "%";
//    } else {
//        ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
//            << " search cost: " << search_info
@@ -469,7 +496,8 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
    return Status::OK();
}

void DBImpl::BackgroundTimerTask() {
void
DBImpl::BackgroundTimerTask() {
    Status status;
    server::SystemInfo::GetInstance().Init();
    while (true) {
@@ -489,21 +517,24 @@ void DBImpl::BackgroundTimerTask() {
    }
}

void DBImpl::WaitMergeFileFinish() {
void
DBImpl::WaitMergeFileFinish() {
    std::lock_guard<std::mutex> lck(compact_result_mutex_);
    for (auto &iter : compact_thread_results_) {
        iter.wait();
    }
}

void DBImpl::WaitBuildIndexFinish() {
void
DBImpl::WaitBuildIndexFinish() {
    std::lock_guard<std::mutex> lck(index_result_mutex_);
    for (auto &iter : index_thread_results_) {
        iter.wait();
    }
}

void DBImpl::StartMetricTask() {
void
DBImpl::StartMetricTask() {
    static uint64_t metric_clock_tick = 0;
    metric_clock_tick++;
    if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) {
@@ -533,7 +564,8 @@ void DBImpl::StartMetricTask() {
    ENGINE_LOG_TRACE << "Metric task finished";
}

Status DBImpl::MemSerialize() {
Status
DBImpl::MemSerialize() {
    std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
    std::set<std::string> temp_table_ids;
    mem_mgr_->Serialize(temp_table_ids);
@@ -548,7 +580,8 @@ Status DBImpl::MemSerialize() {
    return Status::OK();
}

void DBImpl::StartCompactionTask() {
void
DBImpl::StartCompactionTask() {
    static uint64_t compact_clock_tick = 0;
    compact_clock_tick++;
    if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) {
@@ -580,7 +613,8 @@ void DBImpl::StartCompactionTask() {
    }
}

Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
Status
DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date,
                   const meta::TableFilesSchema &files) {
    ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

@@ -602,7 +636,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
                             (MetricType) table_file.metric_type_, table_file.nlist_);

    meta::TableFilesSchema updated;
    long  index_size = 0;
    int64_t index_size = 0;

    for (auto &file : files) {
        server::CollectMergeFilesMetrics metrics;
@@ -658,7 +692,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
    return status;
}

Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
Status
DBImpl::BackgroundMergeFiles(const std::string &table_id) {
    meta::DatePartionedTableFilesSchema raw_files;
    auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
    if (!status.ok()) {
@@ -685,7 +720,8 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
    return Status::OK();
}

void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
void
DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
    ENGINE_LOG_TRACE << " Background compaction thread start";

    Status status;
@@ -712,7 +748,8 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
    ENGINE_LOG_TRACE << " Background compaction thread exit";
}

void DBImpl::StartBuildIndexTask(bool force) {
void
DBImpl::StartBuildIndexTask(bool force) {
    static uint64_t index_clock_tick = 0;
    index_clock_tick++;
    if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) {
@@ -740,7 +777,8 @@ void DBImpl::StartBuildIndexTask(bool force) {
    }
}

Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
Status
DBImpl::BuildIndex(const meta::TableFileSchema &file) {
    ExecutionEnginePtr to_index =
        EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_,
                             (MetricType) file.metric_type_, file.nlist_);
@@ -761,7 +799,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
        table_file.file_type_ =
            meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
        status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
@@ -777,11 +816,11 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
            if (index == nullptr) {
                table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
                status = meta_ptr_->UpdateTableFile(table_file);
                ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
                ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
                                 << " to to_delete";

                return status;
            }

        } catch (std::exception &ex) {
            //typical error: out of gpu memory
            std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
@@ -791,7 +830,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" << std::endl;
            std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough"
                      << std::endl;

            return Status(DB_ERROR, msg);
        }
@@ -850,7 +890,6 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
            status = meta_ptr_->UpdateTableFile(table_file);
            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
        }

    } catch (std::exception &ex) {
        std::string msg = "Build index encounter exception: " + std::string(ex.what());
        ENGINE_LOG_ERROR << msg;
@@ -860,7 +899,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
    return Status::OK();
}

void DBImpl::BackgroundBuildIndex() {
void
DBImpl::BackgroundBuildIndex() {
    ENGINE_LOG_TRACE << "Background build index thread start";

    std::unique_lock<std::mutex> lock(build_index_mutex_);
+6 −6
Original line number Diff line number Diff line
@@ -29,7 +29,8 @@
#include <thread>
#include <list>
#include <set>

#include <vector>
#include <string>

namespace zilliz {
namespace milvus {
@@ -151,7 +152,6 @@ class DBImpl : public DB {
    std::list<std::future<void>> index_thread_results_;

    std::mutex build_index_mutex_;

}; // DBImpl


Loading