From 939fa0c3bac50b43a519dfd3f4a6b8db1be92e92 Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Fri, 10 Dec 2021 16:55:56 +0800 Subject: [PATCH] fix TaskResponseProcessor process NullPointerException (#7318) --- .../server/master/MasterServer.java | 22 ++++++++++++++----- .../executor/NettyExecutorManager.java | 5 ++++- .../master/processor/CacheProcessor.java | 7 +++--- .../master/processor/StateEventProcessor.java | 8 +++---- .../master/processor/TaskEventProcessor.java | 8 +++---- .../processor/TaskResponseProcessor.java | 2 ++ .../master/processor/CacheProcessorTest.java | 12 +++------- 7 files changed, 38 insertions(+), 26 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 9157ad704b..41bc63a6dc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -77,6 +77,18 @@ public class MasterServer implements IStoppable { @Autowired private TaskAckProcessor taskAckProcessor; + @Autowired + private TaskResponseProcessor taskResponseProcessor; + + @Autowired + private TaskEventProcessor taskEventProcessor; + + @Autowired + private StateEventProcessor stateEventProcessor; + + @Autowired + private CacheProcessor cacheProcessor; + public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); SpringApplication.run(MasterServer.class); @@ -91,13 +103,13 @@ public class MasterServer implements IStoppable { NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, new TaskEventProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, new TaskEventProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); this.nettyRemotingServer.start(); // self tolerant diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 886a7fc64c..b0b8d25799 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -62,6 +62,9 @@ public class NettyExecutorManager extends AbstractExecutorManager{ @Autowired private TaskAckProcessor taskAckProcessor; + @Autowired + private TaskResponseProcessor taskResponseProcessor; + /** * netty remote client */ @@ -81,7 +84,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor * register EXECUTE_TASK_ACK command type TaskAckProcessor */ - this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java index 6db7f65d7f..0712162829 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java @@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; +import org.springframework.stereotype.Component; import com.google.common.base.Preconditions; @@ -37,10 +39,12 @@ import io.netty.channel.Channel; /** * cache process from master/api */ +@Component public class CacheProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(CacheProcessor.class); + @Autowired private CacheManager cacheManager; @Override @@ -55,9 +59,6 @@ public class CacheProcessor implements NettyRequestProcessor { } private void cacheExpire(CacheExpireCommand cacheExpireCommand) { - if (cacheManager == null) { - cacheManager = SpringApplicationContext.getBean(CacheManager.class); - } if (cacheExpireCommand.getCacheKey().isEmpty()) { return; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index b03940395d..6b4c3daf20 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import com.google.common.base.Preconditions; @@ -38,16 +40,14 @@ import io.netty.channel.Channel; /** * handle state event received from master/api */ +@Component public class StateEventProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(StateEventProcessor.class); + @Autowired private StateEventResponseService stateEventResponseService; - public StateEventProcessor() { - stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class); - } - @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java index 2200e8315a..64a22cbdaf 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java @@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import com.google.common.base.Preconditions; @@ -37,16 +39,14 @@ import io.netty.channel.Channel; /** * handle state event received from master/api */ +@Component public class TaskEventProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskEventProcessor.class); + @Autowired private StateEventResponseService stateEventResponseService; - public TaskEventProcessor() { - stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class); - } - @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType() diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 966a07d433..a367cfe4ae 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -34,10 +34,12 @@ import com.google.common.base.Preconditions; import io.netty.channel.Channel; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; /** * task response processor */ +@Component public class TaskResponseProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java index 5c177ca94e..20befa0f1c 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java @@ -21,15 +21,13 @@ import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; @@ -40,10 +38,9 @@ import io.netty.channel.Channel; * task ack processor test */ @RunWith(PowerMockRunner.class) -@PrepareForTest({SpringApplicationContext.class}) public class CacheProcessorTest { - - private CacheProcessor cacheProcessor; + @InjectMocks + private CacheProcessor cacheProcessor = new CacheProcessor(); @Mock private Channel channel; @@ -56,10 +53,7 @@ public class CacheProcessorTest { @Before public void before() { - PowerMockito.mockStatic(SpringApplicationContext.class); - PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager); Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache); - cacheProcessor = new CacheProcessor(); } @Test