diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java index 22f3f9dd8f..1678594323 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java @@ -28,35 +28,48 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoService(StateEventHandler.class) public class TaskTimeoutStateEventHandler implements StateEventHandler { + private static final Logger logger = LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class); + @Override public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, - StateEvent stateEvent) throws StateEventHandleError { + StateEvent stateEvent) throws StateEventHandleError { TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent; TaskMetrics.incTaskInstanceByState("timeout"); workflowExecuteRunnable.checkTaskInstanceByStateEvent(taskStateEvent); TaskInstance taskInstance = - workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow( - () -> new StateEventHandleError(String.format( - "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s", - taskStateEvent.getTaskInstanceId()))); + workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow( + () -> new StateEventHandleError(String.format( + "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s", + taskStateEvent.getTaskInstanceId()))); if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) { return true; } - TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); - Map activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap(); - if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy - || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { - ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode()); - taskProcessor.action(TaskAction.TIMEOUT); + TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine() + .getTimeoutNotifyStrategy(); + Map activeTaskProcessMap = workflowExecuteRunnable + .getActiveTaskProcessMap(); + if ((TaskTimeoutStrategy.FAILED == taskTimeoutStrategy + || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) { + if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) { + ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode()); + taskProcessor.action(TaskAction.TIMEOUT); + } else { + logger.warn( + "cannot find the task processor for task {}, so skip task processor action.", + taskInstance.getTaskCode()); + } } - if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { + if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy + || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { workflowExecuteRunnable.processTimeout(); workflowExecuteRunnable.taskTimeout(taskInstance); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 67ac86808c..0b1b706744 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -305,7 +305,6 @@ public class WorkflowExecuteRunnable implements Callable { } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } - } }