|
|
|
@ -455,7 +455,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); |
|
|
|
|
if (newTaskInstance == null) { |
|
|
|
|
logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", |
|
|
|
|
logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}", |
|
|
|
|
taskInstance.getTaskCode(), |
|
|
|
|
taskInstance.getId()); |
|
|
|
|
return; |
|
|
|
@ -463,7 +463,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); |
|
|
|
|
if (!taskInstance.retryTaskIntervalOverTime()) { |
|
|
|
|
logger.info( |
|
|
|
|
"failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", |
|
|
|
|
"Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}", |
|
|
|
|
processInstance.getId(), newTaskInstance.getTaskCode(), |
|
|
|
|
newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(), |
|
|
|
|
newTaskInstance.getRetryInterval()); |
|
|
|
@ -785,7 +785,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
ProcessDag processDag = generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeCodeList, |
|
|
|
|
processInstance.getTaskDependType()); |
|
|
|
|
if (processDag == null) { |
|
|
|
|
logger.error("processDag is null"); |
|
|
|
|
logger.error("ProcessDag is null"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// generate process dag
|
|
|
|
@ -818,6 +818,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
task.getTaskCode(), |
|
|
|
|
task.getState()); |
|
|
|
|
if (validTaskMap.containsKey(task.getTaskCode())) { |
|
|
|
|
logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}", |
|
|
|
|
task.getTaskCode()); |
|
|
|
|
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); |
|
|
|
|
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); |
|
|
|
|
if (!oldTaskInstance.getState().isFinished() && task.getState().isFinished()) { |
|
|
|
@ -825,14 +827,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
processService.updateTaskInstance(task); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", |
|
|
|
|
task.getTaskCode()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
validTaskMap.put(task.getTaskCode(), task.getId()); |
|
|
|
|
taskInstanceMap.put(task.getId(), task); |
|
|
|
|
|
|
|
|
|
if (task.isTaskComplete()) { |
|
|
|
|
logger.info("TaskInstance is already complete."); |
|
|
|
|
completeTaskMap.put(task.getTaskCode(), task.getId()); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
@ -842,6 +843,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
if (task.taskCanRetry()) { |
|
|
|
|
if (task.getState().isNeedFaultTolerance()) { |
|
|
|
|
logger.info("TaskInstance needs fault tolerance, will be added to standby list."); |
|
|
|
|
task.setFlag(Flag.NO); |
|
|
|
|
processService.updateTaskInstance(task); |
|
|
|
|
|
|
|
|
@ -849,6 +851,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
TaskInstance tolerantTaskInstance = cloneTolerantTaskInstance(task); |
|
|
|
|
addTaskToStandByList(tolerantTaskInstance); |
|
|
|
|
} else { |
|
|
|
|
logger.info("Retry taskInstance, taskState: {}", task.getState()); |
|
|
|
|
retryTaskInstance(task); |
|
|
|
|
} |
|
|
|
|
continue; |
|
|
|
@ -930,15 +933,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
|
|
|
|
|
boolean submit = taskProcessor.action(TaskAction.SUBMIT); |
|
|
|
|
if (!submit) { |
|
|
|
|
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", |
|
|
|
|
processInstance.getId(), |
|
|
|
|
processInstance.getName(), |
|
|
|
|
taskInstance.getId(), |
|
|
|
|
logger.error("Submit standby task failed!, taskCode: {}, taskName: {}", |
|
|
|
|
taskInstance.getTaskCode(), |
|
|
|
|
taskInstance.getName()); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
|
|
|
|
|
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); |
|
|
|
|
if (validTaskMap.containsKey(taskInstance.getTaskCode())) { |
|
|
|
|
int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode()); |
|
|
|
|
if (taskInstance.getId() != oldTaskInstanceId) { |
|
|
|
@ -965,19 +967,16 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
taskInstance.getProcessInstanceId(), |
|
|
|
|
taskInstance.getTaskGroupPriority()); |
|
|
|
|
if (!acquireTaskGroup) { |
|
|
|
|
logger.info("submit task name :{}, but the first time to try to acquire task group failed", |
|
|
|
|
taskInstance.getName()); |
|
|
|
|
logger.info("Submitted task will not be dispatch right now because the first time to try to acquire" + |
|
|
|
|
" task group failed, taskInstanceName: {}, taskGroupId: {}", |
|
|
|
|
taskInstance.getName(), taskGroupId); |
|
|
|
|
return Optional.of(taskInstance); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH); |
|
|
|
|
if (!dispatchSuccess) { |
|
|
|
|
logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!", |
|
|
|
|
processInstance.getId(), |
|
|
|
|
processInstance.getName(), |
|
|
|
|
taskInstance.getId(), |
|
|
|
|
taskInstance.getName()); |
|
|
|
|
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), taskInstance.getName()); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
taskProcessor.action(TaskAction.RUN); |
|
|
|
@ -1005,11 +1004,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
return Optional.of(taskInstance); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("submit standby task error, taskCode: {}, taskInstanceId: {}", |
|
|
|
|
taskInstance.getTaskCode(), |
|
|
|
|
taskInstance.getId(), |
|
|
|
|
e); |
|
|
|
|
logger.error("Submit standby task {} error, taskCode: {}", taskInstance.getName(), |
|
|
|
|
taskInstance.getTaskCode(), e); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} finally { |
|
|
|
|
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1072,7 +1071,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) { |
|
|
|
|
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); |
|
|
|
|
if (taskNode == null) { |
|
|
|
|
logger.error("taskNode is null, code:{}", taskInstance.getTaskCode()); |
|
|
|
|
logger.error("Clone retry taskInstance error because taskNode is null, taskCode:{}", |
|
|
|
|
taskInstance.getTaskCode()); |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); |
|
|
|
@ -1099,7 +1099,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) { |
|
|
|
|
TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); |
|
|
|
|
if (taskNode == null) { |
|
|
|
|
logger.error("taskNode is null, code:{}", taskInstance.getTaskCode()); |
|
|
|
|
logger.error("Clone tolerant taskInstance error because taskNode is null, taskCode:{}", |
|
|
|
|
taskInstance.getTaskCode()); |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); |
|
|
|
@ -1340,11 +1341,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (task.getId() != null && completeTaskMap.containsKey(task.getTaskCode())) { |
|
|
|
|
logger.info("task {} has already run success", task.getName()); |
|
|
|
|
logger.info("Task has already run success, taskName: {}", task.getName()); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
if (task.getState().isKill()) { |
|
|
|
|
logger.info("task {} stopped, the state is {}", task.getName(), task.getState()); |
|
|
|
|
logger.info("Task is be stopped, the state is {}, taskInstanceId: {}", task.getState(), task.getId()); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1396,8 +1397,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
logger.info("taskCode: {} completeDependTaskList: {}", taskCode, |
|
|
|
|
Arrays.toString(completeTaskMap.keySet().toArray())); |
|
|
|
|
logger.info("The dependTasks of task all success, currentTaskCode: {}, dependTaskCodes: {}", |
|
|
|
|
taskCode, Arrays.toString(completeTaskMap.keySet().toArray())); |
|
|
|
|
return DependResult.SUCCESS; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1429,6 +1430,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
List<String> nextTaskList = |
|
|
|
|
DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); |
|
|
|
|
if (!nextTaskList.contains(nextNodeName)) { |
|
|
|
|
logger.info("DependTask is a condition task, and its next condition branch does not hava current task, " + |
|
|
|
|
"dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName |
|
|
|
|
); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
@ -1710,10 +1714,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
*/ |
|
|
|
|
public void addTaskToStandByList(TaskInstance taskInstance) { |
|
|
|
|
if (readyToSubmitTaskQueue.contains(taskInstance)) { |
|
|
|
|
logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode()); |
|
|
|
|
logger.warn("Task already exists in ready submit queue, no need to add again, task code:{}", |
|
|
|
|
taskInstance.getTaskCode()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", |
|
|
|
|
logger.info("Add task to stand by list, task name:{}, task id:{}, task code:{}", |
|
|
|
|
taskInstance.getName(), |
|
|
|
|
taskInstance.getId(), |
|
|
|
|
taskInstance.getTaskCode()); |
|
|
|
@ -1798,8 +1803,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); |
|
|
|
|
if (retryTask != null && retryTask.getState().isForceSuccess()) { |
|
|
|
|
task.setState(retryTask.getState()); |
|
|
|
|
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", |
|
|
|
|
task.getName()); |
|
|
|
|
logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", |
|
|
|
|
task.getName(), task.getId()); |
|
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
|
completeTaskMap.put(task.getTaskCode(), task.getId()); |
|
|
|
|
taskInstanceMap.put(task.getId(), task); |
|
|
|
@ -1815,6 +1820,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
DependResult dependResult = getDependResultForTask(task); |
|
|
|
|
if (DependResult.SUCCESS == dependResult) { |
|
|
|
|
logger.info("The dependResult of task {} is success, so ready to submit to execute", task.getName()); |
|
|
|
|
Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task); |
|
|
|
|
if (!taskInstanceOptional.isPresent()) { |
|
|
|
|
this.taskFailedSubmit = true; |
|
|
|
|