diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java index 25fa88dd00..ec3382aa3f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java @@ -109,7 +109,7 @@ public class TaskDelayEventHandler implements TaskEventHandler { private void sendAckToWorker(TaskEvent taskEvent) { // If event handle success, send ack to worker to otherwise the worker will retry this event TaskExecuteRunningAckMessage taskExecuteRunningAckMessage = - new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS, taskEvent.getTaskInstanceId()); taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java index ac4dab50f2..ab63ada0ed 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java @@ -69,7 +69,7 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler { } public void sendAckToWorker(TaskEvent taskEvent) { - TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(), + TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS, taskEvent.getTaskInstanceId(), masterConfig.getMasterAddress(), taskEvent.getWorkerAddress(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java index bd0708e26c..b50a81311d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java @@ -110,7 +110,7 @@ public class TaskResultEventHandler implements TaskEventHandler { public void sendAckToWorker(TaskEvent taskEvent) { // we didn't set the receiver address, since the ack doen's need to retry - TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS.getCode(), + TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS, taskEvent.getTaskInstanceId(), masterConfig.getMasterAddress(), taskEvent.getWorkerAddress(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java index d271839596..a4de2d4b44 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java @@ -107,7 +107,7 @@ public class TaskRunningEventHandler implements TaskEventHandler { private void sendAckToWorker(TaskEvent taskEvent) { // If event handle success, send ack to worker to otherwise the worker will retry this event TaskExecuteRunningAckMessage taskExecuteRunningAckMessage = - new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS, taskEvent.getTaskInstanceId()); taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index 80c90ee1b7..a09d661ebd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -136,7 +136,7 @@ public class StateEventResponseService { private void writeResponse(StateEvent stateEvent, ExecutionStatus status) { Channel channel = stateEvent.getChannel(); if (channel != null) { - StateEventResponseCommand command = new StateEventResponseCommand(status.getCode(), stateEvent.getKey()); + StateEventResponseCommand command = new StateEventResponseCommand(status, stateEvent.getKey()); channel.writeAndFlush(command.convert2Command()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index a2cee4986c..1a367a96a2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -109,7 +109,7 @@ public class TaskEvent { TaskEvent event = new TaskEvent(); event.setProcessInstanceId(command.getProcessInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId()); - event.setState(ExecutionStatus.of(command.getStatus())); + event.setState(command.getStatus()); event.setStartTime(command.getStartTime()); event.setExecutePath(command.getExecutePath()); event.setLogPath(command.getLogPath()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java index 8c15cd9cbe..0f3d1431da 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.processor; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; @@ -66,7 +67,7 @@ public class TaskAckProcessorTest { taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678", " 127.0.0.1:1234", System.currentTimeMillis()); - taskExecuteRunningMessage.setStatus(1); + taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION); taskExecuteRunningMessage.setExecutePath("/dolphinscheduler/worker"); taskExecuteRunningMessage.setHost("localhost"); taskExecuteRunningMessage.setLogPath("/temp/worker.log"); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java index c7f047569e..50c1231348 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.processor; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; @@ -50,7 +51,7 @@ public class TaskKillResponseProcessorTest { new ArrayList() {{ add("task_1"); }}); taskKillResponseCommand.setHost("localhost"); taskKillResponseCommand.setProcessId(1); - taskKillResponseCommand.setStatus(1); + taskKillResponseCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION); taskKillResponseCommand.setTaskInstanceId(1); } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index f87dd33689..5df4d918b4 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -78,7 +78,7 @@ public class TaskResponseServiceTest { System.currentTimeMillis()); taskExecuteRunningMessage.setProcessId(1); taskExecuteRunningMessage.setTaskInstanceId(22); - taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode()); + taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION); taskExecuteRunningMessage.setExecutePath("path"); taskExecuteRunningMessage.setLogPath("logPath"); taskExecuteRunningMessage.setHost("127.*.*.*"); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java index 5500c744bd..c080ce94f1 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import java.io.Serializable; @@ -27,22 +28,22 @@ import java.io.Serializable; public class StateEventResponseCommand implements Serializable { private String key; - private int status; + private ExecutionStatus status; public StateEventResponseCommand() { super(); } - public StateEventResponseCommand(int status, String key) { + public StateEventResponseCommand(ExecutionStatus status, String key) { this.status = status; this.key = key; } - public int getStatus() { + public ExecutionStatus getStatus() { return status; } - public void setStatus(int status) { + public void setStatus(ExecutionStatus status) { this.status = status; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java index a70cf8f239..b423fc6f91 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import lombok.Data; import lombok.EqualsAndHashCode; @@ -35,9 +36,9 @@ import lombok.ToString; public class TaskExecuteAckCommand extends BaseCommand { private int taskInstanceId; - private int status; + private ExecutionStatus status; - public TaskExecuteAckCommand(int status, + public TaskExecuteAckCommand(ExecutionStatus status, int taskInstanceId, String sourceServerAddress, String messageReceiverAddress, diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java index 3e17ade0ad..fbbe9190b5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import java.io.Serializable; @@ -28,13 +29,13 @@ import java.io.Serializable; public class TaskExecuteRunningAckMessage implements Serializable { private int taskInstanceId; - private int status; + private ExecutionStatus status; public TaskExecuteRunningAckMessage() { super(); } - public TaskExecuteRunningAckMessage(int status, int taskInstanceId) { + public TaskExecuteRunningAckMessage(ExecutionStatus status, int taskInstanceId) { this.status = status; this.taskInstanceId = taskInstanceId; } @@ -47,11 +48,11 @@ public class TaskExecuteRunningAckMessage implements Serializable { this.taskInstanceId = taskInstanceId; } - public int getStatus() { + public ExecutionStatus getStatus() { return status; } - public void setStatus(int status) { + public void setStatus(ExecutionStatus status) { this.status = status; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java index 241d37744a..3a0bcea227 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import java.util.Date; @@ -58,7 +59,7 @@ public class TaskExecuteRunningCommand extends BaseCommand { /** * status */ - private int status; + private ExecutionStatus status; /** * logPath diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java index 03ad4dd694..55ce7ebc84 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import java.io.Serializable; import java.util.List; @@ -40,7 +41,7 @@ public class TaskKillResponseCommand implements Serializable { /** * status */ - private int status; + private ExecutionStatus status; /** @@ -69,11 +70,11 @@ public class TaskKillResponseCommand implements Serializable { this.host = host; } - public int getStatus() { + public ExecutionStatus getStatus() { return status; } - public void setStatus(int status) { + public void setStatus(ExecutionStatus status) { this.status = status; } @@ -111,7 +112,7 @@ public class TaskKillResponseCommand implements Serializable { return "TaskKillResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' - + ", status=" + status + + ", status=" + status.getDescp() + ", processId=" + processId + ", appIds=" + appIds + '}'; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java index aed647bb4c..f6f57e027f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import lombok.Data; import lombok.EqualsAndHashCode; @@ -31,9 +32,9 @@ import lombok.ToString; public class TaskRejectAckCommand extends BaseCommand { private int taskInstanceId; - private int status; + private ExecutionStatus status; - public TaskRejectAckCommand(int status, + public TaskRejectAckCommand(ExecutionStatus status, int taskInstanceId, String messageSenderAddress, String messageReceiverAddress, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java index 7891f4be70..7ba75efba1 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java @@ -52,7 +52,7 @@ public class TaskExecuteRunningMessageSender implements MessageSender