diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index f7ddf06672..5abd71a915 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -115,7 +115,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceImpl.class); - private static final String PROCESSDEFINITIONID = "processDefinitionId"; + private static final String PROCESSDEFINITIONCODE = "processDefinitionCode"; private static final String RELEASESTATE = "releaseState"; @@ -153,7 +153,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired TaskDefinitionLogMapper taskDefinitionLogMapper; - + private SchedulerService schedulerService; /** @@ -277,6 +277,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } List resourceList = processDefinitionMapper.queryAllDefinitionList(project.getId()); + + resourceList.stream().forEach(processDefinition -> { + ProcessData processData = processService.genProcessData(processDefinition); + processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); + }); + result.put(Constants.DATA_LIST, resourceList); putMsg(result, Status.SUCCESS); @@ -310,6 +316,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro IPage processDefinitionIPage = processDefinitionMapper.queryDefineListPaging( page, searchVal, userId, project.getId(), isAdmin(loginUser)); + List records = processDefinitionIPage.getRecords(); + records.stream().forEach(processDefinition -> { + ProcessData processData = processService.genProcessData(processDefinition); + processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); + }); + processDefinitionIPage.setRecords(records); + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); pageInfo.setTotalCount((int) processDefinitionIPage.getTotal()); pageInfo.setLists(processDefinitionIPage.getRecords()); @@ -340,6 +353,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); + + ProcessData processData = processService.genProcessData(processDefinition); + processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); + if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); } else { @@ -362,6 +379,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName); + ProcessData processData = processService.genProcessData(processDefinition); + processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); + if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); } else { @@ -534,8 +554,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } } - // TODO: replace id to code - // ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode); int delete = processDefinitionMapper.deleteById(processDefinitionId); processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); if (delete > 0) { @@ -657,9 +675,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro //get workflow info int processDefinitionId = Integer.parseInt(strProcessDefinitionId); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId); - if (null != processDefinition) { - processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); - } + String processDefinitionJson = JSONUtils.toJsonString(processService.genProcessData(processDefinition)); + processDefinition.setProcessDefinitionJson(processDefinitionJson); + processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); } return processDefinitionList; @@ -718,15 +736,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @return export process metadata string */ public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { + String processDefinitionJson = processDefinition.getProcessDefinitionJson(); //correct task param which has data source or dependent param - String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); + String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinitionJson); processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); //export process metadata ProcessMeta exportProcessMeta = new ProcessMeta(); exportProcessMeta.setProjectName(processDefinition.getProjectName()); exportProcessMeta.setProcessDefinitionName(processDefinition.getName()); - exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson()); + exportProcessMeta.setProcessDefinitionJson(processDefinitionJson); exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription()); exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations()); exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects()); @@ -963,15 +982,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } } - //recursive sub-process parameter correction map key for old process id value for new process id - Map subProcessIdMap = new HashMap<>(); + //recursive sub-process parameter correction map key for old process code value for new process code + Map subProcessCodeMap = new HashMap<>(); List subProcessList = StreamUtils.asStream(jsonArray.elements()) .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(subProcessList)) { - importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); + importSubProcess(loginUser, targetProject, jsonArray, subProcessCodeMap); } jsonObject.set(TASKS, jsonArray); @@ -1038,9 +1057,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param loginUser login user * @param targetProject target project * @param jsonArray process task array - * @param subProcessIdMap correct sub process id map + * @param subProcessCodeMap correct sub process id map */ - private void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map subProcessIdMap) { + private void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map subProcessCodeMap) { for (int i = 0; i < jsonArray.size(); i++) { ObjectNode taskNode = (ObjectNode) jsonArray.path(i); String taskType = taskNode.path("type").asText(); @@ -1050,68 +1069,59 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } //get sub process info ObjectNode subParams = (ObjectNode) taskNode.path("params"); - Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); - ProcessDefinition subProcess = processDefinitionMapper.queryByDefineId(subProcessId); + Long subProcessCode = subParams.path(PROCESSDEFINITIONCODE).asLong(); + ProcessDefinition subProcess = processDefinitionMapper.queryByCode(subProcessCode); //check is sub process exist in db if (null == subProcess) { continue; } - String subProcessJson = subProcess.getProcessDefinitionJson(); + + String subProcessJson = JSONUtils.toJsonString(processService.genProcessData(subProcess)); //check current project has sub process ProcessDefinition currentProjectSubProcess = processDefinitionMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); if (null == currentProjectSubProcess) { - ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); + ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcessJson).get(TASKS); List subProcessList = StreamUtils.asStream(subJsonArray.elements()) .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText())) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(subProcessList)) { - importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); + importSubProcess(loginUser, targetProject, subJsonArray, subProcessCodeMap); //sub process processId correct - if (!subProcessIdMap.isEmpty()) { + if (!subProcessCodeMap.isEmpty()) { - for (Map.Entry entry : subProcessIdMap.entrySet()) { - String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey(); - String newSubProcessId = "\"processDefinitionId\":" + entry.getValue(); - subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId); + for (Map.Entry entry : subProcessCodeMap.entrySet()) { + String oldSubProcessCode = "\"processDefinitionCode\":" + entry.getKey(); + String newSubProcessCode = "\"processDefinitionCode\":" + entry.getValue(); + subProcessJson = subProcessJson.replaceAll(oldSubProcessCode, newSubProcessCode); } - subProcessIdMap.clear(); + subProcessCodeMap.clear(); } } - //if sub-process recursion - Date now = new Date(); - //create sub process in target project - ProcessDefinition processDefine = new ProcessDefinition(); - processDefine.setName(subProcess.getName()); - processDefine.setVersion(subProcess.getVersion()); - processDefine.setReleaseState(subProcess.getReleaseState()); - processDefine.setProjectId(targetProject.getId()); - processDefine.setUserId(loginUser.getId()); - processDefine.setProcessDefinitionJson(subProcessJson); - processDefine.setDescription(subProcess.getDescription()); - processDefine.setLocations(subProcess.getLocations()); - processDefine.setConnects(subProcess.getConnects()); - processDefine.setTimeout(subProcess.getTimeout()); - processDefine.setTenantId(subProcess.getTenantId()); - processDefine.setGlobalParams(subProcess.getGlobalParams()); - processDefine.setCreateTime(now); - processDefine.setUpdateTime(now); - processDefine.setFlag(subProcess.getFlag()); - processDefine.setWarningGroupId(subProcess.getWarningGroupId()); - processDefinitionMapper.insert(processDefine); - - logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName()); + try { + createProcessDefinition(loginUser + , targetProject.getName(), + subProcess.getName(), + subProcessJson, + subProcess.getDescription(), + subProcess.getLocations(), + subProcess.getConnects()); + logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), subProcess.getName()); + + } catch (Exception e) { + logger.error("import process meta json data: {}", e.getMessage(), e); + } //modify task node - ProcessDefinition newSubProcessDefine = processDefinitionMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName()); + ProcessDefinition newSubProcessDefine = processDefinitionMapper.queryByDefineName(subProcess.getProjectId(), subProcess.getName()); if (null != newSubProcessDefine) { - subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); - subParams.put(PROCESSDEFINITIONID, newSubProcessDefine.getId()); + subProcessCodeMap.put(subProcessCode, newSubProcessDefine.getCode()); + subParams.put(PROCESSDEFINITIONCODE, newSubProcessDefine.getId()); taskNode.set("params", subParams); } } @@ -1187,15 +1197,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId); return result; } - - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + ProcessData processData = processService.genProcessData(processDefinition); //process data check if (null == processData) { logger.error("process data is null"); - putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); + putMsg(result, Status.DATA_IS_NOT_VALID, JSONUtils.toJsonString(processData)); return result; } @@ -1233,8 +1240,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } for (ProcessDefinition processDefinition : processDefinitionList) { - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + ProcessData processData = processService.genProcessData(processDefinition); List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); taskNodeMap.put(processDefinition.getId(), taskNodeList); } @@ -1258,6 +1264,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro HashMap result = new HashMap<>(); List resourceList = processDefinitionMapper.queryAllDefinitionList(projectId); + resourceList.stream().forEach(processDefinition -> { + ProcessData processData = processService.genProcessData(processDefinition); + processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData)); + }); result.put(Constants.DATA_LIST, resourceList); putMsg(result, Status.SUCCESS); @@ -1449,16 +1459,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); return result; } else { - ProcessData processData = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), ProcessData.class); + ProcessData processData = processService.genProcessData(processDefinition); List taskNodeList = processData.getTasks(); taskNodeList.stream().forEach(taskNode -> { taskNode.setCode(0L); }); + processData.setTasks(taskNodeList); + String processDefinitionJson = JSONUtils.toJsonString(processData); return createProcessDefinition( loginUser, targetProject.getName(), processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp(), - processDefinition.getProcessDefinitionJson(), + processDefinitionJson, processDefinition.getDescription(), processDefinition.getLocations(), processDefinition.getConnects()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 8e321339ae..9a52acb245 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -17,8 +17,9 @@ package org.apache.dolphinscheduler.api.service.impl; +import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; + import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.CheckUtils; @@ -102,7 +103,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); checkTaskNode(result, taskNode, taskDefinitionJson); - if (result.get(Constants.STATUS) != Status.SUCCESS) { + if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID + || result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) { return result; } TaskDefinition taskDefinition = new TaskDefinition(); @@ -218,7 +220,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); checkTaskNode(result, taskNode, taskDefinitionJson); - if (result.get(Constants.STATUS) != Status.SUCCESS) { + if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID + || result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) { return result; } int update = processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index abc0037417..3eb69230fd 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2415,6 +2415,20 @@ public class ProcessService { return processTaskRelations; } + /** + * generate ProcessData + */ + public ProcessData genProcessData(ProcessDefinition processDefinition) { + List taskNodes = genTaskNodeList(processDefinition.getCode() + , processDefinition.getVersion()); + ProcessData processData = new ProcessData(); + processData.setTasks(taskNodes); + processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(), Property.class)); + processData.setTenantId(processDefinition.getTenantId()); + processData.setTimeout(processDefinition.getTimeout()); + return processData; + } + public List genTaskNodeList(Long processCode, int processVersion) { List processTaskRelations = this.getProcessTaskRelationList(processCode, processVersion); Set taskDefinitionSet = new HashSet<>();