From 5fa3e7b1ed68e203056112941a5be0f1285a6a7c Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 11 Aug 2022 09:25:53 +0800 Subject: [PATCH] Add pause to task instance status (#11390) --- .../server/master/metrics/TaskMetrics.java | 16 +++-- .../runner/WorkflowExecuteRunnable.java | 58 ++++++++++--------- .../runner/task/BlockingTaskProcessor.java | 2 +- .../runner/task/ConditionTaskProcessor.java | 2 +- .../runner/task/DependentTaskProcessor.java | 2 +- .../runner/task/SwitchTaskProcessor.java | 2 +- .../server/master/BlockingTaskTest.java | 2 +- .../WorkflowStateEventChangeCommand.java | 2 + .../service/process/ProcessServiceImpl.java | 4 +- .../task/api/enums/TaskExecutionStatus.java | 7 ++- 10 files changed, 53 insertions(+), 44 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java index 6f9634fa5d..e1b1307b80 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java @@ -17,25 +17,24 @@ package org.apache.dolphinscheduler.server.master.metrics; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.function.Supplier; - import com.facebook.presto.jdbc.internal.guava.collect.ImmutableSet; - import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Metrics; import lombok.experimental.UtilityClass; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + @UtilityClass public class TaskMetrics { private final Map taskInstanceCounters = new HashMap<>(); private final Set taskInstanceStates = ImmutableSet.of( - "submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "fail", "stop"); + "submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "kill", "fail", "stop"); static { for (final String state : taskInstanceStates) { @@ -44,8 +43,7 @@ public class TaskMetrics { Counter.builder("ds.task.instance.count") .tags("state", state) .description(String.format("Process instance %s total count", state)) - .register(Metrics.globalRegistry) - ); + .register(Metrics.globalRegistry)); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 09a19f6d63..c952832c99 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -17,19 +17,11 @@ package org.apache.dolphinscheduler.server.master.runner; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; -import static org.apache.dolphinscheduler.common.Constants.COMMA; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; -import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; -import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; - +import com.google.common.collect.Lists; +import lombok.NonNull; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -68,7 +60,13 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; -import org.apache.dolphinscheduler.server.master.event.*; +import org.apache.dolphinscheduler.server.master.event.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEventHandleError; +import org.apache.dolphinscheduler.server.master.event.StateEventHandleException; +import org.apache.dolphinscheduler.server.master.event.StateEventHandler; +import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager; +import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; +import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; @@ -79,10 +77,9 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeanUtils; import java.util.ArrayList; import java.util.Arrays; @@ -103,13 +100,18 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.BeanUtils; - -import com.google.common.collect.Lists; - -import lombok.NonNull; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.Constants.COMMA; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; +import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; /** * Workflow execute task, used to execute a workflow instance. @@ -1509,7 +1511,9 @@ public class WorkflowExecuteRunnable implements Callable { return WorkflowExecutionStatus.FAILURE; } - if (processInstance.isBlocked() || !isComplementEnd() || readyToSubmitTaskQueue.size() > 0) { + List pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE); + if (CollectionUtils.isNotEmpty(pauseList) || processInstance.isBlocked() || !isComplementEnd() + || readyToSubmitTaskQueue.size() > 0) { return WorkflowExecutionStatus.PAUSE; } else { return WorkflowExecutionStatus.SUCCESS; @@ -1534,7 +1538,7 @@ public class WorkflowExecuteRunnable implements Callable { } if (readyToSubmitTaskQueue.size() > 0) { for (Iterator iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) { - iter.next().setState(TaskExecutionStatus.KILL); + iter.next().setState(TaskExecutionStatus.PAUSE); } } return WorkflowExecutionStatus.BLOCK; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java index 5148a9733f..c6a1a99d88 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java @@ -85,7 +85,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { @Override protected boolean pauseTask() { // todo: task cannot be pause - taskInstance.setState(TaskExecutionStatus.KILL); + taskInstance.setState(TaskExecutionStatus.PAUSE); taskInstance.setEndTime(new Date()); processService.saveTaskInstance(taskInstance); logger.info("blocking task has been paused"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 91d414c590..2e4f7bf1ac 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -95,7 +95,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { @Override protected boolean pauseTask() { - this.taskInstance.setState(TaskExecutionStatus.KILL); + this.taskInstance.setState(TaskExecutionStatus.PAUSE); this.taskInstance.setEndTime(new Date()); processService.saveTaskInstance(taskInstance); return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index e3f109163f..1dfa4812cf 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -140,7 +140,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { @Override protected boolean pauseTask() { - this.taskInstance.setState(TaskExecutionStatus.KILL); + this.taskInstance.setState(TaskExecutionStatus.PAUSE); this.taskInstance.setEndTime(new Date()); processService.saveTaskInstance(taskInstance); return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 8c7c46cd6e..82802d8e61 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -104,7 +104,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { @Override protected boolean pauseTask() { - this.taskInstance.setState(TaskExecutionStatus.KILL); + this.taskInstance.setState(TaskExecutionStatus.PAUSE); this.taskInstance.setEndTime(new Date()); processService.saveTaskInstance(taskInstance); return true; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java index eeef4b2c2e..9d0d663278 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java @@ -239,7 +239,7 @@ public class BlockingTaskTest { blockingTaskProcessor.action(TaskAction.SUBMIT); blockingTaskProcessor.action(TaskAction.PAUSE); TaskExecutionStatus status = taskInstance.getState(); - Assert.assertEquals(TaskExecutionStatus.KILL, status); + Assert.assertEquals(TaskExecutionStatus.PAUSE, status); } @Test diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java index 0b45c48215..5db49f8500 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.remote.command; import lombok.Data; +import lombok.NoArgsConstructor; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -27,6 +28,7 @@ import java.io.Serializable; * db task final result response command */ @Data +@NoArgsConstructor public class WorkflowStateEventChangeCommand implements Serializable { private String key; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index f655d9b744..a48d783f9b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1570,7 +1570,7 @@ public class ProcessServiceImpl implements ProcessService { return null; } if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) { - taskInstance.setState(TaskExecutionStatus.KILL); + taskInstance.setState(TaskExecutionStatus.PAUSE); } taskInstance.setExecutorId(processInstance.getExecutorId()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstance)); @@ -1614,7 +1614,7 @@ public class ProcessServiceImpl implements ProcessService { // return pasue /stop if process instance state is ready pause / stop // or return submit success if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) { - state = TaskExecutionStatus.KILL; + state = TaskExecutionStatus.PAUSE; } else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP || !checkProcessStrategy(taskInstance, processInstance)) { state = TaskExecutionStatus.KILL; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java index 5ca3f893d2..a8d297cb0d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java @@ -26,6 +26,7 @@ public enum TaskExecutionStatus { SUBMITTED_SUCCESS(0, "submit success"), RUNNING_EXECUTION(1, "running"), + PAUSE(3, "pause"), FAILURE(6, "failure"), SUCCESS(7, "success"), NEED_FAULT_TOLERANCE(8, "need fault tolerance"), @@ -82,8 +83,12 @@ public enum TaskExecutionStatus { return this == TaskExecutionStatus.FAILURE; } + public boolean isPause() { + return this == TaskExecutionStatus.PAUSE; + } + public boolean isFinished() { - return isSuccess() || isKill() || isFailure(); + return isSuccess() || isKill() || isFailure() || isPause(); } public boolean isNeedFaultTolerance() {