From 1cac85f2effc928a853f5ab2f8d878d710837394 Mon Sep 17 00:00:00 2001 From: lenboo Date: Fri, 14 Jun 2019 18:31:16 +0800 Subject: [PATCH] update --- .../cn/escheduler/dao/utils/DagHelper.java | 18 +++++++++++------- .../server/master/runner/MasterExecThread.java | 5 +---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java index d811cf78bc..bc509b1e99 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java @@ -24,15 +24,13 @@ import cn.escheduler.common.model.TaskNodeRelation; import cn.escheduler.common.process.ProcessDag; import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.dao.model.ProcessData; +import cn.escheduler.dao.model.TaskInstance; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** @@ -240,9 +238,15 @@ public class DagHelper { * it would find the post node if the start vertex is forbidden running * @param parentNodeName the previous node * @param dag + * @param completeTaskList * @return */ - public static Collection getStartVertex(String parentNodeName, DAG dag){ + public static Collection getStartVertex(String parentNodeName, DAG dag, + Map completeTaskList){ + + if(completeTaskList == null){ + completeTaskList = new HashMap<>(); + } Collection startVertexs = null; if(StringUtils.isNotEmpty(parentNodeName)){ startVertexs = dag.getSubsequentNodes(parentNodeName); @@ -257,10 +261,10 @@ public class DagHelper { for(String start : startVertexs){ TaskNode startNode = dag.getNode(start); - if(!startNode.isForbidden()){ + if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){ continue; } - Collection postNodes = getStartVertex(start, dag); + Collection postNodes = getStartVertex(start, dag, completeTaskList); for(String post : postNodes){ if(checkForbiddenPostCanSubmit(post, dag)){ diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java index cb38c7a722..9ce08d691a 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java @@ -426,15 +426,12 @@ public class MasterExecThread implements Runnable { private List getPostTaskInstanceByNode(DAG dag, String parentNodeName){ List postTaskList = new ArrayList<>(); - Collection startVertex = DagHelper.getStartVertex(parentNodeName, dag); + Collection startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList); if(startVertex == null){ return postTaskList; } for (String nodeName : startVertex){ - if(completeTaskList.containsKey(nodeName)){ - continue; - } // encapsulation task instance TaskInstance taskInstance = createTaskInstance(processInstance, nodeName , dag.getNode(nodeName),parentNodeName);