Browse Source

[Fix-12422][MasterServer] The task needing fault tolerance cannot be submitted (#12423)

* fix complement data bug

* fix 12167

* fix complememt data bug

* fix 12422

Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>
2.0.7-prepare
JinYong Li 2 years ago committed by GitHub
parent
commit
200d4554f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -421,8 +422,11 @@ public class WorkflowExecuteThread implements Runnable {
private void taskFinished(TaskInstance task) {
logger.info("work flow {} task {} state:{} ", processInstance.getId(), task.getId(), task.getState());
if (task.isDependTask() && task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
logger.info("resubmit NEED_FAULT_TOLERANCE dependent task");
if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
logger.info("resubmit NEED_FAULT_TOLERANCE {} task", task.getId());
if (task.getMaxRetryTimes() == 0) {
task.setRetryTimes(task.getRetryTimes() + 1);
}
addTaskToStandByList(task);
submitStandByTask();
return;
@ -1464,9 +1468,12 @@ public class WorkflowExecuteThread implements Runnable {
submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
if (retryTask != null && !retryTask.retryTaskIntervalOverTime()) {
logger.info("task name: {} retry waiting has not exceeded the interval time, and skip submission this time, task id:{}", task.getName(), task.getId());
continue;
if (retryTask != null && retryTask.getState() == ExecutionStatus.FAILURE && retryTask.getMaxRetryTimes() !=0 && retryTask.getRetryInterval() != 0) {
long failedTimeInterval = DateUtils.differSec(new Date(), retryTask.getEndTime());
if ((long) retryTask.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT > failedTimeInterval) {
logger.info("task name: {} retry waiting has not exceeded the interval time, and skip submission this time, task id:{}", task.getName(), task.getId());
continue;
}
}
}
//init varPool only this task is the first time running

Loading…
Cancel
Save