From 3efcf0c5acc55df05b09187e17042f7552847b04 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 9 Aug 2023 20:19:02 +0800 Subject: [PATCH] Fix message in MessageRetryRunner might disorder (#14725) --- .../worker/message/MessageRetryRunner.java | 79 ++++++++++++++----- 1 file changed, 60 insertions(+), 19 deletions(-) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java index 830f470a44..44befbe31d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java @@ -27,12 +27,15 @@ import org.apache.dolphinscheduler.remote.command.MessageType; import org.apache.commons.collections4.MapUtils; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import lombok.Data; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -40,6 +43,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; +import com.google.common.base.Objects; + @Component @Slf4j public class MessageRetryRunner extends BaseDaemonThread { @@ -48,15 +53,15 @@ public class MessageRetryRunner extends BaseDaemonThread { super("WorkerMessageRetryRunnerThread"); } - private static long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis(); + private static final long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis(); @Lazy @Autowired private List messageSenders; - private Map> messageSenderMap = new HashMap<>(); + private final Map> messageSenderMap 
= new HashMap<>(); - private Map> needToRetryMessages = new ConcurrentHashMap<>(); + private final Map> needToRetryMessages = new ConcurrentHashMap<>(); @Override public synchronized void start() { @@ -70,14 +75,14 @@ public class MessageRetryRunner extends BaseDaemonThread { } public void addRetryMessage(int taskInstanceId, @NonNull MessageType messageType, BaseMessage baseMessage) { - needToRetryMessages.computeIfAbsent(taskInstanceId, k -> new ConcurrentHashMap<>()).put(messageType, - baseMessage); + needToRetryMessages.computeIfAbsent(taskInstanceId, k -> Collections.synchronizedList(new ArrayList<>())) + .add(TaskInstanceMessage.of(taskInstanceId, messageType, baseMessage)); } public void removeRetryMessage(int taskInstanceId, @NonNull MessageType messageType) { - Map retryMessages = needToRetryMessages.get(taskInstanceId); - if (retryMessages != null) { - retryMessages.remove(messageType); + List taskInstanceMessages = needToRetryMessages.get(taskInstanceId); + if (taskInstanceMessages != null) { + taskInstanceMessages.remove(TaskInstanceMessage.of(taskInstanceId, messageType, null)); } } @@ -86,10 +91,10 @@ public class MessageRetryRunner extends BaseDaemonThread { } public void updateMessageHost(int taskInstanceId, String messageReceiverHost) { - Map needToRetryMessages = this.needToRetryMessages.get(taskInstanceId); - if (needToRetryMessages != null) { - needToRetryMessages.values().forEach(baseMessage -> { - baseMessage.setMessageReceiverAddress(messageReceiverHost); + List taskInstanceMessages = this.needToRetryMessages.get(taskInstanceId); + if (taskInstanceMessages != null) { + taskInstanceMessages.forEach(taskInstanceMessage -> { + taskInstanceMessage.getMessage().setMessageReceiverAddress(messageReceiverHost); }); } } @@ -102,21 +107,21 @@ public class MessageRetryRunner extends BaseDaemonThread { } long now = System.currentTimeMillis(); - Iterator>> iterator = + Iterator>> iterator = needToRetryMessages.entrySet().iterator(); while 
(iterator.hasNext()) { - Map.Entry> taskEntry = iterator.next(); + Map.Entry> taskEntry = iterator.next(); Integer taskInstanceId = taskEntry.getKey(); - Map retryMessageMap = taskEntry.getValue(); - if (retryMessageMap.isEmpty()) { + List taskInstanceMessages = taskEntry.getValue(); + if (taskInstanceMessages.isEmpty()) { iterator.remove(); continue; } LogUtils.setTaskInstanceIdMDC(taskInstanceId); try { - for (Map.Entry messageEntry : retryMessageMap.entrySet()) { - MessageType messageType = messageEntry.getKey(); - BaseMessage message = messageEntry.getValue(); + for (TaskInstanceMessage taskInstanceMessage : taskInstanceMessages) { + MessageType messageType = taskInstanceMessage.getMessageType(); + BaseMessage message = taskInstanceMessage.getMessage(); if (now - message.getMessageSendTime() > MESSAGE_RETRY_WINDOW) { log.info("Begin retry send message to master, message: {}", message); message.setMessageSendTime(now); @@ -144,4 +149,40 @@ public class MessageRetryRunner extends BaseDaemonThread { public void clearMessage() { needToRetryMessages.clear(); } + + /** + * If two messages have the same taskInstanceId and messageType, they are considered the same message. + */ + @Data + public static class TaskInstanceMessage { + + private long taskInstanceId; + private MessageType messageType; + private BaseMessage message; + + public static TaskInstanceMessage of(long taskInstanceId, MessageType messageType, BaseMessage message) { + TaskInstanceMessage taskInstanceMessage = new TaskInstanceMessage(); + taskInstanceMessage.setTaskInstanceId(taskInstanceId); + taskInstanceMessage.setMessageType(messageType); + taskInstanceMessage.setMessage(message); + return taskInstanceMessage; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskInstanceMessage that = (TaskInstanceMessage) o; + return taskInstanceId == that.taskInstanceId && messageType == 
that.messageType; + } + + @Override + public int hashCode() { + return Objects.hashCode(taskInstanceId, messageType); + } + } }