Commit b66c6439 authored by 余昆's avatar 余昆
Browse files

complete BuildIndexJob and BuildIndexTask


Former-commit-id: e7fbc9aff4eca3dce7ce754676cfe06bbce8efca
parent 8cef5e00
Loading
Loading
Loading
Loading
+90 −65
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@
#include "scheduler/SchedInst.h"
#include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h"
#include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"

@@ -39,6 +40,7 @@
#include <iostream>
#include <thread>


namespace zilliz {
namespace milvus {
namespace engine {
@@ -894,17 +896,40 @@ DBImpl::BackgroundBuildIndex() {
    meta::TableFilesSchema to_index_files;
    meta_ptr_->FilesToIndex(to_index_files);
    Status status;

    scheduler::BuildIndexJobPtr
        job = std::make_shared<scheduler::BuildIndexJob>(0);

    // step 2: put build index task to scheduler
    scheduler::JobMgrInst::GetInstance()->Put(job);
    for (auto &file : to_index_files) {
        status = BuildIndex(file);
        std::cout << "get to index file" << std::endl;
        meta::TableFileSchema table_file;
        table_file.table_id_ = file.table_id_;
        table_file.date_ = file.date_;
        table_file.file_type_ =
            meta::TableFileSchema::NEW_INDEX;  // for multi-db-path, distribute index file averagely to each path
        status = meta_ptr_->CreateTableFile(table_file);
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
        }

        if (shutting_down_.load(std::memory_order_acquire)) {
            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
            break;
        }
        scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
        job->AddToIndexFiles(file_ptr, table_file);
    }
    job->WaitBuildIndexFinish();

//    for (auto &file : to_index_files) {
//        status = BuildIndex(file);
//        if (!status.ok()) {
//            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
//        }
//
//        if (shutting_down_.load(std::memory_order_acquire)) {
//            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
//            break;
//        }
//    }

    ENGINE_LOG_TRACE << "Background build index thread exit";
}
+25 −4
Original line number Diff line number Diff line
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include <src/scheduler/tasklabel/SpecResLabel.h>
#include "scheduler/TaskCreator.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h"
#include "SchedInst.h"


namespace zilliz {
namespace milvus {
@@ -32,6 +35,9 @@ TaskCreator::Create(const JobPtr& job) {
        case JobType::DELETE: {
            return Create(std::static_pointer_cast<DeleteJob>(job));
        }
        case JobType::BUILD: {
            return Create(std::static_pointer_cast<BuildIndexJob>(job));
        }
        default: {
            // TODO: error
            return std::vector<TaskPtr>();
@@ -63,6 +69,21 @@ TaskCreator::Create(const DeleteJobPtr& job) {
    return tasks;
}

// Builds one XBuildIndexTask for every entry in the job's to-index file
// collection. Each task is labelled with the resource named "disk" and
// linked back to the job that produced it.
std::vector<TaskPtr>
TaskCreator::Create(const zilliz::milvus::scheduler::BuildIndexJobPtr &job) {
    //TODO(yukun): remove "disk" hardcode here
    ResourcePtr disk_resource = ResMgrInst::GetInstance()->GetResource("disk");

    std::vector<TaskPtr> build_tasks;
    for (auto &entry : job->to_index_files()) {
        // entry.second is the mapped value handed to the task constructor.
        auto build_task = std::make_shared<XBuildIndexTask>(entry.second);

        auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(disk_resource));
        build_task->label() = label;
        build_task->job_ = job;

        build_tasks.emplace_back(build_task);
    }
    return build_tasks;
}

}  // namespace scheduler
}  // namespace milvus
}  // namespace zilliz
+4 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@
#include "job/SearchJob.h"
#include "task/DeleteTask.h"
#include "task/SearchTask.h"
#include "task/BuildIndexTask.h"
#include "task/Task.h"

namespace zilliz {
@@ -49,6 +50,9 @@ class TaskCreator {

    static std::vector<TaskPtr>
    Create(const DeleteJobPtr& job);

    static std::vector<TaskPtr>
    Create(const BuildIndexJobPtr& job);
};

}  // namespace scheduler
+33 −17
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@
#include "../Algorithm.h"
#include "Action.h"
#include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h"

namespace zilliz {
namespace milvus {
@@ -142,7 +143,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
            transport_costs.push_back(transport_cost);
            paths.emplace_back(path);
        }

        if (task->job_.lock()->type() == JobType::SEARCH) {
            // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
            uint64_t min_cost = std::numeric_limits<uint64_t>::max();
            uint64_t min_cost_idx = 0;
@@ -162,6 +163,21 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
            // step 3: set path in task
            Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
            task->path() = task_path;
        } else if (task->job_.lock()->type() == JobType::BUILD) {
            //step2: Read device id in config
            //get build index gpu resource
            server::Config &config = server::Config::GetInstance();
            int32_t build_index_gpu;
            Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);

            for (uint64_t i = 0; i < compute_resources.size(); ++i) {
                if (compute_resources[i]->device_id() == build_index_gpu) {
                    Path task_path(paths[i], paths[i].size() - 1);
                    task->path() = task_path;
                    break;
                }
            }
        }
    }

    if (resource->name() == task->path().Last()) {
+63 −0
Original line number Diff line number Diff line
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "BuildIndexJob.h"
#include "utils/Log.h"


namespace zilliz {
namespace milvus {
namespace scheduler {

// Constructs a build-index job: forwards the given id to the Job base class
// together with the JobType::BUILD tag.
BuildIndexJob::BuildIndexJob(zilliz::milvus::scheduler::JobId id) : Job(id, JobType::BUILD) {
}

// Registers a file to be indexed together with the table file that goes
// with it.
//
// @param to_index_file  file to index; stored in to_index_files_ keyed by its id_.
// @param table_file     companion table file; stored in table_files_ keyed by its id_.
// @return false when to_index_file is null (nothing is stored), true once
//         both entries have been recorded.
bool
BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file,
                               const TableFileSchema table_file) {
    // Reject a null file before taking the lock; no shared state is touched.
    if (to_index_file == nullptr) {
        return false;
    }

    std::unique_lock<std::mutex> lock(mutex_);

    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_;

    to_index_files_[to_index_file->id_] = to_index_file;
    table_files_[table_file.id_] = table_file;

    // The original fell off the end of this bool function, which is
    // undefined behaviour; report success explicitly.
    return true;
}

// Blocks the calling thread until to_index_files_ is empty, i.e. every file
// added to this job has been removed again by BuildIndexDone().
//
// @return a reference to a Status object. The original had no return
//         statement at all, which is undefined behaviour for a function
//         declared to return Status&.
Status&
BuildIndexJob::WaitBuildIndexFinish() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return to_index_files_.empty(); });
    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " all done";

    // NOTE(review): the class declaration is not visible from this file. If
    // BuildIndexJob owns a per-job Status member, return that instead of this
    // shared fallback. A default-constructed Status is presumably the
    // "OK" state - confirm against the Status class.
    static Status finished_status;
    return finished_status;
}

// Marks one file as finished: removes the entry keyed by to_index_id from
// to_index_files_ and wakes every thread waiting on cv_ so it can re-check
// whether the job is complete.
void
BuildIndexJob::BuildIndexDone(size_t to_index_id) {
    // No waiting happens here, so a plain scoped guard is enough.
    std::lock_guard<std::mutex> guard(mutex_);

    to_index_files_.erase(to_index_id);
    cv_.notify_all();

    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id;
}


}
}
}
 No newline at end of file
Loading