Browse Source

cherry-pick Fix MessageSender may be null due to class initialize late #13446

3.1.4-release
Wenjun Ruan 2 years ago committed by zhuangchong
parent
commit
299619c511
  1. 22
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
  2. 9
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java

22
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

@ -30,17 +30,17 @@ import org.apache.commons.collections.MapUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@ -60,25 +60,21 @@ public class MessageRetryRunner extends BaseDaemonThread {
private static long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis();
@Lazy
@Autowired
private ApplicationContext applicationContext;
private List<MessageSender> messageSenders;
private Map<CommandType, MessageSender<BaseCommand>> messageSenderMap = new HashMap<>();
private Map<Integer, Map<CommandType, BaseCommand>> needToRetryMessages = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
messageSenders.values().forEach(messageSender -> {
messageSenderMap.put(messageSender.getMessageType(), messageSender);
logger.info("Injected message sender: {}", messageSender.getClass().getName());
});
}
@Override
public synchronized void start() {
logger.info("Message retry runner staring");
messageSenders.forEach(messageSender -> {
messageSenderMap.put(messageSender.getMessageType(), messageSender);
logger.info("Injected message sender: {}", messageSender.getClass().getName());
});
super.start();
logger.info("Message retry runner started");
}

9
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.message.MessageSender;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
@ -32,7 +33,6 @@ import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import lombok.NonNull;
@ -46,15 +46,14 @@ public class WorkerMessageSender {
private MessageRetryRunner messageRetryRunner;
@Autowired
private ApplicationContext applicationContext;
private List<MessageSender> messageSenders;
private Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
@PostConstruct
public void init() {
Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
messageSenders.values().forEach(messageSender -> messageSenderMap.put(messageSender.getMessageType(),
messageSender));
messageSenders.forEach(messageSender -> messageSenderMap.put(messageSender.getMessageType(),
messageSender));
}
// todo: use message rather than context

Loading…
Cancel
Save