@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.* ;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder ;
import org.apache.dolphinscheduler.server.entity.* ;
import org.apache.dolphinscheduler.server.master.config.MasterConfig ;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher ;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext ;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType ;
@ -85,6 +86,13 @@ public class TaskPriorityQueueConsumer extends Thread{
@Autowired
private ExecutorDispatcher dispatcher ;
/ * *
* master config
* /
@Autowired
private MasterConfig masterConfig ;
@PostConstruct
public void init ( ) {
super . setName ( "TaskUpdateQueueConsumerThread" ) ;
@ -93,14 +101,23 @@ public class TaskPriorityQueueConsumer extends Thread{
@Override
public void run ( ) {
List < String > failedDispatchTasks = new ArrayList < > ( ) ;
while ( Stopper . isRunning ( ) ) {
try {
int fetchTaskNum = masterConfig . getMasterDispatchTaskNumber ( ) ;
failedDispatchTasks . clear ( ) ;
for ( int i = 0 ; i < fetchTaskNum ; i + + ) {
// if not task , blocking here
String taskPriorityInfo = taskPriorityQueue . take ( ) ;
TaskPriority taskPriority = TaskPriority . of ( taskPriorityInfo ) ;
dispatch ( taskPriority . getTaskId ( ) ) ;
boolean dispatchResult = dispatch ( taskPriority . getTaskId ( ) ) ;
if ( ! dispatchResult ) {
failedDispatchTasks . add ( taskPriorityInfo ) ;
}
}
for ( String taskPriorityInfo : failedDispatchTasks ) {
taskPriorityQueue . put ( taskPriorityInfo ) ;
}
} catch ( Exception e ) {
logger . error ( "dispatcher task error" , e ) ;
}
@ -114,21 +131,20 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskInstanceId taskInstanceId
* @return result
* /
private Boolean dispatch ( int taskInstanceId ) {
private boolean dispatch ( int taskInstanceId ) {
boolean result = false ;
try {
TaskExecutionContext context = getTaskExecutionContext ( taskInstanceId ) ;
ExecutionContext executionContext = new ExecutionContext ( context . toCommand ( ) , ExecutorType . WORKER , context . getWorkerGroup ( ) ) ;
Boolean result = false ;
while ( Stopper . isRunning ( ) ) {
try {
if ( taskInstanceIsFinalState ( taskInstanceId ) ) {
// when task finish, ignore this task, there is no need to dispatch anymore
return true ;
} else {
result = dispatcher . dispatch ( executionContext ) ;
}
} catch ( ExecuteException e ) {
logger . error ( "dispatch error" , e ) ;
ThreadUtils . sleep ( SLEEP_TIME_MILLIS ) ;
}
if ( result | | taskInstanceIsFinalState ( taskInstanceId ) ) {
break ;
}
}
return result ;
}