diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 014dce8a98..d3da26f0f2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -61,7 +61,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; @@ -144,9 +143,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; - @Autowired - private ProcessTaskRelationLogMapper processTaskRelationLogMapper; - @Autowired TaskDefinitionLogMapper taskDefinitionLogMapper; @@ -190,9 +186,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return checkProcessJson; } - Long processDefinitionCode; try { - processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); + long processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); processDefinition.setCode(processDefinitionCode); } catch (SnowFlakeException e) { putMsg(result, Status.CREATE_PROCESS_DEFINITION); @@ -1346,7 +1341,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } runningNodeMap.remove(nodeName); } - if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) { + if (waitingRunningNodeMap.size() == 0) { break; } else { runningNodeMap.putAll(waitingRunningNodeMap); @@ -1359,7 +1354,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - /** * whether the graph has a ring * @@ -1368,15 +1362,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro */ private boolean graphHasCycle(List taskNodeResponseList) { DAG graph = new DAG<>(); - // Fill the vertices for (TaskNode taskNodeResponse : taskNodeResponseList) { graph.addNode(taskNodeResponse.getName(), taskNodeResponse); } - // Fill edge relations for (TaskNode taskNodeResponse : taskNodeResponseList) { - taskNodeResponse.getPreTasks(); List preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class); if (CollectionUtils.isNotEmpty(preTasks)) { for (String preTask : preTasks) { @@ -1386,7 +1377,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } } } - return graph.hasCycle(); } @@ -1634,7 +1624,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param processDefinitionId processDefinitionId */ private void setFailedProcessList(List failedProcessList, String processDefinitionId) { - ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(Integer.valueOf(processDefinitionId)); + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(Integer.parseInt(processDefinitionId)); if (processDefinition != null) { failedProcessList.add(processDefinitionId + "[" + processDefinition.getName() + "]"); } else { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 260e660910..981767a38b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -350,17 +350,15 @@ public class ProcessService { logger.error("process define not exists"); return new ArrayList<>(); } - List processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion()); - Map taskDefinitionMap = new HashMap<>(); - for (ProcessTaskRelation processTaskRelation : processTaskRelations) { - if (taskDefinitionMap.containsKey(processTaskRelation.getPostTaskCode())) { - TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( - processTaskRelation.getPostTaskCode(), processTaskRelation.getPostNodeVersion()); - taskDefinitionMap.put(processTaskRelation.getPostTaskCode(), taskDefinition); + List processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); + Set taskDefinitionSet = new HashSet<>(); + for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) { + if (processTaskRelation.getPostTaskCode() > 0) { + taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostNodeVersion())); } } - return new ArrayList<>(taskDefinitionMap.values()); - + List taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); + return new ArrayList<>(taskDefinitionLogs); } /** @@ -614,7 +612,7 @@ public class ProcessService { processInstance.setConnects(processDefinition.getConnects()); // reset global params while there are start parameters - setGlobalParamIfCommanded(processDefinition,cmdParam); + setGlobalParamIfCommanded(processDefinition, cmdParam); // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( @@ -753,10 +751,10 @@ public class ProcessService { // Recalculate global parameters after rerun. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - commandTypeIfComplement, - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + commandTypeIfComplement, + processInstance.getScheduleTime())); } processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId()); processInstance.setProcessDefinition(processDefinition); @@ -2090,13 +2088,13 @@ public class ProcessService { } /** - * get resource by resoruce id + * get resource by resource id * - * @param resoruceId resource id + * @param resourceId resource id * @return Resource */ - public Resource getResourceById(int resoruceId) { - return resourceMapper.selectById(resoruceId); + public Resource getResourceById(int resourceId) { + return resourceMapper.selectById(resourceId); } /** @@ -2134,16 +2132,15 @@ public class ProcessService { return Constants.EXIT_CODE_FAILURE; } - ProcessDefinition tmpDefinition = processDefinitionLog; - tmpDefinition.setId(processDefinition.getId()); - tmpDefinition.setReleaseState(ReleaseState.OFFLINE); - tmpDefinition.setFlag(Flag.YES); + processDefinitionLog.setId(processDefinition.getId()); + processDefinitionLog.setReleaseState(ReleaseState.OFFLINE); + processDefinitionLog.setFlag(Flag.YES); int switchResult = 0; if (0 == processDefinition.getId()) { - switchResult = processDefineMapper.insert(tmpDefinition); + switchResult = processDefineMapper.insert(processDefinitionLog); } else { - switchResult = processDefineMapper.updateById(tmpDefinition); + switchResult = processDefineMapper.updateById(processDefinitionLog); } switchProcessTaskRelationVersion(processDefinition); return switchResult; @@ -2156,8 +2153,7 @@ public class ProcessService { } List processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogList) { - ProcessTaskRelation processTaskRelation = processTaskRelationLog; - processTaskRelationMapper.insert(processTaskRelation); + processTaskRelationMapper.insert(processTaskRelationLog); } } @@ -2327,14 +2323,12 @@ public class ProcessService { now)); } } else { - // todo relation name builderRelationList.add(new ProcessTaskRelation("", processDefinition.getVersion(), projectCode, processDefinition.getCode(), 0L, taskNameAndCode.get(taskNode.getName()), - // todo conditionType ConditionType.of("none"), taskNode.getConditionResult(), now, @@ -2394,33 +2388,17 @@ public class ProcessService { */ public DAG genDagGraph(ProcessDefinition processDefinition) { List taskNodeList = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion()); - List processTaskRelations = getProcessTaskRelationList(processDefinition.getCode(), processDefinition.getVersion()); - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, processTaskRelations); + List processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); + ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations)); // Generate concrete Dag to be executed return DagHelper.buildDagGraph(processDag); } - /** - * get process task relation list - * this function can be query relation list from log record - */ - public List getProcessTaskRelationList(Long processCode, int processVersion) { - List taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion( - processCode, - processVersion); - List processTaskRelations = new ArrayList<>(); - for (ProcessTaskRelationLog processTaskRelationLog : taskRelationLogs) { - processTaskRelations.add(processTaskRelationLog); - } - return processTaskRelations; - } - /** * generate ProcessData */ public ProcessData genProcessData(ProcessDefinition processDefinition) { - List taskNodes = genTaskNodeList(processDefinition.getCode() - , processDefinition.getVersion()); + List taskNodes = genTaskNodeList(processDefinition.getCode(), processDefinition.getVersion()); ProcessData processData = new ProcessData(); processData.setTasks(taskNodes); processData.setGlobalParams(JSONUtils.toList(processDefinition.getGlobalParams(), Property.class)); @@ -2430,10 +2408,10 @@ public class ProcessService { } public List genTaskNodeList(Long processCode, int processVersion) { - List processTaskRelations = this.getProcessTaskRelationList(processCode, processVersion); + List processTaskRelations = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion); Set taskDefinitionSet = new HashSet<>(); Map taskNodeMap = new HashMap<>(); - for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + for (ProcessTaskRelationLog processTaskRelation : processTaskRelations) { if (processTaskRelation.getPreTaskCode() > 0) { taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreNodeVersion())); }