From df0416c1937b8a3a774c6070b958769c48a6a5e0 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 13 Jul 2022 20:46:33 +0800 Subject: [PATCH] Fix taskInstance's host is not worker nettyServer address (#10926) * Fix taskInstance's host is not worker nettyServer address * Remove unnecessary mock --- .../master/processor/TaskExecuteResponseProcessor.java | 4 +++- .../master/processor/TaskExecuteRunningProcessor.java | 4 +++- .../server/master/processor/queue/TaskEvent.java | 9 ++++----- .../processor/queue/TaskResponseServiceTest.java | 10 ++++++---- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java index 7f438e1d9c..775aded34a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java @@ -60,7 +60,9 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor { TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteResultCommand.class); - TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel); + TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, + channel, + taskExecuteResultMessage.getMessageSenderAddress()); try { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(), taskResultEvent.getTaskInstanceId()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java index 96ff1ca405..47a66a71f0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java @@ -57,7 +57,9 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor { TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class); logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage); - TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel); + TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, + channel, + taskExecuteRunningMessage.getMessageSenderAddress()); taskEventService.addEvent(taskEvent); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index e383cad612..a2cee4986c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.command.TaskRejectCommand; -import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import java.util.Date; @@ -106,7 +105,7 @@ public class TaskEvent { return event; } - public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) { + public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel, String workerAddress) { TaskEvent event = new TaskEvent(); event.setProcessInstanceId(command.getProcessInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId()); @@ -115,12 +114,12 @@ public class TaskEvent { event.setExecutePath(command.getExecutePath()); event.setLogPath(command.getLogPath()); event.setChannel(channel); - event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress()); + event.setWorkerAddress(workerAddress); event.setEvent(TaskEventType.RUNNING); return event; } - public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) { + public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel, String workerAddress) { TaskEvent event = new TaskEvent(); event.setProcessInstanceId(command.getProcessInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId()); @@ -133,7 +132,7 @@ public class TaskEvent { event.setAppIds(command.getAppIds()); event.setVarPool(command.getVarPool()); event.setChannel(channel); - event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress()); + event.setWorkerAddress(workerAddress); event.setEvent(TaskEventType.RESULT); return event; } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 3854ad77b0..6f4fe9ceb7 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -75,8 +75,6 @@ public class TaskResponseServiceTest { public void before() { taskEventService.start(); - Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234)); - TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678", "127.0.0.1:1234", System.currentTimeMillis()); @@ -88,7 +86,9 @@ public class TaskResponseServiceTest { taskExecuteRunningMessage.setHost("127.*.*.*"); taskExecuteRunningMessage.setStartTime(new Date()); - ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel); + ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, + channel, + taskExecuteRunningMessage.getMessageSenderAddress()); TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234), NetUtils.getAddr(5678), @@ -100,7 +100,9 @@ public class TaskResponseServiceTest { taskExecuteResultMessage.setVarPool("varPol"); taskExecuteResultMessage.setAppIds("ids"); taskExecuteResultMessage.setProcessId(1); - resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel); + resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, + channel, + taskExecuteResultMessage.getMessageSenderAddress()); taskInstance = new TaskInstance(); taskInstance.setId(22);