From ddfdc5dc6258e764d075b1a81d4a4d0a8aa5ef47 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Wed, 2 Nov 2022 11:55:17 +0800 Subject: [PATCH] [Bugfix-12568] [Master] The retry task was submitted in advance will block other task (#12570) * fix 12568 * fix 12568 * code style Co-authored-by: JinyLeeChina --- .../server/master/runner/WorkflowExecuteThread.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 4d84227c40..89090cb639 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -1452,6 +1452,7 @@ public class WorkflowExecuteThread implements Runnable { private void submitStandByTask() { try { int length = readyToSubmitTaskQueue.size(); + List skipSubmitInstances = new ArrayList<>(); for (int i = 0; i < length; i++) { TaskInstance task = readyToSubmitTaskQueue.peek(); if (task == null) { @@ -1472,6 +1473,8 @@ public class WorkflowExecuteThread implements Runnable { long failedTimeInterval = DateUtils.differSec(new Date(), retryTask.getEndTime()); if ((long) retryTask.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT > failedTimeInterval) { logger.info("task name: {} retry waiting has not exceeded the interval time, and skip submission this time, task id:{}", task.getName(), task.getId()); + readyToSubmitTaskQueue.remove(task); + skipSubmitInstances.add(task); continue; } } @@ -1512,6 +1515,10 @@ public class WorkflowExecuteThread implements Runnable { logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult); } } + for (TaskInstance task : skipSubmitInstances) { + readyToSubmitTaskQueue.put(task); + } + skipSubmitInstances.clear(); } catch (Exception e) { logger.error("submit standby task error", e); }