diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java index d00a05282c..9e59c3932c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java @@ -60,6 +60,8 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; +import org.apache.commons.lang3.StringUtils; + /** * task definition controller */ @@ -121,7 +123,7 @@ public class TaskDefinitionController extends BaseController { @RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode, @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj, @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) { - Map result = taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, processDefinitionCode, taskDefinitionJsonObj, upstreamCodes); + Map result = taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, processDefinitionCode, taskDefinitionJsonObj, StringUtils.defaultString(upstreamCodes)); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index a864c61cd4..5761584a72 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -72,9 +71,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; - @Autowired - private ProcessTaskRelationLogMapper processTaskRelationLogMapper; - @Autowired private TaskDefinitionLogMapper taskDefinitionLogMapper; @@ -115,75 +111,59 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH); return result; } - List processTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, postTaskCode); + updateProcessDefiniteVersion(loginUser, result, processDefinition); + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List processTaskRelations = Lists.newArrayList(processTaskRelationList); if (!processTaskRelations.isEmpty()) { - Map preTaskCodeMap = processTaskRelations.stream() + Map preTaskCodeMap = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode) .collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation)); - if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) { - putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode); - return result; - } - if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(preTaskCodeMap.get(0L)); - // delete no upstream - int delete = processTaskRelationMapper.deleteRelation(processTaskRelationLog); - int deleteLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); - if ((delete & deleteLog) == 0) { - putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + if (!preTaskCodeMap.isEmpty()) { + if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) { + putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode); + return result; + } + if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) { + // delete no upstream + processTaskRelations.remove(preTaskCodeMap.get(0L)); } } } - updateProcessDefiniteVersion(loginUser, result, processDefinition); - Date now = new Date(); - List processTaskRelationLogs = new ArrayList<>(); + TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); + ProcessTaskRelation processTaskRelation = setRelation(processDefinition, postTaskDefinition); if (preTaskCode != 0L) { - // upstream is or not exist - List upstreamProcessTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, preTaskCode); TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode); - if (upstreamProcessTaskRelations.isEmpty()) { - ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition); - processTaskRelationLog.setPreTaskCode(0L); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLogs.add(processTaskRelationLog); + List upstreamTaskRelationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == preTaskCode).collect(Collectors.toList()); + // upstream is or not exist + if (upstreamTaskRelationList.isEmpty()) { + ProcessTaskRelation preProcessTaskRelation = setRelation(processDefinition, preTaskDefinition); + preProcessTaskRelation.setPreTaskCode(0L); + preProcessTaskRelation.setPreTaskVersion(0); + processTaskRelations.add(preProcessTaskRelation); } - TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); - ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition); - processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode()); - processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion()); - processTaskRelationLogs.add(processTaskRelationLog); - } else { - TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); - ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition); - processTaskRelationLog.setPreTaskCode(0L); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLogs.add(processTaskRelationLog); - } - int insert = processTaskRelationMapper.batchInsert(processTaskRelationLogs); - int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs); - if ((insert & insertLog) > 0) { - putMsg(result, Status.SUCCESS); + processTaskRelation.setPreTaskCode(preTaskDefinition.getCode()); + processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion()); } else { - putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + processTaskRelation.setPreTaskCode(0L); + processTaskRelation.setPreTaskVersion(0); } + processTaskRelations.add(processTaskRelation); + updateRelation(loginUser, result, processDefinition, processTaskRelations); return result; } - private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinition, Date now, int userId, TaskDefinition taskDefinition) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(processDefinition.getProjectCode()); - processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); - processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); - processTaskRelationLog.setPostTaskCode(taskDefinition.getCode()); - processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion()); - processTaskRelationLog.setConditionType(ConditionType.NONE); - processTaskRelationLog.setConditionParams("{}"); - processTaskRelationLog.setCreateTime(now); - processTaskRelationLog.setUpdateTime(now); - processTaskRelationLog.setOperator(userId); - processTaskRelationLog.setOperateTime(now); - return processTaskRelationLog; + private ProcessTaskRelation setRelation(ProcessDefinition processDefinition, TaskDefinition taskDefinition) { + Date now = new Date(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + processTaskRelation.setProjectCode(processDefinition.getProjectCode()); + processTaskRelation.setProcessDefinitionCode(processDefinition.getCode()); + processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); + processTaskRelation.setPostTaskCode(taskDefinition.getCode()); + processTaskRelation.setPostTaskVersion(taskDefinition.getVersion()); + processTaskRelation.setConditionType(ConditionType.NONE); + processTaskRelation.setConditionParams("{}"); + processTaskRelation.setCreateTime(now); + processTaskRelation.setUpdateTime(now); + return processTaskRelation; } private void updateProcessDefiniteVersion(User loginUser, Map result, ProcessDefinition processDefinition) { @@ -227,7 +207,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); return result; } - List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List processTaskRelationList = Lists.newArrayList(processTaskRelations); if (CollectionUtils.isEmpty(processTaskRelationList)) { putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); return result; @@ -245,6 +226,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ",")); return result; } + updateProcessDefiniteVersion(loginUser, result, processDefinition); updateRelation(loginUser, result, processDefinition, processTaskRelationList); if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType()) || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType()) @@ -261,7 +243,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P private void updateRelation(User loginUser, Map result, ProcessDefinition processDefinition, List processTaskRelationList) { - updateProcessDefiniteVersion(loginUser, result, processDefinition); List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE); @@ -312,7 +293,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode()); return result; } - List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + List processTaskRelationList = Lists.newArrayList(processTaskRelations); List processTaskRelationWaitRemove = Lists.newArrayList(); for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { if (preTaskCodeList.size() > 1) { @@ -331,6 +313,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } } processTaskRelationList.removeAll(processTaskRelationWaitRemove); + updateProcessDefiniteVersion(loginUser, result, processDefinition); updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } @@ -372,8 +355,10 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode()); return result; } - List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + List processTaskRelationList = Lists.newArrayList(processTaskRelations); processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode); + updateProcessDefiniteVersion(loginUser, result, processDefinition); updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } @@ -474,7 +459,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); return result; } - List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List processTaskRelationList = Lists.newArrayList(processTaskRelations); if (CollectionUtils.isEmpty(processTaskRelationList)) { putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); return result; @@ -511,6 +497,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P processTaskRelation.setPreTaskCode(0L); processTaskRelationList.add(processTaskRelation); } + updateProcessDefiniteVersion(loginUser, result, processDefinition); updateRelation(loginUser, result, processDefinition, processTaskRelationList); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 8b07435dcc..54709697de 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -50,6 +49,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; @@ -97,14 +97,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe private ProcessTaskRelationMapper processTaskRelationMapper; @Autowired - private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + private ProcessDefinitionMapper processDefinitionMapper; @Autowired private ProcessService processService; - @Autowired - private ProcessDefinitionMapper processDefinitionMapper; - /** * create task definition * @@ -314,17 +311,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (delete > 0) { List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { - int deleteRelation = 0; - for (ProcessTaskRelation processTaskRelation : taskRelationList) { - deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId()); - } - if (deleteRelation == 0) { - throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); - } long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - updateProcessDefiniteVersion(loginUser, processDefinitionCode); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List relationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); + updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList()); + } else { + putMsg(result, Status.SUCCESS); } - putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); @@ -332,7 +325,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) { + private void updateDag(User loginUser, Map result, long processDefinitionCode, List processTaskRelationList, + List taskDefinitionLogs) { ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); if (processDefinition == null) { throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST); @@ -341,7 +335,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (insertVersion <= 0) { throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } - return insertVersion; + List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), + insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); + } else { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } } /** @@ -356,50 +359,55 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) { Map result = new HashMap<>(); - int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); - if (version <= 0) { + TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + if (taskDefinitionToUpdate == null) { return result; } - handleRelation(loginUser, taskCode, version); + List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + if (!taskRelationList.isEmpty()) { + long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); + } result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS); return result; } - private int updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map result) { + private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map result) { Project project = projectMapper.queryByCode(projectCode); //check user access for project result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode)); if (result.get(Constants.STATUS) != Status.SUCCESS) { - return Constants.EXIT_CODE_FAILURE; + return null; } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); if (taskDefinition == null) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); - return Constants.EXIT_CODE_FAILURE; + return null; } if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); - return Constants.EXIT_CODE_FAILURE; + return null; } TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); if (taskDefinition.equals(taskDefinitionToUpdate)) { - return taskDefinition.getVersion(); + return null; } if (taskDefinitionToUpdate == null) { logger.error("taskDefinitionJson is not valid json"); putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); - return Constants.EXIT_CODE_FAILURE; + return null; } if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) { logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName()); putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName()); - return Constants.EXIT_CODE_FAILURE; + return null; } Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode); if (version == null || version == 0) { putMsg(result, Status.DATA_IS_NOT_VALID, taskCode); - return Constants.EXIT_CODE_FAILURE; + return null; } Date now = new Date(); taskDefinitionToUpdate.setCode(taskCode); @@ -419,42 +427,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } - return version; - } - - private void handleRelation(User loginUser, long taskCode, Integer version) { - List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); - if (!processTaskRelationList.isEmpty()) { - long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode(); - int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode); - List processTaskRelationLogList = new ArrayList<>(); - int delete = 0; - Date now = new Date(); - for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); - delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); - if (processTaskRelationLog.getPreTaskCode() == taskCode) { - processTaskRelationLog.setPreTaskVersion(version); - } - if (processTaskRelationLog.getPostTaskCode() == taskCode) { - processTaskRelationLog.setPostTaskVersion(version); - } - processTaskRelationLog.setProcessDefinitionVersion(definiteVersion); - processTaskRelationLog.setOperator(loginUser.getId()); - processTaskRelationLog.setOperateTime(now); - processTaskRelationLog.setUpdateTime(now); - processTaskRelationLogList.add(processTaskRelationLog); - } - if (delete == 0) { - throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); - } else { - int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList); - int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList); - if ((insertRelation & insertRelationLog) == 0) { - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); - } - } - } + return taskDefinitionToUpdate; } /** @@ -470,13 +443,23 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) { Map result = new HashMap<>(); - int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); - if (version <= 0) { + TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + if (result.get(Constants.STATUS) != Status.SUCCESS && taskDefinitionToUpdate == null) { return result; } + List upstreamTaskRelations = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + Set upstreamCodeSet = upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet()); + Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) { + putMsg(result, Status.SUCCESS); + return result; + } else { + if (taskDefinitionToUpdate == null) { + taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); + } + } Map queryUpStreamTaskCodeMap; - if (StringUtils.isNotBlank(upstreamCodes)) { - Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + if (!upstreamTaskCodes.isEmpty()) { List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition)); // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet @@ -488,76 +471,48 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } else { queryUpStreamTaskCodeMap = new HashMap<>(); } - List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); - if (!queryUpStreamTaskCodeMap.isEmpty() && processTaskRelationList.isEmpty()) { - putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, StringUtils.join(queryUpStreamTaskCodeMap.keySet(), Constants.COMMA)); - throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST); - } - if (!processTaskRelationList.isEmpty()) { - long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode(); - int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode); - List relationLogs = new ArrayList<>(); - Date now = new Date(); - int delete = 0; - int deleteLog = 0; + if (!upstreamTaskRelations.isEmpty()) { + ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode()); + List processTaskRelationList = Lists.newArrayList(processTaskRelations); + List relationList = Lists.newArrayList(); for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); - delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); - deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); - processTaskRelationLog.setOperator(loginUser.getId()); - processTaskRelationLog.setOperateTime(now); - processTaskRelationLog.setUpdateTime(now); - if (processTaskRelationLog.getPreTaskCode() == taskCode) { - processTaskRelationLog.setPreTaskVersion(version); - } - if (processTaskRelationLog.getPostTaskCode() == taskCode) { - processTaskRelationLog.setPostTaskVersion(version); - TaskDefinition definition = queryUpStreamTaskCodeMap.remove(processTaskRelationLog.getPreTaskCode()); - if (definition == null) { - processTaskRelationLog.setPreTaskCode(0L); - processTaskRelationLog.setPreTaskVersion(0); + if (processTaskRelation.getPostTaskCode() == taskCode) { + if (queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) && processTaskRelation.getPreTaskCode() != 0L) { + queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode()); + } else { + processTaskRelation.setPreTaskCode(0L); + processTaskRelation.setPreTaskVersion(0); + relationList.add(processTaskRelation); } } - processTaskRelationLog.setProcessDefinitionVersion(definiteVersion); - relationLogs.add(processTaskRelationLog); - } - if ((delete & deleteLog) == 0) { - throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); } - if (!queryUpStreamTaskCodeMap.isEmpty()) { - ProcessTaskRelationLog taskRelationLogDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(relationLogs.get(0)), ProcessTaskRelationLog.class); - assert taskRelationLogDeepCopy != null; - for (TaskDefinition upstreamTask : queryUpStreamTaskCodeMap.values()) { - taskRelationLogDeepCopy.setPreTaskCode(upstreamTask.getCode()); - taskRelationLogDeepCopy.setPreTaskVersion(upstreamTask.getVersion()); - relationLogs.add(taskRelationLogDeepCopy); - } + processTaskRelationList.removeAll(relationList); + for (Map.Entry queryUpStreamTask : queryUpStreamTaskCodeMap.entrySet()) { + taskRelation.setPreTaskCode(queryUpStreamTask.getKey()); + taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion()); + processTaskRelationList.add(taskRelation); } - Map taskRelationLogMap = - relationLogs.stream().collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelationLog -> processTaskRelationLog)); - if (taskRelationLogMap.containsKey(0L) && taskRelationLogMap.size() >= 3) { - taskRelationLogMap.remove(0L); - } - int insertRelation = processTaskRelationMapper.batchInsert(relationLogs); - int insertRelationLog = processTaskRelationLogMapper.batchInsert(relationLogs); - if ((insertRelation & insertRelationLog) == 0) { - putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + if (queryUpStreamTaskCodeMap.isEmpty() && !processTaskRelationList.isEmpty()) { + processTaskRelationList.add(processTaskRelationList.get(0)); } + updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); } result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS); return result; } + /** - * Switch task definition + * switch task definition * * @param loginUser login user * @param projectCode project code * @param taskCode task code * @param version the version user want to switch */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map switchVersion(User loginUser, long projectCode, long taskCode, int version) { Project project = projectMapper.queryByCode(projectCode); @@ -581,9 +536,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe taskDefinitionUpdate.setId(taskDefinition.getId()); int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate); if (switchVersion > 0) { - handleRelation(loginUser, taskCode, version); - result.put(Constants.DATA_LIST, taskCode); - putMsg(result, Status.SUCCESS); + List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + if (!taskRelationList.isEmpty()) { + long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); + List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate)); + } else { + putMsg(result, Status.SUCCESS); + } } else { putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR); }