Commit f68675ef authored by 王翔宇's avatar 王翔宇
Browse files

rename functions tasktable, make it accessing likes standard structure


Former-commit-id: 59ae30f3c08fc645908d175d2b65006437f1af47
parent 7fb08018
Loading
Loading
Loading
Loading
+0 −5
Original line number Diff line number Diff line
@@ -291,11 +291,6 @@ TaskTable::Put(std::vector<TaskPtr>& tasks) {
    }
}

TaskTableItemPtr
TaskTable::Get(uint64_t index) {
    return table_[index];
}

size_t
TaskTable::TaskToExecute() {
    size_t count = 0;
+25 −32
Original line number Diff line number Diff line
@@ -106,6 +106,11 @@ class TaskTable : public interface::dumpable {
    TaskTable(const TaskTable&) = delete;
    TaskTable(TaskTable&&) = delete;

 public:
    json
    Dump() const override;

 public:
    inline void
    RegisterSubscriber(std::function<void(void)> subscriber) {
        subscriber_ = std::move(subscriber);
@@ -124,40 +129,35 @@ class TaskTable : public interface::dumpable {
    void
    Put(std::vector<TaskPtr>& tasks);

    /*
     * Return task table item reference;
     */
    TaskTableItemPtr
    Get(uint64_t index);

    inline size_t
    Capacity() {
        return table_.capacity();
    }

    /*
     * Return size of task table;
     */
    inline size_t
    Size() {
        return table_.size();
    }

    size_t
    TaskToExecute();

 public:
    const TaskTableItemPtr& operator[](uint64_t index) {
        return table_[index];
    }

 public:
    std::vector<uint64_t>
    PickToLoad(uint64_t limit);

    std::vector<uint64_t>
    PickToExecute(uint64_t limit);

 public:
    inline const TaskTableItemPtr& operator[](uint64_t index) {
        return table_[index];
    }

    inline const TaskTableItemPtr&
    at(uint64_t index) {
        return table_[index];
    }

    inline size_t
    capacity() {
        return table_.capacity();
    }

    inline size_t
    size() {
        return table_.size();
    }

 public:
    /******** Action ********/

@@ -223,13 +223,6 @@ class TaskTable : public interface::dumpable {
        return table_[index]->Moved();
    }

 public:
    /*
     * Dump;
     */
    json
    Dump() const override;

 private:
    std::uint64_t id_ = 0;
    CircleQueue<TaskTableItemPtr> table_;
+2 −2
Original line number Diff line number Diff line
@@ -132,7 +132,7 @@ Resource::pick_task_load() {
    for (auto index : indexes) {
        // try to set one task loading, then return
        if (task_table_.Load(index))
            return task_table_.Get(index);
            return task_table_.at(index);
        // else try next
    }
    return nullptr;
@@ -150,7 +150,7 @@ Resource::pick_task_execute() {
        }

        if (task_table_.Execute(index)) {
            return task_table_.Get(index);
            return task_table_.at(index);
        }
        //        if (task_table_[index]->task->label()->Type() == TaskLabelType::SPECIFIED_RESOURCE) {
        //            if (task_table_.Get(index)->task->path().Current() == task_table_.Get(index)->task->path().Last()
+1 −1
Original line number Diff line number Diff line
@@ -165,7 +165,7 @@ TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
    }

    sleep(3);
    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().size(), NUM);
}

TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
+56 −56
Original line number Diff line number Diff line
@@ -183,19 +183,19 @@ TEST_F(TaskTableBaseTest, SUBSCRIBER) {

TEST_F(TaskTableBaseTest, PUT_TASK) {
    empty_table_.Put(task1_);
    ASSERT_EQ(empty_table_.Get(0)->task, task1_);
    ASSERT_EQ(empty_table_.at(0)->task, task1_);
}

TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
    empty_table_.Put(invalid_task_);
    ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_);
    ASSERT_EQ(empty_table_.at(0)->task, invalid_task_);
}

TEST_F(TaskTableBaseTest, PUT_BATCH) {
    std::vector<milvus::scheduler::TaskPtr> tasks{task1_, task2_};
    empty_table_.Put(tasks);
    ASSERT_EQ(empty_table_.Get(0)->task, task1_);
    ASSERT_EQ(empty_table_.Get(1)->task, task2_);
    ASSERT_EQ(empty_table_.at(0)->task, task1_);
    ASSERT_EQ(empty_table_.at(1)->task, task2_);
}

TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
@@ -204,14 +204,14 @@ TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) {
}

TEST_F(TaskTableBaseTest, SIZE) {
    ASSERT_EQ(empty_table_.Size(), 0);
    ASSERT_EQ(empty_table_.size(), 0);
    empty_table_.Put(task1_);
    ASSERT_EQ(empty_table_.Size(), 1);
    ASSERT_EQ(empty_table_.size(), 1);
}

TEST_F(TaskTableBaseTest, OPERATOR) {
    empty_table_.Put(task1_);
    ASSERT_EQ(empty_table_.Get(0), empty_table_[0]);
    ASSERT_EQ(empty_table_.at(0), empty_table_[0]);
}

TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {
@@ -224,7 +224,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD) {

    auto indexes = empty_table_.PickToLoad(1);
    ASSERT_EQ(indexes.size(), 1);
    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
}

TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {
@@ -237,9 +237,9 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) {

    auto indexes = empty_table_.PickToLoad(3);
    ASSERT_EQ(indexes.size(), 3);
    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
    ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
    ASSERT_EQ(indexes[2] % empty_table_.Capacity(), 4);
    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
    ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3);
    ASSERT_EQ(indexes[2] % empty_table_.capacity(), 4);
}

TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
@@ -253,14 +253,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) {
    // first pick, non-cache
    auto indexes = empty_table_.PickToLoad(1);
    ASSERT_EQ(indexes.size(), 1);
    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);

    // second pick, iterate from 2
    // invalid state change
    empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
    indexes = empty_table_.PickToLoad(1);
    ASSERT_EQ(indexes.size(), 1);
    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
}

TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {
@@ -274,7 +274,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) {

    auto indexes = empty_table_.PickToExecute(1);
    ASSERT_EQ(indexes.size(), 1);
    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
}

TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {
@@ -289,8 +289,8 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) {

    auto indexes = empty_table_.PickToExecute(3);
    ASSERT_EQ(indexes.size(), 2);
    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
    ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3);
    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
    ASSERT_EQ(indexes[1] % empty_table_.capacity(), 3);
}

TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
@@ -305,14 +305,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) {
    // first pick, non-cache
    auto indexes = empty_table_.PickToExecute(1);
    ASSERT_EQ(indexes.size(), 1);
    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);

    // second pick, iterate from 2
    // invalid state change
    empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START;
    indexes = empty_table_.PickToExecute(1);
    ASSERT_EQ(indexes.size(), 1);
    ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2);
    ASSERT_EQ(indexes[0] % empty_table_.capacity(), 2);
}

/************ TaskTableAdvanceTest ************/
@@ -328,14 +328,14 @@ class TaskTableAdvanceTest : public ::testing::Test {
            table1_.Put(task);
        }

        table1_.Get(0)->state = milvus::scheduler::TaskTableItemState::INVALID;
        table1_.Get(1)->state = milvus::scheduler::TaskTableItemState::START;
        table1_.Get(2)->state = milvus::scheduler::TaskTableItemState::LOADING;
        table1_.Get(3)->state = milvus::scheduler::TaskTableItemState::LOADED;
        table1_.Get(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING;
        table1_.Get(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED;
        table1_.Get(6)->state = milvus::scheduler::TaskTableItemState::MOVING;
        table1_.Get(7)->state = milvus::scheduler::TaskTableItemState::MOVED;
        table1_.at(0)->state = milvus::scheduler::TaskTableItemState::INVALID;
        table1_.at(1)->state = milvus::scheduler::TaskTableItemState::START;
        table1_.at(2)->state = milvus::scheduler::TaskTableItemState::LOADING;
        table1_.at(3)->state = milvus::scheduler::TaskTableItemState::LOADED;
        table1_.at(4)->state = milvus::scheduler::TaskTableItemState::EXECUTING;
        table1_.at(5)->state = milvus::scheduler::TaskTableItemState::EXECUTED;
        table1_.at(6)->state = milvus::scheduler::TaskTableItemState::MOVING;
        table1_.at(7)->state = milvus::scheduler::TaskTableItemState::MOVED;
    }

    milvus::scheduler::TaskTable table1_;
@@ -343,114 +343,114 @@ class TaskTableAdvanceTest : public ::testing::Test {

TEST_F(TaskTableAdvanceTest, LOAD) {
    std::vector<milvus::scheduler::TaskTableItemState> before_state;
    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        before_state.push_back(table1_[i]->state);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        table1_.Load(i);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        if (before_state[i] == milvus::scheduler::TaskTableItemState::START) {
            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADING);
            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADING);
        } else {
            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
        }
    }
}

TEST_F(TaskTableAdvanceTest, LOADED) {
    std::vector<milvus::scheduler::TaskTableItemState> before_state;
    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        before_state.push_back(table1_[i]->state);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        table1_.Loaded(i);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADING) {
            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::LOADED);
            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::LOADED);
        } else {
            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
        }
    }
}

TEST_F(TaskTableAdvanceTest, EXECUTE) {
    std::vector<milvus::scheduler::TaskTableItemState> before_state;
    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        before_state.push_back(table1_[i]->state);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        table1_.Execute(i);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) {
            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING);
            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTING);
        } else {
            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
        }
    }
}

TEST_F(TaskTableAdvanceTest, EXECUTED) {
    std::vector<milvus::scheduler::TaskTableItemState> before_state;
    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        before_state.push_back(table1_[i]->state);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        table1_.Executed(i);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        if (before_state[i] == milvus::scheduler::TaskTableItemState::EXECUTING) {
            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED);
            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::EXECUTED);
        } else {
            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
        }
    }
}

TEST_F(TaskTableAdvanceTest, MOVE) {
    std::vector<milvus::scheduler::TaskTableItemState> before_state;
    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        before_state.push_back(table1_[i]->state);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        table1_.Move(i);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        if (before_state[i] == milvus::scheduler::TaskTableItemState::LOADED) {
            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVING);
            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVING);
        } else {
            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
        }
    }
}

TEST_F(TaskTableAdvanceTest, MOVED) {
    std::vector<milvus::scheduler::TaskTableItemState> before_state;
    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        before_state.push_back(table1_[i]->state);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        table1_.Moved(i);
    }

    for (size_t i = 0; i < table1_.Size(); ++i) {
    for (size_t i = 0; i < table1_.size(); ++i) {
        if (before_state[i] == milvus::scheduler::TaskTableItemState::MOVING) {
            ASSERT_EQ(table1_.Get(i)->state, milvus::scheduler::TaskTableItemState::MOVED);
            ASSERT_EQ(table1_.at(i)->state, milvus::scheduler::TaskTableItemState::MOVED);
        } else {
            ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
            ASSERT_EQ(table1_.at(i)->state, before_state[i]);
        }
    }
}