Browse Source

[Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 (#11864) (#11950)

* [Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838

Co-authored-by: Yann Ann <2993643785@qq.com>
3.1.0-release
caishunfeng 2 years ago committed by GitHub
parent
commit
27b69e608a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
  2. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

37
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java

@ -28,35 +28,48 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@AutoService(StateEventHandler.class) @AutoService(StateEventHandler.class)
public class TaskTimeoutStateEventHandler implements StateEventHandler { public class TaskTimeoutStateEventHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class);
@Override @Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
StateEvent stateEvent) throws StateEventHandleError { StateEvent stateEvent) throws StateEventHandleError {
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent; TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
TaskMetrics.incTaskInstanceByState("timeout"); TaskMetrics.incTaskInstanceByState("timeout");
workflowExecuteRunnable.checkTaskInstanceByStateEvent(taskStateEvent); workflowExecuteRunnable.checkTaskInstanceByStateEvent(taskStateEvent);
TaskInstance taskInstance = TaskInstance taskInstance =
workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow( workflowExecuteRunnable.getTaskInstance(taskStateEvent.getTaskInstanceId()).orElseThrow(
() -> new StateEventHandleError(String.format( () -> new StateEventHandleError(String.format(
"Cannot find the task instance from workflow execute runnable, taskInstanceId: %s", "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
taskStateEvent.getTaskInstanceId()))); taskStateEvent.getTaskInstanceId())));
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) { if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
return true; return true;
} }
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine()
Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap(); .getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable
|| TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { .getActiveTaskProcessMap();
ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode()); if ((TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
taskProcessor.action(TaskAction.TIMEOUT); || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy)) {
if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) {
ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.TIMEOUT);
} else {
logger.warn(
"cannot find the task processor for task {}, so skip task processor action.",
taskInstance.getTaskCode());
}
} }
if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy
|| TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
workflowExecuteRunnable.processTimeout(); workflowExecuteRunnable.processTimeout();
workflowExecuteRunnable.taskTimeout(taskInstance); workflowExecuteRunnable.taskTimeout(taskInstance);
} }

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -300,7 +300,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} finally { } finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
} }
} }
} }

Loading…
Cancel
Save