Browse Source

[cherry-pick-2.0.2] fix depend task project 202 #7726 (#7727)

* [DS-7654][ApiServer]fix dependent node on change projectCode error

* test

Co-authored-by: caishunfeng <534328519@qq.com>
2.0.7-release
wind 3 years ago committed by GitHub
parent
commit
35a1d732c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 224
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 7
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

224
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -106,7 +106,6 @@ import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
@ -169,15 +168,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* create process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
@ -233,7 +232,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(), tenantId);
globalParams, locations, timeout, loginUser.getId(), tenantId);
return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs);
}
@ -299,8 +298,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream()
.map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class))
.collect(Collectors.toList());
.map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class))
.collect(Collectors.toList());
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
@ -337,7 +336,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @return definition list
*/
@ -359,7 +358,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition simple list
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @return definition simple list
*/
@ -389,12 +388,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list paging
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @return process definition page
*/
@Override
@ -411,7 +410,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging(
page, searchVal, userId, project.getCode(), isAdmin(loginUser));
page, searchVal, userId, project.getCode(), isAdmin(loginUser));
List<ProcessDefinition> records = processDefinitionIPage.getRecords();
for (ProcessDefinition pd : records) {
@ -432,9 +431,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query detail of process definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return process definition detail
*/
@Override
@ -484,16 +483,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* update process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
@ -588,7 +587,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@ -602,9 +601,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* verify process definition name unique
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param name name
* @param name name
* @return true if process definition name not exists, otherwise false
*/
@Override
@ -627,9 +626,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete process definition by code
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return delete result code
*/
@Override
@ -697,9 +696,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* release process definition: online / offline
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
@ -838,9 +837,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* import process definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param file process metadata json file
* @param file process metadata json file
* @return import process
*/
@Override
@ -1048,9 +1047,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details based on process definition
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return task node list
*/
@Override
@ -1062,11 +1061,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
if (processDefinition == null) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
HashMap<Long, Project> userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE);
projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())
.forEach(userProject -> userProjects.put(userProject.getCode(), userProject));
if (!userProjects.containsKey(projectCode)) {
logger.info("process define not exists, project dismatch");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
DagData dagData = processService.genDagData(processDefinition);
result.put(Constants.DATA_LIST, dagData.getTaskDefinitionList());
putMsg(result, Status.SUCCESS);
@ -1077,9 +1084,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details map based on process definition
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @param codes define codes
* @param codes define codes
* @return task node list
*/
@Override
@ -1098,13 +1105,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
}
HashMap<Long, Project> userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE);
HashMap<Long, Project> userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE);
projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())
.forEach(userProject -> userProjects.put(userProject.getCode(), userProject));
.forEach(userProject -> userProjects.put(userProject.getCode(), userProject));
// check processDefinition exist in project
List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream()
.filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
.filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
if (CollectionUtils.isEmpty(processDefinitionListInProject)) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
@ -1125,7 +1132,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition all by project code
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @return process definitions in the project
*/
@ -1148,7 +1155,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* Encapsulates the TreeView structure
*
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @param limit limit
* @return tree view json data
*/
@ -1173,7 +1180,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
if (limit > processInstanceList.size()) {
limit = processInstanceList.size();
@ -1188,8 +1195,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessInstance processInstance = processInstanceList.get(i);
Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime();
parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), processInstance.getProcessDefinitionCode(),
"", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
"", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
}
List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
@ -1227,11 +1234,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (taskInstance.isSubProcess()) {
TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode());
subProcessCode = Integer.parseInt(JSONUtils.parseObject(
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
}
treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(),
taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
}
}
for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
@ -1292,9 +1299,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch copy process definition
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@ -1318,6 +1325,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch move process definition
* Will be deleted
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
@ -1383,7 +1391,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
diffCode.forEach(code -> failedProcessList.add(code + "[null]"));
for (ProcessDefinition processDefinition : processDefinitionList) {
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<ProcessTaskRelationLog> taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
@ -1391,9 +1399,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Map<Long, Long> taskCodeMap = new HashMap<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
|| TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType());
return;
}
@ -1458,10 +1466,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* switch the defined process definition version
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param version the version user want to switch
* @param code process definition code
* @param version the version user want to switch
* @return switch process definition version result code
*/
@Override
@ -1497,11 +1505,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* check batch operate result
*
* @param srcProjectCode srcProjectCode
* @param srcProjectCode srcProjectCode
* @param targetProjectCode targetProjectCode
* @param result result
* @param result result
* @param failedProcessList failedProcessList
* @param isCopy isCopy
* @param isCopy isCopy
*/
private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode,
Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
@ -1519,11 +1527,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query the pagination versions info by one certain process definition code
*
* @param loginUser login user info to check auth
* @param loginUser login user info to check auth
* @param projectCode project code
* @param pageNo page number
* @param pageSize page size
* @param code process definition code
* @param pageNo page number
* @param pageSize page size
* @param code process definition code
* @return the pagination process definition versions info of the certain process definition
*/
@Override
@ -1553,10 +1561,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete one certain process definition by version number and process definition code
*
* @param loginUser login user info to check auth
* @param loginUser login user info to check auth
* @param projectCode project code
* @param code process definition code
* @param version version number
* @param code process definition code
* @param version version number
* @return delete result code
*/
@Override
@ -1591,13 +1599,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* create empty process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param tenantCode tenantCode
* @param timeout timeout
* @param tenantCode tenantCode
* @param scheduleJson scheduleJson
* @return process definition code
*/
@ -1642,7 +1650,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, "", timeout, loginUser.getId(), tenantId);
globalParams, "", timeout, loginUser.getId(), tenantId);
result = createEmptyDagDefine(loginUser, processDefinition);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
@ -1713,15 +1721,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* update process definition basic info
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param tenantCode tenantCode
* @param scheduleJson scheduleJson
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param tenantCode tenantCode
* @param scheduleJson scheduleJson
* @return update result code
*/
@Override
@ -1817,24 +1825,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
param.setTimezoneId(schedule.getTimezoneId());
return schedulerService.updateScheduleByProcessDefinitionCode(
loginUser,
projectCode,
processDefinitionCode,
JSONUtils.toJsonString(param),
warningType,
warningGroupId,
failureStrategy,
processInstancePriority,
workerGroup,
environmentCode);
loginUser,
projectCode,
processDefinitionCode,
JSONUtils.toJsonString(param),
warningType,
warningGroupId,
failureStrategy,
processInstancePriority,
workerGroup,
environmentCode);
}
/**
* release process definition and schedule
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState releaseState
* @return update result code
*/

7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -514,6 +514,7 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null));
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())).thenReturn(Lists.newArrayList(project));
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
}
@ -578,17 +579,17 @@ public class ProcessDefinitionServiceTest {
public void testViewTree() {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//task instance not exist
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Map<String, Object> taskNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}

Loading…
Cancel
Save