From ed78aea2f8a473eac747993c1a9b4fcfa537a6e9 Mon Sep 17 00:00:00 2001 From: lenboo Date: Sat, 1 Aug 2020 16:48:23 +0800 Subject: [PATCH 1/4] fix bug: there exists some problems in checking task dependency --- .../master/runner/MasterExecThread.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index c0bc3aaea8..14b9a3aea6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; +import net.sf.jsqlparser.expression.NextValExpression; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; @@ -663,7 +664,6 @@ public class MasterExecThread implements Runnable { if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } - TaskNode taskNode = dag.getNode(taskName); List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ @@ -672,25 +672,30 @@ public class MasterExecThread implements Runnable { || skipTaskNodeList.containsKey(depsNode)){ continue; } - // dependencies must be fully completed + // all the dependencies must be completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } + // depend node has already complete. ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); - // conditions task would not return failed. - if(depTaskState.typeIsFailure() - && !DagHelper.haveConditionsAfterNode(depsNode, dag ) - && !dag.getNode(depsNode).isConditionsTask()){ - return DependResult.FAILED; - } - if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; } + // ignore task state if current task is condition + if(taskNode.isConditionsTask()){ + continue; + } + if(dag.getNode(depsNode).isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = parseConditionTask(depsNode); + if(!nextTaskList.contains(taskName)){ + return DependResult.FAILED; + } + }else if(depTaskState.typeIsFailure()){ + return DependResult.FAILED; + } } - logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); - return DependResult.SUCCESS; } From 0e885cd0eecff4994bed2c2020c728ff48ee0fa1 Mon Sep 17 00:00:00 2001 From: lenboo Date: Sat, 1 Aug 2020 17:26:30 +0800 Subject: [PATCH 2/4] remove unused import --- .../dolphinscheduler/server/master/runner/MasterExecThread.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 14b9a3aea6..097e8d74d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; -import net.sf.jsqlparser.expression.NextValExpression; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; From cdd76d6b118a65f863f3581f0911807f6fa43fc8 Mon Sep 17 00:00:00 2001 From: lenboo Date: Sun, 2 Aug 2020 11:13:50 +0800 Subject: [PATCH 3/4] refactor depend result --- .../master/runner/MasterExecThread.java | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 097e8d74d2..479d6560a6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -675,7 +675,6 @@ public class MasterExecThread implements Runnable { if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } - // depend node has already complete. ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; @@ -684,13 +683,7 @@ public class MasterExecThread implements Runnable { if(taskNode.isConditionsTask()){ continue; } - if(dag.getNode(depsNode).isConditionsTask()){ - //condition task need check the branch to run - List nextTaskList = parseConditionTask(depsNode); - if(!nextTaskList.contains(taskName)){ - return DependResult.FAILED; - } - }else if(depTaskState.typeIsFailure()){ + if(!dependTaskSuccess(depsNode, taskName)){ return DependResult.FAILED; } } @@ -698,6 +691,28 @@ public class MasterExecThread implements Runnable { return DependResult.SUCCESS; } + /** + * depend node is completed, but here need check the condition task branch is the next node + * @param dependNodeName + * @param nextNodeName + * @return + */ + private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){ + if(dag.getNode(dependNodeName).isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = parseConditionTask(dependNodeName); + if(!nextTaskList.contains(nextNodeName)){ + return false; + } + }else { + ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState(); + if(depTaskState.typeIsFailure()){ + return false; + } + } + return true; + } + /** * query task instance by complete state From f936b07fdf1a145147e58eb2dcf392465764607c Mon Sep 17 00:00:00 2001 From: lenboo Date: Tue, 4 Aug 2020 16:49:30 +0800 Subject: [PATCH 4/4] fix bug: Fix master task dependency check bug --- .../server/master/runner/MasterExecThread.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 479d6560a6..6692598ce6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -667,10 +667,12 @@ public class MasterExecThread implements Runnable { List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ if(!dag.containsNode(depsNode) - || forbiddenTaskList.containsKey(depsNode) - || skipTaskNodeList.containsKey(depsNode)){ + || forbiddenTaskList.containsKey(depsNode)){ continue; } + if(skipTaskNodeList.containsKey(depsNode)){ + return DependResult.FAILED; + } // all the dependencies must be completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING;