Browse Source

update

pull/2/head
lenboo 5 years ago
parent
commit
1cac85f2ef
  1. 18
      escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java
  2. 5
      escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java

18
escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java

@ -24,15 +24,13 @@ import cn.escheduler.common.model.TaskNodeRelation;
import cn.escheduler.common.process.ProcessDag;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.model.ProcessData;
import cn.escheduler.dao.model.TaskInstance;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -240,9 +238,15 @@ public class DagHelper {
* it would find the post node if the start vertex is forbidden running
* @param parentNodeName the previous node
* @param dag
* @param completeTaskList
* @return
*/
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag){
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){
if(completeTaskList == null){
completeTaskList = new HashMap<>();
}
Collection<String> startVertexs = null;
if(StringUtils.isNotEmpty(parentNodeName)){
startVertexs = dag.getSubsequentNodes(parentNodeName);
@ -257,10 +261,10 @@ public class DagHelper {
for(String start : startVertexs){
TaskNode startNode = dag.getNode(start);
if(!startNode.isForbidden()){
if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){
continue;
}
Collection<String> postNodes = getStartVertex(start, dag);
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList);
for(String post : postNodes){
if(checkForbiddenPostCanSubmit(post, dag)){

5
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java

@ -426,15 +426,12 @@ public class MasterExecThread implements Runnable {
private List<TaskInstance> getPostTaskInstanceByNode(DAG<String, TaskNode, TaskNodeRelation> dag, String parentNodeName){
List<TaskInstance> postTaskList = new ArrayList<>();
Collection<String> startVertex = DagHelper.getStartVertex(parentNodeName, dag);
Collection<String> startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList);
if(startVertex == null){
return postTaskList;
}
for (String nodeName : startVertex){
if(completeTaskList.containsKey(nodeName)){
continue;
}
// encapsulation task instance
TaskInstance taskInstance = createTaskInstance(processInstance, nodeName ,
dag.getNode(nodeName),parentNodeName);

Loading…
Cancel
Save