@ -55,6 +55,9 @@ import static org.apache.dolphinscheduler.common.Constants.*;
* /
public class MasterExecThread implements Runnable {
/ * *
* logger of MasterExecThread
* /
private static final Logger logger = LoggerFactory . getLogger ( MasterExecThread . class ) ;
/ * *
@ -62,28 +65,64 @@ public class MasterExecThread implements Runnable {
* /
private ProcessInstance processInstance ;
/ * *
* runing TaskNode
* /
private final Map < MasterBaseTaskExecThread , Future < Boolean > > activeTaskNode = new ConcurrentHashMap < MasterBaseTaskExecThread , Future < Boolean > > ( ) ;
/ * *
* task exec service
* /
private final ExecutorService taskExecService ;
/ * *
* submit failure nodes
* /
private Boolean taskFailedSubmit = false ;
/ * *
* recover node id list
* /
private List < TaskInstance > recoverNodeIdList = new ArrayList < > ( ) ;
/ * *
* error task list
* /
private Map < String , TaskInstance > errorTaskList = new ConcurrentHashMap < > ( ) ;
/ * *
* complete task list
* /
private Map < String , TaskInstance > completeTaskList = new ConcurrentHashMap < > ( ) ;
/ * *
* ready to submit task list
* /
private Map < String , TaskInstance > readyToSubmitTaskList = new ConcurrentHashMap < > ( ) ;
/ * *
* depend failed task map
* /
private Map < String , TaskInstance > dependFailedTask = new ConcurrentHashMap < > ( ) ;
/ * *
* forbidden task map
* /
private Map < String , TaskNode > forbiddenTaskList = new ConcurrentHashMap < > ( ) ;
/ * *
* recover tolerance fault task list
* /
private List < TaskInstance > recoverToleranceFaultTaskList = new ArrayList < > ( ) ;
/ * *
* alert manager
* /
private AlertManager alertManager = new AlertManager ( ) ;
/ * *
* the object of DAG
* /
private DAG < String , TaskNode , TaskNodeRelation > dag ;
/ * *
@ -96,6 +135,11 @@ public class MasterExecThread implements Runnable {
* /
private static Configuration conf ;
/ * *
* constructor of MasterExecThread
* @param processInstance process instance
* @param processDao process dao
* /
public MasterExecThread ( ProcessInstance processInstance , ProcessDao processDao ) {
this . processDao = processDao ;
@ -153,6 +197,10 @@ public class MasterExecThread implements Runnable {
}
}
/ * *
* execute process
* @throws Exception excpetion
* /
private void executeProcess ( ) throws Exception {
prepareProcess ( ) ;
runProcess ( ) ;
@ -160,8 +208,8 @@ public class MasterExecThread implements Runnable {
}
/ * *
* execute complement process
* @throws Exception
* execute complement process
* @throws Exception excpetion
* /
private void executeComplementProcess ( ) throws Exception {
@ -237,7 +285,7 @@ public class MasterExecThread implements Runnable {
/ * *
* prepare process parameter
* @throws Exception
* @throws Exception excpetion
* /
private void prepareProcess ( ) throws Exception {
// init task queue
@ -265,7 +313,7 @@ public class MasterExecThread implements Runnable {
/ * *
* generate process dag
* @throws Exception
* @throws Exception excpetion
* /
private void buildFlowDag ( ) throws Exception {
recoverNodeIdList = getStartTaskInstanceList ( processInstance . getCommandParam ( ) ) ;
@ -277,7 +325,6 @@ public class MasterExecThread implements Runnable {
ProcessDag processDag = generateFlowDag ( processInstance . getProcessInstanceJson ( ) ,
startNodeNameList , recoveryNameList , processInstance . getTaskDependType ( ) ) ;
if ( processDag = = null ) {
//TODO...
logger . error ( "processDag is null" ) ;
return ;
}
@ -286,6 +333,9 @@ public class MasterExecThread implements Runnable {
}
/ * *
* init task queue
* /
private void initTaskQueue ( ) {
taskFailedSubmit = false ;
@ -325,11 +375,10 @@ public class MasterExecThread implements Runnable {
}
}
/ * *
* submit task to execute
* @param taskInstance
* submit task to execute
* @param taskInstance task instance
* @return TaskInstance
* /
private TaskInstance submitTaskExec ( TaskInstance taskInstance ) {
MasterBaseTaskExecThread abstractExecThread = null ;
@ -346,8 +395,8 @@ public class MasterExecThread implements Runnable {
/ * *
* find task instance in db .
* in case submit more than one same name task in the same time .
* @param taskName
* @return
* @param taskName task name
* @return TaskInstance
* /
private TaskInstance findTaskIfExists ( String taskName ) {
List < TaskInstance > taskInstanceList = processDao . findValidTaskListByProcessId ( this . processInstance . getId ( ) ) ;
@ -361,9 +410,9 @@ public class MasterExecThread implements Runnable {
/ * *
* encapsulation task
* @param processInstance
* @param nodeName
* @return
* @param processInstance process instance
* @param nodeName node name
* @return TaskInstance
* /
private TaskInstance createTaskInstance ( ProcessInstance processInstance , String nodeName ,
TaskNode taskNode , String parentNodeName ) {
@ -415,14 +464,11 @@ public class MasterExecThread implements Runnable {
return taskInstance ;
}
/ * *
* get post task instance by node
*
* @param dag
* @param parentNodeName
* @return
* get post task instance by node
* @param dag dag
* @param parentNodeName parent node name
* @return task instance list
* /
private List < TaskInstance > getPostTaskInstanceByNode ( DAG < String , TaskNode , TaskNodeRelation > dag , String parentNodeName ) {
@ -442,10 +488,8 @@ public class MasterExecThread implements Runnable {
}
/ * *
*
* return start task node list
*
* @return
* return start task node list
* @return task instance list
* /
private List < TaskInstance > getStartSubmitTaskList ( ) {
@ -476,8 +520,8 @@ public class MasterExecThread implements Runnable {
}
/ * *
* submit post node
* @param parentNodeName
* submit post node
* @param parentNodeName parent node name
* /
private void submitPostNode ( String parentNodeName ) {
@ -507,7 +551,7 @@ public class MasterExecThread implements Runnable {
/ * *
* determine whether the dependencies of the task node are complete
* @return
* @return DependResult
* /
private DependResult isTaskDepsComplete ( String taskName ) {
@ -545,8 +589,8 @@ public class MasterExecThread implements Runnable {
/ * *
* query task instance by complete state
* @param state
* @return
* @param state state
* @return task isntance list
* /
private List < TaskInstance > getCompleteTaskByState ( ExecutionStatus state ) {
List < TaskInstance > resultList = new ArrayList < > ( ) ;
@ -560,8 +604,8 @@ public class MasterExecThread implements Runnable {
/ * *
* where there are ongoing tasks
* @param state
* @return
* @param state state
* @return ExecutionStatus
* /
private ExecutionStatus runningState ( ExecutionStatus state ) {
if ( state = = ExecutionStatus . READY_STOP | |
@ -575,9 +619,9 @@ public class MasterExecThread implements Runnable {
}
/ * *
* exists failure task , contains submit failure 、 dependency failure , execute failure ( retry after )
* exists failure task , contains submit failure 、 dependency failure , execute failure ( retry after )
*
* @return
* @return Boolean whether has failed task
* /
private Boolean hasFailedTask ( ) {
@ -591,9 +635,9 @@ public class MasterExecThread implements Runnable {
}
/ * *
* process instance failure
* process instance failure
*
* @return
* @return Boolean whether process instance failed
* /
private Boolean processFailed ( ) {
if ( hasFailedTask ( ) ) {
@ -608,11 +652,10 @@ public class MasterExecThread implements Runnable {
}
/ * *
* whether task for waiting thread
* @return
* whether task for waiting thread
* @return Boolean whether has waiting thread task
* /
private Boolean hasWaitingThreadTask ( ) {
List < TaskInstance > waitingList = getCompleteTaskByState ( ExecutionStatus . WAITTING_THREAD ) ;
return waitingList . size ( ) > 0 ;
}
@ -622,7 +665,7 @@ public class MasterExecThread implements Runnable {
* 1 , failed retry task in the preparation queue , returns to failure directly
* 2 , exists pause task , complement not completed , pending submission of tasks , return to suspension
* 3 , success
* @return
* @return ExecutionStatus
* /
private ExecutionStatus processReadyPause ( ) {
if ( hasRetryTaskInStandBy ( ) ) {
@ -642,7 +685,7 @@ public class MasterExecThread implements Runnable {
/ * *
* generate the latest process instance status by the tasks state
* @return
* @return process instance execution status
* /
private ExecutionStatus getProcessInstanceState ( ) {
ProcessInstance instance = processDao . findProcessInstanceById ( processInstance . getId ( ) ) ;
@ -692,8 +735,8 @@ public class MasterExecThread implements Runnable {
}
/ * *
* whether complement end
* @return
* whether complement end
* @return Boolean whether is complement end
* /
private Boolean isComplementEnd ( ) {
if ( ! processInstance . isComplementData ( ) ) {
@ -711,7 +754,7 @@ public class MasterExecThread implements Runnable {
}
/ * *
* updateProcessInstance process instance state
* updateProcessInstance process instance state
* after each batch of tasks is executed , the status of the process instance is updated
* /
private void updateProcessInstanceState ( ) {
@ -732,9 +775,9 @@ public class MasterExecThread implements Runnable {
}
/ * *
* get task dependency result
* @param taskInstance
* @return
* get task dependency result
* @param taskInstance task instance
* @return DependResult
* /
private DependResult getDependResultForTask ( TaskInstance taskInstance ) {
DependResult inner = isTaskDepsComplete ( taskInstance . getName ( ) ) ;
@ -742,8 +785,8 @@ public class MasterExecThread implements Runnable {
}
/ * *
* add task to standy list
* @param taskInstance
* add task to standy list
* @param taskInstance task instance
* /
private void addTaskToStandByList ( TaskInstance taskInstance ) {
logger . info ( "add task to stand by list: {}" , taskInstance . getName ( ) ) ;
@ -751,8 +794,8 @@ public class MasterExecThread implements Runnable {
}
/ * *
* remove task from stand by list
* @param taskInstance
* remove task from stand by list
* @param taskInstance task instance
* /
private void removeTaskFromStandbyList ( TaskInstance taskInstance ) {
logger . info ( "remove task from stand by list: {}" , taskInstance . getName ( ) ) ;
@ -760,8 +803,8 @@ public class MasterExecThread implements Runnable {
}
/ * *
* has retry task in standby
* @return
* has retry task in standby
* @return Boolean whether has retry task in standby
* /
private Boolean hasRetryTaskInStandBy ( ) {
for ( Map . Entry < String , TaskInstance > entry : readyToSubmitTaskList . entrySet ( ) ) {
@ -861,9 +904,9 @@ public class MasterExecThread implements Runnable {
}
/ * *
* check process time out
* @param processInstance
* @return
* whether check process time out
* @param processInstance task instance
* @return true if time out of process instance > running time of process instance
* /
private boolean checkProcessTimeOut ( ProcessInstance processInstance ) {
if ( processInstance . getTimeout ( ) = = 0 ) {
@ -879,6 +922,10 @@ public class MasterExecThread implements Runnable {
return false ;
}
/ * *
* whether can submit task to queue
* @return boolean
* /
private boolean canSubmitTaskToQueue ( ) {
return OSUtils . checkResource ( conf , true ) ;
}
@ -912,9 +959,9 @@ public class MasterExecThread implements Runnable {
}
/ * *
* whether the retry interval is timed out
* @param taskInstance
* @return
* whether the retry interval is timed out
* @param taskInstance task instance
* @return Boolean
* /
private Boolean retryTaskIntervalOverTime ( TaskInstance taskInstance ) {
if ( taskInstance . getState ( ) ! = ExecutionStatus . FAILURE ) {
@ -955,6 +1002,11 @@ public class MasterExecThread implements Runnable {
}
}
/ * *
* get recovery task instance
* @param taskId task id
* @return recovery task instance
* /
private TaskInstance getRecoveryTaskInstance ( String taskId ) {
if ( ! StringUtils . isNotEmpty ( taskId ) ) {
return null ;
@ -974,11 +1026,11 @@ public class MasterExecThread implements Runnable {
}
/ * *
* get start task instance list
* @param cmdParam
* @return
* get start task instance list
* @param cmdParam command param
* @return task instance list
* /
private List < TaskInstance > getStartTaskInstanceList ( String cmdParam ) {
private List < TaskInstance > getStartTaskInstanceList ( String cmdParam ) {
List < TaskInstance > instanceList = new ArrayList < > ( ) ;
Map < String , String > paramMap = JSONUtils . toMap ( cmdParam ) ;
@ -996,9 +1048,9 @@ public class MasterExecThread implements Runnable {
}
/ * *
* parse "StartNodeNameList" from cmd param
* @param cmdParam
* @return
* parse "StartNodeNameList" from cmd param
* @param cmdParam command param
* @return start node name list
* /
private List < String > parseStartNodeName ( String cmdParam ) {
List < String > startNodeNameList = new ArrayList < > ( ) ;
@ -1013,9 +1065,9 @@ public class MasterExecThread implements Runnable {
}
/ * *
* generate start node name list from parsing command param ;
* if "StartNodeIdList" exists in command param , return StartNodeIdList
* @return
* generate start node name list from parsing command param ;
* if "StartNodeIdList" exists in command param , return StartNodeIdList
* @return recovery node name list
* /
private List < String > getRecoveryNodeNameList ( ) {
List < String > recoveryNodeNameList = new ArrayList < > ( ) ;
@ -1029,9 +1081,12 @@ public class MasterExecThread implements Runnable {
/ * *
* generate flow dag
* @param processDefinitionJson
* @return
* @throws Exception
* @param processDefinitionJson process definition json
* @param startNodeNameList start node name list
* @param recoveryNodeNameList recovery node name list
* @param depNodeType depend node type
* @return ProcessDag process dag
* @throws Exception exception
* /
public ProcessDag generateFlowDag ( String processDefinitionJson ,
List < String > startNodeNameList ,