Browse Source

fix 9243 (#9244)

3.0.0/version-upgrade
JinYong Li 3 years ago committed by GitHub
parent
commit
85e56b5e2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  2. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProjectUser.java
  3. 1
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
  4. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  5. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

26
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -25,10 +25,9 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent; import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.ServerAlertContent; import org.apache.dolphinscheduler.dao.entity.ServerAlertContent;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper; import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertMapper; import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
@ -116,19 +115,22 @@ public class AlertDao {
* process time out alert * process time out alert
* *
* @param processInstance processInstance * @param processInstance processInstance
* @param processDefinition processDefinition * @param projectUser projectUser
*/ */
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) { public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProjectUser projectUser) {
int alertGroupId = processInstance.getWarningGroupId(); int alertGroupId = processInstance.getWarningGroupId();
Alert alert = new Alert(); Alert alert = new Alert();
List<ProcessAlertContent> processAlertContentList = new ArrayList<>(1); List<ProcessAlertContent> processAlertContentList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder() ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectCode(processDefinition.getProjectCode()) .projectCode(projectUser.getProjectCode())
.projectName(processDefinition.getProjectName()) .projectName(projectUser.getProjectName())
.owner(processDefinition.getUserName()) .owner(projectUser.getUserName())
.processId(processInstance.getId()) .processId(processInstance.getId())
.processDefinitionCode(processInstance.getProcessDefinitionCode()) .processDefinitionCode(processInstance.getProcessDefinitionCode())
.processName(processInstance.getName()) .processName(processInstance.getName())
.processType(processInstance.getCommandType())
.processState(processInstance.getState())
.runTimes(processInstance.getRunTimes())
.processStartTime(processInstance.getStartTime()) .processStartTime(processInstance.getStartTime())
.processHost(processInstance.getHost()) .processHost(processInstance.getHost())
.event(AlertEvent.TIME_OUT) .event(AlertEvent.TIME_OUT)
@ -154,15 +156,15 @@ public class AlertDao {
* *
* @param processInstance processInstanceId * @param processInstance processInstanceId
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @param taskDefinition taskDefinition * @param projectUser projectUser
*/ */
public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, TaskDefinition taskDefinition) { public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, ProjectUser projectUser) {
Alert alert = new Alert(); Alert alert = new Alert();
List<ProcessAlertContent> processAlertContentList = new ArrayList<>(1); List<ProcessAlertContent> processAlertContentList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder() ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectCode(taskDefinition.getProjectCode()) .projectCode(projectUser.getProjectCode())
.projectName(taskDefinition.getProjectName()) .projectName(projectUser.getProjectName())
.owner(taskDefinition.getUserName()) .owner(projectUser.getUserName())
.processId(processInstance.getId()) .processId(processInstance.getId())
.processDefinitionCode(processInstance.getProcessDefinitionCode()) .processDefinitionCode(processInstance.getProcessDefinitionCode())
.processName(processInstance.getName()) .processName(processInstance.getName())

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProjectUser.java

@ -38,6 +38,12 @@ public class ProjectUser {
@TableField("project_id") @TableField("project_id")
private int projectId; private int projectId;
/**
* project code
*/
@TableField(exist = false)
private long projectCode;
/** /**
* project name * project name
*/ */
@ -125,12 +131,21 @@ public class ProjectUser {
this.perm = perm; this.perm = perm;
} }
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
@Override @Override
public String toString() { public String toString() {
return "ProjectUser{" return "ProjectUser{"
+ "id=" + id + "id=" + id
+ ", userId=" + userId + ", userId=" + userId
+ ", projectId=" + projectId + ", projectId=" + projectId
+ ", projectCode=" + projectCode
+ ", projectName='" + projectName + '\'' + ", projectName='" + projectName + '\''
+ ", userName='" + userName + '\'' + ", userName='" + userName + '\''
+ ", perm=" + perm + ", perm=" + perm

1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml

@ -150,6 +150,7 @@
<select id="queryProjectWithUserByProcessInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.ProjectUser"> <select id="queryProjectWithUserByProcessInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.ProjectUser">
select select
dp.id project_id, dp.id project_id,
dp.code project_code,
dp.name project_name, dp.name project_name,
u.user_name user_name u.user_name user_name
from t_ds_process_instance di from t_ds_process_instance di

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -384,13 +384,15 @@ public class WorkflowExecuteThread {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.TIMEOUT); taskProcessor.action(TaskAction.TIMEOUT);
} else { } else {
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine()); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
} }
return true; return true;
} }
private boolean processTimeout() { private boolean processTimeout() {
this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, this.processDefinition); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser);
return true; return true;
} }

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

@ -26,11 +26,9 @@ import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResultAlertContent; import org.apache.dolphinscheduler.dao.entity.DqExecuteResultAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent; import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.TaskAlertContent; import org.apache.dolphinscheduler.dao.entity.TaskAlertContent;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
@ -108,7 +106,7 @@ public class ProcessAlertManager {
if (processInstance.getState().typeIsSuccess()) { if (processInstance.getState().typeIsSuccess()) {
List<ProcessAlertContent> successTaskList = new ArrayList<>(1); List<ProcessAlertContent> successTaskList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder() ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectId(projectUser.getProjectId()) .projectCode(projectUser.getProjectCode())
.projectName(projectUser.getProjectName()) .projectName(projectUser.getProjectName())
.owner(projectUser.getUserName()) .owner(projectUser.getUserName())
.processId(processInstance.getId()) .processId(processInstance.getId())
@ -132,7 +130,7 @@ public class ProcessAlertManager {
continue; continue;
} }
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder() ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectId(projectUser.getProjectId()) .projectCode(projectUser.getProjectCode())
.projectName(projectUser.getProjectName()) .projectName(projectUser.getProjectName())
.owner(projectUser.getUserName()) .owner(projectUser.getUserName())
.processId(processInstance.getId()) .processId(processInstance.getId())
@ -225,7 +223,7 @@ public class ProcessAlertManager {
String success = processInstance.getState().typeIsSuccess() ? "success" : "failed"; String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
alert.setTitle(cmdName + " " + success); alert.setTitle(cmdName + " " + success);
alert.setWarningType(processInstance.getState().typeIsSuccess() ? WarningType.SUCCESS : WarningType.FAILURE); alert.setWarningType(processInstance.getState().typeIsSuccess() ? WarningType.SUCCESS : WarningType.FAILURE);
String content = getContentProcessInstance(processInstance, taskInstances,projectUser); String content = getContentProcessInstance(processInstance, taskInstances, projectUser);
alert.setContent(content); alert.setContent(content);
alert.setAlertGroupId(processInstance.getWarningGroupId()); alert.setAlertGroupId(processInstance.getWarningGroupId());
alert.setCreateTime(new Date()); alert.setCreateTime(new Date());
@ -270,10 +268,10 @@ public class ProcessAlertManager {
* send process timeout alert * send process timeout alert
* *
* @param processInstance process instance * @param processInstance process instance
* @param processDefinition process definition * @param projectUser projectUser
*/ */
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) { public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProjectUser projectUser) {
alertDao.sendProcessTimeoutAlert(processInstance, processDefinition); alertDao.sendProcessTimeoutAlert(processInstance, projectUser);
} }
/** /**
@ -359,8 +357,8 @@ public class ProcessAlertManager {
return JSONUtils.toJsonString(content); return JSONUtils.toJsonString(content);
} }
public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, TaskDefinition taskDefinition) { public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, ProjectUser projectUser) {
alertDao.sendTaskTimeoutAlert(processInstance, taskInstance, taskDefinition); alertDao.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
} }
/** /**
@ -376,7 +374,7 @@ public class ProcessAlertManager {
String cmdName = getCommandCnName(processInstance.getCommandType()); String cmdName = getCommandCnName(processInstance.getCommandType());
List<ProcessAlertContent> blockingNodeList = new ArrayList<>(1); List<ProcessAlertContent> blockingNodeList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder() ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectId(projectUser.getProjectId()) .projectCode(projectUser.getProjectCode())
.projectName(projectUser.getProjectName()) .projectName(projectUser.getProjectName())
.owner(projectUser.getUserName()) .owner(projectUser.getUserName())
.processId(processInstance.getId()) .processId(processInstance.getId())

Loading…
Cancel
Save