From 94a8ade8eddb1ea9bdc1d21c9aee43b2d39786d2 Mon Sep 17 00:00:00 2001 From: cdp <1> Date: Tue, 14 May 2019 16:45:19 +0800 Subject: [PATCH 01/18] Add qiye_weixin send feature. --- escheduler-alert/pom.xml | 11 ++ .../cn/escheduler/alert/utils/Constants.java | 12 ++ .../cn/escheduler/alert/utils/FuncUtils.java | 18 ++ .../alert/utils/QiyeWeixinUtils.java | 167 ++++++++++++++++++ .../src/main/resources/alert.properties | 7 + .../alert/utils/QiyeWeixinUtilsTest.java | 110 ++++++++++++ 6 files changed, 325 insertions(+) create mode 100644 escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java create mode 100644 escheduler-alert/src/main/java/cn/escheduler/alert/utils/QiyeWeixinUtils.java create mode 100644 escheduler-alert/src/test/java/cn/escheduler/alert/utils/QiyeWeixinUtilsTest.java diff --git a/escheduler-alert/pom.xml b/escheduler-alert/pom.xml index f066bc4b72..cb7d2db4bc 100644 --- a/escheduler-alert/pom.xml +++ b/escheduler-alert/pom.xml @@ -102,6 +102,17 @@ escheduler-dao + + org.apache.httpcomponents + httpcore + 4.4.6 + + + org.apache.httpcomponents + httpclient + 4.5.5 + + diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java index 07d1866a5d..cc2530826f 100644 --- a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java @@ -126,4 +126,16 @@ public class Constants { public static final String TH_END = ""; public static final int ALERT_SCAN_INTERVEL = 5000; + + public static final String QIYE_WEIXIN_CORP_ID = "qiye.weixin.corp.id"; + + public static final String QIYE_WEIXIN_SECRET = "qiye.weixin.secret"; + + public static final String QIYE_WEIXIN_TOKEN_URL = "qiye.weixin.token.url"; + + public static final String QIYE_WEIXIN_PUSH_URL = "qiye.weixin.push.url"; + + public static final String QIYE_WEIXIN_TEAM_SEND_MSG = "qiye.weixin.team.send.msg"; + + public static final String QIYE_WEIXIN_USER_SEND_MSG = "qiye.weixin.user.send.msg"; } diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java new file mode 100644 index 0000000000..c973e1de9b --- /dev/null +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java @@ -0,0 +1,18 @@ +package cn.escheduler.alert.utils; + +public class FuncUtils { + + static public String mkString(Iterable list, String split) { + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (String item : list) { + if (first) + first = false; + else + sb.append(split); + sb.append(item); + } + return sb.toString(); + } + +} diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/QiyeWeixinUtils.java b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/QiyeWeixinUtils.java new file mode 100644 index 0000000000..5af6baa4fc --- /dev/null +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/QiyeWeixinUtils.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.alert.utils; + +import com.alibaba.fastjson.JSON; + +import com.google.common.reflect.TypeToken; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import static cn.escheduler.alert.utils.PropertyUtils.getString; + +/** + * qiye weixin utils + */ +public class QiyeWeixinUtils { + + public static final Logger logger = LoggerFactory.getLogger(QiyeWeixinUtils.class); + + private static final String qiyeWeixinCorpId = getString(Constants.QIYE_WEIXIN_CORP_ID); + + private static final String qiyeWeixinSecret = getString(Constants.QIYE_WEIXIN_SECRET); + + private static final String qiyeWeixinTokenUrl = getString(Constants.QIYE_WEIXIN_TOKEN_URL); + private String qiyeWeixinTokenUrlReplace = qiyeWeixinTokenUrl + .replaceAll("\\$weixinCorpId", qiyeWeixinCorpId) + .replaceAll("\\$weixinSecret", qiyeWeixinSecret); + + private static final String qiyeWeixinPushUrl = getString(Constants.QIYE_WEIXIN_PUSH_URL); + + private static final String qiyeWeixinTeamSendMsg = getString(Constants.QIYE_WEIXIN_TEAM_SEND_MSG); + + private static final String qiyeWeixinUserSendMsg = getString(Constants.QIYE_WEIXIN_USER_SEND_MSG); + + /** + * get winxin token info + * @return token string info + * @throws IOException + */ + public String getToken() throws IOException { + String resp; + + CloseableHttpClient httpClient = HttpClients.createDefault(); + HttpGet httpGet = new HttpGet(qiyeWeixinTokenUrlReplace); + CloseableHttpResponse response = httpClient.execute(httpGet); + try { + HttpEntity entity = response.getEntity(); + resp = EntityUtils.toString(entity, "utf-8"); + EntityUtils.consume(entity); + } finally { + response.close(); + } + + Map map = JSON.parseObject(resp, + new TypeToken>() { + }.getType()); + return map.get("access_token").toString(); + } + + /** + * make team single weixin message + * @param toParty + * @param agentId + * @param msg + * @return weixin send message + */ + public String makeTeamSendMsg(String toParty, String agentId, String msg) { + return qiyeWeixinTeamSendMsg.replaceAll("\\$toParty", toParty) + .replaceAll("\\$agentId", agentId) + .replaceAll("\\$msg", msg); + } + + /** + * make team multi weixin message + * @param toParty + * @param agentId + * @param msg + * @return weixin send message + */ + public String makeTeamSendMsg(Collection toParty, String agentId, String msg) { + String listParty = FuncUtils.mkString(toParty, "|"); + return qiyeWeixinTeamSendMsg.replaceAll("\\$toParty", listParty) + .replaceAll("\\$agentId", agentId) + .replaceAll("\\$msg", msg); + } + + /** + * make team single user message + * @param toUser + * @param agentId + * @param msg + * @return weixin send message + */ + public String makeUserSendMsg(String toUser, String agentId, String msg) { + return qiyeWeixinUserSendMsg.replaceAll("\\$toUser", toUser) + .replaceAll("\\$agentId", agentId) + .replaceAll("\\$msg", msg); + } + + /** + * make team multi user message + * @param toUser + * @param agentId + * @param msg + * @return weixin send message + */ + public String makeUserSendMsg(Collection toUser, String agentId, String msg) { + String listUser = FuncUtils.mkString(toUser, "|"); + return qiyeWeixinUserSendMsg.replaceAll("\\$toUser", listUser) + .replaceAll("\\$agentId", agentId) + .replaceAll("\\$msg", msg); + } + + /** + * send weixin + * @param charset + * @param data + * @param token + * @return weixin resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""} + * @throws IOException + */ + public String sendQiyeWeixin(String charset, String data, String token) throws IOException { + String qiyeWeixinPushUrlReplace = qiyeWeixinPushUrl.replaceAll("\\$weixinToken", token); + + CloseableHttpClient httpclient = HttpClients.createDefault(); + HttpPost httpPost = new HttpPost(qiyeWeixinPushUrlReplace); + httpPost.setEntity(new StringEntity(data, charset)); + CloseableHttpResponse response = httpclient.execute(httpPost); + String resp; + try { + HttpEntity entity = response.getEntity(); + resp = EntityUtils.toString(entity, charset); + EntityUtils.consume(entity); + } finally { + response.close(); + } + logger.info("qiye weixin send [{}], param:{}, resp:{}", qiyeWeixinPushUrl, data, resp); + return resp; + } + +} diff --git a/escheduler-alert/src/main/resources/alert.properties b/escheduler-alert/src/main/resources/alert.properties index e2cba1160d..31265c2200 100644 --- a/escheduler-alert/src/main/resources/alert.properties +++ b/escheduler-alert/src/main/resources/alert.properties @@ -11,6 +11,13 @@ mail.passwd=xxxxxxx #xls file path,need create if not exist xls.file.path=/opt/xls +# qiye weixin configuration +qiye.weixin.corp.id=xxxxxxx +qiye.weixin.secret=xxxxxxx +qiye.weixin.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$weixinCorpId&corpsecret=$weixinSecret +qiye.weixin.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$weixinToken +qiye.weixin.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} +qiye.weixin.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} diff --git a/escheduler-alert/src/test/java/cn/escheduler/alert/utils/QiyeWeixinUtilsTest.java b/escheduler-alert/src/test/java/cn/escheduler/alert/utils/QiyeWeixinUtilsTest.java new file mode 100644 index 0000000000..051f3bccd1 --- /dev/null +++ b/escheduler-alert/src/test/java/cn/escheduler/alert/utils/QiyeWeixinUtilsTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.alert.utils; + +import org.junit.Assert; +import org.junit.Test; + +import com.alibaba.fastjson.JSON; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +/** + * Please manually modify the configuration file before testing. + * file: alert.properties + * qiye.weixin.corp.id + * qiye.weixin.secret + * qiye.weixin.token.url + * qiye.weixin.push.url + * qiye.weixin.send.msg + */ +public class QiyeWeixinUtilsTest { + + // Please change + private String agentId = "1000002"; // app id + private String partyId = "2"; + private Collection listPartyId = Arrays.asList("2","4"); + private String userId = "test1"; + private Collection listUserId = Arrays.asList("test1","test2"); + + @Test + public void testSendSingleTeamWeixin() { + QiyeWeixinUtils wx = new QiyeWeixinUtils(); + + try { + String token = wx.getToken(); + String msg = wx.makeTeamSendMsg(partyId, agentId, "hello world"); + String resp = wx.sendQiyeWeixin("utf-8", msg, token); + + String errmsg = JSON.parseObject(resp).getString("errmsg"); + Assert.assertEquals(errmsg, "ok"); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testSendMultiTeamWeixin() { + QiyeWeixinUtils wx = new QiyeWeixinUtils(); + + try { + String token = wx.getToken(); + String msg = wx.makeTeamSendMsg(listPartyId, agentId, "hello world"); + String resp = wx.sendQiyeWeixin("utf-8", msg, token); + + String errmsg = JSON.parseObject(resp).getString("errmsg"); + Assert.assertEquals(errmsg, "ok"); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testSendSingleUserWeixin() { + QiyeWeixinUtils wx = new QiyeWeixinUtils(); + + try { + String token = wx.getToken(); + String msg = wx.makeUserSendMsg(userId, agentId, "hello world"); + String resp = wx.sendQiyeWeixin("utf-8", msg, token); + + String errmsg = JSON.parseObject(resp).getString("errmsg"); + Assert.assertEquals(errmsg, "ok"); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testSendMultiUserWeixin() { + QiyeWeixinUtils wx = new QiyeWeixinUtils(); + + try { + String token = wx.getToken(); + String msg = wx.makeUserSendMsg(listUserId, agentId, "hello world"); + String resp = wx.sendQiyeWeixin("utf-8", msg, token); + + String errmsg = JSON.parseObject(resp).getString("errmsg"); + Assert.assertEquals(errmsg, "ok"); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} From e76af95910112ffcbebce457ad0812fc0ca472fd Mon Sep 17 00:00:00 2001 From: chendapao Date: Tue, 14 May 2019 16:57:31 +0800 Subject: [PATCH 02/18] Add License --- .../cn/escheduler/alert/utils/FuncUtils.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java index c973e1de9b..b4238beef0 100644 --- a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package cn.escheduler.alert.utils; public class FuncUtils { From 1d6241725603dc761d44ec126203d6fc417b6aea Mon Sep 17 00:00:00 2001 From: chendapao Date: Tue, 14 May 2019 17:25:37 +0800 Subject: [PATCH 03/18] Change name --- .../cn/escheduler/alert/utils/Constants.java | 12 +++--- ...nUtils.java => EnterpriseWeChatUtils.java} | 38 +++++++++---------- .../src/main/resources/alert.properties | 14 +++---- ...st.java => EnterpriseWeChatUtilsTest.java} | 28 +++++++------- 4 files changed, 46 insertions(+), 46 deletions(-) rename escheduler-alert/src/main/java/cn/escheduler/alert/utils/{QiyeWeixinUtils.java => EnterpriseWeChatUtils.java} (75%) rename escheduler-alert/src/test/java/cn/escheduler/alert/utils/{QiyeWeixinUtilsTest.java => EnterpriseWeChatUtilsTest.java} (82%) diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java index cc2530826f..9516473697 100644 --- a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java @@ -127,15 +127,15 @@ public class Constants { public static final int ALERT_SCAN_INTERVEL = 5000; - public static final String QIYE_WEIXIN_CORP_ID = "qiye.weixin.corp.id"; + public static final String ENTERPRISE_WECHAT_CORP_ID = "enterprise.wechat.corp.id"; - public static final String QIYE_WEIXIN_SECRET = "qiye.weixin.secret"; + public static final String ENTERPRISE_WECHAT_SECRET = "enterprise.wechat.secret"; - public static final String QIYE_WEIXIN_TOKEN_URL = "qiye.weixin.token.url"; + public static final String ENTERPRISE_WECHAT_TOKEN_URL = "enterprise.wechat.token.url"; - public static final String QIYE_WEIXIN_PUSH_URL = "qiye.weixin.push.url"; + public static final String ENTERPRISE_WECHAT_PUSH_URL = "enterprise.wechat.push.url"; - public static final String QIYE_WEIXIN_TEAM_SEND_MSG = "qiye.weixin.team.send.msg"; + public static final String ENTERPRISE_WECHAT_TEAM_SEND_MSG = "enterprise.wechat.team.send.msg"; - public static final String QIYE_WEIXIN_USER_SEND_MSG = "qiye.weixin.user.send.msg"; + public static final String ENTERPRISE_WECHAT_USER_SEND_MSG = "enterprise.wechat.user.send.msg"; } diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/QiyeWeixinUtils.java b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java similarity index 75% rename from escheduler-alert/src/main/java/cn/escheduler/alert/utils/QiyeWeixinUtils.java rename to escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java index 5af6baa4fc..169977b01e 100644 --- a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/QiyeWeixinUtils.java +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java @@ -39,24 +39,24 @@ import static cn.escheduler.alert.utils.PropertyUtils.getString; /** * qiye weixin utils */ -public class QiyeWeixinUtils { +public class EnterpriseWeChatUtils { - public static final Logger logger = LoggerFactory.getLogger(QiyeWeixinUtils.class); + public static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatUtils.class); - private static final String qiyeWeixinCorpId = getString(Constants.QIYE_WEIXIN_CORP_ID); + private static final String enterpriseWeChatCorpId = getString(Constants.ENTERPRISE_WECHAT_CORP_ID); - private static final String qiyeWeixinSecret = getString(Constants.QIYE_WEIXIN_SECRET); + private static final String enterpriseWeChatSecret = getString(Constants.ENTERPRISE_WECHAT_SECRET); - private static final String qiyeWeixinTokenUrl = getString(Constants.QIYE_WEIXIN_TOKEN_URL); - private String qiyeWeixinTokenUrlReplace = qiyeWeixinTokenUrl - .replaceAll("\\$weixinCorpId", qiyeWeixinCorpId) - .replaceAll("\\$weixinSecret", qiyeWeixinSecret); + private static final String enterpriseWeChatTokenUrl = getString(Constants.ENTERPRISE_WECHAT_TOKEN_URL); + private String enterpriseWeChatTokenUrlReplace = enterpriseWeChatTokenUrl + .replaceAll("\\$corpId", enterpriseWeChatCorpId) + .replaceAll("\\$secret", enterpriseWeChatSecret); - private static final String qiyeWeixinPushUrl = getString(Constants.QIYE_WEIXIN_PUSH_URL); + private static final String enterpriseWeChatPushUrl = getString(Constants.ENTERPRISE_WECHAT_PUSH_URL); - private static final String qiyeWeixinTeamSendMsg = getString(Constants.QIYE_WEIXIN_TEAM_SEND_MSG); + private static final String enterpriseWeChatTeamSendMsg = getString(Constants.ENTERPRISE_WECHAT_TEAM_SEND_MSG); - private static final String qiyeWeixinUserSendMsg = getString(Constants.QIYE_WEIXIN_USER_SEND_MSG); + private static final String enterpriseWeChatUserSendMsg = getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG); /** * get winxin token info @@ -67,7 +67,7 @@ public class QiyeWeixinUtils { String resp; CloseableHttpClient httpClient = HttpClients.createDefault(); - HttpGet httpGet = new HttpGet(qiyeWeixinTokenUrlReplace); + HttpGet httpGet = new HttpGet(enterpriseWeChatTokenUrlReplace); CloseableHttpResponse response = httpClient.execute(httpGet); try { HttpEntity entity = response.getEntity(); @@ -91,7 +91,7 @@ public class QiyeWeixinUtils { * @return weixin send message */ public String makeTeamSendMsg(String toParty, String agentId, String msg) { - return qiyeWeixinTeamSendMsg.replaceAll("\\$toParty", toParty) + return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", toParty) .replaceAll("\\$agentId", agentId) .replaceAll("\\$msg", msg); } @@ -105,7 +105,7 @@ public class QiyeWeixinUtils { */ public String makeTeamSendMsg(Collection toParty, String agentId, String msg) { String listParty = FuncUtils.mkString(toParty, "|"); - return qiyeWeixinTeamSendMsg.replaceAll("\\$toParty", listParty) + return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", listParty) .replaceAll("\\$agentId", agentId) .replaceAll("\\$msg", msg); } @@ -118,7 +118,7 @@ public class QiyeWeixinUtils { * @return weixin send message */ public String makeUserSendMsg(String toUser, String agentId, String msg) { - return qiyeWeixinUserSendMsg.replaceAll("\\$toUser", toUser) + return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", toUser) .replaceAll("\\$agentId", agentId) .replaceAll("\\$msg", msg); } @@ -132,7 +132,7 @@ public class QiyeWeixinUtils { */ public String makeUserSendMsg(Collection toUser, String agentId, String msg) { String listUser = FuncUtils.mkString(toUser, "|"); - return qiyeWeixinUserSendMsg.replaceAll("\\$toUser", listUser) + return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", listUser) .replaceAll("\\$agentId", agentId) .replaceAll("\\$msg", msg); } @@ -146,10 +146,10 @@ public class QiyeWeixinUtils { * @throws IOException */ public String sendQiyeWeixin(String charset, String data, String token) throws IOException { - String qiyeWeixinPushUrlReplace = qiyeWeixinPushUrl.replaceAll("\\$weixinToken", token); + String enterpriseWeChatPushUrlReplace = enterpriseWeChatPushUrl.replaceAll("\\$token", token); CloseableHttpClient httpclient = HttpClients.createDefault(); - HttpPost httpPost = new HttpPost(qiyeWeixinPushUrlReplace); + HttpPost httpPost = new HttpPost(enterpriseWeChatPushUrlReplace); httpPost.setEntity(new StringEntity(data, charset)); CloseableHttpResponse response = httpclient.execute(httpPost); String resp; @@ -160,7 +160,7 @@ public class QiyeWeixinUtils { } finally { response.close(); } - logger.info("qiye weixin send [{}], param:{}, resp:{}", qiyeWeixinPushUrl, data, resp); + logger.info("qiye weixin send [{}], param:{}, resp:{}", enterpriseWeChatPushUrl, data, resp); return resp; } diff --git a/escheduler-alert/src/main/resources/alert.properties b/escheduler-alert/src/main/resources/alert.properties index 31265c2200..09bd286b82 100644 --- a/escheduler-alert/src/main/resources/alert.properties +++ b/escheduler-alert/src/main/resources/alert.properties @@ -11,13 +11,13 @@ mail.passwd=xxxxxxx #xls file path,need create if not exist xls.file.path=/opt/xls -# qiye weixin configuration -qiye.weixin.corp.id=xxxxxxx -qiye.weixin.secret=xxxxxxx -qiye.weixin.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$weixinCorpId&corpsecret=$weixinSecret -qiye.weixin.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$weixinToken -qiye.weixin.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} -qiye.weixin.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} +# Enterprise WeChat configuration +enterprise.wechat.corp.id=xxxxxxx +enterprise.wechat.secret=xxxxxxx +enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret +enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token +enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} +enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} diff --git a/escheduler-alert/src/test/java/cn/escheduler/alert/utils/QiyeWeixinUtilsTest.java b/escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java similarity index 82% rename from escheduler-alert/src/test/java/cn/escheduler/alert/utils/QiyeWeixinUtilsTest.java rename to escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java index 051f3bccd1..2a45196aed 100644 --- a/escheduler-alert/src/test/java/cn/escheduler/alert/utils/QiyeWeixinUtilsTest.java +++ b/escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java @@ -28,13 +28,13 @@ import java.util.Collection; /** * Please manually modify the configuration file before testing. * file: alert.properties - * qiye.weixin.corp.id - * qiye.weixin.secret - * qiye.weixin.token.url - * qiye.weixin.push.url - * qiye.weixin.send.msg + * enterprise.wechat.corp.id + * enterprise.wechat.secret + * enterprise.wechat.token.url + * enterprise.wechat.push.url + * enterprise.wechat.send.msg */ -public class QiyeWeixinUtilsTest { +public class EnterpriseWeChatUtilsTest { // Please change private String agentId = "1000002"; // app id @@ -44,8 +44,8 @@ public class QiyeWeixinUtilsTest { private Collection listUserId = Arrays.asList("test1","test2"); @Test - public void testSendSingleTeamWeixin() { - QiyeWeixinUtils wx = new QiyeWeixinUtils(); + public void testSendSingleTeamWeChat() { + EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils(); try { String token = wx.getToken(); @@ -60,8 +60,8 @@ public class QiyeWeixinUtilsTest { } @Test - public void testSendMultiTeamWeixin() { - QiyeWeixinUtils wx = new QiyeWeixinUtils(); + public void testSendMultiTeamWeChat() { + EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils(); try { String token = wx.getToken(); @@ -76,8 +76,8 @@ public class QiyeWeixinUtilsTest { } @Test - public void testSendSingleUserWeixin() { - QiyeWeixinUtils wx = new QiyeWeixinUtils(); + public void testSendSingleUserWeChat() { + EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils(); try { String token = wx.getToken(); @@ -92,8 +92,8 @@ public class QiyeWeixinUtilsTest { } @Test - public void testSendMultiUserWeixin() { - QiyeWeixinUtils wx = new QiyeWeixinUtils(); + public void testSendMultiUserWeChat() { + EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils(); try { String token = wx.getToken(); From 1f8cc8a80727cdedb8112c251b46749a2f30a296 Mon Sep 17 00:00:00 2001 From: lidongdai Date: Mon, 27 May 2019 18:09:50 +0800 Subject: [PATCH 04/18] add api docs --- .../api/controller/ExecutorController.java | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java index d6872a278c..1938644724 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java @@ -24,7 +24,7 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.*; import cn.escheduler.dao.model.User; -import io.swagger.annotations.Api; +import io.swagger.annotations.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -38,9 +38,9 @@ import static cn.escheduler.api.enums.Status.*; /** - * execute task controller + * execute process controller */ -@ApiIgnore +@Api(tags = "PROCESS_INSTANCE_EXECUTOR_TAG", position = 1) @RestController @RequestMapping("projects/{projectName}/executors") public class ExecutorController extends BaseController { @@ -53,10 +53,27 @@ public class ExecutorController extends BaseController { /** * execute process instance */ + @ApiOperation(value = "startProcessInstance", notes= "RUN_PROCESS_INSTANCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType ="FailureStrategy"), + @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType ="String"), + @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType ="TaskDependType"), + @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType ="CommandType"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE",required = true, dataType ="WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID",required = true, dataType ="Int", example = "100"), + @ApiImplicitParam(name = "receivers", value = "RECEIVERS",dataType ="String" ), + @ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ), + @ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ), + @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"), + @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"), + }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) - public Result startProcessInstance(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable String projectName, + public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam(value = "processDefinitionId") int processDefinitionId, @RequestParam(value = "scheduleTime", required = false) String scheduleTime, @RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy, @@ -102,10 +119,15 @@ public class ExecutorController extends BaseController { * @param processInstanceId * @return */ + @ApiOperation(value = "execute", notes= "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") + }) @PostMapping(value = "/execute") @ResponseStatus(HttpStatus.OK) - public Result execute(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable String projectName, + public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam("processInstanceId") Integer processInstanceId, @RequestParam("executeType") ExecuteType executeType ) { @@ -127,9 +149,13 @@ public class ExecutorController extends BaseController { * @param processDefinitionId * @return */ + @ApiOperation(value = "startCheckProcessDefinition", notes= "START_CHECK_PROCESS_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + }) @PostMapping(value = "/start-check") @ResponseStatus(HttpStatus.OK) - public Result startCheckProcessDefinition(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result startCheckProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "processDefinitionId") int processDefinitionId) { logger.info("login user {}, check process definition", loginUser.getUserName(), processDefinitionId); try { @@ -149,9 +175,16 @@ public class ExecutorController extends BaseController { * @param processDefinitionId * @return */ + @ApiIgnore + @ApiOperation(value = "getReceiverCc", notes= "GET_RECEIVER_CC_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100") + + }) @GetMapping(value = "/get-receiver-cc") @ResponseStatus(HttpStatus.OK) - public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result getReceiverCc(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId, @RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) { logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName()); From 027c209d27b44bbeceaa65be9e24c63cf7ce6ff2 Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Wed, 5 Jun 2019 11:36:02 +0800 Subject: [PATCH 05/18] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 19c1d2e45b..ffd8dcf396 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ Easy Scheduler ### 近期研发计划 -EasyScheduler的工作计划:研发计划 ,其中 In Develop卡片下是1.0.2版本的功能,TODO卡片是待做事项(包括 feature ideas) +EasyScheduler的工作计划:研发计划 ,其中 In Develop卡片下是1.1.0版本的功能,TODO卡片是待做事项(包括 feature ideas) ### 贡献代码 From f1665bb3bfede7fe2d849c97baad794c5302fefc Mon Sep 17 00:00:00 2001 From: mywiki <394907365@qq.com> Date: Thu, 6 Jun 2019 16:33:29 +0800 Subject: [PATCH 06/18] =?UTF-8?q?=E9=82=AE=E7=AE=B1=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E7=9A=84=E9=AA=8C=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pages/projects/pages/definition/pages/list/_source/util.js | 2 +- .../conf/home/pages/security/pages/users/_source/createUser.vue | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js index db6c8aa261..2259dea9cd 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js @@ -37,7 +37,7 @@ let warningTypeList = [ ] const isEmial = (val) => { - let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line + let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line return regEmail.test(val) } diff --git a/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue b/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue index 9d3ab042d8..323f224198 100644 --- a/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue +++ b/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue @@ -129,7 +129,7 @@ } }, _verification () { - let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line + let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line // Mobile phone number regular let regPhone = /^1(3|4|5|6|7|8)\d{9}$/; // eslint-disable-line From d7af3678eee75d864cfcbe8a4618829e547539c7 Mon Sep 17 00:00:00 2001 From: lidongdai Date: Mon, 10 Jun 2019 16:40:26 +0800 Subject: [PATCH 07/18] add info: master send heartbeat to zk failed: can't find zookeeper regist path of master server --- .../src/main/java/cn/escheduler/server/master/MasterServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java index e137824814..bf0dcbfe75 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java @@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable { if(Stopper.isRunning()) { // send heartbeat to zk if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { - logger.error("master send heartbeat to zk failed"); + logger.error("master send heartbeat to zk failed: can't find zookeeper regist path of master server"); return; } From a964e2014a5a7e265a909b8cdd8dc953cb5a59de Mon Sep 17 00:00:00 2001 From: lidongdai Date: Mon, 17 Jun 2019 15:40:55 +0800 Subject: [PATCH 08/18] delete nouse code --- .../main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index 1b39cb1e8d..c3be8030ed 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -118,9 +118,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); -// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value; -// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, -// Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); } catch (Exception e) { logger.error("add task to tasks queue exception",e); From 93acff187ec74344ad995b6358f92ed349b78c4a Mon Sep 17 00:00:00 2001 From: xiaobiesan Date: Thu, 20 Jun 2019 15:23:42 +0800 Subject: [PATCH 09/18] Fix naming errors.js/conf/home/pages/monitor/pages/servers/master.vue/work.vue --- .../src/js/conf/home/pages/monitor/pages/servers/master.vue | 4 ++-- .../src/js/conf/home/pages/monitor/pages/servers/worker.vue | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue index 22d48782d2..914bab2812 100644 --- a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue +++ b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue @@ -6,7 +6,7 @@
IP: {{item.host}} - {{$t('Port')}}: {{item.port}} + {{$t('Process Pid')}}: {{item.port}} {{$t('Zk registration directory')}}: {{item.zkDirectory}}
@@ -93,4 +93,4 @@ \ No newline at end of file + diff --git a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue index 3cf0993415..960beeb14a 100644 --- a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue +++ b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue @@ -6,7 +6,7 @@
IP: {{item.host}} - {{$t('Port')}}: {{item.port}} + {{$t('Process Pid')}}: {{item.port}} {{$t('Zk registration directory')}}: {{item.zkDirectory}}
@@ -94,4 +94,4 @@ \ No newline at end of file + From 76323482be0c5868e56a832956d8a27f5a060d2e Mon Sep 17 00:00:00 2001 From: lidongdai Date: Mon, 24 Jun 2019 11:40:23 +0800 Subject: [PATCH 10/18] remove master cpu load avg --- install.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/install.sh b/install.sh index 9023af86be..c652f237b4 100644 --- a/install.sh +++ b/install.sh @@ -268,7 +268,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties -sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties +#sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties From d83e0b6317cd12d9bc37f02c89800fc73cb3928b Mon Sep 17 00:00:00 2001 From: "pengdou@analysys.com.cn" Date: Wed, 26 Jun 2019 19:50:35 +0800 Subject: [PATCH 11/18] add ip utils class --- .../cn/escheduler/common/utils/IpUtils.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java new file mode 100644 index 0000000000..ddc520a876 --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.utils; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * http utils + */ +public class IpUtils { + + private static final Logger logger = LoggerFactory.getLogger(IpUtils.class); + public static final String DOT = "."; + + /** + * ip str to long

+ * + * @param ipStr ip string + */ + public static Long ipToLong(String ipStr) { + String[] ipSet = ipStr.split("\\" + DOT); + + return Long.parseLong(ipSet[0]) << 24 | Long.parseLong(ipSet[1]) << 16 | Long.parseLong(ipSet[2]) << 8 | Long.parseLong(ipSet[3]); + } + + /** + * long to ip + * @param ipLong the long number converted from IP + * @return String + */ + public static String longToIp(long ipLong) { + long[] ipNumbers = new long[4]; + long tmp = 0xFF; + ipNumbers[0] = ipLong >> 24 & tmp; + ipNumbers[1] = ipLong >> 16 & tmp; + ipNumbers[2] = ipLong >> 8 & tmp; + ipNumbers[3] = ipLong & tmp; + + StringBuilder sb = new StringBuilder(16); + sb.append(ipNumbers[0]).append(DOT) + .append(ipNumbers[1]).append(DOT) + .append(ipNumbers[2]).append(DOT) + .append(ipNumbers[3]); + return sb.toString(); + } + + + + public static void main(String[] args){ + long ipLong = ipToLong("11.3.4.5"); + logger.info(longToIp(ipLong)); + } +} From c8906686d414363b7066c2e33221ce2172cbe781 Mon Sep 17 00:00:00 2001 From: lidongdai Date: Thu, 27 Jun 2019 20:48:20 +0800 Subject: [PATCH 12/18] =?UTF-8?q?=E4=BC=98=E5=8C=96worker=20server?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E4=BB=BB=E5=8A=A1=E7=9A=84=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/alert.properties | 6 +- .../java/cn/escheduler/common/Constants.java | 8 +- .../escheduler/common/queue/ITaskQueue.java | 9 +- .../common/queue/TaskQueueFactory.java | 2 +- .../common/queue/TaskQueueZkImpl.java | 100 ++++++----- .../common/zk/AbstractZKClient.java | 6 +- .../cn/escheduler/common/os/OSUtilsTest.java | 6 + .../common/queue/TaskQueueImplTest.java | 8 +- .../java/cn/escheduler/dao/ProcessDao.java | 74 +++++++- .../server/worker/runner/FetchTaskThread.java | 163 ++++++++++-------- 10 files changed, 249 insertions(+), 133 deletions(-) diff --git a/escheduler-alert/src/main/resources/alert.properties b/escheduler-alert/src/main/resources/alert.properties index e2cba1160d..ef9ef68e76 100644 --- a/escheduler-alert/src/main/resources/alert.properties +++ b/escheduler-alert/src/main/resources/alert.properties @@ -3,10 +3,10 @@ alert.type=EMAIL # mail server configuration mail.protocol=SMTP -mail.server.host=smtp.exmail.qq.com +mail.server.host=smtp.163.com mail.server.port=25 -mail.sender=xxxxxxx -mail.passwd=xxxxxxx +mail.sender=d66380022@163.com +mail.passwd=qwertyuiop123 #xls file path,need create if not exist xls.file.path=/opt/xls diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index abf29fdc99..2c5b9d945d 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -192,6 +192,11 @@ public final class Constants { */ public static final String SEMICOLON = ";"; + /** + * DOT . + */ + public static final String DOT = "."; + /** * ZOOKEEPER_SESSION_TIMEOUT */ @@ -822,6 +827,7 @@ public final class Constants { /** - * + * default worker group id */ + public static final int DEFAULT_WORKER_ID = -1; } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java index 106d6ff915..6f6e979797 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java @@ -24,20 +24,17 @@ public interface ITaskQueue { /** * take out all the elements * - * this method has deprecated - * use checkTaskExists instead * * @param key * @return */ - @Deprecated List getAllTasks(String key); /** * check task exists in the task queue or not * * @param key queue name - * @param task ${priority}_${processInstanceId}_${taskId} + * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * @return true if exists in the queue */ boolean checkTaskExists(String key, String task); @@ -54,10 +51,10 @@ public interface ITaskQueue { * an element pops out of the queue * * @param key queue name - * @param remove whether remove the element + * @param n how many elements to poll * @return */ - String poll(String key, boolean remove); + List poll(String key, int n); /** * remove a element from queue diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java index c8931064af..2d17481da4 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java @@ -42,7 +42,7 @@ public class TaskQueueFactory { public static ITaskQueue getTaskQueueInstance() { String queueImplValue = CommonUtils.getQueueImplValue(); if (StringUtils.isNotBlank(queueImplValue)) { -// queueImplValue = StringUtils.trim(queueImplValue); +// queueImplValue = IpUtils.trim(queueImplValue); // if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) { // logger.info("task queue impl use reids "); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index c3be8030ed..200685a0b7 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -19,6 +19,8 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; import cn.escheduler.common.utils.Bytes; +import cn.escheduler.common.utils.IpUtils; +import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; @@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; /** * A singleton of a task queue implemented with zookeeper @@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * @param key task queue name * @return */ - @Deprecated @Override public List getAllTasks(String key) { try { @@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * check task exists in the task queue or not * * @param key queue name - * @param task ${priority}_${processInstanceId}_${taskId} + * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * @return true if exists in the queue */ @Override @@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * add task to tasks queue * * @param key task queue name - * @param value ${priority}_${processInstanceId}_${taskId} + * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... */ @Override public void add(String key, String value) { @@ -129,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { /** * An element pops out of the queue

* note: - * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} + * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low. * - * 流程实例优先级_流程实例id_任务优先级_任务id high <- low + * 流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,... high <- low * @param key task queue name - * @param remove whether remove the element - * @return the task id to be executed + * @param tasksNum how many elements to poll + * @return the task ids to be executed */ @Override - public String poll(String key, boolean remove) { + public List poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; @@ -146,55 +144,79 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { if(list != null && list.size() > 0){ + String workerIp = OSUtils.getHost(); + String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp)); + int size = list.size(); - String formatTargetTask = null; - String targetTaskKey = null; + + Set taskTreeSet = new TreeSet<>(); + for (int i = 0; i < size; i++) { + String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); - if(taskDetailArrs.length == 4){ + //向前版本兼容 + if(taskDetailArrs.length >= 4){ + //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3])); - if(i > 0){ - int result = formatTask.compareTo(formatTargetTask); - if(result < 0){ - formatTargetTask = formatTask; - targetTaskKey = taskDetail; + if(taskDetailArrs.length > 4){ + String taskHosts = taskDetailArrs[4]; + + //task can assign to any worker host if equals default ip value of worker server + if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){ + String[] taskHostsArr = taskHosts.split(Constants.COMMA); + + if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){ + continue; + } } - }else{ - formatTargetTask = formatTask; - targetTaskKey = taskDetail; } - }else{ - logger.error("task queue poll error, task detail :{} , please check!", taskDetail); + + taskTreeSet.add(formatTask); + } - } - if(formatTargetTask != null){ - String taskIdPath = tasksQueuePath + targetTaskKey; + } - logger.info("consume task {}", taskIdPath); + List taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); - String[] vals = targetTaskKey.split(Constants.UNDERLINE); + logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size()); - if(remove){ - removeNode(key, targetTaskKey); - } - logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1); - return targetTaskKey; - }else{ - logger.error("should not go here, task queue poll error, please check!"); - } + return taskslist; + }else{ + Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } catch (Exception e) { logger.error("add task to tasks queue exception",e); } - return null; + return new ArrayList(); + } + + + /** + * get task list from tree set + * + * @param tasksNum + * @param taskTreeSet + */ + public List getTasksListFromTreeSet(int tasksNum, Set taskTreeSet) { + Iterator iterator = taskTreeSet.iterator(); + int j = 0; + List taskslist = new ArrayList<>(tasksNum); + while(iterator.hasNext()){ + if(j++ < tasksNum){ + String task = iterator.next(); + taskslist.add(task); + } + } + return taskslist; } + @Override public void removeNode(String key, String nodeValue){ diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 177669b43c..e2f064be13 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -312,7 +312,11 @@ public abstract class AbstractZKClient { childrenList = zkClient.getChildren().forPath(masterZNodeParentPath); } } catch (Exception e) { - logger.warn(e.getMessage(),e); +// logger.warn(e.getMessage()); + if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){ + logger.warn(e.getMessage(),e); + } + return childrenList.size(); } return childrenList.size(); diff --git a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java index 7d35bc8480..4bf152bbf2 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java @@ -37,6 +37,12 @@ public class OSUtilsTest { // static HardwareAbstractionLayer hal = si.getHardware(); + @Test + public void getHost(){ + logger.info(OSUtils.getHost()); + } + + @Test public void memoryUsage() { logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239 diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java index 03ba29a840..0560ac46ff 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java @@ -49,9 +49,11 @@ public class TaskQueueImplTest { tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); //pop - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); + assertEquals(node1,"1"); - String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + + String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); assertEquals(node2,"2"); //sadd @@ -99,7 +101,7 @@ public class TaskQueueImplTest { } } - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); assertEquals(node1,"0"); //clear all data diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index b18fb5e974..3fff75b8c3 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -24,6 +24,7 @@ import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.task.subprocess.SubProcessParameters; import cn.escheduler.common.utils.DateUtils; +import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.mapper.*; @@ -108,7 +109,7 @@ public class ProcessDao extends AbstractBaseDao { */ @Override protected void init() { - userMapper=getMapper(UserMapper.class); + userMapper = getMapper(UserMapper.class); processDefineMapper = getMapper(ProcessDefinitionMapper.class); processInstanceMapper = getMapper(ProcessInstanceMapper.class); dataSourceMapper = getMapper(DataSourceMapper.class); @@ -947,11 +948,58 @@ public class ProcessDao extends AbstractBaseDao { * * 流程实例优先级_流程实例id_任务优先级_任务id high <- low * - * @param task + * @param taskInstance * @return */ - private String taskZkInfo(TaskInstance task) { - return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId(); + private String taskZkInfo(TaskInstance taskInstance) { + + int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance); + + StringBuilder sb = new StringBuilder(100); + + sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE) + .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getId()).append(Constants.UNDERLINE); + + if(taskWorkerGroupId > 0){ + //not to find data from db + WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId); + if(workerGroup == null ){ + logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId()); + + sb.append(Constants.DEFAULT_WORKER_ID); + return sb.toString(); + } + + String ips = workerGroup.getIpList(); + + if(StringUtils.isBlank(ips)){ + logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", + taskInstance.getId(), workerGroup.getId()); + sb.append(Constants.DEFAULT_WORKER_ID); + return sb.toString(); + } + + StringBuilder ipSb = new StringBuilder(100); + String[] ipArray = ips.split(COMMA); + + for (String ip : ipArray) { + long ipLong = IpUtils.ipToLong(ip); + ipSb.append(ipLong).append(COMMA); + } + + if(ipSb.length() > 0) { + ipSb.deleteCharAt(ipSb.length() - 1); + } + + sb.append(ipSb); + }else{ + sb.append(Constants.DEFAULT_WORKER_ID); + } + + + return sb.toString(); } /** @@ -1591,5 +1639,23 @@ public class ProcessDao extends AbstractBaseDao { } + /** + * get task worker group id + * + * @param taskInstance + * @return + */ + public int getTaskWorkerGroupId(TaskInstance taskInstance) { + int taskWorkerGroupId = taskInstance.getWorkerGroupId(); + ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId()); + if(processInstance == null){ + logger.error("cannot find the task:{} process instance", taskInstance.getId()); + } + int processWorkerGroupId = processInstance.getWorkerGroupId(); + + taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + return taskWorkerGroupId; + } + } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 3ecafde57a..ba10fcb57d 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -28,8 +28,9 @@ import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.WorkerGroup; import cn.escheduler.server.zk.ZKWorkerClient; -import com.cronutils.utils.StringUtils; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,15 +102,7 @@ public class FetchTaskThread implements Runnable{ */ private boolean checkWorkerGroup(TaskInstance taskInstance, String host){ - int taskWorkerGroupId = taskInstance.getWorkerGroupId(); - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); - if(processInstance == null){ - logger.error("cannot find the task:{} process instance", taskInstance.getId()); - return false; - } - int processWorkerGroupId = processInstance.getWorkerGroupId(); - - taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance); if(taskWorkerGroupId <= 0){ return true; @@ -120,118 +113,124 @@ public class FetchTaskThread implements Runnable{ return true; } String ips = workerGroup.getIpList(); - if(ips == null){ + if(StringUtils.isBlank(ips)){ logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", taskInstance.getId(), workerGroup.getId()); } - String[] ipArray = ips.split(","); + String[] ipArray = ips.split(Constants.COMMA); List ipList = Arrays.asList(ipArray); return ipList.contains(host); } + + @Override public void run() { while (Stopper.isRunning()){ InterProcessMutex mutex = null; try { - if(OSUtils.checkResource(this.conf, false)) { - // creating distributed locks, lock path /escheduler/lock/worker - String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); - mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); - mutex.acquire(); + ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; - ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; + //check memory and cpu usage and threads + if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) { - for (int i = 0; i < taskNum; i++) { - - int activeCount = poolExecutor.getActiveCount(); - if (activeCount >= workerExecNums) { - logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums); - continue; - } + //whether have tasks, if no tasks , no need lock //get all tasks + List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE); + if(tasksQueueList.size() > 0){ + // creating distributed locks, lock path /escheduler/lock/worker + String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); + mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); + mutex.acquire(); // task instance id str - String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum); - if (!StringUtils.isEmpty(taskQueueStr )) { + for(String taskQueueStr : taskQueueStrArr){ + if (StringUtils.isNotBlank(taskQueueStr )) { - String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); - String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; - Date now = new Date(); - Integer taskId = Integer.parseInt(taskInstIdStr); + if (!checkThreadCount(poolExecutor)) { + break; + } - // find task instance by task id - TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); + String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); + String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; + Date now = new Date(); + Integer taskId = Integer.parseInt(taskInstIdStr); - logger.info("worker fetch taskId : {} from queue ", taskId); + // find task instance by task id + TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); - int retryTimes = 30; - // mainly to wait for the master insert task to succeed - while (taskInstance == null && retryTimes > 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - taskInstance = processDao.findTaskInstanceById(taskId); - retryTimes--; - } + logger.info("worker fetch taskId : {} from queue ", taskId); - if (taskInstance == null ) { - logger.error("task instance is null. task id : {} ", taskId); - continue; - } - if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ - continue; - } - taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); - logger.info("remove task:{} from queue", taskQueueStr); + int retryTimes = 30; + // mainly to wait for the master insert task to succeed + while (taskInstance == null && retryTimes > 0) { + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + taskInstance = processDao.findTaskInstanceById(taskId); + retryTimes--; + } + + if (taskInstance == null ) { + logger.error("task instance is null. task id : {} ", taskId); + continue; + } - // set execute task worker host - taskInstance.setHost(OSUtils.getHost()); - taskInstance.setStartTime(now); + if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ + continue; + } + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); + logger.info("remove task:{} from queue", taskQueueStr); + // set execute task worker host + taskInstance.setHost(OSUtils.getHost()); + taskInstance.setStartTime(now); - // get process instance - ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - // get process define - ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); + // get process instance + ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + // get process define + ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); - taskInstance.setProcessInstance(processInstance); - taskInstance.setProcessDefine(processDefine); + taskInstance.setProcessInstance(processInstance); + taskInstance.setProcessDefine(processDefine); - // get local execute path - String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), - processDefine.getId(), - processInstance.getId(), - taskInstance.getId()); - logger.info("task instance local execute path : {} ", execLocalPath); + // get local execute path + String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), + processDefine.getId(), + processInstance.getId(), + taskInstance.getId()); + logger.info("task instance local execute path : {} ", execLocalPath); - // set task execute path - taskInstance.setExecutePath(execLocalPath); - // check and create Linux users - FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, - processInstance.getTenantCode(), logger); + // set task execute path + taskInstance.setExecutePath(execLocalPath); - logger.info("task : {} ready to submit to task scheduler thread",taskId); - // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + // check and create Linux users + FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, + processInstance.getTenantCode(), logger); + logger.info("task : {} ready to submit to task scheduler thread",taskId); + // submit task + workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + + } } } + } Thread.sleep(Constants.SLEEP_TIME_MILLIS); }catch (Exception e){ logger.error("fetch task thread exception : " + e.getMessage(),e); - } - finally { + }finally { if (mutex != null){ try { mutex.release(); @@ -246,4 +245,18 @@ public class FetchTaskThread implements Runnable{ } } } + + /** + * + * @param poolExecutor + * @return + */ + private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) { + int activeCount = poolExecutor.getActiveCount(); + if (activeCount >= workerExecNums) { + logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS); + return false; + } + return true; + } } \ No newline at end of file From 7c48b98b88832c9ce233578ef02e5a38c1191697 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Tue, 2 Jul 2019 20:23:03 +0800 Subject: [PATCH 13/18] remove httpcore and httpclient --- escheduler-alert/pom.xml | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/escheduler-alert/pom.xml b/escheduler-alert/pom.xml index 04c80ccf64..ed3baa40f0 100644 --- a/escheduler-alert/pom.xml +++ b/escheduler-alert/pom.xml @@ -102,17 +102,6 @@ escheduler-dao - - org.apache.httpcomponents - httpcore - 4.4.6 - - - org.apache.httpcomponents - httpclient - 4.5.5 - - From 5e57036831e2f0135c8545a3a98ce2bf76903a6d Mon Sep 17 00:00:00 2001 From: lidongdai Date: Wed, 3 Jul 2019 20:22:49 +0800 Subject: [PATCH 14/18] update TaskQueueImplTest --- .../common/queue/TaskQueueImplTest.java | 77 +++++++++---------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java index 0560ac46ff..72a6e46200 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java @@ -17,12 +17,15 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.List; import java.util.Random; import static org.junit.Assert.assertEquals; @@ -34,61 +37,62 @@ public class TaskQueueImplTest { private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); + ITaskQueue tasksQueue = null; + + @Before + public void before(){ + tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + //clear all data + tasksQueue.delete(); + + } - @Test - public void testTaskQueue(){ - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + @After + public void after(){ //clear all data tasksQueue.delete(); + } + + + @Test + public void testAdd(){ //add - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775"); - //pop - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); + List tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); - assertEquals(node1,"1"); + if(tasks.size() < 0){ + return; + } - String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); - assertEquals(node2,"2"); + //pop + String node1 = tasks.get(0); - //sadd - String task1 = "1.1.1.1-1-mr"; - String task2 = "1.1.1.2-2-mr"; - String task3 = "1.1.1.3-3-mr"; - String task4 = "1.1.1.4-4-mr"; - String task5 = "1.1.1.5-5-mr"; + assertEquals(node1,"0_0000000001_1_0000000001"); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task + tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); - Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5); - logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); - //srem - tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5); - //smembers - Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4); - logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); + if(tasks.size() < 0){ + return; + } + String node2 = tasks.get(0); + assertEquals(node2,"0_0000000001_1_0000000001"); } + + /** * test one million data from zookeeper queue */ @Test public void extremeTest(){ - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); - //clear all data - tasksQueue.delete(); int total = 30 * 10000; for(int i = 0; i < total; i++) @@ -104,11 +108,6 @@ public class TaskQueueImplTest { String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); assertEquals(node1,"0"); - //clear all data - tasksQueue.delete(); - - - } } From 0107afe72b0a1076244b2beb4a19b3eda53e3a45 Mon Sep 17 00:00:00 2001 From: lidongdai Date: Wed, 3 Jul 2019 20:32:17 +0800 Subject: [PATCH 15/18] restore alert.properties --- escheduler-alert/src/main/resources/alert.properties | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/escheduler-alert/src/main/resources/alert.properties b/escheduler-alert/src/main/resources/alert.properties index ef9ef68e76..e2cba1160d 100644 --- a/escheduler-alert/src/main/resources/alert.properties +++ b/escheduler-alert/src/main/resources/alert.properties @@ -3,10 +3,10 @@ alert.type=EMAIL # mail server configuration mail.protocol=SMTP -mail.server.host=smtp.163.com +mail.server.host=smtp.exmail.qq.com mail.server.port=25 -mail.sender=d66380022@163.com -mail.passwd=qwertyuiop123 +mail.sender=xxxxxxx +mail.passwd=xxxxxxx #xls file path,need create if not exist xls.file.path=/opt/xls From e7508469d480dd6e5cd29310211f6cad0a93a60f Mon Sep 17 00:00:00 2001 From: lidongdai Date: Wed, 3 Jul 2019 20:47:18 +0800 Subject: [PATCH 16/18] update api i18n and docs --- docs/zh_CN/系统架构设计.md | 6 +++--- .../src/main/resources/i18n/messages.properties | 12 ++++++++++++ .../main/resources/i18n/messages_en_US.properties | 12 ++++++++++++ .../main/resources/i18n/messages_zh_CN.properties | 10 ++++++++++ 4 files changed, 37 insertions(+), 3 deletions(-) diff --git a/docs/zh_CN/系统架构设计.md b/docs/zh_CN/系统架构设计.md index a6e1645a4c..134684155d 100644 --- a/docs/zh_CN/系统架构设计.md +++ b/docs/zh_CN/系统架构设计.md @@ -13,13 +13,13 @@ **流程定义**:通过拖拽任务节点并建立任务节点的关联所形成的可视化**DAG** -**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成 +**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例 **任务实例**:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态 -**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS** 也是一个单独的流程定义,是可以单独启动执行的 +**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖),同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS** 也是一个单独的流程定义,是可以单独启动执行的 -**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、调度、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流** 和 **恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用 +**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流** 和 **恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用 **定时调度**:系统采用 **quartz** 分布式调度器,并同时支持cron表达式可视化的生成 diff --git a/escheduler-api/src/main/resources/i18n/messages.properties b/escheduler-api/src/main/resources/i18n/messages.properties index db1111ad78..0361534f22 100644 --- a/escheduler-api/src/main/resources/i18n/messages.properties +++ b/escheduler-api/src/main/resources/i18n/messages.properties @@ -1,4 +1,16 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list +EXECUTE_PROCESS_TAG=execute process related operation +PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation +RUN_PROCESS_INSTANCE_NOTES=run process instance +START_NODE_LIST=start node list(node name) +TASK_DEPEND_TYPE=task depend type +COMMAND_TYPE=command type +RUN_MODE=run mode +TIMEOUT=timeout +EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance +EXECUTE_TYPE=execute type +START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition +GET_RECEIVER_CC_NOTES=query receiver cc DESC=description GROUP_NAME=group name GROUP_TYPE=group type diff --git a/escheduler-api/src/main/resources/i18n/messages_en_US.properties b/escheduler-api/src/main/resources/i18n/messages_en_US.properties index db1111ad78..0361534f22 100644 --- a/escheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/escheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -1,4 +1,16 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list +EXECUTE_PROCESS_TAG=execute process related operation +PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation +RUN_PROCESS_INSTANCE_NOTES=run process instance +START_NODE_LIST=start node list(node name) +TASK_DEPEND_TYPE=task depend type +COMMAND_TYPE=command type +RUN_MODE=run mode +TIMEOUT=timeout +EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance +EXECUTE_TYPE=execute type +START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition +GET_RECEIVER_CC_NOTES=query receiver cc DESC=description GROUP_NAME=group name GROUP_TYPE=group type diff --git a/escheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/escheduler-api/src/main/resources/i18n/messages_zh_CN.properties index 73d62a8839..633e5c890a 100644 --- a/escheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/escheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -1,4 +1,14 @@ QUERY_SCHEDULE_LIST_NOTES=查询定时列表 +PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作 +RUN_PROCESS_INSTANCE_NOTES=运行流程实例 +START_NODE_LIST=开始节点列表(节点name) +TASK_DEPEND_TYPE=任务依赖类型 +COMMAND_TYPE=指令类型 +RUN_MODE=运行模式 +TIMEOUT=超时时间 +EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=执行流程实例的各种操作(暂停、停止、重跑、恢复等) +EXECUTE_TYPE=执行类型 +START_CHECK_PROCESS_DEFINITION_NOTES=检查流程定义 DESC=备注(描述) GROUP_NAME=组名称 GROUP_TYPE=组类型 From dcf7dd5b2b26a4fec2e2f2a041ea67e0eedbacf5 Mon Sep 17 00:00:00 2001 From: lidongdai Date: Wed, 3 Jul 2019 21:03:39 +0800 Subject: [PATCH 17/18] update pom --- escheduler-server/pom.xml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/escheduler-server/pom.xml b/escheduler-server/pom.xml index ad21578d6c..657633c80b 100644 --- a/escheduler-server/pom.xml +++ b/escheduler-server/pom.xml @@ -88,8 +88,12 @@ cn.analysys escheduler-alert + + cn.analysys + escheduler-api + - + From 258ff0cb6c2b4fbc3c8b9a6036c17e2efad00ac1 Mon Sep 17 00:00:00 2001 From: lidongdai Date: Thu, 4 Jul 2019 19:29:59 +0800 Subject: [PATCH 18/18] update SqlTask --- .../src/main/java/cn/escheduler/dao/ProcessDao.java | 1 + escheduler-server/pom.xml | 4 ---- .../java/cn/escheduler/server/worker/task/sql/SqlTask.java | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 0d5af285c3..056904281a 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -1693,6 +1693,7 @@ public class ProcessDao extends AbstractBaseDao { ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId()); if(processInstance == null){ logger.error("cannot find the task:{} process instance", taskInstance.getId()); + return Constants.DEFAULT_WORKER_ID; } int processWorkerGroupId = processInstance.getWorkerGroupId(); diff --git a/escheduler-server/pom.xml b/escheduler-server/pom.xml index 657633c80b..6341539e4c 100644 --- a/escheduler-server/pom.xml +++ b/escheduler-server/pom.xml @@ -88,10 +88,6 @@ cn.analysys escheduler-alert - - cn.analysys - escheduler-api - diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index 4eb567d8c8..f013771bc0 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -374,7 +374,7 @@ public class SqlTask extends AbstractTask { String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim(); if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ Map mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName)); - if(!(Boolean) mailResult.get(cn.escheduler.api.utils.Constants.STATUS)){ + if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){ throw new RuntimeException("send mail failed!"); } }else{