|
|
@ -1180,14 +1180,13 @@ public class MasterExecThread implements Runnable { |
|
|
|
/** |
|
|
|
/** |
|
|
|
* handling the list of tasks to be submitted |
|
|
|
* handling the list of tasks to be submitted |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private void submitStandByTask(){ |
|
|
|
private void submitStandByTask() { |
|
|
|
for(Map.Entry<String, TaskInstance> entry: readyToSubmitTaskList.entrySet()) { |
|
|
|
for (Map.Entry<String, TaskInstance> entry : readyToSubmitTaskList.entrySet()) { |
|
|
|
TaskInstance task = entry.getValue(); |
|
|
|
TaskInstance task = entry.getValue(); |
|
|
|
// stop tasks which is retrying if forced success happens
|
|
|
|
// stop tasks which is retrying if forced success happens
|
|
|
|
if (task.taskCanRetry()) { |
|
|
|
if (task.taskCanRetry()) { |
|
|
|
TaskInstance tmpTask = processService.findTaskInstanceById(task.getId()); |
|
|
|
TaskInstance tmpTask = processService.findTaskInstanceById(task.getId()); |
|
|
|
if (tmpTask != null) { |
|
|
|
if (tmpTask != null && tmpTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { |
|
|
|
if (tmpTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { |
|
|
|
|
|
|
|
task.setState(tmpTask.getState()); |
|
|
|
task.setState(tmpTask.getState()); |
|
|
|
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); |
|
|
|
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); |
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
removeTaskFromStandbyList(task); |
|
|
@ -1196,18 +1195,17 @@ public class MasterExecThread implements Runnable { |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
DependResult dependResult = getDependResultForTask(task); |
|
|
|
DependResult dependResult = getDependResultForTask(task); |
|
|
|
if(DependResult.SUCCESS == dependResult){ |
|
|
|
if (DependResult.SUCCESS == dependResult) { |
|
|
|
if(retryTaskIntervalOverTime(task)){ |
|
|
|
if (retryTaskIntervalOverTime(task)) { |
|
|
|
submitTaskExec(task); |
|
|
|
submitTaskExec(task); |
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
} |
|
|
|
} |
|
|
|
}else if(DependResult.FAILED == dependResult){ |
|
|
|
} else if (DependResult.FAILED == dependResult) { |
|
|
|
// if the dependency fails, the current node is not submitted and the state changes to failure.
|
|
|
|
// if the dependency fails, the current node is not submitted and the state changes to failure.
|
|
|
|
dependFailedTask.put(entry.getKey(), task); |
|
|
|
dependFailedTask.put(entry.getKey(), task); |
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult); |
|
|
|
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|