Unverified Commit b4af3fd1 authored by Yichao Yang's avatar Yichao Yang Committed by GitHub
Browse files

[Feature-2815][server] One worker can belong to different workergroups (#2934)



* [Feature-2815][server] One worker can belong to different workergroups

* Feature: Add test cases

* Update MonitorService.java

* Update MonitorService.java

Co-authored-by: default avatardailidong <dailidong66@gmail.com>
parent 657ea475
Loading
Loading
Loading
Loading
+42 −12
Original line number Diff line number Diff line
@@ -16,23 +16,27 @@
 */
package org.apache.dolphinscheduler.api.service;

import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.dolphinscheduler.common.utils.Preconditions.*;
import com.google.common.collect.Sets;

/**
 * monitor service
@@ -108,9 +112,35 @@ public class MonitorService extends BaseService{
  public Map<String,Object> queryWorker(User loginUser) {

    Map<String, Object> result = new HashMap<>(5);
    List<Server> masterServers = getServerListFromZK(false);
    List<WorkerServerModel> workerServers = getServerListFromZK(false)
            .stream()
            .map((Server server) -> {
              WorkerServerModel model = new WorkerServerModel();
              model.setId(server.getId());
              model.setHost(server.getHost());
              model.setPort(server.getPort());
              model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
              model.setResInfo(server.getResInfo());
              model.setCreateTime(server.getCreateTime());
              model.setLastHeartbeatTime(server.getLastHeartbeatTime());
              return model;
            })
            .collect(Collectors.toList());

    Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
            .stream()
            .collect(Collectors.toMap(
                    (WorkerServerModel worker) -> {
                        String[] s = worker.getZkDirectories().iterator().next().split("/");
                        return s[s.length - 1];
                    }
                    , Function.identity()
                    , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
                      oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
                      return oldOne;
                    }));

    result.put(Constants.DATA_LIST, masterServers);
    result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
    putMsg(result,Status.SUCCESS);

    return result;
+122 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.common.model;


import java.util.Date;
import java.util.Set;

import com.fasterxml.jackson.annotation.JsonFormat;

/**
 * server
 */
public class WorkerServerModel {

    /**
     * id
     */
    private int id;

    /**
     * host
     */
    private String host;

    /**
     * port
     */
    private int port;

    /**
     * worker directories in zookeeper
     */
    private Set<String> zkDirectories;

    /**
     * resource info about CPU and memory
     */
    private String resInfo;

    /**
     * create time
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date createTime;

    /**
     * last heart beat time
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date lastHeartbeatTime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Set<String> getZkDirectories() {
        return zkDirectories;
    }

    public void setZkDirectories(Set<String> zkDirectories) {
        this.zkDirectories = zkDirectories;
    }

    public Date getLastHeartbeatTime() {
        return lastHeartbeatTime;
    }

    public void setLastHeartbeatTime(Date lastHeartbeatTime) {
        this.lastHeartbeatTime = lastHeartbeatTime;
    }

    public String getResInfo() {
        return resInfo;
    }

    public void setResInfo(String resInfo) {
        this.resInfo = resInfo;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}
+9 −6
Original line number Diff line number Diff line
@@ -16,6 +16,13 @@
 */
package org.apache.dolphinscheduler.server.master.registry;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
@@ -30,11 +37,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;

/**
 *  master registry
@@ -97,7 +100,7 @@ public class MasterRegistry {
        HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
                masterConfig.getMasterReservedMemory(),
                masterConfig.getMasterMaxCpuloadAvg(),
                getMasterPath(),
                Sets.newHashSet(getMasterPath()),
                zookeeperRegistryCenter);

        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
+18 −15
Original line number Diff line number Diff line
@@ -17,16 +17,17 @@

package org.apache.dolphinscheduler.server.registry;

import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;

import java.util.Date;
import java.util.Set;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;

public class HeartBeatTask extends Thread {

    private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
@@ -34,25 +35,24 @@ public class HeartBeatTask extends Thread{
    private String startTime;
    private double reservedMemory;
    private double maxCpuloadAvg;
    private String heartBeatPath;
    private Set<String> heartBeatPaths;
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    public HeartBeatTask(String startTime,
                         double reservedMemory,
                         double maxCpuloadAvg,
                         String heartBeatPath,
                         Set<String> heartBeatPaths,
                         ZookeeperRegistryCenter zookeeperRegistryCenter) {
        this.startTime = startTime;
        this.reservedMemory = reservedMemory;
        this.maxCpuloadAvg = maxCpuloadAvg;
        this.heartBeatPath = heartBeatPath;
        this.heartBeatPaths = heartBeatPaths;
        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
    }

    @Override
    public void run() {
        try {

            double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
            double loadAverage = OSUtils.loadAverage();

@@ -76,7 +76,10 @@ public class HeartBeatTask extends Thread{
            builder.append(status).append(COMMA);
            //save process id
            builder.append(OSUtils.getProcessID());

            for (String heartBeatPath : heartBeatPaths) {
                zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString());
            }
        } catch (Throwable ex) {
            logger.error("error write heartbeat info", ex);
        }
+8 −6
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@
 */
package org.apache.dolphinscheduler.server.worker.config;

import java.util.Set;

import org.apache.dolphinscheduler.common.Constants;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
@@ -41,8 +43,8 @@ public class WorkerConfig {
    @Value("${worker.reserved.memory:0.3}")
    private double workerReservedMemory;

    @Value("${worker.group: default}")
    private String workerGroup;
    @Value("#{'${worker.groups:default}'.split(',')}")
    private Set<String> workerGroups;

    @Value("${worker.listen.port: 1234}")
    private int listenPort;
@@ -55,12 +57,12 @@ public class WorkerConfig {
        this.listenPort = listenPort;
    }

    public String getWorkerGroup() {
        return workerGroup;
    public Set<String> getWorkerGroups() {
        return workerGroups;
    }

    public void setWorkerGroup(String workerGroup) {
        this.workerGroup = workerGroup;
    public void setWorkerGroups(Set<String> workerGroups) {
        this.workerGroups = workerGroups;
    }

    public int getWorkerExecThreads() {
Loading