Browse Source

[Fix][Master] Fix master-server execution logic thread unsafe problem (#8707)

* Fix thread safety issue in master service

* optimize code.
3.0.0/version-upgrade
Kerwin 2 years ago committed by GitHub
parent
commit
c4fc59c9bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  2. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  3. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
@ -120,7 +121,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* batch dispatch with thread pool
*/
private List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
List<TaskPriority> failedDispatchTasks = new ArrayList<>();
List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(fetchTaskNum);
for (int i = 0; i < fetchTaskNum; i++) {

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
@ -179,7 +180,8 @@ public class MasterSchedulerService extends Thread {
}
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
List<ProcessInstance> processInstances = new ArrayList<>(commands.size());
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
masterPrepareExecService.execute(() -> {

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -886,11 +886,11 @@ public class ProcessService {
//reset command parameter
if (processInstance.getCommandParam() != null) {
Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
for (Map.Entry<String, String> entry : processCmdParam.entrySet()) {
if (!cmdParam.containsKey(entry.getKey())) {
cmdParam.put(entry.getKey(), entry.getValue());
processCmdParam.forEach((key, value) -> {
if (!cmdParam.containsKey(key)) {
cmdParam.put(key, value);
}
}
});
}
// reset command parameter if sub process
if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {

Loading…
Cancel
Save