From 8bd88d90c4852e5c58ef71a6c65acecbc3a930a9 Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Wed, 4 Aug 2021 13:52:19 +0800 Subject: [PATCH] [Feature][JsonSplit-api] checkProcessNode of processDefinition (#5946) * check has cycle of ProcessDefinition * checkProcessNode of processDefinition Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../api/service/ProcessDefinitionService.java | 8 +- .../impl/ProcessDefinitionServiceImpl.java | 15 +- .../impl/ProcessInstanceServiceImpl.java | 4 +- .../service/ProcessDefinitionServiceTest.java | 329 ++---------------- .../service/ProcessInstanceServiceTest.java | 2 +- .../service/process/ProcessService.java | 5 +- 6 files changed, 47 insertions(+), 316 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index a49a08f6dc..6290c310c3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -229,14 +229,12 @@ public interface ProcessDefinitionService { MultipartFile file); /** - * check the process definition node meets the specifications + * check the process task relation json * - * @param processData process data - * @param processDefinitionJson process definition json + * @param processTaskRelationJson process task relation json * @return check result code */ - Map checkProcessNodeList(ProcessData processData, - String processDefinitionJson); + Map checkProcessNodeList(String processTaskRelationJson); /** * get task node details based on process definition diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index f0fcec3632..9f50ca63be 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -868,25 +868,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } /** - * check the process definition node meets the specifications + * check the process task relation json * - * @param processData process data - * @param processDefinitionJson process definition json + * @param processTaskRelationJson process task relation json * @return check result code */ @Override - public Map checkProcessNodeList(ProcessData processData, String processDefinitionJson) { - + public Map checkProcessNodeList(String processTaskRelationJson) { Map result = new HashMap<>(); try { - if (processData == null) { + if (processTaskRelationJson == null) { logger.error("process data is null"); - putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); + putMsg(result, Status.DATA_IS_NOT_VALID, processTaskRelationJson); return result; } + List taskRelationList = JSONUtils.toList(processTaskRelationJson, ProcessTaskRelationLog.class); // Check whether the task node is normal - List taskNodes = processData.getTasks(); + List taskNodes = processService.transformTask(taskRelationList); if (CollectionUtils.isEmpty(taskNodes)) { logger.error("process node info is empty"); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index de0808ead9..dda40eaa11 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -450,8 +450,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class); - //check workflow json is valid - result = processDefinitionService.checkProcessNodeList(processData, processInstanceJson); + //check workflow json is valid TODO processInstanceJson --> processTaskRelationJson + result = processDefinitionService.checkProcessNodeList(processInstanceJson); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index f6a6a7fb7d..0ec7e35fff 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -23,27 +23,18 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.DagData; -import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; @@ -52,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.service.process.ProcessService; -import java.io.IOException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; @@ -83,148 +73,10 @@ import com.google.common.collect.Lists; @RunWith(MockitoJUnitRunner.class) public class ProcessDefinitionServiceTest { - private static final String SHELL_JSON = "{\n" - + " \"globalParams\": [\n" - + " \n" - + " ],\n" - + " \"tasks\": [\n" - + " {\n" - + " \"type\": \"SHELL\",\n" - + " \"id\": \"tasks-9527\",\n" - + " \"name\": \"shell-1\",\n" - + " \"params\": {\n" - + " \"resourceList\": [\n" - + " \n" - + " ],\n" - + " \"localParams\": [\n" - + " \n" - + " ],\n" - + " \"rawScript\": \"#!/bin/bash\\necho \\\"shell-1\\\"\"\n" - + " },\n" - + " \"description\": \"\",\n" - + " \"runFlag\": \"NORMAL\",\n" - + " \"dependence\": {\n" - + " \n" - + " },\n" - + " \"maxRetryTimes\": \"0\",\n" - + " \"retryInterval\": \"1\",\n" - + " \"timeout\": {\n" - + " \"strategy\": \"\",\n" - + " \"interval\": 1,\n" - + " \"enable\": false\n" - + " },\n" - + " \"taskInstancePriority\": \"MEDIUM\",\n" - + " \"workerGroupId\": -1,\n" - + " \"preTasks\": [\n" - + " \n" - + " ]\n" - + " }\n" - + " ],\n" - + " \"tenantId\": 1,\n" - + " \"timeout\": 0\n" - + "}"; - private static final String CYCLE_SHELL_JSON = "{\n" - + " \"globalParams\": [\n" - + " \n" - + " ],\n" - + " \"tasks\": [\n" - + " {\n" - + " \"type\": \"SHELL\",\n" - + " \"id\": \"tasks-9527\",\n" - + " \"name\": \"shell-1\",\n" - + " \"params\": {\n" - + " \"resourceList\": [\n" - + " \n" - + " ],\n" - + " \"localParams\": [\n" - + " \n" - + " ],\n" - + " \"rawScript\": \"#!/bin/bash\\necho \\\"shell-1\\\"\"\n" - + " },\n" - + " \"description\": \"\",\n" - + " \"runFlag\": \"NORMAL\",\n" - + " \"dependence\": {\n" - + " \n" - + " },\n" - + " \"maxRetryTimes\": \"0\",\n" - + " \"retryInterval\": \"1\",\n" - + " \"timeout\": {\n" - + " \"strategy\": \"\",\n" - + " \"interval\": 1,\n" - + " \"enable\": false\n" - + " },\n" - + " \"taskInstancePriority\": \"MEDIUM\",\n" - + " \"workerGroupId\": -1,\n" - + " \"preTasks\": [\n" - + " \"tasks-9529\"\n" - + " ]\n" - + " },\n" - + " {\n" - + " \"type\": \"SHELL\",\n" - + " \"id\": \"tasks-9528\",\n" - + " \"name\": \"shell-1\",\n" - + " \"params\": {\n" - + " \"resourceList\": [\n" - + " \n" - + " ],\n" - + " \"localParams\": [\n" - + " \n" - + " ],\n" - + " \"rawScript\": \"#!/bin/bash\\necho \\\"shell-1\\\"\"\n" - + " },\n" - + " \"description\": \"\",\n" - + " \"runFlag\": \"NORMAL\",\n" - + " \"dependence\": {\n" - + " \n" - + " },\n" - + " \"maxRetryTimes\": \"0\",\n" - + " \"retryInterval\": \"1\",\n" - + " \"timeout\": {\n" - + " \"strategy\": \"\",\n" - + " \"interval\": 1,\n" - + " \"enable\": false\n" - + " },\n" - + " \"taskInstancePriority\": \"MEDIUM\",\n" - + " \"workerGroupId\": -1,\n" - + " \"preTasks\": [\n" - + " \"tasks-9527\"\n" - + " ]\n" - + " },\n" - + " {\n" - + " \"type\": \"SHELL\",\n" - + " \"id\": \"tasks-9529\",\n" - + " \"name\": \"shell-1\",\n" - + " \"params\": {\n" - + " \"resourceList\": [\n" - + " \n" - + " ],\n" - + " \"localParams\": [\n" - + " \n" - + " ],\n" - + " \"rawScript\": \"#!/bin/bash\\necho \\\"shell-1\\\"\"\n" - + " },\n" - + " \"description\": \"\",\n" - + " \"runFlag\": \"NORMAL\",\n" - + " \"dependence\": {\n" - + " \n" - + " },\n" - + " \"maxRetryTimes\": \"0\",\n" - + " \"retryInterval\": \"1\",\n" - + " \"timeout\": {\n" - + " \"strategy\": \"\",\n" - + " \"interval\": 1,\n" - + " \"enable\": false\n" - + " },\n" - + " \"taskInstancePriority\": \"MEDIUM\",\n" - + " \"workerGroupId\": -1,\n" - + " \"preTasks\": [\n" - + " \"tasks-9528\"\n" - + " ]\n" - + " }\n" - + " ],\n" - + " \"tenantId\": 1,\n" - + " \"timeout\": 0\n" - + "}"; + private static final String taskRelationJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," + + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789," + + "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]"; + @InjectMocks private ProcessDefinitionServiceImpl processDefinitionService; @Mock @@ -298,14 +150,14 @@ public class ProcessDefinitionServiceTest { Page page = new Page<>(1, 10); page.setTotal(30); Mockito.when(processDefineMapper.queryDefineListPaging( - Mockito.any(IPage.class) - , Mockito.eq("") - , Mockito.eq(loginUser.getId()) - , Mockito.eq(project.getCode()) - , Mockito.anyBoolean())).thenReturn(page); + Mockito.any(IPage.class) + , Mockito.eq("") + , Mockito.eq(loginUser.getId()) + , Mockito.eq(project.getCode()) + , Mockito.anyBoolean())).thenReturn(page); Map map1 = processDefinitionService.queryProcessDefinitionListPaging( - loginUser, 1L, "", 1, 10, loginUser.getId()); + loginUser, 1L, "", 1, 10, loginUser.getId()); Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS)); } @@ -402,7 +254,7 @@ public class ProcessDefinitionServiceTest { putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); Map map1 = processDefinitionService.batchCopyProcessDefinition( - loginUser, projectCode, String.valueOf(project.getId()), 2L); + loginUser, projectCode, String.valueOf(project.getId()), 2L); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map1.get(Constants.STATUS)); // project check auth success, target project name not equal project name, check auth target project fail @@ -419,7 +271,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); Map map3 = processDefinitionService.batchCopyProcessDefinition( - loginUser, projectCode, "46", 1L); + loginUser, projectCode, "46", 1L); Assert.assertEquals(Status.COPY_PROCESS_DEFINITION_ERROR, map3.get(Constants.STATUS)); } @@ -448,11 +300,11 @@ public class ProcessDefinitionServiceTest { processDefinitionList.add(definition); Set definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); - Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode, 46L)); + Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode)); putMsg(result, Status.SUCCESS); Map successRes = processDefinitionService.batchMoveProcessDefinition( - loginUser, projectCode, "46", projectCode2); + loginUser, projectCode, "46", projectCode2); Assert.assertEquals(Status.MOVE_PROCESS_DEFINITION_ERROR, successRes.get(Constants.STATUS)); } @@ -495,7 +347,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); processDefinition.setReleaseState(ReleaseState.ONLINE); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); - Map dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser,projectCode, 46); + Map dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS)); //scheduler list elements > 1 @@ -555,26 +407,26 @@ public class ProcessDefinitionServiceTest { putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); Map map = processDefinitionService.releaseProcessDefinition(loginUser, projectCode, - 6, ReleaseState.OFFLINE); + 6, ReleaseState.OFFLINE); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS)); // project check auth success, processs definition online putMsg(result, Status.SUCCESS, projectCode); Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition()); Map onlineRes = processDefinitionService.releaseProcessDefinition( - loginUser, projectCode, 46, ReleaseState.ONLINE); + loginUser, projectCode, 46, ReleaseState.ONLINE); Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS)); // project check auth success, processs definition online ProcessDefinition processDefinition1 = getProcessDefinition(); processDefinition1.setResourceIds("1,2"); Map onlineWithResourceRes = processDefinitionService.releaseProcessDefinition( - loginUser, projectCode, 46, ReleaseState.ONLINE); + loginUser, projectCode, 46, ReleaseState.ONLINE); Assert.assertEquals(Status.SUCCESS, onlineWithResourceRes.get(Constants.STATUS)); // release error code Map failRes = processDefinitionService.releaseProcessDefinition( - loginUser, projectCode, 46, ReleaseState.getEnum(2)); + loginUser, projectCode, 46, ReleaseState.getEnum(2)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS)); } @@ -594,51 +446,30 @@ public class ProcessDefinitionServiceTest { putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); Map map = processDefinitionService.verifyProcessDefinitionName(loginUser, - projectCode, "test_pdf"); + projectCode, "test_pdf"); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS)); //project check auth success, process not exist putMsg(result, Status.SUCCESS, projectCode); Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null); Map processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser, - projectCode, "test_pdf"); + projectCode, "test_pdf"); Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS)); //process exist Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition()); Map processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser, - projectCode, "test_pdf"); + projectCode, "test_pdf"); Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST, processExistRes.get(Constants.STATUS)); } @Test public void testCheckProcessNodeList() { - Map dataNotValidRes = processDefinitionService.checkProcessNodeList(null, ""); + Map dataNotValidRes = processDefinitionService.checkProcessNodeList(null); Assert.assertEquals(Status.DATA_IS_NOT_VALID, dataNotValidRes.get(Constants.STATUS)); - // task not empty - String processDefinitionJson = SHELL_JSON; - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - Assert.assertNotNull(processData); - Map taskEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson); - Assert.assertEquals(Status.SUCCESS, taskEmptyRes.get(Constants.STATUS)); - - // task empty - processData.setTasks(null); - Map taskNotEmptyRes = processDefinitionService.checkProcessNodeList(processData, processDefinitionJson); - Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, taskNotEmptyRes.get(Constants.STATUS)); - - // task cycle - String processDefinitionJsonCycle = CYCLE_SHELL_JSON; - ProcessData processDataCycle = JSONUtils.parseObject(processDefinitionJsonCycle, ProcessData.class); - Map taskCycleRes = processDefinitionService.checkProcessNodeList(processDataCycle, processDefinitionJsonCycle); - Assert.assertEquals(Status.PROCESS_NODE_HAS_CYCLE, taskCycleRes.get(Constants.STATUS)); - - //json abnormal - String abnormalJson = processDefinitionJson.replaceAll(TaskType.SHELL.getDesc(), ""); - processData = JSONUtils.parseObject(abnormalJson, ProcessData.class); - Map abnormalTaskRes = processDefinitionService.checkProcessNodeList(processData, abnormalJson); - Assert.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID, abnormalTaskRes.get(Constants.STATUS)); + Map taskEmptyRes = processDefinitionService.checkProcessNodeList(taskRelationJson); + Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, taskEmptyRes.get(Constants.STATUS)); } @Test @@ -701,17 +532,6 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } - private ProcessData getProcessData() { - ProcessData processData = new ProcessData(); - List taskNodeList = new ArrayList<>(); - processData.setTasks(taskNodeList); - List properties = new ArrayList<>(); - processData.setGlobalParams(properties); - processData.setTenantId(10); - processData.setTimeout(100); - return processData; - } - @Test public void testQueryAllProcessDefinitionByProjectCode() { User loginUser = new User(); @@ -735,29 +555,9 @@ public class ProcessDefinitionServiceTest { public void testViewTree() { //process definition not exist ProcessDefinition processDefinition = getProcessDefinition(); - processDefinition.setProcessDefinitionJson(SHELL_JSON); Map processDefinitionNullRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS)); - List processInstanceList = new ArrayList<>(); - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(1); - processInstance.setName("test_instance"); - processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - processInstance.setHost("192.168.xx.xx"); - processInstance.setStartTime(new Date()); - processInstance.setEndTime(new Date()); - processInstanceList.add(processInstance); - - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setStartTime(new Date()); - taskInstance.setEndTime(new Date()); - taskInstance.setTaskType(TaskType.SHELL.getDesc()); - taskInstance.setId(1); - taskInstance.setName("test_task_instance"); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - taskInstance.setHost("192.168.xx.xx"); - //task instance not exist Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); @@ -767,38 +567,15 @@ public class ProcessDefinitionServiceTest { //task instance exist Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); - } @Test public void testSubProcessViewTree() { - ProcessDefinition processDefinition = getProcessDefinition(); - processDefinition.setProcessDefinitionJson(SHELL_JSON); - List processInstanceList = new ArrayList<>(); - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(1); - processInstance.setName("test_instance"); - processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - processInstance.setHost("192.168.xx.xx"); - processInstance.setStartTime(new Date()); - processInstance.setEndTime(new Date()); - processInstanceList.add(processInstance); - - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setStartTime(new Date()); - taskInstance.setEndTime(new Date()); - taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc()); - taskInstance.setId(1); - taskInstance.setName("test_task_instance"); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - taskInstance.setHost("192.168.xx.xx"); - taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n"); Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); - } @Test @@ -812,25 +589,17 @@ public class ProcessDefinitionServiceTest { long projectCode = 1L; Project project = getProject(projectCode); - - ProcessDefinition processDefinition = getProcessDefinition(); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); - String taskRelationJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," - + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":{}},{\"name\":\"\",\"preTaskCode\":123456789," - + "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":{}}]"; Map updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1, - "", "", "", 0, "root", taskRelationJson); - + "", "", "", 0, "root", null); Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS)); } @Test - public void testBatchExportProcessDefinitionByCodes() throws IOException { - processDefinitionService.batchExportProcessDefinitionByCodes( - null, 1L, null, null); + public void testBatchExportProcessDefinitionByCodes() { + processDefinitionService.batchExportProcessDefinitionByCodes(null, 1L, null, null); User loginUser = new User(); loginUser.setId(1); @@ -845,7 +614,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); processDefinitionService.batchExportProcessDefinitionByCodes( - loginUser, projectCode, "1", null); + loginUser, projectCode, "1", null); ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(1); @@ -857,23 +626,10 @@ public class ProcessDefinitionServiceTest { DagData dagData = new DagData(getProcessDefinition(), null, null); Mockito.when(processService.genDagData(Mockito.any())).thenReturn(dagData); - processDefinitionService.batchExportProcessDefinitionByCodes( - loginUser, projectCode, "1", response); + processDefinitionService.batchExportProcessDefinitionByCodes(loginUser, projectCode, "1", response); Assert.assertNotNull(processDefinitionService.exportProcessDagData(processDefinition)); } - /** - * get mock datasource - * - * @return DataSource - */ - private DataSource getDataSource() { - DataSource dataSource = new DataSource(); - dataSource.setId(2); - dataSource.setName("test"); - return dataSource; - } - /** * get mock processDefinition * @@ -888,7 +644,6 @@ public class ProcessDefinitionServiceTest { processDefinition.setTenantId(1); processDefinition.setDescription(""); processDefinition.setCode(46L); - return processDefinition; } @@ -907,30 +662,16 @@ public class ProcessDefinitionServiceTest { return project; } - private List getProcessTaskRelation(long projectCode, long processCode) { + private List getProcessTaskRelation(long projectCode) { List processTaskRelations = new ArrayList<>(); ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); processTaskRelation.setProjectCode(projectCode); - processTaskRelation.setProcessDefinitionCode(processCode); + processTaskRelation.setProcessDefinitionCode(46L); processTaskRelation.setProcessDefinitionVersion(1); processTaskRelations.add(processTaskRelation); return processTaskRelations; } - /** - * get mock Project - * - * @param projectId projectId - * @return Project - */ - private Project getProjectById(int projectId) { - Project project = new Project(); - project.setId(projectId); - project.setName("project_test2"); - project.setUserId(1); - return project; - } - /** * get mock schedule * @@ -954,12 +695,6 @@ public class ProcessDefinitionServiceTest { return schedule; } - private List getSchedulerList() { - List scheduleList = new ArrayList<>(); - scheduleList.add(getSchedule()); - return scheduleList; - } - private void putMsg(Map result, Status status, Object... statusParams) { result.put(Constants.STATUS, status); if (statusParams != null && statusParams.length > 0) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 132be73855..31da02991a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -400,7 +400,7 @@ public class ProcessInstanceServiceTest { when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); when(processService.updateProcessInstance(processInstance)).thenReturn(1); - when(processDefinitionService.checkProcessNodeList(Mockito.any(), eq(shellJson))).thenReturn(result); + when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result); when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index ac91b0685d..e64af1055c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2371,11 +2371,9 @@ public class ProcessService { * @param processDefinition process definition * @return dag graph */ - @Deprecated public DAG genDagGraph(ProcessDefinition processDefinition) { - Map locationMap = locationToMap(processDefinition.getLocations()); - List taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion(), locationMap); List processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); + List taskNodeList = transformTask(processTaskRelations); ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations)); // Generate concrete Dag to be executed return DagHelper.buildDagGraph(processDag); @@ -2418,6 +2416,7 @@ public class ProcessService { return processData; } + @Deprecated public List genTaskNodeList(Long processCode, int processVersion, Map locationMap) { List processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion); Set taskDefinitionSet = new HashSet<>();