diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 957447dd68..b4fb3fb8aa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -907,6 +907,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ dependentCommand.setTaskDependType(TaskDependType.TASK_POST); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode()); + dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion()); dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup()); Map cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam()); cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode())); @@ -927,7 +928,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle, - workerGroup); + workerGroup, processDefinitionCode); } /** @@ -938,7 +939,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ private List checkDependentProcessDefinitionValid( List dependentProcessDefinitionList, CycleEnum processDefinitionCycle, - String workerGroup) { + String workerGroup, + long upstreamProcessDefinitionCode) { List validDependentProcessDefinitionList = new ArrayList<>(); List processDefinitionCodeList = @@ -949,7 +951,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { - if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) { + if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) { if (processDefinitionWorkerGroupMap .get(dependentProcessDefinition.getProcessDefinitionCode()) == null) { dependentProcessDefinition.setWorkerGroup(workerGroup); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java index 9de57dff33..87bb3d4234 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java @@ -41,6 +41,11 @@ public class DependentProcessDefinition { */ private String processDefinitionName; + /** + * process definition version + **/ + private int processDefinitionVersion; + /** * task definition name */ @@ -60,14 +65,14 @@ public class DependentProcessDefinition { * get dependent cycle * @return CycleEnum */ - public CycleEnum getDependentCycle() { + public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) { DependentParameters dependentParameters = this.getDependentParameters(); List dependentTaskModelList = dependentParameters.getDependTaskList(); for (DependentTaskModel dependentTaskModel : dependentTaskModelList) { List dependentItemList = dependentTaskModel.getDependItemList(); for (DependentItem dependentItem : dependentItemList) { - if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) { + if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) { return cycle2CycleEnum(dependentItem.getCycle()); } } @@ -122,6 +127,14 @@ public class DependentProcessDefinition { this.processDefinitionCode = code; } + public int getProcessDefinitionVersion() { + return processDefinitionVersion; + } + + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; + } + public long getTaskDefinitionCode() { return this.taskDefinitionCode; } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index b17499bb60..2689b6d50f 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -149,6 +149,7 @@ SELECT c.code AS process_definition_code ,c.name AS process_definition_name + ,c.version as process_definition_version ,a.code AS task_definition_code ,a.task_params FROM