Browse Source

Fix memory leak in worker due to message retry map (#12878)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
403c6a6bb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

13
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

@ -28,6 +28,7 @@ import org.apache.commons.collections.MapUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -108,11 +109,19 @@ public class MessageRetryRunner extends BaseDaemonThread {
}
long now = System.currentTimeMillis();
for (Map.Entry<Integer, Map<CommandType, BaseCommand>> taskEntry : needToRetryMessages.entrySet()) {
Iterator<Map.Entry<Integer, Map<CommandType, BaseCommand>>> iterator =
needToRetryMessages.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, Map<CommandType, BaseCommand>> taskEntry = iterator.next();
Integer taskInstanceId = taskEntry.getKey();
Map<CommandType, BaseCommand> retryMessageMap = taskEntry.getValue();
if (retryMessageMap.isEmpty()) {
iterator.remove();
continue;
}
LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
try {
for (Map.Entry<CommandType, BaseCommand> messageEntry : taskEntry.getValue().entrySet()) {
for (Map.Entry<CommandType, BaseCommand> messageEntry : retryMessageMap.entrySet()) {
CommandType messageType = messageEntry.getKey();
BaseCommand message = messageEntry.getValue();
if (now - message.getMessageSendTime() > MESSAGE_RETRY_WINDOW) {

Loading…
Cancel
Save