|
|
|
@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
|
|
|
|
|
|
|
|
|
|
import java.time.Duration; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.Iterator; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
|
|
|
|
@ -106,11 +107,19 @@ public class MessageRetryRunner extends BaseDaemonThread {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
long now = System.currentTimeMillis(); |
|
|
|
|
for (Map.Entry<Integer, Map<CommandType, BaseCommand>> taskEntry : needToRetryMessages.entrySet()) { |
|
|
|
|
Iterator<Map.Entry<Integer, Map<CommandType, BaseCommand>>> iterator = |
|
|
|
|
needToRetryMessages.entrySet().iterator(); |
|
|
|
|
while (iterator.hasNext()) { |
|
|
|
|
Map.Entry<Integer, Map<CommandType, BaseCommand>> taskEntry = iterator.next(); |
|
|
|
|
Integer taskInstanceId = taskEntry.getKey(); |
|
|
|
|
Map<CommandType, BaseCommand> retryMessageMap = taskEntry.getValue(); |
|
|
|
|
if (retryMessageMap.isEmpty()) { |
|
|
|
|
iterator.remove(); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
LoggerUtils.setTaskInstanceIdMDC(taskInstanceId); |
|
|
|
|
try { |
|
|
|
|
for (Map.Entry<CommandType, BaseCommand> messageEntry : taskEntry.getValue().entrySet()) { |
|
|
|
|
for (Map.Entry<CommandType, BaseCommand> messageEntry : retryMessageMap.entrySet()) { |
|
|
|
|
CommandType messageType = messageEntry.getKey(); |
|
|
|
|
BaseCommand message = messageEntry.getValue(); |
|
|
|
|
if (now - message.getMessageSendTime() > MESSAGE_RETRY_WINDOW) { |
|
|
|
|