Commit 5199e4b8 authored by dailidong's avatar dailidong
Browse files

add monitor

parent f84980ec
Loading
Loading
Loading
Loading
+129 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.api.controller;


import cn.escheduler.api.service.MonitorService;
import cn.escheduler.api.service.ServerService;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.dao.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import java.util.Map;

import static cn.escheduler.api.enums.Status.*;


/**
 * monitor controller
 */
@RestController
@RequestMapping("/monitor")
public class MonitorController extends BaseController{

    private static final Logger logger = LoggerFactory.getLogger(MonitorController.class);

    @Autowired
    private ServerService serverService;

    @Autowired
    private MonitorService monitorService;

    /**
     * master list
     * @param loginUser
     * @return
     */
    @GetMapping(value = "/master/list")
    @ResponseStatus(HttpStatus.OK)
    public Result listMaster(@RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
        logger.info("login user: {}, query all master", loginUser.getUserName());
        try{
            logger.info("list master, user:{}", loginUser.getUserName());
            Map<String, Object> result = serverService.queryMaster(loginUser);
            return returnDataList(result);
        }catch (Exception e){
            logger.error(LIST_MASTERS_ERROR.getMsg(),e);
            return error(LIST_MASTERS_ERROR.getCode(),
                    LIST_MASTERS_ERROR.getMsg());
        }
    }

    /**
     * worker list
     * @param loginUser
     * @return
     */
    @GetMapping(value = "/worker/list")
    @ResponseStatus(HttpStatus.OK)
    public Result listWorker(@RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
        logger.info("login user: {}, query all workers", loginUser.getUserName());
        try{
            Map<String, Object> result = serverService.queryWorker(loginUser);
            return returnDataList(result);
        }catch (Exception e){
            logger.error(LIST_WORKERS_ERROR.getMsg(),e);
            return error(LIST_WORKERS_ERROR.getCode(),
                    LIST_WORKERS_ERROR.getMsg());
        }
    }

    /**
     * query database state
     * @param loginUser
     * @return
     */
    @GetMapping(value = "/database")
    @ResponseStatus(HttpStatus.OK)
    public Result queryDatabaseState(@RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
        logger.info("login user: {}, query database state", loginUser.getUserName());
        try{

            Map<String, Object> result = monitorService.queryDatabaseState(loginUser);
            return returnDataList(result);
        }catch (Exception e){
            logger.error(QUERY_DATABASE_STATE_ERROR.getMsg(),e);
            return error(QUERY_DATABASE_STATE_ERROR.getCode(),
                    QUERY_DATABASE_STATE_ERROR.getMsg());
        }
    }

    /**
     * query zookeeper state
     * @param loginUser
     * @return
     */
    @GetMapping(value = "/zookeeper/list")
    @ResponseStatus(HttpStatus.OK)
    public Result queryZookeeperState(@RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
        logger.info("login user: {}, query zookeeper state", loginUser.getUserName());
        try{
            Map<String, Object> result = monitorService.queryZookeeperState(loginUser);
            return returnDataList(result);
        }catch (Exception e){
            logger.error(QUERY_ZOOKEEPER_STATE_ERROR.getMsg(),e);
            return error(QUERY_ZOOKEEPER_STATE_ERROR.getCode(),
                    QUERY_ZOOKEEPER_STATE_ERROR.getMsg());
        }
    }

}
+78 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.api.service;

import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.ZookeeperMonitorUtils;
import cn.escheduler.dao.MonitorDBDao;
import cn.escheduler.dao.model.MonitorRecord;
import cn.escheduler.dao.model.User;
import cn.escheduler.dao.model.ZookeeperRecord;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * monitor service
 */
@Service
public class MonitorService extends BaseService{

  /**
   * query database state
   *
   * @return
   */
  public Map<String,Object> queryDatabaseState(User loginUser) {
    Map<String, Object> result = new HashMap<>(5);
    if (checkAdmin(loginUser, result)){
      return result;
    }

    List<MonitorRecord> monitorRecordList = MonitorDBDao.queryDatabaseState();

    result.put(Constants.DATA_LIST, monitorRecordList);
    putMsg(result, Status.SUCCESS);

    return result;

  }


  /**
   * query zookeeper state
   *
   * @return
   */
  public Map<String,Object> queryZookeeperState(User loginUser) {
    Map<String, Object> result = new HashMap<>(5);
    if (checkAdmin(loginUser, result)){
      return result;
    }

    List<ZookeeperRecord> zookeeperRecordList = ZookeeperMonitorUtils.zookeeperInfoList();

    result.put(Constants.DATA_LIST, zookeeperRecordList);
    putMsg(result, Status.SUCCESS);

    return result;

  }
}
+211 −0
Original line number Diff line number Diff line
package cn.escheduler.api.utils;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.client.FourLetterWordMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Scanner;

/**
 *	zookeeper状态监控:4字口诀 
 *
 */
public class ZooKeeperState {

	private static final Logger logger = LoggerFactory.getLogger(ZooKeeperState.class);

	private final String host;
	private final int port;

	private int minLatency = -1, avgLatency = -1, maxLatency = -1;
	private long received = -1;
	private long sent = -1;
	private int outStanding = -1;
	private long zxid = -1;
	private String mode = null;
	private int nodeCount = -1;
	private int watches = -1;
	private int connections = -1;

	public ZooKeeperState(String connectionString) {
		String host = connectionString.substring(0,
				connectionString.indexOf(':'));
		int port = Integer.parseInt(connectionString.substring(connectionString
				.indexOf(':') + 1));
		this.host = host;
		this.port = port;
	}

	public void getZookeeperInfo() {
		String content = cmd("srvr");
		if (StringUtils.isNotBlank(content)) {
			Scanner scannerForStat = new Scanner(content);
			while (scannerForStat.hasNext()) {
				String line = scannerForStat.nextLine();
				if (line.startsWith("Latency min/avg/max:")) {
					String[] latencys = getStringValueFromLine(line).split("/");
					minLatency = Integer.parseInt(latencys[0]);
					avgLatency = Integer.parseInt(latencys[1]);
					maxLatency = Integer.parseInt(latencys[2]);
				} else if (line.startsWith("Received:")) {
					received = Long.parseLong(getStringValueFromLine(line));
				} else if (line.startsWith("Sent:")) {
					sent = Long.parseLong(getStringValueFromLine(line));
				} else if (line.startsWith("Outstanding:")) {
					outStanding = Integer.parseInt(getStringValueFromLine(line));
				} else if (line.startsWith("Zxid:")) {
					zxid = Long.parseLong(getStringValueFromLine(line).substring(2), 16);
				} else if (line.startsWith("Mode:")) {
					mode = getStringValueFromLine(line);
				} else if (line.startsWith("Node count:")) {
					nodeCount = Integer.parseInt(getStringValueFromLine(line));
				}
			}
			scannerForStat.close();
		}

		String wchsText = cmd("wchs");
		if (StringUtils.isNotBlank(wchsText)) {
			Scanner scannerForWchs = new Scanner(wchsText);
			while (scannerForWchs.hasNext()) {
				String line = scannerForWchs.nextLine();
				if (line.startsWith("Total watches:")) {
					watches = Integer.parseInt(getStringValueFromLine(line));
				}
			}
			scannerForWchs.close();
		}
		
		String consText = cmd("cons");
		if (StringUtils.isNotBlank(consText)) {
			Scanner scannerForCons = new Scanner(consText);
			if (StringUtils.isNotBlank(consText)) {
				connections = 0;
			}
			while (scannerForCons.hasNext()) {
				@SuppressWarnings("unused")
				String line = scannerForCons.nextLine();
				++connections;
			}
			scannerForCons.close();
		}
	}


	public boolean ruok() {
		return "imok\n".equals(cmd("ruok"));
	}


	private String getStringValueFromLine(String line) {
		return line.substring(line.indexOf(":") + 1, line.length()).replaceAll(
				" ", "").trim();
	}

	private class SendThread extends Thread {
		private String cmd;

		public String ret = "";

		public SendThread(String cmd) {
			this.cmd = cmd;
		}

		@Override
		public void run() {
			try {
				ret = FourLetterWordMain.send4LetterWord(host, port, cmd);
			} catch (IOException e) {
				logger.error(e.getMessage(),e);
				return;
			}
		}

	}

	private String cmd(String cmd) {
		final int waitTimeout = 5;
		SendThread sendThread = new SendThread(cmd);
		sendThread.setName("FourLetterCmd:" + cmd);
		sendThread.start();
		try {
			sendThread.join(waitTimeout * 1000);
			return sendThread.ret;
		} catch (InterruptedException e) {
			logger.error("send " + cmd + " to server " + host + ":" + port + " failed!", e);
		}
		return "";
	}

	public Logger getLogger() {
		return logger;
	}

	public String getHost() {
		return host;
	}

	public int getPort() {
		return port;
	}

	public int getMinLatency() {
		return minLatency;
	}

	public int getAvgLatency() {
		return avgLatency;
	}

	public int getMaxLatency() {
		return maxLatency;
	}

	public long getReceived() {
		return received;
	}

	public long getSent() {
		return sent;
	}

	public int getOutStanding() {
		return outStanding;
	}

	public long getZxid() {
		return zxid;
	}

	public String getMode() {
		return mode;
	}

	public int getNodeCount() {
		return nodeCount;
	}

	public int getWatches() {
		return watches;
	}

	public int getConnections() {
		return connections;
	}

	@Override
	public String toString() {
		return "ZooKeeperState [host=" + host + ", port=" + port
				+ ", minLatency=" + minLatency + ", avgLatency=" + avgLatency
				+ ", maxLatency=" + maxLatency + ", received=" + received
				+ ", sent=" + sent + ", outStanding=" + outStanding + ", zxid="
				+ zxid + ", mode=" + mode + ", nodeCount=" + nodeCount
				+ ", watches=" + watches + ", connections="
				+ connections + "]";
	}
	
	
	
}
+72 −0
Original line number Diff line number Diff line
package cn.escheduler.api.utils;

import cn.escheduler.common.zk.AbstractZKClient;
import cn.escheduler.dao.model.ZookeeperRecord;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;


/**
 *	monitor zookeeper info
 */
public class ZookeeperMonitorUtils {

	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitorUtils.class);
	private static final String zookeeperList = AbstractZKClient.getZookeeperQuorum();

	/**
	 *
	 * @return zookeeper info list
	 */
	public static List<ZookeeperRecord> zookeeperInfoList(){
		String zookeeperServers = zookeeperList.replaceAll("[\\t\\n\\x0B\\f\\r]", "");
		try{
			return zookeeperInfoList(zookeeperServers);
		}catch(Exception e){
			LOG.error(e.getMessage(),e);
		}
		return null;
	}


	private static List<ZookeeperRecord> zookeeperInfoList(String zookeeperServers) {

		List<ZookeeperRecord> list = new ArrayList<>(5);

		if(StringUtils.isNotBlank(zookeeperServers)){
			String[] zookeeperServersArray = zookeeperServers.split(",");
			
			for (String zookeeperServer : zookeeperServersArray) {
				ZooKeeperState state = new ZooKeeperState(zookeeperServer);
				boolean ok = state.ruok();
				if(ok){
					state.getZookeeperInfo();
				}
				
				String hostName = zookeeperServer;
				int connections = state.getConnections();
				int watches = state.getWatches();
				long sent = state.getSent();
				long received = state.getReceived();
				String mode =  state.getMode();
				int minLatency =  state.getMinLatency();
				int avgLatency = state.getAvgLatency();
				int maxLatency = state.getMaxLatency();
				int nodeCount = state.getNodeCount();
				int status = ok ? 1 : 0;
				Date date = new Date();

				ZookeeperRecord zookeeperRecord = new ZookeeperRecord(hostName,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date);
				list.add(zookeeperRecord);

			}
		}

		return list;
	}
}
+105 −0
Original line number Diff line number Diff line
package cn.escheduler.api.controller;

import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.ResourceType;
import cn.escheduler.common.utils.JSONUtils;
import com.alibaba.fastjson.JSONObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;

import static org.junit.Assert.*;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MonitorControllerTest {

    private static final Logger logger = LoggerFactory.getLogger(MonitorControllerTest.class);
    public static final String SESSION_ID = "sessionId";
    public static String SESSION_ID_VALUE;

    private MockMvc mockMvc;

    @Autowired
    private WebApplicationContext webApplicationContext;



    @Before
    public void setUp() {
        mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
        SESSION_ID_VALUE = "bad76fc4-2eb4-4aae-b32b-d650e4beb6af";
    }

    @Test
    public void listMaster() throws Exception {

        MvcResult mvcResult = mockMvc.perform(get("/monitor/master/list")
                .header(SESSION_ID, SESSION_ID_VALUE)
               /* .param("type", ResourceType.FILE.name())*/   )
                .andExpect(status().isOk())
                .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
                .andReturn();

        Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
        result.getCode().equals(Status.SUCCESS.getCode());


        JSONObject object = (JSONObject) JSONObject.parse(mvcResult.getResponse().getContentAsString());

        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
        logger.info(mvcResult.getResponse().getContentAsString());
    }


    @Test
    public void queryDatabaseState() throws Exception {
        MvcResult mvcResult = mockMvc.perform(get("/monitor/database")
                        .header(SESSION_ID, SESSION_ID_VALUE)
                /* .param("type", ResourceType.FILE.name())*/   )
                .andExpect(status().isOk())
                .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
                .andReturn();

        Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
        result.getCode().equals(Status.SUCCESS.getCode());


        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
        logger.info(mvcResult.getResponse().getContentAsString());
    }


    @Test
    public void queryZookeeperState() throws Exception {
        MvcResult mvcResult = mockMvc.perform(get("/monitor/zookeeper/list")
                        .header(SESSION_ID, SESSION_ID_VALUE)
                /* .param("type", ResourceType.FILE.name())*/   )
                .andExpect(status().isOk())
                .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
                .andReturn();

        Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
        result.getCode().equals(Status.SUCCESS.getCode());



        Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
        logger.info(mvcResult.getResponse().getContentAsString());
    }
}
 No newline at end of file
Loading