Browse Source

[Bugfix-12568] [Master] The retry task was submitted in advance will block other task (#12570)

* fix 12568

* fix 12568

* code style

Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>
2.0.8-release
JinYong Li 2 years ago committed by GitHub
parent
commit
ddfdc5dc62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -1452,6 +1452,7 @@ public class WorkflowExecuteThread implements Runnable {
private void submitStandByTask() {
try {
int length = readyToSubmitTaskQueue.size();
List<TaskInstance> skipSubmitInstances = new ArrayList<>();
for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
if (task == null) {
@ -1472,6 +1473,8 @@ public class WorkflowExecuteThread implements Runnable {
long failedTimeInterval = DateUtils.differSec(new Date(), retryTask.getEndTime());
if ((long) retryTask.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT > failedTimeInterval) {
logger.info("task name: {} retry waiting has not exceeded the interval time, and skip submission this time, task id:{}", task.getName(), task.getId());
readyToSubmitTaskQueue.remove(task);
skipSubmitInstances.add(task);
continue;
}
}
@ -1512,6 +1515,10 @@ public class WorkflowExecuteThread implements Runnable {
logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
}
}
for (TaskInstance task : skipSubmitInstances) {
readyToSubmitTaskQueue.put(task);
}
skipSubmitInstances.clear();
} catch (Exception e) {
logger.error("submit standby task error", e);
}

Loading…
Cancel
Save