Loading CHANGELOG.md +1 −0 Original line number Diff line number Diff line Loading @@ -12,6 +12,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#80 - Print version information into log during server start - \#82 - Move easyloggingpp into "external" directory - \#92 - Speed up CMake build process - \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss ## Feature - \#115 - Using new structure for tasktable Loading core/src/scheduler/Algorithm.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -54,7 +54,7 @@ ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrP auto cur_neighbours = cur_node->GetNeighbours(); for (auto& neighbour : cur_neighbours) { auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node.lock()); auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node); dis_matrix[name_id_map.at(res->name())][name_id_map.at(neighbour_res->name())] = neighbour.connection.transport_cost(); } Loading core/src/scheduler/Scheduler.cpp +30 −34 Original line number Diff line number Diff line Loading @@ -26,10 +26,8 @@ namespace milvus { namespace scheduler { Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { if (auto mgr = res_mgr_.lock()) { mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); } Scheduler::Scheduler(ResourceMgrPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { res_mgr_->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::START_UP), std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1))); event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::LOAD_COMPLETED), Loading @@ -40,6 +38,10 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::m std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1))); } Scheduler::~Scheduler() { res_mgr_ = nullptr; } void Scheduler::Start() { running_ = true; Loading Loading @@ -100,7 +102,8 @@ Scheduler::Process(const EventPtr& event) { void Scheduler::OnLoadCompleted(const EventPtr& event) { auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event); if (auto resource = event->resource_.lock()) { auto resource = event->resource_; resource->WakeupExecutor(); auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); Loading @@ -124,27 +127,20 @@ Scheduler::OnLoadCompleted(const EventPtr& event) { } resource->WakeupLoader(); } } void Scheduler::OnStartUp(const EventPtr& event) { if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } event->resource_->WakeupLoader(); } void Scheduler::OnFinishTask(const EventPtr& event) { if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } event->resource_->WakeupLoader(); } void Scheduler::OnTaskTableUpdated(const EventPtr& event) { if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } event->resource_->WakeupLoader(); } } // namespace scheduler Loading core/src/scheduler/Scheduler.h +4 −2 Original line number Diff line number Diff line Loading @@ -34,7 +34,9 @@ namespace scheduler { class Scheduler : public interface::dumpable { public: explicit Scheduler(ResourceMgrWPtr res_mgr); explicit Scheduler(ResourceMgrPtr res_mgr); ~Scheduler(); Scheduler(const Scheduler&) = delete; Scheduler(Scheduler&&) = delete; Loading Loading @@ -118,7 +120,7 @@ class Scheduler : public interface::dumpable { std::unordered_map<uint64_t, std::function<void(EventPtr)>> event_register_; ResourceMgrWPtr res_mgr_; ResourceMgrPtr res_mgr_; std::queue<EventPtr> event_queue_; std::thread worker_thread_; std::mutex event_mutex_; Loading core/src/scheduler/action/Action.h +3 −2 Original line number Diff line number Diff line Loading @@ -37,10 +37,11 @@ class Action { PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest); static void DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event); DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event); static void SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event); }; Loading Loading
CHANGELOG.md +1 −0 Original line number Diff line number Diff line Loading @@ -12,6 +12,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#80 - Print version information into log during server start - \#82 - Move easyloggingpp into "external" directory - \#92 - Speed up CMake build process - \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss ## Feature - \#115 - Using new structure for tasktable Loading
core/src/scheduler/Algorithm.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -54,7 +54,7 @@ ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrP auto cur_neighbours = cur_node->GetNeighbours(); for (auto& neighbour : cur_neighbours) { auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node.lock()); auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node); dis_matrix[name_id_map.at(res->name())][name_id_map.at(neighbour_res->name())] = neighbour.connection.transport_cost(); } Loading
core/src/scheduler/Scheduler.cpp +30 −34 Original line number Diff line number Diff line Loading @@ -26,10 +26,8 @@ namespace milvus { namespace scheduler { Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { if (auto mgr = res_mgr_.lock()) { mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); } Scheduler::Scheduler(ResourceMgrPtr res_mgr) : running_(false), res_mgr_(std::move(res_mgr)) { res_mgr_->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::START_UP), std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1))); event_register_.insert(std::make_pair(static_cast<uint64_t>(EventType::LOAD_COMPLETED), Loading @@ -40,6 +38,10 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) : running_(false), res_mgr_(std::m std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1))); } Scheduler::~Scheduler() { res_mgr_ = nullptr; } void Scheduler::Start() { running_ = true; Loading Loading @@ -100,7 +102,8 @@ Scheduler::Process(const EventPtr& event) { void Scheduler::OnLoadCompleted(const EventPtr& event) { auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event); if (auto resource = event->resource_.lock()) { auto resource = event->resource_; resource->WakeupExecutor(); auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); Loading @@ -124,27 +127,20 @@ Scheduler::OnLoadCompleted(const EventPtr& event) { } resource->WakeupLoader(); } } void Scheduler::OnStartUp(const EventPtr& event) { if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } event->resource_->WakeupLoader(); } void Scheduler::OnFinishTask(const EventPtr& event) { if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } event->resource_->WakeupLoader(); } void Scheduler::OnTaskTableUpdated(const EventPtr& event) { if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } event->resource_->WakeupLoader(); } } // namespace scheduler Loading
core/src/scheduler/Scheduler.h +4 −2 Original line number Diff line number Diff line Loading @@ -34,7 +34,9 @@ namespace scheduler { class Scheduler : public interface::dumpable { public: explicit Scheduler(ResourceMgrWPtr res_mgr); explicit Scheduler(ResourceMgrPtr res_mgr); ~Scheduler(); Scheduler(const Scheduler&) = delete; Scheduler(Scheduler&&) = delete; Loading Loading @@ -118,7 +120,7 @@ class Scheduler : public interface::dumpable { std::unordered_map<uint64_t, std::function<void(EventPtr)>> event_register_; ResourceMgrWPtr res_mgr_; ResourceMgrPtr res_mgr_; std::queue<EventPtr> event_queue_; std::thread worker_thread_; std::mutex event_mutex_; Loading
core/src/scheduler/action/Action.h +3 −2 Original line number Diff line number Diff line Loading @@ -37,10 +37,11 @@ class Action { PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest); static void DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event); DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event); static void SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr<LoadCompletedEvent> event); }; Loading