Unverified Commit 64f1de8d authored by kezhenxu94's avatar kezhenxu94 Committed by GitHub
Browse files

Merge branch 'dev' into issue/1463

parents e9b8c73d 6e81dd3b
Loading
Loading
Loading
Loading
+23 −7
Original line number Diff line number Diff line
@@ -16,8 +16,11 @@
 */
package org.apache.dolphinscheduler.alert;

import org.apache.dolphinscheduler.alert.plugin.EmailAlertPlugin;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.plugin.FilePluginManager;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
@@ -41,8 +44,21 @@ public class AlertServer {

    private static AlertServer instance;

    public AlertServer() {
    private FilePluginManager alertPluginManager;

    private static final String[] whitePrefixes = new String[]{"org.apache.dolphinscheduler.plugin.utils."};

    private static final String[] excludePrefixes = new String[]{
            "org.apache.dolphinscheduler.plugin.",
            "ch.qos.logback.",
            "org.slf4j."
    };

    public AlertServer() {
        alertPluginManager =
                new FilePluginManager(PropertyUtils.getString(Constants.PLUGIN_DIR), whitePrefixes, excludePrefixes);
        // add default alert plugins
        alertPluginManager.addPlugin(new EmailAlertPlugin());
    }

    public synchronized static AlertServer getInstance() {
@@ -62,7 +78,7 @@ public class AlertServer {
                Thread.currentThread().interrupt();
            }
            List<Alert> alerts = alertDao.listWaitExecutionAlert();
            alertSender = new AlertSender(alerts, alertDao);
            alertSender = new AlertSender(alerts, alertDao, alertPluginManager);
            alertSender.run();
        }
    }
+2 −3
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.alert.manager;

import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.enums.ShowType;

import java.util.List;
import java.util.Map;
@@ -35,7 +34,7 @@ public class EmailManager {
     * @param showType the showType
     * @return the send result
     */
    public Map<String,Object> send(List<String> receviersList,List<String> receviersCcList,String title,String content,ShowType showType){
    public Map<String,Object> send(List<String> receviersList,List<String> receviersCcList,String title,String content,String showType){

        return MailUtils.sendMails(receviersList, receviersCcList, title, content, showType);
    }
@@ -48,7 +47,7 @@ public class EmailManager {
     * @param showType the showType
     * @return the send result
     */
    public Map<String,Object> send(List<String> receviersList,String title,String content,ShowType showType){
    public Map<String,Object> send(List<String> receviersList,String title,String content,String showType){

        return MailUtils.sendMails(receviersList,title, content, showType);
    }
+5 −5
Original line number Diff line number Diff line
@@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.alert.manager;

import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.EnterpriseWeChatUtils;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.plugin.model.AlertInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -35,18 +35,18 @@ public class EnterpriseWeChatManager {
    private static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatManager.class);
    /**
     * Enterprise We Chat send
     * @param alert the alert
     * @param alertInfo the alert info
     * @param token the token
     * @return the send result
     */
    public Map<String,Object> send(Alert alert, String token){
    public Map<String,Object> send(AlertInfo alertInfo, String token){
        Map<String,Object> retMap = new HashMap<>();
        retMap.put(Constants.STATUS, false);
        String agentId = EnterpriseWeChatUtils.ENTERPRISE_WE_CHAT_AGENT_ID;
        String users = EnterpriseWeChatUtils.ENTERPRISE_WE_CHAT_USERS;
        List<String> userList = Arrays.asList(users.split(","));
        logger.info("send message {}",alert);
        String msg = EnterpriseWeChatUtils.makeUserSendMsg(userList, agentId,EnterpriseWeChatUtils.markdownByAlert(alert));
        logger.info("send message {}", alertInfo.getAlertData().getTitle());
        String msg = EnterpriseWeChatUtils.makeUserSendMsg(userList, agentId,EnterpriseWeChatUtils.markdownByAlert(alertInfo.getAlertData()));
        try {
            EnterpriseWeChatUtils.sendEnterpriseWeChat(Constants.UTF_8, msg, token);
        } catch (IOException e) {
+133 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.alert.plugin;

import org.apache.dolphinscheduler.alert.manager.EmailManager;
import org.apache.dolphinscheduler.alert.manager.EnterpriseWeChatManager;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.EnterpriseWeChatUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.plugin.api.AlertPlugin;
import org.apache.dolphinscheduler.plugin.model.AlertData;
import org.apache.dolphinscheduler.plugin.model.AlertInfo;
import org.apache.dolphinscheduler.plugin.model.PluginName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * EmailAlertPlugin
 *
 * This plugin is a default plugin, and mix up email and enterprise wechat, because adapt with former alert behavior
 */
public class EmailAlertPlugin implements AlertPlugin {

    private static final Logger logger = LoggerFactory.getLogger(EmailAlertPlugin.class);

    private PluginName pluginName;

    private static final EmailManager emailManager = new EmailManager();
    private static final EnterpriseWeChatManager weChatManager = new EnterpriseWeChatManager();

    public EmailAlertPlugin() {
        this.pluginName = new PluginName();
        this.pluginName.setEnglish(Constants.PLUGIN_DEFAULT_EMAIL_EN);
        this.pluginName.setChinese(Constants.PLUGIN_DEFAULT_EMAIL_CH);
    }

    @Override
    public String getId() {
        return Constants.PLUGIN_DEFAULT_EMAIL;
    }

    @Override
    public PluginName getName() {
        return pluginName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Map<String, Object> process(AlertInfo info) {
        Map<String, Object> retMaps = new HashMap<>();

        AlertData alert = info.getAlertData();

        List<String> receviersList = (List<String>) info.getProp(Constants.PLUGIN_DEFAULT_EMAIL_RECEIVERS);

        // receiving group list
        // custom receiver
        String receivers = alert.getReceivers();
        if (StringUtils.isNotEmpty(receivers)) {
            String[] splits = receivers.split(",");
            receviersList.addAll(Arrays.asList(splits));
        }

        List<String> receviersCcList = new ArrayList<>();
        // Custom Copier
        String receiversCc = alert.getReceiversCc();
        if (StringUtils.isNotEmpty(receiversCc)) {
            String[] splits = receiversCc.split(",");
            receviersCcList.addAll(Arrays.asList(splits));
        }

        if (CollectionUtils.isEmpty(receviersList) && CollectionUtils.isEmpty(receviersCcList)) {
            logger.warn("alert send error : At least one receiver address required");
            retMaps.put(Constants.STATUS, "false");
            retMaps.put(Constants.MESSAGE, "execution failure,At least one receiver address required.");
            return retMaps;
        }

        retMaps = emailManager.send(receviersList, receviersCcList, alert.getTitle(), alert.getContent(),
                alert.getShowType());

        //send flag
        boolean flag = false;

        if (retMaps == null) {
            retMaps = new HashMap<>();
            retMaps.put(Constants.MESSAGE, "alert send error.");
            retMaps.put(Constants.STATUS, "false");
            logger.info("alert send error : {}", retMaps.get(Constants.MESSAGE));
            return retMaps;
        }

        flag = Boolean.parseBoolean(String.valueOf(retMaps.get(Constants.STATUS)));

        if (flag) {
            logger.info("alert send success");
            retMaps.put(Constants.MESSAGE, "email send success.");
            if (EnterpriseWeChatUtils.isEnable()) {
                logger.info("Enterprise WeChat is enable!");
                try {
                    String token = EnterpriseWeChatUtils.getToken();
                    weChatManager.send(info, token);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        } else {
            retMaps.put(Constants.MESSAGE, "alert send error.");
            logger.info("alert send error : {}", retMaps.get(Constants.MESSAGE));
        }

        return retMaps;
    }

}
+38 −92
Original line number Diff line number Diff line
@@ -16,22 +16,19 @@
 */
package org.apache.dolphinscheduler.alert.runner;

import org.apache.dolphinscheduler.alert.manager.EmailManager;
import org.apache.dolphinscheduler.alert.manager.EnterpriseWeChatManager;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.EnterpriseWeChatUtils;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.plugin.PluginManager;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.api.AlertPlugin;
import org.apache.dolphinscheduler.plugin.model.AlertData;
import org.apache.dolphinscheduler.plugin.model.AlertInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

@@ -42,24 +39,22 @@ public class AlertSender{

    private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);

    private static final EmailManager emailManager= new EmailManager();
    private static final EnterpriseWeChatManager weChatManager= new EnterpriseWeChatManager();


    private List<Alert> alertList;
    private AlertDao alertDao;
    private PluginManager pluginManager;

    public AlertSender() {
    }

    public AlertSender(){}
    public AlertSender(List<Alert> alertList, AlertDao alertDao){
    public AlertSender(List<Alert> alertList, AlertDao alertDao, PluginManager pluginManager) {
        super();
        this.alertList = alertList;
        this.alertDao = alertDao;
        this.pluginManager = pluginManager;
    }

    public void run() {

        List<User> users;

        Map<String, Object> retMaps = null;
        for (Alert alert : alertList) {
            users = alertDao.listUserByAlertgroupId(alert.getAlertGroupId());
@@ -69,86 +64,37 @@ public class AlertSender{
            for (User user : users) {
                receviersList.add(user.getEmail());
            }
            // custom receiver
            String receivers = alert.getReceivers();
            if (StringUtils.isNotEmpty(receivers)){
                String[] splits = receivers.split(",");
                receviersList.addAll(Arrays.asList(splits));
            }

            // copy list
            List<String> receviersCcList = new ArrayList<>();


            // Custom Copier
            String receiversCc = alert.getReceiversCc();

            if (StringUtils.isNotEmpty(receiversCc)){
                String[] splits = receiversCc.split(",");
                receviersCcList.addAll(Arrays.asList(splits));
            }

            if (CollectionUtils.isEmpty(receviersList) && CollectionUtils.isEmpty(receviersCcList)) {
                logger.warn("alert send error : At least one receiver address required");
                alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "execution failure,At least one receiver address required.", alert.getId());
                continue;
            }
            AlertData alertData = new AlertData();
            alertData.setId(alert.getId())
                    .setAlertGroupId(alert.getAlertGroupId())
                    .setContent(alert.getContent())
                    .setLog(alert.getLog())
                    .setReceivers(alert.getReceivers())
                    .setReceiversCc(alert.getReceiversCc())
                    .setShowType(alert.getShowType().getDescp())
                    .setTitle(alert.getTitle());

            if (alert.getAlertType() == AlertType.EMAIL){
                retMaps = emailManager.send(receviersList,receviersCcList, alert.getTitle(), alert.getContent(),alert.getShowType());
            AlertInfo alertInfo = new AlertInfo();
            alertInfo.setAlertData(alertData);

                alert.setInfo(retMaps);
            }else if (alert.getAlertType() == AlertType.SMS){
                retMaps = emailManager.send(getReciversForSMS(users), alert.getTitle(), alert.getContent(),alert.getShowType());
                alert.setInfo(retMaps);
            } else {
                logger.error("AlertType is not defined. code: {}, descp: {}", 
                    alert.getAlertType().getCode(), 
                    alert.getAlertType().getDescp());
                return;
            }
            alertInfo.addProp("receivers", receviersList);

            //send flag
            boolean flag = false;
            AlertPlugin emailPlugin = pluginManager.findOne(Constants.PLUGIN_DEFAULT_EMAIL);
            retMaps = emailPlugin.process(alertInfo);

            if (null != retMaps) {
                flag = Boolean.parseBoolean(String.valueOf(retMaps.get(Constants.STATUS)));
            }

            if (flag) {
                alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, "execution success", alert.getId());
                logger.info("alert send success");
                if (EnterpriseWeChatUtils.isEnable()) {
                    logger.info("Enterprise WeChat is enable!");
                    try {
                        String token = EnterpriseWeChatUtils.getToken();
                        weChatManager.send(alert, token);
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }

            } else {
                if (null != retMaps) {
            if (retMaps == null) {
                alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "alert send error", alert.getId());
                logger.info("alert send error : return value is null");
            } else if (!Boolean.parseBoolean(String.valueOf(retMaps.get(Constants.STATUS)))) {
                alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, String.valueOf(retMaps.get(Constants.MESSAGE)), alert.getId());
                logger.info("alert send error : {}", retMaps.get(Constants.MESSAGE));
                }
            } else {
                alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, (String) retMaps.get(Constants.MESSAGE), alert.getId());
                logger.info("alert send success");
            }
        }

    }


    /**
     * get a list of SMS users
     * @param users
     * @return
     */
    private List<String> getReciversForSMS(List<User> users){
        List<String> list = new ArrayList<>();
        for (User user : users){
            list.add(user.getPhone());
        }
        return list;
    }
}
Loading