diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java index a151abc714..843bcf4083 100644 --- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java +++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.alert.utils; +import com.fasterxml.jackson.databind.JsonNode; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -109,4 +110,5 @@ public class JSONUtilsTest { } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 69213211cc..cbf432d8c0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -17,11 +17,12 @@ package org.apache.dolphinscheduler.api.service; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.dto.treeview.Instance; import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; @@ -73,6 +74,11 @@ public class ProcessDefinitionService extends BaseDAGService { private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionService.class); + private static final String PROCESSDEFINITIONID = "processDefinitionId"; + + private static final String RELEASESTATE = "releaseState"; + + private static final String TASKS = "tasks"; @Autowired private ProjectMapper projectMapper; @@ -99,13 +105,13 @@ public class ProcessDefinitionService extends BaseDAGService { /** * create process definition * - * @param loginUser login user - * @param projectName project name - * @param name process definition name + * @param loginUser login user + * @param projectName project name + * @param name process definition name * @param processDefinitionJson process definition json - * @param desc description - * @param locations locations for nodes - * @param connects connects for nodes + * @param desc description + * @param locations locations for nodes + * @param connects connects for nodes * @return create result code * @throws JsonProcessingException JsonProcessingException */ @@ -163,29 +169,30 @@ public class ProcessDefinitionService extends BaseDAGService { // return processDefinition object with ID result.put(Constants.DATA_LIST, processDefineMapper.selectById(processDefine.getId())); putMsg(result, Status.SUCCESS); - result.put("processDefinitionId",processDefine.getId()); + result.put("processDefinitionId", processDefine.getId()); return result; } /** * get resource ids + * * @param processData process data * @return resource ids */ private String getResourceIds(ProcessData processData) { List tasks = processData.getTasks(); Set resourceIds = new HashSet<>(); - for(TaskNode taskNode : tasks){ + for (TaskNode taskNode : tasks) { String taskParameter = taskNode.getParams(); - AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter); + AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter); if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { - Set tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet()); + Set tempSet = params.getResourceFilesList().stream().map(t -> t.getId()).collect(Collectors.toSet()); resourceIds.addAll(tempSet); } } StringBuilder sb = new StringBuilder(); - for(int i : resourceIds) { + for (int i : resourceIds) { if (sb.length() > 0) { sb.append(","); } @@ -198,7 +205,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * query process definition list * - * @param loginUser login user + * @param loginUser login user * @param projectName project name * @return definition list */ @@ -224,12 +231,12 @@ public class ProcessDefinitionService extends BaseDAGService { /** * query process definition list paging * - * @param loginUser login user + * @param loginUser login user * @param projectName project name - * @param searchVal search value - * @param pageNo page number - * @param pageSize page size - * @param userId user id + * @param searchVal search value + * @param pageNo page number + * @param pageSize page size + * @param userId user id * @return process definition page */ public Map queryProcessDefinitionListPaging(User loginUser, String projectName, String searchVal, Integer pageNo, Integer pageSize, Integer userId) { @@ -245,10 +252,10 @@ public class ProcessDefinitionService extends BaseDAGService { Page page = new Page(pageNo, pageSize); IPage processDefinitionIPage = processDefineMapper.queryDefineListPaging( - page, searchVal, userId, project.getId(),isAdmin(loginUser)); + page, searchVal, userId, project.getId(), isAdmin(loginUser)); PageInfo pageInfo = new PageInfo(pageNo, pageSize); - pageInfo.setTotalCount((int)processDefinitionIPage.getTotal()); + pageInfo.setTotalCount((int) processDefinitionIPage.getTotal()); pageInfo.setLists(processDefinitionIPage.getRecords()); result.put(Constants.DATA_LIST, pageInfo); putMsg(result, Status.SUCCESS); @@ -259,9 +266,9 @@ public class ProcessDefinitionService extends BaseDAGService { /** * query datail of process definition * - * @param loginUser login user + * @param loginUser login user * @param projectName project name - * @param processId process definition id + * @param processId process definition id * @return process definition detail */ public Map queryProcessDefinitionById(User loginUser, String projectName, Integer processId) { @@ -289,12 +296,12 @@ public class ProcessDefinitionService extends BaseDAGService { /** * copy process definition * - * @param loginUser login user + * @param loginUser login user * @param projectName project name - * @param processId process definition id + * @param processId process definition id * @return copy result code */ - public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException{ + public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -313,7 +320,7 @@ public class ProcessDefinitionService extends BaseDAGService { return createProcessDefinition( loginUser, projectName, - processDefinition.getName()+"_copy_"+System.currentTimeMillis(), + processDefinition.getName() + "_copy_" + System.currentTimeMillis(), processDefinition.getProcessDefinitionJson(), processDefinition.getDescription(), processDefinition.getLocations(), @@ -324,14 +331,14 @@ public class ProcessDefinitionService extends BaseDAGService { /** * update process definition * - * @param loginUser login user - * @param projectName project name - * @param name process definition name - * @param id process definition id + * @param loginUser login user + * @param projectName project name + * @param name process definition name + * @param id process definition id * @param processDefinitionJson process definition json - * @param desc description - * @param locations locations for nodes - * @param connects connects for nodes + * @param desc description + * @param locations locations for nodes + * @param connects connects for nodes * @return update result code */ public Map updateProcessDefinition(User loginUser, String projectName, int id, String name, @@ -400,9 +407,9 @@ public class ProcessDefinitionService extends BaseDAGService { /** * verify process definition name unique * - * @param loginUser login user + * @param loginUser login user * @param projectName project name - * @param name name + * @param name name * @return true if process definition name not exists, otherwise false */ public Map verifyProcessDefinitionName(User loginUser, String projectName, String name) { @@ -427,8 +434,8 @@ public class ProcessDefinitionService extends BaseDAGService { /** * delete process definition by id * - * @param loginUser login user - * @param projectName project name + * @param loginUser login user + * @param projectName project name * @param processDefinitionId process definition id * @return delete result code */ @@ -459,22 +466,22 @@ public class ProcessDefinitionService extends BaseDAGService { // check process definition is already online if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { - putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE,processDefinitionId); + putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId); return result; } // get the timing according to the process definition List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); if (!schedules.isEmpty() && schedules.size() > 1) { - logger.warn("scheduler num is {},Greater than 1",schedules.size()); + logger.warn("scheduler num is {},Greater than 1", schedules.size()); putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); return result; - }else if(schedules.size() == 1){ + } else if (schedules.size() == 1) { Schedule schedule = schedules.get(0); - if(schedule.getReleaseState() == ReleaseState.OFFLINE){ + if (schedule.getReleaseState() == ReleaseState.OFFLINE) { scheduleMapper.deleteById(schedule.getId()); - }else if(schedule.getReleaseState() == ReleaseState.ONLINE){ - putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE,schedule.getId()); + } else if (schedule.getReleaseState() == ReleaseState.ONLINE) { + putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId()); return result; } } @@ -492,9 +499,9 @@ public class ProcessDefinitionService extends BaseDAGService { /** * release process definition: online / offline * - * @param loginUser login user - * @param projectName project name - * @param id process definition id + * @param loginUser login user + * @param projectName project name + * @param id process definition id * @param releaseState release state * @return release result code */ @@ -513,7 +520,7 @@ public class ProcessDefinitionService extends BaseDAGService { // check state if (null == state) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState"); + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); return result; } @@ -525,12 +532,12 @@ public class ProcessDefinitionService extends BaseDAGService { String resourceIds = processDefinition.getResourceIds(); if (StringUtils.isNotBlank(resourceIds)) { Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); - PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resourceIdArray,loginUser.getId(),logger); + PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); try { permissionCheck.checkPermission(); } catch (Exception e) { - logger.error(e.getMessage(),e); - putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, "releaseState"); + logger.error(e.getMessage(), e); + putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, RELEASESTATE); return result; } } @@ -545,7 +552,7 @@ public class ProcessDefinitionService extends BaseDAGService { new int[]{processDefinition.getId()} ); - for(Schedule schedule:scheduleList){ + for (Schedule schedule : scheduleList) { logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id); // set status schedule.setReleaseState(ReleaseState.OFFLINE); @@ -554,7 +561,7 @@ public class ProcessDefinitionService extends BaseDAGService { } break; default: - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState"); + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); return result; } @@ -564,14 +571,15 @@ public class ProcessDefinitionService extends BaseDAGService { /** * batch export process definition by ids + * * @param loginUser * @param projectName * @param processDefinitionIds * @param response */ - public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response){ + public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response) { - if(StringUtils.isEmpty(processDefinitionIds)){ + if (StringUtils.isEmpty(processDefinitionIds)) { return; } @@ -582,24 +590,25 @@ public class ProcessDefinitionService extends BaseDAGService { Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if(resultStatus != Status.SUCCESS){ + if (resultStatus != Status.SUCCESS) { return; } List processDefinitionList = getProcessDefinitionList(processDefinitionIds); - if(CollectionUtils.isNotEmpty(processDefinitionList)){ + if (CollectionUtils.isNotEmpty(processDefinitionList)) { downloadProcessDefinitionFile(response, processDefinitionList); } } /** * get process definition list by ids + * * @param processDefinitionIds * @return */ - private List getProcessDefinitionList(String processDefinitionIds){ + private List getProcessDefinitionList(String processDefinitionIds) { List processDefinitionList = new ArrayList<>(); String[] processDefinitionIdArray = processDefinitionIds.split(","); for (String strProcessDefinitionId : processDefinitionIdArray) { @@ -616,6 +625,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * download the process definition file + * * @param response * @param processDefinitionList */ @@ -631,7 +641,7 @@ public class ProcessDefinitionService extends BaseDAGService { buff.close(); } catch (IOException e) { logger.warn("export process fail", e); - }finally { + } finally { if (null != buff) { try { buff.close(); @@ -651,19 +661,21 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get export process metadata string + * * @param processDefinitionId process definition id - * @param processDefinition process definition + * @param processDefinition process definition * @return export process metadata string */ public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { //create workflow json file - return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId,processDefinition)); + return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition)); } /** * get export process metadata string + * * @param processDefinitionId process definition id - * @param processDefinition process definition + * @param processDefinition process definition * @return export process metadata string */ public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { @@ -699,17 +711,18 @@ public class ProcessDefinitionService extends BaseDAGService { /** * correct task param which has datasource or dependent + * * @param processDefinitionJson processDefinitionJson * @return correct processDefinitionJson */ public String addExportTaskNodeSpecialParam(String processDefinitionJson) { - JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); - JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); + ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS); for (int i = 0; i < jsonArray.size(); i++) { - JSONObject taskNode = jsonArray.getJSONObject(i); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + JsonNode taskNode = jsonArray.path(i); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { @@ -717,12 +730,13 @@ public class ProcessDefinitionService extends BaseDAGService { } } } - jsonObject.put("tasks", jsonArray); + jsonObject.set(TASKS, jsonArray); return jsonObject.toString(); } /** * check task if has sub process + * * @param taskType task type * @return if task has sub process return true else false */ @@ -732,8 +746,9 @@ public class ProcessDefinitionService extends BaseDAGService { /** * import process definition - * @param loginUser login user - * @param file process metadata json file + * + * @param loginUser login user + * @param file process metadata json file * @param currentProjectName current project name * @return import process */ @@ -741,7 +756,7 @@ public class ProcessDefinitionService extends BaseDAGService { public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { Map result = new HashMap<>(5); String processMetaJson = FileUtils.file2String(file); - List processMetaList = JSON.parseArray(processMetaJson,ProcessMeta.class); + List processMetaList = JSON.parseArray(processMetaJson, ProcessMeta.class); //check file content if (CollectionUtils.isEmpty(processMetaList)) { @@ -749,9 +764,9 @@ public class ProcessDefinitionService extends BaseDAGService { return result; } - for(ProcessMeta processMeta:processMetaList){ + for (ProcessMeta processMeta : processMetaList) { - if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)){ + if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)) { return result; } } @@ -761,6 +776,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check and import process definition + * * @param loginUser * @param currentProjectName * @param result @@ -769,7 +785,7 @@ public class ProcessDefinitionService extends BaseDAGService { */ private boolean checkAndImportProcessDefinition(User loginUser, String currentProjectName, Map result, ProcessMeta processMeta) { - if(!checkImportanceParams(processMeta,result)){ + if (!checkImportanceParams(processMeta, result)) { return false; } @@ -777,7 +793,7 @@ public class ProcessDefinitionService extends BaseDAGService { String processDefinitionName = processMeta.getProcessDefinitionName(); //use currentProjectName to query Project targetProject = projectMapper.queryByName(currentProjectName); - if(null != targetProject){ + if (null != targetProject) { processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), processDefinitionName, 1); } @@ -801,14 +817,14 @@ public class ProcessDefinitionService extends BaseDAGService { processDefinitionName, addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject)); - if(createProcessResult == null){ + if (createProcessResult == null) { return false; } //create process definition Integer processDefinitionId = - Objects.isNull(createProcessResult.get("processDefinitionId"))? - null:Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); + Objects.isNull(createProcessResult.get(PROCESSDEFINITIONID)) ? + null : Integer.parseInt(createProcessResult.get(PROCESSDEFINITIONID).toString()); //scheduler param return getImportProcessScheduleResult(loginUser, @@ -822,6 +838,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get create process result + * * @param loginUser * @param currentProjectName * @param result @@ -835,12 +852,12 @@ public class ProcessDefinitionService extends BaseDAGService { Map result, ProcessMeta processMeta, String processDefinitionName, - String importProcessParam){ + String importProcessParam) { Map createProcessResult = null; try { createProcessResult = createProcessDefinition(loginUser - ,currentProjectName, - processDefinitionName+"_import_"+System.currentTimeMillis(), + , currentProjectName, + processDefinitionName + "_import_" + System.currentTimeMillis(), importProcessParam, processMeta.getProcessDefinitionDescription(), processMeta.getProcessDefinitionLocations(), @@ -856,6 +873,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get import process schedule result + * * @param loginUser * @param currentProjectName * @param result @@ -887,11 +905,12 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check importance params + * * @param processMeta * @param result * @return */ - private boolean checkImportanceParams(ProcessMeta processMeta,Map result){ + private boolean checkImportanceParams(ProcessMeta processMeta, Map result) { if (StringUtils.isEmpty(processMeta.getProjectName())) { putMsg(result, Status.DATA_IS_NULL, "projectName"); return false; @@ -910,18 +929,19 @@ public class ProcessDefinitionService extends BaseDAGService { /** * import process add special task param - * @param loginUser login user + * + * @param loginUser login user * @param processDefinitionJson process definition json - * @param targetProject target project + * @param targetProject target project * @return import process param */ private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) { - JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); - JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); + ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS); //add sql and dependent param for (int i = 0; i < jsonArray.size(); i++) { - JSONObject taskNode = jsonArray.getJSONObject(i); - String taskType = taskNode.getString("type"); + JsonNode taskNode = jsonArray.path(i); + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { addTaskParam.addImportSpecialParam(taskNode); @@ -931,25 +951,26 @@ public class ProcessDefinitionService extends BaseDAGService { //recursive sub-process parameter correction map key for old process id value for new process id Map subProcessIdMap = new HashMap<>(20); - List subProcessList = jsonArray.stream() - .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type"))) + List subProcessList = StreamUtils.asStream(jsonArray.elements()) + .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(subProcessList)) { importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); } - jsonObject.put("tasks", jsonArray); + jsonObject.set(TASKS, jsonArray); return jsonObject.toString(); } /** * import process schedule - * @param loginUser login user - * @param currentProjectName current project name - * @param processMeta process meta data + * + * @param loginUser login user + * @param currentProjectName current project name + * @param processMeta process meta data * @param processDefinitionName process definition name - * @param processDefinitionId process definition id + * @param processDefinitionId process definition id * @return insert schedule flag */ public int importProcessSchedule(User loginUser, String currentProjectName, ProcessMeta processMeta, @@ -998,84 +1019,87 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check import process has sub process * recursion create sub process - * @param loginUser login user - * @param targetProject target project - * @param jsonArray process task array + * + * @param loginUser login user + * @param targetProject target project + * @param jsonArray process task array * @param subProcessIdMap correct sub process id map */ - public void importSubProcess(User loginUser, Project targetProject, JSONArray jsonArray, Map subProcessIdMap) { + public void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map subProcessIdMap) { for (int i = 0; i < jsonArray.size(); i++) { - JSONObject taskNode = jsonArray.getJSONObject(i); - String taskType = taskNode.getString("type"); - - if (checkTaskHasSubProcess(taskType)) { - //get sub process info - JSONObject subParams = JSONUtils.parseObject(taskNode.getString("params")); - Integer subProcessId = subParams.getInteger("processDefinitionId"); - ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); - //check is sub process exist in db - if (null != subProcess) { - String subProcessJson = subProcess.getProcessDefinitionJson(); - //check current project has sub process - ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); - - if (null == currentProjectSubProcess) { - JSONArray subJsonArray = (JSONArray) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks"); - - List subProcessList = subJsonArray.stream() - .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type"))) - .collect(Collectors.toList()); - - if (CollectionUtils.isNotEmpty(subProcessList)) { - importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); - //sub process processId correct - if (!subProcessIdMap.isEmpty()) { - - for (Map.Entry entry : subProcessIdMap.entrySet()) { - String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey(); - String newSubProcessId = "\"processDefinitionId\":" + entry.getValue(); - subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId); - } - - subProcessIdMap.clear(); - } - } + ObjectNode taskNode = (ObjectNode) jsonArray.path(i); + String taskType = taskNode.path("type").asText(); - //if sub-process recursion - Date now = new Date(); - //create sub process in target project - ProcessDefinition processDefine = new ProcessDefinition(); - processDefine.setName(subProcess.getName()); - processDefine.setVersion(subProcess.getVersion()); - processDefine.setReleaseState(subProcess.getReleaseState()); - processDefine.setProjectId(targetProject.getId()); - processDefine.setUserId(loginUser.getId()); - processDefine.setProcessDefinitionJson(subProcessJson); - processDefine.setDescription(subProcess.getDescription()); - processDefine.setLocations(subProcess.getLocations()); - processDefine.setConnects(subProcess.getConnects()); - processDefine.setTimeout(subProcess.getTimeout()); - processDefine.setTenantId(subProcess.getTenantId()); - processDefine.setGlobalParams(subProcess.getGlobalParams()); - processDefine.setCreateTime(now); - processDefine.setUpdateTime(now); - processDefine.setFlag(subProcess.getFlag()); - processDefine.setReceivers(subProcess.getReceivers()); - processDefine.setReceiversCc(subProcess.getReceiversCc()); - processDefineMapper.insert(processDefine); - - logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName()); - - //modify task node - ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName()); - - if (null != newSubProcessDefine) { - subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); - subParams.put("processDefinitionId", newSubProcessDefine.getId()); - taskNode.put("params", subParams); + if (!checkTaskHasSubProcess(taskType)) { + continue; + } + //get sub process info + ObjectNode subParams = (ObjectNode) taskNode.path("params"); + Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); + ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); + //check is sub process exist in db + if (null == subProcess) { + continue; + } + String subProcessJson = subProcess.getProcessDefinitionJson(); + //check current project has sub process + ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); + + if (null == currentProjectSubProcess) { + ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); + + List subProcessList = StreamUtils.asStream(subJsonArray.elements()) + .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText())) + .collect(Collectors.toList()); + + if (CollectionUtils.isNotEmpty(subProcessList)) { + importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); + //sub process processId correct + if (!subProcessIdMap.isEmpty()) { + + for (Map.Entry entry : subProcessIdMap.entrySet()) { + String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey(); + String newSubProcessId = "\"processDefinitionId\":" + entry.getValue(); + subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId); } + + subProcessIdMap.clear(); } } + + //if sub-process recursion + Date now = new Date(); + //create sub process in target project + ProcessDefinition processDefine = new ProcessDefinition(); + processDefine.setName(subProcess.getName()); + processDefine.setVersion(subProcess.getVersion()); + processDefine.setReleaseState(subProcess.getReleaseState()); + processDefine.setProjectId(targetProject.getId()); + processDefine.setUserId(loginUser.getId()); + processDefine.setProcessDefinitionJson(subProcessJson); + processDefine.setDescription(subProcess.getDescription()); + processDefine.setLocations(subProcess.getLocations()); + processDefine.setConnects(subProcess.getConnects()); + processDefine.setTimeout(subProcess.getTimeout()); + processDefine.setTenantId(subProcess.getTenantId()); + processDefine.setGlobalParams(subProcess.getGlobalParams()); + processDefine.setCreateTime(now); + processDefine.setUpdateTime(now); + processDefine.setFlag(subProcess.getFlag()); + processDefine.setReceivers(subProcess.getReceivers()); + processDefine.setReceiversCc(subProcess.getReceiversCc()); + processDefineMapper.insert(processDefine); + + logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName()); + + //modify task node + ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName()); + + if (null != newSubProcessDefine) { + subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); + subParams.put(PROCESSDEFINITIONID, newSubProcessDefine.getId()); + taskNode.set("params", subParams); + } } } } @@ -1084,7 +1108,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check the process definition node meets the specifications * - * @param processData process data + * @param processData process data * @param processDefinitionJson process definition json * @return check result code */ @@ -1094,7 +1118,7 @@ public class ProcessDefinitionService extends BaseDAGService { try { if (processData == null) { logger.error("process data is null"); - putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson); + putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); return result; } @@ -1125,7 +1149,7 @@ public class ProcessDefinitionService extends BaseDAGService { // check extra params CheckUtils.checkOtherParams(taskNode.getExtras()); } - putMsg(result,Status.SUCCESS); + putMsg(result, Status.SUCCESS); } catch (Exception e) { result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); result.put(Constants.MSG, e.getMessage()); @@ -1138,9 +1162,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param defineId define id * @return task node list - * @throws Exception exception */ - public Map getTaskNodeListByDefinitionId(Integer defineId) throws Exception { + public Map getTaskNodeListByDefinitionId(Integer defineId) { Map result = new HashMap<>(); ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); @@ -1158,7 +1181,7 @@ public class ProcessDefinitionService extends BaseDAGService { //process data check if (null == processData) { logger.error("process data is null"); - putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson); + putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); return result; } @@ -1176,15 +1199,14 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param defineIdList define id list * @return task node list - * @throws Exception exception */ - public Map getTaskNodeListByDefinitionIdList(String defineIdList) throws Exception { + public Map getTaskNodeListByDefinitionIdList(String defineIdList) { Map result = new HashMap<>(); Map> taskNodeMap = new HashMap<>(); String[] idList = defineIdList.split(","); List idIntList = new ArrayList<>(); - for(String definitionId : idList) { + for (String definitionId : idList) { idIntList.add(Integer.parseInt(definitionId)); } Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]); @@ -1195,7 +1217,7 @@ public class ProcessDefinitionService extends BaseDAGService { return result; } - for(ProcessDefinition processDefinition : processDefinitionList){ + for (ProcessDefinition processDefinition : processDefinitionList) { String processDefinitionJson = processDefinition.getProcessDefinitionJson(); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); @@ -1231,7 +1253,7 @@ public class ProcessDefinitionService extends BaseDAGService { * Encapsulates the TreeView structure * * @param processId process definition id - * @param limit limit + * @param limit limit * @return tree view json data * @throws Exception exception */ @@ -1241,7 +1263,7 @@ public class ProcessDefinitionService extends BaseDAGService { ProcessDefinition processDefinition = processDefineMapper.selectById(processId); if (null == processDefinition) { logger.info("process define not exists"); - putMsg(result,Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); return result; } DAG dag = genDagGraph(processDefinition); @@ -1260,8 +1282,8 @@ public class ProcessDefinitionService extends BaseDAGService { */ List processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit); - for(ProcessInstance processInstance:processInstanceList){ - processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(),processInstance.getEndTime())); + for (ProcessInstance processInstance : processInstanceList) { + processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime())); } if (limit > processInstanceList.size()) { @@ -1364,9 +1386,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param processDefinition process definition * @return dag graph - * @throws Exception if exception happens */ - private DAG genDagGraph(ProcessDefinition processDefinition) throws Exception { + private DAG genDagGraph(ProcessDefinition processDefinition) { String processDefinitionJson = processDefinition.getProcessDefinitionJson(); @@ -1386,8 +1407,6 @@ public class ProcessDefinitionService extends BaseDAGService { } - - /** * whether the graph has a ring * @@ -1405,7 +1424,7 @@ public class ProcessDefinitionService extends BaseDAGService { // Fill edge relations for (TaskNode taskNodeResponse : taskNodeResponseList) { taskNodeResponse.getPreTasks(); - List preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(),String.class); + List preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class); if (CollectionUtils.isNotEmpty(preTasks)) { for (String preTask : preTasks) { if (!graph.addEdge(preTask, taskNodeResponse.getName())) { @@ -1418,19 +1437,19 @@ public class ProcessDefinitionService extends BaseDAGService { return graph.hasCycle(); } - private String recursionProcessDefinitionName(Integer projectId,String processDefinitionName,int num){ + private String recursionProcessDefinitionName(Integer projectId, String processDefinitionName, int num) { ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName); if (processDefinition != null) { - if(num > 1){ - String str = processDefinitionName.substring(0,processDefinitionName.length() - 3); - processDefinitionName = str + "("+num+")"; - }else{ - processDefinitionName = processDefinition.getName() + "("+num+")"; + if (num > 1) { + String str = processDefinitionName.substring(0, processDefinitionName.length() - 3); + processDefinitionName = str + "(" + num + ")"; + } else { + processDefinitionName = processDefinition.getName() + "(" + num + ")"; } - }else{ + } else { return processDefinitionName; } - return recursionProcessDefinitionName(projectId,processDefinitionName,num + 1); + return recursionProcessDefinitionName(projectId, processDefinitionName, num + 1); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index a5a341376e..7e8a2abeae 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -504,9 +504,8 @@ public class ProcessInstanceService extends BaseDAGService { * * @param processInstanceId process instance id * @return variables data - * @throws Exception exception */ - public Map viewVariables( Integer processInstanceId) throws Exception { + public Map viewVariables(Integer processInstanceId) { Map result = new HashMap<>(5); ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); @@ -537,7 +536,7 @@ public class ProcessInstanceService extends BaseDAGService { List taskNodeList = workflowData.getTasks(); // global param string - String globalParamStr = JSON.toJSONString(globalParams); + String globalParamStr = JSONUtils.toJson(globalParams); globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams); globalParams = JSON.parseArray(globalParamStr, Property.class); for (Property property : globalParams) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java index 00d95779ed..f34554bb89 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java @@ -16,9 +16,9 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.springframework.beans.factory.InitializingBean; @@ -33,6 +33,7 @@ import java.util.List; @Service public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { + private static final String PARAMS = "params"; @Autowired private DataSourceMapper dataSourceMapper; @@ -42,14 +43,14 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override - public JSONObject addExportSpecialParam(JSONObject taskNode) { + public JsonNode addExportSpecialParam(JsonNode taskNode) { // add sqlParameters - JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); - DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource")); + ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); + DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt()); if (null != dataSource) { sqlParameters.put("datasourceName", dataSource.getName()); } - taskNode.put("params", sqlParameters); + ((ObjectNode)taskNode).set(PARAMS, sqlParameters); return taskNode; } @@ -60,14 +61,14 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override - public JSONObject addImportSpecialParam(JSONObject taskNode) { - JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); - List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); + public JsonNode addImportSpecialParam(JsonNode taskNode) { + ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); + List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText()); if (!dataSources.isEmpty()) { DataSource dataSource = dataSources.get(0); sqlParameters.put("datasource", dataSource.getId()); } - taskNode.put("params", sqlParameters); + ((ObjectNode)taskNode).set(PARAMS, sqlParameters); return taskNode; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java index b42b3b5a02..ce43d0ec01 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java @@ -16,8 +16,9 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -34,6 +35,7 @@ import org.springframework.stereotype.Service; @Service public class DependentParam implements ProcessAddTaskParam, InitializingBean { + private static final String DEPENDENCE = "dependence"; @Autowired ProcessDefinitionMapper processDefineMapper; @@ -47,18 +49,18 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override - public JSONObject addExportSpecialParam(JSONObject taskNode) { + public JsonNode addExportSpecialParam(JsonNode taskNode) { // add dependent param - JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); + ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); if (null != dependentParameters) { - JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); + ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList"); for (int j = 0; j < dependTaskList.size(); j++) { - JSONObject dependentTaskModel = dependTaskList.getJSONObject(j); - JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); + JsonNode dependentTaskModel = dependTaskList.path(j); + ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); for (int k = 0; k < dependItemList.size(); k++) { - JSONObject dependentItem = dependItemList.getJSONObject(k); - int definitionId = dependentItem.getInteger("definitionId"); + ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); + int definitionId = dependentItem.path("definitionId").asInt(); ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId); if (null != definition) { dependentItem.put("projectName", definition.getProjectName()); @@ -66,7 +68,7 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { } } } - taskNode.put("dependence", dependentParameters); + ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); } return taskNode; @@ -78,18 +80,18 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { * @return */ @Override - public JSONObject addImportSpecialParam(JSONObject taskNode) { - JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); + public JsonNode addImportSpecialParam(JsonNode taskNode) { + ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); if(dependentParameters != null){ - JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); + ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList"); for (int h = 0; h < dependTaskList.size(); h++) { - JSONObject dependentTaskModel = dependTaskList.getJSONObject(h); - JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); + ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h); + ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); for (int k = 0; k < dependItemList.size(); k++) { - JSONObject dependentItem = dependItemList.getJSONObject(k); - Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName")); + ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); + Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText()); if(dependentItemProject != null){ - ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName")); + ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.path("definitionName").asText()); if(definition != null){ dependentItem.put("projectId",dependentItemProject.getId()); dependentItem.put("definitionId",definition.getId()); @@ -97,7 +99,7 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { } } } - taskNode.put("dependence", dependentParameters); + ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); } return taskNode; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java index b30b777ca3..8e408556b0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; /** * ProcessAddTaskParam @@ -28,12 +28,12 @@ public interface ProcessAddTaskParam { * @param taskNode task node json object * @return task node json object */ - JSONObject addExportSpecialParam(JSONObject taskNode); + JsonNode addExportSpecialParam(JsonNode taskNode); /** * add task special param: sql task dependent task * @param taskNode task node json object * @return task node json object */ - JSONObject addImportSpecialParam(JSONObject taskNode); + JsonNode addImportSpecialParam(JsonNode taskNode); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8f69b94274..d1a051295b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -16,8 +16,8 @@ */ package org.apache.dolphinscheduler.api.service; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.enums.Status; @@ -621,8 +621,8 @@ public class ProcessDefinitionServiceTest { "\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; - JSONObject jsonObject = JSONUtils.parseObject(topProcessJson); - JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); + ObjectNode jsonObject = JSONUtils.parseObject(topProcessJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.path("tasks"); String originSubJson = jsonArray.toString(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java index b8fcd62333..4566d93af2 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java @@ -16,7 +16,8 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -48,13 +49,13 @@ public class DataSourceParamTest { "\"preTasks\":[\"dependent\"]}"; - JSONObject taskNode = JSONUtils.parseObject(sqlJson); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + ObjectNode taskNode = JSONUtils.parseObject(sqlJson); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject sql = addTaskParam.addExportSpecialParam(taskNode); + JsonNode sql = addTaskParam.addExportSpecialParam(taskNode); JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false); } @@ -72,13 +73,13 @@ public class DataSourceParamTest { "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{}," + "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}"; - JSONObject taskNode = JSONUtils.parseObject(sqlJson); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + ObjectNode taskNode = JSONUtils.parseObject(sqlJson); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject sql = addTaskParam.addImportSpecialParam(taskNode); + JsonNode sql = addTaskParam.addImportSpecialParam(taskNode); JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java index d21b7be0e2..be61ab7559 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.api.utils.exportprocess; import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -43,13 +45,13 @@ public class DependentParamTest { "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," + "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}"; - JSONObject taskNode = JSONUtils.parseObject(dependentJson); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + ObjectNode taskNode = JSONUtils.parseObject(dependentJson); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = addTaskParam.addExportSpecialParam(taskNode); + JsonNode dependent = addTaskParam.addExportSpecialParam(taskNode); JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false); } @@ -57,13 +59,13 @@ public class DependentParamTest { String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}"; - JSONObject taskEmpty = JSONUtils.parseObject(dependentEmpty); - if (StringUtils.isNotEmpty(taskEmpty.getString("type"))) { - String taskType = taskEmpty.getString("type"); + ObjectNode taskEmpty = JSONUtils.parseObject(dependentEmpty); + if (StringUtils.isNotEmpty(taskEmpty.path("type").asText())) { + String taskType = taskEmpty.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = addTaskParam.addImportSpecialParam(taskEmpty); + JsonNode dependent = addTaskParam.addImportSpecialParam(taskEmpty); JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false); } @@ -81,13 +83,13 @@ public class DependentParamTest { "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}]," + "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}"; - JSONObject taskNode = JSONUtils.parseObject(dependentJson); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + ObjectNode taskNode = JSONUtils.parseObject(dependentJson); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = addTaskParam.addImportSpecialParam(taskNode); + JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode); JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false); } @@ -97,13 +99,13 @@ public class DependentParamTest { "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\"" + ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}"; - JSONObject taskNodeEmpty = JSONUtils.parseObject(dependentEmpty); - if (StringUtils.isNotEmpty(taskNodeEmpty.getString("type"))) { - String taskType = taskNodeEmpty.getString("type"); + JsonNode taskNodeEmpty = JSONUtils.parseObject(dependentEmpty); + if (StringUtils.isNotEmpty(taskNodeEmpty.path("type").asText())) { + String taskType = taskNodeEmpty.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = addTaskParam.addImportSpecialParam(taskNode); + JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode); JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java index f0aed91a0d..4f701490e5 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java @@ -16,13 +16,12 @@ */ package org.apache.dolphinscheduler.common.utils; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.*; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,230 +34,233 @@ import java.util.*; */ public class JSONUtils { - private static final Logger logger = LoggerFactory.getLogger(JSONUtils.class); - - /** - * can use static singleton, inject: just make sure to reuse! - */ - private static final ObjectMapper objectMapper = new ObjectMapper(); - - private JSONUtils() { - //Feature that determines whether encountering of unknown properties, false means not analyzer unknown properties - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).setTimeZone(TimeZone.getDefault()); - } - - /** - * json representation of object - * @param object object - * @return object to json string - */ - public static String toJson(Object object) { - try{ - return JSON.toJSONString(object,false); - } catch (Exception e) { - logger.error("object to json exception!",e); + private static final Logger logger = LoggerFactory.getLogger(JSONUtils.class); + + /** + * can use static singleton, inject: just make sure to reuse! + */ + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private JSONUtils() { } - return null; - } - - - /** - * - * This method deserializes the specified Json into an object of the specified class. It is not - * suitable to use if the specified class is a generic type since it will not have the generic - * type information because of the Type Erasure feature of Java. Therefore, this method should not - * be used if the desired type is a generic type. Note that this method works fine if the any of - * the fields of the specified object are generics, just the object itself should not be a - * generic type. - * - * @param json the string from which the object is to be deserialized - * @param clazz the class of T - * @param T - * @return an object of type T from the string - * classOfT - */ - public static T parseObject(String json, Class clazz) { - if (StringUtils.isEmpty(json)) { - return null; + static { + //Feature that determines whether encountering of unknown properties, false means not analyzer unknown properties + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).setTimeZone(TimeZone.getDefault()); + objectMapper.configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true).setTimeZone(TimeZone.getDefault()); } - try { - return JSON.parseObject(json, clazz); - } catch (Exception e) { - logger.error("parse object exception!",e); + /** + * json representation of object + * + * @param object object + * @return object to json string + */ + public static String toJson(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + logger.error("object to json exception!", e); + } + + return null; } - return null; - } - - - /** - * json to list - * - * @param json json string - * @param clazz class - * @param T - * @return list - */ - public static List toList(String json, Class clazz) { - if (StringUtils.isEmpty(json)) { - return new ArrayList<>(); + + + /** + * This method deserializes the specified Json into an object of the specified class. It is not + * suitable to use if the specified class is a generic type since it will not have the generic + * type information because of the Type Erasure feature of Java. Therefore, this method should not + * be used if the desired type is a generic type. Note that this method works fine if the any of + * the fields of the specified object are generics, just the object itself should not be a + * generic type. + * + * @param json the string from which the object is to be deserialized + * @param clazz the class of T + * @param T + * @return an object of type T from the string + * classOfT + */ + public static T parseObject(String json, Class clazz) { + if (StringUtils.isEmpty(json)) { + return null; + } + + try { + return objectMapper.readValue(json, clazz); + } catch (Exception e) { + logger.error("parse object exception!", e); + } + return null; } - try { - return JSONArray.parseArray(json, clazz); - } catch (Exception e) { - logger.error("JSONArray.parseArray exception!",e); + + + /** + * json to list + * + * @param json json string + * @param clazz class + * @param T + * @return list + */ + public static List toList(String json, Class clazz) { + if (StringUtils.isEmpty(json)) { + return new ArrayList<>(); + } + try { + return objectMapper.readValue(json, new TypeReference>() { + }); + } catch (Exception e) { + logger.error("JSONArray.parseArray exception!", e); + } + + return new ArrayList<>(); } - return new ArrayList<>(); - } + /** + * check json object valid + * + * @param json json + * @return true if valid + */ + public static boolean checkJsonValid(String json) { + if (StringUtils.isEmpty(json)) { + return false; + } - /** - * check json object valid - * - * @param json json - * @return true if valid - */ - public static boolean checkJsonValid(String json) { + try { + objectMapper.readTree(json); + return true; + } catch (IOException e) { + logger.error("check json object valid exception!", e); + } - if (StringUtils.isEmpty(json)) { - return false; + return false; } - try { - objectMapper.readTree(json); - return true; - } catch (IOException e) { - logger.error("check json object valid exception!",e); - } - return false; - } - - - /** - * Method for finding a JSON Object field with specified name in this - * node or its child nodes, and returning value it has. - * If no matching field is found in this node or its descendants, returns null. - * - * @param jsonNode json node - * @param fieldName Name of field to look for - * - * @return Value of first matching node found, if any; null if none - */ - public static String findValue(JsonNode jsonNode, String fieldName) { - JsonNode node = jsonNode.findValue(fieldName); - - if (node == null) { - return null; - } + /** + * Method for finding a JSON Object field with specified name in this + * node or its child nodes, and returning value it has. + * If no matching field is found in this node or its descendants, returns null. + * + * @param jsonNode json node + * @param fieldName Name of field to look for + * @return Value of first matching node found, if any; null if none + */ + public static String findValue(JsonNode jsonNode, String fieldName) { + JsonNode node = jsonNode.findValue(fieldName); - return node.toString(); - } - - - /** - * json to map - * - * {@link #toMap(String, Class, Class)} - * - * @param json json - * @return json to map - */ - public static Map toMap(String json) { - if (StringUtils.isEmpty(json)) { - return null; - } + if (node == null) { + return null; + } - try { - return JSON.parseObject(json, new TypeReference>(){}); - } catch (Exception e) { - logger.error("json to map exception!",e); + return node.toString(); } - return null; - } - - /** - * - * json to map - * - * @param json json - * @param classK classK - * @param classV classV - * @param K - * @param V - * @return to map - */ - public static Map toMap(String json, Class classK, Class classV) { - if (StringUtils.isEmpty(json)) { - return null; + + /** + * json to map + *

+ * {@link #toMap(String, Class, Class)} + * + * @param json json + * @return json to map + */ + public static Map toMap(String json) { + if (StringUtils.isEmpty(json)) { + return null; + } + + try { + return objectMapper.readValue(json, new TypeReference>() {}); + } catch (Exception e) { + logger.error("json to map exception!", e); + } + + return null; } - try { - return JSON.parseObject(json, new TypeReference>() {}); - } catch (Exception e) { - logger.error("json to map exception!",e); + /** + * json to map + * + * @param json json + * @param classK classK + * @param classV classV + * @param K + * @param V + * @return to map + */ + public static Map toMap(String json, Class classK, Class classV) { + if (StringUtils.isEmpty(json)) { + return null; + } + + try { + return objectMapper.readValue(json, new TypeReference>() { + }); + } catch (Exception e) { + logger.error("json to map exception!", e); + } + + return null; } - return null; - } - - /** - * object to json string - * @param object object - * @return json string - */ - public static String toJsonString(Object object) { - try{ - return JSON.toJSONString(object,false); - } catch (Exception e) { - throw new RuntimeException("Object json deserialization exception.", e); + /** + * object to json string + * + * @param object object + * @return json string + */ + public static String toJsonString(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + throw new RuntimeException("Object json deserialization exception.", e); + } } - } - public static JSONObject parseObject(String text) { - try{ - return JSON.parseObject(text); - } catch (Exception e) { - throw new RuntimeException("String json deserialization exception.", e); + public static ObjectNode parseObject(String text) { + try { + return (ObjectNode) objectMapper.readTree(text); + } catch (Exception e) { + throw new RuntimeException("String json deserialization exception.", e); + } } - } - public static JSONArray parseArray(String text) { - try{ - return JSON.parseArray(text); - } catch (Exception e) { - throw new RuntimeException("Json deserialization exception.", e); + public static ArrayNode parseArray(String text) { + try { + return (ArrayNode) objectMapper.readTree(text); + } catch (Exception e) { + throw new RuntimeException("Json deserialization exception.", e); + } } - } + /** + * json serializer + */ + public static class JsonDataSerializer extends JsonSerializer { - /** - * json serializer - */ - public static class JsonDataSerializer extends JsonSerializer { + @Override + public void serialize(String value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeRawValue(value); + } - @Override - public void serialize(String value, JsonGenerator gen, SerializerProvider provider) throws IOException { - gen.writeRawValue(value); } - } + /** + * json data deserializer + */ + public static class JsonDataDeserializer extends JsonDeserializer { - /** - * json data deserializer - */ - public static class JsonDataDeserializer extends JsonDeserializer { + @Override + public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode node = p.getCodec().readTree(p); + return node.toString(); + } - @Override - public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { - JsonNode node = p.getCodec().readTree(p); - return node.toString(); } - - } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java index 270e0c4696..f3efef2eca 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java @@ -196,7 +196,7 @@ public class ParameterUtils { property.setValue(val); } } - return JSON.toJSONString(globalParamList); + return JSONUtils.toJson(globalParamList); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java new file mode 100644 index 0000000000..f30638cda2 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import java.util.Iterator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class StreamUtils { + + private StreamUtils() { } + + public static Stream asStream(Iterator sourceIterator) { + return asStream(sourceIterator, false); + } + + public static Stream asStream(Iterator sourceIterator, boolean parallel) { + Iterable iterable = () -> sourceIterator; + return StreamSupport.stream(iterable.spliterator(), parallel); + } + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java index 8ce60349ed..1756078fe1 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.common.utils; import com.alibaba.fastjson.JSON; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; @@ -101,9 +103,6 @@ public class JSONUtilsTest { @Test public void testParseObject() { - Assert.assertEquals("{\"foo\":\"bar\"}", JSONUtils.parseObject( - "{\n" + "\"foo\": \"bar\",\n" + "}", String.class)); - Assert.assertNull(JSONUtils.parseObject("", null)); Assert.assertNull(JSONUtils.parseObject("foo", String.class)); } @@ -134,15 +133,19 @@ public class JSONUtilsTest { map.put("foo","bar"); Assert.assertTrue(map.equals(JSONUtils.toMap( - "{\n" + "\"foo\": \"bar\",\n" + "}"))); + "{\n" + "\"foo\": \"bar\"\n" + "}"))); Assert.assertFalse(map.equals(JSONUtils.toMap( - "{\n" + "\"bar\": \"foo\",\n" + "}"))); + "{\n" + "\"bar\": \"foo\"\n" + "}"))); Assert.assertNull(JSONUtils.toMap("3")); Assert.assertNull(JSONUtils.toMap(null)); Assert.assertNull(JSONUtils.toMap("3", null, null)); Assert.assertNull(JSONUtils.toMap(null, null, null)); + + String str = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}"; + Map m = JSONUtils.toMap(str); + Assert.assertNotNull(m); } @Test @@ -155,4 +158,27 @@ public class JSONUtilsTest { Assert.assertEquals(String.valueOf((Object) null), JSONUtils.toJsonString(null)); } + + @Test + public void parseObject() { + String str = "{\"color\":\"yellow\",\"type\":\"renault\"}"; + ObjectNode node = JSONUtils.parseObject(str); + + Assert.assertEquals("yellow", node.path("color").asText()); + + node.put("price", 100); + Assert.assertEquals(100, node.path("price").asInt()); + + node.put("color", "red"); + Assert.assertEquals("red", node.path("color").asText()); + } + + @Test + public void parseArray() { + String str = "[{\"color\":\"yellow\",\"type\":\"renault\"}]"; + ArrayNode node = JSONUtils.parseArray(str); + + Assert.assertEquals("yellow", node.path(0).path("color").asText()); + } + } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java index abdc15cc6e..3d40e7a4cf 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java @@ -91,13 +91,13 @@ public class ParameterUtilsTest { globalParamList.add(property); String result2 = ParameterUtils.curingGlobalParams(null,globalParamList,CommandType.START_CURRENT_TASK_PROCESS,scheduleTime); - Assert.assertEquals(result2, JSON.toJSONString(globalParamList)); + Assert.assertEquals(result2, JSONUtils.toJson(globalParamList)); String result3 = ParameterUtils.curingGlobalParams(globalParamMap,globalParamList,CommandType.START_CURRENT_TASK_PROCESS,null); - Assert.assertEquals(result3, JSON.toJSONString(globalParamList)); + Assert.assertEquals(result3, JSONUtils.toJson(globalParamList)); String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); - Assert.assertEquals(result4, JSON.toJSONString(globalParamList)); + Assert.assertEquals(result4, JSONUtils.toJson(globalParamList)); //test var $ startsWith globalParamMap.put("bizDate","${system.biz.date}"); @@ -113,7 +113,7 @@ public class ParameterUtilsTest { globalParamList.add(property4); String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); - Assert.assertEquals(result5,JSONUtils.toJsonString(globalParamList)); + Assert.assertEquals(result5, JSONUtils.toJson(globalParamList)); } /** diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java new file mode 100644 index 0000000000..5a04969dee --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.*; + +public class StreamUtilsTest { + + @Test + public void asStream() { + List list = Arrays.asList("a", "b", "c"); + List ret = StreamUtils.asStream(list.iterator()) + .filter(item -> item.equals("a")) + .collect(Collectors.toList()); + Assert.assertEquals("a", ret.get(0)); + } + +} \ No newline at end of file