Browse Source

Fix taskInstance's host is not worker nettyServer address (#10926)

* Fix taskInstance's host is not worker nettyServer address

* Remove unnecessary mock

(cherry picked from commit df0416c193)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
812d7a8f26
  1. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
  2. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
  3. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  4. 10
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java

@ -60,7 +60,9 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteResultCommand.class); TaskExecuteResultCommand.class);
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel); TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
channel,
taskExecuteResultMessage.getMessageSenderAddress());
try { try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(), LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
taskResultEvent.getTaskInstanceId()); taskResultEvent.getTaskInstanceId());

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java

@ -57,7 +57,9 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class); TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage); logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage);
TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel); TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage,
channel,
taskExecuteRunningMessage.getMessageSenderAddress());
taskEventService.addEvent(taskEvent); taskEventService.addEvent(taskEvent);
} }

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date; import java.util.Date;
@ -105,7 +104,7 @@ public class TaskEvent {
return event; return event;
} }
public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) { public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel, String workerAddress) {
TaskEvent event = new TaskEvent(); TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId()); event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId());
@ -114,12 +113,12 @@ public class TaskEvent {
event.setExecutePath(command.getExecutePath()); event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath()); event.setLogPath(command.getLogPath());
event.setChannel(channel); event.setChannel(channel);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress()); event.setWorkerAddress(workerAddress);
event.setEvent(TaskEventType.RUNNING); event.setEvent(TaskEventType.RUNNING);
return event; return event;
} }
public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) { public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel, String workerAddress) {
TaskEvent event = new TaskEvent(); TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId()); event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId());
@ -132,7 +131,7 @@ public class TaskEvent {
event.setAppIds(command.getAppIds()); event.setAppIds(command.getAppIds());
event.setVarPool(command.getVarPool()); event.setVarPool(command.getVarPool());
event.setChannel(channel); event.setChannel(channel);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress()); event.setWorkerAddress(workerAddress);
event.setEvent(TaskEventType.RESULT); event.setEvent(TaskEventType.RESULT);
return event; return event;
} }

10
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -75,8 +75,6 @@ public class TaskResponseServiceTest {
public void before() { public void before() {
taskEventService.start(); taskEventService.start();
Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));
TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678", TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
"127.0.0.1:1234", "127.0.0.1:1234",
System.currentTimeMillis()); System.currentTimeMillis());
@ -88,7 +86,9 @@ public class TaskResponseServiceTest {
taskExecuteRunningMessage.setHost("127.*.*.*"); taskExecuteRunningMessage.setHost("127.*.*.*");
taskExecuteRunningMessage.setStartTime(new Date()); taskExecuteRunningMessage.setStartTime(new Date());
ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel); ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage,
channel,
taskExecuteRunningMessage.getMessageSenderAddress());
TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234), TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234),
NetUtils.getAddr(5678), NetUtils.getAddr(5678),
@ -100,7 +100,9 @@ public class TaskResponseServiceTest {
taskExecuteResultMessage.setVarPool("varPol"); taskExecuteResultMessage.setVarPool("varPol");
taskExecuteResultMessage.setAppIds("ids"); taskExecuteResultMessage.setAppIds("ids");
taskExecuteResultMessage.setProcessId(1); taskExecuteResultMessage.setProcessId(1);
resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel); resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
channel,
taskExecuteResultMessage.getMessageSenderAddress());
taskInstance = new TaskInstance(); taskInstance = new TaskInstance();
taskInstance.setId(22); taskInstance.setId(22);

Loading…
Cancel
Save