Browse Source

[Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11734)

* FIX: dependent

* FIX: version

* MOD: for review
3.1.0-release
Stalary 2 years ago committed by caishunfeng
parent
commit
d7f40b19b5
  1. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
  3. 1
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -909,6 +909,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// and causing duplicate when clone it.
dependentCommand.setId(null);
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
@ -929,7 +930,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle,
workerGroup);
workerGroup, processDefinitionCode);
}
/**
@ -940,7 +941,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(
List<DependentProcessDefinition> dependentProcessDefinitionList,
CycleEnum processDefinitionCycle,
String workerGroup) {
String workerGroup,
long upstreamProcessDefinitionCode) {
List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>();
List<Long> processDefinitionCodeList =
@ -951,7 +953,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) {
if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
if (processDefinitionWorkerGroupMap
.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
dependentProcessDefinition.setWorkerGroup(workerGroup);

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java

@ -41,6 +41,11 @@ public class DependentProcessDefinition {
*/
private String processDefinitionName;
/**
* process definition version
**/
private int processDefinitionVersion;
/**
* task definition name
*/
@ -60,14 +65,14 @@ public class DependentProcessDefinition {
* get dependent cycle
* @return CycleEnum
*/
public CycleEnum getDependentCycle() {
public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
DependentParameters dependentParameters = this.getDependentParameters();
List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList();
for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList();
for (DependentItem dependentItem : dependentItemList) {
if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) {
if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) {
return cycle2CycleEnum(dependentItem.getCycle());
}
}
@ -122,6 +127,14 @@ public class DependentProcessDefinition {
this.processDefinitionCode = code;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
public long getTaskDefinitionCode() {
return this.taskDefinitionCode;
}

1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@ -149,6 +149,7 @@
SELECT
c.code AS process_definition_code
,c.name AS process_definition_name
,c.version as process_definition_version
,a.code AS task_definition_code
,a.task_params
FROM

Loading…
Cancel
Save