diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 34e7cabf6b..582d75178d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -621,17 +621,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * get process definition list by ids */ private List getProcessDefinitionList(String processDefinitionIds) { - List processDefinitionList = new ArrayList<>(); String[] processDefinitionIdArray = processDefinitionIds.split(","); + + List processDefinitionList = new ArrayList<>(); for (String strProcessDefinitionId : processDefinitionIdArray) { //get workflow info int processDefinitionId = Integer.parseInt(strProcessDefinitionId); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId); - String processDefinitionJson = JSONUtils.toJsonString(processService.genProcessData(processDefinition)); - processDefinition.setProcessDefinitionJson(processDefinitionJson); - processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); + processDefinitionList.add(exportProcessMetaData(processDefinition)); } - return processDefinitionList; } @@ -671,39 +669,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get export process metadata string * - * @param processDefinitionId process definition id * @param processDefinition process definition * @return export process metadata string */ - public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { - //create workflow json file - return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition)); - } - - /** - * get export process metadata string - * - * @param processDefinitionId process definition id - * @param processDefinition process definition - * @return export process metadata string - */ - public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); + public ProcessMeta exportProcessMetaData(ProcessDefinition processDefinition) { + ProcessData processData = processService.genProcessData(processDefinition); //correct task param which has data source or dependent param - String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinitionJson); - processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); + addExportTaskNodeSpecialParam(processData); //export process metadata ProcessMeta exportProcessMeta = new ProcessMeta(); exportProcessMeta.setProjectName(processDefinition.getProjectName()); exportProcessMeta.setProcessDefinitionName(processDefinition.getName()); - exportProcessMeta.setProcessDefinitionJson(processDefinitionJson); + exportProcessMeta.setProcessDefinitionJson(JSONUtils.toJsonString(processService.genProcessData(processDefinition))); exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription()); exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations()); exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects()); //schedule info - List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); + List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinition.getId()); if (!schedules.isEmpty()) { Schedule schedule = schedules.get(0); exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); @@ -723,26 +707,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * correct task param which has datasource or dependent * - * @param processDefinitionJson processDefinitionJson + * @param processData process data * @return correct processDefinitionJson */ - private String addExportTaskNodeSpecialParam(String processDefinitionJson) { - ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); - ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS); - - for (int i = 0; i < jsonArray.size(); i++) { - JsonNode taskNode = jsonArray.path(i); - if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { - String taskType = taskNode.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - if (null != addTaskParam) { - addTaskParam.addExportSpecialParam(taskNode); - } + private void addExportTaskNodeSpecialParam(ProcessData processData) { + List taskNodeList = processData.getTasks(); + List tmpNodeList = new ArrayList<>(); + for (TaskNode taskNode : taskNodeList) { + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskNode.getType()); + JsonNode jsonNode = JSONUtils.toJsonNode(taskNode); + if (null != addTaskParam) { + addTaskParam.addExportSpecialParam(jsonNode); } + tmpNodeList.add(JSONUtils.parseObject(jsonNode.toString(), TaskNode.class)); } - jsonObject.set(TASKS, jsonArray); - return jsonObject.toString(); + processData.setTasks(tmpNodeList); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 69480618d9..95e3f5c4a5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -453,7 +453,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce int update = processService.updateProcessInstance(processInstance); int updateDefine = 1; if (Boolean.TRUE.equals(syncDefine)) { - updateDefine = syncDefinition(loginUser, project, processInstanceJson, locations, connects, + updateDefine = syncDefinition(loginUser, project, locations, connects, processInstance, processDefinition, processData); } if (update > 0 && updateDefine > 0) { @@ -467,12 +467,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * sync definition according process instance */ - private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects, + private int syncDefinition(User loginUser, Project project, String locations, String connects, ProcessInstance processInstance, ProcessDefinition processDefinition, ProcessData processData) { String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams()); - processDefinition.setProcessDefinitionJson(processInstanceJson); processDefinition.setGlobalParams(originDefParams); processDefinition.setLocations(locations); processDefinition.setConnects(connects); @@ -513,7 +512,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce if (tenant != null) { processInstance.setTenantCode(tenant.getTenantCode()); } - processInstance.setProcessInstanceJson(processInstanceJson); processInstance.setGlobalParams(globalParams); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index 96d6ad264b..69d9b22474 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -184,7 +184,6 @@ public class ProcessDefinitionControllerTest { processDefinition.setId(id); processDefinition.setLocations(locations); processDefinition.setName(name); - processDefinition.setProcessDefinitionJson(json); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS); @@ -267,7 +266,6 @@ public class ProcessDefinitionControllerTest { processDefinition.setId(id); processDefinition.setLocations(locations); processDefinition.setName(name); - processDefinition.setProcessDefinitionJson(json); String name2 = "dag_test"; int id2 = 2; @@ -279,7 +277,6 @@ public class ProcessDefinitionControllerTest { processDefinition2.setId(id2); processDefinition2.setLocations(locations); processDefinition2.setName(name2); - processDefinition2.setProcessDefinitionJson(json); resourceList.add(processDefinition); resourceList.add(processDefinition2); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 4f9620824f..234dd4dc4d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; @@ -391,14 +392,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null); - String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" - + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" - + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" - + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" - + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" - + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"; - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + ProcessData processData = getProcessData(); Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData); Map instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser, "project_test1", "test_def"); @@ -416,12 +410,6 @@ public class ProcessDefinitionServiceTest { String projectName = "project_test1"; Project project = getProject(projectName); - String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\"," - + "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234" - + "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," - + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," - + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; - User loginUser = new User(); loginUser.setId(-1); loginUser.setUserType(UserType.GENERAL_USER); @@ -463,7 +451,6 @@ public class ProcessDefinitionServiceTest { // instance exit ProcessDefinition definition = getProcessDefinition(); definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); - definition.setProcessDefinitionJson(processDefinitionJson); definition.setConnects("[]"); Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition); @@ -478,8 +465,7 @@ public class ProcessDefinitionServiceTest { , Mockito.any(ProcessDefinition.class))) .thenReturn(1); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData); + Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData()); Map map3 = processDefinitionService.batchCopyProcessDefinition( loginUser, projectName, "46", 1); @@ -518,11 +504,6 @@ public class ProcessDefinitionServiceTest { ProcessDefinition definition = getProcessDefinition(); definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); - definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\"" - + ",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234" - + "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," - + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," - + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"); definition.setConnects("[]"); // check target project result == null @@ -656,16 +637,6 @@ public class ProcessDefinitionServiceTest { loginUser, "project_test1", 46, ReleaseState.getEnum(2)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS)); - //FIXME has function exit code 1 when exception - //process definition offline - // List schedules = new ArrayList<>(); - // Schedule schedule = getSchedule(); - // schedules.add(schedule); - // Mockito.when(scheduleMapper.selectAllByProcessDefineArray(new int[]{46})).thenReturn(schedules); - // Mockito.when(scheduleMapper.updateById(schedule)).thenReturn(1); - // Map offlineRes = processDefinitionService.releaseProcessDefinition(loginUser, "project_test1", - // 46, ReleaseState.OFFLINE.getCode()); - // Assert.assertEquals(Status.SUCCESS, offlineRes.get(Constants.STATUS)); } @Test @@ -746,7 +717,6 @@ public class ProcessDefinitionServiceTest { //success Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData()); - processDefinition.setProcessDefinitionJson(SHELL_JSON); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Map dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46); Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS)); @@ -763,29 +733,37 @@ public class ProcessDefinitionServiceTest { //process definition exist ProcessDefinition processDefinition = getProcessDefinition(); - processDefinition.setProcessDefinitionJson(SHELL_JSON); List processDefinitionList = new ArrayList<>(); processDefinitionList.add(processDefinition); Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList); - String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" - + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" - + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" - + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" - + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" - + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"; - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + ProcessData processData = getProcessData(); Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData); Map successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } + private ProcessData getProcessData() { + ProcessData processData = new ProcessData(); + List taskNodeList = new ArrayList<>(); + processData.setTasks(taskNodeList); + List properties = new ArrayList<>(); + processData.setGlobalParams(properties); + processData.setTenantId(10); + processData.setTimeout(100); + return processData; + } + @Test public void testQueryProcessDefinitionAllByProjectId() { int projectId = 1; + Long projectCode = 2L; + Project project = new Project(); + project.setId(projectId); + project.setCode(projectCode); + Mockito.when(projectMapper.selectById(projectId)).thenReturn(project); + ProcessDefinition processDefinition = getProcessDefinition(); - processDefinition.setProcessDefinitionJson(SHELL_JSON); List processDefinitionList = new ArrayList<>(); processDefinitionList.add(processDefinition); Project test = getProject("test"); @@ -1188,6 +1166,7 @@ public class ProcessDefinitionServiceTest { processDefinition.setProjectId(2); processDefinition.setTenantId(1); processDefinition.setDescription(""); + processDefinition.setCode(9999L); return processDefinition; } @@ -1284,15 +1263,7 @@ public class ProcessDefinitionServiceTest { Integer processDefinitionId = 111; ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(processDefinitionId); - processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":" - + "[\"\"]},\"delayTime\":\"0\",\"dependence\":{}," - + "\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"," - + "\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}," - + "\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"," - + "\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"," - + "\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); - Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinitionId, processDefinition)); + Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinition)); } @Test diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 2adc57b2d6..4c3a3e7d98 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1582,7 +1582,6 @@ public class ProcessService { String locations, String connects) { ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); if (processInstance != null) { - processInstance.setProcessInstanceJson(processJson); processInstance.setGlobalParams(globalParams); processInstance.setScheduleTime(scheduleTime); processInstance.setLocations(locations); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 32ffe99d66..364b283c34 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -315,28 +315,11 @@ public class ProcessServiceTest { public void testRecurseFindSubProcessId() { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setCode(10L); - processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\"" - + ",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\"" - + ",\"maxRetryTimes\":\"0\",\"name\":\"test\",\"params\":{\"localParams\":[]," - + "\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[],\"processDefinitionId\"" - + ":\"222\"},\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":" - + "null,\"strategy\":\"\"},\"type\":\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}]," - + "\"tenantId\":4,\"timeout\":0}"); int parentId = 111; List ids = new ArrayList<>(); ProcessDefinition processDefinition2 = new ProcessDefinition(); processDefinition2.setCode(11L); - processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\"" - + ":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}," - + "\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\"," - + "\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}," - + "\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":" - + "\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":" - + "\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition); - List relationLogList = new ArrayList<>(); Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong() , Mockito.anyInt()))