From 9a4c7f876aafb54cdd28a889ffb1b7856022c448 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 16 Jun 2022 21:46:18 +0800 Subject: [PATCH] Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479) * Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (cherry picked from commit ad2646ff1f7baa5d76d29023ced2c28a89b52f6b) --- .../server/master/runner/WorkflowExecuteRunnable.java | 5 +++++ .../server/master/runner/WorkflowExecuteThreadPool.java | 2 +- .../server/master/runner/task/CommonTaskProcessor.java | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 014abd30bc..2b11498a32 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -2050,6 +2050,11 @@ public class WorkflowExecuteRunnable implements Runnable { } private void measureTaskState(StateEvent taskStateEvent) { + if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) { + // the event is broken + logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent); + return; + } if (taskStateEvent.getExecutionStatus().typeIsFinished()) { TaskMetrics.incTaskFinish(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index d8e2fbbeda..92ff7c04f1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -107,7 +107,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { /** * Handle the events belong to the given workflow. */ - public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) { + public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) { if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) { return; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index ae8198ebfd..a19d08affc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { taskPriority.setTaskExecutionContext(taskExecutionContext); taskUpdateQueue.put(taskPriority); - logger.info("master submit success, task : {}", taskInstance.getName()); + logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId()); return true; } catch (Exception e) { logger.error("submit task error", e);