From 742f0c3e60187efb515977aaf2df0a38119840a3 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 29 Aug 2023 13:32:17 +0800 Subject: [PATCH] Add workflowInstanceHost and taskInstanceHost in ITaskInstanceExecutionEvent (#14827) --- .../master/transportor/ITaskInstanceExecutionEvent.java | 8 +++++++- .../transportor/TaskInstanceExecutionFinishEvent.java | 4 +++- .../transportor/TaskInstanceExecutionInfoEvent.java | 4 +++- .../transportor/TaskInstanceExecutionRunningEvent.java | 4 +++- .../server/master/processor/queue/TaskEvent.java | 6 +++--- .../LogicTaskInstanceExecuteRunningEventSender.java | 5 +++-- .../LogicTaskInstanceExecutionFinishEventSender.java | 5 +++-- .../server/worker/message/MessageRetryRunner.java | 2 +- .../message/TaskInstanceExecutionFinishEventSender.java | 5 +++-- .../TaskInstanceExecutionInfoUpdateEventSender.java | 5 +++-- .../message/TaskInstanceExecutionRunningEventSender.java | 5 +++-- 11 files changed, 35 insertions(+), 18 deletions(-) diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java index 3fa5ace8ce..25b7b7e727 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java @@ -27,7 +27,13 @@ public interface ITaskInstanceExecutionEvent { void setEventSendTime(long eventSendTime); - void setHost(String host); + void setWorkflowInstanceHost(String host); + + String getWorkflowInstanceHost(); + + void setTaskInstanceHost(String host); + + String getTaskInstanceHost(); TaskInstanceExecutionEventType getEventType(); diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java index 33ac1280d0..469412c66e 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java @@ -34,7 +34,9 @@ public class TaskInstanceExecutionFinishEvent implements ITaskInstanceExecutionE private long startTime; - private String host; + private String taskInstanceHost; + + private String workflowInstanceHost; private String logPath; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java index 22039a9e70..1623e03e92 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java @@ -32,7 +32,9 @@ public class TaskInstanceExecutionInfoEvent implements ITaskInstanceExecutionEve private long startTime; - private String host; + private String workflowInstanceHost; + + private String taskInstanceHost; private String logPath; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java index 023d953ad8..4c48a0a381 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java @@ -34,7 +34,9 @@ public class TaskInstanceExecutionRunningEvent implements ITaskInstanceExecution private long startTime; - private String host; + private String taskInstanceHost; + + private String workflowInstanceHost; private TaskExecutionStatus status; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index e55dfbdd5d..be21148ef6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -114,7 +114,7 @@ public class TaskEvent { event.setExecutePath(command.getExecutePath()); event.setLogPath(command.getLogPath()); event.setAppIds(command.getAppIds()); - event.setWorkerAddress(command.getHost()); + event.setWorkerAddress(command.getTaskInstanceHost()); event.setEvent(TaskEventType.RUNNING); return event; } @@ -131,7 +131,7 @@ public class TaskEvent { event.setProcessId(command.getProcessId()); event.setAppIds(command.getAppIds()); event.setVarPool(command.getVarPool()); - event.setWorkerAddress(command.getHost()); + event.setWorkerAddress(command.getTaskInstanceHost()); event.setEvent(TaskEventType.RESULT); return event; } @@ -151,7 +151,7 @@ public class TaskEvent { event.setTaskInstanceId(command.getTaskInstanceId()); event.setStartTime(DateUtils.timeStampToDate(command.getStartTime())); event.setLogPath(command.getLogPath()); - event.setWorkerAddress(command.getHost()); + event.setWorkerAddress(command.getTaskInstanceHost()); event.setEvent(TaskEventType.UPDATE_PID); return event; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java index 6ea085b31b..9b22486efe 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java @@ -35,7 +35,7 @@ public class LogicTaskInstanceExecuteRunningEventSender public void sendMessage(TaskInstanceExecutionRunningEvent taskInstanceExecutionRunningEvent) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstanceExecutionRunningEvent.getHost(), + .getProxyClient(taskInstanceExecutionRunningEvent.getWorkflowInstanceHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent); } @@ -47,7 +47,8 @@ public class LogicTaskInstanceExecuteRunningEventSender taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus()); taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath()); - taskExecuteRunningMessage.setHost(taskExecutionContext.getHost()); + taskExecuteRunningMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost()); + taskExecuteRunningMessage.setTaskInstanceHost(taskExecutionContext.getHost()); taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime()); taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath()); taskExecuteRunningMessage.setAppIds(taskExecutionContext.getAppIds()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java index 1949145e86..871a087358 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java @@ -33,7 +33,7 @@ public class LogicTaskInstanceExecutionFinishEventSender public void sendMessage(TaskInstanceExecutionFinishEvent message) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(message.getHost(), ITaskInstanceExecutionEventListener.class); + .getProxyClient(message.getWorkflowInstanceHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(message); } @@ -47,7 +47,8 @@ public class LogicTaskInstanceExecutionFinishEventSender taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath()); taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds()); taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId()); - taskExecuteResultMessage.setHost(taskExecutionContext.getHost()); + taskExecuteResultMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost()); + taskExecuteResultMessage.setTaskInstanceHost(taskExecutionContext.getHost()); taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime()); taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime()); taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool()); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java index b5a388312b..2b9ce2ec5f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java @@ -96,7 +96,7 @@ public class MessageRetryRunner extends BaseDaemonThread { List taskInstanceMessages = this.needToRetryMessages.get(taskInstanceId); if (taskInstanceMessages != null) { taskInstanceMessages.forEach(taskInstanceMessage -> { - taskInstanceMessage.getEvent().setHost(messageReceiverHost); + taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost); }); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java index 9469ba1a69..ae371ec4bf 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java @@ -34,7 +34,7 @@ public class TaskInstanceExecutionFinishEventSender public void sendEvent(TaskInstanceExecutionFinishEvent taskInstanceExecutionFinishEvent) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstanceExecutionFinishEvent.getHost(), + .getProxyClient(taskInstanceExecutionFinishEvent.getWorkflowInstanceHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(taskInstanceExecutionFinishEvent); } @@ -49,7 +49,8 @@ public class TaskInstanceExecutionFinishEventSender taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath()); taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds()); taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId()); - taskExecuteResultMessage.setHost(taskExecutionContext.getHost()); + taskExecuteResultMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost()); + taskExecuteResultMessage.setTaskInstanceHost(taskExecutionContext.getHost()); taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime()); taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime()); taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool()); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java index 4b9e7e76b0..62649ae445 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java @@ -36,7 +36,7 @@ public class TaskInstanceExecutionInfoUpdateEventSender public void sendEvent(TaskInstanceExecutionInfoEvent taskInstanceExecutionInfoEvent) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstanceExecutionInfoEvent.getHost(), + .getProxyClient(taskInstanceExecutionInfoEvent.getWorkflowInstanceHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionInfoUpdate(taskInstanceExecutionInfoEvent); } @@ -46,7 +46,8 @@ public class TaskInstanceExecutionInfoUpdateEventSender TaskInstanceExecutionInfoEvent taskUpdatePidRequest = new TaskInstanceExecutionInfoEvent(); taskUpdatePidRequest.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskUpdatePidRequest.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); - taskUpdatePidRequest.setHost(taskExecutionContext.getHost()); + taskUpdatePidRequest.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost()); + taskUpdatePidRequest.setTaskInstanceHost(taskExecutionContext.getHost()); taskUpdatePidRequest.setStartTime(taskExecutionContext.getStartTime()); return taskUpdatePidRequest; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java index 4f64a94002..8ae8ca85cb 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java @@ -36,7 +36,7 @@ public class TaskInstanceExecutionRunningEventSender public void sendEvent(TaskInstanceExecutionRunningEvent taskInstanceExecutionRunningEvent) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(taskInstanceExecutionRunningEvent.getHost(), + .getProxyClient(taskInstanceExecutionRunningEvent.getWorkflowInstanceHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent); } @@ -48,7 +48,8 @@ public class TaskInstanceExecutionRunningEventSender taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus()); taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath()); - taskExecuteRunningMessage.setHost(taskExecutionContext.getHost()); + taskExecuteRunningMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost()); + taskExecuteRunningMessage.setTaskInstanceHost(taskExecutionContext.getHost()); taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime()); taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath()); taskExecuteRunningMessage.setAppIds(taskExecutionContext.getAppIds());