diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 49b8c01ece..685d72c1e8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.dao; +package org.apache.dolphinscheduler.dao; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.ShowType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -30,13 +29,17 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.AlertMapper; import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.util.ArrayList; import java.util.Date; +import java.util.LinkedHashMap; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + @Component public class AlertDao extends AbstractBaseDao { @@ -56,21 +59,23 @@ public class AlertDao extends AbstractBaseDao { /** * insert alert + * * @param alert alert * @return add alert result */ - public int addAlert(Alert alert){ + public int addAlert(Alert alert) { return alertMapper.insert(alert); } /** * update alert + * * @param alertStatus alertStatus * @param log log * @param id id * @return update alert result */ - public int updateAlert(AlertStatus alertStatus,String log,int id){ + public int updateAlert(AlertStatus alertStatus, String log, int id) { Alert alert = alertMapper.selectById(id); alert.setAlertStatus(alertStatus); alert.setUpdateTime(new Date()); @@ -80,46 +85,61 @@ public class AlertDao extends AbstractBaseDao { /** * query user list by alert group id + * * @param alerGroupId alerGroupId * @return user list */ - public List queryUserByAlertGroupId(int alerGroupId){ + public List queryUserByAlertGroupId(int alerGroupId) { return userAlertGroupMapper.listUserByAlertgroupId(alerGroupId); } /** * MasterServer or WorkerServer stoped + * * @param alertgroupId alertgroupId * @param host host * @param serverType serverType */ - public void sendServerStopedAlert(int alertgroupId,String host,String serverType){ + public void sendServerStopedAlert(int alertgroupId, String host, String serverType) { Alert alert = new Alert(); - String content = String.format("[{'type':'%s','host':'%s','event':'server down','warning level':'serious'}]", - serverType, host); + List serverStopList = new ArrayList<>(1); + LinkedHashMap serverStopedMap = new LinkedHashMap(); + serverStopedMap.put("type", serverType); + serverStopedMap.put("host", host); + serverStopedMap.put("event", "server down"); + serverStopedMap.put("warning level", "serious"); + serverStopList.add(serverStopedMap); + String content = JSONUtils.toJsonString(serverStopList); alert.setTitle("Fault tolerance warning"); saveTaskTimeoutAlert(alert, content, alertgroupId, null, null); } /** * process time out alert + * * @param processInstance processInstance * @param processDefinition processDefinition */ - public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition){ + public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) { int alertgroupId = processInstance.getWarningGroupId(); String receivers = processDefinition.getReceivers(); String receiversCc = processDefinition.getReceiversCc(); Alert alert = new Alert(); - String content = String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]", - processInstance.getId(), processInstance.getName()); + List processTimeoutList = new ArrayList<>(1); + LinkedHashMap processTimeoutMap = new LinkedHashMap(); + processTimeoutMap.put("id", String.valueOf(processInstance.getId())); + processTimeoutMap.put("name", processInstance.getName()); + processTimeoutMap.put("event", "timeout"); + processTimeoutMap.put("warnLevel", "middle"); + processTimeoutList.add(processTimeoutMap); + String content = JSONUtils.toJsonString(processTimeoutList); alert.setTitle("Process Timeout Warn"); saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, receiversCc); } - private void saveTaskTimeoutAlert(Alert alert, String content, int alertgroupId, - String receivers, String receiversCc){ + private void saveTaskTimeoutAlert(Alert alert, String content, int alertgroupId, + String receivers, String receiversCc) { alert.setShowType(ShowType.TABLE); alert.setContent(content); alert.setAlertType(AlertType.EMAIL); @@ -135,9 +155,9 @@ public class AlertDao extends AbstractBaseDao { alertMapper.insert(alert); } - /** * task timeout warn + * * @param alertgroupId alertgroupId * @param receivers receivers * @param receiversCc receiversCc @@ -146,34 +166,45 @@ public class AlertDao extends AbstractBaseDao { * @param taskId taskId * @param taskName taskName */ - public void sendTaskTimeoutAlert(int alertgroupId,String receivers,String receiversCc, int processInstanceId, - String processInstanceName, int taskId,String taskName){ + public void sendTaskTimeoutAlert(int alertgroupId, String receivers, String receiversCc, int processInstanceId, + String processInstanceName, int taskId, String taskName) { Alert alert = new Alert(); - String content = String.format("[{'process instance id':'%d','task name':'%s','task id':'%d','task name':'%s'," + - "'event':'timeout','warnLevel':'middle'}]", processInstanceId, processInstanceName, taskId, taskName); + List taskTimeoutList = new ArrayList<>(1); + LinkedHashMap taskTimeoutMap = new LinkedHashMap(); + taskTimeoutMap.put("process instance id", String.valueOf(processInstanceId)); + taskTimeoutMap.put("process name", processInstanceName); + taskTimeoutMap.put("task id", String.valueOf(taskId)); + taskTimeoutMap.put("task name", taskName); + taskTimeoutMap.put("event", "timeout"); + taskTimeoutMap.put("warnLevel", "middle"); + taskTimeoutList.add(taskTimeoutMap); + String content = JSONUtils.toJsonString(taskTimeoutList); alert.setTitle("Task Timeout Warn"); saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, receiversCc); } /** * list the alert information of waiting to be executed + * * @return alert list */ - public List listWaitExecutionAlert(){ + public List listWaitExecutionAlert() { return alertMapper.listAlertByStatus(AlertStatus.WAIT_EXECUTION); } /** * list user information by alert group id + * * @param alertgroupId alertgroupId * @return user list */ - public List listUserByAlertgroupId(int alertgroupId){ + public List listUserByAlertgroupId(int alertgroupId) { return userAlertGroupMapper.listUserByAlertgroupId(alertgroupId); } /** * for test + * * @return AlertMapper */ public AlertMapper getAlertMapper() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java index 49ec9d3fdd..08c6022519 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java @@ -14,29 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; +package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ShowType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; import java.util.LinkedHashMap; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * alert manager */ @@ -50,8 +51,7 @@ public class AlertManager { /** * alert dao */ - private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class); - + private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class); /** * command type convert chinese @@ -86,50 +86,37 @@ public class AlertManager { } } - /** - * process instance format - */ - private static final String PROCESS_INSTANCE_FORMAT = - "\"id:%d\"," + - "\"name:%s\"," + - "\"job type: %s\"," + - "\"state: %s\"," + - "\"recovery:%s\"," + - "\"run time: %d\"," + - "\"start time: %s\"," + - "\"end time: %s\"," + - "\"host: %s\"" ; - /** * get process instance content - * @param processInstance process instance - * @param taskInstances task instance list + * + * @param processInstance process instance + * @param taskInstances task instance list * @return process instance format content */ public String getContentProcessInstance(ProcessInstance processInstance, - List taskInstances){ + List taskInstances) { String res = ""; - if(processInstance.getState().typeIsSuccess()){ - res = String.format(PROCESS_INSTANCE_FORMAT, - processInstance.getId(), - processInstance.getName(), - getCommandCnName(processInstance.getCommandType()), - processInstance.getState().toString(), - processInstance.getRecovery().toString(), - processInstance.getRunTimes(), - DateUtils.dateToString(processInstance.getStartTime()), - DateUtils.dateToString(processInstance.getEndTime()), - processInstance.getHost() - - ); - res = "[" + res + "]"; - }else if(processInstance.getState().typeIsFailure()){ + if (processInstance.getState().typeIsSuccess()) { + List successTaskList = new ArrayList<>(1); + LinkedHashMap successTaskMap = new LinkedHashMap(); + successTaskMap.put("id", String.valueOf(processInstance.getId())); + successTaskMap.put("name", processInstance.getName()); + successTaskMap.put("job type", getCommandCnName(processInstance.getCommandType())); + successTaskMap.put("state", processInstance.getState().toString()); + successTaskMap.put("recovery", processInstance.getRecovery().toString()); + successTaskMap.put("run time", String.valueOf(processInstance.getRunTimes())); + successTaskMap.put("start time", DateUtils.dateToString(processInstance.getStartTime())); + successTaskMap.put("end time", DateUtils.dateToString(processInstance.getEndTime())); + successTaskMap.put("host", processInstance.getHost()); + successTaskList.add(successTaskMap); + res = JSONUtils.toJsonString(successTaskList); + } else if (processInstance.getState().typeIsFailure()) { List failedTaskList = new ArrayList<>(); - for(TaskInstance task : taskInstances){ - if(task.getState().typeIsSuccess()){ + for (TaskInstance task : taskInstances) { + if (task.getState().typeIsSuccess()) { continue; } LinkedHashMap failedTaskMap = new LinkedHashMap(); @@ -154,15 +141,15 @@ public class AlertManager { /** * getting worker fault tolerant content * - * @param processInstance process instance + * @param processInstance process instance * @param toleranceTaskList tolerance task list * @return worker tolerance content */ - private String getWorkerToleranceContent(ProcessInstance processInstance, List toleranceTaskList){ + private String getWorkerToleranceContent(ProcessInstance processInstance, List toleranceTaskList) { - List> toleranceTaskInstanceList = new ArrayList<>(); + List> toleranceTaskInstanceList = new ArrayList<>(); - for(TaskInstance taskInstance: toleranceTaskList){ + for (TaskInstance taskInstance : toleranceTaskList) { LinkedHashMap toleranceWorkerContentMap = new LinkedHashMap(); toleranceWorkerContentMap.put("process name", processInstance.getName()); toleranceWorkerContentMap.put("task name", taskInstance.getName()); @@ -176,11 +163,11 @@ public class AlertManager { /** * send worker alert fault tolerance * - * @param processInstance process instance + * @param processInstance process instance * @param toleranceTaskList tolerance task list */ - public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List toleranceTaskList){ - try{ + public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List toleranceTaskList) { + try { Alert alert = new Alert(); alert.setTitle("worker fault tolerance"); alert.setShowType(ShowType.TABLE); @@ -188,13 +175,13 @@ public class AlertManager { alert.setContent(content); alert.setAlertType(AlertType.EMAIL); alert.setCreateTime(new Date()); - alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId()); + alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId()); alert.setReceivers(processInstance.getProcessDefinition().getReceivers()); alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc()); alertDao.addAlert(alert); logger.info("add alert to db , alert : {}", alert.toString()); - }catch (Exception e){ + } catch (Exception e) { logger.error("send alert failed:{} ", e.getMessage()); } @@ -202,40 +189,40 @@ public class AlertManager { /** * send process instance alert - * @param processInstance process instance - * @param taskInstances task instance list + * + * @param processInstance process instance + * @param taskInstances task instance list */ public void sendAlertProcessInstance(ProcessInstance processInstance, - List taskInstances){ + List taskInstances) { boolean sendWarnning = false; WarningType warningType = processInstance.getWarningType(); - switch (warningType){ + switch (warningType) { case ALL: - if(processInstance.getState().typeIsFinished()){ + if (processInstance.getState().typeIsFinished()) { sendWarnning = true; } break; case SUCCESS: - if(processInstance.getState().typeIsSuccess()){ + if (processInstance.getState().typeIsSuccess()) { sendWarnning = true; } break; case FAILURE: - if(processInstance.getState().typeIsFailure()){ + if (processInstance.getState().typeIsFailure()) { sendWarnning = true; } break; - default: + default: } - if(!sendWarnning){ + if (!sendWarnning) { return; } Alert alert = new Alert(); - String cmdName = getCommandCnName(processInstance.getCommandType()); - String success = processInstance.getState().typeIsSuccess() ? "success" :"failed"; + String success = processInstance.getState().typeIsSuccess() ? "success" : "failed"; alert.setTitle(cmdName + " " + success); ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE; alert.setShowType(showType); @@ -254,7 +241,7 @@ public class AlertManager { /** * send process timeout alert * - * @param processInstance process instance + * @param processInstance process instance * @param processDefinition process definition */ public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) {