diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index a864c61cd4..a9b8291b2f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -115,75 +115,58 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH); return result; } - List processTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, postTaskCode); + updateProcessDefiniteVersion(loginUser, result, processDefinition); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); if (!processTaskRelations.isEmpty()) { - Map preTaskCodeMap = processTaskRelations.stream() + Map preTaskCodeMap = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode) .collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation)); - if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) { - putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode); - return result; - } - if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(preTaskCodeMap.get(0L)); - // delete no upstream - int delete = processTaskRelationMapper.deleteRelation(processTaskRelationLog); - int deleteLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); - if ((delete & deleteLog) == 0) { - putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + if (!preTaskCodeMap.isEmpty()) { + if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) { + putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode); + return result; + } + if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) { + // delete no upstream + processTaskRelations.remove(preTaskCodeMap.get(0L)); } } } - updateProcessDefiniteVersion(loginUser, result, processDefinition); - Date now = new Date(); - List processTaskRelationLogs = new ArrayList<>(); + TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); + ProcessTaskRelation processTaskRelation = setRelation(processDefinition, postTaskDefinition); if (preTaskCode != 0L) { - // upstream is or not exist - List upstreamProcessTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, preTaskCode); TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode); - if (upstreamProcessTaskRelations.isEmpty()) { - ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition); - processTaskRelationLog.setPreTaskCode(0L); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLogs.add(processTaskRelationLog); + List upstreamTaskRelationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == preTaskCode).collect(Collectors.toList()); + // upstream is or not exist + if (upstreamTaskRelationList.isEmpty()) { + ProcessTaskRelation preProcessTaskRelation = setRelation(processDefinition, preTaskDefinition); + preProcessTaskRelation.setPreTaskCode(0L); + preProcessTaskRelation.setPreTaskVersion(0); + processTaskRelations.add(preProcessTaskRelation); } - TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); - ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition); - processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode()); - processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion()); - processTaskRelationLogs.add(processTaskRelationLog); - } else { - TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); - ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition); - processTaskRelationLog.setPreTaskCode(0L); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLogs.add(processTaskRelationLog); - } - int insert = processTaskRelationMapper.batchInsert(processTaskRelationLogs); - int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs); - if ((insert & insertLog) > 0) { - putMsg(result, Status.SUCCESS); + processTaskRelation.setPreTaskCode(preTaskDefinition.getCode()); + processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion()); } else { - putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + processTaskRelation.setPreTaskCode(0L); + processTaskRelation.setPreTaskVersion(0); } + processTaskRelations.add(processTaskRelation); + updateRelation(loginUser, result, processDefinition, processTaskRelations); return result; } - private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinition, Date now, int userId, TaskDefinition taskDefinition) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(processDefinition.getProjectCode()); - processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); - processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); - processTaskRelationLog.setPostTaskCode(taskDefinition.getCode()); - processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion()); - processTaskRelationLog.setConditionType(ConditionType.NONE); - processTaskRelationLog.setConditionParams("{}"); - processTaskRelationLog.setCreateTime(now); - processTaskRelationLog.setUpdateTime(now); - processTaskRelationLog.setOperator(userId); - processTaskRelationLog.setOperateTime(now); - return processTaskRelationLog; + private ProcessTaskRelation setRelation(ProcessDefinition processDefinition, TaskDefinition taskDefinition) { + Date now = new Date(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(processDefinition.getProjectCode()); + processTaskRelation.setProcessDefinitionCode(processDefinition.getCode()); + processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); + processTaskRelation.setPostTaskCode(taskDefinition.getCode()); + processTaskRelation.setPostTaskVersion(taskDefinition.getVersion()); + processTaskRelation.setConditionType(ConditionType.NONE); + processTaskRelation.setConditionParams("{}"); + processTaskRelation.setCreateTime(now); + processTaskRelation.setUpdateTime(now); + return processTaskRelation; } private void updateProcessDefiniteVersion(User loginUser, Map result, ProcessDefinition processDefinition) { @@ -245,6 +228,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ",")); return result; } + updateProcessDefiniteVersion(loginUser, result, processDefinition); updateRelation(loginUser, result, processDefinition, processTaskRelationList); if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType()) || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType()) @@ -261,7 +245,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P private void updateRelation(User loginUser, Map result, ProcessDefinition processDefinition, List processTaskRelationList) { - updateProcessDefiniteVersion(loginUser, result, processDefinition); List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE); @@ -331,6 +314,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } } processTaskRelationList.removeAll(processTaskRelationWaitRemove); + updateProcessDefiniteVersion(loginUser, result, processDefinition); updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } @@ -374,6 +358,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode); + updateProcessDefiniteVersion(loginUser, result, processDefinition); updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } @@ -511,6 +496,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P processTaskRelation.setPreTaskCode(0L); processTaskRelationList.add(processTaskRelation); } + updateProcessDefiniteVersion(loginUser, result, processDefinition); updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 2ed0f61a08..199670d325 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -28,9 +28,9 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; @@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -67,6 +66,7 @@ import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.google.common.collect.Lists; /** * task definition service impl @@ -93,9 +93,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; - @Autowired - private ProcessTaskRelationLogMapper processTaskRelationLogMapper; - @Autowired private ProcessDefinitionMapper processDefinitionMapper; @@ -218,17 +215,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (delete > 0) { List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { - int deleteRelation = 0; - for (ProcessTaskRelation processTaskRelation : taskRelationList) { - deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId()); - } - if (deleteRelation == 0) { - throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); - } long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - updateProcessDefiniteVersion(loginUser, processDefinitionCode); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List relationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); + updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList()); + } else { + putMsg(result, Status.SUCCESS); } - putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); @@ -236,7 +229,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) { + private void updateDag(User loginUser, Map result, long processDefinitionCode, List processTaskRelationList, + List taskDefinitionLogs) { ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); if (processDefinition == null) { throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST); @@ -245,7 +239,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (insertVersion <= 0) { throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } - return insertVersion; + List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), + insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); + } else { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } } /** @@ -308,54 +311,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } - handleRelation(loginUser, taskCode, version, now); + List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + if (!taskRelationList.isEmpty()) { + long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); + } else { + putMsg(result, Status.SUCCESS); + } result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS, update); return result; } - private void handleRelation(User loginUser, long taskCode, Integer version, Date now) { - List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); - if (!processTaskRelationList.isEmpty()) { - long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode(); - int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode); - List processTaskRelationLogList = new ArrayList<>(); - int delete = 0; - for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); - delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); - if (processTaskRelationLog.getPreTaskCode() == taskCode) { - processTaskRelationLog.setPreTaskVersion(version); - } - if (processTaskRelationLog.getPostTaskCode() == taskCode) { - processTaskRelationLog.setPostTaskVersion(version); - } - processTaskRelationLog.setProcessDefinitionVersion(definiteVersion); - processTaskRelationLog.setOperator(loginUser.getId()); - processTaskRelationLog.setOperateTime(now); - processTaskRelationLog.setUpdateTime(now); - processTaskRelationLogList.add(processTaskRelationLog); - } - if (delete == 0) { - throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); - } else { - int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList); - int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList); - if ((insertRelation & insertRelationLog) == 0) { - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); - } - } - } - } - /** - * update task definition + * switch task definition * * @param loginUser login user * @param projectCode project code * @param taskCode task code * @param version the version user want to switch */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map switchVersion(User loginUser, long projectCode, long taskCode, int version) { Project project = projectMapper.queryByCode(projectCode); @@ -375,14 +352,18 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); taskDefinitionUpdate.setUserId(loginUser.getId()); - Date now = new Date(); taskDefinitionUpdate.setUpdateTime(new Date()); taskDefinitionUpdate.setId(taskDefinition.getId()); int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate); if (switchVersion > 0) { - handleRelation(loginUser, taskCode, version, now); - result.put(Constants.DATA_LIST, taskCode); - putMsg(result, Status.SUCCESS); + List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + if (!taskRelationList.isEmpty()) { + long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate)); + } else { + putMsg(result, Status.SUCCESS); + } } else { putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR); }