diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java index dcc87eee80..bf7ac14c27 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java @@ -23,6 +23,8 @@ import cn.escheduler.dao.datasource.ConnectionFactory; import cn.escheduler.dao.mapper.AlertMapper; import cn.escheduler.dao.mapper.UserAlertGroupMapper; import cn.escheduler.dao.model.Alert; +import cn.escheduler.dao.model.ProcessDefinition; +import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.User; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -83,8 +85,9 @@ public class AlertDao extends AbstractBaseDao { */ public void sendServerStopedAlert(int alertgroupId,String host,String serverType){ Alert alert = new Alert(); - String content = String.format("[{'type':'%s','host':'%s','event':'服务挂掉','警告级别':'严重'}]",serverType,host); - alert.setTitle("容错告警"); + String content = String.format("[{'type':'%s','host':'%s','event':'server down','warning level':'serious'}]", + serverType, host); + alert.setTitle("Fault tolerance warning"); alert.setShowType(ShowType.TABLE); alert.setContent(content); alert.setAlertType(AlertType.EMAIL); @@ -94,6 +97,34 @@ public class AlertDao extends AbstractBaseDao { alertMapper.insert(alert); } + /** + * process time out alert + * @param processInstance + * @param processDefinition + */ + public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition){ + int alertgroupId = processInstance.getWarningGroupId(); + String receivers = processDefinition.getReceivers(); + String receiversCc = processDefinition.getReceiversCc(); + Alert alert = new Alert(); + String content = String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]", + processInstance.getId(), processInstance.getName()); + alert.setTitle("Process Timeout Warn"); + alert.setShowType(ShowType.TABLE); + alert.setContent(content); + alert.setAlertType(AlertType.EMAIL); + alert.setAlertGroupId(alertgroupId); + if (StringUtils.isNotEmpty(receivers)) { + alert.setReceivers(receivers); + } + if (StringUtils.isNotEmpty(receiversCc)) { + alert.setReceiversCc(receiversCc); + } + alert.setCreateTime(new Date()); + alert.setUpdateTime(new Date()); + alertMapper.insert(alert); + } + /** * task timeout warn */ diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java index b5ed0053a2..ae170b8925 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java @@ -258,7 +258,7 @@ public class MasterExecThread implements Runnable { processDao.createRecoveryWaitingThreadCommand(null, processInstance); } List taskInstances = processDao.findValidTaskListByProcessId(processInstance.getId()); - alertManager.sendWarnningOfProcessInstance(processInstance, taskInstances); + alertManager.sendAlertProcessInstance(processInstance, taskInstances); } @@ -775,8 +775,15 @@ public class MasterExecThread implements Runnable { private void runProcess(){ // submit start node submitPostNode(null); - // submitStandByTask(); + boolean sendTimeWarning = false; while(!processInstance.IsProcessInstanceStop()){ + + // send warning email if process time out. + if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){ + alertManager.sendProcessTimeoutAlert(processInstance, + processDao.findProcessDefineById(processInstance.getProcessDefinitionId())); + sendTimeWarning = true; + } Set keys = activeTaskNode.keySet(); for (MasterBaseTaskExecThread taskExecThread : keys) { Future future = activeTaskNode.get(taskExecThread); @@ -821,7 +828,7 @@ public class MasterExecThread implements Runnable { } // send alert if(this.recoverToleranceFaultTaskList.size() > 0){ - alertManager.sendWarnningWorkerleranceFault(processInstance, recoverToleranceFaultTaskList); + alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList); this.recoverToleranceFaultTaskList.clear(); } // updateProcessInstance completed task status @@ -851,6 +858,25 @@ public class MasterExecThread implements Runnable { logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState()); } + /** + * check process time out + * @param processInstance + * @return + */ + private boolean checkProcessTimeOut(ProcessInstance processInstance) { + if(processInstance.getTimeout() == 0 ){ + return false; + } + + Date now = new Date(); + long runningTime = DateUtils.differMs(now, processInstance.getStartTime()); + + if(runningTime > processInstance.getTimeout()){ + return true; + } + return false; + } + private boolean canSubmitTaskToQueue() { return OSUtils.checkResource(conf, true); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java b/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java index ca478096bd..fc62bcf73d 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java @@ -26,6 +26,7 @@ import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.model.Alert; +import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import org.slf4j.Logger; @@ -54,27 +55,27 @@ public class AlertManager { private String getCommandCnName(CommandType commandType) { switch (commandType) { case RECOVER_TOLERANCE_FAULT_PROCESS: - return "恢复容错"; + return "recover tolerance fault process"; case RECOVER_SUSPENDED_PROCESS: - return "恢复暂停流程"; + return "recover suspended process"; case START_CURRENT_TASK_PROCESS: - return "从当前节点开始执行"; + return "start current task process"; case START_FAILURE_TASK_PROCESS: - return "从失败节点开始执行"; + return "start failure task process"; case START_PROCESS: - return "启动工作流"; + return "start process"; case REPEAT_RUNNING: - return "重跑"; + return "repeat running"; case SCHEDULER: - return "定时执行"; + return "scheduler"; case COMPLEMENT_DATA: - return "补数"; + return "complement data"; case PAUSE: - return "暂停工作流"; + return "pause"; case STOP: - return "停止工作流"; + return "stop"; default: - return "未知的命令类型"; + return "unknown type"; } } @@ -124,14 +125,14 @@ public class AlertManager { continue; } LinkedHashMap failedTaskMap = new LinkedHashMap(); - failedTaskMap.put("任务id", String.valueOf(task.getId())); - failedTaskMap.put("任务名称", task.getName()); - failedTaskMap.put("任务类型", task.getTaskType()); - failedTaskMap.put("任务状态", task.getState().toString()); - failedTaskMap.put("任务开始时间", DateUtils.dateToString(task.getStartTime())); - failedTaskMap.put("任务结束时间", DateUtils.dateToString(task.getEndTime())); + failedTaskMap.put("task id", String.valueOf(task.getId())); + failedTaskMap.put("task name", task.getName()); + failedTaskMap.put("task type", task.getTaskType()); + failedTaskMap.put("task state", task.getState().toString()); + failedTaskMap.put("task start time", DateUtils.dateToString(task.getStartTime())); + failedTaskMap.put("task end time", DateUtils.dateToString(task.getEndTime())); failedTaskMap.put("host", task.getHost()); - failedTaskMap.put("日志路径", task.getLogPath()); + failedTaskMap.put("log path", task.getLogPath()); failedTaskList.add(failedTaskMap); } res = JSONUtils.toJson(failedTaskList); @@ -152,10 +153,10 @@ public class AlertManager { for(TaskInstance taskInstance: toleranceTaskList){ LinkedHashMap toleranceWorkerContentMap = new LinkedHashMap(); - toleranceWorkerContentMap.put("工作流程名称", processInstance.getName()); - toleranceWorkerContentMap.put("容错任务名称", taskInstance.getName()); - toleranceWorkerContentMap.put("容错机器IP", taskInstance.getHost()); - toleranceWorkerContentMap.put("任务失败次数", String.valueOf(taskInstance.getRetryTimes())); + toleranceWorkerContentMap.put("process name", processInstance.getName()); + toleranceWorkerContentMap.put("task name", taskInstance.getName()); + toleranceWorkerContentMap.put("host", taskInstance.getHost()); + toleranceWorkerContentMap.put("task retry times", String.valueOf(taskInstance.getRetryTimes())); toleranceTaskInstanceList.add(toleranceWorkerContentMap); } return JSONUtils.toJson(toleranceTaskInstanceList); @@ -166,9 +167,9 @@ public class AlertManager { * @param processInstance * @param toleranceTaskList */ - public void sendWarnningWorkerleranceFault(ProcessInstance processInstance, List toleranceTaskList){ + public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List toleranceTaskList){ Alert alert = new Alert(); - alert.setTitle("worker容错报警"); + alert.setTitle("worker fault tolerance"); alert.setShowType(ShowType.TABLE); String content = getWorkerToleranceContent(processInstance, toleranceTaskList); alert.setContent(content); @@ -187,8 +188,8 @@ public class AlertManager { * send process instance alert * @param processInstance */ - public void sendWarnningOfProcessInstance(ProcessInstance processInstance, - List taskInstances){ + public void sendAlertProcessInstance(ProcessInstance processInstance, + List taskInstances){ boolean sendWarnning = false; WarningType warningType = processInstance.getWarningType(); @@ -217,7 +218,7 @@ public class AlertManager { String cmdName = getCommandCnName(processInstance.getCommandType()); - String success = processInstance.getState().typeIsSuccess() ? "成功" :"失败"; + String success = processInstance.getState().typeIsSuccess() ? "success" :"failed"; alert.setTitle(cmdName + success); ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE; alert.setShowType(showType); @@ -233,4 +234,7 @@ public class AlertManager { logger.info("add alert to db , alert: {}", alert.toString()); } + public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) { + alertDao.sendProcessTimeoutAlert(processInstance, processDefinition); + } } diff --git a/escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java b/escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java index e6881a3afc..14c46b7d0c 100644 --- a/escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java +++ b/escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java @@ -76,7 +76,7 @@ public class AlertManagerTest { toleranceTaskList.add(toleranceTask1); toleranceTaskList.add(toleranceTask2); - alertManager.sendWarnningWorkerleranceFault(processInstance, toleranceTaskList); + alertManager.sendAlertWorkerToleranceFault(processInstance, toleranceTaskList); } @@ -103,7 +103,7 @@ public class AlertManagerTest { toleranceTaskList.add(toleranceTask1); toleranceTaskList.add(toleranceTask2); - alertManager.sendWarnningOfProcessInstance(processInstance, toleranceTaskList); + alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList); } }