Browse Source

[Improvement][Api Module] refactor DataSourceParam and DependentParam, remove spring annotation (#5832)

* fix: refactor api utils class, remove spring annotation.

* fix: Optimization comments

Co-authored-by: wen-hemin <wenhemin@apache.com>
2.0.7-release
wen-hemin 3 years ago committed by GitHub
parent
commit
72d68041e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 130
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 84
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
  3. 114
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
  4. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
  5. 38
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
  6. 181
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  7. 84
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
  8. 110
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
  9. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

130
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -30,8 +30,6 @@ import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam;
import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@ -51,6 +49,7 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StreamUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@ -62,6 +61,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
@ -159,6 +159,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private SchedulerService schedulerService;
@Autowired
private DataSourceMapper dataSourceMapper;
/**
* create process definition
*
@ -720,23 +723,54 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
/**
* correct task param which has datasource or dependent
* Injecting parameters into export process definition
* Because the import and export environment resource IDs may be inconsistentSo inject the resource name
*
* SQL and PROCEDURE node, inject datasourceName
* DEPENDENT node, inject projectName and definitionName
*
* @param processData process data
* @return correct processDefinitionJson
*/
private void addExportTaskNodeSpecialParam(ProcessData processData) {
List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNode> tmpNodeList = new ArrayList<>();
for (TaskNode taskNode : taskNodeList) {
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskNode.getType());
JsonNode jsonNode = JSONUtils.toJsonNode(taskNode);
if (null != addTaskParam) {
addTaskParam.addExportSpecialParam(jsonNode);
for (TaskNode taskNode : processData.getTasks()) {
if (TaskType.SQL.getDesc().equals(taskNode.getType())
|| TaskType.PROCEDURE.getDesc().equals(taskNode.getType())) {
ObjectNode sqlParameters = JSONUtils.parseObject(taskNode.getParams());
DataSource dataSource = dataSourceMapper.selectById(
sqlParameters.path(Constants.TASK_PARAMS_DATASOURCE).asInt());
if (dataSource != null) {
sqlParameters.put(Constants.TASK_PARAMS_DATASOURCE_NAME, dataSource.getName());
taskNode.setParams(JSONUtils.toJsonString(sqlParameters));
}
}
if (TaskType.DEPENDENT.getDesc().equals(taskNode.getType())) {
ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.getDependence());
if (dependentParameters != null) {
ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(
Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
for (int j = 0; j < dependTaskList.size(); j++) {
JsonNode dependentTaskModel = dependTaskList.path(j);
ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(
Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
for (int k = 0; k < dependItemList.size(); k++) {
ObjectNode dependentItem = (ObjectNode)dependItemList.path(k);
int definitionId = dependentItem.path(Constants.TASK_DEPENDENCE_DEFINITION_ID).asInt();
ProcessDefinition definition = processDefinitionMapper.queryByDefineId(definitionId);
if (definition != null) {
dependentItem.put(Constants.TASK_DEPENDENCE_PROJECT_NAME, definition.getProjectName());
dependentItem.put(Constants.TASK_DEPENDENCE_DEFINITION_NAME, definition.getName());
}
}
}
taskNode.setDependence(dependentParameters.toString());
}
}
tmpNodeList.add(JSONUtils.parseObject(jsonNode.toString(), TaskNode.class));
}
processData.setTasks(tmpNodeList);
}
/**
@ -918,15 +952,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) {
ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson);
ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS);
//add sql and dependent param
for (int i = 0; i < jsonArray.size(); i++) {
JsonNode taskNode = jsonArray.path(i);
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
if (null != addTaskParam) {
addTaskParam.addImportSpecialParam(taskNode);
}
}
addImportTaskNodeSpecialParam(jsonArray);
//recursive sub-process parameter correction map key for old process code value for new process code
Map<Long, Long> subProcessCodeMap = new HashMap<>();
@ -943,6 +970,65 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return jsonObject.toString();
}
/**
* Replace the injecting parameters in import process definition
*
* SQL and PROCEDURE node, inject datasource by datasourceName
* DEPENDENT node, inject projectId and definitionId by projectName and definitionName
*
* @param jsonArray array node
*/
private void addImportTaskNodeSpecialParam(ArrayNode jsonArray) {
// add sql and dependent param
for (int i = 0; i < jsonArray.size(); i++) {
JsonNode taskNode = jsonArray.path(i);
String taskType = taskNode.path("type").asText();
if (TaskType.SQL.getDesc().equals(taskType) || TaskType.PROCEDURE.getDesc().equals(taskType)) {
ObjectNode sqlParameters = (ObjectNode)taskNode.path(Constants.TASK_PARAMS);
List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(
sqlParameters.path(Constants.TASK_PARAMS_DATASOURCE_NAME).asText());
if (!dataSources.isEmpty()) {
DataSource dataSource = dataSources.get(0);
sqlParameters.put(Constants.TASK_PARAMS_DATASOURCE, dataSource.getId());
}
((ObjectNode)taskNode).set(Constants.TASK_PARAMS, sqlParameters);
}
if (TaskType.DEPENDENT.getDesc().equals(taskType)) {
ObjectNode dependentParameters = (ObjectNode)taskNode.path(Constants.DEPENDENCE);
if (dependentParameters != null) {
ArrayNode dependTaskList = (ArrayNode)dependentParameters.path(
Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
for (int h = 0; h < dependTaskList.size(); h++) {
ObjectNode dependentTaskModel = (ObjectNode)dependTaskList.path(h);
ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(
Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
for (int k = 0; k < dependItemList.size(); k++) {
ObjectNode dependentItem = (ObjectNode)dependItemList.path(k);
Project dependentItemProject = projectMapper.queryByName(
dependentItem.path(Constants.TASK_DEPENDENCE_PROJECT_NAME).asText());
if (dependentItemProject != null) {
ProcessDefinition definition = processDefinitionMapper.queryByDefineName(
dependentItemProject.getCode(),
dependentItem.path(Constants.TASK_DEPENDENCE_DEFINITION_NAME).asText());
if (definition != null) {
dependentItem.put(Constants.TASK_DEPENDENCE_PROJECT_ID,
dependentItemProject.getId());
dependentItem.put(Constants.TASK_DEPENDENCE_DEFINITION_ID, definition.getId());
}
}
}
}
((ObjectNode)taskNode).set(Constants.DEPENDENCE, dependentParameters);
}
}
}
}
/**
* import process schedule
*

84
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* task node add datasource param strategy
*/
@Service
public class DataSourceParam implements ProcessAddTaskParam, InitializingBean {
private static final String PARAMS = "params";
@Autowired
private DataSourceMapper dataSourceMapper;
/**
* add datasource params
* @param taskNode task node json object
* @return task node json object
*/
@Override
public JsonNode addExportSpecialParam(JsonNode taskNode) {
// add sqlParameters
ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS);
DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt());
if (null != dataSource) {
sqlParameters.put("datasourceName", dataSource.getName());
}
((ObjectNode)taskNode).set(PARAMS, sqlParameters);
return taskNode;
}
/**
* import process add datasource params
* @param taskNode task node json object
* @return task node json object
*/
@Override
public JsonNode addImportSpecialParam(JsonNode taskNode) {
ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS);
List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText());
if (!dataSources.isEmpty()) {
DataSource dataSource = dataSources.get(0);
sqlParameters.put("datasource", dataSource.getId());
}
((ObjectNode)taskNode).set(PARAMS, sqlParameters);
return taskNode;
}
/**
* put datasource strategy
*/
@Override
public void afterPropertiesSet() {
TaskNodeParamFactory.register(TaskType.SQL.getDesc(), this);
TaskNodeParamFactory.register(TaskType.PROCEDURE.getDesc(), this);
}
}

114
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java

@ -1,114 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* task node add dependent param strategy
*/
@Service
public class DependentParam implements ProcessAddTaskParam, InitializingBean {
private static final String DEPENDENCE = "dependence";
@Autowired
ProcessDefinitionMapper processDefineMapper;
@Autowired
ProjectMapper projectMapper;
/**
* add dependent param
* @param taskNode task node json object
* @return task node json object
*/
@Override
public JsonNode addExportSpecialParam(JsonNode taskNode) {
// add dependent param
ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText());
if (null != dependentParameters) {
ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList");
for (int j = 0; j < dependTaskList.size(); j++) {
JsonNode dependentTaskModel = dependTaskList.path(j);
ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
ObjectNode dependentItem = (ObjectNode) dependItemList.path(k);
int definitionId = dependentItem.path("definitionId").asInt();
ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
if (null != definition) {
dependentItem.put("projectName", definition.getProjectName());
dependentItem.put("definitionName", definition.getName());
}
}
}
((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters);
}
return taskNode;
}
/**
* import process add dependent param
* @param taskNode task node json object
* @return
*/
@Override
public JsonNode addImportSpecialParam(JsonNode taskNode) {
ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText());
if(dependentParameters != null){
ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList");
for (int h = 0; h < dependTaskList.size(); h++) {
ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h);
ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
ObjectNode dependentItem = (ObjectNode) dependItemList.path(k);
Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText());
if(dependentItemProject != null){
ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getCode(),dependentItem.path("definitionName").asText());
if(definition != null){
dependentItem.put("projectId",dependentItemProject.getId());
dependentItem.put("definitionId",definition.getId());
}
}
}
}
((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters);
}
return taskNode;
}
/**
* put dependent strategy
*/
@Override
public void afterPropertiesSet() {
TaskNodeParamFactory.register(TaskType.DEPENDENT.getDesc(), this);
}
}

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import com.fasterxml.jackson.databind.JsonNode;
/**
* ProcessAddTaskParam
*/
public interface ProcessAddTaskParam {
/**
* add export task special param: sql task dependent task
* @param taskNode task node json object
* @return task node json object
*/
JsonNode addExportSpecialParam(JsonNode taskNode);
/**
* add task special param: sql task dependent task
* @param taskNode task node json object
* @return task node json object
*/
JsonNode addImportSpecialParam(JsonNode taskNode);
}

38
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* task node param factory
*/
public class TaskNodeParamFactory {
private static Map<String, ProcessAddTaskParam> taskServices = new ConcurrentHashMap<>();
public static ProcessAddTaskParam getByTaskType(String taskType){
return taskServices.get(taskType);
}
static void register(String taskType, ProcessAddTaskParam addSpecialTaskParam){
if (null != taskType) {
taskServices.put(taskType, addSpecialTaskParam);
}
}
}

181
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@ -59,6 +60,7 @@ import org.apache.http.entity.ContentType;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
@ -82,6 +84,9 @@ import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* process definition service test
@ -129,6 +134,7 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
private static final String CYCLE_SHELL_JSON = "{\n"
+ " \"globalParams\": [\n"
+ " \n"
@ -231,25 +237,37 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
@InjectMocks
private ProcessDefinitionServiceImpl processDefinitionService;
@Mock
private ProcessDefinitionMapper processDefineMapper;
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectServiceImpl projectService;
@Mock
private ScheduleMapper scheduleMapper;
@Mock
private ProcessService processService;
@Mock
private ProcessInstanceService processInstanceService;
@Mock
private TaskInstanceMapper taskInstanceMapper;
@Mock
private DataSourceMapper dataSourceMapper;
@Test
public void testQueryProcessDefinitionList() {
String projectName = "project_test1";
@ -273,7 +291,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
List<ProcessDefinition> resourceList = new ArrayList<>();
resourceList.add(getProcessDefinition());
Mockito.when(processDefineMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
Mockito.when(processDefinitionMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
Map<String, Object> checkSuccessRes = processDefinitionService.queryProcessDefinitionList(loginUser, "project_test1");
Assert.assertEquals(Status.SUCCESS, checkSuccessRes.get(Constants.STATUS));
}
@ -303,7 +321,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Page<ProcessDefinition> page = new Page<>(1, 10);
page.setTotal(30);
Mockito.when(processDefineMapper.queryDefineListPaging(
Mockito.when(processDefinitionMapper.queryDefineListPaging(
Mockito.any(IPage.class)
, Mockito.eq("")
, Mockito.eq(loginUser.getId())
@ -339,7 +357,7 @@ public class ProcessDefinitionServiceTest {
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
Mockito.when(processDefinitionMapper.selectById(1)).thenReturn(null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
@ -356,7 +374,7 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit
Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition());
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(getProcessDefinition());
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@ -385,7 +403,7 @@ public class ProcessDefinitionServiceTest {
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
@ -394,7 +412,7 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition());
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition());
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@ -448,7 +466,7 @@ public class ProcessDefinitionServiceTest {
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setConnects("[]");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(definition);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData());
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
@ -517,7 +535,7 @@ public class ProcessDefinitionServiceTest {
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
Mockito.when(processDefinitionMapper.selectById(1)).thenReturn(null);
Map<String, Object> instanceNotexitRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 1);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
@ -525,7 +543,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
//user no auth
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> userNoAuthRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS));
@ -533,14 +551,14 @@ public class ProcessDefinitionServiceTest {
//process definition online
loginUser.setUserType(UserType.ADMIN_USER);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS));
//scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
List<Schedule> schedules = new ArrayList<>();
schedules.add(getSchedule());
schedules.add(getSchedule());
@ -564,13 +582,13 @@ public class ProcessDefinitionServiceTest {
schedule.setReleaseState(ReleaseState.OFFLINE);
schedules.add(schedule);
Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(0);
Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
//delete success
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS));
@ -597,7 +615,7 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition());
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(getProcessDefinition());
Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
loginUser, "project_test1", 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
@ -605,7 +623,7 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online
ProcessDefinition processDefinition1 = getProcessDefinition();
processDefinition1.setResourceIds("1,2");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition1);
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition1);
Mockito.when(processService.getUserById(1)).thenReturn(loginUser);
Map<String, Object> onlineWithResourceRes = processDefinitionService.releaseProcessDefinition(
loginUser, "project_test1", 46, ReleaseState.ONLINE);
@ -638,13 +656,13 @@ public class ProcessDefinitionServiceTest {
//project check auth success, process not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
//process exist
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition());
Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition());
Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST, processExistRes.get(Constants.STATUS));
@ -684,19 +702,19 @@ public class ProcessDefinitionServiceTest {
@Test
public void testGetTaskNodeListByDefinitionId() {
//process definition not exist
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(null);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//process data null
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
//success
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData());
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
}
@ -707,7 +725,7 @@ public class ProcessDefinitionServiceTest {
String defineCodeList = "46";
Long[] codeArray = {46L};
List<Long> codeList = Arrays.asList(codeArray);
Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(null);
Mockito.when(processDefinitionMapper.queryByCodes(codeList)).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.getTaskNodeListByDefinitionCodeList(defineCodeList);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
@ -715,7 +733,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(processDefinitionList);
Mockito.when(processDefinitionMapper.queryByCodes(codeList)).thenReturn(processDefinitionList);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
@ -748,7 +766,7 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(processDefinition);
Project test = getProject("test");
Mockito.when(projectMapper.selectById(projectId)).thenReturn(test);
Mockito.when(processDefineMapper.queryAllDefinitionList(test.getCode())).thenReturn(processDefinitionList);
Mockito.when(processDefinitionMapper.queryAllDefinitionList(test.getCode())).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionAllByProjectId(projectId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@ -758,7 +776,7 @@ public class ProcessDefinitionServiceTest {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(SHELL_JSON);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
@ -782,7 +800,7 @@ public class ProcessDefinitionServiceTest {
taskInstance.setHost("192.168.xx.xx");
//task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
@ -817,7 +835,7 @@ public class ProcessDefinitionServiceTest {
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setHost("192.168.xx.xx");
taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@ -1035,6 +1053,7 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, 1, "test",
sqlDependentJson, "", "", "");
@ -1075,7 +1094,7 @@ public class ProcessDefinitionServiceTest {
checkResult.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkResult);
Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByDefineId(1)).thenReturn(processDefinition);
HttpServletResponse response = mock(HttpServletResponse.class);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
@ -1218,4 +1237,110 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(0, processDefinitionService.importProcessSchedule(loginUser, projectName, processMeta, processDefinitionName, processDefinitionId));
}
@Test
public void testAddExportTaskNodeSpecialParam() {
String sqlDependentJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":"
+ "\"sql\",\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\",\"udfs\":\"\""
+ ",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\","
+ "\"localParams\":[],\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[]},\"description\":"
+ "\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+ "\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
+ "\"workerGroupId\":-1,\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+ "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":"
+ "{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"projectId\":"
+ "2,\"definitionId\":46,\"depTasks\":\"ALL\",\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},"
+ "\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,"
+ "\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(sqlDependentJson, ProcessData.class);
DataSource dataSource = new DataSource();
dataSource.setName("testDataSource");
when(dataSourceMapper.selectById(1)).thenReturn(dataSource);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setProjectName("testProjectName");
processDefinition.setName("testDefinitionName");
when(processDefinitionMapper.queryByDefineId(46)).thenReturn(processDefinition);
try {
Class clazz = ProcessDefinitionServiceImpl.class;
Method method = clazz.getDeclaredMethod("addExportTaskNodeSpecialParam", ProcessData.class);
method.setAccessible(true);
method.invoke(processDefinitionService, processData);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
List<TaskNode> taskNodeList = processData.getTasks();
ObjectNode sqlParameters = JSONUtils.parseObject(taskNodeList.get(0).getParams());
Assert.assertEquals("testDataSource", sqlParameters.get(Constants.TASK_PARAMS_DATASOURCE_NAME).asText());
ObjectNode dependentParameters = JSONUtils.parseObject(taskNodeList.get(1).getDependence());
ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
JsonNode dependentTaskModel = dependTaskList.path(0);
ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
ObjectNode dependentItem = (ObjectNode)dependItemList.path(0);
Assert.assertEquals("testProjectName", dependentItem.get(Constants.TASK_DEPENDENCE_PROJECT_NAME).asText());
Assert.assertEquals("testDefinitionName", dependentItem.get(Constants.TASK_DEPENDENCE_DEFINITION_NAME).asText());
}
@Test
public void testAddImportTaskNodeSpecialParam() {
String definitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\","
+ "\"params\":{\"type\":\"MYSQL\",\"datasourceName\":\"testDataSource\",\"sql\":\"select * from test\","
+ "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":"
+ "\"TABLE\",\"localParams\":[],\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[]},"
+ "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":"
+ "\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":"
+ "\"tasks-33787\",\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
+ "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":"
+ "[{\"projectName\":\"testProjectName\",\"definitionName\":\"testDefinitionName\",\"depTasks\":\"ALL\""
+ ",\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\""
+ "timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\""
+ ":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
ObjectNode jsonObject = JSONUtils.parseObject(definitionJson);
ArrayNode jsonArray = (ArrayNode) jsonObject.get("tasks");
List<DataSource> dataSources = new ArrayList<>();
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSources.add(dataSource);
when(dataSourceMapper.queryDataSourceByName("testDataSource")).thenReturn(dataSources);
Project project = new Project();
project.setId(1);
project.setCode(1L);
when(projectMapper.queryByName("testProjectName")).thenReturn(project);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(1);
when(processDefinitionMapper.queryByDefineName(1L, "testDefinitionName")).thenReturn(processDefinition);
try {
Class clazz = ProcessDefinitionServiceImpl.class;
Method method = clazz.getDeclaredMethod("addImportTaskNodeSpecialParam", ArrayNode.class);
method.setAccessible(true);
method.invoke(processDefinitionService, jsonArray);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
ObjectNode sqlParameters = (ObjectNode)jsonArray.path(0).path(Constants.TASK_PARAMS);
Assert.assertEquals(1, sqlParameters.get(Constants.TASK_PARAMS_DATASOURCE).asInt());
ObjectNode dependentParameters = (ObjectNode)jsonArray.path(1).path(Constants.DEPENDENCE);
ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
JsonNode dependentTaskModel = dependTaskList.path(0);
ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
ObjectNode dependentItem = (ObjectNode)dependItemList.path(0);
Assert.assertEquals(1, dependentItem.get(Constants.TASK_DEPENDENCE_PROJECT_ID).asInt());
Assert.assertEquals(1, dependentItem.get(Constants.TASK_DEPENDENCE_DEFINITION_ID).asInt());
}
}

84
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.json.JSONException;
import org.junit.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* DataSourceParamTest
*/
public class DataSourceParamTest extends AbstractControllerTest {
@Test
public void testAddExportDependentSpecialParam() throws JSONException {
String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\","
+ "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\","
+ "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\""
+ ",\"localParams\":[],\"connParams\":\"\","
+ "\"preStatements\":[],\"postStatements\":[]},"
+ "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\","
+ "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,"
+ "\"preTasks\":[\"dependent\"]}";
ObjectNode taskNode = JSONUtils.parseObject(sqlJson);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode sql = addTaskParam.addExportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
}
}
@Test
public void testAddImportDependentSpecialParam() throws JSONException {
String sqlJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\","
+ "\"type\":\"SQL\",\"params\":{\"postStatements\":[],"
+ "\"connParams\":\"\",\"receiversCc\":\"\",\"udfs\":\"\","
+ "\"type\":\"MYSQL\",\"title\":\"\",\"sql\":\"show tables\",\""
+ "preStatements\":[],\"sqlType\":\"1\",\"receivers\":\"\",\"datasource\":1,"
+ "\"showType\":\"TABLE\",\"localParams\":[],\"datasourceName\":\"dsmetadata\"},\"timeout\""
+ ":{\"enable\":false,\"strategy\":\"\"},\"maxRetryTimes\":\"0\","
+ "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{},"
+ "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}";
ObjectNode taskNode = JSONUtils.parseObject(sqlJson);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode sql = addTaskParam.addImportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
}
}
}

110
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java

@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.json.JSONException;
import org.junit.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* DependentParamTest
*/
public class DependentParamTest extends AbstractControllerTest {
@Test
public void testAddExportDependentSpecialParam() throws JSONException {
String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+ "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
+ "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\","
+ "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\","
+ "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}";
ObjectNode taskNode = JSONUtils.parseObject(dependentJson);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode dependent = addTaskParam.addExportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
}
String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+ "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}";
ObjectNode taskEmpty = JSONUtils.parseObject(dependentEmpty);
if (StringUtils.isNotEmpty(taskEmpty.path("type").asText())) {
String taskType = taskEmpty.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode dependent = addTaskParam.addImportSpecialParam(taskEmpty);
JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false);
}
}
@Test
public void testAddImportDependentSpecialParam() throws JSONException {
String dependentJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\""
+ ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false,"
+ "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"name\":\"dependent\","
+ "\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\","
+ "\"definitionName\":\"shell-1\",\"depTasks\":\"shell-1\",\"projectName\":\"test\","
+ "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}],"
+ "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
ObjectNode taskNode = JSONUtils.parseObject(dependentJson);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
}
String dependentEmpty = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\""
+ ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false,"
+ "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
JsonNode taskNodeEmpty = JSONUtils.parseObject(dependentEmpty);
if (StringUtils.isNotEmpty(taskNodeEmpty.path("type").asText())) {
String taskType = taskNodeEmpty.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false);
}
}
}

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -1091,4 +1091,17 @@ public final class Constants {
public static final boolean DOCKER_MODE = StringUtils.isNotEmpty(System.getenv("DOCKER"));
public static final boolean KUBERNETES_MODE = StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
/**
* task parameter keys
*/
public static final String TASK_PARAMS = "params";
public static final String TASK_PARAMS_DATASOURCE = "datasource";
public static final String TASK_PARAMS_DATASOURCE_NAME = "datasourceName";
public static final String TASK_DEPENDENCE = "dependence";
public static final String TASK_DEPENDENCE_DEPEND_TASK_LIST = "dependTaskList";
public static final String TASK_DEPENDENCE_DEPEND_ITEM_LIST = "dependItemList";
public static final String TASK_DEPENDENCE_PROJECT_ID = "projectId";
public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName";
public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId";
public static final String TASK_DEPENDENCE_DEFINITION_NAME = "definitionName";
}

Loading…
Cancel
Save