Commit ef927d13 authored by 余昆's avatar 余昆
Browse files

MS-603 Add BuildIndex to scheduler


Former-commit-id: 54cdf8632649ed2576e2d6bcace2bc73e0814679
parent b87ddb9f
Loading
Loading
Loading
Loading
+6 −8
Original line number Diff line number Diff line
@@ -898,24 +898,22 @@ DBImpl::BackgroundBuildIndex() {
    meta_ptr_->FilesToIndex(to_index_files);
    Status status;

    scheduler::BuildIndexJobPtr
        job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_);

    // step 2: put build index task to scheduler
//    for (auto &file : to_index_files) {
//        std::cout << "get to index file" << std::endl;
//        scheduler::BuildIndexJobPtr
//            job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
//
//        scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
//        job->AddToIndexFiles(file_ptr);
//
//        job->AddToIndexFiles(file_ptr);
//        scheduler::JobMgrInst::GetInstance()->Put(job);
//        job->WaitBuildIndexFinish();
//        if (!job->GetStatus().ok()) {
//            Status status = job->GetStatus();
//            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
//        }
//
//    }
//    scheduler::JobMgrInst::GetInstance()->Put(job);
//    job->WaitBuildIndexFinish();


    for (auto &file : to_index_files) {
        std::cout << "get to index file" << std::endl;
+3 −0
Original line number Diff line number Diff line
@@ -66,6 +66,9 @@ class ExecutionEngine {
    virtual Status
    CopyToGpu(uint64_t device_id) = 0;

    virtual Status
    CopyToIndexFileToGpu(uint64_t device_id) = 0;

    virtual Status
    CopyToCpu() = 0;

+12 −2
Original line number Diff line number Diff line
@@ -133,7 +133,6 @@ ExecutionEngineImpl::Serialize() {

Status
ExecutionEngineImpl::Load(bool to_cache) {
    std::cout << "load" << std::endl;
    index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
    bool already_in_cache = (index_ != nullptr);
    if (!already_in_cache) {
@@ -162,7 +161,7 @@ ExecutionEngineImpl::Load(bool to_cache) {

Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
    std::cout << "copy2gpu" << std::endl;
    std::cout << "copytogpu" << std::endl;
    auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
    bool already_in_cache = (index != nullptr);
    if (already_in_cache) {
@@ -189,6 +188,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
    return Status::OK();
}

Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
    auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
    bool already_in_cache = (index != nullptr);
    if (!already_in_cache) {
        cache::DataObjPtr obj = std::make_shared<cache::DataObj>(nullptr, PhysicalSize());
        milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj);
    }
    return Status::OK();
}

Status
ExecutionEngineImpl::CopyToCpu() {
    auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
+3 −0
Original line number Diff line number Diff line
@@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine {
    Status
    CopyToGpu(uint64_t device_id) override;

    Status
    CopyToIndexFileToGpu(uint64_t device_id) override; 

    Status
    CopyToCpu() override;

+23 −15
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@
#include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h"


namespace milvus {
namespace scheduler {

@@ -171,15 +172,22 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
            int32_t build_index_gpu;
            Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);

            bool find_gpu_res = false;
            for (uint64_t i = 0; i < compute_resources.size(); ++i) {
                if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
                    if (compute_resources[i]->name()
                        == res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
                        find_gpu_res = true;
                        Path task_path(paths[i], paths[i].size() - 1);
                        task->path() = task_path;
                        break;
                    }
                }
            }
            if (not find_gpu_res) {
                task->path() = Path(paths[0], paths[0].size() - 1);
            }
        }
    }

    if (resource->name() == task->path().Last()) {
Loading