Browse Source

Fix MessageSender may be null due to class initialize late (#13446)

3.2.0-release
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
45694aa697
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
  2. 7
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java

21
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

@ -29,17 +29,16 @@ import org.apache.commons.collections4.MapUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Component
@ -53,25 +52,21 @@ public class MessageRetryRunner extends BaseDaemonThread {
private static long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis();
@Lazy
@Autowired
private ApplicationContext applicationContext;
private List<MessageSender> messageSenders;
private Map<CommandType, MessageSender<BaseCommand>> messageSenderMap = new HashMap<>();
private Map<Integer, Map<CommandType, BaseCommand>> needToRetryMessages = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
messageSenders.values().forEach(messageSender -> {
messageSenderMap.put(messageSender.getMessageType(), messageSender);
logger.info("Injected message sender: {}", messageSender.getClass().getName());
});
}
@Override
public synchronized void start() {
logger.info("Message retry runner staring");
messageSenders.forEach(messageSender -> {
messageSenderMap.put(messageSender.getMessageType(), messageSender);
logger.info("Injected message sender: {}", messageSender.getClass().getName());
});
super.start();
logger.info("Message retry runner started");
}

7
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.message.MessageSender;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
@ -34,7 +35,6 @@ import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@Component
@ -46,14 +46,13 @@ public class WorkerMessageSender {
private MessageRetryRunner messageRetryRunner;
@Autowired
private ApplicationContext applicationContext;
private List<MessageSender> messageSenders;
private Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
@PostConstruct
public void init() {
Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
messageSenders.values().forEach(messageSender -> messageSenderMap.put(messageSender.getMessageType(),
messageSenders.forEach(messageSender -> messageSenderMap.put(messageSender.getMessageType(),
messageSender));
}

Loading…
Cancel
Save