Browse Source

cherry-pick [Bug](dependent) Dependent downstream trigger error when schedule cycle not day. (#11987)

Co-authored-by: Stalary <stalary@163.com>
3.0.1-release
Kerwin 2 years ago committed by GitHub
parent
commit
901d01fb5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java
  3. 1
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -804,6 +804,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion());
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
@ -823,7 +824,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
List<DependentProcessDefinition> dependentProcessDefinitionList =
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
return checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup);
return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle,
workerGroup, processDefinitionCode);
}
/**
@ -831,9 +833,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* the dependent process definition and if there is no worker group in the schedule, use the complement selection's
* worker group
*/
private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(List<DependentProcessDefinition> dependentProcessDefinitionList,
CycleEnum processDefinitionCycle,
String workerGroup) {
private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(
List<DependentProcessDefinition> dependentProcessDefinitionList,
CycleEnum processDefinitionCycle,
String workerGroup,
long upstreamProcessDefinitionCode) {
List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>();
List<Long> processDefinitionCodeList = dependentProcessDefinitionList.stream()
@ -843,8 +847,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<Long, String> processDefinitionWorkerGroupMap = processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) {
if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
if (processDefinitionWorkerGroupMap
.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
dependentProcessDefinition.setWorkerGroup(workerGroup);
}

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DependentProcessDefinition.java

@ -41,6 +41,11 @@ public class DependentProcessDefinition {
*/
private String processDefinitionName;
/**
* process definition version
**/
private int processDefinitionVersion;
/**
* task definition name
*/
@ -60,14 +65,14 @@ public class DependentProcessDefinition {
* get dependent cycle
* @return CycleEnum
*/
public CycleEnum getDependentCycle() {
public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
DependentParameters dependentParameters = this.getDependentParameters();
List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList();
for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList();
for (DependentItem dependentItem : dependentItemList) {
if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) {
if (upstreamProcessDefinitionCode == dependentItem.getDefinitionCode()) {
return cycle2CycleEnum(dependentItem.getCycle());
}
}
@ -122,6 +127,14 @@ public class DependentProcessDefinition {
this.processDefinitionCode = code;
}
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
public long getTaskDefinitionCode() {
return this.taskDefinitionCode;
}

1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml

@ -149,6 +149,7 @@
SELECT
c.code AS process_definition_code
,c.name AS process_definition_name
,c.version as process_definition_version
,a.code AS task_definition_code
,a.task_params
FROM

Loading…
Cancel
Save