From 1c5c8b75b94237b0b304a0fa72d417cac18661f1 Mon Sep 17 00:00:00 2001 From: Yelli <51317527+Yeleights@users.noreply.github.com> Date: Sat, 11 Jan 2020 15:05:30 +0800 Subject: [PATCH] refactor import process (#1804) * refactor import process * add refactor import process UT * add import process UT * add null check UT for import process metadata * add UT for import process * modify dependentparam UT * modify testAddImportDependentSpecialParam * modify DependentParamTest * modify processDefinitionService UT --- .../dolphinscheduler/api/dto/ProcessMeta.java | 8 +- .../api/service/ProcessDefinitionService.java | 370 ++++++++---------- .../dolphinscheduler/api/utils/FileUtils.java | 28 +- .../utils/exportprocess/DataSourceParam.java | 23 +- .../utils/exportprocess/DependentParam.java | 39 +- ...askParam.java => ProcessAddTaskParam.java} | 13 +- .../exportprocess/TaskNodeParamFactory.java | 6 +- .../service/ProcessDefinitionServiceTest.java | 121 +++++- .../api/utils/FileUtilsTest.java | 31 +- .../exportprocess/DataSourceParamTest.java | 47 ++- .../exportprocess/DependentParamTest.java | 77 +++- 11 files changed, 500 insertions(+), 263 deletions(-) rename dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/{exportProcessAddTaskParam.java => ProcessAddTaskParam.java} (75%) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java index 7c4a5cf23e..f14d8df097 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java @@ -59,7 +59,7 @@ public class ProcessMeta { /** * warning group id */ - private int scheduleWarningGroupId; + private Integer scheduleWarningGroupId; /** * warning group name @@ -99,7 +99,7 @@ public class ProcessMeta { /** * worker group id */ - private int scheduleWorkerGroupId; + private Integer scheduleWorkerGroupId; /** * worker group name @@ -165,7 +165,7 @@ public class ProcessMeta { this.scheduleWarningType = scheduleWarningType; } - public int getScheduleWarningGroupId() { + public Integer getScheduleWarningGroupId() { return scheduleWarningGroupId; } @@ -229,7 +229,7 @@ public class ProcessMeta { this.scheduleProcessInstancePriority = scheduleProcessInstancePriority; } - public int getScheduleWorkerGroupId() { + public Integer getScheduleWorkerGroupId() { return scheduleWorkerGroupId; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 80967acac2..2d462b3cac 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -27,9 +27,10 @@ import org.apache.dolphinscheduler.api.dto.treeview.Instance; import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.CheckUtils; +import org.apache.dolphinscheduler.api.utils.FileUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam; import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory; -import org.apache.dolphinscheduler.api.utils.exportprocess.exportProcessAddTaskParam; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.graph.DAG; @@ -56,9 +57,7 @@ import org.springframework.web.multipart.MultipartFile; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; import java.io.BufferedOutputStream; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -97,9 +96,6 @@ public class ProcessDefinitionService extends BaseDAGService { @Autowired private ProcessDao processDao; - @Autowired - private DataSourceMapper dataSourceMapper; - @Autowired private WorkerGroupMapper workerGroupMapper; @@ -540,7 +536,7 @@ public class ProcessDefinitionService extends BaseDAGService { */ public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { //correct task param which has data source or dependent param - String correctProcessDefinitionJson = addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); + String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); //export process metadata @@ -586,7 +582,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @param processDefinitionJson processDefinitionJson * @return correct processDefinitionJson */ - public String addTaskNodeSpecialParam(String processDefinitionJson) { + public String addExportTaskNodeSpecialParam(String processDefinitionJson) { JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); @@ -595,9 +591,9 @@ public class ProcessDefinitionService extends BaseDAGService { if (StringUtils.isNotEmpty(taskNode.getString("type"))) { String taskType = taskNode.getString("type"); - exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { - addTaskParam.addSpecialParam(taskNode); + addTaskParam.addExportSpecialParam(taskNode); } } } @@ -605,24 +601,6 @@ public class ProcessDefinitionService extends BaseDAGService { return jsonObject.toString(); } - /** - * check task if has dependent - * @param taskType task type - * @return if task has dependent return true else false - */ - private boolean checkTaskHasDependent(String taskType) { - return taskType.equals(TaskType.DEPENDENT.name()); - } - - /** - * check task if has data source info - * @param taskType task type - * @return if task has data source return true else false - */ - private boolean checkTaskHasDataSource(String taskType) { - return taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name()); - } - /** * check task if has sub process * @param taskType task type @@ -642,206 +620,168 @@ public class ProcessDefinitionService extends BaseDAGService { @Transactional(rollbackFor = Exception.class) public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { Map result = new HashMap<>(5); + String processMetaJson = FileUtils.file2String(file); + ProcessMeta processMeta = JSONUtils.parseObject(processMetaJson, ProcessMeta.class); - JSONObject json; + //check file content + if (null == processMeta) { + putMsg(result, Status.DATA_IS_NULL, "fileContent"); + return result; + } + if (StringUtils.isEmpty(processMeta.getProjectName())) { + putMsg(result, Status.DATA_IS_NULL, "projectName"); + return result; + } + if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { + putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); + return result; + } + if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { + putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); + return result; + } - //read workflow json - try(InputStreamReader inputStreamReader = new InputStreamReader( file.getInputStream(), StandardCharsets.UTF_8)) { - BufferedReader streamReader = new BufferedReader(inputStreamReader); - StringBuilder respomseStrBuilder = new StringBuilder(); - String inputStr; + //deal with process name + String processDefinitionName = processMeta.getProcessDefinitionName(); + //use currentProjectName to query + Project targetProject = projectMapper.queryByName(currentProjectName); + if(null != targetProject){ + processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), + processDefinitionName, 1); + } - while ((inputStr = streamReader.readLine())!= null){ - respomseStrBuilder.append( inputStr ); - } + //add special task param + String importProcessParam = addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject); - json = JSONObject.parseObject( respomseStrBuilder.toString() ); - - if(null != json){ - String originProjectName = null; - String processDefinitionName = null; - String processDefinitionJson = null; - String processDefinitionDesc = null; - String processDefinitionLocations = null; - String processDefinitionConnects = null; - - String scheduleWarningType = null; - String scheduleWarningGroupId = null; - String scheduleStartTime = null; - String scheduleEndTime = null; - String scheduleCrontab = null; - String scheduleFailureStrategy = null; - String scheduleReleaseState = null; - String scheduleProcessInstancePriority = null; - String scheduleWorkerGroupId = null; - String scheduleWorkerGroupName = null; - - if (Objects.nonNull(json.get("projectName"))) { - originProjectName = json.get("projectName").toString(); - } else { - putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); - return result; - } - if (Objects.nonNull(json.get("processDefinitionName"))) { - processDefinitionName = json.get("processDefinitionName").toString(); - } else { - putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); - return result; - } - if (Objects.nonNull(json.get("processDefinitionJson"))) { - processDefinitionJson = json.get("processDefinitionJson").toString(); - } else { - putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); - return result; - } - if (Objects.nonNull(json.get("processDefinitionDescription"))) { - processDefinitionDesc = json.get("processDefinitionDescription").toString(); - } - if (Objects.nonNull(json.get("processDefinitionLocations"))) { - processDefinitionLocations = json.get("processDefinitionLocations").toString(); - } - if (Objects.nonNull(json.get("processDefinitionConnects"))) { - processDefinitionConnects = json.get("processDefinitionConnects").toString(); - } - - //check user access for org project - Project originProject = projectMapper.queryByName(originProjectName); - Map checkResult = projectService.checkProjectAndAuth(loginUser, originProject, originProjectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); + Map createProcessResult; + try { + createProcessResult = createProcessDefinition(loginUser + ,currentProjectName, + processDefinitionName, + importProcessParam, + processMeta.getProcessDefinitionDescription(), + processMeta.getProcessDefinitionLocations(), + processMeta.getProcessDefinitionConnects()); + } catch (JsonProcessingException e) { + logger.error("import process meta json data: {}", e.getMessage(), e); + putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); + return result; + } - if (resultStatus == Status.SUCCESS) { - //use currentProjectName to query - Project targetProject = projectMapper.queryByName(currentProjectName); - if(null != targetProject){ - processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), processDefinitionName, 1); - } + putMsg(result, Status.SUCCESS); + //create process definition + Integer processDefinitionId = null; + if (null != createProcessResult && Objects.nonNull(createProcessResult.get("processDefinitionId"))) { + processDefinitionId = Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); + } + //scheduler param + if (null != processMeta.getScheduleCrontab() && null != processDefinitionId) { + int scheduleInsert = importProcessSchedule(loginUser, + currentProjectName, + processMeta, + processDefinitionName, + processDefinitionId); + + if (0 == scheduleInsert) { + putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); + return result; + } + } - JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); - JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); - - for (int j = 0; j < jsonArray.size(); j++) { - JSONObject taskNode = jsonArray.getJSONObject(j); - String taskType = taskNode.getString("type"); - if(checkTaskHasDataSource(taskType)) { - JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); - List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); - if (!dataSources.isEmpty()) { - DataSource dataSource = dataSources.get(0); - sqlParameters.put("datasource", dataSource.getId()); - } - taskNode.put("params", sqlParameters); - }else if(checkTaskHasDependent(taskType)){ - JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); - if(dependentParameters != null){ - JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); - for (int h = 0; h < dependTaskList.size(); h++) { - JSONObject dependentTaskModel = dependTaskList.getJSONObject(h); - JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); - for (int k = 0; k < dependItemList.size(); k++) { - JSONObject dependentItem = dependItemList.getJSONObject(k); - Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName")); - if(dependentItemProject != null){ - ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName")); - if(definition != null){ - dependentItem.put("projectId",dependentItemProject.getId()); - dependentItem.put("definitionId",definition.getId()); - } - } - } - } - taskNode.put("dependence", dependentParameters); - } - } - } + return result; + } - //recursive sub-process parameter correction map key for old process id value for new process id - Map subProcessIdMap = new HashMap<>(20); + /** + * import process add special task param + * @param loginUser login user + * @param processDefinitionJson process definition json + * @param targetProject target project + * @return import process param + */ + private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) { + JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); + JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); + //add sql and dependent param + for (int i = 0; i < jsonArray.size(); i++) { + JSONObject taskNode = jsonArray.getJSONObject(i); + String taskType = taskNode.getString("type"); + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + if (null != addTaskParam) { + addTaskParam.addImportSpecialParam(taskNode); + } + } - List subProcessList = jsonArray.stream() - .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type"))) - .collect(Collectors.toList()); + //recursive sub-process parameter correction map key for old process id value for new process id + Map subProcessIdMap = new HashMap<>(20); - if (!subProcessList.isEmpty()) { - importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); - } + List subProcessList = jsonArray.stream() + .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type"))) + .collect(Collectors.toList()); - jsonObject.put("tasks", jsonArray); + if (CollectionUtils.isNotEmpty(subProcessList)) { + importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); + } - Map createProcessDefinitionResult = createProcessDefinition(loginUser,currentProjectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects); - Integer processDefinitionId = null; - if (Objects.nonNull(createProcessDefinitionResult.get("processDefinitionId"))) { - processDefinitionId = Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString()); - } - if (Objects.nonNull(json.get("scheduleCrontab")) && processDefinitionId != null) { - Date now = new Date(); - Schedule scheduleObj = new Schedule(); - scheduleObj.setProjectName(currentProjectName); - scheduleObj.setProcessDefinitionId(processDefinitionId); - scheduleObj.setProcessDefinitionName(processDefinitionName); - scheduleObj.setCreateTime(now); - scheduleObj.setUpdateTime(now); - scheduleObj.setUserId(loginUser.getId()); - scheduleObj.setUserName(loginUser.getUserName()); - - - scheduleCrontab = json.get("scheduleCrontab").toString(); - scheduleObj.setCrontab(scheduleCrontab); - if (Objects.nonNull(json.get("scheduleStartTime"))) { - scheduleStartTime = json.get("scheduleStartTime").toString(); - scheduleObj.setStartTime(DateUtils.stringToDate(scheduleStartTime)); - } - if (Objects.nonNull(json.get("scheduleEndTime"))) { - scheduleEndTime = json.get("scheduleEndTime").toString(); - scheduleObj.setEndTime(DateUtils.stringToDate(scheduleEndTime)); - } - if (Objects.nonNull(json.get("scheduleWarningType"))) { - scheduleWarningType = json.get("scheduleWarningType").toString(); - scheduleObj.setWarningType(WarningType.valueOf(scheduleWarningType)); - } - if (Objects.nonNull(json.get("scheduleWarningGroupId"))) { - scheduleWarningGroupId = json.get("scheduleWarningGroupId").toString(); - scheduleObj.setWarningGroupId(Integer.parseInt(scheduleWarningGroupId)); - } - if (Objects.nonNull(json.get("scheduleFailureStrategy"))) { - scheduleFailureStrategy = json.get("scheduleFailureStrategy").toString(); - scheduleObj.setFailureStrategy(FailureStrategy.valueOf(scheduleFailureStrategy)); - } - if (Objects.nonNull(json.get("scheduleReleaseState"))) { - scheduleReleaseState = json.get("scheduleReleaseState").toString(); - scheduleObj.setReleaseState(ReleaseState.valueOf(scheduleReleaseState)); - } - if (Objects.nonNull(json.get("scheduleProcessInstancePriority"))) { - scheduleProcessInstancePriority = json.get("scheduleProcessInstancePriority").toString(); - scheduleObj.setProcessInstancePriority(Priority.valueOf(scheduleProcessInstancePriority)); - } - if (Objects.nonNull(json.get("scheduleWorkerGroupId"))) { - scheduleWorkerGroupId = json.get("scheduleWorkerGroupId").toString(); - if(scheduleWorkerGroupId != null){ - scheduleObj.setWorkerGroupId(Integer.parseInt(scheduleWorkerGroupId)); - }else{ - if (Objects.nonNull(json.get("scheduleWorkerGroupName"))) { - scheduleWorkerGroupName = json.get("scheduleWorkerGroupName").toString(); - List workerGroups = workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName); - if(!workerGroups.isEmpty()){ - scheduleObj.setWorkerGroupId(workerGroups.get(0).getId()); - } - } - } - } - scheduleMapper.insert(scheduleObj); - } + jsonObject.put("tasks", jsonArray); + return jsonObject.toString(); + } - putMsg(result, Status.SUCCESS); - return result; + /** + * import process schedule + * @param loginUser login user + * @param currentProjectName current project name + * @param processMeta process meta data + * @param processDefinitionName process definition name + * @param processDefinitionId process definition id + * @return insert schedule flag + */ + public int importProcessSchedule(User loginUser, String currentProjectName, ProcessMeta processMeta, + String processDefinitionName, Integer processDefinitionId) { + Date now = new Date(); + Schedule scheduleObj = new Schedule(); + scheduleObj.setProjectName(currentProjectName); + scheduleObj.setProcessDefinitionId(processDefinitionId); + scheduleObj.setProcessDefinitionName(processDefinitionName); + scheduleObj.setCreateTime(now); + scheduleObj.setUpdateTime(now); + scheduleObj.setUserId(loginUser.getId()); + scheduleObj.setUserName(loginUser.getUserName()); + + scheduleObj.setCrontab(processMeta.getScheduleCrontab()); + + if (null != processMeta.getScheduleStartTime()) { + scheduleObj.setStartTime(DateUtils.stringToDate(processMeta.getScheduleStartTime())); + } + if (null != processMeta.getScheduleEndTime()) { + scheduleObj.setEndTime(DateUtils.stringToDate(processMeta.getScheduleEndTime())); + } + if (null != processMeta.getScheduleWarningType()) { + scheduleObj.setWarningType(WarningType.valueOf(processMeta.getScheduleWarningType())); + } + if (null != processMeta.getScheduleWarningGroupId()) { + scheduleObj.setWarningGroupId(processMeta.getScheduleWarningGroupId()); + } + if (null != processMeta.getScheduleFailureStrategy()) { + scheduleObj.setFailureStrategy(FailureStrategy.valueOf(processMeta.getScheduleFailureStrategy())); + } + if (null != processMeta.getScheduleReleaseState()) { + scheduleObj.setReleaseState(ReleaseState.valueOf(processMeta.getScheduleReleaseState())); + } + if (null != processMeta.getScheduleProcessInstancePriority()) { + scheduleObj.setProcessInstancePriority(Priority.valueOf(processMeta.getScheduleProcessInstancePriority())); + } + if (null != processMeta.getScheduleWorkerGroupId()) { + scheduleObj.setWorkerGroupId(processMeta.getScheduleWorkerGroupId()); + } else { + if (null != processMeta.getScheduleWorkerGroupName()) { + List workerGroups = workerGroupMapper.queryWorkerGroupByName(processMeta.getScheduleWorkerGroupName()); + if(CollectionUtils.isNotEmpty(workerGroups)){ + scheduleObj.setWorkerGroupId(workerGroups.get(0).getId()); } - }else{ - putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); - return result; } - } catch (IOException e) { - throw new RuntimeException(e.getMessage(), e); } - return result; + + return scheduleMapper.insert(scheduleObj); } /** @@ -873,7 +813,7 @@ public class ProcessDefinitionService extends BaseDAGService { .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type"))) .collect(Collectors.toList()); - if (!subProcessList.isEmpty()) { + if (CollectionUtils.isNotEmpty(subProcessList)) { importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); //sub process processId correct if (!subProcessIdMap.isEmpty()) { @@ -1307,7 +1247,7 @@ public class ProcessDefinitionService extends BaseDAGService { private String recursionProcessDefinitionName(Integer projectId,String processDefinitionName,int num){ ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName); if (processDefinition != null) { - if(num>1){ + if(num > 1){ String str = processDefinitionName.substring(0,processDefinitionName.length() - 3); processDefinitionName = str + "("+num+")"; }else{ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java index 9c0e339b07..f88d26164b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java @@ -22,9 +22,12 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.UrlResource; import org.springframework.web.multipart.MultipartFile; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -38,7 +41,7 @@ public class FileUtils { /** * copy source file to target file * - * @param file file + * @param file file * @param destFilename destination file name */ @@ -77,4 +80,27 @@ public class FileUtils { } return null; } + + /** + * file convert String + * @param file MultipartFile file + * @return file content string + */ + public static String file2String(MultipartFile file) { + StringBuilder strBuilder = new StringBuilder(); + + try (InputStreamReader inputStreamReader = new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8)) { + BufferedReader streamReader = new BufferedReader(inputStreamReader); + String inputStr; + + while ((inputStr = streamReader.readLine()) != null) { + strBuilder.append(inputStr); + } + + } catch (IOException e) { + logger.error("file convert to string failed: {}", file.getName()); + } + + return strBuilder.toString(); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java index c013aac92b..00d95779ed 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java @@ -25,11 +25,13 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; + /** * task node add datasource param strategy */ @Service -public class DataSourceParam implements exportProcessAddTaskParam, InitializingBean { +public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { @Autowired private DataSourceMapper dataSourceMapper; @@ -40,7 +42,7 @@ public class DataSourceParam implements exportProcessAddTaskParam, InitializingB * @return task node json object */ @Override - public JSONObject addSpecialParam(JSONObject taskNode) { + public JSONObject addExportSpecialParam(JSONObject taskNode) { // add sqlParameters JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource")); @@ -52,6 +54,23 @@ public class DataSourceParam implements exportProcessAddTaskParam, InitializingB return taskNode; } + /** + * import process add datasource params + * @param taskNode task node json object + * @return task node json object + */ + @Override + public JSONObject addImportSpecialParam(JSONObject taskNode) { + JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); + List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); + if (!dataSources.isEmpty()) { + DataSource dataSource = dataSources.get(0); + sqlParameters.put("datasource", dataSource.getId()); + } + taskNode.put("params", sqlParameters); + return taskNode; + } + /** * put datasource strategy diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java index bdf202c5fa..b42b3b5a02 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java @@ -21,7 +21,9 @@ import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -30,19 +32,22 @@ import org.springframework.stereotype.Service; * task node add dependent param strategy */ @Service -public class DependentParam implements exportProcessAddTaskParam, InitializingBean { +public class DependentParam implements ProcessAddTaskParam, InitializingBean { @Autowired ProcessDefinitionMapper processDefineMapper; + @Autowired + ProjectMapper projectMapper; + /** * add dependent param * @param taskNode task node json object * @return task node json object */ @Override - public JSONObject addSpecialParam(JSONObject taskNode) { + public JSONObject addExportSpecialParam(JSONObject taskNode) { // add dependent param JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); @@ -67,6 +72,36 @@ public class DependentParam implements exportProcessAddTaskParam, InitializingBe return taskNode; } + /** + * import process add dependent param + * @param taskNode task node json object + * @return + */ + @Override + public JSONObject addImportSpecialParam(JSONObject taskNode) { + JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); + if(dependentParameters != null){ + JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); + for (int h = 0; h < dependTaskList.size(); h++) { + JSONObject dependentTaskModel = dependTaskList.getJSONObject(h); + JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); + for (int k = 0; k < dependItemList.size(); k++) { + JSONObject dependentItem = dependItemList.getJSONObject(k); + Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName")); + if(dependentItemProject != null){ + ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName")); + if(definition != null){ + dependentItem.put("projectId",dependentItemProject.getId()); + dependentItem.put("definitionId",definition.getId()); + } + } + } + } + taskNode.put("dependence", dependentParameters); + } + return taskNode; + } + /** * put dependent strategy */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java similarity index 75% rename from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java rename to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java index 5ae1667cfb..b30b777ca3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java @@ -19,14 +19,21 @@ package org.apache.dolphinscheduler.api.utils.exportprocess; import com.alibaba.fastjson.JSONObject; /** - * exportProcessAddTaskParam + * ProcessAddTaskParam */ -public interface exportProcessAddTaskParam { +public interface ProcessAddTaskParam { + + /** + * add export task special param: sql task dependent task + * @param taskNode task node json object + * @return task node json object + */ + JSONObject addExportSpecialParam(JSONObject taskNode); /** * add task special param: sql task dependent task * @param taskNode task node json object * @return task node json object */ - JSONObject addSpecialParam(JSONObject taskNode); + JSONObject addImportSpecialParam(JSONObject taskNode); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java index f4faa152a6..b8f7b03dee 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java @@ -24,13 +24,13 @@ import java.util.concurrent.ConcurrentHashMap; */ public class TaskNodeParamFactory { - private static Map taskServices = new ConcurrentHashMap<>(); + private static Map taskServices = new ConcurrentHashMap<>(); - public static exportProcessAddTaskParam getByTaskType(String taskType){ + public static ProcessAddTaskParam getByTaskType(String taskType){ return taskServices.get(taskType); } - static void register(String taskType, exportProcessAddTaskParam addSpecialTaskParam){ + static void register(String taskType, ProcessAddTaskParam addSpecialTaskParam){ if (null != taskType) { taskServices.put(taskType, addSpecialTaskParam); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 82ba43d520..b30a919598 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -20,9 +20,11 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.api.ApiApplicationServer; +import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.*; @@ -164,7 +166,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(dataSourceMapper.selectById(1)).thenReturn(getDataSource()); Mockito.when(processDefineMapper.queryByDefineId(2)).thenReturn(getProcessDefinition()); - String corSqlDependentJson = processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson); + String corSqlDependentJson = processDefinitionService.addExportTaskNodeSpecialParam(sqlDependentJson); JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false); @@ -182,6 +184,62 @@ public class ProcessDefinitionServiceTest { Assert.assertNotEquals(sqlDependentJson,exportProcessMetaDataStr); } + @Test + public void testAddExportTaskNodeSpecialParam() throws JSONException { + String shellJson = "{\"globalParams\":[],\"tasks\":[{\"id\":\"tasks-9527\",\"name\":\"shell-1\"," + + "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," + + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + + "\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + + "\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; + + String resultStr = processDefinitionService.addExportTaskNodeSpecialParam(shellJson); + JSONAssert.assertEquals(shellJson, resultStr, false); + } + + @Test + public void testImportProcessSchedule() { + User loginUser = new User(); + loginUser.setId(1); + loginUser.setUserType(UserType.GENERAL_USER); + + String currentProjectName = "test"; + String processDefinitionName = "test_process"; + Integer processDefinitionId = 1; + Schedule schedule = getSchedule(); + + ProcessMeta processMeta = getProcessMeta(); + + int insertFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMeta, + processDefinitionName, processDefinitionId); + Assert.assertEquals(0, insertFlag); + + ProcessMeta processMetaCron = new ProcessMeta(); + processMetaCron.setScheduleCrontab(schedule.getCrontab()); + + int insertFlagCron = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, + processDefinitionName, processDefinitionId); + Assert.assertEquals(0, insertFlagCron); + + WorkerGroup workerGroup = new WorkerGroup(); + workerGroup.setName("ds-test-workergroup"); + workerGroup.setId(2); + List workerGroups = new ArrayList<>(); + workerGroups.add(workerGroup); + Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(workerGroups); + + processMetaCron.setScheduleWorkerGroupName("ds-test"); + int insertFlagWorker = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, + processDefinitionName, processDefinitionId); + Assert.assertEquals(0, insertFlagWorker); + + Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(null); + int workerNullFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, + processDefinitionName, processDefinitionId); + Assert.assertEquals(0, workerNullFlag); + + + } + /** * import sub process test */ @@ -321,9 +379,50 @@ public class ProcessDefinitionServiceTest { Assert.assertTrue(delete); + String processMetaJson = ""; + improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson); + processMetaJson = "{\"scheduleWorkerGroupId\":-1}"; + improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson); + + processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}"; + improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson); + + processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}"; + improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson); + + + } + + /** + * check import process metadata + * @param file file + * @param loginUser login user + * @param currentProjectName current project name + * @param processMetaJson process meta json + * @throws IOException IO exception + */ + private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException { + //check null + FileUtils.writeStringToFile(new File("/tmp/task.json"),processMetaJson); + + File fileEmpty = new File("/tmp/task.json"); + + FileInputStream fileEmptyInputStream = new FileInputStream("/tmp/task.json"); + + MultipartFile multiFileEmpty = new MockMultipartFile(fileEmpty.getName(), fileEmpty.getName(), + ContentType.APPLICATION_OCTET_STREAM.toString(), fileEmptyInputStream); + + Map resEmptyProcess = processDefinitionService.importProcessDefinition(loginUser, multiFileEmpty, currentProjectName); + + Assert.assertEquals(Status.DATA_IS_NULL, resEmptyProcess.get(Constants.STATUS)); + + boolean deleteFlag = file.delete(); + + Assert.assertTrue(deleteFlag); } + /** * get mock datasource * @return DataSource @@ -382,6 +481,26 @@ public class ProcessDefinitionServiceTest { return schedule; } + /** + * get mock processMeta + * @return processMeta + */ + private ProcessMeta getProcessMeta() { + ProcessMeta processMeta = new ProcessMeta(); + Schedule schedule = getSchedule(); + processMeta.setScheduleCrontab(schedule.getCrontab()); + processMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime())); + processMeta.setScheduleEndTime(DateUtils.dateToString(schedule.getEndTime())); + processMeta.setScheduleWarningType(String.valueOf(schedule.getWarningType())); + processMeta.setScheduleWarningGroupId(schedule.getWarningGroupId()); + processMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy())); + processMeta.setScheduleReleaseState(String.valueOf(schedule.getReleaseState())); + processMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority())); + processMeta.setScheduleWorkerGroupId(schedule.getWorkerGroupId()); + processMeta.setScheduleWorkerGroupName("workgroup1"); + return processMeta; + } + private List getSchedulerList() { List scheduleList = new ArrayList<>(); scheduleList.add(getSchedule()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/FileUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/FileUtilsTest.java index f205d39156..1b05eb0e99 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/FileUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/FileUtilsTest.java @@ -17,21 +17,17 @@ package org.apache.dolphinscheduler.api.utils; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; +import org.apache.http.entity.ContentType; +import org.junit.*; import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.Resource; +import org.springframework.mock.web.MockMultipartFile; import org.springframework.web.multipart.MultipartFile; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import static org.junit.Assert.*; @@ -106,4 +102,23 @@ public class FileUtilsTest { assertNull(resource1); } + + @Test + public void testFile2String() throws IOException { + String content = "123"; + org.apache.dolphinscheduler.common.utils.FileUtils.writeStringToFile(new File("/tmp/task.json"),content); + + File file = new File("/tmp/task.json"); + FileInputStream fileInputStream = new FileInputStream("/tmp/task.json"); + MultipartFile multipartFile = new MockMultipartFile(file.getName(), file.getName(), + ContentType.APPLICATION_OCTET_STREAM.toString(), fileInputStream); + + String resultStr = FileUtils.file2String(multipartFile); + + Assert.assertEquals(content, resultStr); + + boolean delete = file.delete(); + + Assert.assertTrue(delete); + } } \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java index 0a271d984f..b8fcd62333 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java @@ -35,25 +35,52 @@ import org.springframework.test.context.junit4.SpringRunner; public class DataSourceParamTest { @Test - public void testAddDependentSpecialParam() throws JSONException { + public void testAddExportDependentSpecialParam() throws JSONException { - String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + - "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," + - "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," + - "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," + - "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}"; + String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," + + "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," + + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" + + ",\"localParams\":[],\"connParams\":\"\"," + + "\"preStatements\":[],\"postStatements\":[]}," + + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," + + "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," + + "\"preTasks\":[\"dependent\"]}"; - JSONObject taskNode = JSONUtils.parseObject(dependentJson); + JSONObject taskNode = JSONUtils.parseObject(sqlJson); if (StringUtils.isNotEmpty(taskNode.getString("type"))) { String taskType = taskNode.getString("type"); - exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = addTaskParam.addSpecialParam(taskNode); + JSONObject sql = addTaskParam.addExportSpecialParam(taskNode); - JSONAssert.assertEquals(taskNode.toString(),dependent.toString(),false); + JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false); } + } + + @Test + public void testAddImportDependentSpecialParam() throws JSONException { + String sqlJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"," + + "\"type\":\"SQL\",\"params\":{\"postStatements\":[]," + + "\"connParams\":\"\",\"receiversCc\":\"\",\"udfs\":\"\"," + + "\"type\":\"MYSQL\",\"title\":\"\",\"sql\":\"show tables\",\"" + + "preStatements\":[],\"sqlType\":\"1\",\"receivers\":\"\",\"datasource\":1," + + "\"showType\":\"TABLE\",\"localParams\":[],\"datasourceName\":\"dsmetadata\"},\"timeout\"" + + ":{\"enable\":false,\"strategy\":\"\"},\"maxRetryTimes\":\"0\"," + + "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{}," + + "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}"; + JSONObject taskNode = JSONUtils.parseObject(sqlJson); + if (StringUtils.isNotEmpty(taskNode.getString("type"))) { + String taskType = taskNode.getString("type"); + + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + + JSONObject sql = addTaskParam.addImportSpecialParam(taskNode); + + JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false); + } } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java index db81138f43..d21b7be0e2 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java @@ -34,29 +34,78 @@ import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes = ApiApplicationServer.class) public class DependentParamTest { + @Test - public void testAddDependentSpecialParam() throws JSONException { + public void testAddExportDependentSpecialParam() throws JSONException { + String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," + + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," + + "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," + + "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}"; + + JSONObject taskNode = JSONUtils.parseObject(dependentJson); + if (StringUtils.isNotEmpty(taskNode.getString("type"))) { + String taskType = taskNode.getString("type"); + + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + + JSONObject dependent = addTaskParam.addExportSpecialParam(taskNode); + + JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false); + } + + String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}"; + + JSONObject taskEmpty = JSONUtils.parseObject(dependentEmpty); + if (StringUtils.isNotEmpty(taskEmpty.getString("type"))) { + String taskType = taskEmpty.getString("type"); + + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," + - "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," + - "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" + - ",\"localParams\":[],\"connParams\":\"\"," + - "\"preStatements\":[],\"postStatements\":[]}," + - "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + - "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," + - "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," + - "\"preTasks\":[\"dependent\"]}"; + JSONObject dependent = addTaskParam.addImportSpecialParam(taskEmpty); + JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false); + } + + } - JSONObject taskNode = JSONUtils.parseObject(sqlJson); + @Test + public void testAddImportDependentSpecialParam() throws JSONException { + String dependentJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"" + + ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false," + + "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\"" + + ",\"name\":\"dependent\"," + + "\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\"," + + "\"definitionName\":\"shell-1\",\"depTasks\":\"shell-1\",\"projectName\":\"test\"," + + "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}]," + + "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}"; + + JSONObject taskNode = JSONUtils.parseObject(dependentJson); if (StringUtils.isNotEmpty(taskNode.getString("type"))) { String taskType = taskNode.getString("type"); - exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + + JSONObject dependent = addTaskParam.addImportSpecialParam(taskNode); + + JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false); + } + + String dependentEmpty = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"" + + ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false," + + "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\"" + + ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}"; + + JSONObject taskNodeEmpty = JSONUtils.parseObject(dependentEmpty); + if (StringUtils.isNotEmpty(taskNodeEmpty.getString("type"))) { + String taskType = taskNodeEmpty.getString("type"); + + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject sql = addTaskParam.addSpecialParam(taskNode); + JSONObject dependent = addTaskParam.addImportSpecialParam(taskNode); - JSONAssert.assertEquals(taskNode.toString(),sql.toString(),false); + JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false); } }