diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index c6562bc8d7..c65195c4ce 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; @@ -120,7 +121,7 @@ public class TaskPriorityQueueConsumer extends Thread { * batch dispatch with thread pool */ private List batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException { - List failedDispatchTasks = new ArrayList<>(); + List failedDispatchTasks = Collections.synchronizedList(new ArrayList<>()); CountDownLatch latch = new CountDownLatch(fetchTaskNum); for (int i = 0; i < fetchTaskNum; i++) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 2202cc57fb..9d61aa1dec 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; @@ -179,7 +180,8 @@ public class MasterSchedulerService extends Thread { } private List command2ProcessInstance(List commands) { - List processInstances = new ArrayList<>(commands.size()); + + List processInstances = Collections.synchronizedList(new ArrayList<>(commands.size())); CountDownLatch latch = new CountDownLatch(commands.size()); for (final Command command : commands) { masterPrepareExecService.execute(() -> { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 8fabaf98e4..b62e6ed5a1 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -886,11 +886,11 @@ public class ProcessService { //reset command parameter if (processInstance.getCommandParam() != null) { Map processCmdParam = JSONUtils.toMap(processInstance.getCommandParam()); - for (Map.Entry entry : processCmdParam.entrySet()) { - if (!cmdParam.containsKey(entry.getKey())) { - cmdParam.put(entry.getKey(), entry.getValue()); + processCmdParam.forEach((key, value) -> { + if (!cmdParam.containsKey(key)) { + cmdParam.put(key, value); } - } + }); } // reset command parameter if sub process if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {