Loading cpp/CHANGELOG.md +1 −0 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-585 - Update namespace in scheduler - MS-608 - Update TODO names - MS-609 - Update task construct function - MS-611 - Add resources validity check in ResourceMgr ## New Feature Loading cpp/src/scheduler/ResourceMgr.cpp +58 −8 Original line number Diff line number Diff line Loading @@ -24,6 +24,12 @@ namespace scheduler { void ResourceMgr::Start() { if (not check_resource_valid()) { ENGINE_LOG_ERROR << "Resources invalid, cannot start ResourceMgr."; ENGINE_LOG_ERROR << Dump(); return; } std::lock_guard<std::mutex> lck(resources_mutex_); for (auto& resource : resources_) { resource->Start(); Loading Loading @@ -60,8 +66,22 @@ ResourceMgr::Add(ResourcePtr&& resource) { resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1)); if (resource->type() == ResourceType::DISK) { switch (resource->type()) { case ResourceType::DISK: { disk_resources_.emplace_back(ResourceWPtr(resource)); break; } case ResourceType::CPU: { cpu_resources_.emplace_back(ResourceWPtr(resource)); break; } case ResourceType::GPU: { gpu_resources_.emplace_back(ResourceWPtr(resource)); break; } default: { break; } } resources_.emplace_back(resource); Loading Loading @@ -148,14 +168,14 @@ ResourceMgr::GetNumGpuResource() const { std::string ResourceMgr::Dump() { std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n"; std::stringstream ss; ss << "ResourceMgr contains " << resources_.size() << " resources." << std::endl; for (uint64_t i = 0; i < resources_.size(); ++i) { str += "Resource No." + std::to_string(i) + ":\n"; // str += resources_[i]->Dump(); for (auto& res : resources_) { ss << res->Dump(); } return str; return ss.str(); } std::string Loading @@ -170,6 +190,34 @@ ResourceMgr::DumpTaskTables() { return ss.str(); } bool ResourceMgr::check_resource_valid() { { // TODO: check one disk-resource, one cpu-resource, zero or more gpu-resource; if (GetDiskResources().size() != 1) return false; if (GetCpuResources().size() != 1) return false; } { // TODO: one compute-resource at least; if (GetNumOfComputeResource() < 1) return false; } { // TODO: check disk only connect with cpu } { // TODO: check gpu only connect with cpu } { // TODO: check if exists isolated node } return true; } void ResourceMgr::post_event(const EventPtr& event) { { Loading @@ -183,7 +231,9 @@ void ResourceMgr::event_process() { while (running_) { std::unique_lock<std::mutex> lock(event_mutex_); event_cv_.wait(lock, [this] { return !queue_.empty(); }); event_cv_.wait(lock, [this] { return !queue_.empty(); }); auto event = queue_.front(); queue_.pop(); Loading cpp/src/scheduler/ResourceMgr.h +15 −0 Original line number Diff line number Diff line Loading @@ -64,6 +64,16 @@ class ResourceMgr { return disk_resources_; } inline std::vector<ResourceWPtr>& GetCpuResources() { return cpu_resources_; } inline std::vector<ResourceWPtr>& GetGpuResources() { return gpu_resources_; } // TODO(wxyu): why return shared pointer inline std::vector<ResourcePtr> GetAllResources() { Loading Loading @@ -100,6 +110,9 @@ class ResourceMgr { DumpTaskTables(); private: bool check_resource_valid(); void post_event(const EventPtr& event); Loading @@ -110,6 +123,8 @@ class ResourceMgr { bool running_ = false; std::vector<ResourceWPtr> disk_resources_; std::vector<ResourceWPtr> cpu_resources_; std::vector<ResourceWPtr> gpu_resources_; std::vector<ResourcePtr> resources_; mutable std::mutex resources_mutex_; Loading Loading
cpp/CHANGELOG.md +1 −0 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-585 - Update namespace in scheduler - MS-608 - Update TODO names - MS-609 - Update task construct function - MS-611 - Add resources validity check in ResourceMgr ## New Feature Loading
cpp/src/scheduler/ResourceMgr.cpp +58 −8 Original line number Diff line number Diff line Loading @@ -24,6 +24,12 @@ namespace scheduler { void ResourceMgr::Start() { if (not check_resource_valid()) { ENGINE_LOG_ERROR << "Resources invalid, cannot start ResourceMgr."; ENGINE_LOG_ERROR << Dump(); return; } std::lock_guard<std::mutex> lck(resources_mutex_); for (auto& resource : resources_) { resource->Start(); Loading Loading @@ -60,8 +66,22 @@ ResourceMgr::Add(ResourcePtr&& resource) { resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1)); if (resource->type() == ResourceType::DISK) { switch (resource->type()) { case ResourceType::DISK: { disk_resources_.emplace_back(ResourceWPtr(resource)); break; } case ResourceType::CPU: { cpu_resources_.emplace_back(ResourceWPtr(resource)); break; } case ResourceType::GPU: { gpu_resources_.emplace_back(ResourceWPtr(resource)); break; } default: { break; } } resources_.emplace_back(resource); Loading Loading @@ -148,14 +168,14 @@ ResourceMgr::GetNumGpuResource() const { std::string ResourceMgr::Dump() { std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n"; std::stringstream ss; ss << "ResourceMgr contains " << resources_.size() << " resources." << std::endl; for (uint64_t i = 0; i < resources_.size(); ++i) { str += "Resource No." + std::to_string(i) + ":\n"; // str += resources_[i]->Dump(); for (auto& res : resources_) { ss << res->Dump(); } return str; return ss.str(); } std::string Loading @@ -170,6 +190,34 @@ ResourceMgr::DumpTaskTables() { return ss.str(); } bool ResourceMgr::check_resource_valid() { { // TODO: check one disk-resource, one cpu-resource, zero or more gpu-resource; if (GetDiskResources().size() != 1) return false; if (GetCpuResources().size() != 1) return false; } { // TODO: one compute-resource at least; if (GetNumOfComputeResource() < 1) return false; } { // TODO: check disk only connect with cpu } { // TODO: check gpu only connect with cpu } { // TODO: check if exists isolated node } return true; } void ResourceMgr::post_event(const EventPtr& event) { { Loading @@ -183,7 +231,9 @@ void ResourceMgr::event_process() { while (running_) { std::unique_lock<std::mutex> lock(event_mutex_); event_cv_.wait(lock, [this] { return !queue_.empty(); }); event_cv_.wait(lock, [this] { return !queue_.empty(); }); auto event = queue_.front(); queue_.pop(); Loading
cpp/src/scheduler/ResourceMgr.h +15 −0 Original line number Diff line number Diff line Loading @@ -64,6 +64,16 @@ class ResourceMgr { return disk_resources_; } inline std::vector<ResourceWPtr>& GetCpuResources() { return cpu_resources_; } inline std::vector<ResourceWPtr>& GetGpuResources() { return gpu_resources_; } // TODO(wxyu): why return shared pointer inline std::vector<ResourcePtr> GetAllResources() { Loading Loading @@ -100,6 +110,9 @@ class ResourceMgr { DumpTaskTables(); private: bool check_resource_valid(); void post_event(const EventPtr& event); Loading @@ -110,6 +123,8 @@ class ResourceMgr { bool running_ = false; std::vector<ResourceWPtr> disk_resources_; std::vector<ResourceWPtr> cpu_resources_; std::vector<ResourceWPtr> gpu_resources_; std::vector<ResourcePtr> resources_; mutable std::mutex resources_mutex_; Loading