Browse Source

add null check when import process has sub-process (#1806)

* fix bug: zk hasTask method NPE

* add retMap null check for AlertSender

* add null check when import process has sub-process
pull/2/head
Yelli 5 years ago committed by qiaozhanwei
parent
commit
38f30b5c4d
  1. 114
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

114
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -802,66 +802,68 @@ public class ProcessDefinitionService extends BaseDAGService {
JSONObject subParams = JSONUtils.parseObject(taskNode.getString("params"));
Integer subProcessId = subParams.getInteger("processDefinitionId");
ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId);
String subProcessJson = subProcess.getProcessDefinitionJson();
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName());
if (null == currentProjectSubProcess) {
JSONArray subJsonArray = (JSONArray) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks");
List<Object> subProcessList = subJsonArray.stream()
.filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type")))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(subProcessList)) {
importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap);
//sub process processId correct
if (!subProcessIdMap.isEmpty()) {
for (Map.Entry<Integer, Integer> entry : subProcessIdMap.entrySet()) {
String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey();
String newSubProcessId = "\"processDefinitionId\":" + entry.getValue();
subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
//check is sub process exist in db
if (null != subProcess) {
String subProcessJson = subProcess.getProcessDefinitionJson();
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName());
if (null == currentProjectSubProcess) {
JSONArray subJsonArray = (JSONArray) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks");
List<Object> subProcessList = subJsonArray.stream()
.filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type")))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(subProcessList)) {
importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap);
//sub process processId correct
if (!subProcessIdMap.isEmpty()) {
for (Map.Entry<Integer, Integer> entry : subProcessIdMap.entrySet()) {
String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey();
String newSubProcessId = "\"processDefinitionId\":" + entry.getValue();
subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
}
subProcessIdMap.clear();
}
subProcessIdMap.clear();
}
}
//if sub-process recursion
Date now = new Date();
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition();
processDefine.setName(subProcess.getName());
processDefine.setVersion(subProcess.getVersion());
processDefine.setReleaseState(subProcess.getReleaseState());
processDefine.setProjectId(targetProject.getId());
processDefine.setUserId(loginUser.getId());
processDefine.setProcessDefinitionJson(subProcessJson);
processDefine.setDescription(subProcess.getDescription());
processDefine.setLocations(subProcess.getLocations());
processDefine.setConnects(subProcess.getConnects());
processDefine.setTimeout(subProcess.getTimeout());
processDefine.setTenantId(subProcess.getTenantId());
processDefine.setGlobalParams(subProcess.getGlobalParams());
processDefine.setCreateTime(now);
processDefine.setUpdateTime(now);
processDefine.setFlag(subProcess.getFlag());
processDefine.setReceivers(subProcess.getReceivers());
processDefine.setReceiversCc(subProcess.getReceiversCc());
processDefineMapper.insert(processDefine);
logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName());
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName());
if (null != newSubProcessDefine) {
subProcessIdMap.put(subProcessId, newSubProcessDefine.getId());
subParams.put("processDefinitionId", newSubProcessDefine.getId());
taskNode.put("params", subParams);
//if sub-process recursion
Date now = new Date();
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition();
processDefine.setName(subProcess.getName());
processDefine.setVersion(subProcess.getVersion());
processDefine.setReleaseState(subProcess.getReleaseState());
processDefine.setProjectId(targetProject.getId());
processDefine.setUserId(loginUser.getId());
processDefine.setProcessDefinitionJson(subProcessJson);
processDefine.setDescription(subProcess.getDescription());
processDefine.setLocations(subProcess.getLocations());
processDefine.setConnects(subProcess.getConnects());
processDefine.setTimeout(subProcess.getTimeout());
processDefine.setTenantId(subProcess.getTenantId());
processDefine.setGlobalParams(subProcess.getGlobalParams());
processDefine.setCreateTime(now);
processDefine.setUpdateTime(now);
processDefine.setFlag(subProcess.getFlag());
processDefine.setReceivers(subProcess.getReceivers());
processDefine.setReceiversCc(subProcess.getReceiversCc());
processDefineMapper.insert(processDefine);
logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName());
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName());
if (null != newSubProcessDefine) {
subProcessIdMap.put(subProcessId, newSubProcessDefine.getId());
subParams.put("processDefinitionId", newSubProcessDefine.getId());
taskNode.put("params", subParams);
}
}
}
}
}

Loading…
Cancel
Save