Browse Source

Merge remote-tracking branch 'upstream/dev-1.1.0' into dev-1.1.0

pull/2/head
lenboo 6 years ago
parent
commit
85bd3a218c
  1. 2
      README.md
  2. 6
      docs/zh_CN/系统架构设计.md
  3. 57
      escheduler-alert/src/main/java/cn/escheduler/alert/manager/EnterpriseWeChatManager.java
  4. 9
      escheduler-alert/src/main/java/cn/escheduler/alert/runner/AlertSender.java
  5. 20
      escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java
  6. 248
      escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java
  7. 34
      escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java
  8. 9
      escheduler-alert/src/main/resources/alert.properties
  9. 117
      escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java
  10. 3
      escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java
  11. 51
      escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java
  12. 14
      escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java
  13. 12
      escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
  14. 12
      escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java
  15. 12
      escheduler-api/src/main/resources/i18n/messages.properties
  16. 12
      escheduler-api/src/main/resources/i18n/messages_en_US.properties
  17. 10
      escheduler-api/src/main/resources/i18n/messages_zh_CN.properties
  18. 42
      escheduler-api/src/main/resources/logback.xml
  19. 10
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  20. 9
      escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java
  21. 2
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java
  22. 103
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
  23. 11
      escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
  24. 70
      escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java
  25. 6
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  26. 4
      escheduler-common/src/main/resources/common/common.properties
  27. 6
      escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
  28. 85
      escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
  29. 75
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  30. 13
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java
  31. 19
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java
  32. 2
      escheduler-server/pom.xml
  33. 2
      escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
  34. 161
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  35. 19
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
  36. 18
      escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  37. 2
      escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue
  38. 116
      escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue
  39. 4
      escheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue
  40. 4
      escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue
  41. 4
      escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
  42. 4
      escheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/index.vue
  43. 8
      escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/email.vue
  44. 43
      escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue
  45. 2
      escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js
  46. 2
      escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
  47. 17
      escheduler-ui/src/js/conf/home/store/dag/actions.js
  48. 5
      escheduler-ui/src/js/conf/home/store/dag/state.js
  49. 7
      escheduler-ui/src/js/module/i18n/locale/en_US.js
  50. 6
      escheduler-ui/src/js/module/i18n/locale/zh_CN.js
  51. 18
      install.sh

2
README.md

@ -50,7 +50,7 @@ Easy Scheduler
### 近期研发计划
EasyScheduler的工作计划:<a href="https://github.com/analysys/EasyScheduler/projects/1" target="_blank">研发计划</a> ,其中 In Develop卡片下是1.0.2版本的功能,TODO卡片是待做事项(包括 feature ideas)
EasyScheduler的工作计划:<a href="https://github.com/analysys/EasyScheduler/projects/1" target="_blank">研发计划</a> ,其中 In Develop卡片下是1.1.0版本的功能,TODO卡片是待做事项(包括 feature ideas)
### 贡献代码

6
docs/zh_CN/系统架构设计.md

@ -13,13 +13,13 @@
**流程定义**:通过拖拽任务节点并建立任务节点的关联所形成的可视化**DAG**
**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成
**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例
**任务实例**:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态
**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS** 也是一个单独的流程定义,是可以单独启动执行的
**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖),同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS** 也是一个单独的流程定义,是可以单独启动执行的
**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、调度、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流****恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用
**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流****恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用
**定时调度**:系统采用 **quartz** 分布式调度器,并同时支持cron表达式可视化的生成

57
escheduler-alert/src/main/java/cn/escheduler/alert/manager/EnterpriseWeChatManager.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.alert.manager;
import cn.escheduler.alert.utils.Constants;
import cn.escheduler.alert.utils.EnterpriseWeChatUtils;
import cn.escheduler.dao.model.Alert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Enterprise WeChat Manager
*/
public class EnterpriseWeChatManager {
private static final Logger logger = LoggerFactory.getLogger(MsgManager.class);
/**
* Enterprise We Chat send
* @param alert
*/
public Map<String,Object> send(Alert alert, String token){
Map<String,Object> retMap = new HashMap<>();
retMap.put(Constants.STATUS, false);
String agentId = EnterpriseWeChatUtils.enterpriseWeChatAgentId;
String users = EnterpriseWeChatUtils.enterpriseWeChatUsers;
List<String> userList = Arrays.asList(users.split(","));
logger.info("send message {}",alert);
String msg = EnterpriseWeChatUtils.makeUserSendMsg(userList, agentId,EnterpriseWeChatUtils.markdownByAlert(alert));
try {
EnterpriseWeChatUtils.sendEnterpriseWeChat(Constants.UTF_8, msg, token);
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
retMap.put(Constants.STATUS, true);
return retMap;
}
}

9
escheduler-alert/src/main/java/cn/escheduler/alert/runner/AlertSender.java

@ -17,7 +17,9 @@
package cn.escheduler.alert.runner;
import cn.escheduler.alert.manager.EmailManager;
import cn.escheduler.alert.manager.EnterpriseWeChatManager;
import cn.escheduler.alert.utils.Constants;
import cn.escheduler.alert.utils.EnterpriseWeChatUtils;
import cn.escheduler.common.enums.AlertStatus;
import cn.escheduler.common.enums.AlertType;
import cn.escheduler.dao.AlertDao;
@ -40,6 +42,7 @@ public class AlertSender{
private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);
private static final EmailManager emailManager= new EmailManager();
private static final EnterpriseWeChatManager weChatManager= new EnterpriseWeChatManager();
private List<Alert> alertList;
@ -109,6 +112,12 @@ public class AlertSender{
if (flag){
alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, "execution success", alert.getId());
logger.info("alert send success");
try {
String token = EnterpriseWeChatUtils.getToken();
weChatManager.send(alert,token);
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}else {
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE,String.valueOf(retMaps.get(Constants.MESSAGE)),alert.getId());
logger.info("alert send error : {}" , String.valueOf(retMaps.get(Constants.MESSAGE)));

20
escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java

@ -128,4 +128,24 @@ public class Constants {
public static final String TH_END = "</th>";
public static final int ALERT_SCAN_INTERVEL = 5000;
public static final String MARKDOWN_QUOTE = ">";
public static final String MARKDOWN_ENTER = "\n";
public static final String ENTERPRISE_WECHAT_CORP_ID = "enterprise.wechat.corp.id";
public static final String ENTERPRISE_WECHAT_SECRET = "enterprise.wechat.secret";
public static final String ENTERPRISE_WECHAT_TOKEN_URL = "enterprise.wechat.token.url";
public static final String ENTERPRISE_WECHAT_PUSH_URL = "enterprise.wechat.push.url";
public static final String ENTERPRISE_WECHAT_TEAM_SEND_MSG = "enterprise.wechat.team.send.msg";
public static final String ENTERPRISE_WECHAT_USER_SEND_MSG = "enterprise.wechat.user.send.msg";
public static final String ENTERPRISE_WECHAT_AGENT_ID = "enterprise.wechat.agent.id";
public static final String ENTERPRISE_WECHAT_USERS = "enterprise.wechat.users";
}

248
escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java

@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.alert.utils;
import cn.escheduler.common.enums.ShowType;
import cn.escheduler.dao.model.Alert;
import com.alibaba.fastjson.JSON;
import com.google.common.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import static cn.escheduler.alert.utils.PropertyUtils.getString;
/**
* Enterprise WeChat utils
*/
public class EnterpriseWeChatUtils {
public static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatUtils.class);
private static final String enterpriseWeChatCorpId = getString(Constants.ENTERPRISE_WECHAT_CORP_ID);
private static final String enterpriseWeChatSecret = getString(Constants.ENTERPRISE_WECHAT_SECRET);
private static final String enterpriseWeChatTokenUrl = getString(Constants.ENTERPRISE_WECHAT_TOKEN_URL);
private static String enterpriseWeChatTokenUrlReplace = enterpriseWeChatTokenUrl
.replaceAll("\\$corpId", enterpriseWeChatCorpId)
.replaceAll("\\$secret", enterpriseWeChatSecret);
private static final String enterpriseWeChatPushUrl = getString(Constants.ENTERPRISE_WECHAT_PUSH_URL);
private static final String enterpriseWeChatTeamSendMsg = getString(Constants.ENTERPRISE_WECHAT_TEAM_SEND_MSG);
private static final String enterpriseWeChatUserSendMsg = getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG);
public static final String enterpriseWeChatAgentId = getString(Constants.ENTERPRISE_WECHAT_AGENT_ID);
public static final String enterpriseWeChatUsers = getString(Constants.ENTERPRISE_WECHAT_USERS);
/**
* get Enterprise WeChat token info
* @return token string info
* @throws IOException
*/
public static String getToken() throws IOException {
String resp;
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpGet httpGet = new HttpGet(enterpriseWeChatTokenUrlReplace);
CloseableHttpResponse response = httpClient.execute(httpGet);
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, Constants.UTF_8);
EntityUtils.consume(entity);
} finally {
response.close();
}
Map<String, Object> map = JSON.parseObject(resp,
new TypeToken<Map<String, Object>>() {
}.getType());
return map.get("access_token").toString();
}
/**
* make team single Enterprise WeChat message
* @param toParty
* @param agentId
* @param msg
* @return Enterprise WeChat send message
*/
public static String makeTeamSendMsg(String toParty, String agentId, String msg) {
return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", toParty)
.replaceAll("\\$agentId", agentId)
.replaceAll("\\$msg", msg);
}
/**
* make team multi Enterprise WeChat message
* @param toParty
* @param agentId
* @param msg
* @return Enterprise WeChat send message
*/
public static String makeTeamSendMsg(Collection<String> toParty, String agentId, String msg) {
String listParty = FuncUtils.mkString(toParty, "|");
return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", listParty)
.replaceAll("\\$agentId", agentId)
.replaceAll("\\$msg", msg);
}
/**
* make team single user message
* @param toUser
* @param agentId
* @param msg
* @return Enterprise WeChat send message
*/
public static String makeUserSendMsg(String toUser, String agentId, String msg) {
return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", toUser)
.replaceAll("\\$agentId", agentId)
.replaceAll("\\$msg", msg);
}
/**
* make team multi user message
* @param toUser
* @param agentId
* @param msg
* @return Enterprise WeChat send message
*/
public static String makeUserSendMsg(Collection<String> toUser, String agentId, String msg) {
String listUser = FuncUtils.mkString(toUser, "|");
return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", listUser)
.replaceAll("\\$agentId", agentId)
.replaceAll("\\$msg", msg);
}
/**
* send Enterprise WeChat
* @param charset
* @param data
* @param token
* @return Enterprise WeChat resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""}
* @throws IOException
*/
public static String sendEnterpriseWeChat(String charset, String data, String token) throws IOException {
String enterpriseWeChatPushUrlReplace = enterpriseWeChatPushUrl.replaceAll("\\$token", token);
CloseableHttpClient httpclient = HttpClients.createDefault();
HttpPost httpPost = new HttpPost(enterpriseWeChatPushUrlReplace);
httpPost.setEntity(new StringEntity(data, charset));
CloseableHttpResponse response = httpclient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, charset);
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Enterprise WeChat send [{}], param:{}, resp:{}", enterpriseWeChatPushUrl, data, resp);
return resp;
}
/**
* convert table to markdown style
* @param title
* @param content
* @return
*/
public static String markdownTable(String title,String content){
List<LinkedHashMap> mapItemsList = JSONUtils.toList(content, LinkedHashMap.class);
StringBuilder contents = new StringBuilder(200);
for (LinkedHashMap mapItems : mapItemsList){
Set<Map.Entry<String, String>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, String>> iterator = entries.iterator();
StringBuilder t = new StringBuilder(String.format("`%s`%s",title,Constants.MARKDOWN_ENTER));
while (iterator.hasNext()){
Map.Entry<String, String> entry = iterator.next();
t.append(Constants.MARKDOWN_QUOTE);
t.append(entry.getKey()).append(":").append(entry.getValue());
t.append(Constants.MARKDOWN_ENTER);
}
contents.append(t);
}
return contents.toString();
}
/**
* convert text to markdown style
* @param title
* @param content
* @return
*/
public static String markdownText(String title,String content){
if (StringUtils.isNotEmpty(content)){
List<String> list;
try {
list = JSONUtils.toList(content,String.class);
}catch (Exception e){
logger.error("json format exception",e);
return null;
}
StringBuilder contents = new StringBuilder(100);
contents.append(String.format("`%s`\n",title));
for (String str : list){
contents.append(Constants.MARKDOWN_QUOTE);
contents.append(str);
contents.append(Constants.MARKDOWN_ENTER);
}
return contents.toString();
}
return null;
}
/**
* Determine the mardown style based on the show type of the alert
* @param alert
* @return
*/
public static String markdownByAlert(Alert alert){
String result = "";
if (alert.getShowType() == ShowType.TABLE) {
result = markdownTable(alert.getTitle(),alert.getContent());
}else if(alert.getShowType() == ShowType.TEXT){
result = markdownText(alert.getTitle(),alert.getContent());
}
return result;
}
}

34
escheduler-alert/src/main/java/cn/escheduler/alert/utils/FuncUtils.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.alert.utils;
public class FuncUtils {
static public String mkString(Iterable<String> list, String split) {
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String item : list) {
if (first)
first = false;
else
sb.append(split);
sb.append(item);
}
return sb.toString();
}
}

9
escheduler-alert/src/main/resources/alert.properties

@ -16,6 +16,15 @@ mail.smtp.ssl.enable=true
#xls file path,need create if not exist
xls.file.path=/tmp/xls
# Enterprise WeChat configuration
enterprise.wechat.corp.id=xxxxxxx
enterprise.wechat.secret=xxxxxxx
enterprise.wechat.agent.id=xxxxxxx
enterprise.wechat.users=xxxxxxx
enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret
enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token
enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}
enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}}

117
escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.alert.utils;
import com.alibaba.fastjson.JSON;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
/**
* Please manually modify the configuration file before testing.
* file: alert.properties
* enterprise.wechat.corp.id
* enterprise.wechat.secret
* enterprise.wechat.token.url
* enterprise.wechat.push.url
* enterprise.wechat.send.msg
* enterprise.wechat.agent.id
* enterprise.wechat.users
*/
public class EnterpriseWeChatUtilsTest {
private String agentId = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_AGENT_ID); // app id
private Collection<String> listUserId = Arrays.asList(PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USERS).split(","));
// Please change
private String partyId = "2";
private Collection<String> listPartyId = Arrays.asList("2","4");
@Test
public void testSendSingleTeamWeChat() {
try {
String token = EnterpriseWeChatUtils.getToken();
String msg = EnterpriseWeChatUtils.makeTeamSendMsg(partyId, agentId, "hello world");
String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
String errmsg = JSON.parseObject(resp).getString("errmsg");
Assert.assertEquals(errmsg, "ok");
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testSendMultiTeamWeChat() {
try {
String token = EnterpriseWeChatUtils.getToken();
String msg = EnterpriseWeChatUtils.makeTeamSendMsg(listPartyId, agentId, "hello world");
String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
String errmsg = JSON.parseObject(resp).getString("errmsg");
Assert.assertEquals(errmsg, "ok");
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testSendSingleUserWeChat() {
try {
String token = EnterpriseWeChatUtils.getToken();
String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId.stream().findFirst().get(), agentId, "您的会议室已经预定,稍后会同步到`邮箱` \n" +
">**事项详情** \n" +
">事 项:<font color='info'>开会</font> <br>" +
">组织者:@miglioguan \n" +
">参与者:@miglioguan、@kunliu、@jamdeezhou、@kanexiong、@kisonwang \n" +
"> \n" +
">会议室:<font color='info'>广州TIT 1楼 301</font> \n" +
">日 期:<font color='warning'>2018年5月18日</font> \n" +
">时 间:<font color='comment'>上午9:00-11:00</font> \n" +
"> \n" +
">请准时参加会议。 \n" +
"> \n" +
">如需修改会议信息,请点击:[修改会议信息](https://work.weixin.qq.com)\"");
String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
String errmsg = JSON.parseObject(resp).getString("errmsg");
Assert.assertEquals(errmsg, "ok");
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testSendMultiUserWeChat() {
try {
String token = EnterpriseWeChatUtils.getToken();
String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId, agentId, "hello world");
String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
String errmsg = JSON.parseObject(resp).getString("errmsg");
Assert.assertEquals(errmsg, "ok");
} catch (IOException e) {
e.printStackTrace();
}
}
}

3
escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java

@ -23,6 +23,7 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ResUploadType;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.common.utils.PropertyUtils;
import cn.escheduler.dao.model.User;
@ -455,7 +456,7 @@ public class DataSourceController extends BaseController {
logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName());
try{
// if upload resource is HDFS and kerberos startup is true , else false
return success(Status.SUCCESS.getMsg(), CheckUtils.getKerberosStartupState());
return success(Status.SUCCESS.getMsg(), CommonUtils.getKerberosStartupState());
}catch (Exception e){
logger.error(KERBEROS_STARTUP_STATE.getMsg(),e);
return error(Status.KERBEROS_STARTUP_STATE.getCode(), Status.KERBEROS_STARTUP_STATE.getMsg());

51
escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java

@ -24,7 +24,7 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.*;
import cn.escheduler.dao.model.User;
import io.swagger.annotations.Api;
import io.swagger.annotations.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -38,9 +38,9 @@ import static cn.escheduler.api.enums.Status.*;
/**
* execute task controller
* execute process controller
*/
@ApiIgnore
@Api(tags = "PROCESS_INSTANCE_EXECUTOR_TAG", position = 1)
@RestController
@RequestMapping("projects/{projectName}/executors")
public class ExecutorController extends BaseController {
@ -53,10 +53,27 @@ public class ExecutorController extends BaseController {
/**
* execute process instance
*/
@ApiOperation(value = "startProcessInstance", notes= "RUN_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType ="FailureStrategy"),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType ="String"),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType ="TaskDependType"),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType ="CommandType"),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE",required = true, dataType ="WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID",required = true, dataType ="Int", example = "100"),
@ApiImplicitParam(name = "receivers", value = "RECEIVERS",dataType ="String" ),
@ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ),
@ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"),
})
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
public Result startProcessInstance(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable String projectName,
public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processDefinitionId") int processDefinitionId,
@RequestParam(value = "scheduleTime", required = false) String scheduleTime,
@RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy,
@ -102,10 +119,15 @@ public class ExecutorController extends BaseController {
* @param processInstanceId
* @return
*/
@ApiOperation(value = "execute", notes= "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
})
@PostMapping(value = "/execute")
@ResponseStatus(HttpStatus.OK)
public Result execute(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable String projectName,
public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam("processInstanceId") Integer processInstanceId,
@RequestParam("executeType") ExecuteType executeType
) {
@ -127,9 +149,13 @@ public class ExecutorController extends BaseController {
* @param processDefinitionId
* @return
*/
@ApiOperation(value = "startCheckProcessDefinition", notes= "START_CHECK_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100")
})
@PostMapping(value = "/start-check")
@ResponseStatus(HttpStatus.OK)
public Result startCheckProcessDefinition(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result startCheckProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionId") int processDefinitionId) {
logger.info("login user {}, check process definition", loginUser.getUserName(), processDefinitionId);
try {
@ -149,9 +175,16 @@ public class ExecutorController extends BaseController {
* @param processDefinitionId
* @return
*/
@ApiIgnore
@ApiOperation(value = "getReceiverCc", notes= "GET_RECEIVER_CC_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/get-receiver-cc")
@ResponseStatus(HttpStatus.OK)
public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result getReceiverCc(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId,
@RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) {
logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName());

14
escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java

@ -25,6 +25,7 @@ import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ResUploadType;
import cn.escheduler.common.enums.UserType;
import cn.escheduler.common.job.db.*;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.PropertyUtils;
import cn.escheduler.dao.mapper.DataSourceMapper;
import cn.escheduler.dao.mapper.DatasourceUserMapper;
@ -324,7 +325,14 @@ public class DataSourceService extends BaseService{
*/
public Map<String, Object> queryDataSourceList(User loginUser, Integer type) {
Map<String, Object> result = new HashMap<>(5);
List<DataSource> datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type);
List<DataSource> datasourceList;
if (isAdmin(loginUser)) {
datasourceList = dataSourceMapper.listAllDataSourceByType(type);
}else{
datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type);
}
result.put(Constants.DATA_LIST, datasourceList);
putMsg(result, Status.SUCCESS);
@ -374,7 +382,7 @@ public class DataSourceService extends BaseService{
break;
case HIVE:
case SPARK:
if (CheckUtils.getKerberosStartupState()) {
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
@ -470,7 +478,7 @@ public class DataSourceService extends BaseService{
String address = buildAddress(type, host, port);
String jdbcUrl = address + "/" + database;
if (CheckUtils.getKerberosStartupState() &&
if (CommonUtils.getKerberosStartupState() &&
(type == DbType.HIVE || type == DbType.SPARK)){
jdbcUrl += ";principal=" + principal;
}

12
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java

@ -307,11 +307,13 @@ public class UsersService extends BaseService {
// delete user
User user = userMapper.queryTenantCodeByUserId(id);
if (PropertyUtils.getResUploadStartupState()){
String userPath = HadoopUtils.getHdfsDataBasePath() + "/" + user.getTenantCode() + "/home/" + id;
HadoopUtils.getInstance().delete(userPath, true);
if (user != null) {
if (PropertyUtils.getResUploadStartupState()) {
String userPath = HadoopUtils.getHdfsDataBasePath() + "/" + user.getTenantCode() + "/home/" + id;
if (HadoopUtils.getInstance().exists(userPath)) {
HadoopUtils.getInstance().delete(userPath, true);
}
}
}
userMapper.delete(id);

12
escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java

@ -160,16 +160,4 @@ public class CheckUtils {
return pattern.matcher(str).matches();
}
/**
* if upload resource is HDFS and kerberos startup is true , else false
* @return
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
}

12
escheduler-api/src/main/resources/i18n/messages.properties

@ -1,4 +1,16 @@
QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
RUN_MODE=run mode
TIMEOUT=timeout
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
EXECUTE_TYPE=execute type
START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition
GET_RECEIVER_CC_NOTES=query receiver cc
DESC=description
GROUP_NAME=group name
GROUP_TYPE=group type

12
escheduler-api/src/main/resources/i18n/messages_en_US.properties

@ -1,4 +1,16 @@
QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
RUN_MODE=run mode
TIMEOUT=timeout
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
EXECUTE_TYPE=execute type
START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition
GET_RECEIVER_CC_NOTES=query receiver cc
DESC=description
GROUP_NAME=group name
GROUP_TYPE=group type

10
escheduler-api/src/main/resources/i18n/messages_zh_CN.properties

@ -1,4 +1,14 @@
QUERY_SCHEDULE_LIST_NOTES=查询定时列表
PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作
RUN_PROCESS_INSTANCE_NOTES=运行流程实例
START_NODE_LIST=开始节点列表(节点name)
TASK_DEPEND_TYPE=任务依赖类型
COMMAND_TYPE=指令类型
RUN_MODE=运行模式
TIMEOUT=超时时间
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=执行流程实例的各种操作(暂停、停止、重跑、恢复等)
EXECUTE_TYPE=执行类型
START_CHECK_PROCESS_DEFINITION_NOTES=检查流程定义
DESC=备注(描述)
GROUP_NAME=组名称
GROUP_TYPE=组类型

42
escheduler-api/src/main/resources/logback.xml

@ -1,42 +0,0 @@
<!-- Logback configuration. See http://logback.qos.ch/manual/index.html -->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.hbase" level="WARN"/>
<logger name="org.apache.hadoop" level="WARN"/>
<property name="log.base" value="logs" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="APISERVERLOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- Log level filter -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<file>${log.base}/escheduler-api-server.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/escheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<maxFileSize>64MB</maxFileSize>
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

10
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@ -219,6 +219,11 @@ public final class Constants {
*/
public static final String SEMICOLON = ";";
/**
* DOT .
*/
public static final String DOT = ".";
/**
* ZOOKEEPER_SESSION_TIMEOUT
*/
@ -885,6 +890,11 @@ public final class Constants {
*/
public static final String LOGIN_USER_KEY_TAB_USERNAME = "login.user.keytab.username";
/**
* default worker group id
*/
public static final int DEFAULT_WORKER_ID = -1;
/**
* loginUserFromKeytab path
*/

9
escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java

@ -24,20 +24,17 @@ public interface ITaskQueue {
/**
* take out all the elements
*
* this method has deprecated
* use checkTaskExists instead
*
* @param key
* @return
*/
@Deprecated
List<String> getAllTasks(String key);
/**
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
boolean checkTaskExists(String key, String task);
@ -54,10 +51,10 @@ public interface ITaskQueue {
* an element pops out of the queue
*
* @param key queue name
* @param remove whether remove the element
* @param n how many elements to poll
* @return
*/
String poll(String key, boolean remove);
List<String> poll(String key, int n);
/**
* remove a element from queue

2
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java

@ -42,7 +42,7 @@ public class TaskQueueFactory {
public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
// queueImplValue = StringUtils.trim(queueImplValue);
// queueImplValue = IpUtils.trim(queueImplValue);
// if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) {
// logger.info("task queue impl use reids ");

103
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -19,6 +19,8 @@ package cn.escheduler.common.queue;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
/**
* A singleton of a task queue implemented with zookeeper
@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* @param key task queue name
* @return
*/
@Deprecated
@Override
public List<String> getAllTasks(String key) {
try {
@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
@Override
@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* add task to tasks queue
*
* @param key task queue name
* @param value ${priority}_${processInstanceId}_${taskId}
* @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/
@Override
public void add(String key, String value) {
@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result);
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
/**
* An element pops out of the queue <p>
* note:
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
*
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* 流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,... high <- low
* @param key task queue name
* @param remove whether remove the element
* @return the task id to be executed
* @param tasksNum how many elements to poll
* @return the task ids to be executed
*/
@Override
public String poll(String key, boolean remove) {
public List<String> poll(String key, int tasksNum) {
try{
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
@ -149,55 +144,79 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
if(list != null && list.size() > 0){
String workerIp = OSUtils.getHost();
String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
int size = list.size();
String formatTargetTask = null;
String targetTaskKey = null;
Set<String> taskTreeSet = new TreeSet<>();
for (int i = 0; i < size; i++) {
String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
if(taskDetailArrs.length == 4){
//向前版本兼容
if(taskDetailArrs.length >= 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
if(i > 0){
int result = formatTask.compareTo(formatTargetTask);
if(result < 0){
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
if(taskDetailArrs.length > 4){
String taskHosts = taskDetailArrs[4];
//task can assign to any worker host if equals default ip value of worker server
if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){
String[] taskHostsArr = taskHosts.split(Constants.COMMA);
if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
continue;
}
}
}else{
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
}
}else{
logger.error("task queue poll error, task detail :{} , please check!", taskDetail);
taskTreeSet.add(formatTask);
}
}
if(formatTargetTask != null){
String taskIdPath = tasksQueuePath + targetTaskKey;
}
logger.info("consume task {}", taskIdPath);
List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet);
String[] vals = targetTaskKey.split(Constants.UNDERLINE);
logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
if(remove){
removeNode(key, targetTaskKey);
}
logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1);
return targetTaskKey;
}else{
logger.error("should not go here, task queue poll error, please check!");
}
return taskslist;
}else{
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
return null;
return new ArrayList<String>();
}
/**
* get task list from tree set
*
* @param tasksNum
* @param taskTreeSet
*/
public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
Iterator<String> iterator = taskTreeSet.iterator();
int j = 0;
List<String> taskslist = new ArrayList<>(tasksNum);
while(iterator.hasNext()){
if(j++ < tasksNum){
String task = iterator.next();
taskslist.add(task);
}
}
return taskslist;
}
@Override
public void removeNode(String key, String nodeValue){

11
escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java

@ -17,6 +17,7 @@
package cn.escheduler.common.utils;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ResUploadType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,4 +64,14 @@ public class CommonUtils {
/**
* if upload resource is HDFS and kerberos startup is true , else false
* @return
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
}

70
escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* http utils
*/
public class IpUtils {
private static final Logger logger = LoggerFactory.getLogger(IpUtils.class);
public static final String DOT = ".";
/**
* ip str to long <p>
*
* @param ipStr ip string
*/
public static Long ipToLong(String ipStr) {
String[] ipSet = ipStr.split("\\" + DOT);
return Long.parseLong(ipSet[0]) << 24 | Long.parseLong(ipSet[1]) << 16 | Long.parseLong(ipSet[2]) << 8 | Long.parseLong(ipSet[3]);
}
/**
* long to ip
* @param ipLong the long number converted from IP
* @return String
*/
public static String longToIp(long ipLong) {
long[] ipNumbers = new long[4];
long tmp = 0xFF;
ipNumbers[0] = ipLong >> 24 & tmp;
ipNumbers[1] = ipLong >> 16 & tmp;
ipNumbers[2] = ipLong >> 8 & tmp;
ipNumbers[3] = ipLong & tmp;
StringBuilder sb = new StringBuilder(16);
sb.append(ipNumbers[0]).append(DOT)
.append(ipNumbers[1]).append(DOT)
.append(ipNumbers[2]).append(DOT)
.append(ipNumbers[3]);
return sb.toString();
}
public static void main(String[] args){
long ipLong = ipToLong("11.3.4.5");
logger.info(longToIp(ipLong));
}
}

6
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@ -312,7 +312,11 @@ public abstract class AbstractZKClient {
childrenList = zkClient.getChildren().forPath(masterZNodeParentPath);
}
} catch (Exception e) {
logger.warn(e.getMessage(),e);
// logger.warn(e.getMessage());
if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
logger.warn(e.getMessage(),e);
}
return childrenList.size();
}
return childrenList.size();

4
escheduler-common/src/main/resources/common/common.properties

@ -26,10 +26,10 @@ hadoop.security.authentication.startup.state=false
java.security.krb5.conf.path=/opt/krb5.conf
# loginUserFromKeytab user
login.user.keytab.username="hdfs-mycluster@ESZ.COM"
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path
login.user.keytab.path="/opt/hdfs.headless.keytab"
login.user.keytab.path=/opt/hdfs.headless.keytab
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
escheduler.env.path=/opt/.escheduler_env.sh

6
escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java

@ -37,6 +37,12 @@ public class OSUtilsTest {
// static HardwareAbstractionLayer hal = si.getHardware();
@Test
public void getHost(){
logger.info(OSUtils.getHost());
}
@Test
public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239

85
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java

@ -17,12 +17,15 @@
package cn.escheduler.common.queue;
import cn.escheduler.common.Constants;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@ -34,59 +37,62 @@ public class TaskQueueImplTest {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class);
ITaskQueue tasksQueue = null;
@Test
public void testTaskQueue(){
@Before
public void before(){
tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
}
@After
public void after(){
//clear all data
tasksQueue.delete();
}
@Test
public void testAdd(){
//add
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775");
List<String> tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
if(tasks.size() < 0){
return;
}
//pop
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node1,"1");
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node2,"2");
//sadd
String task1 = "1.1.1.1-1-mr";
String task2 = "1.1.1.2-2-mr";
String task3 = "1.1.1.3-3-mr";
String task4 = "1.1.1.4-4-mr";
String task5 = "1.1.1.5-5-mr";
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5);
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
//srem
tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5);
//smembers
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4);
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
String node1 = tasks.get(0);
assertEquals(node1,"0_0000000001_1_0000000001");
tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
if(tasks.size() < 0){
return;
}
String node2 = tasks.get(0);
assertEquals(node2,"0_0000000001_1_0000000001");
}
/**
* test one million data from zookeeper queue
*/
@Test
public void extremeTest(){
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
int total = 30 * 10000;
for(int i = 0; i < total; i++)
@ -99,14 +105,9 @@ public class TaskQueueImplTest {
}
}
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0);
assertEquals(node1,"0");
//clear all data
tasksQueue.delete();
}
}

75
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.task.subprocess.SubProcessParameters;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.mapper.*;
@ -117,7 +118,7 @@ public class ProcessDao extends AbstractBaseDao {
*/
@Override
protected void init() {
userMapper=getMapper(UserMapper.class);
userMapper = getMapper(UserMapper.class);
processDefineMapper = getMapper(ProcessDefinitionMapper.class);
processInstanceMapper = getMapper(ProcessInstanceMapper.class);
dataSourceMapper = getMapper(DataSourceMapper.class);
@ -1015,11 +1016,58 @@ public class ProcessDao extends AbstractBaseDao {
*
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
*
* @param task
* @param taskInstance
* @return
*/
private String taskZkInfo(TaskInstance task) {
return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId();
private String taskZkInfo(TaskInstance taskInstance) {
int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
StringBuilder sb = new StringBuilder(100);
sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
.append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getId()).append(Constants.UNDERLINE);
if(taskWorkerGroupId > 0){
//not to find data from db
WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
if(workerGroup == null ){
logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
sb.append(Constants.DEFAULT_WORKER_ID);
return sb.toString();
}
String ips = workerGroup.getIpList();
if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
sb.append(Constants.DEFAULT_WORKER_ID);
return sb.toString();
}
StringBuilder ipSb = new StringBuilder(100);
String[] ipArray = ips.split(COMMA);
for (String ip : ipArray) {
long ipLong = IpUtils.ipToLong(ip);
ipSb.append(ipLong).append(COMMA);
}
if(ipSb.length() > 0) {
ipSb.deleteCharAt(ipSb.length() - 1);
}
sb.append(ipSb);
}else{
sb.append(Constants.DEFAULT_WORKER_ID);
}
return sb.toString();
}
/**
@ -1683,5 +1731,24 @@ public class ProcessDao extends AbstractBaseDao {
}
/**
* get task worker group id
*
* @param taskInstance
* @return
*/
public int getTaskWorkerGroupId(TaskInstance taskInstance) {
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return Constants.DEFAULT_WORKER_ID;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
return taskWorkerGroupId;
}
}

13
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java

@ -216,4 +216,17 @@ public interface DataSourceMapper {
@SelectProvider(type = DataSourceMapperProvider.class, method = "queryDatasourceExceptUserId")
List<DataSource> queryDatasourceExceptUserId(@Param("userId") int userId);
@Results(value = {
@Result(property = "id", column = "id", id = true, javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "note", column = "note", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "type", column = "type", typeHandler = EnumOrdinalTypeHandler.class, javaType = DbType.class, jdbcType = JdbcType.INTEGER),
@Result(property = "userId", column = "user_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "connectionParams", column = "connection_params", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE)
})
@SelectProvider(type = DataSourceMapperProvider.class, method = "listAllDataSourceByType")
List<DataSource> listAllDataSourceByType(@Param("type") Integer type);
}

19
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java

@ -175,8 +175,7 @@ public class DataSourceMapperProvider {
}
/**
* 查询总的数据源数目
*
* Query the total number of data sources
* @param parameter
* @return
*/
@ -228,4 +227,20 @@ public class DataSourceMapperProvider {
WHERE("user_id <> #{userId}");
}}.toString();
}
/**
* list all data source by type
*
* @param parameter
* @return
*/
public String listAllDataSourceByType(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
WHERE("type = #{type}");
}}.toString();
}
}

2
escheduler-server/pom.xml

@ -89,7 +89,7 @@
<artifactId>escheduler-alert</artifactId>
</dependency>
</dependencies>
</dependencies>
<build>

2
escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java

@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable {
if(Stopper.isRunning()) {
// send heartbeat to zk
if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
logger.error("master send heartbeat to zk failed");
logger.error("master send heartbeat to zk failed: can't find zookeeper regist path of master server");
return;
}

161
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -25,8 +25,9 @@ import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient;
import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -98,15 +99,7 @@ public class FetchTaskThread implements Runnable{
*/
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return false;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
if(taskWorkerGroupId <= 0){
return true;
@ -117,119 +110,127 @@ public class FetchTaskThread implements Runnable{
return true;
}
String ips = workerGroup.getIpList();
if(ips == null){
if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
}
String[] ipArray = ips.split(",");
String[] ipArray = ips.split(Constants.COMMA);
List<String> ipList = Arrays.asList(ipArray);
return ipList.contains(host);
}
@Override
public void run() {
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
if(OSUtils.checkResource(this.conf, false)) {
// creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
//check memory and cpu usage and threads
if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
for (int i = 0; i < taskNum; i++) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums);
continue;
}
//whether have tasks, if no tasks , no need lock //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
if(tasksQueueList.size() > 0){
// creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
mutex.acquire();
// task instance id str
String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
if (!StringUtils.isEmpty(taskQueueStr )) {
for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isNotBlank(taskQueueStr )) {
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr);
if (!checkThreadCount(poolExecutor)) {
break;
}
// find task instance by task id
TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr);
logger.info("worker fetch taskId : {} from queue ", taskId);
// find task instance by task id
TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
int retryTimes = 30;
// mainly to wait for the master insert task to succeed
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
taskInstance = processDao.findTaskInstanceById(taskId);
retryTimes--;
}
logger.info("worker fetch taskId : {} from queue ", taskId);
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskId);
continue;
}
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
logger.info("remove task:{} from queue", taskQueueStr);
int retryTimes = 30;
// mainly to wait for the master insert task to succeed
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
taskInstance = processDao.findTaskInstanceById(taskId);
retryTimes--;
}
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskId);
continue;
}
// set execute task worker host
taskInstance.setHost(OSUtils.getHost());
taskInstance.setStartTime(now);
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
logger.info("remove task:{} from queue", taskQueueStr);
// set execute task worker host
taskInstance.setHost(OSUtils.getHost());
taskInstance.setStartTime(now);
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
// get local execute path
String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
processDefine.getId(),
processInstance.getId(),
taskInstance.getId());
logger.info("task instance local execute path : {} ", execLocalPath);
// get local execute path
String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
processDefine.getId(),
processInstance.getId(),
taskInstance.getId());
logger.info("task instance local execute path : {} ", execLocalPath);
// set task execute path
taskInstance.setExecutePath(execLocalPath);
// set task execute path
taskInstance.setExecutePath(execLocalPath);
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
// check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
processInstance.getTenantCode(), logger);
tenant.getTenantCode(), logger);
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
}
}
}
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}catch (Exception e){
logger.error("fetch task thread exception : " + e.getMessage(),e);
}
finally {
}finally {
if (mutex != null){
try {
mutex.release();
@ -244,4 +245,18 @@ public class FetchTaskThread implements Runnable{
}
}
}
/**
*
* @param poolExecutor
* @return
*/
private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
return false;
}
return true;
}
}

19
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

@ -29,6 +29,7 @@ import cn.escheduler.common.task.sql.SqlBinds;
import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.task.sql.SqlType;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
@ -43,6 +44,8 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import java.sql.*;
@ -51,6 +54,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static cn.escheduler.common.utils.PropertyUtils.getString;
/**
* sql task
*/
@ -228,7 +233,15 @@ public class SqlTask extends AbstractTask {
List<String> createFuncs){
Connection connection = null;
try {
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME),
getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
}
if (DbType.HIVE.name().equals(sqlParameters.getType())) {
Properties paramProp = new Properties();
paramProp.setProperty("user", baseDataSource.getUser());
@ -278,7 +291,7 @@ public class SqlTask extends AbstractTask {
array.add(mapOfColValues);
}
logger.info("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
// send as an attachment
if (StringUtils.isEmpty(sqlParameters.getShowType())) {
@ -374,7 +387,7 @@ public class SqlTask extends AbstractTask {
String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
if(!(Boolean) mailResult.get(Constants.STATUS)){
if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){
throw new RuntimeException("send mail failed!");
}
}else{

18
escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -28,6 +28,17 @@
@click="_toggleView"
icon="fa fa-code">
</x-button>
<x-button
style="vertical-align: middle;"
data-toggle="tooltip"
:title="$t('Startup parameter')"
data-container="body"
type="primary"
size="xsmall"
:disabled="$route.name !== 'projects-instance-details'"
@click="_toggleParam"
icon="fa fa-chevron-circle-right">
</x-button>
<span class="name">{{name}}</span>
&nbsp;
<span v-if="name" class="copy-name" @click="_copyName" :data-clipboard-text="name"><i class="iconfont" data-container="body" data-toggle="tooltip" title="复制名称" >&#xe61e;</i></span>
@ -383,6 +394,13 @@
_toggleView () {
findComponentDownward(this.$root, `assist-dag-index`)._toggleView()
},
/**
* Starting parameters
*/
_toggleParam () {
findComponentDownward(this.$root, `starting-params-dag-index`)._toggleParam()
},
/**
* Create a node popup layer
* @param Object id

2
escheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue

@ -3,7 +3,7 @@
<m-list-box>
<div slot="text">{{$t('Program Type')}}</div>
<div slot="content">
<x-select v-model="programType" :disabled="isDetails" style="width: 100px;">
<x-select v-model="programType" :disabled="isDetails" style="width: 110px;">
<x-option
v-for="city in programTypeList"
:key="city.code"

116
escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue

@ -0,0 +1,116 @@
<template>
<div class="starting-params-dag-index">
<template v-if="isView && isActive">
<div class="box">
<p class="box-hd"><i class="fa fa-chevron-circle-right"></i><b>{{$t('Startup parameter')}}</b></p>
<ul class="box-bd">
<li><span>{{$t('Startup type')}}</span><span>{{_rtRunningType(startupParam.commandType)}}</span></li>
<li><span>{{$t('Complement range')}}</span><span v-if="startupParam.commandParam && startupParam.commandParam.complementStartDate">{{startupParam.commandParam.complementStartDate}}-{{startupParam.commandParam.complementEndDate}}</span><span v-else>-</span></li>
<li><span>{{$t('Failure Strategy')}}</span><span>{{startupParam.failureStrategy === 'END' ? $t('End') : $t('Continue')}}</span></li>
<li><span>{{$t('Process priority')}}</span><span>{{startupParam.processInstancePriority}}</span></li>
<li><span>{{$t('Worker group')}}</span><span v-if="workerGroupList.length">{{_rtWorkerGroupName(startupParam.workerGroupId)}}</span></li>
<li><span>{{$t('Notification strategy')}}</span><span>{{_rtWarningType(startupParam.warningType)}}</span></li>
<li><span>{{$t('Notification group')}}</span><span v-if="notifyGroupList.length">{{_rtNotifyGroupName(startupParam.warningGroupId)}}</span></li>
<li><span>{{$t('Recipient')}}</span><span>{{startupParam.receivers || '-'}}</span></li>
<li><span>{{$t('Cc')}}</span><span>{{startupParam.receiversCc || '-'}}</span></li>
</ul>
</div>
</template>
</div>
</template>
<script>
import store from '@/conf/home/store'
import { runningType } from '@/conf/home/pages/dag/_source/config'
import { warningTypeList } from '@/conf/home/pages/projects/pages/definition/pages/list/_source/util'
export default {
name: 'starting-params-dag-index',
data () {
return {
store,
startupParam: store.state.dag.startup,
isView: false,
isActive: true,
notifyGroupList: null,
workerGroupList: null
}
},
methods: {
_toggleParam () {
this.isView = !this.isView
},
_rtRunningType (code) {
return _.filter(runningType, v => v.code === code)[0].desc
},
_rtWarningType (id) {
return _.filter(warningTypeList, v => v.id === id)[0].code
},
_rtNotifyGroupName (id) {
let o = _.filter(this.notifyGroupList, v => v.id === id)
if (o && o.length) {
return o[0].code
}
return '-'
},
_rtWorkerGroupName (id) {
let o = _.filter(this.workerGroupList, v => v.id === id)
if (o && o.length) {
return o[0].name
}
return '-'
},
_getNotifyGroupList () {
let notifyGroupListS = _.cloneDeep(this.store.state.dag.notifyGroupListS) || []
if (!notifyGroupListS.length) {
this.store.dispatch('dag/getNotifyGroupList').then(res => {
this.notifyGroupList = res
})
} else {
this.notifyGroupList = notifyGroupListS
}
},
_getWorkerGroupList () {
let stateWorkerGroupsList = this.store.state.security.workerGroupsListAll || []
if (!stateWorkerGroupsList.length) {
this.store.dispatch('security/getWorkerGroupsAll').then(res => {
this.workerGroupList = res
})
} else {
this.workerGroupList = stateWorkerGroupsList
}
}
},
watch: {
'$route': {
deep: true,
handler () {
this.isActive = false
this.notifyGroupList = null
this.workerGroupList = null
this.$nextTick(() => (this.isActive = true))
}
}
},
mounted () {
this._getNotifyGroupList()
this._getWorkerGroupList()
}
}
</script>
<style lang="scss">
.starting-params-dag-index {
.box {
padding: 5px 10px 10px;
.box-hd {
.fa {
color: #0097e0;
margin-right: 4px;
}
font-size: 16px;
}
.box-bd {
margin-left: 20px;
}
}
}
</style>

4
escheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue

@ -1,6 +1,7 @@
<template>
<div class="home-main index-model">
<m-variable></m-variable>
<m-starting-param></m-starting-param>
<m-dag v-if="!isLoading" :type="'instance'"></m-dag>
<m-spin :is-spin="isLoading"></m-spin>
</div>
@ -10,6 +11,7 @@
import { mapActions, mapMutations } from 'vuex'
import mSpin from '@/module/components/spin/spin'
import mVariable from './_source/variable'
import mStartingParam from './_source/startingParam'
import Affirm from './_source/jumpAffirm'
import disabledState from '@/module/mixin/disabledState'
@ -91,6 +93,6 @@
},
mounted () {
},
components: { mDag, mSpin, mVariable }
components: { mDag, mSpin, mVariable, mStartingParam }
}
</script>

4
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue

@ -6,7 +6,7 @@
<div class="row-title">
<div class="left">
<span class="sp">IP: {{item.host}}</span>
<span class="sp">{{$t('Port')}}: {{item.port}}</span>
<span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
<span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span>
</div>
<div class="right">
@ -93,4 +93,4 @@
</script>
<style lang="scss" rel="stylesheet/scss">
@import "./servers";
</style>
</style>

4
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue

@ -6,7 +6,7 @@
<div class="row-title">
<div class="left">
<span class="sp">IP: {{item.host}}</span>
<span class="sp">{{$t('Port')}}: {{item.port}}</span>
<span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
<span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span>
</div>
<div class="right">
@ -94,4 +94,4 @@
</script>
<style lang="scss" rel="stylesheet/scss">
@import "./servers";
</style>
</style>

4
escheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/index.vue

@ -36,10 +36,10 @@
</x-select>
</div>
<div class="list">
<x-input v-model="searchParams.host" style="width: 140px;" size="small" :placeholder="$t('host')"></x-input>
<x-input v-model="searchParams.host" @on-enterkey="_ckQuery" style="width: 140px;" size="small" :placeholder="$t('host')"></x-input>
</div>
<div class="list">
<x-input v-model="searchParams.searchVal" style="width: 200px;" size="small" :placeholder="$t('name')"></x-input>
<x-input v-model="searchParams.searchVal" @on-enterkey="_ckQuery" style="width: 200px;" size="small" :placeholder="$t('name')"></x-input>
</div>
</template>
</m-conditions>

8
escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/email.vue

@ -32,9 +32,12 @@
v-model="email"
:disabled="disabled"
:placeholder="$t('Please enter email')"
@blur="_emailEnter"
@keydown.tab="_emailTab"
@keyup.delete="_emailDelete"
@keyup.enter="_emailEnter"
@keyup.space="_emailEnter"
@keyup.186="_emailEnter"
@keyup.up="_emailKeyup('up')"
@keyup.down="_emailKeyup('down')">
</span>
@ -78,6 +81,11 @@
* Manually add a mailbox
*/
_manualEmail () {
if (this.email === '') {
return
}
this.email = _.trim(this.email).replace(/(;$)|(;$)/g, "")
let email = this.email
let is = (n) => {

43
escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue

@ -21,9 +21,11 @@
</div>
</div>
<div class="clearfix list">
<x-button type="info" style="margin-left:20px" shape="circle" :loading="spinnerLoading" @click="preview()" v-ps="['GENERAL_USER']">执行时间</x-button>
<div class="text">
{{$t('Timing')}}
</div>
<div class="cont">
<template>
<x-poptip :ref="'poptip'" placement="bottom-start">
@ -43,6 +45,13 @@
</template>
</div>
</div>
<div class="clearfix list">
<div style = "padding-left: 150px;">未来五次执行时间</div>
<ul style = "padding-left: 150px;">
<li v-for="time in previewTimes">{{time}}</li>
</ul>
</div>
<div class="clearfix list">
<div class="text">
{{$t('Failure Strategy')}}
@ -162,7 +171,8 @@
receiversCc: [],
i18n: i18n.globalScope.LOCALE,
processInstancePriority: 'MEDIUM',
workerGroupId: -1
workerGroupId: -1,
previewTimes: []
}
},
props: {
@ -180,6 +190,11 @@
return false
}
if (this.scheduleTime[0] === this.scheduleTime[1]) {
this.$message.warning(`${i18n.$t('The start time must not be the same as the end')}`)
return false
}
if (!this.crontab) {
this.$message.warning(`${i18n.$t('Please enter crontab')}`)
return false
@ -225,6 +240,29 @@
}
},
_preview () {
if (this._verification()) {
let api = 'dag/previewSchedule'
let searchParams = {
schedule: JSON.stringify({
startTime: this.scheduleTime[0],
endTime: this.scheduleTime[1],
crontab: this.crontab
})
}
let msg = ''
this.store.dispatch(api, searchParams).then(res => {
this.previewTimes = res
if (this.previewTimes.length) {
resolve()
} else {
reject(new Error(0))
}
})
}
},
_getNotifyGroupList () {
return new Promise((resolve, reject) => {
let notifyGroupListS = _.cloneDeep(this.store.state.dag.notifyGroupListS) || []
@ -248,6 +286,9 @@
},
close () {
this.$emit('close')
},
preview () {
this._preview()
}
},
watch: {

2
escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js

@ -37,7 +37,7 @@ let warningTypeList = [
]
const isEmial = (val) => {
let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
return regEmail.test(val)
}

2
escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue

@ -131,7 +131,7 @@
}
},
_verification () {
let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
// Mobile phone number regular
let regPhone = /^1(3|4|5|6|7|8)\d{9}$/; // eslint-disable-line

17
escheduler-ui/src/js/conf/home/store/dag/actions.js

@ -149,6 +149,10 @@ export default {
state.tenantId = processInstanceJson.tenantId
//startup parameters
state.startup = _.assign(state.startup, _.pick(res.data, ['commandType', 'failureStrategy', 'processInstancePriority', 'workerGroupId', 'warningType', 'warningGroupId', 'receivers', 'receiversCc']))
state.startup.commandParam = JSON.parse(res.data.commandParam)
resolve(res.data)
}).catch(res => {
reject(res)
@ -383,6 +387,19 @@ export default {
})
})
},
/**
* Preview timing
*/
previewSchedule ({ state }, payload) {
return new Promise((resolve, reject) => {
io.post(`projects/${state.projectName}/schedule/preview`, payload, res => {
resolve(res.data)
//alert(res.data)
}).catch(e => {
reject(e)
})
})
},
/**
* Timing list paging
*/

5
escheduler-ui/src/js/conf/home/store/dag/state.js

@ -92,5 +92,8 @@ export default {
// Process instance list{ view a single record }
instanceListS: [],
// Operating state
isDetails: false
isDetails: false,
startup: {
}
}

7
escheduler-ui/src/js/module/i18n/locale/en_US.js

@ -466,6 +466,9 @@ export default {
'Statistics manage': 'Statistics manage',
'statistics': 'statistics',
'select tenant':'select tenant',
'Process Instance Running Count': 'Process Instance Running Count',
'Please enter Principal':'Please enter Principal'
'Please enter Principal':'Please enter Principal',
'The start time must not be the same as the end': 'The start time must not be the same as the end',
'Startup parameter': 'Startup parameter',
'Startup type': 'Startup type',
'Complement range': 'Complement range'
}

6
escheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -467,5 +467,9 @@ export default {
'Statistics manage': '统计管理',
'statistics': '统计',
'select tenant':'选择租户',
'Please enter Principal':'请输入Principal'
'Please enter Principal':'请输入Principal',
'The start time must not be the same as the end': '开始时间和结束时间不能相同',
'Startup parameter': '启动参数',
'Startup type': '启动类型',
'Complement range': '补数范围'
}

18
install.sh

@ -106,6 +106,18 @@ sslEnable="true"
# 下载Excel路径
xlsFilePath="/tmp/xls"
# 企业微信企业ID配置
enterpriseWechatCorpId="xxxxxxxxxx"
# 企业微信应用Secret配置
enterpriseWechatSecret="xxxxxxxxxx"
# 企业微信应用AgentId配置
enterpriseWechatAgentId="xxxxxxxxxx"
# 企业微信用户配置,多个用户以,分割
enterpriseWechatUsers="xxxxx,xxxxx"
#是否启动监控自启动脚本
monitorServerState="false"
@ -318,7 +330,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT
sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties
sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
#sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties
@ -345,6 +357,10 @@ sed -i ${txt} "s#mail.passwd.*#mail.passwd=${mailPassword}#g" conf/alert.propert
sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttlsEnable}#g" conf/alert.properties
sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties
sed -i ${txt} "s#xls.file.path.*#xls.file.path=${xlsFilePath}#g" conf/alert.properties
sed -i ${txt} "s#enterprise.wechat.corp.id.*#enterprise.wechat.corp.id=${enterpriseWechatCorpId}#g" conf/alert.properties
sed -i ${txt} "s#enterprise.wechat.secret.*#enterprise.wechat.secret=${enterpriseWechatSecret}#g" conf/alert.properties
sed -i ${txt} "s#enterprise.wechat.agent.id.*#enterprise.wechat.agent.id=${enterpriseWechatAgentId}#g" conf/alert.properties
sed -i ${txt} "s#enterprise.wechat.users.*#enterprise.wechat.users=${enterpriseWechatUsers}#g" conf/alert.properties
sed -i ${txt} "s#installPath.*#installPath=${installPath}#g" conf/config/install_config.conf

Loading…
Cancel
Save