From f6a2130ba1cbe34558207725c3148ecd72f8e1d3 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Thu, 20 Feb 2020 12:11:13 +0800 Subject: [PATCH] master/worker basic communication --- .../master/runner/MasterBaseTaskExecThread.java | 2 +- .../worker/processor/WorkerNettyRequestProcessor.java | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 25447af010..3b578f72e2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -124,7 +124,7 @@ public class MasterBaseTaskExecThread implements Callable { final Address address = new Address("192.168.220.247", 12346); ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson); try { - Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000); + Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE); logger.info("response result : {}",response); } catch (InterruptedException | RemotingException ex) { logger.error(String.format("send command to : %s error", address), ex); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java index b1f5da0916..d8a288d805 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java @@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -64,9 +66,11 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); + Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), + String.format("invalid command type : %s", command.getType())); logger.debug("received command : {}", command); - ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskRequestCommand.class); + ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( + command.getBody(), ExecuteTaskRequestCommand.class); String taskInstanceJson = taskRequestCommand.getTaskInstanceJson(); @@ -101,6 +105,9 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { // submit task taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque())); workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService)); + + ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); + channel.writeAndFlush(executeTaskResponseCommand.convert2Command()); } private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {