|
|
@ -550,11 +550,9 @@ public class MasterExecThread implements Runnable { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state){ |
|
|
|
private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state){ |
|
|
|
List<TaskInstance> resultList = new ArrayList<>(); |
|
|
|
List<TaskInstance> resultList = new ArrayList<>(); |
|
|
|
Set<String> taskList = completeTaskList.keySet(); |
|
|
|
for (Map.Entry<String, TaskInstance> entry: completeTaskList.entrySet()) { |
|
|
|
for(String taskName : taskList){ |
|
|
|
if(entry.getValue().getState() == state){ |
|
|
|
TaskInstance taskInstance = completeTaskList.get(taskName); |
|
|
|
resultList.add(entry.getValue()); |
|
|
|
if(taskInstance.getState() == state){ |
|
|
|
|
|
|
|
resultList.add(taskInstance); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return resultList; |
|
|
|
return resultList; |
|
|
@ -766,10 +764,8 @@ public class MasterExecThread implements Runnable { |
|
|
|
* @return |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private Boolean hasRetryTaskInStandBy(){ |
|
|
|
private Boolean hasRetryTaskInStandBy(){ |
|
|
|
Set<String> taskNameSet = this.readyToSubmitTaskList.keySet(); |
|
|
|
for (Map.Entry<String, TaskInstance> entry: readyToSubmitTaskList.entrySet()) { |
|
|
|
for(String taskName : taskNameSet){ |
|
|
|
if(entry.getValue().getState().typeIsFailure()){ |
|
|
|
TaskInstance task = this.readyToSubmitTaskList.get(taskName); |
|
|
|
|
|
|
|
if(task.getState().typeIsFailure()){ |
|
|
|
|
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -791,16 +787,15 @@ public class MasterExecThread implements Runnable { |
|
|
|
processDao.findProcessDefineById(processInstance.getProcessDefinitionId())); |
|
|
|
processDao.findProcessDefineById(processInstance.getProcessDefinitionId())); |
|
|
|
sendTimeWarning = true; |
|
|
|
sendTimeWarning = true; |
|
|
|
} |
|
|
|
} |
|
|
|
Set<MasterBaseTaskExecThread> keys = activeTaskNode.keySet(); |
|
|
|
for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry: activeTaskNode.entrySet()) { |
|
|
|
for (MasterBaseTaskExecThread taskExecThread : keys) { |
|
|
|
Future<Boolean> future = entry.getValue(); |
|
|
|
Future<Boolean> future = activeTaskNode.get(taskExecThread); |
|
|
|
TaskInstance task = entry.getKey().getTaskInstance(); |
|
|
|
TaskInstance task = taskExecThread.getTaskInstance(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(!future.isDone()){ |
|
|
|
if(!future.isDone()){ |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
// node monitor thread complete
|
|
|
|
// node monitor thread complete
|
|
|
|
activeTaskNode.remove(taskExecThread); |
|
|
|
activeTaskNode.remove(entry.getKey()); |
|
|
|
if(task == null){ |
|
|
|
if(task == null){ |
|
|
|
this.taskFailedSubmit = true; |
|
|
|
this.taskFailedSubmit = true; |
|
|
|
continue; |
|
|
|
continue; |
|
|
@ -842,11 +837,11 @@ public class MasterExecThread implements Runnable { |
|
|
|
// failure priority is higher than pause
|
|
|
|
// failure priority is higher than pause
|
|
|
|
// if a task fails, other suspended tasks need to be reset kill
|
|
|
|
// if a task fails, other suspended tasks need to be reset kill
|
|
|
|
if(errorTaskList.size() > 0){ |
|
|
|
if(errorTaskList.size() > 0){ |
|
|
|
for(String taskName : completeTaskList.keySet()){ |
|
|
|
for(Map.Entry<String, TaskInstance> entry: completeTaskList.entrySet()) { |
|
|
|
TaskInstance completeTask = completeTaskList.get(taskName); |
|
|
|
TaskInstance completeTask = entry.getValue(); |
|
|
|
if(completeTask.getState()== ExecutionStatus.PAUSE){ |
|
|
|
if(completeTask.getState()== ExecutionStatus.PAUSE){ |
|
|
|
completeTask.setState(ExecutionStatus.KILL); |
|
|
|
completeTask.setState(ExecutionStatus.KILL); |
|
|
|
completeTaskList.put(taskName, completeTask); |
|
|
|
completeTaskList.put(entry.getKey(), completeTask); |
|
|
|
processDao.updateTaskInstance(completeTask); |
|
|
|
processDao.updateTaskInstance(completeTask); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -938,9 +933,8 @@ public class MasterExecThread implements Runnable { |
|
|
|
* handling the list of tasks to be submitted |
|
|
|
* handling the list of tasks to be submitted |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private void submitStandByTask(){ |
|
|
|
private void submitStandByTask(){ |
|
|
|
Set<String> readySubmitTaskNames = readyToSubmitTaskList.keySet(); |
|
|
|
for(Map.Entry<String, TaskInstance> entry: readyToSubmitTaskList.entrySet()) { |
|
|
|
for(String readySubmitTaskName : readySubmitTaskNames){ |
|
|
|
TaskInstance task = entry.getValue(); |
|
|
|
TaskInstance task = readyToSubmitTaskList.get(readySubmitTaskName); |
|
|
|
|
|
|
|
DependResult dependResult = getDependResultForTask(task); |
|
|
|
DependResult dependResult = getDependResultForTask(task); |
|
|
|
if(DependResult.SUCCESS == dependResult){ |
|
|
|
if(DependResult.SUCCESS == dependResult){ |
|
|
|
if(retryTaskIntervalOverTime(task)){ |
|
|
|
if(retryTaskIntervalOverTime(task)){ |
|
|
@ -949,7 +943,7 @@ public class MasterExecThread implements Runnable { |
|
|
|
} |
|
|
|
} |
|
|
|
}else if(DependResult.FAILED == dependResult){ |
|
|
|
}else if(DependResult.FAILED == dependResult){ |
|
|
|
// if the dependency fails, the current node is not submitted and the state changes to failure.
|
|
|
|
// if the dependency fails, the current node is not submitted and the state changes to failure.
|
|
|
|
dependFailedTask.put(readySubmitTaskName, task); |
|
|
|
dependFailedTask.put(entry.getKey(), task); |
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult); |
|
|
|
logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult); |
|
|
|
} |
|
|
|
} |
|
|
|