From 929655fdbe57e649ac8dc72a87af15fa3c5ef718 Mon Sep 17 00:00:00 2001 From: lenboo Date: Fri, 26 Feb 2021 15:05:32 +0800 Subject: [PATCH 1/2] refactor task node --- .../master/runner/MasterExecThread.java | 2 +- .../server/master/DependentTaskTest.java | 11 ++++-- .../service/process/ProcessService.java | 37 +++++++------------ 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 55832402ab..b8c3e59c5c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -388,7 +388,7 @@ public class MasterExecThread implements Runnable { */ private void buildFlowDag() throws Exception { recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); - List taskNodeList = processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId()); + List taskNodeList = processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); forbiddenTaskList.clear(); taskNodeList.stream().forEach(taskNode -> { if (taskNode.isForbidden()) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 0f6239af0e..52ed4a96c5 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -17,11 +17,13 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread; @@ -182,11 +184,12 @@ public class DependentTaskTest { } - private List getTaskNodes(){ - List list = new ArrayList<>(); - TaskNode taskNode = new TaskNode(); + private List getTaskNodes(){ + List list = new ArrayList<>(); + TaskDefinition taskNode = new TaskDefinition(); + taskNode.setCode(1111L); taskNode.setName("C"); - taskNode.setType("SQL"); + taskNode.setTaskType(TaskType.SQL); list.add(taskNode); return list; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 6b0e8c315c..83cc50bcaa 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -344,25 +344,23 @@ public class ProcessService { /** * get task node list by definitionId */ - public List getTaskNodeListByDefinitionId(Integer defineId) { + public List getTaskNodeListByDefinitionId(Integer defineId) { ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); if (processDefinition == null) { logger.error("process define not exists"); return new ArrayList<>(); } - List processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion()); Map taskDefinitionMap = new HashMap<>(); for (ProcessTaskRelation processTaskRelation : processTaskRelations) { if (taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) { - TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(processTaskRelation.getPostTaskCode()); + TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( + processTaskRelation.getPostTaskCode(), processTaskRelation.getPostNodeVersion()); taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition); } } - return taskDefinitionMap.entrySet() - .stream() - .map(e -> JSONUtils.parseObject(JSONUtils.toJsonString(e.getValue()), TaskNode.class)) - .collect(Collectors.toList()); + return new ArrayList<>(taskDefinitionMap.values()); + } /** @@ -394,23 +392,12 @@ public class ProcessService { public ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version) { ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode); if (processDefinition.getVersion() != version) { - ProcessDefinitionLog log = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version); - processDefinition = convertFromLog(log); + processDefinition = processDefineLogMapper.queryByDefinitionCodeAndVersion(processDefinitionCode, version); + processDefinition.setId(0); } return processDefinition; } - /** - * covert log to process definition - */ - public ProcessDefinition convertFromLog(ProcessDefinitionLog processDefinitionLog) { - ProcessDefinition definition = processDefinitionLog; - if (null != definition) { - definition.setId(0); - } - return definition; - } - /** * delete work process instance by id * @@ -500,11 +487,13 @@ public class ProcessService { * @param ids ids */ public void recurseFindSubProcessId(int parentId, List ids) { - List taskNodeList = this.getTaskNodeListByDefinitionId(parentId); + List taskNodeList = this.getTaskNodeListByDefinitionId(parentId); + + if (taskNodeList != null && !taskNodeList.isEmpty()) { - for (TaskNode taskNode : taskNodeList) { - String parameter = taskNode.getParams(); + for (TaskDefinition taskNode : taskNodeList) { + String parameter = taskNode.getTaskParams(); ObjectNode parameterJson = JSONUtils.parseObject(parameter); if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_ID) != null) { SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class); @@ -2404,7 +2393,7 @@ public class ProcessService { * @return dag graph */ public DAG genDagGraph(ProcessDefinition processDefinition) { - List taskNodeList = this.getTaskNodeListByDefinitionId(processDefinition.getId()); + List taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion()); List processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion()); ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, processTaskRelations); // Generate concrete Dag to be executed From 6d087ccce0b899966036a4a09f6acecf0903a19f Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 1 Mar 2021 15:01:17 +0800 Subject: [PATCH 2/2] [Feature][JsonSplit] fix processDefinitionService ut and bug (#4894) * Modify Project and ProjectUser Mapper * Modify Project and ProjectUser Mapper * project_code is bigint(20) * modify ERROR name * modify saveProcessDefine, remove the duplicate code with createTaskAndRelation * modify import/export processdefinition, add genProcessData * fix ut and bug * code style --- .../impl/ProcessDefinitionServiceImpl.java | 9 +- .../service/ProcessDefinitionServiceTest.java | 90 +++++++++--- .../common/utils/VarPoolUtilsTest.java | 133 ++++++++++++++---- .../service/process/ProcessServiceTest.java | 19 ++- 4 files changed, 190 insertions(+), 61 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 5abd71a915..afa362f636 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -354,12 +354,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); - ProcessData processData = processService.genProcessData(processDefinition); - processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); - if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); } else { + ProcessData processData = processService.genProcessData(processDefinition); + processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); result.put(Constants.DATA_LIST, processDefinition); putMsg(result, Status.SUCCESS); } @@ -379,12 +378,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName); - ProcessData processData = processService.genProcessData(processDefinition); - processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); } else { + ProcessData processData = processService.genProcessData(processDefinition); + processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); result.put(Constants.DATA_LIST, processDefinition); putMsg(result, Status.SUCCESS); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 37ab31dada..589ab76767 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; @@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; @@ -238,6 +240,8 @@ public class ProcessDefinitionServiceTest { @Mock private ProcessDefinitionMapper processDefineMapper; @Mock + private ProcessTaskRelationMapper processTaskRelationMapper; + @Mock private ProjectMapper projectMapper; @Mock private ProjectServiceImpl projectService; @@ -342,6 +346,17 @@ public class ProcessDefinitionServiceTest { putMsg(result, Status.SUCCESS, projectName); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(processDefineMapper.selectById(1)).thenReturn(null); + + String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" + + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" + + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" + + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" + + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" + + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" + + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"; + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData); + Map instanceNotexitRes = processDefinitionService.queryProcessDefinitionById(loginUser, "project_test1", 1); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); @@ -376,13 +391,23 @@ public class ProcessDefinitionServiceTest { //project check auth success, instance not exist putMsg(result, Status.SUCCESS, projectName); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); - Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_def")).thenReturn(null); + Mockito.when(processDefineMapper.queryByDefineName(project.getId(), "test_def")).thenReturn(null); + + String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" + + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" + + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" + + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" + + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" + + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" + + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"; + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData); Map instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser, "project_test1", "test_def"); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); //instance exit - Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test")).thenReturn(getProcessDefinition()); + Mockito.when(processDefineMapper.queryByDefineName(project.getId(), "test")).thenReturn(getProcessDefinition()); Map successRes = processDefinitionService.queryProcessDefinitionByName(loginUser, "project_test1", "test"); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); @@ -393,6 +418,11 @@ public class ProcessDefinitionServiceTest { String projectName = "project_test1"; Project project = getProject(projectName); + String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\"," + + "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234" + + "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," + + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; User loginUser = new User(); loginUser.setId(-1); @@ -435,15 +465,24 @@ public class ProcessDefinitionServiceTest { // instance exit ProcessDefinition definition = getProcessDefinition(); definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); - definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\"," - + "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234" - + "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," - + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," - + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"); + definition.setProcessDefinitionJson(processDefinitionJson); definition.setConnects("[]"); Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition); + Mockito.when(processService.saveProcessDefinition(Mockito.eq(loginUser) + , Mockito.eq(project2) + , Mockito.anyString() + , Mockito.anyString() + , Mockito.anyString() + , Mockito.anyString() + , Mockito.any(ProcessData.class) + , Mockito.any(ProcessDefinition.class))) + .thenReturn(1); + + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData); + Map map3 = processDefinitionService.batchCopyProcessDefinition( loginUser, projectName, "46", 1); Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS)); @@ -568,6 +607,7 @@ public class ProcessDefinitionServiceTest { schedules.add(schedule); Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0); + Mockito.when(processTaskRelationMapper.deleteByCode(null, null)).thenReturn(0); Map deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser, "project_test1", 46); Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS)); @@ -708,6 +748,7 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS)); //success + Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData()); processDefinition.setProcessDefinitionJson(SHELL_JSON); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Map dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46); @@ -729,6 +770,16 @@ public class ProcessDefinitionServiceTest { List processDefinitionList = new ArrayList<>(); processDefinitionList.add(processDefinition); Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList); + String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" + + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" + + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" + + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" + + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" + + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" + + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"; + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData); + Map successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } @@ -776,12 +827,11 @@ public class ProcessDefinitionServiceTest { //task instance not exist Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); - Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(null); + Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Map taskNullRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); //task instance exist - Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance); Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); @@ -843,8 +893,8 @@ public class ProcessDefinitionServiceTest { + "}"); //task instance exist Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); + Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); - Mockito.when(taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), "shell-1")).thenReturn(taskInstance); Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); @@ -944,7 +994,6 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName)); Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result); - Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2); Map importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName); @@ -972,7 +1021,6 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition); - Mockito.when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1); String sqlDependentJson = "{\n" + " \"globalParams\": [\n" @@ -1074,6 +1122,13 @@ public class ProcessDefinitionServiceTest { processDefinitionService.batchExportProcessDefinitionByIds( null, null, null, null); + String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" + + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" + + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" + + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" + + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" + + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" + + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"; User loginUser = new User(); loginUser.setId(1); loginUser.setUserType(UserType.ADMIN_USER); @@ -1091,13 +1146,7 @@ public class ProcessDefinitionServiceTest { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(1); - processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" - + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" - + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" - + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" - + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" - + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); + processDefinition.setProcessDefinitionJson(processDefinitionJson); Map checkResult = new HashMap<>(); checkResult.put(Constants.STATUS, Status.SUCCESS); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project); @@ -1105,6 +1154,9 @@ public class ProcessDefinitionServiceTest { Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition); HttpServletResponse response = mock(HttpServletResponse.class); + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData); + ServletOutputStream outputStream = mock(ServletOutputStream.class); when(response.getOutputStream()).thenReturn(outputStream); processDefinitionService.batchExportProcessDefinitionByIds( diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java index 6713b221bc..4773067d6a 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java @@ -28,8 +28,10 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; + public class VarPoolUtilsTest { - + private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class); @Test @@ -37,53 +39,122 @@ public class VarPoolUtilsTest { String varPool = "p1,66$VarPool$p2,69$VarPool$"; ConcurrentHashMap propToValue = new ConcurrentHashMap(); VarPoolUtils.convertVarPoolToMap(propToValue, varPool); - Assert.assertEquals((String)propToValue.get("p1"), "66"); - Assert.assertEquals((String)propToValue.get("p2"), "69"); + Assert.assertEquals((String) propToValue.get("p1"), "66"); + Assert.assertEquals((String) propToValue.get("p2"), "69"); logger.info(propToValue.toString()); } - + @Test public void testConvertPythonScriptPlaceholders() throws Exception { String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};"; rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript); Assert.assertEquals(rawScript, "print(${p1});\n" - + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n" - + "print(\"${{setValue({},{})}}\".format(\"p2\",4));"); + + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n" + + "print(\"${{setValue({},{})}}\".format(\"p2\",4));"); logger.info(rawScript); } @Test public void testSetTaskNodeLocalParams() throws Exception { String taskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\"," - + "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"" - + "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\"" - + "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v1\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v2\"}," - + "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\"" - + "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"}," - + "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"}," - + "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"" - + "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"" - + "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}"; - String changeTaskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\"," - + "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"" - + "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\"" - + "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k1-value-change\"}," - + "{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k2-value-change\"}," - + "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\"" - + "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"}," - + "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"}," - + "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"" - + "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"" - + "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}"; + + "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"" + + "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\"" + + "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v1\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v2\"}," + + "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\"" + + "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"}," + + "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"}," + + "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"" + + "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"" + + "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}"; + + String changeTaskJson = "{" + + " \"id\":\"tasks-66199\"," + + " \"code\":null," + + " \"version\":0," + + " \"name\":\"file-shell\"," + + " \"desc\":null," + + " \"type\":\"SHELL\"," + + " \"runFlag\":\"NORMAL\"," + + " \"loc\":null," + + " \"maxRetryTimes\":0," + + " \"retryInterval\":1," + + " \"params\":{" + + " \"rawScript\":\"sh n-1/n-1-1/run.sh\"," + + " \"localParams\":[" + + " {" + + " \"prop\":\"k1\"," + + " \"direct\":\"IN\"," + + " \"type\":\"VARCHAR\"," + + " \"value\":\"k1-value-change\"" + + " }," + + " {" + + " \"prop\":\"k2\"," + + " \"direct\":\"IN\"," + + " \"type\":\"VARCHAR\"," + + " \"value\":\"k2-value-change\"" + + " }," + + " {" + + " \"prop\":\"k3\"," + + " \"direct\":\"IN\"," + + " \"type\":\"VARCHAR\"," + + " \"value\":\"v3\"" + + " }" + + " ]," + + " \"resourceList\":[" + + " {" + + " \"id\":\"dolphinschedule-code\"," + + " \"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"" + + " }," + + " {" + + " \"id\":\"mr-code\"," + + " \"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"" + + " }," + + " {" + + " \"id\":\"run\"," + + " \"res\":\"n-1/n-1-1/run.sh\"" + + " }" + + " ]" + + " }," + + " \"preTasks\":[" + + "" + + " ]," + + " \"preTaskNodeList\":null," + + " \"extras\":null," + + " \"depList\":[" + + "" + + " ]," + + " \"dependence\":{" + + "" + + " }," + + " \"conditionResult\":{" + + " \"successNode\":[" + + " \"\"" + + " ]," + + " \"failedNode\":[" + + " \"\"" + + " ]" + + " }," + + " \"taskInstancePriority\":\"MEDIUM\"," + + " \"workerGroup\":\"default\"," + + " \"workerGroupId\":null," + + " \"timeout\":{" + + " \"strategy\":\"\"," + + " \"interval\":null," + + " \"enable\":false" + + " }," + + " \"delayTime\":0" + + "}"; + + ObjectNode jsonNodes = JSONUtils.parseObject(changeTaskJson); Map propToValue = new HashMap(); - propToValue.put("k1","k1-value-change"); - propToValue.put("k2","k2-value-change"); + propToValue.put("k1", "k1-value-change"); + propToValue.put("k2", "k2-value-change"); - TaskNode taskNode = JSONUtils.parseObject(taskJson,TaskNode.class); + TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); - VarPoolUtils.setTaskNodeLocalParams(taskNode,propToValue); + VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue); - Assert.assertEquals(changeTaskJson,JSONUtils.toJsonString(taskNode)); + Assert.assertEquals(JSONUtils.toJsonString(jsonNodes), JSONUtils.toJsonString(taskNode).trim()); } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index cfa8caa85c..32ffe99d66 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -34,12 +34,14 @@ import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.CommandMapper; import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; @@ -72,15 +74,12 @@ public class ProcessServiceTest { @InjectMocks private ProcessService processService; - - @Mock private CommandMapper commandMapper; - - + @Mock + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; @Mock private ErrorCommandMapper errorCommandMapper; - @Mock private ProcessDefinitionMapper processDefineMapper; @Mock @@ -238,6 +237,7 @@ public class ProcessServiceTest { processDefinition.setId(123); processDefinition.setName("test"); processDefinition.setVersion(1); + processDefinition.setCode(11L); processDefinition.setProcessDefinitionJson("{\"globalParams\":[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"tasks\":[{\"conditionResult\":" + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" @@ -314,6 +314,7 @@ public class ProcessServiceTest { @Test public void testRecurseFindSubProcessId() { ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setCode(10L); processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\"" + ",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\"" @@ -326,6 +327,7 @@ public class ProcessServiceTest { int parentId = 111; List ids = new ArrayList<>(); ProcessDefinition processDefinition2 = new ProcessDefinition(); + processDefinition2.setCode(11L); processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\"" + ":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}," + "\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\"," @@ -334,7 +336,12 @@ public class ProcessServiceTest { + "\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":" + "\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition); - Mockito.when(processDefineMapper.selectById(222)).thenReturn(processDefinition2); + + List relationLogList = new ArrayList<>(); + Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong() + , Mockito.anyInt())) + .thenReturn(relationLogList); + processService.recurseFindSubProcessId(parentId, ids); }