@ -29,10 +29,14 @@ import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult ;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult ;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResultAlertContent ;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResultAlertContent ;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent ;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent ;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.ProjectUser ;
import org.apache.dolphinscheduler.dao.entity.ProjectUser ;
import org.apache.dolphinscheduler.dao.entity.TaskAlertContent ;
import org.apache.dolphinscheduler.dao.entity.TaskAlertContent ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.dao.entity.User ;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper ;
import org.apache.dolphinscheduler.dao.mapper.UserMapper ;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState ;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState ;
import org.apache.commons.collections4.CollectionUtils ;
import org.apache.commons.collections4.CollectionUtils ;
@ -59,6 +63,12 @@ public class ProcessAlertManager {
@Autowired
@Autowired
private AlertDao alertDao ;
private AlertDao alertDao ;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper ;
@Autowired
private UserMapper userMapper ;
/ * *
/ * *
* command type convert chinese
* command type convert chinese
*
*
@ -104,6 +114,16 @@ public class ProcessAlertManager {
ProjectUser projectUser ) {
ProjectUser projectUser ) {
String res = "" ;
String res = "" ;
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
. queryByDefinitionCodeAndVersion ( processInstance . getProcessDefinitionCode ( ) ,
processInstance . getProcessDefinitionVersion ( ) ) ;
String modifyBy = "" ;
if ( processDefinitionLog ! = null ) {
User operator = userMapper . selectById ( processDefinitionLog . getOperator ( ) ) ;
modifyBy = operator = = null ? "" : operator . getUserName ( ) ;
}
if ( processInstance . getState ( ) . isSuccess ( ) ) {
if ( processInstance . getState ( ) . isSuccess ( ) ) {
List < ProcessAlertContent > successTaskList = new ArrayList < > ( 1 ) ;
List < ProcessAlertContent > successTaskList = new ArrayList < > ( 1 ) ;
ProcessAlertContent processAlertContent = ProcessAlertContent . builder ( )
ProcessAlertContent processAlertContent = ProcessAlertContent . builder ( )
@ -115,6 +135,7 @@ public class ProcessAlertManager {
. processName ( processInstance . getName ( ) )
. processName ( processInstance . getName ( ) )
. processType ( processInstance . getCommandType ( ) )
. processType ( processInstance . getCommandType ( ) )
. processState ( processInstance . getState ( ) )
. processState ( processInstance . getState ( ) )
. modifyBy ( modifyBy )
. recovery ( processInstance . getRecovery ( ) )
. recovery ( processInstance . getRecovery ( ) )
. runTimes ( processInstance . getRunTimes ( ) )
. runTimes ( processInstance . getRunTimes ( ) )
. processStartTime ( processInstance . getStartTime ( ) )
. processStartTime ( processInstance . getStartTime ( ) )
@ -137,6 +158,7 @@ public class ProcessAlertManager {
. processId ( processInstance . getId ( ) )
. processId ( processInstance . getId ( ) )
. processDefinitionCode ( processInstance . getProcessDefinitionCode ( ) )
. processDefinitionCode ( processInstance . getProcessDefinitionCode ( ) )
. processName ( processInstance . getName ( ) )
. processName ( processInstance . getName ( ) )
. modifyBy ( modifyBy )
. taskCode ( task . getTaskCode ( ) )
. taskCode ( task . getTaskCode ( ) )
. taskName ( task . getName ( ) )
. taskName ( task . getName ( ) )
. taskType ( task . getTaskType ( ) )
. taskType ( task . getTaskType ( ) )
@ -166,11 +188,21 @@ public class ProcessAlertManager {
List < ProcessAlertContent > toleranceTaskInstanceList = new ArrayList < > ( ) ;
List < ProcessAlertContent > toleranceTaskInstanceList = new ArrayList < > ( ) ;
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
. queryByDefinitionCodeAndVersion ( processInstance . getProcessDefinitionCode ( ) ,
processInstance . getProcessDefinitionVersion ( ) ) ;
String modifyBy = "" ;
if ( processDefinitionLog ! = null ) {
User operator = userMapper . selectById ( processDefinitionLog . getOperator ( ) ) ;
modifyBy = operator = = null ? "" : operator . getUserName ( ) ;
}
for ( TaskInstance taskInstance : toleranceTaskList ) {
for ( TaskInstance taskInstance : toleranceTaskList ) {
ProcessAlertContent processAlertContent = ProcessAlertContent . builder ( )
ProcessAlertContent processAlertContent = ProcessAlertContent . builder ( )
. processId ( processInstance . getId ( ) )
. processId ( processInstance . getId ( ) )
. processDefinitionCode ( processInstance . getProcessDefinitionCode ( ) )
. processDefinitionCode ( processInstance . getProcessDefinitionCode ( ) )
. processName ( processInstance . getName ( ) )
. processName ( processInstance . getName ( ) )
. modifyBy ( modifyBy )
. taskCode ( taskInstance . getTaskCode ( ) )
. taskCode ( taskInstance . getTaskCode ( ) )
. taskName ( taskInstance . getName ( ) )
. taskName ( taskInstance . getName ( ) )
. taskHost ( taskInstance . getHost ( ) )
. taskHost ( taskInstance . getHost ( ) )
@ -413,6 +445,17 @@ public class ProcessAlertManager {
Alert alert = new Alert ( ) ;
Alert alert = new Alert ( ) ;
String cmdName = getCommandCnName ( processInstance . getCommandType ( ) ) ;
String cmdName = getCommandCnName ( processInstance . getCommandType ( ) ) ;
List < ProcessAlertContent > blockingNodeList = new ArrayList < > ( 1 ) ;
List < ProcessAlertContent > blockingNodeList = new ArrayList < > ( 1 ) ;
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
. queryByDefinitionCodeAndVersion ( processInstance . getProcessDefinitionCode ( ) ,
processInstance . getProcessDefinitionVersion ( ) ) ;
String modifyBy = "" ;
if ( processDefinitionLog ! = null ) {
User operator = userMapper . selectById ( processDefinitionLog . getOperator ( ) ) ;
modifyBy = operator = = null ? "" : operator . getUserName ( ) ;
}
ProcessAlertContent processAlertContent = ProcessAlertContent . builder ( )
ProcessAlertContent processAlertContent = ProcessAlertContent . builder ( )
. projectCode ( projectUser . getProjectCode ( ) )
. projectCode ( projectUser . getProjectCode ( ) )
. projectName ( projectUser . getProjectName ( ) )
. projectName ( projectUser . getProjectName ( ) )
@ -421,6 +464,7 @@ public class ProcessAlertManager {
. processName ( processInstance . getName ( ) )
. processName ( processInstance . getName ( ) )
. processType ( processInstance . getCommandType ( ) )
. processType ( processInstance . getCommandType ( ) )
. processState ( processInstance . getState ( ) )
. processState ( processInstance . getState ( ) )
. modifyBy ( modifyBy )
. runTimes ( processInstance . getRunTimes ( ) )
. runTimes ( processInstance . getRunTimes ( ) )
. processStartTime ( processInstance . getStartTime ( ) )
. processStartTime ( processInstance . getStartTime ( ) )
. processEndTime ( processInstance . getEndTime ( ) )
. processEndTime ( processInstance . getEndTime ( ) )