diff --git a/README.md b/README.md index 19c1d2e45b..ffd8dcf396 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ Easy Scheduler ### 近期研发计划 -EasyScheduler的工作计划:研发计划 ,其中 In Develop卡片下是1.0.2版本的功能,TODO卡片是待做事项(包括 feature ideas) +EasyScheduler的工作计划:研发计划 ,其中 In Develop卡片下是1.1.0版本的功能,TODO卡片是待做事项(包括 feature ideas) ### 贡献代码 diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/manager/EnterpriseWeChatManager.java b/escheduler-alert/src/main/java/cn/escheduler/alert/manager/EnterpriseWeChatManager.java new file mode 100644 index 0000000000..cf16f3a63e --- /dev/null +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/manager/EnterpriseWeChatManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.alert.manager; + +import cn.escheduler.alert.utils.Constants; +import cn.escheduler.alert.utils.EnterpriseWeChatUtils; +import cn.escheduler.dao.model.Alert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Enterprise WeChat Manager + */ +public class EnterpriseWeChatManager { + private static final Logger logger = LoggerFactory.getLogger(MsgManager.class); + /** + * Enterprise We Chat send + * @param alert + */ + public Map send(Alert alert, String token){ + Map retMap = new HashMap<>(); + retMap.put(Constants.STATUS, false); + String agentId = EnterpriseWeChatUtils.enterpriseWeChatAgentId; + String users = EnterpriseWeChatUtils.enterpriseWeChatUsers; + List userList = Arrays.asList(users.split(",")); + logger.info("send message {}",alert); + String msg = EnterpriseWeChatUtils.makeUserSendMsg(userList, agentId,EnterpriseWeChatUtils.markdownByAlert(alert)); + try { + EnterpriseWeChatUtils.sendEnterpriseWeChat(Constants.UTF_8, msg, token); + } catch (IOException e) { + logger.error(e.getMessage(),e); + } + retMap.put(Constants.STATUS, true); + return retMap; + } + +} diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/runner/AlertSender.java b/escheduler-alert/src/main/java/cn/escheduler/alert/runner/AlertSender.java index fbaf512632..bc6566ef51 100644 --- a/escheduler-alert/src/main/java/cn/escheduler/alert/runner/AlertSender.java +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/runner/AlertSender.java @@ -17,7 +17,9 @@ package cn.escheduler.alert.runner; import cn.escheduler.alert.manager.EmailManager; +import cn.escheduler.alert.manager.EnterpriseWeChatManager; import cn.escheduler.alert.utils.Constants; +import cn.escheduler.alert.utils.EnterpriseWeChatUtils; import cn.escheduler.common.enums.AlertStatus; import cn.escheduler.common.enums.AlertType; import cn.escheduler.dao.AlertDao; @@ -40,6 +42,7 @@ public class AlertSender{ private static final Logger logger = LoggerFactory.getLogger(AlertSender.class); private static final EmailManager emailManager= new EmailManager(); + private static final EnterpriseWeChatManager weChatManager= new EnterpriseWeChatManager(); private List alertList; @@ -109,6 +112,12 @@ public class AlertSender{ if (flag){ alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, "execution success", alert.getId()); logger.info("alert send success"); + try { + String token = EnterpriseWeChatUtils.getToken(); + weChatManager.send(alert,token); + } catch (Exception e) { + logger.error(e.getMessage(),e); + } }else { alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE,String.valueOf(retMaps.get(Constants.MESSAGE)),alert.getId()); logger.info("alert send error : {}" , String.valueOf(retMaps.get(Constants.MESSAGE))); diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java index 70136f3972..d077dcf65c 100644 --- a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/Constants.java @@ -129,6 +129,10 @@ public class Constants { public static final int ALERT_SCAN_INTERVEL = 5000; + public static final String MARKDOWN_QUOTE = ">"; + + public static final String MARKDOWN_ENTER = "\n"; + public static final String ENTERPRISE_WECHAT_CORP_ID = "enterprise.wechat.corp.id"; public static final String ENTERPRISE_WECHAT_SECRET = "enterprise.wechat.secret"; @@ -140,4 +144,8 @@ public class Constants { public static final String ENTERPRISE_WECHAT_TEAM_SEND_MSG = "enterprise.wechat.team.send.msg"; public static final String ENTERPRISE_WECHAT_USER_SEND_MSG = "enterprise.wechat.user.send.msg"; + + public static final String ENTERPRISE_WECHAT_AGENT_ID = "enterprise.wechat.agent.id"; + + public static final String ENTERPRISE_WECHAT_USERS = "enterprise.wechat.users"; } diff --git a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java index 169977b01e..eab81498c2 100644 --- a/escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java +++ b/escheduler-alert/src/main/java/cn/escheduler/alert/utils/EnterpriseWeChatUtils.java @@ -16,9 +16,12 @@ */ package cn.escheduler.alert.utils; +import cn.escheduler.common.enums.ShowType; +import cn.escheduler.dao.model.Alert; import com.alibaba.fastjson.JSON; import com.google.common.reflect.TypeToken; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -31,13 +34,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Collection; -import java.util.Map; +import java.util.*; import static cn.escheduler.alert.utils.PropertyUtils.getString; /** - * qiye weixin utils + * Enterprise WeChat utils */ public class EnterpriseWeChatUtils { @@ -48,7 +50,7 @@ public class EnterpriseWeChatUtils { private static final String enterpriseWeChatSecret = getString(Constants.ENTERPRISE_WECHAT_SECRET); private static final String enterpriseWeChatTokenUrl = getString(Constants.ENTERPRISE_WECHAT_TOKEN_URL); - private String enterpriseWeChatTokenUrlReplace = enterpriseWeChatTokenUrl + private static String enterpriseWeChatTokenUrlReplace = enterpriseWeChatTokenUrl .replaceAll("\\$corpId", enterpriseWeChatCorpId) .replaceAll("\\$secret", enterpriseWeChatSecret); @@ -58,12 +60,16 @@ public class EnterpriseWeChatUtils { private static final String enterpriseWeChatUserSendMsg = getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG); + public static final String enterpriseWeChatAgentId = getString(Constants.ENTERPRISE_WECHAT_AGENT_ID); + + public static final String enterpriseWeChatUsers = getString(Constants.ENTERPRISE_WECHAT_USERS); + /** - * get winxin token info + * get Enterprise WeChat token info * @return token string info * @throws IOException */ - public String getToken() throws IOException { + public static String getToken() throws IOException { String resp; CloseableHttpClient httpClient = HttpClients.createDefault(); @@ -71,7 +77,7 @@ public class EnterpriseWeChatUtils { CloseableHttpResponse response = httpClient.execute(httpGet); try { HttpEntity entity = response.getEntity(); - resp = EntityUtils.toString(entity, "utf-8"); + resp = EntityUtils.toString(entity, Constants.UTF_8); EntityUtils.consume(entity); } finally { response.close(); @@ -84,26 +90,26 @@ public class EnterpriseWeChatUtils { } /** - * make team single weixin message + * make team single Enterprise WeChat message * @param toParty * @param agentId * @param msg - * @return weixin send message + * @return Enterprise WeChat send message */ - public String makeTeamSendMsg(String toParty, String agentId, String msg) { + public static String makeTeamSendMsg(String toParty, String agentId, String msg) { return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", toParty) .replaceAll("\\$agentId", agentId) .replaceAll("\\$msg", msg); } /** - * make team multi weixin message + * make team multi Enterprise WeChat message * @param toParty * @param agentId * @param msg - * @return weixin send message + * @return Enterprise WeChat send message */ - public String makeTeamSendMsg(Collection toParty, String agentId, String msg) { + public static String makeTeamSendMsg(Collection toParty, String agentId, String msg) { String listParty = FuncUtils.mkString(toParty, "|"); return enterpriseWeChatTeamSendMsg.replaceAll("\\$toParty", listParty) .replaceAll("\\$agentId", agentId) @@ -115,9 +121,9 @@ public class EnterpriseWeChatUtils { * @param toUser * @param agentId * @param msg - * @return weixin send message + * @return Enterprise WeChat send message */ - public String makeUserSendMsg(String toUser, String agentId, String msg) { + public static String makeUserSendMsg(String toUser, String agentId, String msg) { return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", toUser) .replaceAll("\\$agentId", agentId) .replaceAll("\\$msg", msg); @@ -128,9 +134,9 @@ public class EnterpriseWeChatUtils { * @param toUser * @param agentId * @param msg - * @return weixin send message + * @return Enterprise WeChat send message */ - public String makeUserSendMsg(Collection toUser, String agentId, String msg) { + public static String makeUserSendMsg(Collection toUser, String agentId, String msg) { String listUser = FuncUtils.mkString(toUser, "|"); return enterpriseWeChatUserSendMsg.replaceAll("\\$toUser", listUser) .replaceAll("\\$agentId", agentId) @@ -138,14 +144,14 @@ public class EnterpriseWeChatUtils { } /** - * send weixin + * send Enterprise WeChat * @param charset * @param data * @param token - * @return weixin resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""} + * @return Enterprise WeChat resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""} * @throws IOException */ - public String sendQiyeWeixin(String charset, String data, String token) throws IOException { + public static String sendEnterpriseWeChat(String charset, String data, String token) throws IOException { String enterpriseWeChatPushUrlReplace = enterpriseWeChatPushUrl.replaceAll("\\$token", token); CloseableHttpClient httpclient = HttpClients.createDefault(); @@ -160,8 +166,83 @@ public class EnterpriseWeChatUtils { } finally { response.close(); } - logger.info("qiye weixin send [{}], param:{}, resp:{}", enterpriseWeChatPushUrl, data, resp); + logger.info("Enterprise WeChat send [{}], param:{}, resp:{}", enterpriseWeChatPushUrl, data, resp); return resp; } + /** + * convert table to markdown style + * @param title + * @param content + * @return + */ + public static String markdownTable(String title,String content){ + List mapItemsList = JSONUtils.toList(content, LinkedHashMap.class); + StringBuilder contents = new StringBuilder(200); + for (LinkedHashMap mapItems : mapItemsList){ + + Set> entries = mapItems.entrySet(); + + Iterator> iterator = entries.iterator(); + + StringBuilder t = new StringBuilder(String.format("`%s`%s",title,Constants.MARKDOWN_ENTER)); + while (iterator.hasNext()){ + + Map.Entry entry = iterator.next(); + t.append(Constants.MARKDOWN_QUOTE); + t.append(entry.getKey()).append(":").append(entry.getValue()); + t.append(Constants.MARKDOWN_ENTER); + } + + contents.append(t); + } + return contents.toString(); + } + + /** + * convert text to markdown style + * @param title + * @param content + * @return + */ + public static String markdownText(String title,String content){ + if (StringUtils.isNotEmpty(content)){ + List list; + try { + list = JSONUtils.toList(content,String.class); + }catch (Exception e){ + logger.error("json format exception",e); + return null; + } + + StringBuilder contents = new StringBuilder(100); + contents.append(String.format("`%s`\n",title)); + for (String str : list){ + contents.append(Constants.MARKDOWN_QUOTE); + contents.append(str); + contents.append(Constants.MARKDOWN_ENTER); + } + + return contents.toString(); + + } + return null; + } + + /** + * Determine the mardown style based on the show type of the alert + * @param alert + * @return + */ + public static String markdownByAlert(Alert alert){ + String result = ""; + if (alert.getShowType() == ShowType.TABLE) { + result = markdownTable(alert.getTitle(),alert.getContent()); + }else if(alert.getShowType() == ShowType.TEXT){ + result = markdownText(alert.getTitle(),alert.getContent()); + } + return result; + + } + } diff --git a/escheduler-alert/src/main/resources/alert.properties b/escheduler-alert/src/main/resources/alert.properties index 8a8b8a5365..87ccae6377 100644 --- a/escheduler-alert/src/main/resources/alert.properties +++ b/escheduler-alert/src/main/resources/alert.properties @@ -19,10 +19,12 @@ xls.file.path=/tmp/xls # Enterprise WeChat configuration enterprise.wechat.corp.id=xxxxxxx enterprise.wechat.secret=xxxxxxx +enterprise.wechat.agent.id=xxxxxxx +enterprise.wechat.users=xxxxxxx enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} -enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"} +enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}} diff --git a/escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java b/escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java index 2a45196aed..646d34ed7b 100644 --- a/escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java +++ b/escheduler-alert/src/test/java/cn/escheduler/alert/utils/EnterpriseWeChatUtilsTest.java @@ -16,11 +16,10 @@ */ package cn.escheduler.alert.utils; +import com.alibaba.fastjson.JSON; import org.junit.Assert; import org.junit.Test; -import com.alibaba.fastjson.JSON; - import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -33,24 +32,23 @@ import java.util.Collection; * enterprise.wechat.token.url * enterprise.wechat.push.url * enterprise.wechat.send.msg + * enterprise.wechat.agent.id + * enterprise.wechat.users */ public class EnterpriseWeChatUtilsTest { + private String agentId = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_AGENT_ID); // app id + private Collection listUserId = Arrays.asList(PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USERS).split(",")); + // Please change - private String agentId = "1000002"; // app id private String partyId = "2"; private Collection listPartyId = Arrays.asList("2","4"); - private String userId = "test1"; - private Collection listUserId = Arrays.asList("test1","test2"); - @Test public void testSendSingleTeamWeChat() { - EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils(); - try { - String token = wx.getToken(); - String msg = wx.makeTeamSendMsg(partyId, agentId, "hello world"); - String resp = wx.sendQiyeWeixin("utf-8", msg, token); + String token = EnterpriseWeChatUtils.getToken(); + String msg = EnterpriseWeChatUtils.makeTeamSendMsg(partyId, agentId, "hello world"); + String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token); String errmsg = JSON.parseObject(resp).getString("errmsg"); Assert.assertEquals(errmsg, "ok"); @@ -61,12 +59,11 @@ public class EnterpriseWeChatUtilsTest { @Test public void testSendMultiTeamWeChat() { - EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils(); try { - String token = wx.getToken(); - String msg = wx.makeTeamSendMsg(listPartyId, agentId, "hello world"); - String resp = wx.sendQiyeWeixin("utf-8", msg, token); + String token = EnterpriseWeChatUtils.getToken(); + String msg = EnterpriseWeChatUtils.makeTeamSendMsg(listPartyId, agentId, "hello world"); + String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token); String errmsg = JSON.parseObject(resp).getString("errmsg"); Assert.assertEquals(errmsg, "ok"); @@ -77,12 +74,23 @@ public class EnterpriseWeChatUtilsTest { @Test public void testSendSingleUserWeChat() { - EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils(); - try { - String token = wx.getToken(); - String msg = wx.makeUserSendMsg(userId, agentId, "hello world"); - String resp = wx.sendQiyeWeixin("utf-8", msg, token); + String token = EnterpriseWeChatUtils.getToken(); + String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId.stream().findFirst().get(), agentId, "您的会议室已经预定,稍后会同步到`邮箱` \n" + + ">**事项详情** \n" + + ">事 项:开会
" + + ">组织者:@miglioguan \n" + + ">参与者:@miglioguan、@kunliu、@jamdeezhou、@kanexiong、@kisonwang \n" + + "> \n" + + ">会议室:广州TIT 1楼 301 \n" + + ">日 期:2018年5月18日 \n" + + ">时 间:上午9:00-11:00 \n" + + "> \n" + + ">请准时参加会议。 \n" + + "> \n" + + ">如需修改会议信息,请点击:[修改会议信息](https://work.weixin.qq.com)\""); + + String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token); String errmsg = JSON.parseObject(resp).getString("errmsg"); Assert.assertEquals(errmsg, "ok"); @@ -93,12 +101,11 @@ public class EnterpriseWeChatUtilsTest { @Test public void testSendMultiUserWeChat() { - EnterpriseWeChatUtils wx = new EnterpriseWeChatUtils(); - try { - String token = wx.getToken(); - String msg = wx.makeUserSendMsg(listUserId, agentId, "hello world"); - String resp = wx.sendQiyeWeixin("utf-8", msg, token); + String token = EnterpriseWeChatUtils.getToken(); + + String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId, agentId, "hello world"); + String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token); String errmsg = JSON.parseObject(resp).getString("errmsg"); Assert.assertEquals(errmsg, "ok"); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java index d6872a278c..1938644724 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java @@ -24,7 +24,7 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.*; import cn.escheduler.dao.model.User; -import io.swagger.annotations.Api; +import io.swagger.annotations.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -38,9 +38,9 @@ import static cn.escheduler.api.enums.Status.*; /** - * execute task controller + * execute process controller */ -@ApiIgnore +@Api(tags = "PROCESS_INSTANCE_EXECUTOR_TAG", position = 1) @RestController @RequestMapping("projects/{projectName}/executors") public class ExecutorController extends BaseController { @@ -53,10 +53,27 @@ public class ExecutorController extends BaseController { /** * execute process instance */ + @ApiOperation(value = "startProcessInstance", notes= "RUN_PROCESS_INSTANCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType ="FailureStrategy"), + @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType ="String"), + @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType ="TaskDependType"), + @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType ="CommandType"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE",required = true, dataType ="WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID",required = true, dataType ="Int", example = "100"), + @ApiImplicitParam(name = "receivers", value = "RECEIVERS",dataType ="String" ), + @ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ), + @ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ), + @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"), + @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"), + }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) - public Result startProcessInstance(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable String projectName, + public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam(value = "processDefinitionId") int processDefinitionId, @RequestParam(value = "scheduleTime", required = false) String scheduleTime, @RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy, @@ -102,10 +119,15 @@ public class ExecutorController extends BaseController { * @param processInstanceId * @return */ + @ApiOperation(value = "execute", notes= "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") + }) @PostMapping(value = "/execute") @ResponseStatus(HttpStatus.OK) - public Result execute(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable String projectName, + public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam("processInstanceId") Integer processInstanceId, @RequestParam("executeType") ExecuteType executeType ) { @@ -127,9 +149,13 @@ public class ExecutorController extends BaseController { * @param processDefinitionId * @return */ + @ApiOperation(value = "startCheckProcessDefinition", notes= "START_CHECK_PROCESS_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + }) @PostMapping(value = "/start-check") @ResponseStatus(HttpStatus.OK) - public Result startCheckProcessDefinition(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result startCheckProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "processDefinitionId") int processDefinitionId) { logger.info("login user {}, check process definition", loginUser.getUserName(), processDefinitionId); try { @@ -149,9 +175,16 @@ public class ExecutorController extends BaseController { * @param processDefinitionId * @return */ + @ApiIgnore + @ApiOperation(value = "getReceiverCc", notes= "GET_RECEIVER_CC_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100") + + }) @GetMapping(value = "/get-receiver-cc") @ResponseStatus(HttpStatus.OK) - public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result getReceiverCc(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId, @RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) { logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName()); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java index 740fbc961c..9602ac6cef 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java @@ -191,6 +191,16 @@ public class ExecutorService extends BaseService{ return checkResult; } + // checkTenantExists(); + Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(), + processDefinition.getUserId()); + if(tenant == null){ + logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", + processDefinition.getId(), processDefinition.getName()); + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } + switch (executeType) { case REPEAT_RUNNING: result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING); @@ -260,7 +270,7 @@ public class ExecutorService extends BaseService{ } break; case RECOVER_SUSPENDED_PROCESS: - if (executionStatus.typeIsPause()) { + if (executionStatus.typeIsPause()|| executionStatus.typeIsCancel()) { checkResult = true; } default: diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 3cbf5f1414..9ba29e0555 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -38,10 +38,7 @@ import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.placeholder.BusinessTimeUtils; import cn.escheduler.dao.ProcessDao; -import cn.escheduler.dao.mapper.ProcessDefinitionMapper; -import cn.escheduler.dao.mapper.ProcessInstanceMapper; -import cn.escheduler.dao.mapper.ProjectMapper; -import cn.escheduler.dao.mapper.TaskInstanceMapper; +import cn.escheduler.dao.mapper.*; import cn.escheduler.dao.model.*; import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; @@ -97,6 +94,9 @@ public class ProcessInstanceService extends BaseDAGService { @Autowired LoggerService loggerService; + @Autowired + WorkerGroupMapper workerGroupMapper; + /** * query process instance by id * @@ -115,6 +115,15 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId); + if(processInstance.getWorkerGroupId() == -1){ + processInstance.setWorkerGroupName(DEFAULT); + }else{ + WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId()); + processInstance.setWorkerGroupName(workerGroup.getName()); + } + ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); + processInstance.setReceivers(processDefinition.getReceivers()); + processInstance.setReceiversCc(processDefinition.getReceiversCc()); result.put(Constants.DATA_LIST, processInstance); putMsg(result, Status.SUCCESS); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index 62332cbb75..7cd077864c 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -219,6 +219,11 @@ public final class Constants { */ public static final String SEMICOLON = ";"; + /** + * DOT . + */ + public static final String DOT = "."; + /** * ZOOKEEPER_SESSION_TIMEOUT */ @@ -483,6 +488,8 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; + public static final String DEFAULT = "Default"; + public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; @@ -883,6 +890,11 @@ public final class Constants { */ public static final String LOGIN_USER_KEY_TAB_USERNAME = "login.user.keytab.username"; + /** + * default worker group id + */ + public static final int DEFAULT_WORKER_ID = -1; + /** * loginUserFromKeytab path */ diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java index 106d6ff915..6f6e979797 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java @@ -24,20 +24,17 @@ public interface ITaskQueue { /** * take out all the elements * - * this method has deprecated - * use checkTaskExists instead * * @param key * @return */ - @Deprecated List getAllTasks(String key); /** * check task exists in the task queue or not * * @param key queue name - * @param task ${priority}_${processInstanceId}_${taskId} + * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * @return true if exists in the queue */ boolean checkTaskExists(String key, String task); @@ -54,10 +51,10 @@ public interface ITaskQueue { * an element pops out of the queue * * @param key queue name - * @param remove whether remove the element + * @param n how many elements to poll * @return */ - String poll(String key, boolean remove); + List poll(String key, int n); /** * remove a element from queue diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java index c8931064af..2d17481da4 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java @@ -42,7 +42,7 @@ public class TaskQueueFactory { public static ITaskQueue getTaskQueueInstance() { String queueImplValue = CommonUtils.getQueueImplValue(); if (StringUtils.isNotBlank(queueImplValue)) { -// queueImplValue = StringUtils.trim(queueImplValue); +// queueImplValue = IpUtils.trim(queueImplValue); // if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) { // logger.info("task queue impl use reids "); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index 28f696aa6e..e116cd3271 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -19,6 +19,8 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; import cn.escheduler.common.utils.Bytes; +import cn.escheduler.common.utils.IpUtils; +import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; @@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; /** * A singleton of a task queue implemented with zookeeper @@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * @param key task queue name * @return */ - @Deprecated @Override public List getAllTasks(String key) { try { @@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * check task exists in the task queue or not * * @param key queue name - * @param task ${priority}_${processInstanceId}_${taskId} + * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * @return true if exists in the queue */ @Override @@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * add task to tasks queue * * @param key task queue name - * @param value ${priority}_${processInstanceId}_${taskId} + * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... */ @Override public void add(String key, String value) { @@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); -// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value; -// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, -// Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); } catch (Exception e) { logger.error("add task to tasks queue exception",e); @@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { /** * An element pops out of the queue

* note: - * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} + * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low. * - * 流程实例优先级_流程实例id_任务优先级_任务id high <- low + * 流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,... high <- low * @param key task queue name - * @param remove whether remove the element - * @return the task id to be executed + * @param tasksNum how many elements to poll + * @return the task ids to be executed */ @Override - public String poll(String key, boolean remove) { + public List poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; @@ -149,55 +144,83 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { if(list != null && list.size() > 0){ + String workerIp = OSUtils.getHost(); + String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp)); + int size = list.size(); - String formatTargetTask = null; - String targetTaskKey = null; + + Set taskTreeSet = new TreeSet<>(); + for (int i = 0; i < size; i++) { + String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); - if(taskDetailArrs.length == 4){ + //向前版本兼容 + if(taskDetailArrs.length >= 4){ + //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3])); - if(i > 0){ - int result = formatTask.compareTo(formatTargetTask); - if(result < 0){ - formatTargetTask = formatTask; - targetTaskKey = taskDetail; + if(taskDetailArrs.length > 4){ + String taskHosts = taskDetailArrs[4]; + + //task can assign to any worker host if equals default ip value of worker server + if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){ + String[] taskHostsArr = taskHosts.split(Constants.COMMA); + if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){ + continue; + } } - }else{ - formatTargetTask = formatTask; - targetTaskKey = taskDetail; } - }else{ - logger.error("task queue poll error, task detail :{} , please check!", taskDetail); + + taskTreeSet.add(formatTask); + } - } - if(formatTargetTask != null){ - String taskIdPath = tasksQueuePath + targetTaskKey; + } - logger.info("consume task {}", taskIdPath); + List taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); - String[] vals = targetTaskKey.split(Constants.UNDERLINE); + logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size()); - if(remove){ - removeNode(key, targetTaskKey); - } - logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1); - return targetTaskKey; - }else{ - logger.error("should not go here, task queue poll error, please check!"); - } + return taskslist; + }else{ + Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } catch (Exception e) { logger.error("add task to tasks queue exception",e); } - return null; + return new ArrayList(); } + + /** + * get task list from tree set + * + * @param tasksNum + * @param taskTreeSet + */ + public List getTasksListFromTreeSet(int tasksNum, Set taskTreeSet) { + Iterator iterator = taskTreeSet.iterator(); + int j = 0; + List taskslist = new ArrayList<>(tasksNum); + while(iterator.hasNext()){ + if(j++ < tasksNum){ + String task = iterator.next(); + String[] taskArray = task.split(Constants.UNDERLINE); + int processInstanceId = Integer.parseInt(taskArray[1]); + int taskId = Integer.parseInt(taskArray[3]); + String destTask = taskArray[0]+Constants.UNDERLINE + processInstanceId + Constants.UNDERLINE + + taskArray[2] + Constants.UNDERLINE + taskId; + taskslist.add(destTask); + } + } + return taskslist; + } + + @Override public void removeNode(String key, String nodeValue){ diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java new file mode 100644 index 0000000000..ddc520a876 --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.utils; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * http utils + */ +public class IpUtils { + + private static final Logger logger = LoggerFactory.getLogger(IpUtils.class); + public static final String DOT = "."; + + /** + * ip str to long

+ * + * @param ipStr ip string + */ + public static Long ipToLong(String ipStr) { + String[] ipSet = ipStr.split("\\" + DOT); + + return Long.parseLong(ipSet[0]) << 24 | Long.parseLong(ipSet[1]) << 16 | Long.parseLong(ipSet[2]) << 8 | Long.parseLong(ipSet[3]); + } + + /** + * long to ip + * @param ipLong the long number converted from IP + * @return String + */ + public static String longToIp(long ipLong) { + long[] ipNumbers = new long[4]; + long tmp = 0xFF; + ipNumbers[0] = ipLong >> 24 & tmp; + ipNumbers[1] = ipLong >> 16 & tmp; + ipNumbers[2] = ipLong >> 8 & tmp; + ipNumbers[3] = ipLong & tmp; + + StringBuilder sb = new StringBuilder(16); + sb.append(ipNumbers[0]).append(DOT) + .append(ipNumbers[1]).append(DOT) + .append(ipNumbers[2]).append(DOT) + .append(ipNumbers[3]); + return sb.toString(); + } + + + + public static void main(String[] args){ + long ipLong = ipToLong("11.3.4.5"); + logger.info(longToIp(ipLong)); + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 177669b43c..e2f064be13 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -312,7 +312,11 @@ public abstract class AbstractZKClient { childrenList = zkClient.getChildren().forPath(masterZNodeParentPath); } } catch (Exception e) { - logger.warn(e.getMessage(),e); +// logger.warn(e.getMessage()); + if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){ + logger.warn(e.getMessage(),e); + } + return childrenList.size(); } return childrenList.size(); diff --git a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java index 7d35bc8480..4bf152bbf2 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java @@ -37,6 +37,12 @@ public class OSUtilsTest { // static HardwareAbstractionLayer hal = si.getHardware(); + @Test + public void getHost(){ + logger.info(OSUtils.getHost()); + } + + @Test public void memoryUsage() { logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239 diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java index 03ba29a840..72a6e46200 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java @@ -17,12 +17,15 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.List; import java.util.Random; import static org.junit.Assert.assertEquals; @@ -34,59 +37,62 @@ public class TaskQueueImplTest { private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); + ITaskQueue tasksQueue = null; - @Test - public void testTaskQueue(){ + @Before + public void before(){ + tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + //clear all data + tasksQueue.delete(); - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + } + + + @After + public void after(){ //clear all data tasksQueue.delete(); + } + + + @Test + public void testAdd(){ //add - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775"); + + List tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); + + if(tasks.size() < 0){ + return; + } //pop - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); - assertEquals(node1,"1"); - String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); - assertEquals(node2,"2"); - - //sadd - String task1 = "1.1.1.1-1-mr"; - String task2 = "1.1.1.2-2-mr"; - String task3 = "1.1.1.3-3-mr"; - String task4 = "1.1.1.4-4-mr"; - String task5 = "1.1.1.5-5-mr"; - - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task - - Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5); - logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); - //srem - tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5); - //smembers - Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4); - logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); + String node1 = tasks.get(0); + assertEquals(node1,"0_0000000001_1_0000000001"); + + tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); + + if(tasks.size() < 0){ + return; + } + + String node2 = tasks.get(0); + assertEquals(node2,"0_0000000001_1_0000000001"); } + + /** * test one million data from zookeeper queue */ @Test public void extremeTest(){ - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); - //clear all data - tasksQueue.delete(); int total = 30 * 10000; for(int i = 0; i < total; i++) @@ -99,14 +105,9 @@ public class TaskQueueImplTest { } } - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); assertEquals(node1,"0"); - //clear all data - tasksQueue.delete(); - - - } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 17ca727340..8ba0f47960 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.task.subprocess.SubProcessParameters; import cn.escheduler.common.utils.DateUtils; +import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.mapper.*; @@ -117,7 +118,7 @@ public class ProcessDao extends AbstractBaseDao { */ @Override protected void init() { - userMapper=getMapper(UserMapper.class); + userMapper = getMapper(UserMapper.class); processDefineMapper = getMapper(ProcessDefinitionMapper.class); processInstanceMapper = getMapper(ProcessInstanceMapper.class); dataSourceMapper = getMapper(DataSourceMapper.class); @@ -1015,11 +1016,58 @@ public class ProcessDao extends AbstractBaseDao { * * 流程实例优先级_流程实例id_任务优先级_任务id high <- low * - * @param task + * @param taskInstance * @return */ - private String taskZkInfo(TaskInstance task) { - return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId(); + private String taskZkInfo(TaskInstance taskInstance) { + + int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance); + + StringBuilder sb = new StringBuilder(100); + + sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE) + .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getId()).append(Constants.UNDERLINE); + + if(taskWorkerGroupId > 0){ + //not to find data from db + WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId); + if(workerGroup == null ){ + logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId()); + + sb.append(Constants.DEFAULT_WORKER_ID); + return sb.toString(); + } + + String ips = workerGroup.getIpList(); + + if(StringUtils.isBlank(ips)){ + logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", + taskInstance.getId(), workerGroup.getId()); + sb.append(Constants.DEFAULT_WORKER_ID); + return sb.toString(); + } + + StringBuilder ipSb = new StringBuilder(100); + String[] ipArray = ips.split(COMMA); + + for (String ip : ipArray) { + long ipLong = IpUtils.ipToLong(ip); + ipSb.append(ipLong).append(COMMA); + } + + if(ipSb.length() > 0) { + ipSb.deleteCharAt(ipSb.length() - 1); + } + + sb.append(ipSb); + }else{ + sb.append(Constants.DEFAULT_WORKER_ID); + } + + + return sb.toString(); } /** @@ -1683,5 +1731,24 @@ public class ProcessDao extends AbstractBaseDao { } + /** + * get task worker group id + * + * @param taskInstance + * @return + */ + public int getTaskWorkerGroupId(TaskInstance taskInstance) { + int taskWorkerGroupId = taskInstance.getWorkerGroupId(); + ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId()); + if(processInstance == null){ + logger.error("cannot find the task:{} process instance", taskInstance.getId()); + return Constants.DEFAULT_WORKER_ID; + } + int processWorkerGroupId = processInstance.getWorkerGroupId(); + + taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + return taskWorkerGroupId; + } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java index 24902c0121..5c9418ca72 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java @@ -194,6 +194,21 @@ public class ProcessInstance { */ private int tenantId; + /** + * worker group name. for api. + */ + private String workerGroupName; + + /** + * receivers for api + */ + private String receivers; + + /** + * receivers cc for api + */ + private String receiversCc; + public ProcessInstance(){ } @@ -560,4 +575,28 @@ public class ProcessInstance { public int getTenantId() { return this.tenantId ; } + + public String getWorkerGroupName() { + return workerGroupName; + } + + public void setWorkerGroupName(String workerGroupName) { + this.workerGroupName = workerGroupName; + } + + public String getReceivers() { + return receivers; + } + + public void setReceivers(String receivers) { + this.receivers = receivers; + } + + public String getReceiversCc() { + return receiversCc; + } + + public void setReceiversCc(String receiversCc) { + this.receiversCc = receiversCc; + } } diff --git a/escheduler-server/pom.xml b/escheduler-server/pom.xml index ad21578d6c..6341539e4c 100644 --- a/escheduler-server/pom.xml +++ b/escheduler-server/pom.xml @@ -89,7 +89,7 @@ escheduler-alert - + diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java index e137824814..bf0dcbfe75 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java @@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable { if(Stopper.isRunning()) { // send heartbeat to zk if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { - logger.error("master send heartbeat to zk failed"); + logger.error("master send heartbeat to zk failed: can't find zookeeper regist path of master server"); return; } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 1c6232bc9a..a960570ea5 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -25,8 +25,9 @@ import cn.escheduler.common.utils.OSUtils; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.*; import cn.escheduler.server.zk.ZKWorkerClient; -import com.cronutils.utils.StringUtils; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,15 +99,7 @@ public class FetchTaskThread implements Runnable{ */ private boolean checkWorkerGroup(TaskInstance taskInstance, String host){ - int taskWorkerGroupId = taskInstance.getWorkerGroupId(); - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); - if(processInstance == null){ - logger.error("cannot find the task:{} process instance", taskInstance.getId()); - return false; - } - int processWorkerGroupId = processInstance.getWorkerGroupId(); - - taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance); if(taskWorkerGroupId <= 0){ return true; @@ -117,99 +110,103 @@ public class FetchTaskThread implements Runnable{ return true; } String ips = workerGroup.getIpList(); - if(ips == null){ + if(StringUtils.isBlank(ips)){ logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", taskInstance.getId(), workerGroup.getId()); } - String[] ipArray = ips.split(","); + String[] ipArray = ips.split(Constants.COMMA); List ipList = Arrays.asList(ipArray); return ipList.contains(host); } + + @Override public void run() { while (Stopper.isRunning()){ InterProcessMutex mutex = null; try { - if(OSUtils.checkResource(this.conf, false)) { - // creating distributed locks, lock path /escheduler/lock/worker - String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); - mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); - mutex.acquire(); + ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; - ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; + //check memory and cpu usage and threads + if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) { - for (int i = 0; i < taskNum; i++) { - - int activeCount = poolExecutor.getActiveCount(); - if (activeCount >= workerExecNums) { - logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums); - continue; - } + //whether have tasks, if no tasks , no need lock //get all tasks + List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE); + if(tasksQueueList.size() > 0){ + // creating distributed locks, lock path /escheduler/lock/worker + String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); + mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); + mutex.acquire(); // task instance id str - String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum); - if (!StringUtils.isEmpty(taskQueueStr )) { + for(String taskQueueStr : taskQueueStrArr){ + if (StringUtils.isNotBlank(taskQueueStr )) { - String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); - String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; - Date now = new Date(); - Integer taskId = Integer.parseInt(taskInstIdStr); + if (!checkThreadCount(poolExecutor)) { + break; + } - // find task instance by task id - TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); + String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); + String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; + Date now = new Date(); + Integer taskId = Integer.parseInt(taskInstIdStr); - logger.info("worker fetch taskId : {} from queue ", taskId); + // find task instance by task id + TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); - int retryTimes = 30; - // mainly to wait for the master insert task to succeed - while (taskInstance == null && retryTimes > 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - taskInstance = processDao.findTaskInstanceById(taskId); - retryTimes--; - } + logger.info("worker fetch taskId : {} from queue ", taskId); - if (taskInstance == null ) { - logger.error("task instance is null. task id : {} ", taskId); - continue; - } - if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ - continue; - } - taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); - logger.info("remove task:{} from queue", taskQueueStr); + int retryTimes = 30; + // mainly to wait for the master insert task to succeed + while (taskInstance == null && retryTimes > 0) { + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + taskInstance = processDao.findTaskInstanceById(taskId); + retryTimes--; + } + + if (taskInstance == null ) { + logger.error("task instance is null. task id : {} ", taskId); + continue; + } - // set execute task worker host - taskInstance.setHost(OSUtils.getHost()); - taskInstance.setStartTime(now); + if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ + continue; + } + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); + logger.info("remove task:{} from queue", taskQueueStr); + // set execute task worker host + taskInstance.setHost(OSUtils.getHost()); + taskInstance.setStartTime(now); - // get process instance - ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + // get process instance + ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - // get process define - ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); + // get process define + ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); - taskInstance.setProcessInstance(processInstance); - taskInstance.setProcessDefine(processDefine); + taskInstance.setProcessInstance(processInstance); + taskInstance.setProcessDefine(processDefine); - // get local execute path - String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), - processDefine.getId(), - processInstance.getId(), - taskInstance.getId()); - logger.info("task instance local execute path : {} ", execLocalPath); + // get local execute path + String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), + processDefine.getId(), + processInstance.getId(), + taskInstance.getId()); + logger.info("task instance local execute path : {} ", execLocalPath); - // set task execute path - taskInstance.setExecutePath(execLocalPath); + // set task execute path + taskInstance.setExecutePath(execLocalPath); Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), processDefine.getUserId()); @@ -218,21 +215,22 @@ public class FetchTaskThread implements Runnable{ FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode(), logger); - logger.info("task : {} ready to submit to task scheduler thread",taskId); - // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + logger.info("task : {} ready to submit to task scheduler thread",taskId); + // submit task + workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + } } } + } Thread.sleep(Constants.SLEEP_TIME_MILLIS); }catch (Exception e){ logger.error("fetch task thread exception : " + e.getMessage(),e); - } - finally { + }finally { if (mutex != null){ try { mutex.release(); @@ -247,4 +245,18 @@ public class FetchTaskThread implements Runnable{ } } } + + /** + * + * @param poolExecutor + * @return + */ + private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) { + int activeCount = poolExecutor.getActiveCount(); + if (activeCount >= workerExecNums) { + logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS); + return false; + } + return true; + } } \ No newline at end of file diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index 26d682f132..09f6467aad 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -387,7 +387,7 @@ public class SqlTask extends AbstractTask { String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim(); if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ Map mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName)); - if(!(Boolean) mailResult.get(Constants.STATUS)){ + if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){ throw new RuntimeException("send mail failed!"); } }else{ diff --git a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue index 22d48782d2..914bab2812 100644 --- a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue +++ b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue @@ -6,7 +6,7 @@

IP: {{item.host}} - {{$t('Port')}}: {{item.port}} + {{$t('Process Pid')}}: {{item.port}} {{$t('Zk registration directory')}}: {{item.zkDirectory}}
@@ -93,4 +93,4 @@ \ No newline at end of file + diff --git a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue index 3cf0993415..960beeb14a 100644 --- a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue +++ b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue @@ -6,7 +6,7 @@
IP: {{item.host}} - {{$t('Port')}}: {{item.port}} + {{$t('Process Pid')}}: {{item.port}} {{$t('Zk registration directory')}}: {{item.zkDirectory}}
@@ -94,4 +94,4 @@ \ No newline at end of file + diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js index db6c8aa261..2259dea9cd 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js @@ -37,7 +37,7 @@ let warningTypeList = [ ] const isEmial = (val) => { - let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line + let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line return regEmail.test(val) } diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index 2bc1cad066..619407a61a 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -95,17 +95,17 @@ shape="circle" size="xsmall" data-toggle="tooltip" - :title="$t('Stop')" - @click="_stop(item)" - icon="iconfont icon-zanting1" - :disabled="item.state !== 'RUNNING_EXEUTION'"> + :title="item.state === 'STOP' ? $t('Recovery Suspend') : $t('Stop')" + @click="_stop(item,$index)" + :icon="item.state === 'STOP' ? 'iconfont icon-ai06' : 'iconfont icon-zanting'" + :disabled="item.state !== 'RUNNING_EXEUTION' && item.state != 'STOP'"> - {{item.count}}s + {{item.count}} - {{item.count}}s + {{item.count}} - - + + + + + + + - {{item.count}}s + {{item.count}} + + + @@ -362,11 +371,20 @@ * stop * @param STOP */ - _stop (item) { - this._upExecutorsState({ - processInstanceId: item.id, - executeType: 'STOP' - }) + _stop (item, index) { + if(item.state == 'STOP') { + this._countDownFn({ + id: item.id, + executeType: 'RECOVER_SUSPENDED_PROCESS', + index: index, + buttonType: 'suspend' + }) + } else { + this._upExecutorsState({ + processInstanceId: item.id, + executeType: 'STOP' + }) + } }, /** * pause @@ -383,7 +401,7 @@ } else { this._upExecutorsState({ processInstanceId: item.id, - executeType: item.state === 'PAUSE' ? 'RECOVER_SUSPENDED_PROCESS' : 'PAUSE' + executeType: 'PAUSE' }) } }, @@ -435,7 +453,7 @@ if (data.length) { _.map(data, v => { v.disabled = true - v.count = 10 + v.count = 9 }) } return data diff --git a/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue b/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue index b02db7848e..574a995ec4 100644 --- a/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue +++ b/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue @@ -131,7 +131,7 @@ } }, _verification () { - let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line + let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line // Mobile phone number regular let regPhone = /^1(3|4|5|6|7|8)\d{9}$/; // eslint-disable-line diff --git a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js index c21dee915c..1186a353ac 100644 --- a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -237,7 +237,7 @@ export default { 'Recovery Failed': '恢复失败', 'Stop': '停止', 'Pause': '暂停', - 'Recovery Suspend': '恢复暂停', + 'Recovery Suspend': '恢复运行', 'Gantt': '甘特图', 'Name': '名称', 'Node Type': '节点类型', @@ -282,7 +282,7 @@ export default { 'Start Process': '启动工作流', 'Execute from the current node': '从当前节点开始执行', 'Recover tolerance fault process': '恢复被容错的工作流', - 'Resume the suspension process': '恢复暂停流程', + 'Resume the suspension process': '恢复运行流程', 'Execute from the failed nodes': '从失败节点开始执行', 'Complement Data': '补数', 'Scheduling execution': '调度执行', diff --git a/install.sh b/install.sh index a80c3198a9..d58be482fd 100644 --- a/install.sh +++ b/install.sh @@ -106,6 +106,18 @@ sslEnable="true" # 下载Excel路径 xlsFilePath="/tmp/xls" +# 企业微信企业ID配置 +enterpriseWechatCorpId="xxxxxxxxxx" + +# 企业微信应用Secret配置 +enterpriseWechatSecret="xxxxxxxxxx" + +# 企业微信应用AgentId配置 +enterpriseWechatAgentId="xxxxxxxxxx" + +# 企业微信用户配置,多个用户以,分割 +enterpriseWechatUsers="xxxxx,xxxxx" + #是否启动监控自启动脚本 monitorServerState="false" @@ -318,7 +330,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties -sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties +#sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties @@ -345,6 +357,10 @@ sed -i ${txt} "s#mail.passwd.*#mail.passwd=${mailPassword}#g" conf/alert.propert sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttlsEnable}#g" conf/alert.properties sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties sed -i ${txt} "s#xls.file.path.*#xls.file.path=${xlsFilePath}#g" conf/alert.properties +sed -i ${txt} "s#enterprise.wechat.corp.id.*#enterprise.wechat.corp.id=${enterpriseWechatCorpId}#g" conf/alert.properties +sed -i ${txt} "s#enterprise.wechat.secret.*#enterprise.wechat.secret=${enterpriseWechatSecret}#g" conf/alert.properties +sed -i ${txt} "s#enterprise.wechat.agent.id.*#enterprise.wechat.agent.id=${enterpriseWechatAgentId}#g" conf/alert.properties +sed -i ${txt} "s#enterprise.wechat.users.*#enterprise.wechat.users=${enterpriseWechatUsers}#g" conf/alert.properties sed -i ${txt} "s#installPath.*#installPath=${installPath}#g" conf/config/install_config.conf