Commit 549fb512 authored by 余昆's avatar 余昆
Browse files

fix conflict


Former-commit-id: ab8d191bc0b4726c2e8e05a1c7123379c120efc5
parents ef927d13 56a4283b
Loading
Loading
Loading
Loading
+7 −2
Original line number Diff line number Diff line
@@ -14,15 +14,19 @@ Please mark all change in change log and use the ticket from JIRA.
## Improvement
- MS-552 - Add and change the easylogging library
- MS-553 - Refine cache code
- MS-557 - Merge Log.h
- MS-555 - Remove old scheduler
- MS-556 - Add Job Definition in Scheduler
- MS-557 - Merge Log.h
- MS-558 - Refine status code
- MS-562 - Add JobMgr and TaskCreator in Scheduler
- MS-566 - Refactor cmake
- MS-555 - Remove old scheduler
- MS-574 - Milvus configuration refactor
- MS-578 - Make sure milvus5.0 don't crack 0.3.1 data
- MS-585 - Update namespace in scheduler
- MS-606 - Speed up result reduce
- MS-608 - Update TODO names
- MS-609 - Update task construct function
- MS-611 - Add resources validity check in ResourceMgr

## New Feature

@@ -36,6 +40,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-590 - Refine cmake code to support cpplint
- MS-600 - Reconstruct unittest code
- MS-602 - Remove zilliz namespace
- MS-610 - Change error code base value from hex to decimal

# Milvus 0.4.0 (2019-09-12)

+61 −9
Original line number Diff line number Diff line
@@ -24,6 +24,12 @@ namespace scheduler {

void
ResourceMgr::Start() {
    if (not check_resource_valid()) {
        ENGINE_LOG_ERROR << "Resources invalid, cannot start ResourceMgr.";
        ENGINE_LOG_ERROR << Dump();
        return;
    }

    std::lock_guard<std::mutex> lck(resources_mutex_);
    for (auto& resource : resources_) {
        resource->Start();
@@ -60,8 +66,22 @@ ResourceMgr::Add(ResourcePtr&& resource) {

    resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1));

    if (resource->type() == ResourceType::DISK) {
    switch (resource->type()) {
        case ResourceType::DISK: {
            disk_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        case ResourceType::CPU: {
            cpu_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        case ResourceType::GPU: {
            gpu_resources_.emplace_back(ResourceWPtr(resource));
            break;
        }
        default: {
            break;
        }
    }
    resources_.emplace_back(resource);

@@ -74,7 +94,7 @@ ResourceMgr::Connect(const std::string& name1, const std::string& name2, Connect
    auto res2 = GetResource(name2);
    if (res1 && res2) {
        res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
        // TODO(wxy): enable when task balance supported
        // TODO(wxyu): enable when task balance supported
        //        res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
        return true;
    }
@@ -85,6 +105,8 @@ void
ResourceMgr::Clear() {
    std::lock_guard<std::mutex> lck(resources_mutex_);
    disk_resources_.clear();
    cpu_resources_.clear();
    gpu_resources_.clear();
    resources_.clear();
}

@@ -148,14 +170,14 @@ ResourceMgr::GetNumGpuResource() const {

std::string
ResourceMgr::Dump() {
    std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";
    std::stringstream ss;
    ss << "ResourceMgr contains " << resources_.size() << " resources." << std::endl;

    for (uint64_t i = 0; i < resources_.size(); ++i) {
        str += "Resource No." + std::to_string(i) + ":\n";
        // str += resources_[i]->Dump();
    for (auto& res : resources_) {
        ss << res->Dump();
    }

    return str;
    return ss.str();
}

std::string
@@ -170,6 +192,34 @@ ResourceMgr::DumpTaskTables() {
    return ss.str();
}

bool
ResourceMgr::check_resource_valid() {
    {
        // TODO: check one disk-resource, one cpu-resource, zero or more gpu-resource;
        if (GetDiskResources().size() != 1) return false;
        if (GetCpuResources().size() != 1) return false;
    }

    {
        // TODO: one compute-resource at least;
        if (GetNumOfComputeResource() < 1) return false;
    }

    {
        // TODO: check disk only connect with cpu
    }

    {
        // TODO: check gpu only connect with cpu
    }

    {
        // TODO: check if exists isolated node
    }

    return true;
}

void
ResourceMgr::post_event(const EventPtr& event) {
    {
@@ -183,7 +233,9 @@ void
ResourceMgr::event_process() {
    while (running_) {
        std::unique_lock<std::mutex> lock(event_mutex_);
        event_cv_.wait(lock, [this] { return !queue_.empty(); });
        event_cv_.wait(lock, [this] {
            return !queue_.empty();
        });

        auto event = queue_.front();
        queue_.pop();
+17 −2
Original line number Diff line number Diff line
@@ -64,7 +64,17 @@ class ResourceMgr {
        return disk_resources_;
    }

    // TODO(wxy): why return shared pointer
    inline std::vector<ResourceWPtr>&
    GetCpuResources() {
        return cpu_resources_;
    }

    inline std::vector<ResourceWPtr>&
    GetGpuResources() {
        return gpu_resources_;
    }

    // TODO(wxyu): why return shared pointer
    inline std::vector<ResourcePtr>
    GetAllResources() {
        return resources_;
@@ -89,7 +99,7 @@ class ResourceMgr {
    GetNumGpuResource() const;

 public:
    // TODO(wxy): add stats interface(low)
    // TODO(wxyu): add stats interface(low)

 public:
    /******** Utility Functions ********/
@@ -100,6 +110,9 @@ class ResourceMgr {
    DumpTaskTables();

 private:
    bool
    check_resource_valid();

    void
    post_event(const EventPtr& event);

@@ -110,6 +123,8 @@ class ResourceMgr {
    bool running_ = false;

    std::vector<ResourceWPtr> disk_resources_;
    std::vector<ResourceWPtr> cpu_resources_;
    std::vector<ResourceWPtr> gpu_resources_;
    std::vector<ResourcePtr> resources_;
    mutable std::mutex resources_mutex_;

+1 −1
Original line number Diff line number Diff line
@@ -146,7 +146,7 @@ load_advance_config() {
    //        }
    //    } catch (const char *msg) {
    //        SERVER_LOG_ERROR << msg;
    //        // TODO(wxy): throw exception instead
    //        // TODO(wxyu): throw exception instead
    //        exit(-1);
    ////        throw std::exception();
    //    }
+1 −1
Original line number Diff line number Diff line
@@ -92,7 +92,7 @@ Scheduler::Process(const EventPtr& event) {
    process_event(event);
}

// TODO(wxy): refactor the function
// TODO(wxyu): refactor the function
void
Scheduler::OnLoadCompleted(const EventPtr& event) {
    auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event);
Loading