diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index 3cf41f820a..88254d19fe 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -291,6 +291,18 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.DATA_IS_NULL, "preTaskCodes"); return result; } + List currentUpstreamList = upstreamList.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toList()); + if (currentUpstreamList.contains(0L)) { + putMsg(result, Status.DATA_IS_NOT_VALID, "currentUpstreamList"); + return result; + } + List tmpCurrent = Lists.newArrayList(currentUpstreamList); + tmpCurrent.removeAll(preTaskCodeList); + preTaskCodeList.removeAll(currentUpstreamList); + if (!preTaskCodeList.isEmpty()) { + putMsg(result, Status.DATA_IS_NOT_VALID, StringUtils.join(preTaskCodeList, Constants.COMMA)); + return result; + } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode()); if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode()); @@ -300,20 +312,17 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P List processTaskRelationList = Lists.newArrayList(processTaskRelations); List processTaskRelationWaitRemove = Lists.newArrayList(); for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { - if (preTaskCodeList.size() > 1) { - if (preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) { - preTaskCodeList.remove(processTaskRelation.getPreTaskCode()); + if (currentUpstreamList.size() > 1) { + if (currentUpstreamList.contains(processTaskRelation.getPreTaskCode())) { + currentUpstreamList.remove(processTaskRelation.getPreTaskCode()); processTaskRelationWaitRemove.add(processTaskRelation); } } else { - if (processTaskRelation.getPostTaskCode() == taskCode) { + if (processTaskRelation.getPostTaskCode() == taskCode && (currentUpstreamList.isEmpty() || tmpCurrent.isEmpty())) { processTaskRelation.setPreTaskVersion(0); processTaskRelation.setPreTaskCode(0L); } } - if (preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) { - processTaskRelationWaitRemove.add(processTaskRelation); - } } processTaskRelationList.removeAll(processTaskRelationWaitRemove); updateProcessDefiniteVersion(loginUser, result, processDefinition);