From c4fc59c9bb4797fcc044c1c335b1c0691da5994c Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Mon, 14 Mar 2022 16:22:48 +0800 Subject: [PATCH] [Fix][Master] Fix master-server execution logic thread unsafe problem (#8707) * Fix thread safety issue in master service * optimize code. --- .../server/master/consumer/TaskPriorityQueueConsumer.java | 3 ++- .../server/master/runner/MasterSchedulerService.java | 4 +++- .../dolphinscheduler/service/process/ProcessService.java | 8 ++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index c6562bc8d7..c65195c4ce 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; @@ -120,7 +121,7 @@ public class TaskPriorityQueueConsumer extends Thread { * batch dispatch with thread pool */ private List batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException { - List failedDispatchTasks = new ArrayList<>(); + List failedDispatchTasks = Collections.synchronizedList(new ArrayList<>()); CountDownLatch latch = new CountDownLatch(fetchTaskNum); for (int i = 0; i < fetchTaskNum; i++) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 2202cc57fb..9d61aa1dec 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; @@ -179,7 +180,8 @@ public class MasterSchedulerService extends Thread { } private List command2ProcessInstance(List commands) { - List processInstances = new ArrayList<>(commands.size()); + + List processInstances = Collections.synchronizedList(new ArrayList<>(commands.size())); CountDownLatch latch = new CountDownLatch(commands.size()); for (final Command command : commands) { masterPrepareExecService.execute(() -> { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 8fabaf98e4..b62e6ed5a1 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -886,11 +886,11 @@ public class ProcessService { //reset command parameter if (processInstance.getCommandParam() != null) { Map processCmdParam = JSONUtils.toMap(processInstance.getCommandParam()); - for (Map.Entry entry : processCmdParam.entrySet()) { - if (!cmdParam.containsKey(entry.getKey())) { - cmdParam.put(entry.getKey(), entry.getValue()); + processCmdParam.forEach((key, value) -> { + if (!cmdParam.containsKey(key)) { + cmdParam.put(key, value); } - } + }); } // reset command parameter if sub process if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {