diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 5bccb01a77..f3225a272a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -30,8 +30,6 @@ import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.FileUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam; -import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -51,6 +49,7 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; import org.apache.dolphinscheduler.common.utils.StreamUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; @@ -62,6 +61,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; @@ -159,6 +159,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired private SchedulerService schedulerService; + @Autowired + private DataSourceMapper dataSourceMapper; + /** * create process definition * @@ -720,23 +723,54 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } /** - * correct task param which has datasource or dependent + * Injecting parameters into export process definition + * Because the import and export environment resource IDs may be inconsistent,So inject the resource name + * + * SQL and PROCEDURE node, inject datasourceName + * DEPENDENT node, inject projectName and definitionName * * @param processData process data - * @return correct processDefinitionJson */ private void addExportTaskNodeSpecialParam(ProcessData processData) { - List taskNodeList = processData.getTasks(); - List tmpNodeList = new ArrayList<>(); - for (TaskNode taskNode : taskNodeList) { - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskNode.getType()); - JsonNode jsonNode = JSONUtils.toJsonNode(taskNode); - if (null != addTaskParam) { - addTaskParam.addExportSpecialParam(jsonNode); + for (TaskNode taskNode : processData.getTasks()) { + if (TaskType.SQL.getDesc().equals(taskNode.getType()) + || TaskType.PROCEDURE.getDesc().equals(taskNode.getType())) { + ObjectNode sqlParameters = JSONUtils.parseObject(taskNode.getParams()); + + DataSource dataSource = dataSourceMapper.selectById( + sqlParameters.path(Constants.TASK_PARAMS_DATASOURCE).asInt()); + + if (dataSource != null) { + sqlParameters.put(Constants.TASK_PARAMS_DATASOURCE_NAME, dataSource.getName()); + taskNode.setParams(JSONUtils.toJsonString(sqlParameters)); + } + } + + if (TaskType.DEPENDENT.getDesc().equals(taskNode.getType())) { + ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.getDependence()); + + if (dependentParameters != null) { + ArrayNode dependTaskList = (ArrayNode)dependentParameters.get( + Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST); + for (int j = 0; j < dependTaskList.size(); j++) { + JsonNode dependentTaskModel = dependTaskList.path(j); + ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get( + Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST); + for (int k = 0; k < dependItemList.size(); k++) { + ObjectNode dependentItem = (ObjectNode)dependItemList.path(k); + int definitionId = dependentItem.path(Constants.TASK_DEPENDENCE_DEFINITION_ID).asInt(); + ProcessDefinition definition = processDefinitionMapper.queryByDefineId(definitionId); + if (definition != null) { + dependentItem.put(Constants.TASK_DEPENDENCE_PROJECT_NAME, definition.getProjectName()); + dependentItem.put(Constants.TASK_DEPENDENCE_DEFINITION_NAME, definition.getName()); + } + } + } + + taskNode.setDependence(dependentParameters.toString()); + } } - tmpNodeList.add(JSONUtils.parseObject(jsonNode.toString(), TaskNode.class)); } - processData.setTasks(tmpNodeList); } /** @@ -918,15 +952,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) { ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS); - //add sql and dependent param - for (int i = 0; i < jsonArray.size(); i++) { - JsonNode taskNode = jsonArray.path(i); - String taskType = taskNode.path("type").asText(); - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - if (null != addTaskParam) { - addTaskParam.addImportSpecialParam(taskNode); - } - } + + addImportTaskNodeSpecialParam(jsonArray); //recursive sub-process parameter correction map key for old process code value for new process code Map subProcessCodeMap = new HashMap<>(); @@ -943,6 +970,65 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return jsonObject.toString(); } + /** + * Replace the injecting parameters in import process definition + * + * SQL and PROCEDURE node, inject datasource by datasourceName + * DEPENDENT node, inject projectId and definitionId by projectName and definitionName + * + * @param jsonArray array node + */ + private void addImportTaskNodeSpecialParam(ArrayNode jsonArray) { + // add sql and dependent param + for (int i = 0; i < jsonArray.size(); i++) { + JsonNode taskNode = jsonArray.path(i); + String taskType = taskNode.path("type").asText(); + + if (TaskType.SQL.getDesc().equals(taskType) || TaskType.PROCEDURE.getDesc().equals(taskType)) { + ObjectNode sqlParameters = (ObjectNode)taskNode.path(Constants.TASK_PARAMS); + + List dataSources = dataSourceMapper.queryDataSourceByName( + sqlParameters.path(Constants.TASK_PARAMS_DATASOURCE_NAME).asText()); + if (!dataSources.isEmpty()) { + DataSource dataSource = dataSources.get(0); + sqlParameters.put(Constants.TASK_PARAMS_DATASOURCE, dataSource.getId()); + } + + ((ObjectNode)taskNode).set(Constants.TASK_PARAMS, sqlParameters); + } + + if (TaskType.DEPENDENT.getDesc().equals(taskType)) { + ObjectNode dependentParameters = (ObjectNode)taskNode.path(Constants.DEPENDENCE); + if (dependentParameters != null) { + ArrayNode dependTaskList = (ArrayNode)dependentParameters.path( + Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST); + for (int h = 0; h < dependTaskList.size(); h++) { + ObjectNode dependentTaskModel = (ObjectNode)dependTaskList.path(h); + ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get( + Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST); + for (int k = 0; k < dependItemList.size(); k++) { + ObjectNode dependentItem = (ObjectNode)dependItemList.path(k); + Project dependentItemProject = projectMapper.queryByName( + dependentItem.path(Constants.TASK_DEPENDENCE_PROJECT_NAME).asText()); + if (dependentItemProject != null) { + ProcessDefinition definition = processDefinitionMapper.queryByDefineName( + dependentItemProject.getCode(), + dependentItem.path(Constants.TASK_DEPENDENCE_DEFINITION_NAME).asText()); + if (definition != null) { + dependentItem.put(Constants.TASK_DEPENDENCE_PROJECT_ID, + dependentItemProject.getId()); + dependentItem.put(Constants.TASK_DEPENDENCE_DEFINITION_ID, definition.getId()); + } + } + } + } + + ((ObjectNode)taskNode).set(Constants.DEPENDENCE, dependentParameters); + } + } + } + } + /** * import process schedule * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java deleted file mode 100644 index 8572d7b482..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.api.utils.exportprocess; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.List; - -/** - * task node add datasource param strategy - */ -@Service -public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { - - private static final String PARAMS = "params"; - @Autowired - private DataSourceMapper dataSourceMapper; - - /** - * add datasource params - * @param taskNode task node json object - * @return task node json object - */ - @Override - public JsonNode addExportSpecialParam(JsonNode taskNode) { - // add sqlParameters - ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); - DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt()); - if (null != dataSource) { - sqlParameters.put("datasourceName", dataSource.getName()); - } - ((ObjectNode)taskNode).set(PARAMS, sqlParameters); - - return taskNode; - } - - /** - * import process add datasource params - * @param taskNode task node json object - * @return task node json object - */ - @Override - public JsonNode addImportSpecialParam(JsonNode taskNode) { - ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); - List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText()); - if (!dataSources.isEmpty()) { - DataSource dataSource = dataSources.get(0); - sqlParameters.put("datasource", dataSource.getId()); - } - ((ObjectNode)taskNode).set(PARAMS, sqlParameters); - return taskNode; - } - - - /** - * put datasource strategy - */ - @Override - public void afterPropertiesSet() { - TaskNodeParamFactory.register(TaskType.SQL.getDesc(), this); - TaskNodeParamFactory.register(TaskType.PROCEDURE.getDesc(), this); - } -} \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java deleted file mode 100644 index 29746f8f2a..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.api.utils.exportprocess; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -/** - * task node add dependent param strategy - */ -@Service -public class DependentParam implements ProcessAddTaskParam, InitializingBean { - - private static final String DEPENDENCE = "dependence"; - - @Autowired - ProcessDefinitionMapper processDefineMapper; - - @Autowired - ProjectMapper projectMapper; - - /** - * add dependent param - * @param taskNode task node json object - * @return task node json object - */ - @Override - public JsonNode addExportSpecialParam(JsonNode taskNode) { - // add dependent param - ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); - - if (null != dependentParameters) { - ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList"); - for (int j = 0; j < dependTaskList.size(); j++) { - JsonNode dependentTaskModel = dependTaskList.path(j); - ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); - for (int k = 0; k < dependItemList.size(); k++) { - ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); - int definitionId = dependentItem.path("definitionId").asInt(); - ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId); - if (null != definition) { - dependentItem.put("projectName", definition.getProjectName()); - dependentItem.put("definitionName", definition.getName()); - } - } - } - ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); - } - - return taskNode; - } - - /** - * import process add dependent param - * @param taskNode task node json object - * @return - */ - @Override - public JsonNode addImportSpecialParam(JsonNode taskNode) { - ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); - if(dependentParameters != null){ - ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList"); - for (int h = 0; h < dependTaskList.size(); h++) { - ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h); - ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); - for (int k = 0; k < dependItemList.size(); k++) { - ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); - Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText()); - if(dependentItemProject != null){ - ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getCode(),dependentItem.path("definitionName").asText()); - if(definition != null){ - dependentItem.put("projectId",dependentItemProject.getId()); - dependentItem.put("definitionId",definition.getId()); - } - } - } - } - ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); - } - return taskNode; - } - - /** - * put dependent strategy - */ - @Override - public void afterPropertiesSet() { - TaskNodeParamFactory.register(TaskType.DEPENDENT.getDesc(), this); - } -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java deleted file mode 100644 index 8e408556b0..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.api.utils.exportprocess; - -import com.fasterxml.jackson.databind.JsonNode; - -/** - * ProcessAddTaskParam - */ -public interface ProcessAddTaskParam { - - /** - * add export task special param: sql task dependent task - * @param taskNode task node json object - * @return task node json object - */ - JsonNode addExportSpecialParam(JsonNode taskNode); - - /** - * add task special param: sql task dependent task - * @param taskNode task node json object - * @return task node json object - */ - JsonNode addImportSpecialParam(JsonNode taskNode); -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java deleted file mode 100644 index b8f7b03dee..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.api.utils.exportprocess; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * task node param factory - */ -public class TaskNodeParamFactory { - - private static Map taskServices = new ConcurrentHashMap<>(); - - public static ProcessAddTaskParam getByTaskType(String taskType){ - return taskServices.get(taskType); - } - - static void register(String taskType, ProcessAddTaskParam addSpecialTaskParam){ - if (null != taskType) { - taskServices.put(taskType, addSpecialTaskParam); - } - } -} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8ee5b9aedf..fc0b42a072 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; @@ -59,6 +60,7 @@ import org.apache.http.entity.ContentType; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.lang.reflect.Method; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; @@ -82,6 +84,9 @@ import org.springframework.web.multipart.MultipartFile; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * process definition service test @@ -129,6 +134,7 @@ public class ProcessDefinitionServiceTest { + " \"tenantId\": 1,\n" + " \"timeout\": 0\n" + "}"; + private static final String CYCLE_SHELL_JSON = "{\n" + " \"globalParams\": [\n" + " \n" @@ -231,25 +237,37 @@ public class ProcessDefinitionServiceTest { + " \"tenantId\": 1,\n" + " \"timeout\": 0\n" + "}"; + @InjectMocks private ProcessDefinitionServiceImpl processDefinitionService; + @Mock - private ProcessDefinitionMapper processDefineMapper; + private ProcessDefinitionMapper processDefinitionMapper; + @Mock private ProcessTaskRelationMapper processTaskRelationMapper; + @Mock private ProjectMapper projectMapper; + @Mock private ProjectServiceImpl projectService; + @Mock private ScheduleMapper scheduleMapper; + @Mock private ProcessService processService; + @Mock private ProcessInstanceService processInstanceService; + @Mock private TaskInstanceMapper taskInstanceMapper; + @Mock + private DataSourceMapper dataSourceMapper; + @Test public void testQueryProcessDefinitionList() { String projectName = "project_test1"; @@ -273,7 +291,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); List resourceList = new ArrayList<>(); resourceList.add(getProcessDefinition()); - Mockito.when(processDefineMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList); + Mockito.when(processDefinitionMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList); Map checkSuccessRes = processDefinitionService.queryProcessDefinitionList(loginUser, "project_test1"); Assert.assertEquals(Status.SUCCESS, checkSuccessRes.get(Constants.STATUS)); } @@ -303,7 +321,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Page page = new Page<>(1, 10); page.setTotal(30); - Mockito.when(processDefineMapper.queryDefineListPaging( + Mockito.when(processDefinitionMapper.queryDefineListPaging( Mockito.any(IPage.class) , Mockito.eq("") , Mockito.eq(loginUser.getId()) @@ -339,7 +357,7 @@ public class ProcessDefinitionServiceTest { //project check auth success, instance not exist putMsg(result, Status.SUCCESS, projectName); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); - Mockito.when(processDefineMapper.selectById(1)).thenReturn(null); + Mockito.when(processDefinitionMapper.selectById(1)).thenReturn(null); String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" @@ -356,7 +374,7 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); //instance exit - Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition()); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(getProcessDefinition()); Map successRes = processDefinitionService.queryProcessDefinitionById(loginUser, "project_test1", 46); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); @@ -385,7 +403,7 @@ public class ProcessDefinitionServiceTest { //project check auth success, instance not exist putMsg(result, Status.SUCCESS, projectName); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); - Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null); + Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null); ProcessData processData = getProcessData(); Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData); @@ -394,7 +412,7 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); //instance exit - Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition()); + Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition()); Map successRes = processDefinitionService.queryProcessDefinitionByName(loginUser, "project_test1", "test"); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); @@ -448,7 +466,7 @@ public class ProcessDefinitionServiceTest { definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); definition.setConnects("[]"); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(definition); Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData()); Map map3 = processDefinitionService.batchCopyProcessDefinition( @@ -517,7 +535,7 @@ public class ProcessDefinitionServiceTest { //project check auth success, instance not exist putMsg(result, Status.SUCCESS, projectName); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); - Mockito.when(processDefineMapper.selectById(1)).thenReturn(null); + Mockito.when(processDefinitionMapper.selectById(1)).thenReturn(null); Map instanceNotexitRes = processDefinitionService.deleteProcessDefinitionById(loginUser, "project_test1", 1); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); @@ -525,7 +543,7 @@ public class ProcessDefinitionServiceTest { ProcessDefinition processDefinition = getProcessDefinition(); //user no auth loginUser.setUserType(UserType.GENERAL_USER); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition); Map userNoAuthRes = processDefinitionService.deleteProcessDefinitionById(loginUser, "project_test1", 46); Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS)); @@ -533,14 +551,14 @@ public class ProcessDefinitionServiceTest { //process definition online loginUser.setUserType(UserType.ADMIN_USER); processDefinition.setReleaseState(ReleaseState.ONLINE); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition); Map dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, "project_test1", 46); Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS)); //scheduler list elements > 1 processDefinition.setReleaseState(ReleaseState.OFFLINE); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition); List schedules = new ArrayList<>(); schedules.add(getSchedule()); schedules.add(getSchedule()); @@ -564,13 +582,13 @@ public class ProcessDefinitionServiceTest { schedule.setReleaseState(ReleaseState.OFFLINE); schedules.add(schedule); Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules); - Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0); + Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(0); Map deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser, "project_test1", 46); Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS)); //delete success - Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); + Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1); Map deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser, "project_test1", 46); Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS)); @@ -597,7 +615,7 @@ public class ProcessDefinitionServiceTest { // project check auth success, processs definition online putMsg(result, Status.SUCCESS, projectName); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition()); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(getProcessDefinition()); Map onlineRes = processDefinitionService.releaseProcessDefinition( loginUser, "project_test1", 46, ReleaseState.ONLINE); Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS)); @@ -605,7 +623,7 @@ public class ProcessDefinitionServiceTest { // project check auth success, processs definition online ProcessDefinition processDefinition1 = getProcessDefinition(); processDefinition1.setResourceIds("1,2"); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition1); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition1); Mockito.when(processService.getUserById(1)).thenReturn(loginUser); Map onlineWithResourceRes = processDefinitionService.releaseProcessDefinition( loginUser, "project_test1", 46, ReleaseState.ONLINE); @@ -638,13 +656,13 @@ public class ProcessDefinitionServiceTest { //project check auth success, process not exist putMsg(result, Status.SUCCESS, projectName); - Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null); + Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null); Map processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser, "project_test1", "test_pdf"); Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS)); //process exist - Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition()); + Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition()); Map processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser, "project_test1", "test_pdf"); Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST, processExistRes.get(Constants.STATUS)); @@ -684,19 +702,19 @@ public class ProcessDefinitionServiceTest { @Test public void testGetTaskNodeListByDefinitionId() { //process definition not exist - Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(null); + Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(null); Map processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS)); //process data null ProcessDefinition processDefinition = getProcessDefinition(); - Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition); Map successRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L); Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS)); //success Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData()); - Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition); Map dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L); Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS)); } @@ -707,7 +725,7 @@ public class ProcessDefinitionServiceTest { String defineCodeList = "46"; Long[] codeArray = {46L}; List codeList = Arrays.asList(codeArray); - Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(null); + Mockito.when(processDefinitionMapper.queryByCodes(codeList)).thenReturn(null); Map processNotExistRes = processDefinitionService.getTaskNodeListByDefinitionCodeList(defineCodeList); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS)); @@ -715,7 +733,7 @@ public class ProcessDefinitionServiceTest { ProcessDefinition processDefinition = getProcessDefinition(); List processDefinitionList = new ArrayList<>(); processDefinitionList.add(processDefinition); - Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(processDefinitionList); + Mockito.when(processDefinitionMapper.queryByCodes(codeList)).thenReturn(processDefinitionList); ProcessData processData = getProcessData(); Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData); @@ -748,7 +766,7 @@ public class ProcessDefinitionServiceTest { processDefinitionList.add(processDefinition); Project test = getProject("test"); Mockito.when(projectMapper.selectById(projectId)).thenReturn(test); - Mockito.when(processDefineMapper.queryAllDefinitionList(test.getCode())).thenReturn(processDefinitionList); + Mockito.when(processDefinitionMapper.queryAllDefinitionList(test.getCode())).thenReturn(processDefinitionList); Map successRes = processDefinitionService.queryProcessDefinitionAllByProjectId(projectId); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } @@ -758,7 +776,7 @@ public class ProcessDefinitionServiceTest { //process definition not exist ProcessDefinition processDefinition = getProcessDefinition(); processDefinition.setProcessDefinitionJson(SHELL_JSON); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(null); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(null); Map processDefinitionNullRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS)); @@ -782,7 +800,7 @@ public class ProcessDefinitionServiceTest { taskInstance.setHost("192.168.xx.xx"); //task instance not exist - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Map taskNullRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); @@ -817,7 +835,7 @@ public class ProcessDefinitionServiceTest { taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setHost("192.168.xx.xx"); taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n"); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); @@ -1035,6 +1053,7 @@ public class ProcessDefinitionServiceTest { + " \"tenantId\": 1,\n" + " \"timeout\": 0\n" + "}"; + Map updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, 1, "test", sqlDependentJson, "", "", ""); @@ -1075,7 +1094,7 @@ public class ProcessDefinitionServiceTest { checkResult.put(Constants.STATUS, Status.SUCCESS); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkResult); - Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.queryByDefineId(1)).thenReturn(processDefinition); HttpServletResponse response = mock(HttpServletResponse.class); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); @@ -1218,4 +1237,110 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(0, processDefinitionService.importProcessSchedule(loginUser, projectName, processMeta, processDefinitionName, processDefinitionId)); } + @Test + public void testAddExportTaskNodeSpecialParam() { + String sqlDependentJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":" + + "\"sql\",\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\",\"udfs\":\"\"" + + ",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"," + + "\"localParams\":[],\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[]},\"description\":" + + "\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + + "\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + + "\"workerGroupId\":-1,\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":" + + "{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"projectId\":" + + "2,\"definitionId\":46,\"depTasks\":\"ALL\",\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}," + + "\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"enable\":false}," + + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1," + + "\"timeout\":0}"; + + ProcessData processData = JSONUtils.parseObject(sqlDependentJson, ProcessData.class); + + DataSource dataSource = new DataSource(); + dataSource.setName("testDataSource"); + when(dataSourceMapper.selectById(1)).thenReturn(dataSource); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setProjectName("testProjectName"); + processDefinition.setName("testDefinitionName"); + when(processDefinitionMapper.queryByDefineId(46)).thenReturn(processDefinition); + + try { + Class clazz = ProcessDefinitionServiceImpl.class; + Method method = clazz.getDeclaredMethod("addExportTaskNodeSpecialParam", ProcessData.class); + method.setAccessible(true); + method.invoke(processDefinitionService, processData); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + List taskNodeList = processData.getTasks(); + ObjectNode sqlParameters = JSONUtils.parseObject(taskNodeList.get(0).getParams()); + Assert.assertEquals("testDataSource", sqlParameters.get(Constants.TASK_PARAMS_DATASOURCE_NAME).asText()); + + ObjectNode dependentParameters = JSONUtils.parseObject(taskNodeList.get(1).getDependence()); + ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST); + JsonNode dependentTaskModel = dependTaskList.path(0); + ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST); + ObjectNode dependentItem = (ObjectNode)dependItemList.path(0); + Assert.assertEquals("testProjectName", dependentItem.get(Constants.TASK_DEPENDENCE_PROJECT_NAME).asText()); + Assert.assertEquals("testDefinitionName", dependentItem.get(Constants.TASK_DEPENDENCE_DEFINITION_NAME).asText()); + } + + @Test + public void testAddImportTaskNodeSpecialParam() { + String definitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," + + "\"params\":{\"type\":\"MYSQL\",\"datasourceName\":\"testDataSource\",\"sql\":\"select * from test\"," + + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":" + + "\"TABLE\",\"localParams\":[],\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[]}," + + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":" + + "\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":" + + "\"tasks-33787\",\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," + + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":" + + "[{\"projectName\":\"testProjectName\",\"definitionName\":\"testDefinitionName\",\"depTasks\":\"ALL\"" + + ",\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"" + + "timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\"" + + ":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; + + ObjectNode jsonObject = JSONUtils.parseObject(definitionJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.get("tasks"); + + List dataSources = new ArrayList<>(); + DataSource dataSource = new DataSource(); + dataSource.setId(1); + dataSources.add(dataSource); + when(dataSourceMapper.queryDataSourceByName("testDataSource")).thenReturn(dataSources); + + Project project = new Project(); + project.setId(1); + project.setCode(1L); + when(projectMapper.queryByName("testProjectName")).thenReturn(project); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setId(1); + when(processDefinitionMapper.queryByDefineName(1L, "testDefinitionName")).thenReturn(processDefinition); + + try { + Class clazz = ProcessDefinitionServiceImpl.class; + Method method = clazz.getDeclaredMethod("addImportTaskNodeSpecialParam", ArrayNode.class); + method.setAccessible(true); + method.invoke(processDefinitionService, jsonArray); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + ObjectNode sqlParameters = (ObjectNode)jsonArray.path(0).path(Constants.TASK_PARAMS); + Assert.assertEquals(1, sqlParameters.get(Constants.TASK_PARAMS_DATASOURCE).asInt()); + + ObjectNode dependentParameters = (ObjectNode)jsonArray.path(1).path(Constants.DEPENDENCE); + ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST); + JsonNode dependentTaskModel = dependTaskList.path(0); + ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST); + ObjectNode dependentItem = (ObjectNode)dependItemList.path(0); + Assert.assertEquals(1, dependentItem.get(Constants.TASK_DEPENDENCE_PROJECT_ID).asInt()); + Assert.assertEquals(1, dependentItem.get(Constants.TASK_DEPENDENCE_DEFINITION_ID).asInt()); + } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java deleted file mode 100644 index ceee22fc2c..0000000000 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.utils.exportprocess; - -import org.apache.dolphinscheduler.api.controller.AbstractControllerTest; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; - -import org.json.JSONException; -import org.junit.Test; -import org.skyscreamer.jsonassert.JSONAssert; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * DataSourceParamTest - */ -public class DataSourceParamTest extends AbstractControllerTest { - - @Test - public void testAddExportDependentSpecialParam() throws JSONException { - - String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," - + "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," - + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" - + ",\"localParams\":[],\"connParams\":\"\"," - + "\"preStatements\":[],\"postStatements\":[]}," - + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," - + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," - + "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," - + "\"preTasks\":[\"dependent\"]}"; - - ObjectNode taskNode = JSONUtils.parseObject(sqlJson); - if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { - String taskType = taskNode.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - - JsonNode sql = addTaskParam.addExportSpecialParam(taskNode); - - JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false); - } - } - - @Test - public void testAddImportDependentSpecialParam() throws JSONException { - String sqlJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"," - + "\"type\":\"SQL\",\"params\":{\"postStatements\":[]," - + "\"connParams\":\"\",\"receiversCc\":\"\",\"udfs\":\"\"," - + "\"type\":\"MYSQL\",\"title\":\"\",\"sql\":\"show tables\",\"" - + "preStatements\":[],\"sqlType\":\"1\",\"receivers\":\"\",\"datasource\":1," - + "\"showType\":\"TABLE\",\"localParams\":[],\"datasourceName\":\"dsmetadata\"},\"timeout\"" - + ":{\"enable\":false,\"strategy\":\"\"},\"maxRetryTimes\":\"0\"," - + "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{}," - + "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}"; - - ObjectNode taskNode = JSONUtils.parseObject(sqlJson); - if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { - String taskType = taskNode.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - - JsonNode sql = addTaskParam.addImportSpecialParam(taskNode); - - JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false); - } - } -} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java deleted file mode 100644 index 531856cb28..0000000000 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.utils.exportprocess; - -import org.apache.dolphinscheduler.api.controller.AbstractControllerTest; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; - -import org.json.JSONException; -import org.junit.Test; -import org.skyscreamer.jsonassert.JSONAssert; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -/** - * DependentParamTest - */ -public class DependentParamTest extends AbstractControllerTest { - - @Test - public void testAddExportDependentSpecialParam() throws JSONException { - String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," - + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," - + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," - + "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," - + "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}"; - - ObjectNode taskNode = JSONUtils.parseObject(dependentJson); - if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { - String taskType = taskNode.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - - JsonNode dependent = addTaskParam.addExportSpecialParam(taskNode); - - JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false); - } - - String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," - + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}"; - - ObjectNode taskEmpty = JSONUtils.parseObject(dependentEmpty); - if (StringUtils.isNotEmpty(taskEmpty.path("type").asText())) { - String taskType = taskEmpty.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - - JsonNode dependent = addTaskParam.addImportSpecialParam(taskEmpty); - - JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false); - } - - } - - @Test - public void testAddImportDependentSpecialParam() throws JSONException { - String dependentJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"" - + ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false," - + "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\"" - + ",\"name\":\"dependent\"," - + "\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\"," - + "\"definitionName\":\"shell-1\",\"depTasks\":\"shell-1\",\"projectName\":\"test\"," - + "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}]," - + "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}"; - - ObjectNode taskNode = JSONUtils.parseObject(dependentJson); - if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { - String taskType = taskNode.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - - JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode); - - JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false); - } - - String dependentEmpty = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"" - + ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false," - + "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\"" - + ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}"; - - JsonNode taskNodeEmpty = JSONUtils.parseObject(dependentEmpty); - if (StringUtils.isNotEmpty(taskNodeEmpty.path("type").asText())) { - String taskType = taskNodeEmpty.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - - JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode); - - JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false); - } - - } -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 225dc2367d..53645b7e00 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -1091,4 +1091,17 @@ public final class Constants { public static final boolean DOCKER_MODE = StringUtils.isNotEmpty(System.getenv("DOCKER")); public static final boolean KUBERNETES_MODE = StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_PORT")); + /** + * task parameter keys + */ + public static final String TASK_PARAMS = "params"; + public static final String TASK_PARAMS_DATASOURCE = "datasource"; + public static final String TASK_PARAMS_DATASOURCE_NAME = "datasourceName"; + public static final String TASK_DEPENDENCE = "dependence"; + public static final String TASK_DEPENDENCE_DEPEND_TASK_LIST = "dependTaskList"; + public static final String TASK_DEPENDENCE_DEPEND_ITEM_LIST = "dependItemList"; + public static final String TASK_DEPENDENCE_PROJECT_ID = "projectId"; + public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName"; + public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId"; + public static final String TASK_DEPENDENCE_DEFINITION_NAME = "definitionName"; }