Commit 870e4146 authored by Jin Hai's avatar Jin Hai Committed by GitHub
Browse files

Merge pull request #131 from scsven/dev

Set task state MOVED after resource copy it completed

Former-commit-id: 2faece7ef215a8566428761f55846148298c7348
parents 275a18b4 eb681d1e
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#96 - Remove .a file in milvus/lib for docker-version
- \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss
- \#122 - Add unique id for Job
- \#130 - Set task state MOVED after resource copy it completed

## Task

+1 −1
Original line number Diff line number Diff line
@@ -91,7 +91,7 @@ JobMgr::worker_function() {
        // disk resources NEVER be empty.
        if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
            for (auto& task : tasks) {
                disk->task_table().Put(task);
                disk->task_table().Put(task, nullptr);
            }
        }
    }
+1 −1
Original line number Diff line number Diff line
@@ -120,7 +120,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
            if (resource->HasExecutor() == false) {
                load_completed_event->task_table_item_->Move();
            }
            Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
            Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_, resource);
            break;
        }
        default: { break; }
+2 −17
Original line number Diff line number Diff line
@@ -264,8 +264,8 @@ TaskTable::PickToExecute(uint64_t limit) {
}

void
TaskTable::Put(TaskPtr task) {
    auto item = std::make_shared<TaskTableItem>();
TaskTable::Put(TaskPtr task, TaskTableItemPtr from) {
    auto item = std::make_shared<TaskTableItem>(std::move(from));
    item->id = id_++;
    item->task = std::move(task);
    item->state = TaskTableItemState::START;
@@ -276,21 +276,6 @@ TaskTable::Put(TaskPtr task) {
    }
}

void
TaskTable::Put(std::vector<TaskPtr>& tasks) {
    for (auto& task : tasks) {
        auto item = std::make_shared<TaskTableItem>();
        item->id = id_++;
        item->task = std::move(task);
        item->state = TaskTableItemState::START;
        item->timestamp.start = get_current_timestamp();
        table_.put(std::move(item));
    }
    if (subscriber_) {
        subscriber_();
    }
}

size_t
TaskTable::TaskToExecute() {
    size_t count = 0;
+7 −11
Original line number Diff line number Diff line
@@ -58,8 +58,12 @@ struct TaskTimestamp : public interface::dumpable {
    Dump() const override;
};

struct TaskTableItem;
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;

struct TaskTableItem : public interface::dumpable {
    TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {
    explicit TaskTableItem(TaskTableItemPtr f = nullptr)
        : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex(), from(std::move(f)) {
    }

    TaskTableItem(const TaskTableItem& src) = delete;
@@ -70,6 +74,7 @@ struct TaskTableItem : public interface::dumpable {
    TaskTableItemState state;  // the state;
    std::mutex mutex;
    TaskTimestamp timestamp;
    TaskTableItemPtr from;

    bool
    IsFinish();
@@ -96,8 +101,6 @@ struct TaskTableItem : public interface::dumpable {
    Dump() const override;
};

using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;

class TaskTable : public interface::dumpable {
 public:
    TaskTable() : table_(1ULL << 16ULL) {
@@ -120,14 +123,7 @@ class TaskTable : public interface::dumpable {
     * Put one task;
     */
    void
    Put(TaskPtr task);

    /*
     * Put tasks back of task table;
     * Called by DBImpl;
     */
    void
    Put(std::vector<TaskPtr>& tasks);
    Put(TaskPtr task, TaskTableItemPtr from = nullptr);

    size_t
    TaskToExecute();
Loading