@ -17,9 +17,18 @@
package org.apache.dolphinscheduler.server.master.runner.task ;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS ;
import org.apache.dolphinscheduler.common.enums.Direct ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy ;
import org.apache.dolphinscheduler.common.enums.TaskType ;
import org.apache.dolphinscheduler.common.graph.DAG ;
import org.apache.dolphinscheduler.common.model.TaskNode ;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation ;
import org.apache.dolphinscheduler.common.process.Property ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
@ -28,9 +37,19 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.utils.LogUtils ;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext ;
import org.apache.commons.lang3.StringUtils ;
import java.util.Comparator ;
import java.util.Date ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.stream.Collectors ;
import com.fasterxml.jackson.core.type.TypeReference ;
/ * *
*
@ -110,6 +129,77 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
}
private Map < String , Property > mergeEndNodeTaskInstanceVarPool ( Set < String > taskCodes ) {
List < TaskInstance > taskInstanceList = processService . findValidTaskListByProcessId ( subProcessInstance . getId ( ) ) ;
logger . info ( "in dealFinish1, mergeEndNodeTaskInstanceVarPool, taskInstanceList.size:{}, subProcessInstance.getId:{}" , taskInstanceList . size ( ) , subProcessInstance . getId ( ) ) ;
// filter end nodes and sort by end time reversed
List < TaskInstance > endTaskInstancesSortedByEndTimeReversed = taskInstanceList . stream ( )
. filter ( o - > taskCodes . contains ( Long . toString ( o . getTaskCode ( ) ) ) ) .
sorted ( Comparator . comparing ( TaskInstance : : getEndTime ) . reversed ( ) ) . collect ( Collectors . toList ( ) ) ;
logger . info ( "in dealFinish1, mergeEndNodeTaskInstanceVarPool, endTaskInstancesSortedByEndTimeReversed.size:{}" , endTaskInstancesSortedByEndTimeReversed . size ( ) ) ;
Map < String , Property > allProperties = new HashMap < > ( ) ;
for ( TaskInstance taskInstance : endTaskInstancesSortedByEndTimeReversed ) {
String varPool = taskInstance . getVarPool ( ) ;
if ( org . apache . commons . lang . StringUtils . isNotEmpty ( varPool ) ) {
List < Property > properties = JSONUtils . toList ( varPool , Property . class ) ;
properties . forEach ( o - > {
allProperties . put ( o . getProp ( ) , o ) ;
} ) ;
}
}
return allProperties ;
}
private void dealFinish1 ( ) {
// build dag
ProcessDefinition processDefinition = processService . findProcessDefinition ( subProcessInstance . getProcessDefinitionCode ( ) , subProcessInstance . getProcessDefinitionVersion ( ) ) ;
if ( null = = processDefinition ) {
logger . error ( "process definition not found in meta data, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}" ,
subProcessInstance . getProcessDefinitionCode ( ) , subProcessInstance . getProcessDefinitionVersion ( ) , subProcessInstance . getId ( ) ) ;
throw new RuntimeException ( String . format ( "process definition code %s, version %s does not exist" , subProcessInstance . getProcessDefinitionCode ( ) , subProcessInstance . getProcessDefinitionVersion ( ) ) ) ;
}
subProcessInstance . setProcessDefinition ( processDefinition ) ;
DAG < String , TaskNode , TaskNodeRelation > dag = processService . genDagGraph ( subProcessInstance . getProcessDefinition ( ) ) ;
// get end nodes
Set < String > endTaskCodes = dag . getEndNode ( ) . stream ( ) . collect ( Collectors . toSet ( ) ) ;
logger . info ( "in dealFinish1, endTaskCodes:{}" , endTaskCodes ) ;
if ( endTaskCodes = = null | | endTaskCodes . isEmpty ( ) ) {
return ;
}
// get var pool of sub progress instance;
Map < String , Property > varPoolPropertiesMap = mergeEndNodeTaskInstanceVarPool ( endTaskCodes ) ;
logger . debug ( "in dealFinish1, varPoolPropertiesMap:{}" , varPoolPropertiesMap ) ;
// merge var pool: 1. task instance var pool from pre task ; 2. var pool from sub progress
// filter by localParams
String taskVarPool = taskInstance . getVarPool ( ) ;
Map < String , Property > taskVarPoolProperties = new HashMap < > ( ) ;
if ( StringUtils . isNotEmpty ( taskVarPool ) ) {
taskVarPoolProperties = JSONUtils . toList ( taskVarPool , Property . class ) . stream ( ) . collect ( Collectors . toMap ( Property : : getProp , ( p ) - > p ) ) ;
}
Map < String , Object > taskParams = JSONUtils . parseObject ( taskInstance . getTaskParams ( ) , new TypeReference < Map < String , Object > > ( ) {
} ) ;
Object localParams = taskParams . get ( LOCAL_PARAMS ) ;
Map < String , Property > outProperties = new HashMap < > ( ) ;
if ( localParams ! = null ) {
List < Property > properties = JSONUtils . toList ( JSONUtils . toJsonString ( localParams ) , Property . class ) ;
outProperties = properties . stream ( ) . filter ( r - > Direct . OUT = = r . getDirect ( ) ) . collect ( Collectors . toMap ( Property : : getProp , ( p ) - > p ) ) ;
// put all task instance var pool from pre task
outProperties . putAll ( taskVarPoolProperties ) ;
for ( Map . Entry < String , Property > o : outProperties . entrySet ( ) ) {
if ( varPoolPropertiesMap . containsKey ( o . getKey ( ) ) ) {
o . getValue ( ) . setValue ( varPoolPropertiesMap . get ( o . getKey ( ) ) . getValue ( ) ) ;
}
}
} else {
outProperties . putAll ( taskVarPoolProperties ) ;
outProperties . putAll ( varPoolPropertiesMap ) ;
}
taskInstance . setVarPool ( JSONUtils . toJsonString ( outProperties . values ( ) ) ) ;
logger . debug ( "in dealFinish1, varPool:{}" , taskInstance . getVarPool ( ) ) ;
//deal with localParam for show in the page
processService . changeOutParam ( taskInstance ) ;
}
@Override
protected boolean persistTask ( TaskAction taskAction ) {
switch ( taskAction ) {
@ -175,6 +265,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
sendToSubProcess ( ) ;
this . taskInstance . setState ( ExecutionStatus . KILL ) ;
this . taskInstance . setEndTime ( new Date ( ) ) ;
dealFinish1 ( ) ;
processService . saveTaskInstance ( taskInstance ) ;
return true ;
}