Commit db0d7885 authored by huyuanming's avatar huyuanming
Browse files

Merge remote-tracking branch 'upstream/dev-1.1.0' into dev-1.1.0

parents 6b477f49 c3a0dcc4
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -13,13 +13,13 @@

**流程定义**:通过拖拽任务节点并建立任务节点的关联所形成的可视化**DAG**

**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成
**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例

**任务实例**:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态

**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS**  也是一个单独的流程定义,是可以单独启动执行的
**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖),同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS**  也是一个单独的流程定义,是可以单独启动执行的

**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、调度、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流****恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用
**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流****恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用

**定时调度**:系统采用 **quartz** 分布式调度器,并同时支持cron表达式可视化的生成

+12 −0
Original line number Diff line number Diff line
@@ -128,4 +128,16 @@ public class Constants {
    public static final String TH_END = "</th>";

    public static final int ALERT_SCAN_INTERVEL = 5000;

    public static final String ENTERPRISE_WECHAT_CORP_ID = "enterprise.wechat.corp.id";

    public static final String ENTERPRISE_WECHAT_SECRET = "enterprise.wechat.secret";

    public static final String ENTERPRISE_WECHAT_TOKEN_URL = "enterprise.wechat.token.url";

    public static final String ENTERPRISE_WECHAT_PUSH_URL = "enterprise.wechat.push.url";

    public static final String ENTERPRISE_WECHAT_TEAM_SEND_MSG = "enterprise.wechat.team.send.msg";

    public static final String ENTERPRISE_WECHAT_USER_SEND_MSG = "enterprise.wechat.user.send.msg";
}
+167 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.alert.utils;

import com.alibaba.fastjson.JSON;

import com.google.common.reflect.TypeToken;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

import static cn.escheduler.alert.utils.PropertyUtils.getString;

/**
 * qiye weixin utils
 */
public class EnterpriseWeChatUtils {

    public static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatUtils.class);

    private static final String enterpriseWeChatCorpId = getString(Constants.ENTERPRISE_WECHAT_CORP_ID);

    private static final String enterpriseWeChatSecret = getString(Constants.ENTERPRISE_WECHAT_SECRET);

    private static final String enterpriseWeChatTokenUrl = getString(Constants.ENTERPRISE_WECHAT_TOKEN_URL);
    private String enterpriseWeChatTokenUrlReplace = enterpriseWeChatTokenUrl
            .replaceAll("\\$corpId", enterpriseWeChatCorpId)
            .replaceAll("\\$secret", enterpriseWeChatSecret);

    private static final String enterpriseWeChatPushUrl = getString(Constants.ENTERPRISE_WECHAT_PUSH_URL);

    private static final String enterpriseWeChatTeamSendMsg = getString(Constants.ENTERPRISE_WECHAT_TEAM_SEND_MSG);

    private static final String enterpriseWeChatUserSendMsg = getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG);

    /**
     * get winxin token info
     * @return token string info
     * @throws IOException
     */
    public String getToken() throws IOException {
        String resp;

        CloseableHttpClient httpClient = HttpClients.createDefault();
        HttpGet httpGet = new HttpGet(enterpriseWeChatTokenUrlReplace);
        CloseableHttpResponse response = httpClient.execute(httpGet);
        try {
            HttpEntity entity = response.getEntity();
            resp = EntityUtils.toString(entity, "utf-8");
            EntityUtils.consume(entity);
        } finally {
            response.close();
        }

        Map<String, Object> map = JSON.parseObject(resp,
                new TypeToken<Map<String, Object>>() {
                }.getType());
        return map.get("access_token").toString();
    }

    /**
     * make team single weixin message
     * @param toParty
     * @param agentId
     * @param msg
     * @return weixin send message
     */
    public String makeTeamSendMsg(String toParty, String agentId, String msg) {
        return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", toParty)
                .replaceAll("\\$agentId", agentId)
                .replaceAll("\\$msg", msg);
    }

    /**
     * make team multi weixin message
     * @param toParty
     * @param agentId
     * @param msg
     * @return weixin send message
     */
    public String makeTeamSendMsg(Collection<String> toParty, String agentId, String msg) {
        String listParty = FuncUtils.mkString(toParty, "|");
        return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", listParty)
                .replaceAll("\\$agentId", agentId)
                .replaceAll("\\$msg", msg);
    }

    /**
     * make team single user message
     * @param toUser
     * @param agentId
     * @param msg
     * @return weixin send message
     */
    public String makeUserSendMsg(String toUser, String agentId, String msg) {
        return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", toUser)
                .replaceAll("\\$agentId", agentId)
                .replaceAll("\\$msg", msg);
    }

    /**
     * make team multi user message
     * @param toUser
     * @param agentId
     * @param msg
     * @return weixin send message
     */
    public String makeUserSendMsg(Collection<String> toUser, String agentId, String msg) {
        String listUser = FuncUtils.mkString(toUser, "|");
        return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", listUser)
                .replaceAll("\\$agentId", agentId)
                .replaceAll("\\$msg", msg);
    }

    /**
     * send weixin
     * @param charset
     * @param data
     * @param token
     * @return weixin resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""}
     * @throws IOException
     */
    public String sendQiyeWeixin(String charset, String data, String token) throws IOException {
        String enterpriseWeChatPushUrlReplace = enterpriseWeChatPushUrl.replaceAll("\\$token", token);

        CloseableHttpClient httpclient = HttpClients.createDefault();
        HttpPost httpPost = new HttpPost(enterpriseWeChatPushUrlReplace);
        httpPost.setEntity(new StringEntity(data, charset));
        CloseableHttpResponse response = httpclient.execute(httpPost);
        String resp;
        try {
            HttpEntity entity = response.getEntity();
            resp = EntityUtils.toString(entity, charset);
            EntityUtils.consume(entity);
        } finally {
            response.close();
        }
        logger.info("qiye weixin send [{}], param:{}, resp:{}", enterpriseWeChatPushUrl, data, resp);
        return resp;
    }

}
+34 −0
Original line number Diff line number Diff line
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.alert.utils;

public class FuncUtils {

    static public String mkString(Iterable<String> list, String split) {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String item : list) {
            if (first)
                first = false;
            else
                sb.append(split);
            sb.append(item);
        }
        return sb.toString();
    }

}
+7 −0
Original line number Diff line number Diff line
@@ -16,6 +16,13 @@ mail.smtp.ssl.enable=true
#xls file path,need create if not exist
xls.file.path=/tmp/xls

# Enterprise WeChat configuration
enterprise.wechat.corp.id=xxxxxxx
enterprise.wechat.secret=xxxxxxx
enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret
enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token
enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}
enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}


Loading