From 35a1d732c64077580b109a49cb5de8a6246a2452 Mon Sep 17 00:00:00 2001 From: wind Date: Thu, 30 Dec 2021 17:54:39 +0800 Subject: [PATCH] [cherry-pick-2.0.2] fix depend task project 202 #7726 (#7727) * [DS-7654][ApiServer]fix dependent node on change projectCode error * test Co-authored-by: caishunfeng <534328519@qq.com> --- .../impl/ProcessDefinitionServiceImpl.java | 224 +++++++++--------- .../service/ProcessDefinitionServiceTest.java | 7 +- 2 files changed, 120 insertions(+), 111 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 20fa7264ce..9cfb9f9488 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -106,7 +106,6 @@ import org.springframework.web.multipart.MultipartFile; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; @@ -169,15 +168,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * create process definition * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param description description - * @param globalParams global params - * @param locations locations for nodes - * @param timeout timeout - * @param tenantCode tenantCode - * @param taskRelationJson relation json for nodes + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param description description + * @param globalParams global params + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson * @return create result code */ @@ -233,7 +232,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, - globalParams, locations, timeout, loginUser.getId(), tenantId); + globalParams, locations, timeout, loginUser.getId(), tenantId); return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs); } @@ -299,8 +298,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } List processTaskRelations = taskRelationList.stream() - .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class)) - .collect(Collectors.toList()); + .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class)) + .collect(Collectors.toList()); List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); if (taskNodeList.size() != taskRelationList.size()) { Set postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet()); @@ -337,7 +336,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition list * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code * @return definition list */ @@ -359,7 +358,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition simple list * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code * @return definition simple list */ @@ -389,12 +388,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition list paging * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param searchVal search value - * @param userId user id - * @param pageNo page number - * @param pageSize page size + * @param searchVal search value + * @param userId user id + * @param pageNo page number + * @param pageSize page size * @return process definition page */ @Override @@ -411,7 +410,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Page page = new Page<>(pageNo, pageSize); IPage processDefinitionIPage = processDefinitionMapper.queryDefineListPaging( - page, searchVal, userId, project.getCode(), isAdmin(loginUser)); + page, searchVal, userId, project.getCode(), isAdmin(loginUser)); List records = processDefinitionIPage.getRecords(); for (ProcessDefinition pd : records) { @@ -432,9 +431,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query detail of process definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return process definition detail */ @Override @@ -484,16 +483,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * update process definition * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param code process definition code - * @param description description - * @param globalParams global params - * @param locations locations for nodes - * @param timeout timeout - * @param tenantCode tenantCode - * @param taskRelationJson relation json for nodes + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param code process definition code + * @param description description + * @param globalParams global params + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson * @return update result code */ @@ -588,7 +587,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); + processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); if (insertResult == Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -602,9 +601,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * verify process definition name unique * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param name name + * @param name name * @return true if process definition name not exists, otherwise false */ @Override @@ -627,9 +626,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * delete process definition by code * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return delete result code */ @Override @@ -697,9 +696,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * release process definition: online / offline * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code + * @param loginUser login user + * @param projectCode project code + * @param code process definition code * @param releaseState release state * @return release result code */ @@ -838,9 +837,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * import process definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param file process metadata json file + * @param file process metadata json file * @return import process */ @Override @@ -1048,9 +1047,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get task node details based on process definition * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return task node list */ @Override @@ -1062,11 +1061,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { + if (processDefinition == null) { logger.info("process define not exists"); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; } + HashMap userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE); + projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId()) + .forEach(userProject -> userProjects.put(userProject.getCode(), userProject)); + if (!userProjects.containsKey(projectCode)) { + logger.info("process define not exists, project dismatch"); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); + return result; + } DagData dagData = processService.genDagData(processDefinition); result.put(Constants.DATA_LIST, dagData.getTaskDefinitionList()); putMsg(result, Status.SUCCESS); @@ -1077,9 +1084,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get task node details map based on process definition * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code - * @param codes define codes + * @param codes define codes * @return task node list */ @Override @@ -1098,13 +1105,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); return result; } - HashMap userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE); + HashMap userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE); projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId()) - .forEach(userProject -> userProjects.put(userProject.getCode(), userProject)); + .forEach(userProject -> userProjects.put(userProject.getCode(), userProject)); // check processDefinition exist in project List processDefinitionListInProject = processDefinitionList.stream() - .filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList()); + .filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList()); if (CollectionUtils.isEmpty(processDefinitionListInProject)) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); return result; @@ -1125,7 +1132,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition all by project code * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code * @return process definitions in the project */ @@ -1148,7 +1155,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * Encapsulates the TreeView structure * * @param projectCode project code - * @param code process definition code + * @param code process definition code * @param limit limit * @return tree view json data */ @@ -1173,7 +1180,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); List taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode())); Map taskDefinitionMap = taskDefinitionList.stream() - .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); + .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); if (limit > processInstanceList.size()) { limit = processInstanceList.size(); @@ -1188,8 +1195,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessInstance processInstance = processInstanceList.get(i); Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), processInstance.getProcessDefinitionCode(), - "", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), - DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); + "", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), + DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); } List parentTreeViewDtoList = new ArrayList<>(); @@ -1227,11 +1234,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (taskInstance.isSubProcess()) { TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode()); subProcessCode = Integer.parseInt(JSONUtils.parseObject( - taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText()); + taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText()); } treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(), - taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), - taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode)); + taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), + taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode)); } } for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { @@ -1292,9 +1299,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch copy process definition * - * @param loginUser loginUser - * @param projectCode projectCode - * @param codes processDefinitionCodes + * @param loginUser loginUser + * @param projectCode projectCode + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override @@ -1318,6 +1325,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch move process definition * Will be deleted + * * @param loginUser loginUser * @param projectCode projectCode * @param codes processDefinitionCodes @@ -1383,7 +1391,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro diffCode.forEach(code -> failedProcessList.add(code + "[null]")); for (ProcessDefinition processDefinition : processDefinitionList) { List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); + processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); List taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); processDefinition.setProjectCode(targetProjectCode); if (isCopy) { @@ -1391,9 +1399,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Map taskCodeMap = new HashMap<>(); for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType()) - || TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType()) - || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType()) - || TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) { + || TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType()) + || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType()) + || TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) { putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType()); return; } @@ -1458,10 +1466,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * switch the defined process definition version * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code - * @param version the version user want to switch + * @param code process definition code + * @param version the version user want to switch * @return switch process definition version result code */ @Override @@ -1497,11 +1505,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * check batch operate result * - * @param srcProjectCode srcProjectCode + * @param srcProjectCode srcProjectCode * @param targetProjectCode targetProjectCode - * @param result result + * @param result result * @param failedProcessList failedProcessList - * @param isCopy isCopy + * @param isCopy isCopy */ private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode, Map result, List failedProcessList, boolean isCopy) { @@ -1519,11 +1527,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query the pagination versions info by one certain process definition code * - * @param loginUser login user info to check auth + * @param loginUser login user info to check auth * @param projectCode project code - * @param pageNo page number - * @param pageSize page size - * @param code process definition code + * @param pageNo page number + * @param pageSize page size + * @param code process definition code * @return the pagination process definition versions info of the certain process definition */ @Override @@ -1553,10 +1561,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * delete one certain process definition by version number and process definition code * - * @param loginUser login user info to check auth + * @param loginUser login user info to check auth * @param projectCode project code - * @param code process definition code - * @param version version number + * @param code process definition code + * @param version version number * @return delete result code */ @Override @@ -1591,13 +1599,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * create empty process definition * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param description description + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param description description * @param globalParams globalParams - * @param timeout timeout - * @param tenantCode tenantCode + * @param timeout timeout + * @param tenantCode tenantCode * @param scheduleJson scheduleJson * @return process definition code */ @@ -1642,7 +1650,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, - globalParams, "", timeout, loginUser.getId(), tenantId); + globalParams, "", timeout, loginUser.getId(), tenantId); result = createEmptyDagDefine(loginUser, processDefinition); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; @@ -1713,15 +1721,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * update process definition basic info * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param code process definition code - * @param description description - * @param globalParams globalParams - * @param timeout timeout - * @param tenantCode tenantCode - * @param scheduleJson scheduleJson + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param code process definition code + * @param description description + * @param globalParams globalParams + * @param timeout timeout + * @param tenantCode tenantCode + * @param scheduleJson scheduleJson * @return update result code */ @Override @@ -1817,24 +1825,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro param.setTimezoneId(schedule.getTimezoneId()); return schedulerService.updateScheduleByProcessDefinitionCode( - loginUser, - projectCode, - processDefinitionCode, - JSONUtils.toJsonString(param), - warningType, - warningGroupId, - failureStrategy, - processInstancePriority, - workerGroup, - environmentCode); + loginUser, + projectCode, + processDefinitionCode, + JSONUtils.toJsonString(param), + warningType, + warningGroupId, + failureStrategy, + processInstancePriority, + workerGroup, + environmentCode); } /** * release process definition and schedule * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code + * @param loginUser login user + * @param projectCode project code + * @param code process definition code * @param releaseState releaseState * @return update result code */ diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index ee4190b53e..ce9c7e1cd9 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -514,6 +514,7 @@ public class ProcessDefinitionServiceTest { putMsg(result, Status.SUCCESS, projectCode); Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null)); Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); + Mockito.when(projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())).thenReturn(Lists.newArrayList(project)); Map dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L); Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS)); } @@ -578,17 +579,17 @@ public class ProcessDefinitionServiceTest { public void testViewTree() { //process definition not exist ProcessDefinition processDefinition = getProcessDefinition(); - Map processDefinitionNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10); + Map processDefinitionNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS)); //task instance not exist Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); - Map taskNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10); + Map taskNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); //task instance exist - Map taskNotNuLLRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10); + Map taskNotNuLLRes = processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); }