Commit ace72ff9 authored by 余昆's avatar 余昆
Browse files

MS-639 SQ8H index creation failed and server hung


Former-commit-id: e6af3a26032d737fba6e22c04e4298e973ff60df
parent aac2cbd2
Loading
Loading
Loading
Loading
+25 −0
Original line number Diff line number Diff line
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


#include "BuildMgr.h"

namespace milvus {
namespace scheduler {

}  // namespace scheduler
}  // namespace milvus
+63 −0
Original line number Diff line number Diff line
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace milvus {
namespace scheduler {

/**
 * Atomic counter of build tasks.
 *
 * The counter is seeded by the constructor, raised by Put() and lowered by
 * take(). Every operation is a single atomic read-modify-write or load, so
 * individual calls are safe from multiple threads. A "check numoftasks(),
 * then take()" sequence is NOT atomic as a whole; callers that need that
 * guarantee must serialize it themselves.
 */
class BuildMgr {
 public:
    /**
     * @param numoftasks initial value of the counter.
     */
    explicit BuildMgr(int64_t numoftasks) : numoftasks_(numoftasks) {
    }

 public:
    /** Increments the counter by one. */
    void
    Put() {
        ++numoftasks_;
    }

    /**
     * Decrements the counter by one.
     *
     * No lower bound is enforced: calling take() when the counter is zero
     * makes it negative.
     */
    void
    take() {
        --numoftasks_;
    }

    /** @return the current value of the counter. */
    int64_t
    numoftasks() const {
        return numoftasks_.load();
    }

 private:
    // Stored as a 64-bit atomic so the width matches the int64_t interface on
    // every platform (std::atomic_long is only 32 bits wide where long is).
    std::atomic<int64_t> numoftasks_;
};

using BuildMgrPtr = std::shared_ptr<BuildMgr>;

}  // namespace scheduler
}  // namespace milvus
+3 −0
Original line number Diff line number Diff line
@@ -41,6 +41,9 @@ std::mutex JobMgrInst::mutex_;
OptimizerPtr OptimizerInst::instance = nullptr;
std::mutex OptimizerInst::mutex_;

// Storage for the BuildMgrInst static members; the shared instance starts out null.
BuildMgrPtr BuildMgrInst::instance = nullptr;
std::mutex BuildMgrInst::mutex_;

void
load_simple_config() {
    server::Config& config = server::Config::GetInstance();
+19 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@

#pragma once

#include "BuildMgr.h"
#include "JobMgr.h"
#include "ResourceMgr.h"
#include "Scheduler.h"
@@ -105,6 +106,24 @@ class OptimizerInst {
    static std::mutex mutex_;
};

/**
 * Holder of the single shared BuildMgr.
 *
 * The BuildMgr is created on the first call to GetInstance() with an initial
 * counter value of 4 and is shared by every later call.
 */
class BuildMgrInst {
 public:
    /**
     * @return the shared BuildMgr, creating it on first use.
     */
    static BuildMgrPtr
    GetInstance() {
        // The mutex is held for the whole call, including the copy made for
        // the return value. `instance` is a plain std::shared_ptr, so reading
        // it without the lock while another thread assigns it would be a
        // data race.
        std::lock_guard<std::mutex> lock(mutex_);
        if (instance == nullptr) {
            instance = std::make_shared<BuildMgr>(4);
        }
        return instance;
    }

 private:
    static BuildMgrPtr instance;
    static std::mutex mutex_;
};

void
StartSchedulerService();

+25 −3
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@
// under the License.

#include "scheduler/TaskTable.h"
#include "scheduler/SchedInst.h"
#include "Utils.h"
#include "event/TaskTableUpdatedEvent.h"
#include "utils/Log.h"
@@ -164,6 +165,15 @@ TaskTable::PickToLoad(uint64_t limit) {
        if (not table_[j]) {
            SERVER_LOG_WARNING << "table[" << j << "] is nullptr";
        }

        if (table_[j]->task->path().Current() == "cpu") {
            if (table_[j]->task->Type() == TaskType::BuildIndexTask
                && BuildMgrInst::GetInstance()->numoftasks() < 1) {
                return std::vector<uint64_t>();
            }
        }


        if (table_[j]->state == TaskTableItemState::LOADED) {
            ++count;
            if (count > 2)
@@ -177,9 +187,21 @@ TaskTable::PickToLoad(uint64_t limit) {
        if (not cross && table_[i]->IsFinish()) {
            last_finish_ = i;
        } else if (table_[i]->state == TaskTableItemState::START) {
            auto task = table_[i]->task;
            if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") {
                if (BuildMgrInst::GetInstance()->numoftasks() == 0) {
                    break;
                } else {
                    cross = true;
                    indexes.push_back(i);
                    ++count;
                    BuildMgrInst::GetInstance()->take();
                }
            } else {
                cross = true;
                indexes.push_back(i);
                ++count;
            }
        }
    }
    return indexes;
Loading