Browse Source

Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
c7be43cbd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  3. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  4. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -120,8 +120,8 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(45678);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
this.nettyRemotingServer.start();
//

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
@ -28,6 +29,8 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,6 +56,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{
public NettyExecutorManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
}
@Override

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +42,8 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private final ProcessService processService;
public TaskAckProcessor(ProcessService processService){
this.processService = processService;
public TaskAckProcessor(){
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Override

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +42,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private final ProcessService processService;
public TaskResponseProcessor(ProcessService processService){
this.processService = processService;
public TaskResponseProcessor(){
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**

Loading…
Cancel
Save