Browse Source

add- register processor

pull/2/head
Technoboy- 5 years ago
parent
commit
5dca139595
  1. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  3. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  4. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -120,8 +120,8 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(45678);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
this.nettyRemotingServer.start();
//

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
@ -28,6 +29,8 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,6 +56,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{
public NettyExecutorManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
}
@Override

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +42,8 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private final ProcessService processService;
public TaskAckProcessor(ProcessService processService){
this.processService = processService;
public TaskAckProcessor(){
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Override

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +42,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private final ProcessService processService;
public TaskResponseProcessor(ProcessService processService){
this.processService = processService;
public TaskResponseProcessor(){
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**

Loading…
Cancel
Save