diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessAlertContent.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessAlertContent.java index 4382f17f8f..bc96c9f173 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessAlertContent.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessAlertContent.java @@ -61,6 +61,8 @@ public class ProcessAlertContent implements Serializable { private CommandType processType; @JsonProperty("processState") private WorkflowExecutionStatus processState; + @JsonProperty("modifyBy") + private String modifyBy; @JsonProperty("recovery") private Flag recovery; @JsonProperty("runTimes") diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java index 4a403215ba..dc679d927f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java @@ -29,10 +29,14 @@ import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; import org.apache.dolphinscheduler.dao.entity.DqExecuteResultAlertContent; import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.TaskAlertContent; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState; import org.apache.commons.collections4.CollectionUtils; @@ -59,6 +63,12 @@ public class ProcessAlertManager { @Autowired private AlertDao alertDao; + @Autowired + private ProcessDefinitionLogMapper processDefinitionLogMapper; + + @Autowired + private UserMapper userMapper; + /** * command type convert chinese * @@ -104,6 +114,16 @@ public class ProcessAlertManager { ProjectUser projectUser) { String res = ""; + ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper + .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + + String modifyBy = ""; + if (processDefinitionLog != null) { + User operator = userMapper.selectById(processDefinitionLog.getOperator()); + modifyBy = operator == null ? "" : operator.getUserName(); + } + if (processInstance.getState().isSuccess()) { List successTaskList = new ArrayList<>(1); ProcessAlertContent processAlertContent = ProcessAlertContent.builder() @@ -115,6 +135,7 @@ public class ProcessAlertManager { .processName(processInstance.getName()) .processType(processInstance.getCommandType()) .processState(processInstance.getState()) + .modifyBy(modifyBy) .recovery(processInstance.getRecovery()) .runTimes(processInstance.getRunTimes()) .processStartTime(processInstance.getStartTime()) @@ -137,6 +158,7 @@ public class ProcessAlertManager { .processId(processInstance.getId()) .processDefinitionCode(processInstance.getProcessDefinitionCode()) .processName(processInstance.getName()) + .modifyBy(modifyBy) .taskCode(task.getTaskCode()) .taskName(task.getName()) .taskType(task.getTaskType()) @@ -166,11 +188,21 @@ public class ProcessAlertManager { List toleranceTaskInstanceList = new ArrayList<>(); + ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper + .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + String modifyBy = ""; + if (processDefinitionLog != null) { + User operator = userMapper.selectById(processDefinitionLog.getOperator()); + modifyBy = operator == null ? "" : operator.getUserName(); + } + for (TaskInstance taskInstance : toleranceTaskList) { ProcessAlertContent processAlertContent = ProcessAlertContent.builder() .processId(processInstance.getId()) .processDefinitionCode(processInstance.getProcessDefinitionCode()) .processName(processInstance.getName()) + .modifyBy(modifyBy) .taskCode(taskInstance.getTaskCode()) .taskName(taskInstance.getName()) .taskHost(taskInstance.getHost()) @@ -413,6 +445,17 @@ public class ProcessAlertManager { Alert alert = new Alert(); String cmdName = getCommandCnName(processInstance.getCommandType()); List blockingNodeList = new ArrayList<>(1); + + ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper + .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + + String modifyBy = ""; + if (processDefinitionLog != null) { + User operator = userMapper.selectById(processDefinitionLog.getOperator()); + modifyBy = operator == null ? "" : operator.getUserName(); + } + ProcessAlertContent processAlertContent = ProcessAlertContent.builder() .projectCode(projectUser.getProjectCode()) .projectName(projectUser.getProjectName()) @@ -421,6 +464,7 @@ public class ProcessAlertManager { .processName(processInstance.getName()) .processType(processInstance.getCommandType()) .processState(processInstance.getState()) + .modifyBy(modifyBy) .runTimes(processInstance.getRunTimes()) .processStartTime(processInstance.getStartTime()) .processEndTime(processInstance.getEndTime()) diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java index 2dc864eb1e..c9fe3eea84 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java @@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import java.util.ArrayList; import java.util.Date; @@ -51,6 +53,12 @@ public class ProcessAlertManagerTest { @Mock private AlertDao alertDao; + @Mock + private ProcessDefinitionLogMapper processDefinitionLogMapper; + + @Mock + private UserMapper userMapper; + /** * send worker alert fault tolerance */ @@ -81,6 +89,8 @@ public class ProcessAlertManagerTest { processInstance.setState(WorkflowExecutionStatus.SUCCESS); processInstance.setCommandType(CommandType.COMPLEMENT_DATA); processInstance.setWarningGroupId(1); + processInstance.setProcessDefinitionCode(1L); + processInstance.setProcessDefinitionVersion(1); ProjectUser projectUser = new ProjectUser(); TaskInstance taskInstance = new TaskInstance(); @@ -106,6 +116,8 @@ public class ProcessAlertManagerTest { processInstance.setEndTime(new Date()); processInstance.setHost("127.0.0.1"); processInstance.setWarningGroupId(1); + processInstance.setProcessDefinitionCode(1L); + processInstance.setProcessDefinitionVersion(1); ProjectUser projectUser = new ProjectUser();