diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java index fec345ecf0..6f11ab4d0d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java @@ -28,6 +28,7 @@ import org.apache.commons.collections.MapUtils; import java.time.Duration; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -108,11 +109,19 @@ public class MessageRetryRunner extends BaseDaemonThread { } long now = System.currentTimeMillis(); - for (Map.Entry> taskEntry : needToRetryMessages.entrySet()) { + Iterator>> iterator = + needToRetryMessages.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> taskEntry = iterator.next(); Integer taskInstanceId = taskEntry.getKey(); + Map retryMessageMap = taskEntry.getValue(); + if (retryMessageMap.isEmpty()) { + iterator.remove(); + continue; + } LoggerUtils.setTaskInstanceIdMDC(taskInstanceId); try { - for (Map.Entry messageEntry : taskEntry.getValue().entrySet()) { + for (Map.Entry messageEntry : retryMessageMap.entrySet()) { CommandType messageType = messageEntry.getKey(); BaseCommand message = messageEntry.getValue(); if (now - message.getMessageSendTime() > MESSAGE_RETRY_WINDOW) {