Browse Source

Rename Node and Edge

dev_wenjun_refactorMaster
Wenjun Ruan 6 months ago
parent
commit
aa9a8b9431
  1. 46
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAG.java
  2. 92
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNode.java
  3. 22
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Edge.java
  4. 47
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Node.java
  5. 36
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeContext.java
  6. 19
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeIdentify.java
  7. 78
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAG.java
  8. 8
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAGBuilder.java
  9. 8
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/DAGEngine.java
  10. 9
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IChain.java
  11. 2
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionDAG.java
  12. 19
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionRunnableDelegate.java
  13. 22
      dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/WorkflowExecutionDAG.java
  14. 20
      dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeTest.java

46
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAG.java

@ -45,13 +45,13 @@ public interface DAG {
* <li> The post node of 3 is empty.
* <li> The post node of null is 1,4,6.
*
* @param dagNode the node of the DAG, can be null.
* @param node the node of the DAG, can be null.
* @return post node list, sort by priority.
*/
List<DAGNode> getDirectPostNodes(DAGNode dagNode);
List<Node> getDirectPostNodes(Node node);
/**
* Same with {@link #getDirectPostNodes(DAGNode)}.
* Same with {@link #getDirectPostNodes(Node)}.
* <p>
* If the dagNodeName is null, return the nodes which doesn't have inDegrees. Otherwise, return the post nodes of
* the given dagNodeName. If the dagNodeName is not null and cannot find the node in DAG, throw IllegalArgumentException.
@ -60,13 +60,7 @@ public interface DAG {
* @return post task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
default List<DAGNode> getDirectPostNodes(String dagNodeName) {
DAGNode dagNode = getDAGNode(dagNodeName);
if (dagNodeName != null && dagNode == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPostNodes(dagNode);
}
List<Node> getDirectPostNodes(String dagNodeName);
/**
* Same with {@link #getDirectPostNodes(String)}. Return the post node names.
@ -75,13 +69,7 @@ public interface DAG {
* @return post task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
default List<String> getDirectPostNodeNames(String dagNodeName) {
DAGNode dagNode = getDAGNode(dagNodeName);
if (dagNodeName != null && dagNode == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPostNodes(dagNode).stream().map(DAGNode::getNodeName).collect(Collectors.toList());
}
List<String> getDirectPostNodeNames(String dagNodeName);
/**
* Get the direct pre node of given dagNode, if the dagNode is null, return the nodes which doesn't have outDegrees.
@ -97,13 +85,13 @@ public interface DAG {
* <li> The pre node of 3 is 2.
* <li> The pre node of null is 3,5,6.
*
* @param dagNode the node of the DAG, can be null.
* @param node the node of the DAG, can be null.
* @return pre node list, sort by priority.
*/
List<DAGNode> getDirectPreNodes(DAGNode dagNode);
List<Node> getDirectPreNodes(Node node);
/**
* Same with {@link #getDirectPreNodes(DAGNode)}.
* Same with {@link #getDirectPreNodes(Node)}.
* <p>
* If the dagNodeName is null, return the nodes which doesn't have outDegrees. Otherwise, return the pre nodes of
* the given dagNodeName. If the dagNodeName is not null and cannot find the node in DAG, throw IllegalArgumentException.
@ -112,13 +100,7 @@ public interface DAG {
* @return pre task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
default List<DAGNode> getDirectPreNodes(String dagNodeName) {
DAGNode dagNode = getDAGNode(dagNodeName);
if (dagNodeName != null && dagNode == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPreNodes(dagNode);
}
List<Node> getDirectPreNodes(String dagNodeName);
/**
* Same with {@link #getDirectPreNodes(String)}. Return the pre node names.
@ -127,13 +109,7 @@ public interface DAG {
* @return pre task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
default List<String> getDirectPreNodeNames(String dagNodeName) {
DAGNode dagNode = getDAGNode(dagNodeName);
if (dagNodeName != null && dagNode == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPreNodes(dagNode).stream().map(DAGNode::getNodeName).collect(Collectors.toList());
}
List<String> getDirectPreNodeNames(String dagNodeName);
/**
* Get the node of the DAG by the node name.
@ -141,6 +117,6 @@ public interface DAG {
* @param nodeName the node name.
* @return the node of the DAG, return null if cannot find the node.
*/
DAGNode getDAGNode(String nodeName);
Node getDAGNode(String nodeName);
}

92
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNode.java

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* The node of the DAG.
* <p>
* The node contains the node name, the content of the node, the inDegrees and the outDegrees.
* The inDegrees is the edge from other nodes to the current node, the outDegrees is the edge from the current
* node to other nodes.
*/
@Getter
@Builder
@NoArgsConstructor
public class DAGNode implements IDAGNode {
private String nodeName;
/**
* whether the node is skipped, default is false, which means the node is not skipped.
* If the node is skipped, the node will not be executed.
*/
@Builder.Default
private boolean skip = false;
private List<DAGEdge> inDegrees;
private List<DAGEdge> outDegrees;
public DAGNode(String nodeName,
List<DAGEdge> inDegrees,
List<DAGEdge> outDegrees) {
this(nodeName, false, inDegrees, outDegrees);
}
public DAGNode(String nodeName,
boolean skip,
List<DAGEdge> inDegrees,
List<DAGEdge> outDegrees) {
if (StringUtils.isEmpty(nodeName)) {
throw new IllegalArgumentException("nodeName cannot be empty");
}
if (CollectionUtils.isNotEmpty(inDegrees)) {
inDegrees.forEach(dagEdge -> {
if (!nodeName.equals(dagEdge.getToNodeName())) {
throw new IllegalArgumentException(
"The toNodeName of inDegree should be the nodeName of the node: "
+ nodeName + ", inDegree: " + dagEdge);
}
});
}
if (CollectionUtils.isNotEmpty(outDegrees)) {
outDegrees.forEach(dagEdge -> {
if (!nodeName.equals(dagEdge.getFromNodeName())) {
throw new IllegalArgumentException(
"The fromNodeName of outDegree should be the nodeName of the node: "
+ nodeName + ", outDegree: " + dagEdge);
}
});
}
this.nodeName = nodeName;
this.inDegrees = inDegrees;
this.outDegrees = outDegrees;
this.skip = skip;
}
}

22
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/IDAGNodeAction.java → dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Edge.java

@ -17,19 +17,19 @@
package org.apache.dolphinscheduler.workflow.engine.dag;
/**
* The IDAGNodeAction represent the action of a DAG node.
*/
public interface IDAGNodeAction {
void run();
void kill();
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
void pause();
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Edge {
void success();
private NodeIdentify from;
void failure();
private NodeIdentify to;
}

47
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Node.java

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
import lombok.AllArgsConstructor;
import lombok.Builder;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import lombok.Getter;
import lombok.NoArgsConstructor;
/**
* The node of the DAG.
* <p>
* The node contains the node name, the content of the node, the inDegrees and the outDegrees.
* The inDegrees is the edge from other nodes to the current node, the outDegrees is the edge from the current
* node to other nodes.
*/
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Node {
private NodeIdentify nodeIdentify;
private NodeContext nodeContext;
}

36
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeContext.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NodeContext {
/**
* whether the node is skipped, default is false, which means the node is not skipped.
* If the node is skipped, the node will not be executed.
*/
private boolean skip = false;
}

19
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/IDAGNode.java → dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeIdentify.java

@ -17,16 +17,19 @@
package org.apache.dolphinscheduler.workflow.engine.dag;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
public interface IDAGNode {
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NodeIdentify {
String getNodeName();
private Long id;
boolean isSkip();
List<DAGEdge> getInDegrees();
List<DAGEdge> getOutDegrees();
private String name;
}

78
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAG.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.workflow.engine.dag;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -29,46 +30,91 @@ import java.util.stream.Collectors;
*/
public class WorkflowDAG implements DAG {
private final Map<String, DAGNode> dagNodeMap;
private final Map<NodeIdentify, Node> dagNodeMap;
public WorkflowDAG(List<DAGNode> dagNodes) {
this.dagNodeMap = dagNodes.stream().collect(Collectors.toMap(DAGNode::getNodeName, Function.identity()));
private final Map<NodeIdentify, List<Node>> outdegreeMap;
private final Map<NodeIdentify, List<Node>> inDegredMap;
public WorkflowDAG(List<Node> nodes, List<Edge> edges) {
this.dagNodeMap = nodes.stream().collect(Collectors.toMap(Node::getNodeIdentify, Function.identity()));
this.outdegreeMap = new HashMap<>();
this.inDegredMap = new HashMap<>();
// todo:
}
@Override
public List<DAGNode> getDirectPostNodes(DAGNode dagNode) {
final String nodeName = dagNode.getNodeName();
if (!dagNodeMap.containsKey(nodeName)) {
public List<Node> getDirectPostNodes(Node dagNode) {
NodeIdentify nodeIdentify = dagNode.getNodeIdentify();
if (!dagNodeMap.containsKey(nodeIdentify)) {
return Collections.emptyList();
}
DAGNode node = dagNodeMap.get(nodeName);
List<DAGNode> dagNodes = new ArrayList<>();
Node node = dagNodeMap.get(nodeIdentify);
List<Node> nodes = new ArrayList<>();
for (DAGEdge edge : node.getOutDegrees()) {
if (dagNodeMap.containsKey(edge.getToNodeName())) {
dagNodes.add(dagNodeMap.get(edge.getToNodeName()));
nodes.add(dagNodeMap.get(edge.getToNodeName()));
}
}
return dagNodes;
return nodes;
}
@Override
public List<Node> getDirectPostNodes(String dagNodeName) {
Node node = getDAGNode(dagNodeName);
if (dagNodeName != null && node == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPostNodes(node);
}
@Override
public List<String> getDirectPostNodeNames(String dagNodeName) {
Node node = getDAGNode(dagNodeName);
if (dagNodeName != null && node == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPostNodes(node).stream()
.map(Node::getNodeName)
.collect(Collectors.toList());
}
@Override
public List<DAGNode> getDirectPreNodes(DAGNode dagNode) {
public List<Node> getDirectPreNodes(Node dagNode) {
final String nodeName = dagNode.getNodeName();
if (!dagNodeMap.containsKey(nodeName)) {
return Collections.emptyList();
}
DAGNode node = dagNodeMap.get(nodeName);
List<DAGNode> dagNodes = new ArrayList<>();
Node node = dagNodeMap.get(nodeName);
List<Node> nodes = new ArrayList<>();
for (DAGEdge edge : node.getInDegrees()) {
if (dagNodeMap.containsKey(edge.getFromNodeName())) {
dagNodes.add(dagNodeMap.get(edge.getFromNodeName()));
nodes.add(dagNodeMap.get(edge.getFromNodeName()));
}
}
return dagNodes;
return nodes;
}
@Override
public List<Node> getDirectPreNodes(String dagNodeName) {
Node node = getDAGNode(dagNodeName);
if (dagNodeName != null && node == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPreNodes(node);
}
@Override
public List<String> getDirectPreNodeNames(String dagNodeName) {
Node node = getDAGNode(dagNodeName);
if (dagNodeName != null && node == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPreNodes(node).stream().map(Node::getNodeName).collect(Collectors.toList());
}
@Override
public DAGNode getDAGNode(String nodeName) {
public Node getDAGNode(String nodeName) {
return dagNodeMap.get(nodeName);
}

8
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAGBuilder.java

@ -41,7 +41,7 @@ import java.util.Map;
*/
public class WorkflowDAGBuilder {
private final Map<String, DAGNode> taskNameMap;
private final Map<String, Node> taskNameMap;
private WorkflowDAGBuilder() {
this.taskNameMap = new HashMap<>();
@ -62,7 +62,7 @@ public class WorkflowDAGBuilder {
throw new IllegalArgumentException("TaskNode with name " + nodeName + " already exists");
}
DAGNode taskNode = DAGNode.builder()
Node taskNode = Node.builder()
.nodeName(nodeName)
.inDegrees(new ArrayList<>())
.outDegrees(new ArrayList<>())
@ -82,7 +82,7 @@ public class WorkflowDAGBuilder {
String toNodeName = dagEdge.getToNodeName();
if (taskNameMap.containsKey(fromNodeName)) {
DAGNode fromTask = taskNameMap.get(fromNodeName);
Node fromTask = taskNameMap.get(fromNodeName);
if (fromTask.getOutDegrees().contains(dagEdge)) {
throw new IllegalArgumentException(
"Edge from " + fromNodeName + " to " + toNodeName + " already exists");
@ -90,7 +90,7 @@ public class WorkflowDAGBuilder {
fromTask.getOutDegrees().add(dagEdge);
}
if (taskNameMap.containsKey(toNodeName)) {
DAGNode toTask = taskNameMap.get(toNodeName);
Node toTask = taskNameMap.get(toNodeName);
if (toTask.getInDegrees().contains(dagEdge)) {
throw new IllegalArgumentException(
"Edge from " + fromNodeName + " to " + toNodeName + " already exists");

8
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/DAGEngine.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.workflow.engine.engine;
import org.apache.dolphinscheduler.workflow.engine.dag.DAGNode;
import org.apache.dolphinscheduler.workflow.engine.dag.Node;
import org.apache.dolphinscheduler.workflow.engine.event.TaskOperationEvent;
import org.apache.dolphinscheduler.workflow.engine.event.TaskOperationType;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnable;
@ -50,8 +50,8 @@ public class DAGEngine implements IDAGEngine {
@Override
public void triggerTask(String taskName) {
IWorkflowExecutionDAG workflowExecutionDAG = workflowExecutionContext.getWorkflowExecutionDAG();
DAGNode dagNode = workflowExecutionDAG.getDAGNode(taskName);
if (dagNode == null) {
Node node = workflowExecutionDAG.getDAGNode(taskName);
if (node == null) {
log.error("Cannot find the DAGNode for task: {}", taskName);
return;
}
@ -63,7 +63,7 @@ public class DAGEngine implements IDAGEngine {
return;
}
if (dagNode.isSkip()) {
if (node.isSkip()) {
log.info("The task: {} is skipped", taskName);
triggerNextTasks(taskName);
return;

9
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/IEventDispatcher.java → dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IChain.java

@ -15,12 +15,5 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.engine;
import org.apache.dolphinscheduler.workflow.engine.event.IEvent;
public interface IEventDispatcher {
void dispatch(IEvent event);
package org.apache.dolphinscheduler.workflow.engine.workflow;public interface IChain {
}

2
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionDAG.java

@ -24,7 +24,7 @@ import java.util.List;
/**
* The WorkflowExecutionDAG represent the running workflow DAG.
*/
public interface IWorkflowExecutionDAG extends DAG {
public interface IWorkflowExecutionDAG {
List<String> getStartNodeNames();

19
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionRunnableDelegate.java

@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;public interface IWorkflowExecutionRunnableDelegate {
}

22
dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/WorkflowExecutionDAG.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.DAGNode;
import org.apache.dolphinscheduler.workflow.engine.dag.Node;
import org.apache.dolphinscheduler.workflow.engine.dag.WorkflowDAG;
import java.util.ArrayList;
@ -80,13 +80,13 @@ public class WorkflowExecutionDAG implements IWorkflowExecutionDAG {
@Override
public boolean isTaskAbleToBeTriggered(String taskNodeName) {
// todo: Check whether the workflow instance is finished or ready to finish.
List<DAGNode> directPreNodes = getDirectPreNodes(taskNodeName);
List<Node> directPreNodes = getDirectPreNodes(taskNodeName);
if (log.isDebugEnabled()) {
log.debug("Begin to check whether the task {} is able to be triggered.", taskNodeName);
log.debug("Task {} directly dependent on the task: {}.", taskNodeName,
directPreNodes.stream().map(DAGNode::getNodeName).collect(Collectors.toList()));
directPreNodes.stream().map(Node::getNodeName).collect(Collectors.toList()));
}
for (DAGNode directPreNode : directPreNodes) {
for (Node directPreNode : directPreNodes) {
if (directPreNode.isSkip()) {
log.debug("The task {} is skipped.", directPreNode.getNodeName());
continue;
@ -105,19 +105,5 @@ public class WorkflowExecutionDAG implements IWorkflowExecutionDAG {
taskExecutionRunnableRepository.storeTaskExecutionRunnable(taskExecutionRunnable);
}
@Override
public List<DAGNode> getDirectPostNodes(DAGNode dagNode) {
return workflowDAG.getDirectPostNodes(dagNode);
}
@Override
public List<DAGNode> getDirectPreNodes(DAGNode dagNode) {
return workflowDAG.getDirectPreNodes(dagNode);
}
@Override
public DAGNode getDAGNode(String nodeName) {
return workflowDAG.getDAGNode(nodeName);
}
}

20
dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNodeTest.java → dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeTest.java

@ -12,12 +12,12 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.Lists;
class DAGNodeTest {
class NodeTest {
@Test
void buildDAGNode_EmptyNodeName() {
IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class,
() -> DAGNode.builder()
() -> Node.builder()
.inDegrees(new ArrayList<>())
.outDegrees(new ArrayList<>())
.build());
@ -27,7 +27,7 @@ class DAGNodeTest {
@Test
void buildDAGNode_BadInDegree() {
IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class,
() -> DAGNode.builder()
() -> Node.builder()
.nodeName("A")
.inDegrees(Lists.newArrayList(DAGEdge.builder()
.fromNodeName(null)
@ -42,7 +42,7 @@ class DAGNodeTest {
@Test
void buildDAGNode_NiceInDegree() {
assertDoesNotThrow(() -> DAGNode.builder()
assertDoesNotThrow(() -> Node.builder()
.nodeName("A")
.inDegrees(Lists.newArrayList(DAGEdge.builder()
.fromNodeName(null)
@ -55,7 +55,7 @@ class DAGNodeTest {
@Test
void buildDAGNode_BadOutDegree() {
IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class,
() -> DAGNode.builder()
() -> Node.builder()
.nodeName("A")
.inDegrees(new ArrayList<>())
.outDegrees(Lists.newArrayList(DAGEdge.builder()
@ -70,7 +70,7 @@ class DAGNodeTest {
@Test
void buildDAGNode_NiceOutDegree() {
assertDoesNotThrow(() -> DAGNode.builder()
assertDoesNotThrow(() -> Node.builder()
.nodeName("A")
.inDegrees(new ArrayList<>())
.outDegrees(Lists.newArrayList(DAGEdge.builder()
@ -82,23 +82,23 @@ class DAGNodeTest {
@Test
void buildDAGNode_NotSkip() {
DAGNode dagNode = DAGNode.builder()
Node node = Node.builder()
.nodeName("A")
.inDegrees(new ArrayList<>())
.outDegrees(new ArrayList<>())
.build();
assertFalse(dagNode.isSkip());
assertFalse(node.isSkip());
}
@Test
void buildDAGNode_Skip() {
DAGNode dagNode = DAGNode.builder()
Node node = Node.builder()
.nodeName("A")
.skip(true)
.inDegrees(new ArrayList<>())
.outDegrees(new ArrayList<>())
.build();
assertTrue(dagNode.isSkip());
assertTrue(node.isSkip());
}
}
Loading…
Cancel
Save