Browse Source

Fix message in MessageRetryRunner might disorder (#14725)

3.2.1-prepare
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
3efcf0c5ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 79
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

79
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

@ -27,12 +27,15 @@ import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -40,6 +43,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.google.common.base.Objects;
@Component @Component
@Slf4j @Slf4j
public class MessageRetryRunner extends BaseDaemonThread { public class MessageRetryRunner extends BaseDaemonThread {
@ -48,15 +53,15 @@ public class MessageRetryRunner extends BaseDaemonThread {
super("WorkerMessageRetryRunnerThread"); super("WorkerMessageRetryRunnerThread");
} }
private static long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis(); private static final long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis();
@Lazy @Lazy
@Autowired @Autowired
private List<MessageSender> messageSenders; private List<MessageSender> messageSenders;
private Map<MessageType, MessageSender<BaseMessage>> messageSenderMap = new HashMap<>(); private final Map<MessageType, MessageSender<BaseMessage>> messageSenderMap = new HashMap<>();
private Map<Integer, Map<MessageType, BaseMessage>> needToRetryMessages = new ConcurrentHashMap<>(); private final Map<Integer, List<TaskInstanceMessage>> needToRetryMessages = new ConcurrentHashMap<>();
@Override @Override
public synchronized void start() { public synchronized void start() {
@ -70,14 +75,14 @@ public class MessageRetryRunner extends BaseDaemonThread {
} }
public void addRetryMessage(int taskInstanceId, @NonNull MessageType messageType, BaseMessage baseMessage) { public void addRetryMessage(int taskInstanceId, @NonNull MessageType messageType, BaseMessage baseMessage) {
needToRetryMessages.computeIfAbsent(taskInstanceId, k -> new ConcurrentHashMap<>()).put(messageType, needToRetryMessages.computeIfAbsent(taskInstanceId, k -> Collections.synchronizedList(new ArrayList<>()))
baseMessage); .add(TaskInstanceMessage.of(taskInstanceId, messageType, baseMessage));
} }
public void removeRetryMessage(int taskInstanceId, @NonNull MessageType messageType) { public void removeRetryMessage(int taskInstanceId, @NonNull MessageType messageType) {
Map<MessageType, BaseMessage> retryMessages = needToRetryMessages.get(taskInstanceId); List<TaskInstanceMessage> taskInstanceMessages = needToRetryMessages.get(taskInstanceId);
if (retryMessages != null) { if (taskInstanceMessages != null) {
retryMessages.remove(messageType); taskInstanceMessages.remove(TaskInstanceMessage.of(taskInstanceId, messageType, null));
} }
} }
@ -86,10 +91,10 @@ public class MessageRetryRunner extends BaseDaemonThread {
} }
public void updateMessageHost(int taskInstanceId, String messageReceiverHost) { public void updateMessageHost(int taskInstanceId, String messageReceiverHost) {
Map<MessageType, BaseMessage> needToRetryMessages = this.needToRetryMessages.get(taskInstanceId); List<TaskInstanceMessage> taskInstanceMessages = this.needToRetryMessages.get(taskInstanceId);
if (needToRetryMessages != null) { if (taskInstanceMessages != null) {
needToRetryMessages.values().forEach(baseMessage -> { taskInstanceMessages.forEach(taskInstanceMessage -> {
baseMessage.setMessageReceiverAddress(messageReceiverHost); taskInstanceMessage.getMessage().setMessageReceiverAddress(messageReceiverHost);
}); });
} }
} }
@ -102,21 +107,21 @@ public class MessageRetryRunner extends BaseDaemonThread {
} }
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
Iterator<Map.Entry<Integer, Map<MessageType, BaseMessage>>> iterator = Iterator<Map.Entry<Integer, List<TaskInstanceMessage>>> iterator =
needToRetryMessages.entrySet().iterator(); needToRetryMessages.entrySet().iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Map.Entry<Integer, Map<MessageType, BaseMessage>> taskEntry = iterator.next(); Map.Entry<Integer, List<TaskInstanceMessage>> taskEntry = iterator.next();
Integer taskInstanceId = taskEntry.getKey(); Integer taskInstanceId = taskEntry.getKey();
Map<MessageType, BaseMessage> retryMessageMap = taskEntry.getValue(); List<TaskInstanceMessage> taskInstanceMessages = taskEntry.getValue();
if (retryMessageMap.isEmpty()) { if (taskInstanceMessages.isEmpty()) {
iterator.remove(); iterator.remove();
continue; continue;
} }
LogUtils.setTaskInstanceIdMDC(taskInstanceId); LogUtils.setTaskInstanceIdMDC(taskInstanceId);
try { try {
for (Map.Entry<MessageType, BaseMessage> messageEntry : retryMessageMap.entrySet()) { for (TaskInstanceMessage taskInstanceMessage : taskInstanceMessages) {
MessageType messageType = messageEntry.getKey(); MessageType messageType = taskInstanceMessage.getMessageType();
BaseMessage message = messageEntry.getValue(); BaseMessage message = taskInstanceMessage.getMessage();
if (now - message.getMessageSendTime() > MESSAGE_RETRY_WINDOW) { if (now - message.getMessageSendTime() > MESSAGE_RETRY_WINDOW) {
log.info("Begin retry send message to master, message: {}", message); log.info("Begin retry send message to master, message: {}", message);
message.setMessageSendTime(now); message.setMessageSendTime(now);
@ -144,4 +149,40 @@ public class MessageRetryRunner extends BaseDaemonThread {
public void clearMessage() { public void clearMessage() {
needToRetryMessages.clear(); needToRetryMessages.clear();
} }
/**
* If two message has the same taskInstanceId and messageType they will be considered as the same message
*/
@Data
public static class TaskInstanceMessage {
private long taskInstanceId;
private MessageType messageType;
private BaseMessage message;
public static TaskInstanceMessage of(long taskInstanceId, MessageType messageType, BaseMessage message) {
TaskInstanceMessage taskInstanceMessage = new TaskInstanceMessage();
taskInstanceMessage.setTaskInstanceId(taskInstanceId);
taskInstanceMessage.setMessageType(messageType);
taskInstanceMessage.setMessage(message);
return taskInstanceMessage;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskInstanceMessage that = (TaskInstanceMessage) o;
return taskInstanceId == that.taskInstanceId && messageType == that.messageType;
}
@Override
public int hashCode() {
return Objects.hashCode(taskInstanceId, messageType);
}
}
} }

Loading…
Cancel
Save