@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.parameters.DependentPa
import org.apache.dolphinscheduler.common.constants.Constants ;
import org.apache.dolphinscheduler.common.constants.Constants ;
import org.apache.dolphinscheduler.common.enums.Flag ;
import org.apache.dolphinscheduler.common.enums.Flag ;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType ;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation ;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation ;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition ;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition ;
@ -33,9 +34,11 @@ import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao ;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao ;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult ;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult ;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation ;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation ;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct ;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus ;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus ;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval ;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval ;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem ;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem ;
import org.apache.dolphinscheduler.plugin.task.api.model.Property ;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters ;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters ;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils ;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils ;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext ;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext ;
@ -48,6 +51,7 @@ import java.util.Date;
import java.util.HashMap ;
import java.util.HashMap ;
import java.util.List ;
import java.util.List ;
import java.util.Map ;
import java.util.Map ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
import java.util.stream.Collectors ;
import lombok.extern.slf4j.Slf4j ;
import lombok.extern.slf4j.Slf4j ;
@ -97,6 +101,14 @@ public class DependentExecute {
* /
* /
private final TaskDefinitionDao taskDefinitionDao = SpringApplicationContext . getBean ( TaskDefinitionDao . class ) ;
private final TaskDefinitionDao taskDefinitionDao = SpringApplicationContext . getBean ( TaskDefinitionDao . class ) ;
private Map < String , Property > dependTaskVarPoolPropertyMap = new HashMap < > ( ) ;
private Map < String , Long > dependTaskVarPoolEndTimeMap = new HashMap < > ( ) ;
private Map < String , Property > dependItemVarPoolPropertyMap = new HashMap < > ( ) ;
private Map < String , Long > dependItemVarPoolEndTimeMap = new HashMap < > ( ) ;
/ * *
/ * *
* constructor
* constructor
*
*
@ -168,6 +180,7 @@ public class DependentExecute {
return DependResult . WAITING ;
return DependResult . WAITING ;
}
}
if ( processInstance . getState ( ) . isSuccess ( ) ) {
if ( processInstance . getState ( ) . isSuccess ( ) ) {
addItemVarPool ( processInstance . getVarPool ( ) , processInstance . getEndTime ( ) . getTime ( ) ) ;
return DependResult . SUCCESS ;
return DependResult . SUCCESS ;
}
}
log . warn (
log . warn (
@ -221,46 +234,12 @@ public class DependentExecute {
}
}
}
}
}
}
addItemVarPool ( processInstance . getVarPool ( ) , processInstance . getEndTime ( ) . getTime ( ) ) ;
return DependResult . SUCCESS ;
return DependResult . SUCCESS ;
}
}
return DependResult . FAILED ;
return DependResult . FAILED ;
}
}
/ * *
* get depend task result
*
* @param taskCode
* @param processInstance
* @return
* /
private DependResult getDependTaskResult ( long taskCode , ProcessInstance processInstance , int testFlag ) {
DependResult result ;
TaskInstance taskInstance = null ;
List < TaskInstance > taskInstanceList =
taskInstanceDao . queryValidTaskListByWorkflowInstanceId ( processInstance . getId ( ) , testFlag ) ;
for ( TaskInstance task : taskInstanceList ) {
if ( task . getTaskCode ( ) = = taskCode ) {
taskInstance = task ;
break ;
}
}
if ( taskInstance = = null ) {
// cannot find task in the process instance
// maybe because process instance is running or failed.
if ( processInstance . getState ( ) . isFinished ( ) ) {
result = DependResult . FAILED ;
} else {
return DependResult . WAITING ;
}
} else {
result = getDependResultByState ( taskInstance . getState ( ) ) ;
}
return result ;
}
/ * *
/ * *
* depend type = depend_task
* depend type = depend_task
*
*
@ -303,12 +282,31 @@ public class DependentExecute {
log . info (
log . info (
"The dependent task is a streaming task, so return depend success. Task code: {}, task name: {}." ,
"The dependent task is a streaming task, so return depend success. Task code: {}, task name: {}." ,
taskInstance . getTaskCode ( ) , taskInstance . getName ( ) ) ;
taskInstance . getTaskCode ( ) , taskInstance . getName ( ) ) ;
addItemVarPool ( taskInstance . getVarPool ( ) , taskInstance . getEndTime ( ) . getTime ( ) ) ;
return DependResult . SUCCESS ;
return DependResult . SUCCESS ;
}
}
return getDependResultByState ( taskInstance . getState ( ) ) ;
return getDependResultByState ( taskInstance . getState ( ) ) ;
}
}
}
}
/ * *
* add varPool to dependItemVarPoolMap
*
* @param varPoolStr
* @param endTime
* /
private void addItemVarPool ( String varPoolStr , Long endTime ) {
List < Property > varPool = new ArrayList < > ( JSONUtils . toList ( varPoolStr , Property . class ) ) ;
if ( ! varPool . isEmpty ( ) ) {
Map < String , Property > varPoolPropertyMap = varPool . stream ( ) . filter ( p - > p . getDirect ( ) . equals ( Direct . OUT ) )
. collect ( Collectors . toMap ( Property : : getProp , Function . identity ( ) ) ) ;
Map < String , Long > varPoolEndTimeMap = varPool . stream ( ) . filter ( p - > p . getDirect ( ) . equals ( Direct . OUT ) )
. collect ( Collectors . toMap ( Property : : getProp , d - > endTime ) ) ;
dependItemVarPoolPropertyMap . putAll ( varPoolPropertyMap ) ;
dependItemVarPoolEndTimeMap . putAll ( varPoolEndTimeMap ) ;
}
}
/ * *
/ * *
* find the last one process instance that :
* find the last one process instance that :
* 1 . manual run and finish between the interval
* 1 . manual run and finish between the interval
@ -399,7 +397,13 @@ public class DependentExecute {
DependResult dependResult = getDependResultForItem ( dependentItem , currentTime , testFlag ) ;
DependResult dependResult = getDependResultForItem ( dependentItem , currentTime , testFlag ) ;
if ( dependResult ! = DependResult . WAITING & & dependResult ! = DependResult . FAILED ) {
if ( dependResult ! = DependResult . WAITING & & dependResult ! = DependResult . FAILED ) {
dependResultMap . put ( dependentItem . getKey ( ) , dependResult ) ;
dependResultMap . put ( dependentItem . getKey ( ) , dependResult ) ;
if ( dependentItem . getParameterPassing ( ) & & ! dependItemVarPoolPropertyMap . isEmpty ( ) ) {
DependentUtils . addTaskVarPool ( dependItemVarPoolPropertyMap , dependItemVarPoolEndTimeMap ,
dependTaskVarPoolPropertyMap , dependTaskVarPoolEndTimeMap ) ;
}
}
}
dependItemVarPoolPropertyMap . clear ( ) ;
dependItemVarPoolEndTimeMap . clear ( ) ;
dependResultList . add ( dependResult ) ;
dependResultList . add ( dependResult ) ;
}
}
return DependentUtils . getDependResultForRelation ( this . relation , dependResultList ) ;
return DependentUtils . getDependResultForRelation ( this . relation , dependResultList ) ;
@ -424,6 +428,14 @@ public class DependentExecute {
return dependResultMap ;
return dependResultMap ;
}
}
public Map < String , Property > getDependTaskVarPoolPropertyMap ( ) {
return dependTaskVarPoolPropertyMap ;
}
public Map < String , Long > getDependTaskVarPoolEndTimeMap ( ) {
return dependTaskVarPoolEndTimeMap ;
}
/ * *
/ * *
* check for self - dependent
* check for self - dependent
* @param dependentItem
* @param dependentItem