Commit 142019ac authored by 余昆's avatar 余昆
Browse files

Merge remote-tracking branch 'upstream/branch-0.5.0' into branch-0.5.0-yk


Former-commit-id: 45eb9b37e98e38a8aeee374c529f327549989734
parents c6beac2d 648a535e
Loading
Loading
Loading
Loading
+0 −5
Original line number Diff line number Diff line
@@ -21,11 +21,6 @@ namespace milvus {
namespace engine {
namespace meta {

const size_t K = 1024UL;
const size_t M = K * K;
const size_t G = K * M;
const size_t T = K * G;

const size_t S_PS = 1UL;
const size_t MS_PS = 1000 * S_PS;
const size_t US_PS = 1000 * MS_PS;
+4 −6
Original line number Diff line number Diff line
@@ -66,16 +66,14 @@ JobMgr::worker_function() {
        }

        auto tasks = build_task(job);
        auto disk_list = res_mgr_->GetDiskResources();
        if (!disk_list.empty()) {
            if (auto disk = disk_list[0].lock()) {
        // disk resources NEVER be empty.
        if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
            for (auto& task : tasks) {
                disk->task_table().Put(task);
            }
        }
    }
}
}

std::vector<TaskPtr>
JobMgr::build_task(const JobPtr& job) {
+15 −9
Original line number Diff line number Diff line
@@ -79,9 +79,7 @@ ResourceMgr::Add(ResourcePtr&& resource) {
            gpu_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        default: {
            break;
        }
        default: { break; }
    }
    resources_.emplace_back(resource);

@@ -104,6 +102,10 @@ ResourceMgr::Connect(const std::string& name1, const std::string& name2, Connect
void
ResourceMgr::Clear() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    if (running_) {
        ENGINE_LOG_ERROR << "ResourceMgr is running, cannot clear.";
        return;
    }
    disk_resources_.clear();
    cpu_resources_.clear();
    gpu_resources_.clear();
@@ -196,13 +198,19 @@ bool
ResourceMgr::check_resource_valid() {
    {
        // TODO: check one disk-resource, one cpu-resource, zero or more gpu-resource;
        if (GetDiskResources().size() != 1) return false;
        if (GetCpuResources().size() != 1) return false;
        if (GetDiskResources().size() != 1) {
            return false;
        }
        if (GetCpuResources().size() != 1) {
            return false;
        }
    }

    {
        // TODO: one compute-resource at least;
        if (GetNumOfComputeResource() < 1) return false;
        if (GetNumOfComputeResource() < 1) {
            return false;
        }
    }

    {
@@ -233,9 +241,7 @@ void
ResourceMgr::event_process() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] {
            return !queue_.empty();
        });
        event_cv_.wait(lock, [this] { return !queue_.empty(); });

        auto event = queue_.front();
        queue_.pop();
+5 −8
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@
#include "utils/Log.h"
#include "utils/TimeRecorder.h"

#include <algorithm>
#include <string>
#include <thread>
#include <utility>
@@ -34,6 +35,7 @@ static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;

std::mutex XSearchTask::merge_mutex_;

// TODO(wxyu): remove unused code
// bool
// NeedParallelReduce(uint64_t nq, uint64_t topk) {
//    server::ServerConfig &config = server::ServerConfig::GetInstance();
@@ -229,13 +231,8 @@ XSearchTask::Execute() {
}

Status
XSearchTask::TopkResult(const std::vector<long> &input_ids,
                        const std::vector<float> &input_distance,
                        uint64_t input_k,
                        uint64_t nq,
                        uint64_t topk,
                        bool ascending,
                        scheduler::ResultSet &result) {
XSearchTask::TopkResult(const std::vector<int64_t>& input_ids, const std::vector<float>& input_distance,
                        uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result) {
    scheduler::ResultSet result_buf;

    if (result.empty()) {
+2 −7
Original line number Diff line number Diff line
@@ -39,13 +39,8 @@ class XSearchTask : public Task {

 public:
    static Status
    TopkResult(const std::vector<long> &input_ids,
               const std::vector<float> &input_distance,
               uint64_t input_k,
               uint64_t nq,
               uint64_t topk,
               bool ascending,
               scheduler::ResultSet &result);
    TopkResult(const std::vector<int64_t>& input_ids, const std::vector<float>& input_distance, uint64_t input_k,
               uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result);

 public:
    TableFileSchemaPtr file_;
Loading