Browse Source

fix TaskResponseProcessor process NullPointerException (#7318)

3.0.0/version-upgrade
Kerwin 3 years ago committed by GitHub
parent
commit
939fa0c3ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  3. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
  4. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
  5. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
  6. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  7. 12
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java

22
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -77,6 +77,18 @@ public class MasterServer implements IStoppable {
@Autowired
private TaskAckProcessor taskAckProcessor;
@Autowired
private TaskResponseProcessor taskResponseProcessor;
@Autowired
private TaskEventProcessor taskEventProcessor;
@Autowired
private StateEventProcessor stateEventProcessor;
@Autowired
private CacheProcessor cacheProcessor;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
@ -91,13 +103,13 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, new TaskEventProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, new TaskEventProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.start();
// self tolerant

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -62,6 +62,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
@Autowired
private TaskAckProcessor taskAckProcessor;
@Autowired
private TaskResponseProcessor taskResponseProcessor;
/**
* netty remote client
*/
@ -81,7 +84,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
*/
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
}

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java

@ -27,8 +27,10 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -37,10 +39,12 @@ import io.netty.channel.Channel;
/**
* cache process from master/api
*/
@Component
public class CacheProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(CacheProcessor.class);
@Autowired
private CacheManager cacheManager;
@Override
@ -55,9 +59,6 @@ public class CacheProcessor implements NettyRequestProcessor {
}
private void cacheExpire(CacheExpireCommand cacheExpireCommand) {
if (cacheManager == null) {
cacheManager = SpringApplicationContext.getBean(CacheManager.class);
}
if (cacheExpireCommand.getCacheKey().isEmpty()) {
return;

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java

@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -38,16 +40,14 @@ import io.netty.channel.Channel;
/**
* handle state event received from master/api
*/
@Component
public class StateEventProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(StateEventProcessor.class);
@Autowired
private StateEventResponseService stateEventResponseService;
public StateEventProcessor() {
stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java

@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -37,16 +39,14 @@ import io.netty.channel.Channel;
/**
* handle state event received from master/api
*/
@Component
public class TaskEventProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskEventProcessor.class);
@Autowired
private StateEventResponseService stateEventResponseService;
public TaskEventProcessor() {
stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType()

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -34,10 +34,12 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* task response processor
*/
@Component
public class TaskResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class);

12
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java

@ -21,15 +21,13 @@ import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
@ -40,10 +38,9 @@ import io.netty.channel.Channel;
* task ack processor test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class CacheProcessorTest {
private CacheProcessor cacheProcessor;
@InjectMocks
private CacheProcessor cacheProcessor = new CacheProcessor();
@Mock
private Channel channel;
@ -56,10 +53,7 @@ public class CacheProcessorTest {
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager);
Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache);
cacheProcessor = new CacheProcessor();
}
@Test

Loading…
Cancel
Save