diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 12fed3ab56..7bf2f30e8c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -82,6 +82,7 @@ import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.utils.TaskUtils; import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; @@ -122,6 +123,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -983,7 +985,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } } // 4. submit to dispatch queue - taskExecuteRunnable.dispatch(); + tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable); stateWheelExecuteThread.addTask4TimeoutCheck(workflowInstance, taskInstance); return true; @@ -996,6 +998,35 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } } + /** + * Sometimes (such as pause), if the task instance status has already been finished, + * there is no need to dispatch it + */ + @VisibleForTesting + void tryToDispatchTaskInstance(TaskInstance taskInstance, TaskExecuteRunnable taskExecuteRunnable) { + if (!taskInstance.getState().isFinished()) { + taskExecuteRunnable.dispatch(); + } else { + if (workflowExecuteContext.getWorkflowInstance().isBlocked()) { + TaskStateEvent processBlockEvent = TaskStateEvent.builder() + .processInstanceId(workflowExecuteContext.getWorkflowInstance().getId()) + .taskInstanceId(taskInstance.getId()) + .status(taskInstance.getState()) + .type(StateEventType.PROCESS_BLOCKED) + .build(); + this.stateEvents.add(processBlockEvent); + } + + TaskStateEvent taskStateChangeEvent = TaskStateEvent.builder() + .processInstanceId(workflowExecuteContext.getWorkflowInstance().getId()) + .taskInstanceId(taskInstance.getId()) + .status(taskInstance.getState()) + .type(StateEventType.TASK_STATE_CHANGE) + .build(); + this.stateEvents.add(taskStateChangeEvent); + } + } + /** * find task instance in db. * in case submit more than one same name task in the same time. diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index da96be2066..9599478f4a 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -35,9 +35,11 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.command.CommandService; @@ -371,4 +373,20 @@ public class WorkflowExecuteRunnableTest { return schedulerList; } + @Test + void testTryToDispatchTaskInstance() { + // task instance already finished, not dispatch + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setState(TaskExecutionStatus.PAUSE); + Mockito.when(processInstance.isBlocked()).thenReturn(true); + TaskExecuteRunnable taskExecuteRunnable = Mockito.mock(TaskExecuteRunnable.class); + workflowExecuteThread.tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable); + Mockito.verify(taskExecuteRunnable, Mockito.never()).dispatch(); + + // submit success should dispatch + taskInstance = new TaskInstance(); + taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS); + workflowExecuteThread.tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable); + Mockito.verify(taskExecuteRunnable).dispatch(); + } }