Browse Source

Merge remote-tracking branch 'upstream/dev-1.1.0' into dev-1.1.0

pull/2/head
huyuanming 5 years ago
parent
commit
195ac1e41c
  1. 2
      README.md
  2. 57
      escheduler-alert/src/main/java/cn/escheduler/alert/manager/EnterpriseWeChatManager.java
  3. 9
      escheduler-alert/src/main/java/cn/escheduler/alert/runner/AlertSender.java
  4. 8
      escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java
  5. 123
      escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java
  6. 4
      escheduler-alert/src/main/resources/alert.properties
  7. 57
      escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java
  8. 51
      escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java
  9. 12
      escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java
  10. 17
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java
  11. 12
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  12. 9
      escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java
  13. 2
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java
  14. 103
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
  15. 70
      escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java
  16. 4
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  17. 6
      escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
  18. 85
      escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
  19. 75
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  20. 39
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
  21. 2
      escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
  22. 66
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  23. 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
  24. 2
      escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue
  25. 2
      escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
  26. 2
      escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js
  27. 58
      escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
  28. 2
      escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
  29. 4
      escheduler-ui/src/js/module/i18n/locale/zh_CN.js
  30. 18
      install.sh

2
README.md

@ -50,7 +50,7 @@ Easy Scheduler
### 近期研发计划 ### 近期研发计划
EasyScheduler的工作计划:<a href="https://github.com/analysys/EasyScheduler/projects/1" target="_blank">研发计划</a> ,其中 In Develop卡片下是1.0.2版本的功能,TODO卡片是待做事项(包括 feature ideas) EasyScheduler的工作计划:<a href="https://github.com/analysys/EasyScheduler/projects/1" target="_blank">研发计划</a> ,其中 In Develop卡片下是1.1.0版本的功能,TODO卡片是待做事项(包括 feature ideas)
### 贡献代码 ### 贡献代码

57
escheduler-alert/src/main/java/cn/escheduler/alert/manager/EnterpriseWeChatManager.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.alert.manager;
import cn.escheduler.alert.utils.Constants;
import cn.escheduler.alert.utils.EnterpriseWeChatUtils;
import cn.escheduler.dao.model.Alert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Enterprise WeChat Manager
*/
public class EnterpriseWeChatManager {
private static final Logger logger = LoggerFactory.getLogger(MsgManager.class);
/**
* Enterprise We Chat send
* @param alert
*/
public Map<String,Object> send(Alert alert, String token){
Map<String,Object> retMap = new HashMap<>();
retMap.put(Constants.STATUS, false);
String agentId = EnterpriseWeChatUtils.enterpriseWeChatAgentId;
String users = EnterpriseWeChatUtils.enterpriseWeChatUsers;
List<String> userList = Arrays.asList(users.split(","));
logger.info("send message {}",alert);
String msg = EnterpriseWeChatUtils.makeUserSendMsg(userList, agentId,EnterpriseWeChatUtils.markdownByAlert(alert));
try {
EnterpriseWeChatUtils.sendEnterpriseWeChat(Constants.UTF_8, msg, token);
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
retMap.put(Constants.STATUS, true);
return retMap;
}
}

9
escheduler-alert/src/main/java/cn/escheduler/alert/runner/AlertSender.java

@ -17,7 +17,9 @@
package cn.escheduler.alert.runner; package cn.escheduler.alert.runner;
import cn.escheduler.alert.manager.EmailManager; import cn.escheduler.alert.manager.EmailManager;
import cn.escheduler.alert.manager.EnterpriseWeChatManager;
import cn.escheduler.alert.utils.Constants; import cn.escheduler.alert.utils.Constants;
import cn.escheduler.alert.utils.EnterpriseWeChatUtils;
import cn.escheduler.common.enums.AlertStatus; import cn.escheduler.common.enums.AlertStatus;
import cn.escheduler.common.enums.AlertType; import cn.escheduler.common.enums.AlertType;
import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.AlertDao;
@ -40,6 +42,7 @@ public class AlertSender{
private static final Logger logger = LoggerFactory.getLogger(AlertSender.class); private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);
private static final EmailManager emailManager= new EmailManager(); private static final EmailManager emailManager= new EmailManager();
private static final EnterpriseWeChatManager weChatManager= new EnterpriseWeChatManager();
private List<Alert> alertList; private List<Alert> alertList;
@ -109,6 +112,12 @@ public class AlertSender{
if (flag){ if (flag){
alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, "execution success", alert.getId()); alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, "execution success", alert.getId());
logger.info("alert send success"); logger.info("alert send success");
try {
String token = EnterpriseWeChatUtils.getToken();
weChatManager.send(alert,token);
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}else { }else {
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE,String.valueOf(retMaps.get(Constants.MESSAGE)),alert.getId()); alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE,String.valueOf(retMaps.get(Constants.MESSAGE)),alert.getId());
logger.info("alert send error : {}" , String.valueOf(retMaps.get(Constants.MESSAGE))); logger.info("alert send error : {}" , String.valueOf(retMaps.get(Constants.MESSAGE)));

8
escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java

@ -129,6 +129,10 @@ public class Constants {
public static final int ALERT_SCAN_INTERVEL = 5000; public static final int ALERT_SCAN_INTERVEL = 5000;
public static final String MARKDOWN_QUOTE = ">";
public static final String MARKDOWN_ENTER = "\n";
public static final String ENTERPRISE_WECHAT_CORP_ID = "enterprise.wechat.corp.id"; public static final String ENTERPRISE_WECHAT_CORP_ID = "enterprise.wechat.corp.id";
public static final String ENTERPRISE_WECHAT_SECRET = "enterprise.wechat.secret"; public static final String ENTERPRISE_WECHAT_SECRET = "enterprise.wechat.secret";
@ -140,4 +144,8 @@ public class Constants {
public static final String ENTERPRISE_WECHAT_TEAM_SEND_MSG = "enterprise.wechat.team.send.msg"; public static final String ENTERPRISE_WECHAT_TEAM_SEND_MSG = "enterprise.wechat.team.send.msg";
public static final String ENTERPRISE_WECHAT_USER_SEND_MSG = "enterprise.wechat.user.send.msg"; public static final String ENTERPRISE_WECHAT_USER_SEND_MSG = "enterprise.wechat.user.send.msg";
public static final String ENTERPRISE_WECHAT_AGENT_ID = "enterprise.wechat.agent.id";
public static final String ENTERPRISE_WECHAT_USERS = "enterprise.wechat.users";
} }

123
escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java

@ -16,9 +16,12 @@
*/ */
package cn.escheduler.alert.utils; package cn.escheduler.alert.utils;
import cn.escheduler.common.enums.ShowType;
import cn.escheduler.dao.model.Alert;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.reflect.TypeToken; import com.google.common.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
@ -31,13 +34,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.*;
import java.util.Map;
import static cn.escheduler.alert.utils.PropertyUtils.getString; import static cn.escheduler.alert.utils.PropertyUtils.getString;
/** /**
* qiye weixin utils * Enterprise WeChat utils
*/ */
public class EnterpriseWeChatUtils { public class EnterpriseWeChatUtils {
@ -48,7 +50,7 @@ public class EnterpriseWeChatUtils {
private static final String enterpriseWeChatSecret = getString(Constants.ENTERPRISE_WECHAT_SECRET); private static final String enterpriseWeChatSecret = getString(Constants.ENTERPRISE_WECHAT_SECRET);
private static final String enterpriseWeChatTokenUrl = getString(Constants.ENTERPRISE_WECHAT_TOKEN_URL); private static final String enterpriseWeChatTokenUrl = getString(Constants.ENTERPRISE_WECHAT_TOKEN_URL);
private String enterpriseWeChatTokenUrlReplace = enterpriseWeChatTokenUrl private static String enterpriseWeChatTokenUrlReplace = enterpriseWeChatTokenUrl
.replaceAll("\\$corpId", enterpriseWeChatCorpId) .replaceAll("\\$corpId", enterpriseWeChatCorpId)
.replaceAll("\\$secret", enterpriseWeChatSecret); .replaceAll("\\$secret", enterpriseWeChatSecret);
@ -58,12 +60,16 @@ public class EnterpriseWeChatUtils {
private static final String enterpriseWeChatUserSendMsg = getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG); private static final String enterpriseWeChatUserSendMsg = getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG);
public static final String enterpriseWeChatAgentId = getString(Constants.ENTERPRISE_WECHAT_AGENT_ID);
public static final String enterpriseWeChatUsers = getString(Constants.ENTERPRISE_WECHAT_USERS);
/** /**
* get winxin token info * get Enterprise WeChat token info
* @return token string info * @return token string info
* @throws IOException * @throws IOException
*/ */
public String getToken() throws IOException { public static String getToken() throws IOException {
String resp; String resp;
CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpClient httpClient = HttpClients.createDefault();
@ -71,7 +77,7 @@ public class EnterpriseWeChatUtils {
CloseableHttpResponse response = httpClient.execute(httpGet); CloseableHttpResponse response = httpClient.execute(httpGet);
try { try {
HttpEntity entity = response.getEntity(); HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, "utf-8"); resp = EntityUtils.toString(entity, Constants.UTF_8);
EntityUtils.consume(entity); EntityUtils.consume(entity);
} finally { } finally {
response.close(); response.close();
@ -84,26 +90,26 @@ public class EnterpriseWeChatUtils {
} }
/** /**
* make team single weixin message * make team single Enterprise WeChat message
* @param toParty * @param toParty
* @param agentId * @param agentId
* @param msg * @param msg
* @return weixin send message * @return Enterprise WeChat send message
*/ */
public String makeTeamSendMsg(String toParty, String agentId, String msg) { public static String makeTeamSendMsg(String toParty, String agentId, String msg) {
return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", toParty) return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", toParty)
.replaceAll("\\$agentId", agentId) .replaceAll("\\$agentId", agentId)
.replaceAll("\\$msg", msg); .replaceAll("\\$msg", msg);
} }
/** /**
* make team multi weixin message * make team multi Enterprise WeChat message
* @param toParty * @param toParty
* @param agentId * @param agentId
* @param msg * @param msg
* @return weixin send message * @return Enterprise WeChat send message
*/ */
public String makeTeamSendMsg(Collection<String> toParty, String agentId, String msg) { public static String makeTeamSendMsg(Collection<String> toParty, String agentId, String msg) {
String listParty = FuncUtils.mkString(toParty, "|"); String listParty = FuncUtils.mkString(toParty, "|");
return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", listParty) return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", listParty)
.replaceAll("\\$agentId", agentId) .replaceAll("\\$agentId", agentId)
@ -115,9 +121,9 @@ public class EnterpriseWeChatUtils {
* @param toUser * @param toUser
* @param agentId * @param agentId
* @param msg * @param msg
* @return weixin send message * @return Enterprise WeChat send message
*/ */
public String makeUserSendMsg(String toUser, String agentId, String msg) { public static String makeUserSendMsg(String toUser, String agentId, String msg) {
return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", toUser) return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", toUser)
.replaceAll("\\$agentId", agentId) .replaceAll("\\$agentId", agentId)
.replaceAll("\\$msg", msg); .replaceAll("\\$msg", msg);
@ -128,9 +134,9 @@ public class EnterpriseWeChatUtils {
* @param toUser * @param toUser
* @param agentId * @param agentId
* @param msg * @param msg
* @return weixin send message * @return Enterprise WeChat send message
*/ */
public String makeUserSendMsg(Collection<String> toUser, String agentId, String msg) { public static String makeUserSendMsg(Collection<String> toUser, String agentId, String msg) {
String listUser = FuncUtils.mkString(toUser, "|"); String listUser = FuncUtils.mkString(toUser, "|");
return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", listUser) return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", listUser)
.replaceAll("\\$agentId", agentId) .replaceAll("\\$agentId", agentId)
@ -138,14 +144,14 @@ public class EnterpriseWeChatUtils {
} }
/** /**
* send weixin * send Enterprise WeChat
* @param charset * @param charset
* @param data * @param data
* @param token * @param token
* @return weixin resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""} * @return Enterprise WeChat resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""}
* @throws IOException * @throws IOException
*/ */
public String sendQiyeWeixin(String charset, String data, String token) throws IOException { public static String sendEnterpriseWeChat(String charset, String data, String token) throws IOException {
String enterpriseWeChatPushUrlReplace = enterpriseWeChatPushUrl.replaceAll("\\$token", token); String enterpriseWeChatPushUrlReplace = enterpriseWeChatPushUrl.replaceAll("\\$token", token);
CloseableHttpClient httpclient = HttpClients.createDefault(); CloseableHttpClient httpclient = HttpClients.createDefault();
@ -160,8 +166,83 @@ public class EnterpriseWeChatUtils {
} finally { } finally {
response.close(); response.close();
} }
logger.info("qiye weixin send [{}], param:{}, resp:{}", enterpriseWeChatPushUrl, data, resp); logger.info("Enterprise WeChat send [{}], param:{}, resp:{}", enterpriseWeChatPushUrl, data, resp);
return resp; return resp;
} }
/**
* convert table to markdown style
* @param title
* @param content
* @return
*/
public static String markdownTable(String title,String content){
List<LinkedHashMap> mapItemsList = JSONUtils.toList(content, LinkedHashMap.class);
StringBuilder contents = new StringBuilder(200);
for (LinkedHashMap mapItems : mapItemsList){
Set<Map.Entry<String, String>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, String>> iterator = entries.iterator();
StringBuilder t = new StringBuilder(String.format("`%s`%s",title,Constants.MARKDOWN_ENTER));
while (iterator.hasNext()){
Map.Entry<String, String> entry = iterator.next();
t.append(Constants.MARKDOWN_QUOTE);
t.append(entry.getKey()).append(":").append(entry.getValue());
t.append(Constants.MARKDOWN_ENTER);
}
contents.append(t);
}
return contents.toString();
}
/**
* convert text to markdown style
* @param title
* @param content
* @return
*/
public static String markdownText(String title,String content){
if (StringUtils.isNotEmpty(content)){
List<String> list;
try {
list = JSONUtils.toList(content,String.class);
}catch (Exception e){
logger.error("json format exception",e);
return null;
}
StringBuilder contents = new StringBuilder(100);
contents.append(String.format("`%s`\n",title));
for (String str : list){
contents.append(Constants.MARKDOWN_QUOTE);
contents.append(str);
contents.append(Constants.MARKDOWN_ENTER);
}
return contents.toString();
}
return null;
}
/**
* Determine the mardown style based on the show type of the alert
* @param alert
* @return
*/
public static String markdownByAlert(Alert alert){
String result = "";
if (alert.getShowType() == ShowType.TABLE) {
result = markdownTable(alert.getTitle(),alert.getContent());
}else if(alert.getShowType() == ShowType.TEXT){
result = markdownText(alert.getTitle(),alert.getContent());
}
return result;
}
} }

4
escheduler-alert/src/main/resources/alert.properties

@ -19,10 +19,12 @@ xls.file.path=/tmp/xls
# Enterprise WeChat configuration # Enterprise WeChat configuration
enterprise.wechat.corp.id=xxxxxxx enterprise.wechat.corp.id=xxxxxxx
enterprise.wechat.secret=xxxxxxx enterprise.wechat.secret=xxxxxxx
enterprise.wechat.agent.id=xxxxxxx
enterprise.wechat.users=xxxxxxx
enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret
enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token
enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}
enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}}

57
escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java

@ -16,11 +16,10 @@
*/ */
package cn.escheduler.alert.utils; package cn.escheduler.alert.utils;
import com.alibaba.fastjson.JSON;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.alibaba.fastjson.JSON;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -33,24 +32,23 @@ import java.util.Collection;
* enterprise.wechat.token.url * enterprise.wechat.token.url
* enterprise.wechat.push.url * enterprise.wechat.push.url
* enterprise.wechat.send.msg * enterprise.wechat.send.msg
* enterprise.wechat.agent.id
* enterprise.wechat.users
*/ */
public class EnterpriseWeChatUtilsTest { public class EnterpriseWeChatUtilsTest {
private String agentId = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_AGENT_ID); // app id
private Collection<String> listUserId = Arrays.asList(PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USERS).split(","));
// Please change // Please change
private String agentId = "1000002"; // app id
private String partyId = "2"; private String partyId = "2";
private Collection<String> listPartyId = Arrays.asList("2","4"); private Collection<String> listPartyId = Arrays.asList("2","4");
private String userId = "test1";
private Collection<String> listUserId = Arrays.asList("test1","test2");
@Test @Test
public void testSendSingleTeamWeChat() { public void testSendSingleTeamWeChat() {
EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils();
try { try {
String token = wx.getToken(); String token = EnterpriseWeChatUtils.getToken();
String msg = wx.makeTeamSendMsg(partyId, agentId, "hello world"); String msg = EnterpriseWeChatUtils.makeTeamSendMsg(partyId, agentId, "hello world");
String resp = wx.sendQiyeWeixin("utf-8", msg, token); String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
String errmsg = JSON.parseObject(resp).getString("errmsg"); String errmsg = JSON.parseObject(resp).getString("errmsg");
Assert.assertEquals(errmsg, "ok"); Assert.assertEquals(errmsg, "ok");
@ -61,12 +59,11 @@ public class EnterpriseWeChatUtilsTest {
@Test @Test
public void testSendMultiTeamWeChat() { public void testSendMultiTeamWeChat() {
EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils();
try { try {
String token = wx.getToken(); String token = EnterpriseWeChatUtils.getToken();
String msg = wx.makeTeamSendMsg(listPartyId, agentId, "hello world"); String msg = EnterpriseWeChatUtils.makeTeamSendMsg(listPartyId, agentId, "hello world");
String resp = wx.sendQiyeWeixin("utf-8", msg, token); String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
String errmsg = JSON.parseObject(resp).getString("errmsg"); String errmsg = JSON.parseObject(resp).getString("errmsg");
Assert.assertEquals(errmsg, "ok"); Assert.assertEquals(errmsg, "ok");
@ -77,12 +74,23 @@ public class EnterpriseWeChatUtilsTest {
@Test @Test
public void testSendSingleUserWeChat() { public void testSendSingleUserWeChat() {
EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils();
try { try {
String token = wx.getToken(); String token = EnterpriseWeChatUtils.getToken();
String msg = wx.makeUserSendMsg(userId, agentId, "hello world"); String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId.stream().findFirst().get(), agentId, "您的会议室已经预定,稍后会同步到`邮箱` \n" +
String resp = wx.sendQiyeWeixin("utf-8", msg, token); ">**事项详情** \n" +
">事 项:<font color='info'>开会</font> <br>" +
">组织者:@miglioguan \n" +
">参与者:@miglioguan、@kunliu、@jamdeezhou、@kanexiong、@kisonwang \n" +
"> \n" +
">会议室:<font color='info'>广州TIT 1楼 301</font> \n" +
">日 期:<font color='warning'>2018年5月18日</font> \n" +
">时 间:<font color='comment'>上午9:00-11:00</font> \n" +
"> \n" +
">请准时参加会议。 \n" +
"> \n" +
">如需修改会议信息,请点击:[修改会议信息](https://work.weixin.qq.com)\"");
String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
String errmsg = JSON.parseObject(resp).getString("errmsg"); String errmsg = JSON.parseObject(resp).getString("errmsg");
Assert.assertEquals(errmsg, "ok"); Assert.assertEquals(errmsg, "ok");
@ -93,12 +101,11 @@ public class EnterpriseWeChatUtilsTest {
@Test @Test
public void testSendMultiUserWeChat() { public void testSendMultiUserWeChat() {
EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils();
try { try {
String token = wx.getToken(); String token = EnterpriseWeChatUtils.getToken();
String msg = wx.makeUserSendMsg(listUserId, agentId, "hello world");
String resp = wx.sendQiyeWeixin("utf-8", msg, token); String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId, agentId, "hello world");
String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
String errmsg = JSON.parseObject(resp).getString("errmsg"); String errmsg = JSON.parseObject(resp).getString("errmsg");
Assert.assertEquals(errmsg, "ok"); Assert.assertEquals(errmsg, "ok");

51
escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java

@ -24,7 +24,7 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result; import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.*; import cn.escheduler.common.enums.*;
import cn.escheduler.dao.model.User; import cn.escheduler.dao.model.User;
import io.swagger.annotations.Api; import io.swagger.annotations.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -38,9 +38,9 @@ import static cn.escheduler.api.enums.Status.*;
/** /**
* execute task controller * execute process controller
*/ */
@ApiIgnore @Api(tags = "PROCESS_INSTANCE_EXECUTOR_TAG", position = 1)
@RestController @RestController
@RequestMapping("projects/{projectName}/executors") @RequestMapping("projects/{projectName}/executors")
public class ExecutorController extends BaseController { public class ExecutorController extends BaseController {
@ -53,10 +53,27 @@ public class ExecutorController extends BaseController {
/** /**
* execute process instance * execute process instance
*/ */
@ApiOperation(value = "startProcessInstance", notes= "RUN_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType ="FailureStrategy"),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType ="String"),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType ="TaskDependType"),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType ="CommandType"),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE",required = true, dataType ="WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID",required = true, dataType ="Int", example = "100"),
@ApiImplicitParam(name = "receivers", value = "RECEIVERS",dataType ="String" ),
@ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ),
@ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"),
})
@PostMapping(value = "start-process-instance") @PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
public Result startProcessInstance(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable String projectName, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processDefinitionId") int processDefinitionId, @RequestParam(value = "processDefinitionId") int processDefinitionId,
@RequestParam(value = "scheduleTime", required = false) String scheduleTime, @RequestParam(value = "scheduleTime", required = false) String scheduleTime,
@RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy, @RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy,
@ -102,10 +119,15 @@ public class ExecutorController extends BaseController {
* @param processInstanceId * @param processInstanceId
* @return * @return
*/ */
@ApiOperation(value = "execute", notes= "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
})
@PostMapping(value = "/execute") @PostMapping(value = "/execute")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
public Result execute(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable String projectName, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam("processInstanceId") Integer processInstanceId, @RequestParam("processInstanceId") Integer processInstanceId,
@RequestParam("executeType") ExecuteType executeType @RequestParam("executeType") ExecuteType executeType
) { ) {
@ -127,9 +149,13 @@ public class ExecutorController extends BaseController {
* @param processDefinitionId * @param processDefinitionId
* @return * @return
*/ */
@ApiOperation(value = "startCheckProcessDefinition", notes= "START_CHECK_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100")
})
@PostMapping(value = "/start-check") @PostMapping(value = "/start-check")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
public Result startCheckProcessDefinition(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result startCheckProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionId") int processDefinitionId) { @RequestParam(value = "processDefinitionId") int processDefinitionId) {
logger.info("login user {}, check process definition", loginUser.getUserName(), processDefinitionId); logger.info("login user {}, check process definition", loginUser.getUserName(), processDefinitionId);
try { try {
@ -149,9 +175,16 @@ public class ExecutorController extends BaseController {
* @param processDefinitionId * @param processDefinitionId
* @return * @return
*/ */
@ApiIgnore
@ApiOperation(value = "getReceiverCc", notes= "GET_RECEIVER_CC_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/get-receiver-cc") @GetMapping(value = "/get-receiver-cc")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result getReceiverCc(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId, @RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId,
@RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) { @RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) {
logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName()); logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName());

12
escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java

@ -191,6 +191,16 @@ public class ExecutorService extends BaseService{
return checkResult; return checkResult;
} }
// checkTenantExists();
Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(),
processDefinition.getUserId());
if(tenant == null){
logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
switch (executeType) { switch (executeType) {
case REPEAT_RUNNING: case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING); result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING);
@ -260,7 +270,7 @@ public class ExecutorService extends BaseService{
} }
break; break;
case RECOVER_SUSPENDED_PROCESS: case RECOVER_SUSPENDED_PROCESS:
if (executionStatus.typeIsPause()) { if (executionStatus.typeIsPause()|| executionStatus.typeIsCancel()) {
checkResult = true; checkResult = true;
} }
default: default:

17
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java

@ -38,10 +38,7 @@ import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.common.utils.placeholder.BusinessTimeUtils; import cn.escheduler.common.utils.placeholder.BusinessTimeUtils;
import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.mapper.ProcessDefinitionMapper; import cn.escheduler.dao.mapper.*;
import cn.escheduler.dao.mapper.ProcessInstanceMapper;
import cn.escheduler.dao.mapper.ProjectMapper;
import cn.escheduler.dao.mapper.TaskInstanceMapper;
import cn.escheduler.dao.model.*; import cn.escheduler.dao.model.*;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -97,6 +94,9 @@ public class ProcessInstanceService extends BaseDAGService {
@Autowired @Autowired
LoggerService loggerService; LoggerService loggerService;
@Autowired
WorkerGroupMapper workerGroupMapper;
/** /**
* query process instance by id * query process instance by id
* *
@ -115,6 +115,15 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult; return checkResult;
} }
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId); ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId);
if(processInstance.getWorkerGroupId() == -1){
processInstance.setWorkerGroupName(DEFAULT);
}else{
WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId());
processInstance.setWorkerGroupName(workerGroup.getName());
}
ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setReceivers(processDefinition.getReceivers());
processInstance.setReceiversCc(processDefinition.getReceiversCc());
result.put(Constants.DATA_LIST, processInstance); result.put(Constants.DATA_LIST, processInstance);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);

12
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@ -219,6 +219,11 @@ public final class Constants {
*/ */
public static final String SEMICOLON = ";"; public static final String SEMICOLON = ";";
/**
* DOT .
*/
public static final String DOT = ".";
/** /**
* ZOOKEEPER_SESSION_TIMEOUT * ZOOKEEPER_SESSION_TIMEOUT
*/ */
@ -483,6 +488,8 @@ public final class Constants {
public static final String TASK_RECORD_PWD = "task.record.datasource.password"; public static final String TASK_RECORD_PWD = "task.record.datasource.password";
public static final String DEFAULT = "Default";
public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd";
public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd";
@ -883,6 +890,11 @@ public final class Constants {
*/ */
public static final String LOGIN_USER_KEY_TAB_USERNAME = "login.user.keytab.username"; public static final String LOGIN_USER_KEY_TAB_USERNAME = "login.user.keytab.username";
/**
* default worker group id
*/
public static final int DEFAULT_WORKER_ID = -1;
/** /**
* loginUserFromKeytab path * loginUserFromKeytab path
*/ */

9
escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java

@ -24,20 +24,17 @@ public interface ITaskQueue {
/** /**
* take out all the elements * take out all the elements
* *
* this method has deprecated
* use checkTaskExists instead
* *
* @param key * @param key
* @return * @return
*/ */
@Deprecated
List<String> getAllTasks(String key); List<String> getAllTasks(String key);
/** /**
* check task exists in the task queue or not * check task exists in the task queue or not
* *
* @param key queue name * @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId} * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue * @return true if exists in the queue
*/ */
boolean checkTaskExists(String key, String task); boolean checkTaskExists(String key, String task);
@ -54,10 +51,10 @@ public interface ITaskQueue {
* an element pops out of the queue * an element pops out of the queue
* *
* @param key queue name * @param key queue name
* @param remove whether remove the element * @param n how many elements to poll
* @return * @return
*/ */
String poll(String key, boolean remove); List<String> poll(String key, int n);
/** /**
* remove a element from queue * remove a element from queue

2
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java

@ -42,7 +42,7 @@ public class TaskQueueFactory {
public static ITaskQueue getTaskQueueInstance() { public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue(); String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) { if (StringUtils.isNotBlank(queueImplValue)) {
// queueImplValue = StringUtils.trim(queueImplValue); // queueImplValue = IpUtils.trim(queueImplValue);
// if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) { // if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) {
// logger.info("task queue impl use reids "); // logger.info("task queue impl use reids ");

103
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -19,6 +19,8 @@ package cn.escheduler.common.queue;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.Bytes; import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient; import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/** /**
* A singleton of a task queue implemented with zookeeper * A singleton of a task queue implemented with zookeeper
@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* @param key task queue name * @param key task queue name
* @return * @return
*/ */
@Deprecated
@Override @Override
public List<String> getAllTasks(String key) { public List<String> getAllTasks(String key) {
try { try {
@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* check task exists in the task queue or not * check task exists in the task queue or not
* *
* @param key queue name * @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId} * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue * @return true if exists in the queue
*/ */
@Override @Override
@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* add task to tasks queue * add task to tasks queue
* *
* @param key task queue name * @param key task queue name
* @param value ${priority}_${processInstanceId}_${taskId} * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/ */
@Override @Override
public void add(String key, String value) { public void add(String key, String value) {
@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result); logger.info("add task : {} to tasks queue , result success",result);
} catch (Exception e) { } catch (Exception e) {
logger.error("add task to tasks queue exception",e); logger.error("add task to tasks queue exception",e);
@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
/** /**
* An element pops out of the queue <p> * An element pops out of the queue <p>
* note: * note:
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low. * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
* *
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low * 流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,... high <- low
* @param key task queue name * @param key task queue name
* @param remove whether remove the element * @param tasksNum how many elements to poll
* @return the task id to be executed * @return the task ids to be executed
*/ */
@Override @Override
public String poll(String key, boolean remove) { public List<String> poll(String key, int tasksNum) {
try{ try{
CuratorFramework zk = getZkClient(); CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
@ -149,55 +144,83 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
if(list != null && list.size() > 0){ if(list != null && list.size() > 0){
String workerIp = OSUtils.getHost();
String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
int size = list.size(); int size = list.size();
String formatTargetTask = null;
String targetTaskKey = null; Set<String> taskTreeSet = new TreeSet<>();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
String taskDetail = list.get(i); String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
if(taskDetailArrs.length == 4){ //向前版本兼容
if(taskDetailArrs.length >= 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3])); String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
if(i > 0){ if(taskDetailArrs.length > 4){
int result = formatTask.compareTo(formatTargetTask); String taskHosts = taskDetailArrs[4];
if(result < 0){
formatTargetTask = formatTask; //task can assign to any worker host if equals default ip value of worker server
targetTaskKey = taskDetail; if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){
String[] taskHostsArr = taskHosts.split(Constants.COMMA);
if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
continue;
} }
}else{
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
} }
}else{
logger.error("task queue poll error, task detail :{} , please check!", taskDetail);
} }
taskTreeSet.add(formatTask);
} }
if(formatTargetTask != null){ }
String taskIdPath = tasksQueuePath + targetTaskKey;
logger.info("consume task {}", taskIdPath); List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet);
String[] vals = targetTaskKey.split(Constants.UNDERLINE); logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
if(remove){ return taskslist;
removeNode(key, targetTaskKey);
}
logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1);
return targetTaskKey;
}else{ }else{
logger.error("should not go here, task queue poll error, please check!"); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("add task to tasks queue exception",e); logger.error("add task to tasks queue exception",e);
} }
return null; return new ArrayList<String>();
}
/**
* get task list from tree set
*
* @param tasksNum
* @param taskTreeSet
*/
public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
Iterator<String> iterator = taskTreeSet.iterator();
int j = 0;
List<String> taskslist = new ArrayList<>(tasksNum);
while(iterator.hasNext()){
if(j++ < tasksNum){
String task = iterator.next();
String[] taskArray = task.split(Constants.UNDERLINE);
int processInstanceId = Integer.parseInt(taskArray[1]);
int taskId = Integer.parseInt(taskArray[3]);
String destTask = taskArray[0]+Constants.UNDERLINE + processInstanceId + Constants.UNDERLINE
+ taskArray[2] + Constants.UNDERLINE + taskId;
taskslist.add(destTask);
}
}
return taskslist;
} }
@Override @Override
public void removeNode(String key, String nodeValue){ public void removeNode(String key, String nodeValue){

70
escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* http utils
*/
public class IpUtils {
private static final Logger logger = LoggerFactory.getLogger(IpUtils.class);
public static final String DOT = ".";
/**
* ip str to long <p>
*
* @param ipStr ip string
*/
public static Long ipToLong(String ipStr) {
String[] ipSet = ipStr.split("\\" + DOT);
return Long.parseLong(ipSet[0]) << 24 | Long.parseLong(ipSet[1]) << 16 | Long.parseLong(ipSet[2]) << 8 | Long.parseLong(ipSet[3]);
}
/**
* long to ip
* @param ipLong the long number converted from IP
* @return String
*/
public static String longToIp(long ipLong) {
long[] ipNumbers = new long[4];
long tmp = 0xFF;
ipNumbers[0] = ipLong >> 24 & tmp;
ipNumbers[1] = ipLong >> 16 & tmp;
ipNumbers[2] = ipLong >> 8 & tmp;
ipNumbers[3] = ipLong & tmp;
StringBuilder sb = new StringBuilder(16);
sb.append(ipNumbers[0]).append(DOT)
.append(ipNumbers[1]).append(DOT)
.append(ipNumbers[2]).append(DOT)
.append(ipNumbers[3]);
return sb.toString();
}
public static void main(String[] args){
long ipLong = ipToLong("11.3.4.5");
logger.info(longToIp(ipLong));
}
}

4
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@ -312,7 +312,11 @@ public abstract class AbstractZKClient {
childrenList = zkClient.getChildren().forPath(masterZNodeParentPath); childrenList = zkClient.getChildren().forPath(masterZNodeParentPath);
} }
} catch (Exception e) { } catch (Exception e) {
// logger.warn(e.getMessage());
if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
logger.warn(e.getMessage(),e); logger.warn(e.getMessage(),e);
}
return childrenList.size(); return childrenList.size();
} }
return childrenList.size(); return childrenList.size();

6
escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java

@ -37,6 +37,12 @@ public class OSUtilsTest {
// static HardwareAbstractionLayer hal = si.getHardware(); // static HardwareAbstractionLayer hal = si.getHardware();
@Test
public void getHost(){
logger.info(OSUtils.getHost());
}
@Test @Test
public void memoryUsage() { public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239 logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239

85
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java

@ -17,12 +17,15 @@
package cn.escheduler.common.queue; package cn.escheduler.common.queue;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Random; import java.util.Random;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -34,59 +37,62 @@ public class TaskQueueImplTest {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class);
ITaskQueue tasksQueue = null;
@Test @Before
public void testTaskQueue(){ public void before(){
tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); }
@After
public void after(){
//clear all data //clear all data
tasksQueue.delete(); tasksQueue.delete();
}
@Test
public void testAdd(){
//add //add
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1"); tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2"); tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3"); tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775");
List<String> tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
if(tasks.size() < 0){
return;
}
//pop //pop
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); String node1 = tasks.get(0);
assertEquals(node1,"1");
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node2,"2");
//sadd
String task1 = "1.1.1.1-1-mr";
String task2 = "1.1.1.2-2-mr";
String task3 = "1.1.1.3-3-mr";
String task4 = "1.1.1.4-4-mr";
String task5 = "1.1.1.5-5-mr";
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5);
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
//srem
tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5);
//smembers
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4);
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
assertEquals(node1,"0_0000000001_1_0000000001");
tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
if(tasks.size() < 0){
return;
}
String node2 = tasks.get(0);
assertEquals(node2,"0_0000000001_1_0000000001");
} }
/** /**
* test one million data from zookeeper queue * test one million data from zookeeper queue
*/ */
@Test @Test
public void extremeTest(){ public void extremeTest(){
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
int total = 30 * 10000; int total = 30 * 10000;
for(int i = 0; i < total; i++) for(int i = 0; i < total; i++)
@ -99,14 +105,9 @@ public class TaskQueueImplTest {
} }
} }
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0);
assertEquals(node1,"0"); assertEquals(node1,"0");
//clear all data
tasksQueue.delete();
} }
} }

75
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.task.subprocess.SubProcessParameters; import cn.escheduler.common.task.subprocess.SubProcessParameters;
import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.mapper.*; import cn.escheduler.dao.mapper.*;
@ -117,7 +118,7 @@ public class ProcessDao extends AbstractBaseDao {
*/ */
@Override @Override
protected void init() { protected void init() {
userMapper=getMapper(UserMapper.class); userMapper = getMapper(UserMapper.class);
processDefineMapper = getMapper(ProcessDefinitionMapper.class); processDefineMapper = getMapper(ProcessDefinitionMapper.class);
processInstanceMapper = getMapper(ProcessInstanceMapper.class); processInstanceMapper = getMapper(ProcessInstanceMapper.class);
dataSourceMapper = getMapper(DataSourceMapper.class); dataSourceMapper = getMapper(DataSourceMapper.class);
@ -1015,11 +1016,58 @@ public class ProcessDao extends AbstractBaseDao {
* *
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low * 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* *
* @param task * @param taskInstance
* @return * @return
*/ */
private String taskZkInfo(TaskInstance task) { private String taskZkInfo(TaskInstance taskInstance) {
return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId();
int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
StringBuilder sb = new StringBuilder(100);
sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
.append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getId()).append(Constants.UNDERLINE);
if(taskWorkerGroupId > 0){
//not to find data from db
WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
if(workerGroup == null ){
logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
sb.append(Constants.DEFAULT_WORKER_ID);
return sb.toString();
}
String ips = workerGroup.getIpList();
if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
sb.append(Constants.DEFAULT_WORKER_ID);
return sb.toString();
}
StringBuilder ipSb = new StringBuilder(100);
String[] ipArray = ips.split(COMMA);
for (String ip : ipArray) {
long ipLong = IpUtils.ipToLong(ip);
ipSb.append(ipLong).append(COMMA);
}
if(ipSb.length() > 0) {
ipSb.deleteCharAt(ipSb.length() - 1);
}
sb.append(ipSb);
}else{
sb.append(Constants.DEFAULT_WORKER_ID);
}
return sb.toString();
} }
/** /**
@ -1683,5 +1731,24 @@ public class ProcessDao extends AbstractBaseDao {
} }
/**
* get task worker group id
*
* @param taskInstance
* @return
*/
public int getTaskWorkerGroupId(TaskInstance taskInstance) {
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return Constants.DEFAULT_WORKER_ID;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
return taskWorkerGroupId;
}
} }

39
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java

@ -194,6 +194,21 @@ public class ProcessInstance {
*/ */
private int tenantId; private int tenantId;
/**
* worker group name. for api.
*/
private String workerGroupName;
/**
* receivers for api
*/
private String receivers;
/**
* receivers cc for api
*/
private String receiversCc;
public ProcessInstance(){ public ProcessInstance(){
} }
@ -560,4 +575,28 @@ public class ProcessInstance {
public int getTenantId() { public int getTenantId() {
return this.tenantId ; return this.tenantId ;
} }
public String getWorkerGroupName() {
return workerGroupName;
}
public void setWorkerGroupName(String workerGroupName) {
this.workerGroupName = workerGroupName;
}
public String getReceivers() {
return receivers;
}
public void setReceivers(String receivers) {
this.receivers = receivers;
}
public String getReceiversCc() {
return receiversCc;
}
public void setReceiversCc(String receiversCc) {
this.receiversCc = receiversCc;
}
} }

2
escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java

@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable {
if(Stopper.isRunning()) { if(Stopper.isRunning()) {
// send heartbeat to zk // send heartbeat to zk
if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
logger.error("master send heartbeat to zk failed"); logger.error("master send heartbeat to zk failed: can't find zookeeper regist path of master server");
return; return;
} }

66
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -25,8 +25,9 @@ import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.*; import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient; import cn.escheduler.server.zk.ZKWorkerClient;
import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -98,15 +99,7 @@ public class FetchTaskThread implements Runnable{
*/ */
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){ private boolean checkWorkerGroup(TaskInstance taskInstance, String host){
int taskWorkerGroupId = taskInstance.getWorkerGroupId(); int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return false;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
if(taskWorkerGroupId <= 0){ if(taskWorkerGroupId <= 0){
return true; return true;
@ -117,44 +110,48 @@ public class FetchTaskThread implements Runnable{
return true; return true;
} }
String ips = workerGroup.getIpList(); String ips = workerGroup.getIpList();
if(ips == null){ if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId()); taskInstance.getId(), workerGroup.getId());
} }
String[] ipArray = ips.split(","); String[] ipArray = ips.split(Constants.COMMA);
List<String> ipList = Arrays.asList(ipArray); List<String> ipList = Arrays.asList(ipArray);
return ipList.contains(host); return ipList.contains(host);
} }
@Override @Override
public void run() { public void run() {
while (Stopper.isRunning()){ while (Stopper.isRunning()){
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
if(OSUtils.checkResource(this.conf, false)) {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
//check memory and cpu usage and threads
if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
//whether have tasks, if no tasks , no need lock //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
if(tasksQueueList.size() > 0){
// creating distributed locks, lock path /escheduler/lock/worker // creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
mutex.acquire(); mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; // task instance id str
List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
for (int i = 0; i < taskNum; i++) { for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isNotBlank(taskQueueStr )) {
int activeCount = poolExecutor.getActiveCount(); if (!checkThreadCount(poolExecutor)) {
if (activeCount >= workerExecNums) { break;
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums);
continue;
} }
// task instance id str
String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
if (!StringUtils.isEmpty(taskQueueStr )) {
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date(); Date now = new Date();
@ -177,6 +174,7 @@ public class FetchTaskThread implements Runnable{
logger.error("task instance is null. task id : {} ", taskId); logger.error("task instance is null. task id : {} ", taskId);
continue; continue;
} }
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue; continue;
} }
@ -191,7 +189,6 @@ public class FetchTaskThread implements Runnable{
// get process instance // get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define // get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
@ -223,16 +220,17 @@ public class FetchTaskThread implements Runnable{
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
} }
}
} }
} }
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}catch (Exception e){ }catch (Exception e){
logger.error("fetch task thread exception : " + e.getMessage(),e); logger.error("fetch task thread exception : " + e.getMessage(),e);
} }finally {
finally {
if (mutex != null){ if (mutex != null){
try { try {
mutex.release(); mutex.release();
@ -247,4 +245,18 @@ public class FetchTaskThread implements Runnable{
} }
} }
} }
/**
*
* @param poolExecutor
* @return
*/
private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
return false;
}
return true;
}
} }

2
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

@ -387,7 +387,7 @@ public class SqlTask extends AbstractTask {
String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim(); String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName)); Map<String, Object> mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
if(!(Boolean) mailResult.get(Constants.STATUS)){ if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){
throw new RuntimeException("send mail failed!"); throw new RuntimeException("send mail failed!");
} }
}else{ }else{

2
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue

@ -6,7 +6,7 @@
<div class="row-title"> <div class="row-title">
<div class="left"> <div class="left">
<span class="sp">IP: {{item.host}}</span> <span class="sp">IP: {{item.host}}</span>
<span class="sp">{{$t('Port')}}: {{item.port}}</span> <span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
<span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span> <span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span>
</div> </div>
<div class="right"> <div class="right">

2
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue

@ -6,7 +6,7 @@
<div class="row-title"> <div class="row-title">
<div class="left"> <div class="left">
<span class="sp">IP: {{item.host}}</span> <span class="sp">IP: {{item.host}}</span>
<span class="sp">{{$t('Port')}}: {{item.port}}</span> <span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
<span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span> <span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span>
</div> </div>
<div class="right"> <div class="right">

2
escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js

@ -37,7 +37,7 @@ let warningTypeList = [
] ]
const isEmial = (val) => { const isEmial = (val) => {
let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
return regEmail.test(val) return regEmail.test(val)
} }

58
escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue

@ -95,17 +95,17 @@
shape="circle" shape="circle"
size="xsmall" size="xsmall"
data-toggle="tooltip" data-toggle="tooltip"
:title="$t('Stop')" :title="item.state === 'STOP' ? $t('Recovery Suspend') : $t('Stop')"
@click="_stop(item)" @click="_stop(item,$index)"
icon="iconfont icon-zanting1" :icon="item.state === 'STOP' ? 'iconfont icon-ai06' : 'iconfont icon-zanting'"
:disabled="item.state !== 'RUNNING_EXEUTION'"></x-button> :disabled="item.state !== 'RUNNING_EXEUTION' && item.state != 'STOP'"></x-button>
<x-button type="warning" <x-button type="warning"
shape="circle" shape="circle"
size="xsmall" size="xsmall"
data-toggle="tooltip" data-toggle="tooltip"
:title="item.state === 'PAUSE' ? $t('Recovery Suspend') : $t('Pause')" :title="item.state === 'PAUSE' ? $t('Recovery Suspend') : $t('Pause')"
@click="_suspend(item,$index)" @click="_suspend(item,$index)"
:icon="item.state === 'PAUSE' ? 'iconfont icon-ai06' : 'iconfont icon-zanting'" :icon="item.state === 'PAUSE' ? 'iconfont icon-ai06' : 'iconfont icon-zanting1'"
:disabled="item.state !== 'RUNNING_EXEUTION' && item.state !== 'PAUSE'"></x-button> :disabled="item.state !== 'RUNNING_EXEUTION' && item.state !== 'PAUSE'"></x-button>
<x-poptip <x-poptip
:ref="'poptip-delete-' + $index" :ref="'poptip-delete-' + $index"
@ -155,7 +155,7 @@
shape="circle" shape="circle"
size="xsmall" size="xsmall"
disabled="true"> disabled="true">
{{item.count}}s {{item.count}}
</x-button> </x-button>
<x-button <x-button
v-show="buttonType !== 'run'" v-show="buttonType !== 'run'"
@ -173,7 +173,7 @@
shape="circle" shape="circle"
size="xsmall" size="xsmall"
disabled="true"> disabled="true">
{{item.count}}s {{item.count}}
</x-button> </x-button>
<x-button <x-button
v-show="buttonType !== 'store'" v-show="buttonType !== 'store'"
@ -185,26 +185,26 @@
</x-button> </x-button>
<!--Stop--> <!--Stop-->
<x-button <!--<x-button-->
type="error" <!--type="error"-->
shape="circle" <!--shape="circle"-->
size="xsmall" <!--size="xsmall"-->
icon="iconfont icon-zanting1" <!--icon="iconfont icon-zanting1"-->
disabled="true"> <!--disabled="true">-->
</x-button> <!--</x-button>-->
<!--倒计时 => Recovery Suspend/Pause--> <!--倒计时 => Recovery Suspend/Pause-->
<x-button <x-button
v-show="item.state === 'PAUSE' && buttonType === 'suspend'" v-show="(item.state === 'PAUSE' || item.state == 'STOP') && buttonType === 'suspend'"
type="warning" type="warning"
shape="circle" shape="circle"
size="xsmall" size="xsmall"
disabled="true"> disabled="true">
{{item.count}}s {{item.count}}
</x-button> </x-button>
<!--Recovery Suspend--> <!--Recovery Suspend-->
<x-button <x-button
v-show="item.state === 'PAUSE' && buttonType !== 'suspend'" v-show="(item.state === 'PAUSE' || item.state == 'STOP') && buttonType !== 'suspend'"
type="warning" type="warning"
shape="circle" shape="circle"
size="xsmall" size="xsmall"
@ -217,6 +217,15 @@
type="warning" type="warning"
shape="circle" shape="circle"
size="xsmall" size="xsmall"
icon="iconfont icon-zanting1"
disabled="true">
</x-button>
<!--Stop-->
<x-button
v-show="item.state !== 'STOP'"
type="warning"
shape="circle"
size="xsmall"
icon="iconfont icon-zanting" icon="iconfont icon-zanting"
disabled="true"> disabled="true">
</x-button> </x-button>
@ -362,11 +371,20 @@
* stop * stop
* @param STOP * @param STOP
*/ */
_stop (item) { _stop (item, index) {
if(item.state == 'STOP') {
this._countDownFn({
id: item.id,
executeType: 'RECOVER_SUSPENDED_PROCESS',
index: index,
buttonType: 'suspend'
})
} else {
this._upExecutorsState({ this._upExecutorsState({
processInstanceId: item.id, processInstanceId: item.id,
executeType: 'STOP' executeType: 'STOP'
}) })
}
}, },
/** /**
* pause * pause
@ -383,7 +401,7 @@
} else { } else {
this._upExecutorsState({ this._upExecutorsState({
processInstanceId: item.id, processInstanceId: item.id,
executeType: item.state === 'PAUSE' ? 'RECOVER_SUSPENDED_PROCESS' : 'PAUSE' executeType: 'PAUSE'
}) })
} }
}, },
@ -435,7 +453,7 @@
if (data.length) { if (data.length) {
_.map(data, v => { _.map(data, v => {
v.disabled = true v.disabled = true
v.count = 10 v.count = 9
}) })
} }
return data return data

2
escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue

@ -131,7 +131,7 @@
} }
}, },
_verification () { _verification () {
let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
// Mobile phone number regular // Mobile phone number regular
let regPhone = /^1(3|4|5|6|7|8)\d{9}$/; // eslint-disable-line let regPhone = /^1(3|4|5|6|7|8)\d{9}$/; // eslint-disable-line

4
escheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -237,7 +237,7 @@ export default {
'Recovery Failed': '恢复失败', 'Recovery Failed': '恢复失败',
'Stop': '停止', 'Stop': '停止',
'Pause': '暂停', 'Pause': '暂停',
'Recovery Suspend': '恢复暂停', 'Recovery Suspend': '恢复运行',
'Gantt': '甘特图', 'Gantt': '甘特图',
'Name': '名称', 'Name': '名称',
'Node Type': '节点类型', 'Node Type': '节点类型',
@ -282,7 +282,7 @@ export default {
'Start Process': '启动工作流', 'Start Process': '启动工作流',
'Execute from the current node': '从当前节点开始执行', 'Execute from the current node': '从当前节点开始执行',
'Recover tolerance fault process': '恢复被容错的工作流', 'Recover tolerance fault process': '恢复被容错的工作流',
'Resume the suspension process': '恢复暂停流程', 'Resume the suspension process': '恢复运行流程',
'Execute from the failed nodes': '从失败节点开始执行', 'Execute from the failed nodes': '从失败节点开始执行',
'Complement Data': '补数', 'Complement Data': '补数',
'Scheduling execution': '调度执行', 'Scheduling execution': '调度执行',

18
install.sh

@ -106,6 +106,18 @@ sslEnable="true"
# 下载Excel路径 # 下载Excel路径
xlsFilePath="/tmp/xls" xlsFilePath="/tmp/xls"
# 企业微信企业ID配置
enterpriseWechatCorpId="xxxxxxxxxx"
# 企业微信应用Secret配置
enterpriseWechatSecret="xxxxxxxxxx"
# 企业微信应用AgentId配置
enterpriseWechatAgentId="xxxxxxxxxx"
# 企业微信用户配置,多个用户以,分割
enterpriseWechatUsers="xxxxx,xxxxx"
#是否启动监控自启动脚本 #是否启动监控自启动脚本
monitorServerState="false" monitorServerState="false"
@ -318,7 +330,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT
sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties
sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties #sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties
@ -345,6 +357,10 @@ sed -i ${txt} "s#mail.passwd.*#mail.passwd=${mailPassword}#g" conf/alert.propert
sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttlsEnable}#g" conf/alert.properties sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttlsEnable}#g" conf/alert.properties
sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties
sed -i ${txt} "s#xls.file.path.*#xls.file.path=${xlsFilePath}#g" conf/alert.properties sed -i ${txt} "s#xls.file.path.*#xls.file.path=${xlsFilePath}#g" conf/alert.properties
sed -i ${txt} "s#enterprise.wechat.corp.id.*#enterprise.wechat.corp.id=${enterpriseWechatCorpId}#g" conf/alert.properties
sed -i ${txt} "s#enterprise.wechat.secret.*#enterprise.wechat.secret=${enterpriseWechatSecret}#g" conf/alert.properties
sed -i ${txt} "s#enterprise.wechat.agent.id.*#enterprise.wechat.agent.id=${enterpriseWechatAgentId}#g" conf/alert.properties
sed -i ${txt} "s#enterprise.wechat.users.*#enterprise.wechat.users=${enterpriseWechatUsers}#g" conf/alert.properties
sed -i ${txt} "s#installPath.*#installPath=${installPath}#g" conf/config/install_config.conf sed -i ${txt} "s#installPath.*#installPath=${installPath}#g" conf/config/install_config.conf

Loading…
Cancel
Save