|
|
|
@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
|
|
|
|
|
import org.apache.dolphinscheduler.api.utils.Result; |
|
|
|
|
import org.apache.dolphinscheduler.common.Constants; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.Flag; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.CollectionUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.DateUtils; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.*; |
|
|
|
@ -177,13 +178,21 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ProcessInstance processInstance = processService.findProcessInstanceDetailById(task.getProcessInstanceId()); |
|
|
|
|
if (processInstance != null && processInstance.getState().typeIsFailure()) { |
|
|
|
|
if (processInstance != null && (processInstance.getState().typeIsFailure() || processInstance.getState().typeIsCancel())) { |
|
|
|
|
List<TaskInstance> validTaskList = processService.findValidTaskListByProcessId(processInstance.getId()); |
|
|
|
|
List<Integer> failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure()) |
|
|
|
|
List<Long> instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); |
|
|
|
|
List<ProcessTaskRelation> taskRelations = processService.findRelationByCode(processInstance.getProcessDefinitionCode(), |
|
|
|
|
processInstance.getProcessDefinitionVersion()); |
|
|
|
|
List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(taskRelations); |
|
|
|
|
List<Long> definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES) |
|
|
|
|
.map(TaskDefinitionLog::getCode).collect(Collectors.toList()); |
|
|
|
|
if (CollectionUtils.equalLists(instanceTaskCodeList, definiteTaskCodeList)) { |
|
|
|
|
List<Integer> failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() || instance.getState().typeIsCancel()) |
|
|
|
|
.map(TaskInstance::getId).collect(Collectors.toList()); |
|
|
|
|
if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { |
|
|
|
|
processInstance.setState(ExecutionStatus.SUCCESS); |
|
|
|
|
processService.updateProcessInstance(processInstance); |
|
|
|
|
if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { |
|
|
|
|
processInstance.setState(ExecutionStatus.SUCCESS); |
|
|
|
|
processService.updateProcessInstance(processInstance); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// change the state of the task instance
|
|
|
|
|