Browse Source

[Fix-5875][API] When I saved the task that had the same name task in another flow ,the service would throw DuplicateKeyException (#6430)

* using the task code in the dag
3.0.0/version-upgrade
Hua Jiang 3 years ago committed by GitHub
parent
commit
fac6b4afd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  3. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  4. 148
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  5. 9
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  6. 21
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  7. 13
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  8. 72
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  9. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 3
      sql/dolphinscheduler_h2.sql
  11. 3
      sql/dolphinscheduler_mysql.sql
  12. 3
      sql/dolphinscheduler_postgre.sql
  13. 22
      sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql
  14. 17
      sql/upgrade/1.4.0_schema/postgresql/dolphinscheduler_ddl.sql

22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -1155,18 +1155,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, List<TreeViewDto>> en = iter.next();
String nodeName = en.getKey();
String nodeCode = en.getKey();
parentTreeViewDtoList = en.getValue();
TreeViewDto treeViewDto = new TreeViewDto();
treeViewDto.setName(nodeName);
TaskNode taskNode = dag.getNode(nodeName);
TaskNode taskNode = dag.getNode(nodeCode);
treeViewDto.setType(taskNode.getType());
treeViewDto.setCode(taskNode.getCode());
treeViewDto.setName(taskNode.getName());
//set treeViewDto instances
for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance = processInstanceList.get(i);
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName);
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), Long.parseLong(nodeCode));
if (taskInstance == null) {
treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
} else {
@ -1188,18 +1188,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
pTreeViewDto.getChildren().add(treeViewDto);
}
postNodeList = dag.getSubsequentNodes(nodeName);
postNodeList = dag.getSubsequentNodes(nodeCode);
if (CollectionUtils.isNotEmpty(postNodeList)) {
for (String nextNodeName : postNodeList) {
List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName);
for (String nextNodeCode : postNodeList) {
List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeCode);
if (CollectionUtils.isEmpty(treeViewDtoList)) {
treeViewDtoList = new ArrayList<>();
}
treeViewDtoList.add(treeViewDto);
waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
waitingRunningNodeMap.put(nextNodeCode, treeViewDtoList);
}
}
runningNodeMap.remove(nodeName);
runningNodeMap.remove(nodeCode);
}
if (waitingRunningNodeMap.size() == 0) {
break;
@ -1224,14 +1224,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
DAG<String, TaskNode, String> graph = new DAG<>();
// Fill the vertices
for (TaskNode taskNodeResponse : taskNodeResponseList) {
graph.addNode(taskNodeResponse.getName(), taskNodeResponse);
graph.addNode(Long.toString(taskNodeResponse.getCode()), taskNodeResponse);
}
// Fill edge relations
for (TaskNode taskNodeResponse : taskNodeResponseList) {
List<String> preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class);
if (CollectionUtils.isNotEmpty(preTasks)) {
for (String preTask : preTasks) {
if (!graph.addEdge(preTask, taskNodeResponse.getName())) {
if (!graph.addEdge(preTask, Long.toString(taskNodeResponse.getCode()))) {
return true;
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -707,7 +707,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
List<Task> taskList = new ArrayList<>();
for (String node : nodeList) {
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node);
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, Long.parseLong(node));
if (taskInstance == null) {
continue;
}

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -51,6 +51,9 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId,
@Param("name") String name);
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
@Param("taskCode") Long taskCode);
Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);

148
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -63,9 +63,9 @@ public class DagHelper {
String preTasks = taskNode.getPreTasks();
List<String> preTaskList = JSONUtils.toList(preTasks, String.class);
if (preTaskList != null) {
for (String depNodeName : preTaskList) {
if (null != findNodeByName(taskNodeList, depNodeName)) {
nodeRelationList.add(new TaskNodeRelation(depNodeName, taskNode.getName()));
for (String depNodeCode : preTaskList) {
if (null != findNodeByCode(taskNodeList, depNodeCode)) {
nodeRelationList.add(new TaskNodeRelation(depNodeCode, Long.toString(taskNode.getCode())));
}
}
}
@ -78,12 +78,12 @@ public class DagHelper {
*
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeNameList recoveryNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskDependType taskDependType
* @return task node list
*/
public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> taskNodeList, List<String> startNodeNameList,
List<String> recoveryNodeNameList, TaskDependType taskDependType) {
List<String> recoveryNodeCodeList, TaskDependType taskDependType) {
List<TaskNode> destFlowNodeList = new ArrayList<>();
List<String> startNodeList = startNodeNameList;
@ -92,32 +92,34 @@ public class DagHelper {
logger.error("start node list is empty! cannot continue run the process ");
return destFlowNodeList;
}
List<TaskNode> destTaskNodeList = new ArrayList<>();
List<TaskNode> tmpTaskNodeList = new ArrayList<>();
if (taskDependType == TaskDependType.TASK_POST
&& CollectionUtils.isNotEmpty(recoveryNodeNameList)) {
startNodeList = recoveryNodeNameList;
&& CollectionUtils.isNotEmpty(recoveryNodeCodeList)) {
startNodeList = recoveryNodeCodeList;
}
if (CollectionUtils.isEmpty(startNodeList)) {
// no special designation start nodes
tmpTaskNodeList = taskNodeList;
} else {
// specified start nodes or resume execution
for (String startNodeName : startNodeList) {
TaskNode startNode = findNodeByName(taskNodeList, startNodeName);
for (String startNodeCode : startNodeList) {
TaskNode startNode = findNodeByCode(taskNodeList, startNodeCode);
List<TaskNode> childNodeList = new ArrayList<>();
if (startNode == null) {
logger.error("start node name [{}] is not in task node list [{}] ",
startNodeName,
startNodeCode,
taskNodeList
);
continue;
} else if (TaskDependType.TASK_POST == taskDependType) {
List<String> visitedNodeNameList = new ArrayList<>();
childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeNameList);
List<String> visitedNodeCodeList = new ArrayList<>();
childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeCodeList);
} else if (TaskDependType.TASK_PRE == taskDependType) {
List<String> visitedNodeNameList = new ArrayList<>();
childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList, visitedNodeNameList);
List<String> visitedNodeCodeList = new ArrayList<>();
childNodeList = getFlowNodeListPre(startNode, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList);
} else {
childNodeList.add(startNode);
}
@ -126,7 +128,7 @@ public class DagHelper {
}
for (TaskNode taskNode : tmpTaskNodeList) {
if (null == findNodeByName(destTaskNodeList, taskNode.getName())) {
if (null == findNodeByCode(destTaskNodeList, Long.toString(taskNode.getCode()))) {
destTaskNodeList.add(taskNode);
}
}
@ -141,17 +143,17 @@ public class DagHelper {
* @param taskNodeList taskNodeList
* @return task node list
*/
private static List<TaskNode> getFlowNodeListPost(TaskNode startNode, List<TaskNode> taskNodeList, List<String> visitedNodeNameList) {
private static List<TaskNode> getFlowNodeListPost(TaskNode startNode, List<TaskNode> taskNodeList, List<String> visitedNodeCodeList) {
List<TaskNode> resultList = new ArrayList<>();
for (TaskNode taskNode : taskNodeList) {
List<String> depList = taskNode.getDepList();
if (null != depList && null != startNode && depList.contains(startNode.getName()) && !visitedNodeNameList.contains(taskNode.getName())) {
resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeNameList));
if (null != depList && null != startNode && depList.contains(Long.toString(startNode.getCode())) && !visitedNodeCodeList.contains(Long.toString(taskNode.getCode()))) {
resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeCodeList));
}
}
// why add (startNode != null) condition? for SonarCloud Quality Gate passed
if (null != startNode) {
visitedNodeNameList.add(startNode.getName());
visitedNodeCodeList.add(Long.toString(startNode.getCode()));
}
resultList.add(startNode);
@ -163,11 +165,11 @@ public class DagHelper {
* find all nodes that start nodes depend on.
*
* @param startNode startNode
* @param recoveryNodeNameList recoveryNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskNodeList taskNodeList
* @return task node list
*/
private static List<TaskNode> getFlowNodeListPre(TaskNode startNode, List<String> recoveryNodeNameList, List<TaskNode> taskNodeList, List<String> visitedNodeNameList) {
private static List<TaskNode> getFlowNodeListPre(TaskNode startNode, List<String> recoveryNodeCodeList, List<TaskNode> taskNodeList, List<String> visitedNodeCodeList) {
List<TaskNode> resultList = new ArrayList<>();
@ -179,17 +181,17 @@ public class DagHelper {
if (CollectionUtils.isEmpty(depList)) {
return resultList;
}
for (String depNodeName : depList) {
TaskNode start = findNodeByName(taskNodeList, depNodeName);
if (recoveryNodeNameList.contains(depNodeName)) {
for (String depNodeCode : depList) {
TaskNode start = findNodeByCode(taskNodeList, depNodeCode);
if (recoveryNodeCodeList.contains(depNodeCode)) {
resultList.add(start);
} else if (!visitedNodeNameList.contains(depNodeName)) {
resultList.addAll(getFlowNodeListPre(start, recoveryNodeNameList, taskNodeList, visitedNodeNameList));
} else if (!visitedNodeCodeList.contains(depNodeCode)) {
resultList.addAll(getFlowNodeListPre(start, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList));
}
}
// why add (startNode != null) condition? for SonarCloud Quality Gate passed
if (null != startNode) {
visitedNodeNameList.add(startNode.getName());
visitedNodeCodeList.add(Long.toString(startNode.getCode()));
}
return resultList;
}
@ -199,17 +201,17 @@ public class DagHelper {
*
* @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeNameList recoveryNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param depNodeType depNodeType
* @return process dag
* @throws Exception if error throws Exception
*/
public static ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<String> startNodeNameList,
List<String> recoveryNodeNameList,
List<String> recoveryNodeCodeList,
TaskDependType depNodeType) throws Exception {
List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
if (destTaskNodeList.isEmpty()) {
return null;
}
@ -236,6 +238,22 @@ public class DagHelper {
return null;
}
/**
* find node by node code
*
* @param nodeDetails nodeDetails
* @param nodeCode nodeCode
* @return task node
*/
public static TaskNode findNodeByCode(List<TaskNode> nodeDetails, String nodeCode) {
for (TaskNode taskNode : nodeDetails) {
if (Long.toString(taskNode.getCode()).equals(nodeCode)) {
return taskNode;
}
}
return null;
}
/**
* the task can be submit when all the depends nodes are forbidden or complete
*
@ -252,11 +270,11 @@ public class DagHelper {
if (dependList == null) {
return true;
}
for (String dependNodeName : dependList) {
TaskNode dependNode = dag.getNode(dependNodeName);
if (dependNode == null || completeTaskList.containsKey(dependNodeName)
for (String dependNodeCode : dependList) {
TaskNode dependNode = dag.getNode(dependNodeCode);
if (dependNode == null || completeTaskList.containsKey(dependNodeCode)
|| dependNode.isForbidden()
|| skipTaskNodeList.containsKey(dependNodeName)) {
|| skipTaskNodeList.containsKey(dependNodeCode)) {
continue;
} else {
return false;
@ -272,22 +290,23 @@ public class DagHelper {
*
* @return successor nodes
*/
public static Set<String> parsePostNodes(String preNodeName,
public static Set<String> parsePostNodes(String preNodeCode,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
Set<String> postNodeList = new HashSet<>();
Collection<String> startVertexes = new ArrayList<>();
if (preNodeName == null) {
if (preNodeCode == null) {
startVertexes = dag.getBeginNode();
} else if (dag.getNode(preNodeName).isConditionsTask()) {
List<String> conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
} else if (dag.getNode(preNodeCode).isConditionsTask()) {
List<String> conditionTaskList = parseConditionTask(preNodeCode, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else if (dag.getNode(preNodeName).isSwitchTask()) {
List<String> conditionTaskList = parseSwitchTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
} else if (dag.getNode(preNodeCode).isSwitchTask()) {
List<String> conditionTaskList = parseSwitchTask(preNodeCode, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else {
startVertexes = dag.getSubsequentNodes(preNodeName);
startVertexes = dag.getSubsequentNodes(preNodeCode);
}
for (String subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent);
@ -329,19 +348,19 @@ public class DagHelper {
* parse condition task find the branch process
* set skip flag for another one.
*/
public static List<String> parseConditionTask(String nodeName,
public static List<String> parseConditionTask(String nodeCode,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
TaskNode taskNode = dag.getNode(nodeCode);
if (!taskNode.isConditionsTask()) {
return conditionTaskList;
}
if (!completeTaskList.containsKey(nodeName)) {
if (!completeTaskList.containsKey(nodeCode)) {
return conditionTaskList;
}
TaskInstance taskInstance = completeTaskList.get(nodeName);
TaskInstance taskInstance = completeTaskList.get(nodeCode);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
List<String> skipNodeList = new ArrayList<>();
@ -352,7 +371,7 @@ public class DagHelper {
conditionTaskList = conditionsParameters.getFailedNode();
skipNodeList = conditionsParameters.getSuccessNode();
} else {
conditionTaskList.add(nodeName);
conditionTaskList.add(nodeCode);
}
for (String failedNode : skipNodeList) {
setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList);
@ -364,19 +383,19 @@ public class DagHelper {
* parse condition task find the branch process
* set skip flag for another one.
*
* @param nodeName
* @param nodeCode
* @return
*/
public static List<String> parseSwitchTask(String nodeName,
public static List<String> parseSwitchTask(String nodeCode,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
TaskNode taskNode = dag.getNode(nodeCode);
if (!taskNode.isSwitchTask()) {
return conditionTaskList;
}
if (!completeTaskList.containsKey(nodeName)) {
if (!completeTaskList.containsKey(nodeCode)) {
return conditionTaskList;
}
conditionTaskList = skipTaskNode4Switch(taskNode, skipTaskNodeList, completeTaskList, dag);
@ -386,6 +405,7 @@ public class DagHelper {
private static List<String> skipTaskNode4Switch(TaskNode taskNode, Map<String, TaskNode> skipTaskNodeList,
Map<String, TaskInstance> completeTaskList,
DAG<String, TaskNode, TaskNodeRelation> dag) {
SwitchParameters switchParameters = completeTaskList.get(taskNode.getName()).getSwitchDependency();
int resultConditionLocation = switchParameters.getResultConditionLocation();
List<SwitchResultVo> conditionResultVoList = switchParameters.getDependTaskList();
@ -406,15 +426,15 @@ public class DagHelper {
/**
* set task node and the post nodes skip flag
*/
private static void setTaskNodeSkip(String skipNodeName,
private static void setTaskNodeSkip(String skipNodeCode,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList,
Map<String, TaskNode> skipTaskNodeList) {
if (!dag.containsNode(skipNodeName)) {
if (!dag.containsNode(skipNodeCode)) {
return;
}
skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName));
Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeName);
skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode));
Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeCode);
for (String post : postNodeList) {
TaskNode postNode = dag.getNode(post);
if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) {
@ -436,7 +456,7 @@ public class DagHelper {
//add vertex
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
dag.addNode(Long.toString(node.getCode()), node);
}
}
@ -466,7 +486,7 @@ public class DagHelper {
// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
taskNodeRelations.add(new TaskNodeRelation(depNode, Long.toString(taskNode.getCode())));
}
}
}
@ -500,7 +520,7 @@ public class DagHelper {
&& taskNodeMap.containsKey(preTaskCode) && taskNodeMap.containsKey(postTaskCode)) {
TaskNode preNode = taskNodeMap.get(preTaskCode);
TaskNode postNode = taskNodeMap.get(postTaskCode);
taskNodeRelations.add(new TaskNodeRelation(preNode.getName(), postNode.getName()));
taskNodeRelations.add(new TaskNodeRelation(Long.toString(preNode.getCode()), Long.toString(postNode.getCode())));
}
}
ProcessDag processDag = new ProcessDag();
@ -512,18 +532,18 @@ public class DagHelper {
/**
* is there have conditions after the parent node
*/
public static boolean haveConditionsAfterNode(String parentNodeName,
public static boolean haveConditionsAfterNode(String parentNodeCode,
DAG<String, TaskNode, TaskNodeRelation> dag
) {
boolean result = false;
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeName);
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
if (CollectionUtils.isEmpty(subsequentNodes)) {
return result;
}
for (String nodeName : subsequentNodes) {
TaskNode taskNode = dag.getNode(nodeName);
for (String nodeCode : subsequentNodes) {
TaskNode taskNode = dag.getNode(nodeCode);
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
if (preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()) {
if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) {
return true;
}
}
@ -533,13 +553,13 @@ public class DagHelper {
/**
* is there have conditions after the parent node
*/
public static boolean haveConditionsAfterNode(String parentNodeName, List<TaskNode> taskNodes) {
public static boolean haveConditionsAfterNode(String parentNodeCode, List<TaskNode> taskNodes) {
if (CollectionUtils.isEmpty(taskNodes)) {
return false;
}
for (TaskNode taskNode : taskNodes) {
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
if (preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()) {
if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) {
return true;
}
}

9
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -95,6 +95,15 @@
and flag = 1
limit 1
</select>
<select id="queryByInstanceIdAndCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where process_instance_id = #{processInstanceId}
and task_code = #{taskCode}
and flag = 1
limit 1
</select>
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_task_definition_log define

21
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -276,6 +276,27 @@ public class TaskInstanceMapperTest {
Assert.assertNotEquals(taskInstance, null);
}
/**
* test query by task instance id and code
*/
@Test
public void testQueryByInstanceIdAndCode() {
// insert ProcessInstance
ProcessInstance processInstance = insertProcessInstance();
// insert taskInstance
TaskInstance task = insertTaskInstance(processInstance.getId());
task.setHost("111.111.11.11");
taskInstanceMapper.updateById(task);
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(
task.getProcessInstanceId(),
task.getTaskCode()
);
taskInstanceMapper.deleteById(task.getId());
Assert.assertNotEquals(taskInstance, null);
}
/**
* test count task instance
*/

13
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -291,12 +291,14 @@ public class DagHelperTest {
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setCode(1);
node1.setType(TaskType.SHELL.getDesc());
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setCode(2);
node2.setType(TaskType.SHELL.getDesc());
List<String> dep2 = new ArrayList<>();
dep2.add("1");
@ -306,12 +308,14 @@ public class DagHelperTest {
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setCode(4);
node4.setType(TaskType.SHELL.getDesc());
taskNodeList.add(node4);
TaskNode node3 = new TaskNode();
node3.setId("3");
node3.setName("3");
node3.setCode(3);
node3.setType(TaskType.SHELL.getDesc());
List<String> dep3 = new ArrayList<>();
dep3.add("2");
@ -322,6 +326,7 @@ public class DagHelperTest {
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
node5.setCode(5);
node5.setType(TaskType.SHELL.getDesc());
List<String> dep5 = new ArrayList<>();
dep5.add("3");
@ -332,6 +337,7 @@ public class DagHelperTest {
TaskNode node6 = new TaskNode();
node6.setId("6");
node6.setName("6");
node6.setCode(6);
node6.setType(TaskType.SHELL.getDesc());
List<String> dep6 = new ArrayList<>();
dep6.add("3");
@ -341,6 +347,7 @@ public class DagHelperTest {
TaskNode node7 = new TaskNode();
node7.setId("7");
node7.setName("7");
node7.setCode(7);
node7.setType(TaskType.SHELL.getDesc());
List<String> dep7 = new ArrayList<>();
dep7.add("5");
@ -350,6 +357,7 @@ public class DagHelperTest {
TaskNode node8 = new TaskNode();
node8.setId("8");
node8.setName("8");
node8.setCode(8);
node8.setType(TaskType.SHELL.getDesc());
List<String> dep8 = new ArrayList<>();
dep8.add("2");
@ -381,12 +389,14 @@ public class DagHelperTest {
TaskNode node = new TaskNode();
node.setId("0");
node.setName("0");
node.setCode(0);
node.setType("SHELL");
taskNodeList.add(node);
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setCode(1);
node1.setType("switch");
node1.setDependence(JSONUtils.toJsonString(getSwitchNode()));
taskNodeList.add(node1);
@ -394,6 +404,7 @@ public class DagHelperTest {
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setCode(2);
node2.setType("SHELL");
List<String> dep2 = new ArrayList<>();
dep2.add("1");
@ -403,6 +414,7 @@ public class DagHelperTest {
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setCode(4);
node4.setType("SHELL");
List<String> dep4 = new ArrayList<>();
dep4.add("1");
@ -412,6 +424,7 @@ public class DagHelperTest {
TaskNode node5 = new TaskNode();
node5.setId("4");
node5.setName("4");
node5.setCode(4);
node5.setType("SHELL");
List<String> dep5 = new ArrayList<>();
dep5.add("1");

72
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -375,19 +375,19 @@ public class WorkflowExecuteThread implements Runnable {
return;
}
ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
completeTaskList.put(task.getName(), task);
completeTaskList.put(Long.toString(task.getTaskCode()), task);
activeTaskProcessorMaps.remove(task.getId());
taskTimeoutCheckList.remove(task.getId());
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
processService.saveProcessInstance(processInstance);
submitPostNode(task.getName());
submitPostNode(Long.toString(task.getTaskCode()));
} else if (task.getState().typeIsFailure()) {
if (task.isConditionsTask()
|| DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
submitPostNode(task.getName());
|| DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
submitPostNode(Long.toString(task.getTaskCode()));
} else {
errorTaskList.put(task.getName(), task);
errorTaskList.put(Long.toString(task.getTaskCode()), task);
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@ -522,16 +522,18 @@ public class WorkflowExecuteThread implements Runnable {
List<TaskNode> taskNodeList =
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
forbiddenTaskList.clear();
taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {
forbiddenTaskList.put(taskNode.getName(), taskNode);
forbiddenTaskList.put(Long.toString(taskNode.getCode()), taskNode);
}
});
// generate process to get DAG info
List<String> recoveryNameList = getRecoveryNodeNameList();
List<String> recoveryNodeCodeList = getRecoveryNodeCodeList();
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(taskNodeList,
startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
if (processDag == null) {
logger.error("processDag is null");
return;
@ -553,13 +555,13 @@ public class WorkflowExecuteThread implements Runnable {
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : taskInstanceList) {
if (task.isTaskComplete()) {
completeTaskList.put(task.getName(), task);
completeTaskList.put(Long.toString(task.getTaskCode()), task);
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
errorTaskList.put(task.getName(), task);
errorTaskList.put(Long.toString(task.getTaskCode()), task);
}
}
@ -806,8 +808,8 @@ public class WorkflowExecuteThread implements Runnable {
}
}
private void submitPostNode(String parentNodeName) {
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
private void submitPostNode(String parentNodeCode) {
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeList, dag, completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode);
@ -825,7 +827,7 @@ public class WorkflowExecuteThread implements Runnable {
continue;
}
if (completeTaskList.containsKey(task.getName())) {
if (completeTaskList.containsKey(Long.toString(task.getTaskCode()))) {
logger.info("task {} has already run success", task.getName());
continue;
}
@ -844,16 +846,16 @@ public class WorkflowExecuteThread implements Runnable {
*
* @return DependResult
*/
private DependResult isTaskDepsComplete(String taskName) {
private DependResult isTaskDepsComplete(String taskCode) {
Collection<String> startNodes = dag.getBeginNode();
// if vertex,returns true directly
if (startNodes.contains(taskName)) {
if (startNodes.contains(taskCode)) {
return DependResult.SUCCESS;
}
TaskNode taskNode = dag.getNode(taskName);
List<String> depNameList = taskNode.getDepList();
for (String depsNode : depNameList) {
TaskNode taskNode = dag.getNode(taskCode);
List<String> depCodeList = taskNode.getDepList();
for (String depsNode : depCodeList) {
if (!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)) {
@ -871,11 +873,11 @@ public class WorkflowExecuteThread implements Runnable {
if (taskNode.isConditionsTask()) {
continue;
}
if (!dependTaskSuccess(depsNode, taskName)) {
if (!dependTaskSuccess(depsNode, taskCode)) {
return DependResult.FAILED;
}
}
logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray()));
logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskList.keySet().toArray()));
return DependResult.SUCCESS;
}
@ -1109,7 +1111,7 @@ public class WorkflowExecuteThread implements Runnable {
* @return DependResult
*/
private DependResult getDependResultForTask(TaskInstance taskInstance) {
return isTaskDepsComplete(taskInstance.getName());
return isTaskDepsComplete(Long.toString(taskInstance.getTaskCode()));
}
/**
@ -1228,15 +1230,15 @@ public class WorkflowExecuteThread implements Runnable {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
completeTaskList.put(Long.toString(task.getTaskCode()), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
}
//init varPool only this task is the first time running
if (task.isFirstRun()) {
//get pre task ,get all the task varPool to this task
Set<String> preTask = dag.getPreviousNodes(task.getName());
Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
getPreVarPool(task, preTask);
}
DependResult dependResult = getDependResultForTask(task);
@ -1251,7 +1253,7 @@ public class WorkflowExecuteThread implements Runnable {
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTask.put(task.getName(), task);
dependFailedTask.put(Long.toString(task.getTaskCode()), task);
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
@ -1331,19 +1333,19 @@ public class WorkflowExecuteThread implements Runnable {
}
/**
* generate start node name list from parsing command param;
* generate start node code list from parsing command param;
* if "StartNodeIdList" exists in command param, return StartNodeIdList
*
* @return recovery node name list
* @return recovery node code list
*/
private List<String> getRecoveryNodeNameList() {
List<String> recoveryNodeNameList = new ArrayList<>();
private List<String> getRecoveryNodeCodeList() {
List<String> recoveryNodeCodeList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(recoverNodeIdList)) {
for (TaskInstance task : recoverNodeIdList) {
recoveryNodeNameList.add(task.getName());
recoveryNodeCodeList.add(Long.toString(task.getTaskCode()));
}
}
return recoveryNodeNameList;
return recoveryNodeCodeList;
}
/**
@ -1351,15 +1353,15 @@ public class WorkflowExecuteThread implements Runnable {
*
* @param totalTaskNodeList total task node list
* @param startNodeNameList start node name list
* @param recoveryNodeNameList recovery node name list
* @param recoveryNodeCodeList recovery node code list
* @param depNodeType depend node type
* @return ProcessDag process dag
* @throws Exception exception
*/
public ProcessDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<String> startNodeNameList,
List<String> recoveryNodeNameList,
List<String> recoveryNodeCodeList,
TaskDependType depNodeType) throws Exception {
return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
}
}

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2498,7 +2498,7 @@ public class ProcessService {
taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout())));
taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getName).collect(Collectors.toList())));
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
taskNodeList.add(taskNode);
}
}

3
sql/dolphinscheduler_h2.sql

@ -472,8 +472,7 @@ CREATE TABLE t_ds_task_definition
resource_ids varchar(255) DEFAULT NULL,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
PRIMARY KEY (id, code),
UNIQUE KEY task_unique (name,project_code) USING BTREE
PRIMARY KEY (id, code)
);
-- ----------------------------

3
sql/dolphinscheduler_mysql.sql

@ -473,8 +473,7 @@ CREATE TABLE `t_ds_task_definition` (
`resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`),
UNIQUE KEY `task_unique` (`name`,`project_code`) USING BTREE
PRIMARY KEY (`id`,`code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------

3
sql/dolphinscheduler_postgre.sql

@ -382,8 +382,7 @@ CREATE TABLE t_ds_task_definition (
resource_ids varchar(255) DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id) ,
CONSTRAINT task_definition_unique UNIQUE (name, project_code)
PRIMARY KEY (id)
) ;
create index task_definition_index on t_ds_task_definition (project_code,id);

22
sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql

@ -337,6 +337,25 @@ delimiter ;
CALL uc_dolphin_T_t_ds_schedules_A_add_timezone();
DROP PROCEDURE uc_dolphin_T_t_ds_schedules_A_add_timezone;
-- uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.STATISTICS
WHERE TABLE_NAME='t_ds_task_definition'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND INDEX_NAME ='task_unique')
ALTER TABLE t_ds_task_definition drop INDEX `task_unique`;
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName();
DROP PROCEDURE uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName;
-- ----------------------------
-- Table structure for t_ds_environment
-- ----------------------------
@ -382,8 +401,7 @@ CREATE TABLE `t_ds_task_definition` (
`resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`),
UNIQUE KEY `task_unique` (`name`,`project_code`) USING BTREE
PRIMARY KEY (`id`,`code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------

17
sql/upgrade/1.4.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -379,6 +379,23 @@ CREATE TABLE t_ds_environment_worker_group_relation (
CONSTRAINT environment_worker_group_unique UNIQUE (environment_code,worker_group)
);
-- uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_stat_all_indexes
WHERE relname='t_ds_task_definition'
AND indexrelname ='task_definition_unique')
ALTER TABLE t_ds_task_definition drop CONSTRAINT task_definition_unique;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_definition_A_drop_UN_taskName();
-- ----------------------------
-- These columns will not be used in the new version,if you determine that the historical data is useless, you can delete it using the sql below
-- ----------------------------

Loading…
Cancel
Save