From 7a05c007b45ea742e3ed1a7558c1a641515c140c Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Thu, 21 May 2020 10:34:14 +0800 Subject: [PATCH] [BUF FIX]fix: the master would be blocked when worker group not exists (#2764) * feature: add number configuration for master dispatch tasks * fix bug(#2762) the master would be blocked when worker group not exists * fix bug(#2762) the master would be blocked when worker group not exists * fix ut * fix ut Co-authored-by: baoliang --- .../server/master/config/MasterConfig.java | 11 ++++ .../consumer/TaskPriorityQueueConsumer.java | 54 ++++++++++++------- .../src/main/resources/master.properties | 4 ++ .../TaskPriorityQueueConsumerTest.java | 3 +- .../runner/MasterTaskExecThreadTest.java | 2 +- .../server/registry/DependencyConfig.java | 5 ++ 6 files changed, 58 insertions(+), 21 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 5b4b5daef1..efd29ddd3c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -37,6 +37,9 @@ public class MasterConfig { @Value("${master.task.commit.retryTimes:5}") private int masterTaskCommitRetryTimes; + @Value("${master.dispatch.task.num :3}") + private int masterDispatchTaskNumber; + @Value("${master.task.commit.interval:1000}") private int masterTaskCommitInterval; @@ -126,4 +129,12 @@ public class MasterConfig { public void setMasterReservedMemory(double masterReservedMemory) { this.masterReservedMemory = masterReservedMemory; } + + public int getMasterDispatchTaskNumber() { + return masterDispatchTaskNumber; + } + + public void setMasterDispatchTaskNumber(int masterDispatchTaskNumber) { + this.masterDispatchTaskNumber = masterDispatchTaskNumber; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 3314789fdb..6e19ad1847 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.*; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; @@ -85,6 +86,13 @@ public class TaskPriorityQueueConsumer extends Thread{ @Autowired private ExecutorDispatcher dispatcher; + + /** + * master config + */ + @Autowired + private MasterConfig masterConfig; + @PostConstruct public void init(){ super.setName("TaskUpdateQueueConsumerThread"); @@ -93,14 +101,23 @@ public class TaskPriorityQueueConsumer extends Thread{ @Override public void run() { + List failedDispatchTasks = new ArrayList<>(); while (Stopper.isRunning()){ try { - // if not task , blocking here - String taskPriorityInfo = taskPriorityQueue.take(); - - TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); - - dispatch(taskPriority.getTaskId()); + int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); + failedDispatchTasks.clear(); + for(int i = 0; i < fetchTaskNum; i++){ + // if not task , blocking here + String taskPriorityInfo = taskPriorityQueue.take(); + TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); + boolean dispatchResult = dispatch(taskPriority.getTaskId()); + if(!dispatchResult){ + failedDispatchTasks.add(taskPriorityInfo); + } + } + for(String taskPriorityInfo: failedDispatchTasks){ + taskPriorityQueue.put(taskPriorityInfo); + } }catch (Exception e){ logger.error("dispatcher task error",e); } @@ -114,21 +131,20 @@ public class TaskPriorityQueueConsumer extends Thread{ * @param taskInstanceId taskInstanceId * @return result */ - private Boolean dispatch(int taskInstanceId){ - TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); - ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); - Boolean result = false; - while (Stopper.isRunning()){ - try { + private boolean dispatch(int taskInstanceId){ + boolean result = false; + try { + TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); + ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); + + if (taskInstanceIsFinalState(taskInstanceId)){ + // when task finish, ignore this task, there is no need to dispatch anymore + return true; + }else{ result = dispatcher.dispatch(executionContext); - } catch (ExecuteException e) { - logger.error("dispatch error",e); - ThreadUtils.sleep(SLEEP_TIME_MILLIS); - } - - if (result || taskInstanceIsFinalState(taskInstanceId)){ - break; } + } catch (ExecuteException e) { + logger.error("dispatch error",e); } return result; } diff --git a/dolphinscheduler-server/src/main/resources/master.properties b/dolphinscheduler-server/src/main/resources/master.properties index f09f4693fc..44301fb54e 100644 --- a/dolphinscheduler-server/src/main/resources/master.properties +++ b/dolphinscheduler-server/src/main/resources/master.properties @@ -21,6 +21,10 @@ # master execute task number in parallel #master.exec.task.num=20 + +# master dispatch task number +#master.dispatch.task.num = 3 + # master heartbeat interval #master.heartbeat.interval=10 diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index b6f118a734..b14dbc3ccf 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; @@ -48,7 +49,7 @@ import java.util.Date; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class}) public class TaskPriorityQueueConsumerTest { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index ebddec4fb1..2606dc1c1f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -42,7 +42,7 @@ import java.util.Set; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class}) + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) public class MasterTaskExecThreadTest { @Test diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java index e0c4188aba..30f1053d3a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.registry; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; @@ -63,6 +64,10 @@ public class DependencyConfig { return Mockito.mock(ProcessService.class); } + @Bean + public MasterConfig masterConfig(){ + return Mockito.mock(MasterConfig.class); + } @Bean public UserMapper userMapper(){ return Mockito.mock(UserMapper.class);