Browse Source

[DS-6640][WorkerServer] support process update host command type (#6642)

Co-authored-by: caishunfeng <534328519@qq.com>
2.0.7-release
wind 3 years ago committed by lenboo
parent
commit
cd173ac140
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  2. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
  3. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java
  4. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -126,7 +126,7 @@ public enum CommandType {
/** /**
* process host update * process host update
*/ */
PROCESS_HOST_UPDATE_REQUST, PROCESS_HOST_UPDATE_REQUEST,
/** /**
* process host update response * process host update response

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java

@ -56,7 +56,7 @@ public class HostUpdateCommand implements Serializable {
*/ */
public Command convert2Command() { public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST); command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java

@ -66,7 +66,7 @@ public class HostUpdateResponseCommand implements Serializable {
*/ */
public Command convert2Command() { public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST); command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor; import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// worker registry // worker registry

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java

@ -51,7 +51,7 @@ public class HostUpdateProcessor implements NettyRequestProcessor {
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUST == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class); HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class);
logger.info("received host update command : {}", updateCommand); logger.info("received host update command : {}", updateCommand);
taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));

Loading…
Cancel
Save