@ -20,9 +20,16 @@ package org.apache.dolphinscheduler.server.master.utils;
import static org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters.DependentFailurePolicyEnum.DEPENDENT_FAILURE_WAITING ;
import static org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters.DependentFailurePolicyEnum.DEPENDENT_FAILURE_WAITING ;
import org.apache.dolphinscheduler.common.constants.Constants ;
import org.apache.dolphinscheduler.common.constants.Constants ;
import org.apache.dolphinscheduler.common.enums.Flag ;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation ;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition ;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao ;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao ;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao ;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao ;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao ;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao ;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult ;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult ;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation ;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation ;
@ -32,6 +39,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters ;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters ;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils ;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils ;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext ;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
import java.time.Duration ;
import java.time.Duration ;
import java.time.Instant ;
import java.time.Instant ;
@ -40,6 +48,7 @@ import java.util.Date;
import java.util.HashMap ;
import java.util.HashMap ;
import java.util.List ;
import java.util.List ;
import java.util.Map ;
import java.util.Map ;
import java.util.stream.Collectors ;
import lombok.extern.slf4j.Slf4j ;
import lombok.extern.slf4j.Slf4j ;
@ -72,6 +81,22 @@ public class DependentExecute {
* /
* /
private Map < String , DependResult > dependResultMap = new HashMap < > ( ) ;
private Map < String , DependResult > dependResultMap = new HashMap < > ( ) ;
/ * *
* process service
* /
private final ProcessService processService = SpringApplicationContext . getBean ( ProcessService . class ) ;
/ * *
* task definition log dao
* /
private final TaskDefinitionLogDao taskDefinitionLogDao =
SpringApplicationContext . getBean ( TaskDefinitionLogDao . class ) ;
/ * *
* task definition dao
* /
private final TaskDefinitionDao taskDefinitionDao = SpringApplicationContext . getBean ( TaskDefinitionDao . class ) ;
/ * *
/ * *
* constructor
* constructor
*
*
@ -118,10 +143,13 @@ public class DependentExecute {
return DependResult . WAITING ;
return DependResult . WAITING ;
}
}
// need to check workflow for updates, so get all task and check the task state
// need to check workflow for updates, so get all task and check the task state
if ( dependentItem . getDepTaskCode ( ) = = Constants . DEPENDENT_ALL_TASK _CODE ) {
if ( dependentItem . getDepTaskCode ( ) = = Constants . DEPENDENT_WORKFLOW _CODE ) {
result = dependResultByProcessInstance ( processInstance ) ;
result = dependResultByProcessInstance ( processInstance ) ;
} else if ( dependentItem . getDepTaskCode ( ) = = Constants . DEPENDENT_ALL_TASK_CODE ) {
result = dependResultByAllTaskOfProcessInstance ( processInstance , dateInterval , testFlag ) ;
} else {
} else {
result = getDependTaskResult ( dependentItem . getDepTaskCode ( ) , processInstance , testFlag ) ;
result = dependResultBySingleTaskInstance ( processInstance , dependentItem . getDepTaskCode ( ) , dateInterval ,
testFlag ) ;
}
}
if ( result ! = DependResult . SUCCESS ) {
if ( result ! = DependResult . SUCCESS ) {
break ;
break ;
@ -131,7 +159,7 @@ public class DependentExecute {
}
}
/ * *
/ * *
* depend type = depend_all
* depend type = depend_work_flow
*
*
* @return
* @return
* /
* /
@ -142,6 +170,59 @@ public class DependentExecute {
if ( processInstance . getState ( ) . isSuccess ( ) ) {
if ( processInstance . getState ( ) . isSuccess ( ) ) {
return DependResult . SUCCESS ;
return DependResult . SUCCESS ;
}
}
log . warn (
"The dependent workflow did not execute successfully, so return depend failed. processCode: {}, processName: {}" ,
processInstance . getProcessDefinitionCode ( ) , processInstance . getName ( ) ) ;
return DependResult . FAILED ;
}
/ * *
* depend type = depend_all
*
* @return
* /
private DependResult dependResultByAllTaskOfProcessInstance ( ProcessInstance processInstance ,
DateInterval dateInterval , int testFlag ) {
if ( ! processInstance . getState ( ) . isFinished ( ) ) {
log . info ( "Wait for the dependent workflow to complete, processCode: {}, processInstanceId: {}." ,
processInstance . getProcessDefinitionCode ( ) , processInstance . getId ( ) ) ;
return DependResult . WAITING ;
}
if ( processInstance . getState ( ) . isSuccess ( ) ) {
List < ProcessTaskRelation > processTaskRelations =
processService . findRelationByCode ( processInstance . getProcessDefinitionCode ( ) ,
processInstance . getProcessDefinitionVersion ( ) ) ;
List < TaskDefinitionLog > taskDefinitionLogs =
taskDefinitionLogDao . queryTaskDefineLogList ( processTaskRelations ) ;
Map < Long , String > taskDefinitionCodeMap =
taskDefinitionLogs . stream ( ) . filter ( taskDefinitionLog - > taskDefinitionLog . getFlag ( ) = = Flag . YES )
. collect ( Collectors . toMap ( TaskDefinitionLog : : getCode , TaskDefinitionLog : : getName ) ) ;
List < TaskInstance > taskInstanceList =
taskInstanceDao . queryLastTaskInstanceListIntervalByTaskCodes ( taskDefinitionCodeMap . keySet ( ) ,
dateInterval , testFlag ) ;
Map < Long , TaskExecutionStatus > taskExecutionStatusMap =
taskInstanceList . stream ( )
. filter ( taskInstance - > taskInstance . getTaskExecuteType ( ) ! = TaskExecuteType . STREAM )
. collect ( Collectors . toMap ( TaskInstance : : getTaskCode , TaskInstance : : getState ) ) ;
for ( Long taskCode : taskDefinitionCodeMap . keySet ( ) ) {
if ( ! taskExecutionStatusMap . containsKey ( taskCode ) ) {
log . warn (
"The task of the workflow is not being executed, taskCode: {}, processInstanceId: {}, processName: {}." ,
taskCode , processInstance . getProcessDefinitionCode ( ) , processInstance . getName ( ) ) ;
return DependResult . FAILED ;
} else {
if ( ! taskExecutionStatusMap . get ( taskCode ) . isSuccess ( ) ) {
log . warn (
"The task of the workflow is not being executed successfully, taskCode: {}, processInstanceId: {}, processName: {}." ,
taskCode , processInstance . getProcessDefinitionCode ( ) , processInstance . getName ( ) ) ;
return DependResult . FAILED ;
}
}
}
return DependResult . SUCCESS ;
}
return DependResult . FAILED ;
return DependResult . FAILED ;
}
}
@ -180,6 +261,54 @@ public class DependentExecute {
return result ;
return result ;
}
}
/ * *
* depend type = depend_task
*
* @param processInstance last process instance in the date interval
* @param depTaskCode the dependent task code
* @param dateInterval date interval
* @param testFlag test flag
* @return depend result
* /
private DependResult dependResultBySingleTaskInstance ( ProcessInstance processInstance , long depTaskCode ,
DateInterval dateInterval , int testFlag ) {
TaskInstance taskInstance =
taskInstanceDao . queryLastTaskInstanceIntervalByTaskCode ( depTaskCode , dateInterval , testFlag ) ;
if ( taskInstance = = null ) {
TaskDefinition taskDefinition = taskDefinitionDao . queryByCode ( depTaskCode ) ;
if ( taskDefinition = = null ) {
log . error ( "The dependent task definition can not be find, so return depend failed, taskCode: {}" ,
depTaskCode ) ;
return DependResult . FAILED ;
}
if ( taskDefinition . getFlag ( ) = = Flag . NO ) {
log . info (
"The dependent task is a forbidden task, so return depend success. Task code: {}, task name: {}" ,
taskDefinition . getCode ( ) , taskDefinition . getName ( ) ) ;
return DependResult . SUCCESS ;
}
if ( ! processInstance . getState ( ) . isFinished ( ) ) {
log . info ( "Wait for the dependent workflow to complete, processCode: {}, processInstanceId: {}." ,
processInstance . getProcessDefinitionCode ( ) , processInstance . getId ( ) ) ;
return DependResult . WAITING ;
}
return DependResult . FAILED ;
} else {
if ( TaskExecuteType . STREAM = = taskInstance . getTaskExecuteType ( ) ) {
log . info (
"The dependent task is a streaming task, so return depend success. Task code: {}, task name: {}." ,
taskInstance . getTaskCode ( ) , taskInstance . getName ( ) ) ;
return DependResult . SUCCESS ;
}
return getDependResultByState ( taskInstance . getState ( ) ) ;
}
}
/ * *
/ * *
* find the last one process instance that :
* find the last one process instance that :
* 1 . manual run and finish between the interval
* 1 . manual run and finish between the interval
@ -221,6 +350,9 @@ public class DependentExecute {
} else if ( state . isSuccess ( ) ) {
} else if ( state . isSuccess ( ) ) {
return DependResult . SUCCESS ;
return DependResult . SUCCESS ;
} else {
} else {
log . warn (
"The dependent task were not executed successfully, so return depend failed. Task code: {}, task name: {}." ,
taskInstance . getTaskCode ( ) , taskInstance . getName ( ) ) ;
return DependResult . FAILED ;
return DependResult . FAILED ;
}
}
}
}