Commit 869b7dbc authored by 王翔宇's avatar 王翔宇
Browse files

Using new structure for tasktable


Former-commit-id: 686aa0f2d3e8d4160822a7adf4ca36f71d6054ba
parent 3d6c9bb4
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -14,6 +14,8 @@ Please mark all change in change log and use the ticket from JIRA.
- \#92 - Speed up CMake build process

## Feature
- \#115 - Using new structure for tasktable

## Task

# Milvus 0.5.0 (2019-10-21)
+14 −11
Original line number Diff line number Diff line
@@ -34,27 +34,30 @@ namespace scheduler {

class BuildMgr {
 public:
    explicit BuildMgr(int64_t numoftasks) : numoftasks_(numoftasks) {
    explicit BuildMgr(int64_t concurrent_limit) : available_(concurrent_limit) {
    }

 public:
    void
    Put() {
        ++numoftasks_;
        std::lock_guard<std::mutex> lock(mutex_);
        ++available_;
    }

    void
    take() {
        --numoftasks_;
    bool
    Take() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (available_ < 1) {
            return false;
        } else {
            --available_;
            return true;
        }

    int64_t
    numoftasks() {
        return (int64_t)numoftasks_;
    }

 private:
    std::atomic_long numoftasks_;
    std::int64_t available_;
    std::mutex mutex_;
};

using BuildMgrPtr = std::shared_ptr<BuildMgr>;
+119 −0
Original line number Diff line number Diff line
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

namespace milvus {
namespace scheduler {

/*
 * Fixed-capacity circular buffer addressed by logical index.
 *
 * Two cursors describe the buffer:
 *   - front_ : slot index stored by set_front(); initialised to capacity - 1.
 *   - rear_  : slot index the next put() writes to; initialised to 0.
 * put() refuses to write when rear_ has caught up with front_, so at most
 * capacity - 1 slots can be filled before front_ is moved forward.
 *
 * Every slot starts out as value_type(nullptr), so T must be constructible
 * from nullptr (e.g. a raw or smart pointer).
 *
 * Only front_ is an atomic; rear_, size_ and the slots themselves are plain
 * members and are accessed without any synchronisation inside this class.
 */
template <typename T>
class CircleQueue {
    using value_type = T;
    using atomic_size_type = std::atomic_ullong;
    using size_type = uint64_t;
    using const_reference = const value_type&;

    // Memory order used for every load/store of front_.
    static constexpr std::memory_order kMemoryOrder = std::memory_order_seq_cst;

 public:
    /*
     * Create a queue with `cap` slots, all initialised from nullptr.
     * Throws std::invalid_argument when cap is 0 (every index computation is
     * taken modulo the capacity, so an empty buffer cannot be addressed).
     */
    explicit CircleQueue(size_type cap) : data_(cap, nullptr), capacity_(cap), front_() {
        if (cap == 0) {
            throw std::invalid_argument("CircleQueue: capacity must be greater than zero");
        }
        front_.store(cap - 1, kMemoryOrder);
    }

    CircleQueue() = delete;
    CircleQueue(const CircleQueue& q) = delete;
    CircleQueue(CircleQueue&& q) = delete;

 public:
    /*
     * Read-only access to the slot for logical index n; n is wrapped modulo
     * the capacity, so any value is accepted.
     */
    const_reference operator[](size_type n) const {
        return data_[n % capacity_];
    }

    /* Slot index last stored by the constructor or set_front(). */
    size_type
    front() const {
        return front_.load(kMemoryOrder);
    }

    /* Slot index the next put() will write to. */
    size_type
    rear() const {
        return rear_;
    }

    /*
     * Number of put() calls that succeeded, saturated at capacity().
     * This counter never decreases; set_front() does not change it.
     */
    size_type
    size() const {
        return size_;
    }

    /* Total number of slots. */
    size_type
    capacity() const {
        return capacity_;
    }

    /*
     * Move the front cursor to the slot of logical index `last_finish`
     * (wrapped modulo the capacity).
     * Throws std::out_of_range when that slot is the current rear slot.
     */
    void
    set_front(uint64_t last_finish) {
        // Compare the wrapped slot, not the raw index: callers pass indexes
        // that may be >= capacity_, while rear_ is always < capacity_.
        const size_type slot = last_finish % capacity_;
        if (slot == rear_) {
            throw std::out_of_range("CircleQueue::set_front: index refers to the rear slot");
        }
        front_.store(slot, kMemoryOrder);
    }

    /*
     * Copy x into the rear slot and advance the rear cursor.
     * Throws std::length_error (leaving the queue untouched) when the rear
     * cursor has caught up with the front cursor.
     */
    void
    put(const value_type& x) {
        ensure_not_full();
        data_[rear_] = x;
        advance_rear();
    }

    /*
     * Move x into the rear slot and advance the rear cursor.
     * Throws std::length_error (leaving the queue and x untouched) when the
     * rear cursor has caught up with the front cursor.
     */
    void
    put(value_type&& x) {
        ensure_not_full();
        data_[rear_] = std::move(x);
        advance_rear();
    }

 private:
    /* Throw if writing to the rear slot would overwrite the front slot. */
    void
    ensure_not_full() const {
        if (rear_ % capacity_ == front_.load(kMemoryOrder)) {
            throw std::length_error("CircleQueue::put: queue is full");
        }
    }

    /* Step rear_ to the next slot (wrapping) and bump the saturating size. */
    void
    advance_rear() {
        rear_ = (rear_ + 1) % capacity_;
        if (size_ < capacity_) {
            ++size_;
        }
    }

 private:
    std::vector<value_type> data_;
    size_type capacity_;
    atomic_size_type front_;
    size_type rear_ = 0;
    size_type size_ = 0;
};

}  // namespace scheduler
}  // namespace milvus
+70 −26
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@
#include "event/TaskTableUpdatedEvent.h"
#include "scheduler/SchedInst.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"

#include <ctime>
#include <sstream>
@@ -153,7 +154,42 @@ TaskTableItem::Dump() const {

std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) {
    std::lock_guard<std::mutex> lock(mutex_);
#if 1
    TimeRecorder rc("");
    std::vector<uint64_t> indexes;
    bool cross = false;

    uint64_t available_begin = table_.front() + 1;
    for (uint64_t i = 0, loaded_count = 0, pick_count = 0; i < table_.size() && pick_count < limit; ++i) {
        auto index = available_begin + i;
        if (not table_[index])
            break;
        if (index % table_.capacity() == table_.rear())
            break;
        if (not cross && table_[index]->IsFinish()) {
            table_.set_front(index);
        } else if (table_[index]->state == TaskTableItemState::LOADED) {
            cross = true;
            ++loaded_count;
            if (loaded_count > 2)
                return std::vector<uint64_t>();
        } else if (table_[index]->state == TaskTableItemState::START) {
            auto task = table_[index]->task;

            // if task is a build index task, limit it
            if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") {
                if (not BuildMgrInst::GetInstance()->Take()) {
                    continue;
                }
            }
            cross = true;
            indexes.push_back(index);
            ++pick_count;
        }
    }
    rc.ElapseFromBegin("PickToLoad ");
    return indexes;
#else
    size_t count = 0;
    for (uint64_t j = last_finish_ + 1; j < table_.size(); ++j) {
        if (not table_[j]) {
@@ -197,34 +233,44 @@ TaskTable::PickToLoad(uint64_t limit) {
        }
    }
    return indexes;
#endif
}

std::vector<uint64_t>
TaskTable::PickToExecute(uint64_t limit) {
    std::lock_guard<std::mutex> lock(mutex_);
    TimeRecorder rc("");
    std::vector<uint64_t> indexes;
    bool cross = false;
    for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
        if (not cross && table_[i]->IsFinish()) {
            last_finish_ = i;
        } else if (table_[i]->state == TaskTableItemState::LOADED) {
    uint64_t available_begin = table_.front() + 1;
    for (uint64_t i = 0, pick_count = 0; i < table_.size() && pick_count < limit; ++i) {
        uint64_t index = available_begin + i;
        if (not table_[index]) {
            break;
        }
        if (index % table_.capacity() == table_.rear()) {
            break;
        }

        if (not cross && table_[index]->IsFinish()) {
            table_.set_front(index);
        } else if (table_[index]->state == TaskTableItemState::LOADED) {
            cross = true;
            indexes.push_back(i);
            ++count;
            indexes.push_back(index);
            ++pick_count;
        }
    }
    rc.ElapseFromBegin("PickToExecute ");
    return indexes;
}

void
TaskTable::Put(TaskPtr task) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto item = std::make_shared<TaskTableItem>();
    item->id = id_++;
    item->task = std::move(task);
    item->state = TaskTableItemState::START;
    item->timestamp.start = get_current_timestamp();
    table_.push_back(item);
    table_.put(std::move(item));
    if (subscriber_) {
        subscriber_();
    }
@@ -232,14 +278,13 @@ TaskTable::Put(TaskPtr task) {

void
TaskTable::Put(std::vector<TaskPtr>& tasks) {
    std::lock_guard<std::mutex> lock(mutex_);
    for (auto& task : tasks) {
        auto item = std::make_shared<TaskTableItem>();
        item->id = id_++;
        item->task = std::move(task);
        item->state = TaskTableItemState::START;
        item->timestamp.start = get_current_timestamp();
        table_.push_back(item);
        table_.put(std::move(item));
    }
    if (subscriber_) {
        subscriber_();
@@ -248,26 +293,25 @@ TaskTable::Put(std::vector<TaskPtr>& tasks) {

TaskTableItemPtr
TaskTable::Get(uint64_t index) {
    std::lock_guard<std::mutex> lock(mutex_);
    return table_[index];
}

// void
// TaskTable::Clear() {
//// find first task is NOT (done or moved), erase from begin to it;
////        auto iterator = table_.begin();
////        while (iterator->state == TaskTableItemState::EXECUTED or
////            iterator->state == TaskTableItemState::MOVED)
////            iterator++;
////        table_.erase(table_.begin(), iterator);
//}
size_t
TaskTable::TaskToExecute() {
    size_t count = 0;
    auto begin = table_.front() + 1;
    for (size_t i = 0; i < table_.size(); ++i) {
        auto index = begin + i;
        if (table_[index]->state == TaskTableItemState::LOADED) {
            ++count;
        }
    }
    return count;
}

json
TaskTable::Dump() const {
    json ret;
    for (auto& item : table_) {
        ret.push_back(item->Dump());
    }
    json ret{{"error.message", "not support yet."}};
    return ret;
}

+11 −29
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@
#include <utility>
#include <vector>

#include "CircleQueue.h"
#include "event/Event.h"
#include "interface/interfaces.h"
#include "task/SearchTask.h"
@@ -99,7 +100,8 @@ using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;

class TaskTable : public interface::dumpable {
 public:
    TaskTable() = default;
    TaskTable() : table_(1ULL << 16ULL) {
    }

    TaskTable(const TaskTable&) = delete;
    TaskTable(TaskTable&&) = delete;
@@ -128,20 +130,9 @@ class TaskTable : public interface::dumpable {
    TaskTableItemPtr
    Get(uint64_t index);

    /*
     * TODO(wxyu): BIG GC
     * Remove sequence task which is DONE or MOVED from front;
     * Called by ?
     */
    //    void
    //    Clear();

    /*
     * Return true if task table empty, otherwise false;
     */
    inline bool
    Empty() {
        return table_.empty();
    inline size_t
    Capacity() {
        return table_.capacity();
    }

    /*
@@ -152,22 +143,14 @@ class TaskTable : public interface::dumpable {
        return table_.size();
    }

    size_t
    TaskToExecute();

 public:
    TaskTableItemPtr& operator[](uint64_t index) {
        std::lock_guard<std::mutex> lock(mutex_);
    const TaskTableItemPtr& operator[](uint64_t index) {
        return table_[index];
    }

    std::deque<TaskTableItemPtr>::iterator
    begin() {
        return table_.begin();
    }

    std::deque<TaskTableItemPtr>::iterator
    end() {
        return table_.end();
    }

 public:
    std::vector<uint64_t>
    PickToLoad(uint64_t limit);
@@ -249,8 +232,7 @@ class TaskTable : public interface::dumpable {

 private:
    std::uint64_t id_ = 0;
    mutable std::mutex mutex_;
    std::deque<TaskTableItemPtr> table_;
    CircleQueue<TaskTableItemPtr> table_;
    std::function<void(void)> subscriber_ = nullptr;

    // cache the index of the last finished task to avoid always picking tasks from the beginning
Loading