Loading cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ class IVFSQHybrid : public GPUIVFSQ { void UnsetQuantizer(); // todo(xiaojun): return void => VecIndex void LoadData(const knowhere::QuantizerPtr& q, const Config& conf); Loading cpp/src/db/engine/ExecutionEngine.h +2 −2 Original line number Diff line number Diff line Loading @@ -65,7 +65,7 @@ class ExecutionEngine { Load(bool to_cache = true) = 0; virtual Status CopyToGpu(uint64_t device_id) = 0; CopyToGpu(uint64_t device_id, bool hybrid) = 0; virtual Status CopyToIndexFileToGpu(uint64_t device_id) = 0; Loading @@ -80,7 +80,7 @@ class ExecutionEngine { Merge(const std::string& location) = 0; virtual Status Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const = 0; Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels, bool hybrid) const = 0; virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string& location, EngineType engine_type) = 0; Loading cpp/src/db/engine/ExecutionEngineImpl.cpp +38 −3 Original line number Diff line number Diff line Loading @@ -35,6 +35,7 @@ #include <stdexcept> #include <utility> #include <vector> #include <src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h> namespace milvus { namespace engine { Loading Loading @@ -245,7 +246,39 @@ ExecutionEngineImpl::Load(bool to_cache) { } Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) { if (hybrid) { auto key = location_ + ".quantizer"; auto quantizer = std::static_pointer_cast<CachedQuantizer>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(key)); auto conf = std::make_shared<knowhere::QuantizerCfg>(); conf->gpu_id = device_id; if (quantizer) { // cache hit conf->mode = 2; index_->SetQuantizer(quantizer->Data()); index_->LoadData(quantizer->Data(), conf); } else { // cache miss if (index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu"; return Status(DB_ERROR, "index is null"); } conf->mode = 1; auto q = index_->LoadQuantizer(conf); index_->SetQuantizer(q); conf->mode = 2; index_->LoadData(q, conf); // cache auto cached_quantizer = std::make_shared<CachedQuantizer>(q); cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer); } return Status::OK(); } auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_)); bool already_in_cache = (index != nullptr); if (already_in_cache) { Loading Loading @@ -390,7 +423,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t Status ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const { int64_t* labels, bool hybrid) const { if (index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search"; return Status(DB_ERROR, "index is null"); Loading @@ -406,7 +439,9 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType()); auto conf = adapter->MatchSearch(temp_conf, index_->GetType()); if (hybrid) { HybridLoad(); } auto status = index_->Search(n, data, distances, labels, conf); Loading cpp/src/db/engine/ExecutionEngineImpl.h +8 −2 Original line number Diff line number Diff line Loading @@ -56,7 +56,7 @@ class ExecutionEngineImpl : public ExecutionEngine { Load(bool to_cache) override; Status CopyToGpu(uint64_t device_id) override; CopyToGpu(uint64_t device_id, bool hybrid = false) override; Status CopyToIndexFileToGpu(uint64_t device_id) override; Loading @@ -71,7 +71,13 @@ class ExecutionEngineImpl : public ExecutionEngine { Merge(const std::string& location) override; Status Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const override; Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels, bool hybrid = false) const override; ExecutionEnginePtr BuildIndex(const std::string& location, EngineType engine_type) override; Loading cpp/src/scheduler/JobMgr.cpp +28 −2 Original line number Diff line number Diff line Loading @@ -20,8 +20,10 @@ #include "TaskCreator.h" #include "optimizer/Optimizer.h" #include "task/Task.h" #include "scheduler/tasklabel/SpecResLabel.h" #include "scheduler/optimizer/Optimizer.h" #include "scheduler/Algorithm.h" #include <src/scheduler/optimizer/Optimizer.h> #include <utility> namespace milvus { Loading Loading @@ -60,7 +62,9 @@ void JobMgr::worker_function() { while (running_) { std::unique_lock<std::mutex> lock(mutex_); cv_.wait(lock, [this] { return !queue_.empty(); }); cv_.wait(lock, [this] { return !queue_.empty(); }); auto job = queue_.front(); queue_.pop(); lock.unlock(); Loading @@ -73,6 +77,10 @@ JobMgr::worker_function() { OptimizerInst::GetInstance()->Run(task); } for (auto& task: tasks) { calculate_path(task); } // disk resources NEVER be empty. if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { for (auto& task : tasks) { Loading @@ -87,5 +95,23 @@ JobMgr::build_task(const JobPtr& job) { return TaskCreator::Create(job); } void JobMgr::calculate_path(const TaskPtr& task) { if (task->type_ != TaskType::SearchTask) { return; } if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) { return; } std::vector<std::string> path; auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label()); auto src = res_mgr_->GetDiskResources()[0]; auto dest = spec_label->resource(); ShortestPath(src.lock(), dest.lock(), res_mgr_, path); task->path() = Path(path, path.size() - 1); } } // namespace scheduler } // namespace milvus Loading
cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ class IVFSQHybrid : public GPUIVFSQ { void UnsetQuantizer(); // todo(xiaojun): return void => VecIndex void LoadData(const knowhere::QuantizerPtr& q, const Config& conf); Loading
cpp/src/db/engine/ExecutionEngine.h +2 −2 Original line number Diff line number Diff line Loading @@ -65,7 +65,7 @@ class ExecutionEngine { Load(bool to_cache = true) = 0; virtual Status CopyToGpu(uint64_t device_id) = 0; CopyToGpu(uint64_t device_id, bool hybrid) = 0; virtual Status CopyToIndexFileToGpu(uint64_t device_id) = 0; Loading @@ -80,7 +80,7 @@ class ExecutionEngine { Merge(const std::string& location) = 0; virtual Status Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const = 0; Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels, bool hybrid) const = 0; virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string& location, EngineType engine_type) = 0; Loading
cpp/src/db/engine/ExecutionEngineImpl.cpp +38 −3 Original line number Diff line number Diff line Loading @@ -35,6 +35,7 @@ #include <stdexcept> #include <utility> #include <vector> #include <src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h> namespace milvus { namespace engine { Loading Loading @@ -245,7 +246,39 @@ ExecutionEngineImpl::Load(bool to_cache) { } Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) { if (hybrid) { auto key = location_ + ".quantizer"; auto quantizer = std::static_pointer_cast<CachedQuantizer>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(key)); auto conf = std::make_shared<knowhere::QuantizerCfg>(); conf->gpu_id = device_id; if (quantizer) { // cache hit conf->mode = 2; index_->SetQuantizer(quantizer->Data()); index_->LoadData(quantizer->Data(), conf); } else { // cache miss if (index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu"; return Status(DB_ERROR, "index is null"); } conf->mode = 1; auto q = index_->LoadQuantizer(conf); index_->SetQuantizer(q); conf->mode = 2; index_->LoadData(q, conf); // cache auto cached_quantizer = std::make_shared<CachedQuantizer>(q); cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer); } return Status::OK(); } auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_)); bool already_in_cache = (index != nullptr); if (already_in_cache) { Loading Loading @@ -390,7 +423,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t Status ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const { int64_t* labels, bool hybrid) const { if (index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search"; return Status(DB_ERROR, "index is null"); Loading @@ -406,7 +439,9 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType()); auto conf = adapter->MatchSearch(temp_conf, index_->GetType()); if (hybrid) { HybridLoad(); } auto status = index_->Search(n, data, distances, labels, conf); Loading
cpp/src/db/engine/ExecutionEngineImpl.h +8 −2 Original line number Diff line number Diff line Loading @@ -56,7 +56,7 @@ class ExecutionEngineImpl : public ExecutionEngine { Load(bool to_cache) override; Status CopyToGpu(uint64_t device_id) override; CopyToGpu(uint64_t device_id, bool hybrid = false) override; Status CopyToIndexFileToGpu(uint64_t device_id) override; Loading @@ -71,7 +71,13 @@ class ExecutionEngineImpl : public ExecutionEngine { Merge(const std::string& location) override; Status Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels) const override; Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels, bool hybrid = false) const override; ExecutionEnginePtr BuildIndex(const std::string& location, EngineType engine_type) override; Loading
cpp/src/scheduler/JobMgr.cpp +28 −2 Original line number Diff line number Diff line Loading @@ -20,8 +20,10 @@ #include "TaskCreator.h" #include "optimizer/Optimizer.h" #include "task/Task.h" #include "scheduler/tasklabel/SpecResLabel.h" #include "scheduler/optimizer/Optimizer.h" #include "scheduler/Algorithm.h" #include <src/scheduler/optimizer/Optimizer.h> #include <utility> namespace milvus { Loading Loading @@ -60,7 +62,9 @@ void JobMgr::worker_function() { while (running_) { std::unique_lock<std::mutex> lock(mutex_); cv_.wait(lock, [this] { return !queue_.empty(); }); cv_.wait(lock, [this] { return !queue_.empty(); }); auto job = queue_.front(); queue_.pop(); lock.unlock(); Loading @@ -73,6 +77,10 @@ JobMgr::worker_function() { OptimizerInst::GetInstance()->Run(task); } for (auto& task: tasks) { calculate_path(task); } // disk resources NEVER be empty. if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { for (auto& task : tasks) { Loading @@ -87,5 +95,23 @@ JobMgr::build_task(const JobPtr& job) { return TaskCreator::Create(job); } void JobMgr::calculate_path(const TaskPtr& task) { if (task->type_ != TaskType::SearchTask) { return; } if (task->label()->Type() != TaskLabelType::SPECIFIED_RESOURCE) { return; } std::vector<std::string> path; auto spec_label = std::static_pointer_cast<SpecResLabel>(task->label()); auto src = res_mgr_->GetDiskResources()[0]; auto dest = spec_label->resource(); ShortestPath(src.lock(), dest.lock(), res_mgr_, path); task->path() = Path(path, path.size() - 1); } } // namespace scheduler } // namespace milvus