diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAG.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAG.java index 63e6370887..aed6193c83 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAG.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAG.java @@ -45,13 +45,13 @@ public interface DAG { *
  • The post node of 3 is empty. *
  • The post node of null is 1,4,6. * - * @param dagNode the node of the DAG, can be null. + * @param node the node of the DAG, can be null. * @return post node list, sort by priority. */ - List getDirectPostNodes(DAGNode dagNode); + List getDirectPostNodes(Node node); /** - * Same with {@link #getDirectPostNodes(DAGNode)}. + * Same with {@link #getDirectPostNodes(Node)}. *

    * If the dagNodeName is null, return the nodes which doesn't have inDegrees. Otherwise, return the post nodes of * the given dagNodeName. If the dagNodeName is not null and cannot find the node in DAG, throw IllegalArgumentException. @@ -60,13 +60,7 @@ public interface DAG { * @return post task name list, sort by priority. * @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG. */ - default List getDirectPostNodes(String dagNodeName) { - DAGNode dagNode = getDAGNode(dagNodeName); - if (dagNodeName != null && dagNode == null) { - throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG"); - } - return getDirectPostNodes(dagNode); - } + List getDirectPostNodes(String dagNodeName); /** * Same with {@link #getDirectPostNodes(String)}. Return the post node names. @@ -75,13 +69,7 @@ public interface DAG { * @return post task name list, sort by priority. * @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG. */ - default List getDirectPostNodeNames(String dagNodeName) { - DAGNode dagNode = getDAGNode(dagNodeName); - if (dagNodeName != null && dagNode == null) { - throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG"); - } - return getDirectPostNodes(dagNode).stream().map(DAGNode::getNodeName).collect(Collectors.toList()); - } + List getDirectPostNodeNames(String dagNodeName); /** * Get the direct pre node of given dagNode, if the dagNode is null, return the nodes which doesn't have outDegrees. @@ -97,13 +85,13 @@ public interface DAG { *

  • The pre node of 3 is 2. *
  • The pre node of null is 3,5,6. * - * @param dagNode the node of the DAG, can be null. + * @param node the node of the DAG, can be null. * @return pre node list, sort by priority. */ - List getDirectPreNodes(DAGNode dagNode); + List getDirectPreNodes(Node node); /** - * Same with {@link #getDirectPreNodes(DAGNode)}. + * Same with {@link #getDirectPreNodes(Node)}. *

    * If the dagNodeName is null, return the nodes which doesn't have outDegrees. Otherwise, return the pre nodes of * the given dagNodeName. If the dagNodeName is not null and cannot find the node in DAG, throw IllegalArgumentException. @@ -112,13 +100,7 @@ public interface DAG { * @return pre task name list, sort by priority. * @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG. */ - default List getDirectPreNodes(String dagNodeName) { - DAGNode dagNode = getDAGNode(dagNodeName); - if (dagNodeName != null && dagNode == null) { - throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG"); - } - return getDirectPreNodes(dagNode); - } + List getDirectPreNodes(String dagNodeName); /** * Same with {@link #getDirectPreNodes(String)}. Return the pre node names. @@ -127,13 +109,7 @@ public interface DAG { * @return pre task name list, sort by priority. * @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG. */ - default List getDirectPreNodeNames(String dagNodeName) { - DAGNode dagNode = getDAGNode(dagNodeName); - if (dagNodeName != null && dagNode == null) { - throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG"); - } - return getDirectPreNodes(dagNode).stream().map(DAGNode::getNodeName).collect(Collectors.toList()); - } + List getDirectPreNodeNames(String dagNodeName); /** * Get the node of the DAG by the node name. @@ -141,6 +117,6 @@ public interface DAG { * @param nodeName the node name. * @return the node of the DAG, return null if cannot find the node. */ - DAGNode getDAGNode(String nodeName); + Node getDAGNode(String nodeName); } diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNode.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNode.java deleted file mode 100644 index 2e945a5463..0000000000 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNode.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.workflow.engine.dag; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; - -import java.util.List; - -import lombok.Builder; -import lombok.Getter; -import lombok.NoArgsConstructor; - -/** - * The node of the DAG. - *

    - * The node contains the node name, the content of the node, the inDegrees and the outDegrees. - * The inDegrees is the edge from other nodes to the current node, the outDegrees is the edge from the current - * node to other nodes. - */ -@Getter -@Builder -@NoArgsConstructor -public class DAGNode implements IDAGNode { - - private String nodeName; - - /** - * whether the node is skipped, default is false, which means the node is not skipped. - * If the node is skipped, the node will not be executed. - */ - @Builder.Default - private boolean skip = false; - - private List inDegrees; - private List outDegrees; - - public DAGNode(String nodeName, - List inDegrees, - List outDegrees) { - this(nodeName, false, inDegrees, outDegrees); - } - - public DAGNode(String nodeName, - boolean skip, - List inDegrees, - List outDegrees) { - if (StringUtils.isEmpty(nodeName)) { - throw new IllegalArgumentException("nodeName cannot be empty"); - } - - if (CollectionUtils.isNotEmpty(inDegrees)) { - inDegrees.forEach(dagEdge -> { - if (!nodeName.equals(dagEdge.getToNodeName())) { - throw new IllegalArgumentException( - "The toNodeName of inDegree should be the nodeName of the node: " - + nodeName + ", inDegree: " + dagEdge); - } - }); - } - - if (CollectionUtils.isNotEmpty(outDegrees)) { - outDegrees.forEach(dagEdge -> { - if (!nodeName.equals(dagEdge.getFromNodeName())) { - throw new IllegalArgumentException( - "The fromNodeName of outDegree should be the nodeName of the node: " - + nodeName + ", outDegree: " + dagEdge); - } - }); - } - - this.nodeName = nodeName; - this.inDegrees = inDegrees; - this.outDegrees = outDegrees; - this.skip = skip; - } -} diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/IDAGNodeAction.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Edge.java similarity index 77% rename from dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/IDAGNodeAction.java rename to dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Edge.java index 9b6684a37f..bf8a81766c 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/IDAGNodeAction.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Edge.java @@ -17,19 +17,19 @@ package org.apache.dolphinscheduler.workflow.engine.dag; -/** - * The IDAGNodeAction represent the action of a DAG node. - */ -public interface IDAGNodeAction { - - void run(); - - void kill(); +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; - void pause(); +@Getter +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Edge { - void success(); + private NodeIdentify from; - void failure(); + private NodeIdentify to; } diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Node.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Node.java new file mode 100644 index 0000000000..8e8719ac7d --- /dev/null +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/Node.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.workflow.engine.dag; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.List; + +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * The node of the DAG. + *

    + * The node contains the node name, the content of the node, the inDegrees and the outDegrees. + * The inDegrees is the edge from other nodes to the current node, the outDegrees is the edge from the current + * node to other nodes. + */ +@Getter +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Node { + + private NodeIdentify nodeIdentify; + + private NodeContext nodeContext; + +} diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeContext.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeContext.java new file mode 100644 index 0000000000..5c35b04de9 --- /dev/null +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeContext.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.workflow.engine.dag; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class NodeContext { + + /** + * whether the node is skipped, default is false, which means the node is not skipped. + * If the node is skipped, the node will not be executed. + */ + private boolean skip = false; +} diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/IDAGNode.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeIdentify.java similarity index 78% rename from dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/IDAGNode.java rename to dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeIdentify.java index c3d85ae3ac..d7bf6034c5 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/IDAGNode.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeIdentify.java @@ -17,16 +17,19 @@ package org.apache.dolphinscheduler.workflow.engine.dag; -import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; -public interface IDAGNode { +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class NodeIdentify { - String getNodeName(); + private Long id; - boolean isSkip(); - - List getInDegrees(); - - List getOutDegrees(); + private String name; } diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAG.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAG.java index c31749af93..1b3cb35e40 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAG.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAG.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.workflow.engine.dag; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -29,46 +30,91 @@ import java.util.stream.Collectors; */ public class WorkflowDAG implements DAG { - private final Map dagNodeMap; + private final Map dagNodeMap; - public WorkflowDAG(List dagNodes) { - this.dagNodeMap = dagNodes.stream().collect(Collectors.toMap(DAGNode::getNodeName, Function.identity())); + private final Map> outdegreeMap; + + private final Map> inDegredMap; + + public WorkflowDAG(List nodes, List edges) { + this.dagNodeMap = nodes.stream().collect(Collectors.toMap(Node::getNodeIdentify, Function.identity())); + this.outdegreeMap = new HashMap<>(); + this.inDegredMap = new HashMap<>(); + // todo: } @Override - public List getDirectPostNodes(DAGNode dagNode) { - final String nodeName = dagNode.getNodeName(); - if (!dagNodeMap.containsKey(nodeName)) { + public List getDirectPostNodes(Node dagNode) { + NodeIdentify nodeIdentify = dagNode.getNodeIdentify(); + if (!dagNodeMap.containsKey(nodeIdentify)) { return Collections.emptyList(); } - DAGNode node = dagNodeMap.get(nodeName); - List dagNodes = new ArrayList<>(); + Node node = dagNodeMap.get(nodeIdentify); + List nodes = new ArrayList<>(); for (DAGEdge edge : node.getOutDegrees()) { if (dagNodeMap.containsKey(edge.getToNodeName())) { - dagNodes.add(dagNodeMap.get(edge.getToNodeName())); + nodes.add(dagNodeMap.get(edge.getToNodeName())); } } - return dagNodes; + return nodes; + } + + @Override + public List getDirectPostNodes(String dagNodeName) { + Node node = getDAGNode(dagNodeName); + if (dagNodeName != null && node == null) { + throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG"); + } + return getDirectPostNodes(node); + } + + @Override + public List getDirectPostNodeNames(String dagNodeName) { + Node node = getDAGNode(dagNodeName); + if (dagNodeName != null && node == null) { + throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG"); + } + return getDirectPostNodes(node).stream() + .map(Node::getNodeName) + .collect(Collectors.toList()); } @Override - public List getDirectPreNodes(DAGNode dagNode) { + public List getDirectPreNodes(Node dagNode) { final String nodeName = dagNode.getNodeName(); if (!dagNodeMap.containsKey(nodeName)) { return Collections.emptyList(); } - DAGNode node = dagNodeMap.get(nodeName); - List dagNodes = new ArrayList<>(); + Node node = dagNodeMap.get(nodeName); + List nodes = new ArrayList<>(); for (DAGEdge edge : node.getInDegrees()) { if (dagNodeMap.containsKey(edge.getFromNodeName())) { - dagNodes.add(dagNodeMap.get(edge.getFromNodeName())); + nodes.add(dagNodeMap.get(edge.getFromNodeName())); } } - return dagNodes; + return nodes; + } + + @Override + public List getDirectPreNodes(String dagNodeName) { + Node node = getDAGNode(dagNodeName); + if (dagNodeName != null && node == null) { + throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG"); + } + return getDirectPreNodes(node); + } + + @Override + public List getDirectPreNodeNames(String dagNodeName) { + Node node = getDAGNode(dagNodeName); + if (dagNodeName != null && node == null) { + throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG"); + } + return getDirectPreNodes(node).stream().map(Node::getNodeName).collect(Collectors.toList()); } @Override - public DAGNode getDAGNode(String nodeName) { + public Node getDAGNode(String nodeName) { return dagNodeMap.get(nodeName); } diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAGBuilder.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAGBuilder.java index fbf666d947..d1387c0de9 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAGBuilder.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/dag/WorkflowDAGBuilder.java @@ -41,7 +41,7 @@ import java.util.Map; */ public class WorkflowDAGBuilder { - private final Map taskNameMap; + private final Map taskNameMap; private WorkflowDAGBuilder() { this.taskNameMap = new HashMap<>(); @@ -62,7 +62,7 @@ public class WorkflowDAGBuilder { throw new IllegalArgumentException("TaskNode with name " + nodeName + " already exists"); } - DAGNode taskNode = DAGNode.builder() + Node taskNode = Node.builder() .nodeName(nodeName) .inDegrees(new ArrayList<>()) .outDegrees(new ArrayList<>()) @@ -82,7 +82,7 @@ public class WorkflowDAGBuilder { String toNodeName = dagEdge.getToNodeName(); if (taskNameMap.containsKey(fromNodeName)) { - DAGNode fromTask = taskNameMap.get(fromNodeName); + Node fromTask = taskNameMap.get(fromNodeName); if (fromTask.getOutDegrees().contains(dagEdge)) { throw new IllegalArgumentException( "Edge from " + fromNodeName + " to " + toNodeName + " already exists"); @@ -90,7 +90,7 @@ public class WorkflowDAGBuilder { fromTask.getOutDegrees().add(dagEdge); } if (taskNameMap.containsKey(toNodeName)) { - DAGNode toTask = taskNameMap.get(toNodeName); + Node toTask = taskNameMap.get(toNodeName); if (toTask.getInDegrees().contains(dagEdge)) { throw new IllegalArgumentException( "Edge from " + fromNodeName + " to " + toNodeName + " already exists"); diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/DAGEngine.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/DAGEngine.java index d5273367ef..0a847583b4 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/DAGEngine.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/DAGEngine.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.workflow.engine.engine; -import org.apache.dolphinscheduler.workflow.engine.dag.DAGNode; +import org.apache.dolphinscheduler.workflow.engine.dag.Node; import org.apache.dolphinscheduler.workflow.engine.event.TaskOperationEvent; import org.apache.dolphinscheduler.workflow.engine.event.TaskOperationType; import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnable; @@ -50,8 +50,8 @@ public class DAGEngine implements IDAGEngine { @Override public void triggerTask(String taskName) { IWorkflowExecutionDAG workflowExecutionDAG = workflowExecutionContext.getWorkflowExecutionDAG(); - DAGNode dagNode = workflowExecutionDAG.getDAGNode(taskName); - if (dagNode == null) { + Node node = workflowExecutionDAG.getDAGNode(taskName); + if (node == null) { log.error("Cannot find the DAGNode for task: {}", taskName); return; } @@ -63,7 +63,7 @@ public class DAGEngine implements IDAGEngine { return; } - if (dagNode.isSkip()) { + if (node.isSkip()) { log.info("The task: {} is skipped", taskName); triggerNextTasks(taskName); return; diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/IEventDispatcher.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IChain.java similarity index 80% rename from dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/IEventDispatcher.java rename to dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IChain.java index 32724fefc5..5c86c8b6f7 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/engine/IEventDispatcher.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IChain.java @@ -15,12 +15,5 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.workflow.engine.engine; - -import org.apache.dolphinscheduler.workflow.engine.event.IEvent; - -public interface IEventDispatcher { - - void dispatch(IEvent event); - +package org.apache.dolphinscheduler.workflow.engine.workflow;public interface IChain { } diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionDAG.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionDAG.java index d76ee1a64a..81732181ae 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionDAG.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionDAG.java @@ -24,7 +24,7 @@ import java.util.List; /** * The WorkflowExecutionDAG represent the running workflow DAG. */ -public interface IWorkflowExecutionDAG extends DAG { +public interface IWorkflowExecutionDAG { List getStartNodeNames(); diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionRunnableDelegate.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionRunnableDelegate.java new file mode 100644 index 0000000000..5eae17192e --- /dev/null +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/IWorkflowExecutionRunnableDelegate.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.workflow.engine.workflow;public interface IWorkflowExecutionRunnableDelegate { +} diff --git a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/WorkflowExecutionDAG.java b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/WorkflowExecutionDAG.java index 9e6fa7c210..6bcd123b2a 100644 --- a/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/WorkflowExecutionDAG.java +++ b/dolphinscheduler-workflow-engine/src/main/java/org/apache/dolphinscheduler/workflow/engine/workflow/WorkflowExecutionDAG.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.workflow.engine.workflow; -import org.apache.dolphinscheduler.workflow.engine.dag.DAGNode; +import org.apache.dolphinscheduler.workflow.engine.dag.Node; import org.apache.dolphinscheduler.workflow.engine.dag.WorkflowDAG; import java.util.ArrayList; @@ -80,13 +80,13 @@ public class WorkflowExecutionDAG implements IWorkflowExecutionDAG { @Override public boolean isTaskAbleToBeTriggered(String taskNodeName) { // todo: Check whether the workflow instance is finished or ready to finish. - List directPreNodes = getDirectPreNodes(taskNodeName); + List directPreNodes = getDirectPreNodes(taskNodeName); if (log.isDebugEnabled()) { log.debug("Begin to check whether the task {} is able to be triggered.", taskNodeName); log.debug("Task {} directly dependent on the task: {}.", taskNodeName, - directPreNodes.stream().map(DAGNode::getNodeName).collect(Collectors.toList())); + directPreNodes.stream().map(Node::getNodeName).collect(Collectors.toList())); } - for (DAGNode directPreNode : directPreNodes) { + for (Node directPreNode : directPreNodes) { if (directPreNode.isSkip()) { log.debug("The task {} is skipped.", directPreNode.getNodeName()); continue; @@ -105,19 +105,5 @@ public class WorkflowExecutionDAG implements IWorkflowExecutionDAG { taskExecutionRunnableRepository.storeTaskExecutionRunnable(taskExecutionRunnable); } - @Override - public List getDirectPostNodes(DAGNode dagNode) { - return workflowDAG.getDirectPostNodes(dagNode); - } - - @Override - public List getDirectPreNodes(DAGNode dagNode) { - return workflowDAG.getDirectPreNodes(dagNode); - } - - @Override - public DAGNode getDAGNode(String nodeName) { - return workflowDAG.getDAGNode(nodeName); - } } diff --git a/dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNodeTest.java b/dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeTest.java similarity index 89% rename from dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNodeTest.java rename to dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeTest.java index 0502450f82..2962ce8c68 100644 --- a/dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/DAGNodeTest.java +++ b/dolphinscheduler-workflow-engine/src/test/java/org/apache/dolphinscheduler/workflow/engine/dag/NodeTest.java @@ -12,12 +12,12 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.Lists; -class DAGNodeTest { +class NodeTest { @Test void buildDAGNode_EmptyNodeName() { IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class, - () -> DAGNode.builder() + () -> Node.builder() .inDegrees(new ArrayList<>()) .outDegrees(new ArrayList<>()) .build()); @@ -27,7 +27,7 @@ class DAGNodeTest { @Test void buildDAGNode_BadInDegree() { IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class, - () -> DAGNode.builder() + () -> Node.builder() .nodeName("A") .inDegrees(Lists.newArrayList(DAGEdge.builder() .fromNodeName(null) @@ -42,7 +42,7 @@ class DAGNodeTest { @Test void buildDAGNode_NiceInDegree() { - assertDoesNotThrow(() -> DAGNode.builder() + assertDoesNotThrow(() -> Node.builder() .nodeName("A") .inDegrees(Lists.newArrayList(DAGEdge.builder() .fromNodeName(null) @@ -55,7 +55,7 @@ class DAGNodeTest { @Test void buildDAGNode_BadOutDegree() { IllegalArgumentException illegalArgumentException = assertThrows(IllegalArgumentException.class, - () -> DAGNode.builder() + () -> Node.builder() .nodeName("A") .inDegrees(new ArrayList<>()) .outDegrees(Lists.newArrayList(DAGEdge.builder() @@ -70,7 +70,7 @@ class DAGNodeTest { @Test void buildDAGNode_NiceOutDegree() { - assertDoesNotThrow(() -> DAGNode.builder() + assertDoesNotThrow(() -> Node.builder() .nodeName("A") .inDegrees(new ArrayList<>()) .outDegrees(Lists.newArrayList(DAGEdge.builder() @@ -82,23 +82,23 @@ class DAGNodeTest { @Test void buildDAGNode_NotSkip() { - DAGNode dagNode = DAGNode.builder() + Node node = Node.builder() .nodeName("A") .inDegrees(new ArrayList<>()) .outDegrees(new ArrayList<>()) .build(); - assertFalse(dagNode.isSkip()); + assertFalse(node.isSkip()); } @Test void buildDAGNode_Skip() { - DAGNode dagNode = DAGNode.builder() + Node node = Node.builder() .nodeName("A") .skip(true) .inDegrees(new ArrayList<>()) .outDegrees(new ArrayList<>()) .build(); - assertTrue(dagNode.isSkip()); + assertTrue(node.isSkip()); } }