From ed78aea2f8a473eac747993c1a9b4fcfa537a6e9 Mon Sep 17 00:00:00 2001 From: lenboo Date: Sat, 1 Aug 2020 16:48:23 +0800 Subject: [PATCH] fix bug: there exists some problems in checking task dependency --- .../master/runner/MasterExecThread.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index c0bc3aaea8..14b9a3aea6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; +import net.sf.jsqlparser.expression.NextValExpression; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; @@ -663,7 +664,6 @@ public class MasterExecThread implements Runnable { if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } - TaskNode taskNode = dag.getNode(taskName); List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ @@ -672,25 +672,30 @@ public class MasterExecThread implements Runnable { || skipTaskNodeList.containsKey(depsNode)){ continue; } - // dependencies must be fully completed + // all the dependencies must be completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } + // depend node has already complete. ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); - // conditions task would not return failed. - if(depTaskState.typeIsFailure() - && !DagHelper.haveConditionsAfterNode(depsNode, dag ) - && !dag.getNode(depsNode).isConditionsTask()){ - return DependResult.FAILED; - } - if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; } + // ignore task state if current task is condition + if(taskNode.isConditionsTask()){ + continue; + } + if(dag.getNode(depsNode).isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = parseConditionTask(depsNode); + if(!nextTaskList.contains(taskName)){ + return DependResult.FAILED; + } + }else if(depTaskState.typeIsFailure()){ + return DependResult.FAILED; + } } - logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); - return DependResult.SUCCESS; }