From fac6b4afd3d185aeb09c9cb17de1549539b61c3f Mon Sep 17 00:00:00 2001 From: Hua Jiang Date: Fri, 8 Oct 2021 09:55:54 +0800 Subject: [PATCH] [Fix-5875][API] When I saved the task that had the same name task in another flow ,the service would throw DuplicateKeyException (#6430) * using the task code in the dag --- .../impl/ProcessDefinitionServiceImpl.java | 22 +-- .../impl/ProcessInstanceServiceImpl.java | 2 +- .../dao/mapper/TaskInstanceMapper.java | 3 + .../dolphinscheduler/dao/utils/DagHelper.java | 148 ++++++++++-------- .../dao/mapper/TaskInstanceMapper.xml | 9 ++ .../dao/mapper/TaskInstanceMapperTest.java | 21 +++ .../dao/utils/DagHelperTest.java | 13 ++ .../master/runner/WorkflowExecuteThread.java | 72 ++++----- .../service/process/ProcessService.java | 2 +- sql/dolphinscheduler_h2.sql | 3 +- sql/dolphinscheduler_mysql.sql | 3 +- sql/dolphinscheduler_postgre.sql | 3 +- .../mysql/dolphinscheduler_ddl.sql | 22 ++- .../postgresql/dolphinscheduler_ddl.sql | 17 ++ 14 files changed, 220 insertions(+), 120 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index fc2a2ac5ab..a138451bfe 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -1155,18 +1155,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Iterator>> iter = runningNodeMap.entrySet().iterator(); while (iter.hasNext()) { Map.Entry> en = iter.next(); - String nodeName = en.getKey(); + String nodeCode = en.getKey(); parentTreeViewDtoList = en.getValue(); TreeViewDto treeViewDto = new TreeViewDto(); - treeViewDto.setName(nodeName); - TaskNode taskNode = dag.getNode(nodeName); + TaskNode taskNode = dag.getNode(nodeCode); treeViewDto.setType(taskNode.getType()); treeViewDto.setCode(taskNode.getCode()); + treeViewDto.setName(taskNode.getName()); //set treeViewDto instances for (int i = limit - 1; i >= 0; i--) { ProcessInstance processInstance = processInstanceList.get(i); - TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName); + TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), Long.parseLong(nodeCode)); if (taskInstance == null) { treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null")); } else { @@ -1188,18 +1188,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { pTreeViewDto.getChildren().add(treeViewDto); } - postNodeList = dag.getSubsequentNodes(nodeName); + postNodeList = dag.getSubsequentNodes(nodeCode); if (CollectionUtils.isNotEmpty(postNodeList)) { - for (String nextNodeName : postNodeList) { - List treeViewDtoList = waitingRunningNodeMap.get(nextNodeName); + for (String nextNodeCode : postNodeList) { + List treeViewDtoList = waitingRunningNodeMap.get(nextNodeCode); if (CollectionUtils.isEmpty(treeViewDtoList)) { treeViewDtoList = new ArrayList<>(); } treeViewDtoList.add(treeViewDto); - waitingRunningNodeMap.put(nextNodeName, treeViewDtoList); + waitingRunningNodeMap.put(nextNodeCode, treeViewDtoList); } } - runningNodeMap.remove(nodeName); + runningNodeMap.remove(nodeCode); } if (waitingRunningNodeMap.size() == 0) { break; @@ -1224,14 +1224,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro DAG graph = new DAG<>(); // Fill the vertices for (TaskNode taskNodeResponse : taskNodeResponseList) { - graph.addNode(taskNodeResponse.getName(), taskNodeResponse); + graph.addNode(Long.toString(taskNodeResponse.getCode()), taskNodeResponse); } // Fill edge relations for (TaskNode taskNodeResponse : taskNodeResponseList) { List preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class); if (CollectionUtils.isNotEmpty(preTasks)) { for (String preTask : preTasks) { - if (!graph.addEdge(preTask, taskNodeResponse.getName())) { + if (!graph.addEdge(preTask, Long.toString(taskNodeResponse.getCode()))) { return true; } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 194a9922d8..3df20600c5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -707,7 +707,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce List taskList = new ArrayList<>(); for (String node : nodeList) { - TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node); + TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, Long.parseLong(node)); if (taskInstance == null) { continue; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index 898708e888..795004d0c9 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -51,6 +51,9 @@ public interface TaskInstanceMapper extends BaseMapper { TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId, @Param("name") String name); + TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId, + @Param("taskCode") Long taskCode); + Integer countTask(@Param("projectCodes") Long[] projectCodes, @Param("taskIds") int[] taskIds); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index de27f173ea..e692645dae 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -63,9 +63,9 @@ public class DagHelper { String preTasks = taskNode.getPreTasks(); List preTaskList = JSONUtils.toList(preTasks, String.class); if (preTaskList != null) { - for (String depNodeName : preTaskList) { - if (null != findNodeByName(taskNodeList, depNodeName)) { - nodeRelationList.add(new TaskNodeRelation(depNodeName, taskNode.getName())); + for (String depNodeCode : preTaskList) { + if (null != findNodeByCode(taskNodeList, depNodeCode)) { + nodeRelationList.add(new TaskNodeRelation(depNodeCode, Long.toString(taskNode.getCode()))); } } } @@ -78,12 +78,12 @@ public class DagHelper { * * @param taskNodeList taskNodeList * @param startNodeNameList startNodeNameList - * @param recoveryNodeNameList recoveryNodeNameList + * @param recoveryNodeCodeList recoveryNodeCodeList * @param taskDependType taskDependType * @return task node list */ public static List generateFlowNodeListByStartNode(List taskNodeList, List startNodeNameList, - List recoveryNodeNameList, TaskDependType taskDependType) { + List recoveryNodeCodeList, TaskDependType taskDependType) { List destFlowNodeList = new ArrayList<>(); List startNodeList = startNodeNameList; @@ -92,32 +92,34 @@ public class DagHelper { logger.error("start node list is empty! cannot continue run the process "); return destFlowNodeList; } + List destTaskNodeList = new ArrayList<>(); List tmpTaskNodeList = new ArrayList<>(); + if (taskDependType == TaskDependType.TASK_POST - && CollectionUtils.isNotEmpty(recoveryNodeNameList)) { - startNodeList = recoveryNodeNameList; + && CollectionUtils.isNotEmpty(recoveryNodeCodeList)) { + startNodeList = recoveryNodeCodeList; } if (CollectionUtils.isEmpty(startNodeList)) { // no special designation start nodes tmpTaskNodeList = taskNodeList; } else { // specified start nodes or resume execution - for (String startNodeName : startNodeList) { - TaskNode startNode = findNodeByName(taskNodeList, startNodeName); + for (String startNodeCode : startNodeList) { + TaskNode startNode = findNodeByCode(taskNodeList, startNodeCode); List childNodeList = new ArrayList<>(); if (startNode == null) { logger.error("start node name [{}] is not in task node list [{}] ", - startNodeName, + startNodeCode, taskNodeList ); continue; } else if (TaskDependType.TASK_POST == taskDependType) { - List visitedNodeNameList = new ArrayList<>(); - childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeNameList); + List visitedNodeCodeList = new ArrayList<>(); + childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeCodeList); } else if (TaskDependType.TASK_PRE == taskDependType) { - List visitedNodeNameList = new ArrayList<>(); - childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList, visitedNodeNameList); + List visitedNodeCodeList = new ArrayList<>(); + childNodeList = getFlowNodeListPre(startNode, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList); } else { childNodeList.add(startNode); } @@ -126,7 +128,7 @@ public class DagHelper { } for (TaskNode taskNode : tmpTaskNodeList) { - if (null == findNodeByName(destTaskNodeList, taskNode.getName())) { + if (null == findNodeByCode(destTaskNodeList, Long.toString(taskNode.getCode()))) { destTaskNodeList.add(taskNode); } } @@ -141,17 +143,17 @@ public class DagHelper { * @param taskNodeList taskNodeList * @return task node list */ - private static List getFlowNodeListPost(TaskNode startNode, List taskNodeList, List visitedNodeNameList) { + private static List getFlowNodeListPost(TaskNode startNode, List taskNodeList, List visitedNodeCodeList) { List resultList = new ArrayList<>(); for (TaskNode taskNode : taskNodeList) { List depList = taskNode.getDepList(); - if (null != depList && null != startNode && depList.contains(startNode.getName()) && !visitedNodeNameList.contains(taskNode.getName())) { - resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeNameList)); + if (null != depList && null != startNode && depList.contains(Long.toString(startNode.getCode())) && !visitedNodeCodeList.contains(Long.toString(taskNode.getCode()))) { + resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeCodeList)); } } // why add (startNode != null) condition? for SonarCloud Quality Gate passed if (null != startNode) { - visitedNodeNameList.add(startNode.getName()); + visitedNodeCodeList.add(Long.toString(startNode.getCode())); } resultList.add(startNode); @@ -163,11 +165,11 @@ public class DagHelper { * find all nodes that start nodes depend on. * * @param startNode startNode - * @param recoveryNodeNameList recoveryNodeNameList + * @param recoveryNodeCodeList recoveryNodeCodeList * @param taskNodeList taskNodeList * @return task node list */ - private static List getFlowNodeListPre(TaskNode startNode, List recoveryNodeNameList, List taskNodeList, List visitedNodeNameList) { + private static List getFlowNodeListPre(TaskNode startNode, List recoveryNodeCodeList, List taskNodeList, List visitedNodeCodeList) { List resultList = new ArrayList<>(); @@ -179,17 +181,17 @@ public class DagHelper { if (CollectionUtils.isEmpty(depList)) { return resultList; } - for (String depNodeName : depList) { - TaskNode start = findNodeByName(taskNodeList, depNodeName); - if (recoveryNodeNameList.contains(depNodeName)) { + for (String depNodeCode : depList) { + TaskNode start = findNodeByCode(taskNodeList, depNodeCode); + if (recoveryNodeCodeList.contains(depNodeCode)) { resultList.add(start); - } else if (!visitedNodeNameList.contains(depNodeName)) { - resultList.addAll(getFlowNodeListPre(start, recoveryNodeNameList, taskNodeList, visitedNodeNameList)); + } else if (!visitedNodeCodeList.contains(depNodeCode)) { + resultList.addAll(getFlowNodeListPre(start, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList)); } } // why add (startNode != null) condition? for SonarCloud Quality Gate passed if (null != startNode) { - visitedNodeNameList.add(startNode.getName()); + visitedNodeCodeList.add(Long.toString(startNode.getCode())); } return resultList; } @@ -199,17 +201,17 @@ public class DagHelper { * * @param totalTaskNodeList totalTaskNodeList * @param startNodeNameList startNodeNameList - * @param recoveryNodeNameList recoveryNodeNameList + * @param recoveryNodeCodeList recoveryNodeCodeList * @param depNodeType depNodeType * @return process dag * @throws Exception if error throws Exception */ public static ProcessDag generateFlowDag(List totalTaskNodeList, List startNodeNameList, - List recoveryNodeNameList, + List recoveryNodeCodeList, TaskDependType depNodeType) throws Exception { - List destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType); + List destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType); if (destTaskNodeList.isEmpty()) { return null; } @@ -236,6 +238,22 @@ public class DagHelper { return null; } + /** + * find node by node code + * + * @param nodeDetails nodeDetails + * @param nodeCode nodeCode + * @return task node + */ + public static TaskNode findNodeByCode(List nodeDetails, String nodeCode) { + for (TaskNode taskNode : nodeDetails) { + if (Long.toString(taskNode.getCode()).equals(nodeCode)) { + return taskNode; + } + } + return null; + } + /** * the task can be submit when all the depends nodes are forbidden or complete * @@ -252,11 +270,11 @@ public class DagHelper { if (dependList == null) { return true; } - for (String dependNodeName : dependList) { - TaskNode dependNode = dag.getNode(dependNodeName); - if (dependNode == null || completeTaskList.containsKey(dependNodeName) + for (String dependNodeCode : dependList) { + TaskNode dependNode = dag.getNode(dependNodeCode); + if (dependNode == null || completeTaskList.containsKey(dependNodeCode) || dependNode.isForbidden() - || skipTaskNodeList.containsKey(dependNodeName)) { + || skipTaskNodeList.containsKey(dependNodeCode)) { continue; } else { return false; @@ -272,22 +290,23 @@ public class DagHelper { * * @return successor nodes */ - public static Set parsePostNodes(String preNodeName, + public static Set parsePostNodes(String preNodeCode, Map skipTaskNodeList, DAG dag, Map completeTaskList) { Set postNodeList = new HashSet<>(); Collection startVertexes = new ArrayList<>(); - if (preNodeName == null) { + + if (preNodeCode == null) { startVertexes = dag.getBeginNode(); - } else if (dag.getNode(preNodeName).isConditionsTask()) { - List conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList); + } else if (dag.getNode(preNodeCode).isConditionsTask()) { + List conditionTaskList = parseConditionTask(preNodeCode, skipTaskNodeList, dag, completeTaskList); startVertexes.addAll(conditionTaskList); - } else if (dag.getNode(preNodeName).isSwitchTask()) { - List conditionTaskList = parseSwitchTask(preNodeName, skipTaskNodeList, dag, completeTaskList); + } else if (dag.getNode(preNodeCode).isSwitchTask()) { + List conditionTaskList = parseSwitchTask(preNodeCode, skipTaskNodeList, dag, completeTaskList); startVertexes.addAll(conditionTaskList); } else { - startVertexes = dag.getSubsequentNodes(preNodeName); + startVertexes = dag.getSubsequentNodes(preNodeCode); } for (String subsequent : startVertexes) { TaskNode taskNode = dag.getNode(subsequent); @@ -329,19 +348,19 @@ public class DagHelper { * parse condition task find the branch process * set skip flag for another one. */ - public static List parseConditionTask(String nodeName, + public static List parseConditionTask(String nodeCode, Map skipTaskNodeList, DAG dag, Map completeTaskList) { List conditionTaskList = new ArrayList<>(); - TaskNode taskNode = dag.getNode(nodeName); + TaskNode taskNode = dag.getNode(nodeCode); if (!taskNode.isConditionsTask()) { return conditionTaskList; } - if (!completeTaskList.containsKey(nodeName)) { + if (!completeTaskList.containsKey(nodeCode)) { return conditionTaskList; } - TaskInstance taskInstance = completeTaskList.get(nodeName); + TaskInstance taskInstance = completeTaskList.get(nodeCode); ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class); List skipNodeList = new ArrayList<>(); @@ -352,7 +371,7 @@ public class DagHelper { conditionTaskList = conditionsParameters.getFailedNode(); skipNodeList = conditionsParameters.getSuccessNode(); } else { - conditionTaskList.add(nodeName); + conditionTaskList.add(nodeCode); } for (String failedNode : skipNodeList) { setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList); @@ -364,19 +383,19 @@ public class DagHelper { * parse condition task find the branch process * set skip flag for another one. * - * @param nodeName + * @param nodeCode * @return */ - public static List parseSwitchTask(String nodeName, + public static List parseSwitchTask(String nodeCode, Map skipTaskNodeList, DAG dag, Map completeTaskList) { List conditionTaskList = new ArrayList<>(); - TaskNode taskNode = dag.getNode(nodeName); + TaskNode taskNode = dag.getNode(nodeCode); if (!taskNode.isSwitchTask()) { return conditionTaskList; } - if (!completeTaskList.containsKey(nodeName)) { + if (!completeTaskList.containsKey(nodeCode)) { return conditionTaskList; } conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag); @@ -386,6 +405,7 @@ public class DagHelper { private static List skipTaskNode4Switch(TaskNode taskNode, Map skipTaskNodeList, Map completeTaskList, DAG dag) { + SwitchParameters switchParameters = completeTaskList.get(taskNode.getName()).getSwitchDependency(); int resultConditionLocation = switchParameters.getResultConditionLocation(); List conditionResultVoList = switchParameters.getDependTaskList(); @@ -406,15 +426,15 @@ public class DagHelper { /** * set task node and the post nodes skip flag */ - private static void setTaskNodeSkip(String skipNodeName, + private static void setTaskNodeSkip(String skipNodeCode, DAG dag, Map completeTaskList, Map skipTaskNodeList) { - if (!dag.containsNode(skipNodeName)) { + if (!dag.containsNode(skipNodeCode)) { return; } - skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName)); - Collection postNodeList = dag.getSubsequentNodes(skipNodeName); + skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode)); + Collection postNodeList = dag.getSubsequentNodes(skipNodeCode); for (String post : postNodeList) { TaskNode postNode = dag.getNode(post); if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) { @@ -436,7 +456,7 @@ public class DagHelper { //add vertex if (CollectionUtils.isNotEmpty(processDag.getNodes())) { for (TaskNode node : processDag.getNodes()) { - dag.addNode(node.getName(), node); + dag.addNode(Long.toString(node.getCode()), node); } } @@ -466,7 +486,7 @@ public class DagHelper { // If the dependency is not empty if (preTasksList != null) { for (String depNode : preTasksList) { - taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName())); + taskNodeRelations.add(new TaskNodeRelation(depNode, Long.toString(taskNode.getCode()))); } } } @@ -500,7 +520,7 @@ public class DagHelper { && taskNodeMap.containsKey(preTaskCode) && taskNodeMap.containsKey(postTaskCode)) { TaskNode preNode = taskNodeMap.get(preTaskCode); TaskNode postNode = taskNodeMap.get(postTaskCode); - taskNodeRelations.add(new TaskNodeRelation(preNode.getName(), postNode.getName())); + taskNodeRelations.add(new TaskNodeRelation(Long.toString(preNode.getCode()), Long.toString(postNode.getCode()))); } } ProcessDag processDag = new ProcessDag(); @@ -512,18 +532,18 @@ public class DagHelper { /** * is there have conditions after the parent node */ - public static boolean haveConditionsAfterNode(String parentNodeName, + public static boolean haveConditionsAfterNode(String parentNodeCode, DAG dag ) { boolean result = false; - Set subsequentNodes = dag.getSubsequentNodes(parentNodeName); + Set subsequentNodes = dag.getSubsequentNodes(parentNodeCode); if (CollectionUtils.isEmpty(subsequentNodes)) { return result; } - for (String nodeName : subsequentNodes) { - TaskNode taskNode = dag.getNode(nodeName); + for (String nodeCode : subsequentNodes) { + TaskNode taskNode = dag.getNode(nodeCode); List preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class); - if (preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()) { + if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) { return true; } } @@ -533,13 +553,13 @@ public class DagHelper { /** * is there have conditions after the parent node */ - public static boolean haveConditionsAfterNode(String parentNodeName, List taskNodes) { + public static boolean haveConditionsAfterNode(String parentNodeCode, List taskNodes) { if (CollectionUtils.isEmpty(taskNodes)) { return false; } for (TaskNode taskNode : taskNodes) { List preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class); - if (preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()) { + if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) { return true; } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 0964d1cf77..f41b58ae13 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -95,6 +95,15 @@ and flag = 1 limit 1 +