Browse Source

[FIX-3929] condition task would post wrong tasks when failover. (#3999)

* fix #3966 sub process doesnot send alert mail after process instance ending.

* fix bug 3964: sub_process The timeout warning does not take effect
add timeout warning for sub_process/dependent task.

* fix code smell

* fix code smell

* fix code smell

* update worker group inherit from parent

* remove stdout in logback configuration

* fix bug #3929 condition task would post error when failover.

* remove unused test

* add comments

* add skip node judge

Co-authored-by: baoliang <baoliang@analysys.com.cn>
pull/3/MERGE
bao liang 4 years ago committed by GitHub
parent
commit
6caac0f366
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/resources/logback-api.xml
  2. 164
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  3. 226
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  4. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  5. 129
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  6. 1
      dolphinscheduler-server/src/main/resources/logback-master.xml
  7. 1
      dolphinscheduler-server/src/main/resources/logback-worker.xml
  8. 25
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java

2
dolphinscheduler-api/src/main/resources/logback-api.xml

@ -55,7 +55,7 @@
<root level="INFO">
<appender-ref ref="STDOUT"/>
<!--<appender-ref ref="STDOUT"/>-->
<appender-ref ref="APILOGFILE"/>
</root>

164
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -238,78 +239,153 @@ public class DagHelper {
return null;
}
/**
* get start vertex in one dag
* it would find the post node if the start vertex is forbidden running
* @param parentNodeName previous node
* the task can be submit when all the depends nodes are forbidden or complete
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return start Vertex list
* @return can submit
*/
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
public static boolean allDependsForbiddenOrEnd(TaskNode taskNode,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskNode> skipTaskNodeList,
Map<String, TaskInstance> completeTaskList) {
if(completeTaskList == null){
completeTaskList = new HashMap<>();
List<String> dependList = taskNode.getDepList();
if (dependList == null) {
return true;
}
Collection<String> startVertexs = null;
if(StringUtils.isNotEmpty(parentNodeName)){
startVertexs = dag.getSubsequentNodes(parentNodeName);
for (String dependNodeName : dependList) {
TaskNode dependNode = dag.getNode(dependNodeName);
if (completeTaskList.containsKey(dependNodeName)
|| dependNode.isForbidden()
|| skipTaskNodeList.containsKey(dependNodeName)) {
continue;
} else {
startVertexs = dag.getBeginNode();
return false;
}
List<String> tmpStartVertexs = new ArrayList<>();
if(startVertexs!= null){
tmpStartVertexs.addAll(startVertexs);
}
return true;
}
for(String start : startVertexs){
TaskNode startNode = dag.getNode(start);
if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){
// the start can be submit if not forbidden and not in complete tasks
/**
* parse the successor nodes of previous node.
* this function parse the condition node to find the right branch.
* also check all the depends nodes forbidden or complete
* @param preNodeName
* @return successor nodes
*/
public static Set<String> parsePostNodes(String preNodeName,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
Set<String> postNodeList = new HashSet<>();
Collection<String> startVertexes = new ArrayList<>();
if (preNodeName == null) {
startVertexes = dag.getBeginNode();
} else if (dag.getNode(preNodeName).isConditionsTask()) {
List<String> conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else {
startVertexes = dag.getSubsequentNodes(preNodeName);
}
for (String subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent);
if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList );
continue;
}
// then submit the post nodes
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList);
for(String post : postNodes){
TaskNode postNode = dag.getNode(post);
if(taskNodeCanSubmit(postNode, dag, completeTaskList)){
tmpStartVertexs.add(post);
if (!DagHelper.allDependsForbiddenOrEnd(taskNode, dag, skipTaskNodeList, completeTaskList)) {
continue;
}
if (taskNode.isForbidden() || completeTaskList.containsKey(subsequent)) {
postNodeList.addAll(parsePostNodes(subsequent, skipTaskNodeList, dag, completeTaskList));
continue;
}
tmpStartVertexs.remove(start);
postNodeList.add(subsequent);
}
return tmpStartVertexs;
return postNodeList;
}
/**
* the task can be submit when all the depends nodes are forbidden or complete
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return can submit
* if all of the task dependence are skipped, skip it too.
* @param taskNode
* @return
*/
public static boolean taskNodeCanSubmit(TaskNode taskNode,
private static boolean isTaskNodeNeedSkip(TaskNode taskNode,
Map<String, TaskNode> skipTaskNodeList
){
if(CollectionUtils.isEmpty(taskNode.getDepList())){
return false;
}
for(String depNode : taskNode.getDepList()){
if(!skipTaskNodeList.containsKey(depNode)){
return false;
}
}
return true;
}
/**
* parse condition task find the branch process
* set skip flag for another one.
* @param nodeName
* @return
*/
public static List<String> parseConditionTask(String nodeName,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){
List<String> dependList = taskNode.getDepList();
if(dependList == null){
return true;
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
if (!taskNode.isConditionsTask()){
return conditionTaskList;
}
if (!completeTaskList.containsKey(nodeName)){
return conditionTaskList;
}
TaskInstance taskInstance = completeTaskList.get(nodeName);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
List<String> skipNodeList = new ArrayList<>();
if(taskInstance.getState().typeIsSuccess()){
conditionTaskList = conditionsParameters.getSuccessNode();
skipNodeList = conditionsParameters.getFailedNode();
}else if(taskInstance.getState().typeIsFailure()){
conditionTaskList = conditionsParameters.getFailedNode();
skipNodeList = conditionsParameters.getSuccessNode();
}else{
conditionTaskList.add(nodeName);
}
for(String failedNode : skipNodeList){
setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList);
}
return conditionTaskList;
}
for(String dependNodeName : dependList){
TaskNode dependNode = dag.getNode(dependNodeName);
if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){
return false;
/**
* set task node and the post nodes skip flag
* @param skipNodeName
* @param dag
* @param completeTaskList
* @param skipTaskNodeList
*/
private static void setTaskNodeSkip(String skipNodeName,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList,
Map<String, TaskNode> skipTaskNodeList){
skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName));
Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeName);
for(String post : postNodeList){
TaskNode postNode = dag.getNode(post);
if(isTaskNodeNeedSkip(postNode, skipTaskNodeList)){
setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
}
}
return true;
}
/***
* build dag graph
* @param processDag processDag

226
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.dao.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -27,6 +28,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.junit.Assert;
import org.junit.Test;
@ -34,6 +36,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* dag helper test
@ -41,15 +45,17 @@ import java.util.Map;
public class DagHelperTest {
/**
* test task node can submit
*
* @throws JsonProcessingException if error throws JsonProcessingException
*/
@Test
public void testTaskNodeCanSubmit() throws JsonProcessingException {
//1->2->3->5
//4->3
//1->2->3->5->7
//4->3->6
DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
TaskNode taskNode3 = dag.getNode("3");
Map<String, TaskInstance> completeTaskList = new HashMap<>();
Map<String, TaskNode> skipNodeList = new HashMap<>();
completeTaskList.putIfAbsent("1", new TaskInstance());
Boolean canSubmit = false;
@ -58,27 +64,199 @@ public class DagHelperTest {
node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode nodex = dag.getNode("4");
nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList);
canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList);
Assert.assertEquals(canSubmit, true);
// 2forbidden, 3 cannot be submit
completeTaskList.putIfAbsent("2", new TaskInstance());
TaskNode nodey = dag.getNode("4");
nodey.setRunFlag("");
canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList);
canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList);
Assert.assertEquals(canSubmit, false);
// 2/3 forbidden submit 5
TaskNode node3 = dag.getNode("3");
node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode node8 = dag.getNode("8");
node8.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode node5 = dag.getNode("5");
canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList);
canSubmit = DagHelper.allDependsForbiddenOrEnd(node5, dag, skipNodeList, completeTaskList);
Assert.assertEquals(canSubmit, true);
}
/**
* 1->2->3->5
* 4->3
* test parse post node list
*/
@Test
public void testParsePostNodeList() throws JsonProcessingException {
DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
Map<String, TaskInstance> completeTaskList = new HashMap<>();
Map<String, TaskNode> skipNodeList = new HashMap<>();
Set<String> postNodes = null;
//complete : null
// expect post: 1/4
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(2, postNodes.size());
Assert.assertTrue(postNodes.contains("1"));
Assert.assertTrue(postNodes.contains("4"));
//complete : 1
// expect post: 2/4
completeTaskList.put("1", new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(2, postNodes.size());
Assert.assertTrue(postNodes.contains("2"));
Assert.assertTrue(postNodes.contains("4"));
// complete : 1/2
// expect post: 4
completeTaskList.put("2", new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(2, postNodes.size());
Assert.assertTrue(postNodes.contains("4"));
Assert.assertTrue(postNodes.contains("8"));
// complete : 1/2/4
// expect post: 3
completeTaskList.put("4", new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(2, postNodes.size());
Assert.assertTrue(postNodes.contains("3"));
Assert.assertTrue(postNodes.contains("8"));
// complete : 1/2/4/3
// expect post: 8/6
completeTaskList.put("3", new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(2, postNodes.size());
Assert.assertTrue(postNodes.contains("8"));
Assert.assertTrue(postNodes.contains("6"));
// complete : 1/2/4/3/8
// expect post: 6/5
completeTaskList.put("8", new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(2, postNodes.size());
Assert.assertTrue(postNodes.contains("5"));
Assert.assertTrue(postNodes.contains("6"));
// complete : 1/2/4/3/5/6/8
// expect post: 7
completeTaskList.put("6", new TaskInstance());
completeTaskList.put("5", new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(1, postNodes.size());
Assert.assertTrue(postNodes.contains("7"));
}
/**
* test forbidden post node
* @throws JsonProcessingException
*/
@Test
public void testForbiddenPostNode() throws JsonProcessingException {
DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
Map<String, TaskInstance> completeTaskList = new HashMap<>();
Map<String, TaskNode> skipNodeList = new HashMap<>();
Set<String> postNodes = null;
// dag: 1-2-3-5-7 4-3-6 2-8-5-7
// forbid:2 complete:1 post:4/8
completeTaskList.put("1", new TaskInstance());
TaskNode node2 = dag.getNode("2");
node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(2, postNodes.size());
Assert.assertTrue(postNodes.contains("4"));
Assert.assertTrue(postNodes.contains("8"));
//forbid:2/4 complete:1 post:3/8
TaskNode node4 = dag.getNode("4");
node4.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(2, postNodes.size());
Assert.assertTrue(postNodes.contains("3"));
Assert.assertTrue(postNodes.contains("8"));
//forbid:2/4/5 complete:1/8 post:3
completeTaskList.put("8", new TaskInstance());
TaskNode node5 = dag.getNode("5");
node5.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(1, postNodes.size());
Assert.assertTrue(postNodes.contains("3"));
}
/**
* test condition post node
* @throws JsonProcessingException
*/
@Test
public void testConditionPostNode() throws JsonProcessingException {
DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
Map<String, TaskInstance> completeTaskList = new HashMap<>();
Map<String, TaskNode> skipNodeList = new HashMap<>();
Set<String> postNodes = null;
// dag: 1-2-3-5-7 4-3-6 2-8-5-7
// 3-if
completeTaskList.put("1", new TaskInstance());
completeTaskList.put("2", new TaskInstance());
completeTaskList.put("4", new TaskInstance());
TaskNode node3 = dag.getNode("3");
node3.setType("CONDITIONS");
node3.setConditionResult("{\n" +
" \"successNode\": [5\n" +
" ],\n" +
" \"failedNode\": [6\n" +
" ]\n" +
" }");
completeTaskList.remove("3");
TaskInstance taskInstance = new TaskInstance();
taskInstance.setState(ExecutionStatus.SUCCESS);
//complete 1/2/3/4 expect:8
completeTaskList.put("3", taskInstance);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(1, postNodes.size());
Assert.assertTrue(postNodes.contains("8"));
//2.complete 1/2/3/4/8 expect:5 skip:6
completeTaskList.put("8", new TaskInstance());
postNodes = DagHelper.parsePostNodes(null ,skipNodeList, dag, completeTaskList);
Assert.assertTrue(postNodes.contains("5"));
Assert.assertEquals(1, skipNodeList.size());
Assert.assertTrue(skipNodeList.containsKey("6"));
// 3.complete 1/2/3/4/5/8 expect post:7 skip:6
skipNodeList.clear();
TaskInstance taskInstance1 = new TaskInstance();
taskInstance.setState(ExecutionStatus.SUCCESS);
completeTaskList.put("5", taskInstance1);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(1, postNodes.size());
Assert.assertTrue(postNodes.contains("7"));
Assert.assertEquals(1, skipNodeList.size());
Assert.assertTrue(skipNodeList.containsKey("6"));
// dag: 1-2-3-5-7 4-3-6
// 3-if , complete:1/2/3/4
// 1.failure:3 expect post:6 skip:5/7
skipNodeList.clear();
completeTaskList.remove("3");
taskInstance = new TaskInstance();
taskInstance.setState(ExecutionStatus.FAILURE);
completeTaskList.put("3", taskInstance);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assert.assertEquals(1, postNodes.size());
Assert.assertTrue(postNodes.contains("6"));
Assert.assertEquals(2, skipNodeList.size());
Assert.assertTrue(skipNodeList.containsKey("5"));
Assert.assertTrue(skipNodeList.containsKey("7"));
}
/**
* 1->2->3->5->7
* 4->3->6
* 2->8->5->7
*
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException
*/
@ -87,11 +265,13 @@ public class DagHelperTest {
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setType("SHELL");
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setType("SHELL");
List<String> dep2 = new ArrayList<>();
dep2.add("1");
node2.setDepList(dep2);
@ -101,11 +281,13 @@ public class DagHelperTest {
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setType("SHELL");
taskNodeList.add(node4);
TaskNode node3 = new TaskNode();
node3.setId("3");
node3.setName("3");
node3.setType("SHELL");
List<String> dep3 = new ArrayList<>();
dep3.add("2");
dep3.add("4");
@ -115,11 +297,40 @@ public class DagHelperTest {
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
node5.setType("SHELL");
List<String> dep5 = new ArrayList<>();
dep5.add("3");
dep5.add("8");
node5.setDepList(dep5);
taskNodeList.add(node5);
TaskNode node6 = new TaskNode();
node6.setId("6");
node6.setName("6");
node6.setType("SHELL");
List<String> dep6 = new ArrayList<>();
dep6.add("3");
node6.setDepList(dep6);
taskNodeList.add(node6);
TaskNode node7 = new TaskNode();
node7.setId("7");
node7.setName("7");
node7.setType("SHELL");
List<String> dep7 = new ArrayList<>();
dep7.add("5");
node7.setDepList(dep7);
taskNodeList.add(node7);
TaskNode node8 = new TaskNode();
node8.setId("8");
node8.setName("8");
node8.setType("SHELL");
List<String> dep8 = new ArrayList<>();
dep8.add("2");
node8.setDepList(dep8);
taskNodeList.add(node8);
List<String> startNodes = new ArrayList<>();
List<String> recoveryNodes = new ArrayList<>();
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
@ -128,7 +339,6 @@ public class DagHelperTest {
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(destTaskNodeList);
return DagHelper.buildDagGraph(processDag);
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
/**
* dependent parameters
*/
@ -131,7 +130,6 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class);
}
/**
* depend result for depend item
* @param item
@ -155,5 +153,4 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
return dependResult;
}
}

129
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
@ -39,6 +38,7 @@ import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -497,144 +497,22 @@ public class MasterExecThread implements Runnable {
return taskInstance;
}
/**
* if all of the task dependence are skip, skip it too.
* @param taskNode
* @return
*/
private boolean isTaskNodeNeedSkip(TaskNode taskNode){
if(CollectionUtils.isEmpty(taskNode.getDepList())){
return false;
}
for(String depNode : taskNode.getDepList()){
if(!skipTaskNodeList.containsKey(depNode)){
return false;
}
}
return true;
}
/**
* set task node skip if dependence all skip
* @param taskNodesSkipList
*/
private void setTaskNodeSkip(List<String> taskNodesSkipList){
for(String skipNode : taskNodesSkipList){
skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode));
Collection<String> postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList);
List<String> postSkipList = new ArrayList<>();
for(String post : postNodeList){
TaskNode postNode = dag.getNode(post);
if(isTaskNodeNeedSkip(postNode)){
postSkipList.add(post);
}
}
setTaskNodeSkip(postSkipList);
}
}
/**
* parse condition task find the branch process
* set skip flag for another one.
* @param nodeName
* @return
*/
private List<String> parseConditionTask(String nodeName){
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
if(!taskNode.isConditionsTask()){
return conditionTaskList;
}
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
TaskInstance taskInstance = completeTaskList.get(nodeName);
if(taskInstance == null){
logger.error("task instance {} cannot find, please check it!", nodeName);
return conditionTaskList;
}
if(taskInstance.getState().typeIsSuccess()){
conditionTaskList = conditionsParameters.getSuccessNode();
setTaskNodeSkip(conditionsParameters.getFailedNode());
}else if(taskInstance.getState().typeIsFailure()){
conditionTaskList = conditionsParameters.getFailedNode();
setTaskNodeSkip(conditionsParameters.getSuccessNode());
}else{
conditionTaskList.add(nodeName);
}
return conditionTaskList;
}
/**
* parse post node list of previous node
* if condition node: return process according to the settings
* if post node completed, return post nodes of the completed node
* @param previousNodeName
* @return
*/
private List<String> parsePostNodeList(String previousNodeName){
List<String> postNodeList = new ArrayList<>();
TaskNode taskNode = dag.getNode(previousNodeName);
if(taskNode != null && taskNode.isConditionsTask()){
return parseConditionTask(previousNodeName);
}
Collection<String> postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList);
List<String> postSkipList = new ArrayList<>();
// delete success node, parse the past nodes
// if conditions node,
// 1. parse the branch process according the conditions setting
// 2. set skip flag on anther branch process
for(String postNode : postNodeCollection){
if(completeTaskList.containsKey(postNode)){
TaskInstance postTaskInstance = completeTaskList.get(postNode);
if(dag.getNode(postNode).isConditionsTask()){
List<String> conditionTaskNodeList = parseConditionTask(postNode);
for(String conditions : conditionTaskNodeList){
postNodeList.addAll(parsePostNodeList(conditions));
}
}else if(postTaskInstance.getState().typeIsSuccess()){
postNodeList.addAll(parsePostNodeList(postNode));
}else{
postNodeList.add(postNode);
}
}else if(isTaskNodeNeedSkip(dag.getNode(postNode))){
postSkipList.add(postNode);
setTaskNodeSkip(postSkipList);
postSkipList.clear();
}else{
postNodeList.add(postNode);
}
}
return postNodeList;
}
/**
* submit post node
* @param parentNodeName parent node name
*/
private void submitPostNode(String parentNodeName){
List<String> submitTaskNodeList = parsePostNodeList(parentNodeName);
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
for(String taskNode : submitTaskNodeList){
taskInstances.add(createTaskInstance(processInstance, taskNode,
dag.getNode(taskNode)));
}
// if previous node success , post node submit
for(TaskInstance task : taskInstances){
if(readyToSubmitTaskList.containsKey(task.getName())){
continue;
}
if(completeTaskList.containsKey(task.getName())){
logger.info("task {} has already run success", task.getName());
continue;
@ -695,7 +573,7 @@ public class MasterExecThread implements Runnable {
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
if(dag.getNode(dependNodeName).isConditionsTask()){
//condition task need check the branch to run
List<String> nextTaskList = parseConditionTask(dependNodeName);
List<String> nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, completeTaskList);
if(!nextTaskList.contains(nextNodeName)){
return false;
}
@ -708,7 +586,6 @@ public class MasterExecThread implements Runnable {
return true;
}
/**
* query task instance by complete state
* @param state state

1
dolphinscheduler-server/src/main/resources/logback-master.xml

@ -74,7 +74,6 @@
<!-- master server logback config end -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="MASTERLOGFILE"/>
</root>

1
dolphinscheduler-server/src/main/resources/logback-worker.xml

@ -75,7 +75,6 @@
<!-- worker server logback config end -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/>
</root>

25
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java

@ -111,29 +111,4 @@ public class MasterCommandTest {
}
@Test
public void testDagHelper(){
ProcessDefinition processDefinition = processDefinitionMapper.selectById(19);
try {
ProcessDag processDag = DagHelper.generateFlowDag(processDefinition.getProcessDefinitionJson(),
new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST);
DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
Collection<String> start = DagHelper.getStartVertex("1", dag, null);
System.out.println(start.toString());
Map<String, TaskNode> forbidden = DagHelper.getForbiddenTaskNodeMaps(processDefinition.getProcessDefinitionJson());
System.out.println(forbidden);
} catch (Exception e) {
e.printStackTrace();
}
}
}

Loading…
Cancel
Save