Browse Source

[Feature][JsonSplit-api]query taskNodeList of processDefinition (#5802)

* fix processDefinitonController projectCode

* queryAllByProjectCode of processDefinition

* query taskNodeList of processDefinition

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
f66a16676a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 92
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  4. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  5. 67
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -516,32 +516,32 @@ public class ProcessDefinitionController extends BaseController {
public Result getNodeListByDefinitionCode(
@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processDefinitionCode") Long processDefinitionCode) {
@RequestParam("processDefinitionCode") long processDefinitionCode) {
logger.info("query task node name list by definitionCode, login user:{}, project name:{}, code : {}",
loginUser.getUserName(), projectCode, processDefinitionCode);
Map<String, Object> result = processDefinitionService.getTaskNodeListByDefinitionCode(processDefinitionCode);
Map<String, Object> result = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, processDefinitionCode);
return returnDataList(result);
}
/**
* get tasks list by process definition code list
* get tasks list map by process definition multiple code
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCodeList process definition code list
* @param processDefinitionCodes process definition codes
* @return node list data
*/
@ApiOperation(value = "getNodeListByDefinitionCodeList", notes = "GET_NODE_LIST_BY_DEFINITION_CODE_NOTES")
@ApiOperation(value = "getNodeListByDefinitionCodes", notes = "GET_NODE_LIST_BY_DEFINITION_CODE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCodeList", value = "PROCESS_DEFINITION_CODE_LIST", required = true, type = "String")
@ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, type = "String", example = "100,200,300")
})
@GetMapping(value = "get-task-list")
@GetMapping(value = "gen-task-list-map")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR)
public Result getNodeListByDefinitionCodeList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result getNodeListMapByDefinitionCodes(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processDefinitionCodeList") String processDefinitionCodeList) {
Map<String, Object> result = processDefinitionService.getTaskNodeListByDefinitionCodeList(processDefinitionCodeList);
@RequestParam("processDefinitionCodes") String processDefinitionCodes) {
Map<String, Object> result = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, processDefinitionCodes);
return returnDataList(result);
}

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -245,18 +245,26 @@ public interface ProcessDefinitionService {
/**
* get task node details based on process definition
*
* @param loginUser loginUser
* @param projectCode project code
* @param defineCode define code
* @return task node list
*/
Map<String, Object> getTaskNodeListByDefinitionCode(Long defineCode);
Map<String, Object> getTaskNodeListByDefinitionCode(User loginUser,
long projectCode,
long defineCode);
/**
* get task node details based on process definition
* get task node details map based on process definition
*
* @param loginUser loginUser
* @param projectCode project code
* @param defineCodeList define code list
* @return task node list
*/
Map<String, Object> getTaskNodeListByDefinitionCodeList(String defineCodeList);
Map<String, Object> getNodeListMapByDefinitionCodes(User loginUser,
long projectCode,
String defineCodeList);
/**
* query process definition all by project code

92
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -112,6 +112,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* process definition service impl
@ -203,6 +204,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
@ -296,13 +304,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(projectCode);
resourceList.forEach(processDefinition -> {
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
});
result.put(Constants.DATA_LIST, resourceList);
List<DagData> dagDataList = resourceList.stream().map(processService::genDagData).collect(Collectors.toList());
result.put(Constants.DATA_LIST, dagDataList);
putMsg(result, Status.SUCCESS);
return result;
}
@ -458,6 +461,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
return result;
}
if (!name.equals(processDefinition.getName())) {
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
}
processDefinition.set(projectCode, name, description, globalParams, locations, connects, timeout, tenant.getId());
return updateProcessDefine(loginUser, result, taskRelationList, processDefinition);
@ -1174,63 +1185,59 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details based on process definition
*
* @param loginUser loginUser
* @param projectCode project code
* @param defineCode define code
* @return task node list
*/
public Map<String, Object> getTaskNodeListByDefinitionCode(Long defineCode) {
Map<String, Object> result = new HashMap<>();
public Map<String, Object> getTaskNodeListByDefinitionCode(User loginUser, long projectCode, long defineCode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, project.getName());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(defineCode);
if (processDefinition == null) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineCode);
return result;
}
ProcessData processData = processService.genProcessData(processDefinition);
//process data check
if (null == processData) {
logger.error("process data is null");
putMsg(result, Status.DATA_IS_NOT_VALID, JSONUtils.toJsonString(processData));
return result;
}
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
result.put(Constants.DATA_LIST, taskNodeList);
DagData dagData = processService.genDagData(processDefinition);
result.put(Constants.DATA_LIST, dagData.getTaskDefinitionList());
putMsg(result, Status.SUCCESS);
return result;
}
/**
* get task node details based on process definition
* get task node details map based on process definition
*
* @param defineCodeList define code list
* @param loginUser loginUser
* @param projectCode project code
* @param defineCodes define codes
* @return task node list
*/
@Override
public Map<String, Object> getTaskNodeListByDefinitionCodeList(String defineCodeList) {
Map<String, Object> result = new HashMap<>();
Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>();
String[] codeArr = defineCodeList.split(",");
List<Long> codeList = new ArrayList<>();
for (String definitionCode : codeArr) {
codeList.add(Long.parseLong(definitionCode));
public Map<String, Object> getNodeListMapByDefinitionCodes(User loginUser, long projectCode, String defineCodes) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, project.getName());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(codeList);
Set<Long> defineCodeSet = Lists.newArrayList(defineCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet);
if (CollectionUtils.isEmpty(processDefinitionList)) {
logger.info("process definition not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineCodeList);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineCodes);
return result;
}
Map<Long, List<TaskDefinitionLog>> taskNodeMap = new HashMap<>();
for (ProcessDefinition processDefinition : processDefinitionList) {
ProcessData processData = processService.genProcessData(processDefinition);
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
taskNodeMap.put(processDefinition.getId(), taskNodeList);
DagData dagData = processService.genDagData(processDefinition);
taskNodeMap.put(processDefinition.getCode(), dagData.getTaskDefinitionList());
}
result.put(Constants.DATA_LIST, taskNodeMap);
@ -1635,10 +1642,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// check the if pageNo or pageSize less than 1
if (pageNo <= 0 || pageSize <= 0) {
putMsg(result
, Status.QUERY_PROCESS_DEFINITION_VERSIONS_PAGE_NO_OR_PAGE_SIZE_LESS_THAN_1_ERROR
, pageNo
, pageSize);
putMsg(result, Status.QUERY_PROCESS_DEFINITION_VERSIONS_PAGE_NO_OR_PAGE_SIZE_LESS_THAN_1_ERROR, pageNo, pageSize);
return result;
}
@ -1656,8 +1660,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
IPage<ProcessDefinitionLog> processDefinitionVersionsPaging = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, processDefinition.getCode());
List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionVersionsPaging.getRecords();
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
pageInfo.setLists(processDefinitionLogs);
pageInfo.setTotalCount((int) processDefinitionVersionsPaging.getTotal());
return ImmutableMap.of(

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -284,22 +284,22 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCode(code)).thenReturn(result);
Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCode(user, projectCode, code)).thenReturn(result);
Result response = processDefinitionController.getNodeListByDefinitionCode(user, projectCode, code);
Assert.assertTrue(response != null && response.isSuccess());
}
@Test
public void testGetNodeListByDefinitionIdList() throws Exception {
public void testGetNodeListByDefinitionIdList() {
long projectCode = 1L;
String codeList = "1,2,3";
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCodeList(codeList)).thenReturn(result);
Result response = processDefinitionController.getNodeListByDefinitionCodeList(user, projectCode, codeList);
Mockito.when(processDefinitionService.getNodeListMapByDefinitionCodes(user, projectCode, codeList)).thenReturn(result);
Result response = processDefinitionController.getNodeListMapByDefinitionCodes(user, projectCode, codeList);
Assert.assertTrue(response != null && response.isSuccess());
}

67
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -86,6 +86,7 @@ import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
/**
* process definition service test
@ -380,10 +381,8 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
Map<String, Object> instanceNotExitRes = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS));
//instance exit
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition());
@ -654,44 +653,62 @@ public class ProcessDefinitionServiceTest {
}
@Test
public void testGetTaskNodeListByDefinitionId() {
public void testGetTaskNodeListByDefinitionCode() {
long projectCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
//process definition not exist
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//process data null
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
//success
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData());
ProcessDefinition processDefinition = getProcessDefinition();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null));
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
}
@Test
public void testGetTaskNodeListByDefinitionIdList() {
public void testGetTaskNodeListByDefinitionCodes() {
long projectCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
//process definition not exist
String defineCodeList = "46";
Long[] codeArray = {46L};
List<Long> codeList = Arrays.asList(codeArray);
Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.getTaskNodeListByDefinitionCodeList(defineCodeList);
String defineCodes = "46";
Set<Long> defineCodeSet = Lists.newArrayList(defineCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(defineCodeSet)).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
//process definition exist
putMsg(result, Status.SUCCESS, projectCode);
ProcessDefinition processDefinition = getProcessDefinition();
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(processDefinitionList);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
Mockito.when(processDefineMapper.queryByCodes(defineCodeSet)).thenReturn(processDefinitionList);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null));
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionCodeList(defineCodeList);
Map<String, Object> successRes = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}

Loading…
Cancel
Save