Browse Source

Add workflowInstanceHost and taskInstanceHost in ITaskInstanceExecutionEvent (#14827)

3.2.1-prepare
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
742f0c3e60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java
  2. 4
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java
  3. 4
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java
  4. 4
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java
  5. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  6. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java
  7. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java
  8. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
  9. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java
  10. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java
  11. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java

8
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/ITaskInstanceExecutionEvent.java

@ -27,7 +27,13 @@ public interface ITaskInstanceExecutionEvent {
void setEventSendTime(long eventSendTime); void setEventSendTime(long eventSendTime);
void setHost(String host); void setWorkflowInstanceHost(String host);
String getWorkflowInstanceHost();
void setTaskInstanceHost(String host);
String getTaskInstanceHost();
TaskInstanceExecutionEventType getEventType(); TaskInstanceExecutionEventType getEventType();

4
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionFinishEvent.java

@ -34,7 +34,9 @@ public class TaskInstanceExecutionFinishEvent implements ITaskInstanceExecutionE
private long startTime; private long startTime;
private String host; private String taskInstanceHost;
private String workflowInstanceHost;
private String logPath; private String logPath;

4
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionInfoEvent.java

@ -32,7 +32,9 @@ public class TaskInstanceExecutionInfoEvent implements ITaskInstanceExecutionEve
private long startTime; private long startTime;
private String host; private String workflowInstanceHost;
private String taskInstanceHost;
private String logPath; private String logPath;

4
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceExecutionRunningEvent.java

@ -34,7 +34,9 @@ public class TaskInstanceExecutionRunningEvent implements ITaskInstanceExecution
private long startTime; private long startTime;
private String host; private String taskInstanceHost;
private String workflowInstanceHost;
private TaskExecutionStatus status; private TaskExecutionStatus status;

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -114,7 +114,7 @@ public class TaskEvent {
event.setExecutePath(command.getExecutePath()); event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath()); event.setLogPath(command.getLogPath());
event.setAppIds(command.getAppIds()); event.setAppIds(command.getAppIds());
event.setWorkerAddress(command.getHost()); event.setWorkerAddress(command.getTaskInstanceHost());
event.setEvent(TaskEventType.RUNNING); event.setEvent(TaskEventType.RUNNING);
return event; return event;
} }
@ -131,7 +131,7 @@ public class TaskEvent {
event.setProcessId(command.getProcessId()); event.setProcessId(command.getProcessId());
event.setAppIds(command.getAppIds()); event.setAppIds(command.getAppIds());
event.setVarPool(command.getVarPool()); event.setVarPool(command.getVarPool());
event.setWorkerAddress(command.getHost()); event.setWorkerAddress(command.getTaskInstanceHost());
event.setEvent(TaskEventType.RESULT); event.setEvent(TaskEventType.RESULT);
return event; return event;
} }
@ -151,7 +151,7 @@ public class TaskEvent {
event.setTaskInstanceId(command.getTaskInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId());
event.setStartTime(DateUtils.timeStampToDate(command.getStartTime())); event.setStartTime(DateUtils.timeStampToDate(command.getStartTime()));
event.setLogPath(command.getLogPath()); event.setLogPath(command.getLogPath());
event.setWorkerAddress(command.getHost()); event.setWorkerAddress(command.getTaskInstanceHost());
event.setEvent(TaskEventType.UPDATE_PID); event.setEvent(TaskEventType.UPDATE_PID);
return event; return event;
} }

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java

@ -35,7 +35,7 @@ public class LogicTaskInstanceExecuteRunningEventSender
public void sendMessage(TaskInstanceExecutionRunningEvent taskInstanceExecutionRunningEvent) { public void sendMessage(TaskInstanceExecutionRunningEvent taskInstanceExecutionRunningEvent) {
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstanceExecutionRunningEvent.getHost(), .getProxyClient(taskInstanceExecutionRunningEvent.getWorkflowInstanceHost(),
ITaskInstanceExecutionEventListener.class); ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent);
} }
@ -47,7 +47,8 @@ public class LogicTaskInstanceExecuteRunningEventSender
taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus()); taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus());
taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath()); taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath());
taskExecuteRunningMessage.setHost(taskExecutionContext.getHost()); taskExecuteRunningMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
taskExecuteRunningMessage.setTaskInstanceHost(taskExecutionContext.getHost());
taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime()); taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath()); taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteRunningMessage.setAppIds(taskExecutionContext.getAppIds()); taskExecuteRunningMessage.setAppIds(taskExecutionContext.getAppIds());

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java

@ -33,7 +33,7 @@ public class LogicTaskInstanceExecutionFinishEventSender
public void sendMessage(TaskInstanceExecutionFinishEvent message) { public void sendMessage(TaskInstanceExecutionFinishEvent message) {
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(message.getHost(), ITaskInstanceExecutionEventListener.class); .getProxyClient(message.getWorkflowInstanceHost(), ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(message); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(message);
} }
@ -47,7 +47,8 @@ public class LogicTaskInstanceExecutionFinishEventSender
taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath()); taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds()); taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds());
taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId()); taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId());
taskExecuteResultMessage.setHost(taskExecutionContext.getHost()); taskExecuteResultMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
taskExecuteResultMessage.setTaskInstanceHost(taskExecutionContext.getHost());
taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime()); taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime()); taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime());
taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool()); taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool());

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

@ -96,7 +96,7 @@ public class MessageRetryRunner extends BaseDaemonThread {
List<TaskInstanceMessage> taskInstanceMessages = this.needToRetryMessages.get(taskInstanceId); List<TaskInstanceMessage> taskInstanceMessages = this.needToRetryMessages.get(taskInstanceId);
if (taskInstanceMessages != null) { if (taskInstanceMessages != null) {
taskInstanceMessages.forEach(taskInstanceMessage -> { taskInstanceMessages.forEach(taskInstanceMessage -> {
taskInstanceMessage.getEvent().setHost(messageReceiverHost); taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost);
}); });
} }
} }

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java

@ -34,7 +34,7 @@ public class TaskInstanceExecutionFinishEventSender
public void sendEvent(TaskInstanceExecutionFinishEvent taskInstanceExecutionFinishEvent) { public void sendEvent(TaskInstanceExecutionFinishEvent taskInstanceExecutionFinishEvent) {
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstanceExecutionFinishEvent.getHost(), .getProxyClient(taskInstanceExecutionFinishEvent.getWorkflowInstanceHost(),
ITaskInstanceExecutionEventListener.class); ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(taskInstanceExecutionFinishEvent); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(taskInstanceExecutionFinishEvent);
} }
@ -49,7 +49,8 @@ public class TaskInstanceExecutionFinishEventSender
taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath()); taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds()); taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds());
taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId()); taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId());
taskExecuteResultMessage.setHost(taskExecutionContext.getHost()); taskExecuteResultMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
taskExecuteResultMessage.setTaskInstanceHost(taskExecutionContext.getHost());
taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime()); taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime()); taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime());
taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool()); taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool());

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java

@ -36,7 +36,7 @@ public class TaskInstanceExecutionInfoUpdateEventSender
public void sendEvent(TaskInstanceExecutionInfoEvent taskInstanceExecutionInfoEvent) { public void sendEvent(TaskInstanceExecutionInfoEvent taskInstanceExecutionInfoEvent) {
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstanceExecutionInfoEvent.getHost(), .getProxyClient(taskInstanceExecutionInfoEvent.getWorkflowInstanceHost(),
ITaskInstanceExecutionEventListener.class); ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionInfoUpdate(taskInstanceExecutionInfoEvent); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionInfoUpdate(taskInstanceExecutionInfoEvent);
} }
@ -46,7 +46,8 @@ public class TaskInstanceExecutionInfoUpdateEventSender
TaskInstanceExecutionInfoEvent taskUpdatePidRequest = new TaskInstanceExecutionInfoEvent(); TaskInstanceExecutionInfoEvent taskUpdatePidRequest = new TaskInstanceExecutionInfoEvent();
taskUpdatePidRequest.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskUpdatePidRequest.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskUpdatePidRequest.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); taskUpdatePidRequest.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskUpdatePidRequest.setHost(taskExecutionContext.getHost()); taskUpdatePidRequest.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
taskUpdatePidRequest.setTaskInstanceHost(taskExecutionContext.getHost());
taskUpdatePidRequest.setStartTime(taskExecutionContext.getStartTime()); taskUpdatePidRequest.setStartTime(taskExecutionContext.getStartTime());
return taskUpdatePidRequest; return taskUpdatePidRequest;
} }

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java

@ -36,7 +36,7 @@ public class TaskInstanceExecutionRunningEventSender
public void sendEvent(TaskInstanceExecutionRunningEvent taskInstanceExecutionRunningEvent) { public void sendEvent(TaskInstanceExecutionRunningEvent taskInstanceExecutionRunningEvent) {
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstanceExecutionRunningEvent.getHost(), .getProxyClient(taskInstanceExecutionRunningEvent.getWorkflowInstanceHost(),
ITaskInstanceExecutionEventListener.class); ITaskInstanceExecutionEventListener.class);
iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent);
} }
@ -48,7 +48,8 @@ public class TaskInstanceExecutionRunningEventSender
taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus()); taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus());
taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath()); taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath());
taskExecuteRunningMessage.setHost(taskExecutionContext.getHost()); taskExecuteRunningMessage.setWorkflowInstanceHost(taskExecutionContext.getWorkflowInstanceHost());
taskExecuteRunningMessage.setTaskInstanceHost(taskExecutionContext.getHost());
taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime()); taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime());
taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath()); taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath());
taskExecuteRunningMessage.setAppIds(taskExecutionContext.getAppIds()); taskExecuteRunningMessage.setAppIds(taskExecutionContext.getAppIds());

Loading…
Cancel
Save