From 0f4bce18833ca91f38a6a58b705e269eaec5ba4c Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 4 Sep 2024 13:57:27 +0800 Subject: [PATCH] [Improvement-16574][Master] Move some task operations into ITaskExecutionRunnable (#16575) --- .../BackfillWorkflowExecutorDelegate.java | 1 - .../WorkflowBackfillTriggerRequest.java | 3 - .../TaskPauseLifecycleEventHandler.java | 3 - .../TaskStartLifecycleEventHandler.java | 2 +- .../task/runnable/ITaskExecutionRunnable.java | 41 ++++++++-- .../task/runnable/TaskExecutionRunnable.java | 81 +++++++++++++++---- .../statemachine/AbstractTaskStateAction.java | 46 +---------- .../statemachine/TaskFailureStateAction.java | 9 +-- .../WorkflowSuccessLifecycleListener.java | 1 - .../runnable/WorkflowExecutionRunnable.java | 12 ++- .../WorkflowRunningStateAction.java | 11 +-- .../trigger/WorkflowBackfillTrigger.java | 2 +- 12 files changed, 122 insertions(+), 90 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java index 85cd20c75c..ad965d8557 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java @@ -122,7 +122,6 @@ public class BackfillWorkflowExecutorDelegate implements IExecutorDelegate { - String getName(); + default String getName() { + return getTaskDefinition().getName(); + } + /** + * Whether the task instance is initialized. + *

If the ITaskExecutionRunnable is never triggered, it is not initialized. + *

If the ITaskExecutionRunnable is created by failover, recovered then it is initialized. + */ boolean isTaskInstanceInitialized(); - void initializeTaskInstance(); + /** + * Initialize the task instance with {@link FirstRunTaskInstanceFactory} + */ + void initializeFirstRunTaskInstance(); - boolean isTaskInstanceNeedRetry(); + /** + * Whether the task instance is running. + */ + boolean isTaskInstanceCanRetry(); - void initializeRetryTaskInstance(); + /** + * Retry the TaskExecutionRunnable. + *

Will create retry task instance and start it. + */ + void retry(); - void initializeFailoverTaskInstance(); + /** + * Failover the TaskExecutionRunnable. + *

The failover logic is judged by the task instance state. + */ + void failover(); + + /** + * Pause the TaskExecutionRunnable. + */ + void pause(); + + /** + * Kill the TaskExecutionRunnable. + */ + void kill(); WorkflowEventBus getWorkflowEventBus(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java index d2037e642a..09466eec68 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java @@ -24,11 +24,22 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; +import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest; +import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskStartLifecycleEvent; import org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory; +import org.apache.commons.lang3.StringUtils; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -62,24 +73,20 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable { this.workflowInstance = checkNotNull(taskExecutionRunnableBuilder.getWorkflowInstance()); this.taskDefinition = checkNotNull(taskExecutionRunnableBuilder.getTaskDefinition()); this.taskInstance = taskExecutionRunnableBuilder.getTaskInstance(); - if (taskInstance != null) { + if (isTaskInstanceInitialized()) { initializeTaskExecutionContext(); } } - @Override - public String getName() { - return taskDefinition.getName(); - } - @Override public boolean isTaskInstanceInitialized() { return taskInstance != null; } @Override - public void initializeTaskInstance() { - checkState(taskInstance == null, "The task instance is not null, should not initialize again."); + public void initializeFirstRunTaskInstance() { + checkState(!isTaskInstanceInitialized(), + "The task instance is already initialized, can't initialize first run task."); this.taskInstance = applicationContext.getBean(TaskInstanceFactories.class) .firstRunTaskInstanceFactory() .builder() @@ -90,35 +97,50 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable { } @Override - public boolean isTaskInstanceNeedRetry() { + public boolean isTaskInstanceCanRetry() { return taskInstance.getRetryTimes() < taskInstance.getMaxRetryTimes(); } @Override - public void initializeRetryTaskInstance() { - checkState(taskInstance != null, "The task instance can't retry, should not initialize retry task instance."); + public void retry() { + checkState(isTaskInstanceInitialized(), "The task instance is not initialized, can't initialize retry task."); this.taskInstance = applicationContext.getBean(TaskInstanceFactories.class) .retryTaskInstanceFactory() .builder() .withTaskInstance(taskInstance) .build(); initializeTaskExecutionContext(); + getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(this)); } @Override - public void initializeFailoverTaskInstance() { - checkState(taskInstance != null, - "The task instance can't failover, should not initialize failover task instance."); + public void failover() { + checkState(isTaskInstanceInitialized(), "The task instance is not initialized, can't failover."); + if (takeOverTaskFromExecutor()) { + log.info("Failover task success, the task {} has been taken-over from executor", taskInstance.getName()); + return; + } this.taskInstance = applicationContext.getBean(TaskInstanceFactories.class) .failoverTaskInstanceFactory() .builder() .withTaskInstance(taskInstance) .build(); initializeTaskExecutionContext(); + getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(this)); + } + + @Override + public void pause() { + getWorkflowEventBus().publish(TaskPauseLifecycleEvent.of(this)); + } + + @Override + public void kill() { + getWorkflowEventBus().publish(TaskKillLifecycleEvent.of(this)); } private void initializeTaskExecutionContext() { - checkState(taskInstance != null, "The task instance is null, can't initialize TaskExecutionContext."); + checkState(isTaskInstanceInitialized(), "The task instance is null, can't initialize TaskExecutionContext."); final TaskExecutionContextCreateRequest request = TaskExecutionContextCreateRequest.builder() .workflowDefinition(workflowDefinition) .workflowInstance(workflowInstance) @@ -129,6 +151,32 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable { .createTaskExecutionContext(request); } + private boolean takeOverTaskFromExecutor() { + checkState(isTaskInstanceInitialized(), "The task instance is null, can't take over from executor."); + if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) { + return false; + } + if (StringUtils.isEmpty(taskInstance.getHost())) { + log.debug("Task: {} host is empty, cannot take over the task from executor(This is normal case).", + taskInstance.getName()); + return false; + } + try { + final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() + .taskInstanceId(taskInstance.getId()) + .workflowHost(applicationContext.getBean(MasterConfig.class).getMasterAddress()) + .build(); + final TakeOverTaskResponse takeOverTaskResponse = Clients + .withService(ITaskInstanceOperator.class) + .withHost(taskInstance.getHost()) + .takeOverTask(takeOverTaskRequest); + return takeOverTaskResponse.isSuccess(); + } catch (Exception ex) { + log.warn("Take over task: {} failed", taskInstance.getName(), ex); + return false; + } + } + @Override public int compareTo(ITaskExecutionRunnable other) { if (other == null) { @@ -159,6 +207,9 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable { @Override public String toString() { + if (taskInstance != null) { + return "TaskExecutionRunnable{" + "name=" + getName() + ", state=" + taskInstance.getState() + '}'; + } return "TaskExecutionRunnable{" + "name=" + getName() + '}'; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java index 23dbf6754e..1de7181746 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java @@ -23,12 +23,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionSta import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; -import org.apache.dolphinscheduler.extract.base.client.Clients; -import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; -import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest; -import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent; @@ -170,7 +165,7 @@ public abstract class AbstractTaskStateAction implements ITaskStateAction { releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable); persistentTaskInstanceFailedEventToDB(taskExecutionRunnable, taskFailedEvent); - if (taskExecutionRunnable.isTaskInstanceNeedRetry()) { + if (taskExecutionRunnable.isTaskInstanceCanRetry()) { taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable)); return; } @@ -227,44 +222,7 @@ public abstract class AbstractTaskStateAction implements ITaskStateAction { *

If the take-over fails, will generate a failover task-instance and mark the task instance status to {@link TaskExecutionStatus#NEED_FAULT_TOLERANCE}. */ protected void failoverTask(final ITaskExecutionRunnable taskExecutionRunnable) { - if (!taskExecutionRunnable.isTaskInstanceInitialized()) { - throw new IllegalStateException("The task instance hasn't been initialized, cannot take over the task"); - } - if (takeOverTask(taskExecutionRunnable)) { - log.info("Failover task success, the task {} has been taken-over", taskExecutionRunnable.getName()); - return; - } - taskExecutionRunnable.initializeFailoverTaskInstance(); - tryToDispatchTask(taskExecutionRunnable); - log.info("Failover task success, the task {} has been resubmitted.", taskExecutionRunnable.getName()); - } - - private boolean takeOverTask(final ITaskExecutionRunnable taskExecutionRunnable) { - if (!taskExecutionRunnable.isTaskInstanceInitialized()) { - log.debug("Task: {} doesn't initialized yet, cannot take over the task", taskExecutionRunnable.getName()); - return false; - } - if (TaskTypeUtils.isLogicTask(taskExecutionRunnable.getTaskInstance().getTaskType())) { - return false; - } - if (StringUtils.isEmpty(taskExecutionRunnable.getTaskInstance().getHost())) { - log.debug("Task: {} host is empty, cannot take over the task", taskExecutionRunnable.getName()); - return false; - } - try { - final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() - .taskInstanceId(taskExecutionRunnable.getTaskInstance().getId()) - .workflowHost(masterConfig.getMasterAddress()) - .build(); - final TakeOverTaskResponse takeOverTaskResponse = Clients - .withService(ITaskInstanceOperator.class) - .withHost(taskExecutionRunnable.getTaskInstance().getHost()) - .takeOverTask(takeOverTaskRequest); - return takeOverTaskResponse.isSuccess(); - } catch (Exception ex) { - log.warn("Take over task: {} failed", taskExecutionRunnable.getName(), ex); - return false; - } + taskExecutionRunnable.failover(); } protected void tryToDispatchTask(final ITaskExecutionRunnable taskExecutionRunnable) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java index 3f7736fa2f..005a0003ca 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/TaskFailureStateAction.java @@ -73,15 +73,14 @@ public class TaskFailureStateAction extends AbstractTaskStateAction { throwExceptionIfStateIsNotMatch(taskExecutionRunnable); final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); // check the retry times - if (!taskExecutionRunnable.isTaskInstanceNeedRetry()) { + if (!taskExecutionRunnable.isTaskInstanceCanRetry()) { log.info("The task: {} cannot retry, because the retry times: {} is over the max retry times: {}", taskInstance.getName(), taskInstance.getRetryTimes(), taskInstance.getMaxRetryTimes()); return; } - taskExecutionRunnable.initializeRetryTaskInstance(); - taskExecutionRunnable.getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(taskExecutionRunnable)); + taskExecutionRunnable.retry(); } @Override @@ -117,7 +116,7 @@ public class TaskFailureStateAction extends AbstractTaskStateAction { // This case happen when the task is failure but the task is in delay retry queue. // We don't remove the event in GlobalWorkflowDelayEventCoordinator the event should be dropped when the task is // killed. - if (taskExecutionRunnable.isTaskInstanceNeedRetry() + if (taskExecutionRunnable.isTaskInstanceCanRetry() && workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) { workflowExecutionGraph.markTaskExecutionRunnableChainPause(taskExecutionRunnable); publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); @@ -143,7 +142,7 @@ public class TaskFailureStateAction extends AbstractTaskStateAction { // This case happen when the task is failure but the task is in delay retry queue. // We don't remove the event in GlobalWorkflowDelayEventCoordinator the event should be dropped when the task is // killed. - if (taskExecutionRunnable.isTaskInstanceNeedRetry() + if (taskExecutionRunnable.isTaskInstanceCanRetry() && workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) { workflowExecutionGraph.markTaskExecutionRunnableChainKill(taskExecutionRunnable); publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/listener/WorkflowSuccessLifecycleListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/listener/WorkflowSuccessLifecycleListener.java index afc993af7d..4801b714be 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/listener/WorkflowSuccessLifecycleListener.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/listener/WorkflowSuccessLifecycleListener.java @@ -89,7 +89,6 @@ public class WorkflowSuccessLifecycleListener implements IWorkflowLifecycleListe .startNodes(commandParam.getStartNodes()) .failureStrategy(workflowInstance.getFailureStrategy()) .taskDependType(workflowInstance.getTaskDependType()) - .execType(CommandType.COMPLEMENT_DATA) .warningType(workflowInstance.getWarningType()) .warningGroupId(workflowInstance.getWarningGroupId()) .workflowInstancePriority(workflowInstance.getProcessInstancePriority()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java index 73e17aac74..7fdd59b2e6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnable.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.engine.workflow.runnable; import static com.google.common.base.Preconditions.checkArgument; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStopLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.listener.IWorkflowLifecycleListener; @@ -29,8 +30,6 @@ import java.util.List; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationContext; - @Slf4j public class WorkflowExecutionRunnable implements IWorkflowExecutionRunnable { @@ -41,7 +40,6 @@ public class WorkflowExecutionRunnable implements IWorkflowExecutionRunnable { private final List workflowInstanceLifecycleListeners; public WorkflowExecutionRunnable(WorkflowExecutionRunnableBuilder workflowExecutionRunnableBuilder) { - final ApplicationContext applicationContext = workflowExecutionRunnableBuilder.getApplicationContext(); this.workflowExecuteContext = workflowExecutionRunnableBuilder.getWorkflowExecuteContextBuilder().build(); this.workflowInstanceLifecycleListeners = workflowExecuteContext.getWorkflowInstanceLifecycleListeners(); } @@ -67,4 +65,12 @@ public class WorkflowExecutionRunnable implements IWorkflowExecutionRunnable { workflowInstanceLifecycleListeners.add(listener); } + @Override + public String toString() { + final WorkflowInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + return "WorkflowExecutionRunnable{" + + "name=" + workflowInstance.getName() + + ", state=" + workflowInstance.getState().name() + + '}'; + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java index 16ef3c1fb0..914d050191 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/WorkflowRunningStateAction.java @@ -21,8 +21,7 @@ import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph; -import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent; -import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFailedLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowFinalizeLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowPauseLifecycleEvent; @@ -67,12 +66,10 @@ public class WorkflowRunningStateAction extends AbstractWorkflowStateAction { super.transformWorkflowInstanceState(workflowExecutionRunnable, WorkflowExecutionStatus.READY_PAUSE); try { LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId()); - final WorkflowEventBus workflowEventBus = workflowExecutionRunnable.getWorkflowEventBus(); workflowExecutionRunnable .getWorkflowExecutionGraph() .getActiveTaskExecutionRunnable() - .forEach(taskExecutionRunnable -> workflowEventBus - .publish(TaskPauseLifecycleEvent.of(taskExecutionRunnable))); + .forEach(ITaskExecutionRunnable::pause); } finally { LogUtils.removeWorkflowInstanceIdMDC(); } @@ -93,12 +90,10 @@ public class WorkflowRunningStateAction extends AbstractWorkflowStateAction { // do pause action try { LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId()); - final WorkflowEventBus workflowEventBus = workflowExecutionRunnable.getWorkflowEventBus(); workflowExecutionRunnable .getWorkflowExecutionGraph() .getActiveTaskExecutionRunnable() - .forEach(taskExecutionRunnable -> workflowEventBus - .publish(TaskKillLifecycleEvent.of(taskExecutionRunnable))); + .forEach(ITaskExecutionRunnable::kill); } finally { LogUtils.removeWorkflowInstanceIdMDC(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java index 3a00d94944..bee552340b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowBackfillTrigger.java @@ -98,7 +98,7 @@ public class WorkflowBackfillTrigger .backfillTimeList(backfillTriggerRequest.getBackfillTimeList()) .build(); return Command.builder() - .commandType(backfillTriggerRequest.getExecType()) + .commandType(CommandType.COMPLEMENT_DATA) .processDefinitionCode(backfillTriggerRequest.getWorkflowCode()) .processDefinitionVersion(backfillTriggerRequest.getWorkflowVersion()) .processInstanceId(workflowInstance.getId())