diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 374abb6fca..56f71aef8c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -190,6 +190,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro try { long processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); processDefinition.setCode(processDefinitionCode); + processDefinition.setVersion(1); } catch (SnowFlakeException e) { putMsg(result, Status.CREATE_PROCESS_DEFINITION); return result; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java index da97df6d06..aad2b5699e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java @@ -115,7 +115,9 @@ public class ProcessTaskRelation { long projectCode, long processDefinitionCode, long preTaskCode, + int preTaskVersion, long postTaskCode, + int postTaskVersion, ConditionType conditionType, String conditionParams, Date createTime, @@ -125,7 +127,9 @@ public class ProcessTaskRelation { this.projectCode = projectCode; this.processDefinitionCode = processDefinitionCode; this.preTaskCode = preTaskCode; + this.preTaskVersion = preTaskVersion; this.postTaskCode = postTaskCode; + this.postTaskVersion = postTaskVersion; this.conditionType = conditionType; this.conditionParams = conditionParams; this.createTime = createTime; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java index 71f72ab13f..59ea5c567b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java @@ -43,12 +43,11 @@ public interface TaskDefinitionLogMapper extends BaseMapper { @Param("taskDefinitionName") String name); /** - * query task definition log list + * query max version for definition * * @param taskDefinitionCode taskDefinitionCode - * @return task definition log list */ - List queryByDefinitionCode(@Param("taskDefinitionCode") long taskDefinitionCode); + Integer queryMaxVersionForDefinition(@Param("taskDefinitionCode") long taskDefinitionCode); /** * query task definition log diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index 5670f064b8..f051aa7eca 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -34,9 +34,8 @@ WHERE p.code = #{projectCode} and td.name = #{taskDefinitionName} - + select max(version) from t_ds_task_definition_log WHERE code = #{taskDefinitionCode} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 4f9e88a6a3..c14038f476 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2126,7 +2126,7 @@ public class ProcessService { /** * switch process definition version to process definition log version */ - public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { + public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { if (null == processDefinition || null == processDefinitionLog) { return Constants.EXIT_CODE_FAILURE; } @@ -2135,13 +2135,23 @@ public class ProcessService { processDefinitionLog.setReleaseState(ReleaseState.OFFLINE); processDefinitionLog.setFlag(Flag.YES); - int switchResult = 0; + int result; if (0 == processDefinition.getId()) { - switchResult = processDefineMapper.insert(processDefinitionLog); + result = processDefineMapper.insert(processDefinitionLog); } else { - switchResult = processDefineMapper.updateById(processDefinitionLog); + result = processDefineMapper.updateById(processDefinitionLog); + } + return result; + } + + /** + * switch process definition version to process definition log version + */ + public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { + int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog); + if (switchResult != Constants.EXIT_CODE_FAILURE) { + switchProcessTaskRelationVersion(processDefinition); } - switchProcessTaskRelationVersion(processDefinition); return switchResult; } @@ -2161,16 +2171,11 @@ public class ProcessService { */ public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) { - List taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskDefinition.getCode()); - int version = taskDefinitionLogs - .stream() - .map(TaskDefinitionLog::getVersion) - .max((x, y) -> x > y ? x : y) - .orElse(0) + 1; + Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode()); Date now = new Date(); taskDefinition.setProjectCode(projectCode); taskDefinition.setUserId(operator.getId()); - taskDefinition.setVersion(version); + taskDefinition.setVersion(version == null || version == 0 ? 1 : version); taskDefinition.setUpdateTime(now); setTaskFromTaskNode(taskNode, taskDefinition); int update = taskDefinitionMapper.updateById(taskDefinition); @@ -2230,7 +2235,7 @@ public class ProcessService { createTaskAndRelation(operator, project.getCode(), processDefinition, processData); ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(), name, processData, project, desc, locations, connects); - return switchVersion(processDefinition, processDefinitionLog); + return processDefinitionToDB(processDefinition, processDefinitionLog); } /** @@ -2280,13 +2285,12 @@ public class ProcessService { ProcessDefinition processDefinition, ProcessData processData) { List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); - Map taskNameAndCode = new HashMap<>(); + Map taskNameAndCode = new HashMap<>(); for (TaskNode taskNode : taskNodeList) { TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName()); if (taskDefinition == null) { - long code; try { - code = SnowFlakeUtils.getInstance().nextId(); + long code = SnowFlakeUtils.getInstance().nextId(); taskDefinition = new TaskDefinition(); taskDefinition.setCode(code); } catch (SnowFlakeException e) { @@ -2299,7 +2303,7 @@ public class ProcessService { } updateTaskDefinition(operator, projectCode, taskNode, taskDefinition); } - taskNameAndCode.put(taskNode.getName(), taskDefinition.getCode()); + taskNameAndCode.put(taskNode.getName(), taskDefinition); } List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); if (!processTaskRelationList.isEmpty()) { @@ -2315,8 +2319,10 @@ public class ProcessService { processDefinition.getVersion(), projectCode, processDefinition.getCode(), - taskNameAndCode.get(preTaskName), - taskNameAndCode.get(taskNode.getName()), + taskNameAndCode.get(preTaskName).getCode(), + taskNameAndCode.get(preTaskName).getVersion(), + taskNameAndCode.get(taskNode.getName()).getCode(), + taskNameAndCode.get(taskNode.getName()).getVersion(), ConditionType.of("none"), taskNode.getConditionResult(), now, @@ -2327,8 +2333,10 @@ public class ProcessService { processDefinition.getVersion(), projectCode, processDefinition.getCode(), - 0L, - taskNameAndCode.get(taskNode.getName()), + 0L, // this isn't previous task node, set zero + 0, + taskNameAndCode.get(taskNode.getName()).getCode(), + taskNameAndCode.get(taskNode.getName()).getVersion(), ConditionType.of("none"), taskNode.getConditionResult(), now,