Browse Source

[Feature][JsonSplit]modify import/export processdefinition, add genProcessData (#4864)

* Modify Project and ProjectUser Mapper

* Modify Project and ProjectUser Mapper

* project_code is bigint(20)

* modify ERROR name

* modify saveProcessDefine, remove the duplicate code with createTaskAndRelation

* modify import/export processdefinition, add genProcessData
pull/3/MERGE
Simon 4 years ago committed by GitHub
parent
commit
2fdeba0988
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 132
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  3. 14
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

132
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -115,7 +115,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceImpl.class);
private static final String PROCESSDEFINITIONID = "processDefinitionId"; private static final String PROCESSDEFINITIONCODE = "processDefinitionCode";
private static final String RELEASESTATE = "releaseState"; private static final String RELEASESTATE = "releaseState";
@ -153,7 +153,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired @Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper; TaskDefinitionLogMapper taskDefinitionLogMapper;
private SchedulerService schedulerService; private SchedulerService schedulerService;
/** /**
@ -277,6 +277,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(project.getId()); List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(project.getId());
resourceList.stream().forEach(processDefinition -> {
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
});
result.put(Constants.DATA_LIST, resourceList); result.put(Constants.DATA_LIST, resourceList);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -310,6 +316,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging( IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging(
page, searchVal, userId, project.getId(), isAdmin(loginUser)); page, searchVal, userId, project.getId(), isAdmin(loginUser));
List<ProcessDefinition> records = processDefinitionIPage.getRecords();
records.stream().forEach(processDefinition -> {
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
});
processDefinitionIPage.setRecords(records);
PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo, pageSize); PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount((int) processDefinitionIPage.getTotal()); pageInfo.setTotalCount((int) processDefinitionIPage.getTotal());
pageInfo.setLists(processDefinitionIPage.getRecords()); pageInfo.setLists(processDefinitionIPage.getRecords());
@ -340,6 +353,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId);
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else { } else {
@ -362,6 +379,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName);
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
if (processDefinition == null) { if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
} else { } else {
@ -534,8 +554,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
} }
// TODO: replace id to code
// ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode);
int delete = processDefinitionMapper.deleteById(processDefinitionId); int delete = processDefinitionMapper.deleteById(processDefinitionId);
processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (delete > 0) { if (delete > 0) {
@ -657,9 +675,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
//get workflow info //get workflow info
int processDefinitionId = Integer.parseInt(strProcessDefinitionId); int processDefinitionId = Integer.parseInt(strProcessDefinitionId);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId);
if (null != processDefinition) { String processDefinitionJson = JSONUtils.toJsonString(processService.genProcessData(processDefinition));
processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); processDefinition.setProcessDefinitionJson(processDefinitionJson);
} processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition));
} }
return processDefinitionList; return processDefinitionList;
@ -718,15 +736,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return export process metadata string * @return export process metadata string
*/ */
public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) {
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
//correct task param which has data source or dependent param //correct task param which has data source or dependent param
String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinitionJson);
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
//export process metadata //export process metadata
ProcessMeta exportProcessMeta = new ProcessMeta(); ProcessMeta exportProcessMeta = new ProcessMeta();
exportProcessMeta.setProjectName(processDefinition.getProjectName()); exportProcessMeta.setProjectName(processDefinition.getProjectName());
exportProcessMeta.setProcessDefinitionName(processDefinition.getName()); exportProcessMeta.setProcessDefinitionName(processDefinition.getName());
exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson()); exportProcessMeta.setProcessDefinitionJson(processDefinitionJson);
exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription()); exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription());
exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations()); exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations());
exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects()); exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects());
@ -963,15 +982,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
} }
//recursive sub-process parameter correction map key for old process id value for new process id //recursive sub-process parameter correction map key for old process code value for new process code
Map<Integer, Integer> subProcessIdMap = new HashMap<>(); Map<Long, Long> subProcessCodeMap = new HashMap<>();
List<Object> subProcessList = StreamUtils.asStream(jsonArray.elements()) List<Object> subProcessList = StreamUtils.asStream(jsonArray.elements())
.filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText()))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(subProcessList)) { if (CollectionUtils.isNotEmpty(subProcessList)) {
importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); importSubProcess(loginUser, targetProject, jsonArray, subProcessCodeMap);
} }
jsonObject.set(TASKS, jsonArray); jsonObject.set(TASKS, jsonArray);
@ -1038,9 +1057,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param loginUser login user * @param loginUser login user
* @param targetProject target project * @param targetProject target project
* @param jsonArray process task array * @param jsonArray process task array
* @param subProcessIdMap correct sub process id map * @param subProcessCodeMap correct sub process id map
*/ */
private void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map<Integer, Integer> subProcessIdMap) { private void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map<Long, Long> subProcessCodeMap) {
for (int i = 0; i < jsonArray.size(); i++) { for (int i = 0; i < jsonArray.size(); i++) {
ObjectNode taskNode = (ObjectNode) jsonArray.path(i); ObjectNode taskNode = (ObjectNode) jsonArray.path(i);
String taskType = taskNode.path("type").asText(); String taskType = taskNode.path("type").asText();
@ -1050,68 +1069,59 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
//get sub process info //get sub process info
ObjectNode subParams = (ObjectNode) taskNode.path("params"); ObjectNode subParams = (ObjectNode) taskNode.path("params");
Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); Long subProcessCode = subParams.path(PROCESSDEFINITIONCODE).asLong();
ProcessDefinition subProcess = processDefinitionMapper.queryByDefineId(subProcessId); ProcessDefinition subProcess = processDefinitionMapper.queryByCode(subProcessCode);
//check is sub process exist in db //check is sub process exist in db
if (null == subProcess) { if (null == subProcess) {
continue; continue;
} }
String subProcessJson = subProcess.getProcessDefinitionJson();
String subProcessJson = JSONUtils.toJsonString(processService.genProcessData(subProcess));
//check current project has sub process //check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefinitionMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); ProcessDefinition currentProjectSubProcess = processDefinitionMapper.queryByDefineName(targetProject.getId(), subProcess.getName());
if (null == currentProjectSubProcess) { if (null == currentProjectSubProcess) {
ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcessJson).get(TASKS);
List<Object> subProcessList = StreamUtils.asStream(subJsonArray.elements()) List<Object> subProcessList = StreamUtils.asStream(subJsonArray.elements())
.filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText())) .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText()))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(subProcessList)) { if (CollectionUtils.isNotEmpty(subProcessList)) {
importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); importSubProcess(loginUser, targetProject, subJsonArray, subProcessCodeMap);
//sub process processId correct //sub process processId correct
if (!subProcessIdMap.isEmpty()) { if (!subProcessCodeMap.isEmpty()) {
for (Map.Entry<Integer, Integer> entry : subProcessIdMap.entrySet()) { for (Map.Entry<Long, Long> entry : subProcessCodeMap.entrySet()) {
String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey(); String oldSubProcessCode = "\"processDefinitionCode\":" + entry.getKey();
String newSubProcessId = "\"processDefinitionId\":" + entry.getValue(); String newSubProcessCode = "\"processDefinitionCode\":" + entry.getValue();
subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId); subProcessJson = subProcessJson.replaceAll(oldSubProcessCode, newSubProcessCode);
} }
subProcessIdMap.clear(); subProcessCodeMap.clear();
} }
} }
//if sub-process recursion try {
Date now = new Date(); createProcessDefinition(loginUser
//create sub process in target project , targetProject.getName(),
ProcessDefinition processDefine = new ProcessDefinition(); subProcess.getName(),
processDefine.setName(subProcess.getName()); subProcessJson,
processDefine.setVersion(subProcess.getVersion()); subProcess.getDescription(),
processDefine.setReleaseState(subProcess.getReleaseState()); subProcess.getLocations(),
processDefine.setProjectId(targetProject.getId()); subProcess.getConnects());
processDefine.setUserId(loginUser.getId()); logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), subProcess.getName());
processDefine.setProcessDefinitionJson(subProcessJson);
processDefine.setDescription(subProcess.getDescription()); } catch (Exception e) {
processDefine.setLocations(subProcess.getLocations()); logger.error("import process meta json data: {}", e.getMessage(), e);
processDefine.setConnects(subProcess.getConnects()); }
processDefine.setTimeout(subProcess.getTimeout());
processDefine.setTenantId(subProcess.getTenantId());
processDefine.setGlobalParams(subProcess.getGlobalParams());
processDefine.setCreateTime(now);
processDefine.setUpdateTime(now);
processDefine.setFlag(subProcess.getFlag());
processDefine.setWarningGroupId(subProcess.getWarningGroupId());
processDefinitionMapper.insert(processDefine);
logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName());
//modify task node //modify task node
ProcessDefinition newSubProcessDefine = processDefinitionMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName()); ProcessDefinition newSubProcessDefine = processDefinitionMapper.queryByDefineName(subProcess.getProjectId(), subProcess.getName());
if (null != newSubProcessDefine) { if (null != newSubProcessDefine) {
subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); subProcessCodeMap.put(subProcessCode, newSubProcessDefine.getCode());
subParams.put(PROCESSDEFINITIONID, newSubProcessDefine.getId()); subParams.put(PROCESSDEFINITIONCODE, newSubProcessDefine.getId());
taskNode.set("params", subParams); taskNode.set("params", subParams);
} }
} }
@ -1187,15 +1197,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId);
return result; return result;
} }
ProcessData processData = processService.genProcessData(processDefinition);
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
//process data check //process data check
if (null == processData) { if (null == processData) {
logger.error("process data is null"); logger.error("process data is null");
putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); putMsg(result, Status.DATA_IS_NOT_VALID, JSONUtils.toJsonString(processData));
return result; return result;
} }
@ -1233,8 +1240,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
for (ProcessDefinition processDefinition : processDefinitionList) { for (ProcessDefinition processDefinition : processDefinitionList) {
String processDefinitionJson = processDefinition.getProcessDefinitionJson(); ProcessData processData = processService.genProcessData(processDefinition);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
taskNodeMap.put(processDefinition.getId(), taskNodeList); taskNodeMap.put(processDefinition.getId(), taskNodeList);
} }
@ -1258,6 +1264,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
HashMap<String, Object> result = new HashMap<>(); HashMap<String, Object> result = new HashMap<>();
List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(projectId); List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(projectId);
resourceList.stream().forEach(processDefinition -> {
ProcessData processData = processService.genProcessData(processDefinition);
processDefinition.setProcessDefinitionJson(JSONUtils.toJsonString(processData));
});
result.put(Constants.DATA_LIST, resourceList); result.put(Constants.DATA_LIST, resourceList);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -1449,16 +1459,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
return result; return result;
} else { } else {
ProcessData processData = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson(), ProcessData.class); ProcessData processData = processService.genProcessData(processDefinition);
List<TaskNode> taskNodeList = processData.getTasks(); List<TaskNode> taskNodeList = processData.getTasks();
taskNodeList.stream().forEach(taskNode -> { taskNodeList.stream().forEach(taskNode -> {
taskNode.setCode(0L); taskNode.setCode(0L);
}); });
processData.setTasks(taskNodeList);
String processDefinitionJson = JSONUtils.toJsonString(processData);
return createProcessDefinition( return createProcessDefinition(
loginUser, loginUser,
targetProject.getName(), targetProject.getName(),
processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp(), processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp(),
processDefinition.getProcessDefinitionJson(), processDefinitionJson,
processDefinition.getDescription(), processDefinition.getDescription(),
processDefinition.getLocations(), processDefinition.getLocations(),
processDefinition.getConnects()); processDefinition.getConnects());

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.CheckUtils;
@ -102,7 +103,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
checkTaskNode(result, taskNode, taskDefinitionJson); checkTaskNode(result, taskNode, taskDefinitionJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
|| result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) {
return result; return result;
} }
TaskDefinition taskDefinition = new TaskDefinition(); TaskDefinition taskDefinition = new TaskDefinition();
@ -218,7 +220,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} }
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
checkTaskNode(result, taskNode, taskDefinitionJson); checkTaskNode(result, taskNode, taskDefinitionJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
|| result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) {
return result; return result;
} }
int update = processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition); int update = processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition);

14
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2415,6 +2415,20 @@ public class ProcessService {
return processTaskRelations; return processTaskRelations;
} }
/**
* generate ProcessData
*/
public ProcessData genProcessData(ProcessDefinition processDefinition) {
List<TaskNode> taskNodes = genTaskNodeList(processDefinition.getCode()
, processDefinition.getVersion());
ProcessData processData = new ProcessData();
processData.setTasks(taskNodes);
processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(), Property.class));
processData.setTenantId(processDefinition.getTenantId());
processData.setTimeout(processDefinition.getTimeout());
return processData;
}
public List<TaskNode> genTaskNodeList(Long processCode, int processVersion) { public List<TaskNode> genTaskNodeList(Long processCode, int processVersion) {
List<ProcessTaskRelation> processTaskRelations = this.getProcessTaskRelationList(processCode, processVersion); List<ProcessTaskRelation> processTaskRelations = this.getProcessTaskRelationList(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>(); Set<TaskDefinition> taskDefinitionSet = new HashSet<>();

Loading…
Cancel
Save