From 07c4e6835e81e600fb589421b003b8b485a395ce Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Mon, 14 Oct 2019 10:39:36 +0800 Subject: [PATCH] fix bug: task cannot submit when recovery failover (#1011) * update english documents * refactor zk client * update documents * update zkclient * update zkclient * update documents * add architecture-design * change i18n * update i18n * update english documents * add architecture-design * update english documents * update en-US documents * add architecture-design * update demo site * add mybatis plus model * modify mybatisplus * modify mybatisplus * change interface by mybatisplus * add unit test * refactor dao interface. * add unit test for dao... * add unit test for dao... * add unit test for dao... * Merge remote-tracking branch 'upstream/dev-db' into dev-db # Conflicts: # dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectMapper.xml # dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ScheduleMapper.xml # escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProcessInstanceMapper.xml # escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectUserMapper.xml # escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/QueueMapper.xml # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProcessInstanceMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProjectUserMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/QueueMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ResourceUserMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ScheduleMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/SessionMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/TenantMapperTest.java * Merge remote-tracking branch 'upstream/dev-db' into dev-db # Conflicts: # dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectMapper.xml # dolphinscheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ScheduleMapper.xml # escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProcessInstanceMapper.xml # escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/ProjectUserMapper.xml # escheduler-dao/src/main/resources/cn.escheduler.dao.mapper/QueueMapper.xml # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProcessInstanceMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ProjectUserMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/QueueMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ResourceUserMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/ScheduleMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/SessionMapperTest.java # escheduler-dao/src/test/java/cn/escheduler/dao/mapper/TenantMapperTest.java * Merge remote-tracking branch 'upstream/dev-db' into dev-db # Conflicts: # dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml # dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml # dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml * update some dao bugs * update for some bugs * update some bugs * Merge remote-tracking branch 'upstream/dev-db' into dev-db # Conflicts: # dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml # dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.xml # dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml * update * update * add multiply settings for application.yml * add multiply settings for application.yml * revert * update configuration settings in task record dao... * change application_master to application-master * change application_master to application-master * update application.yml to application.properties * revert * revert * add properties * add properties * revert * revert * add api start up.. add alert send try catch * update dao info level * fix bug: task cannot submit when recovery failover * fix bug: task cannot submit when recovery failover * merge from dev-db * revert --- .../api/service/DataAnalysisService.java | 3 + .../dolphinscheduler/dao/utils/DagHelper.java | 31 +++-- .../dao/mapper/TaskInstanceMapper.xml | 1 - .../dao/utils/DagHelperTest.java | 114 ++++++++++++++++++ .../server/utils/AlertManager.java | 30 +++-- 5 files changed, 152 insertions(+), 27 deletions(-) create mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java index a64e01e35f..782835e4d8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java @@ -332,6 +332,9 @@ public class DataAnalysisService { projectIds.add(projectId); }else if(loginUser.getUserType() == UserType.GENERAL_USER){ projectIds = processDao.getProjectIdListHavePerm(loginUser.getId()); + if(projectIds.size() ==0 ){ + projectIds.add(0); + } } return projectIds.toArray(new Integer[projectIds.size()]); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index dba8aeb9fc..7ebd45bbee 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -49,7 +49,7 @@ public class DagHelper { * @param taskNodeList * @return */ - private static List generateRelationListByFlowNodes(List taskNodeList) { + public static List generateRelationListByFlowNodes(List taskNodeList) { List nodeRelationList = new ArrayList<>(); for (TaskNode taskNode : taskNodeList) { String preTasks = taskNode.getPreTasks(); @@ -73,8 +73,8 @@ public class DagHelper { * @param taskDependType * @return */ - private static List generateFlowNodeListByStartNode(List taskNodeList, List startNodeNameList, - List recoveryNodeNameList, TaskDependType taskDependType) { + public static List generateFlowNodeListByStartNode(List taskNodeList, List startNodeNameList, + List recoveryNodeNameList, TaskDependType taskDependType) { List destFlowNodeList = new ArrayList<>(); List startNodeList = startNodeNameList; @@ -262,35 +262,41 @@ public class DagHelper { for(String start : startVertexs){ TaskNode startNode = dag.getNode(start); if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){ + // the start can be submit if not forbidden and not in complete tasks continue; } + // then submit the post nodes Collection postNodes = getStartVertex(start, dag, completeTaskList); - for(String post : postNodes){ - if(checkForbiddenPostCanSubmit(post, dag)){ + TaskNode postNode = dag.getNode(post); + if(taskNodeCanSubmit(postNode, dag, completeTaskList)){ tmpStartVertexs.add(post); } } tmpStartVertexs.remove(start); } - return tmpStartVertexs; } /** - * - * @param postNodeName + * the task can be submit when all the depends nodes are forbidden or complete + * @param taskNode * @param dag + * @param completeTaskList * @return */ - private static boolean checkForbiddenPostCanSubmit(String postNodeName, DAG dag){ + public static boolean taskNodeCanSubmit(TaskNode taskNode, + DAG dag, + Map completeTaskList) { - TaskNode postNode = dag.getNode(postNodeName); - List dependList = postNode.getDepList(); + List dependList = taskNode.getDepList(); + if(dependList == null){ + return true; + } for(String dependNodeName : dependList){ TaskNode dependNode = dag.getNode(dependNodeName); - if(!dependNode.isForbidden()){ + if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){ return false; } } @@ -298,7 +304,6 @@ public class DagHelper { } - /*** * generate dag graph * @param processDag diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 77290dee79..2acfd6eb02 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -50,7 +50,6 @@ #{i} - and t.flag = 1 and t.start_time > #{startTime} and t.start_time #{endTime} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java new file mode 100644 index 0000000000..b85cfb5b6f --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -0,0 +1,114 @@ +package org.apache.dolphinscheduler.dao.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DagHelperTest { + + + @Test + public void testTaskNodeCanSubmit() throws JsonProcessingException { + + + //1->2->3->5 + //4->3 + DAG dag = generateDag(); + TaskNode taskNode3 = dag.getNode("3"); + Map completeTaskList = new HashMap<>(); + completeTaskList.putIfAbsent("1", new TaskInstance()); + Boolean canSubmit = false; + + // 2/4 are forbidden submit 3 + TaskNode node2 = dag.getNode("2"); + node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + TaskNode nodex = dag.getNode("4"); + nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); + Assert.assertEquals(canSubmit, true); + + // 2forbidden, 3 cannot be submit + completeTaskList.putIfAbsent("2", new TaskInstance()); + TaskNode nodey = dag.getNode("4"); + nodey.setRunFlag(""); + canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList); + Assert.assertEquals(canSubmit, false); + + // 2/3 forbidden submit 5 + TaskNode node3 = dag.getNode("3"); + node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + TaskNode node5 = dag.getNode("5"); + canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList); + Assert.assertEquals(canSubmit, true); + } + + /** + * 1->2->3->5 + * 4->3 + * @return + * @throws JsonProcessingException + */ + private DAG generateDag() throws JsonProcessingException { + List taskNodeList = new ArrayList<>(); + TaskNode node1 = new TaskNode(); + node1.setId("1"); + node1.setName("1"); + taskNodeList.add(node1); + + TaskNode node2 = new TaskNode(); + node2.setId("2"); + node2.setName("2"); + List dep2 = new ArrayList<>(); + dep2.add("1"); + node2.setDepList(dep2); + taskNodeList.add(node2); + + + TaskNode node4 = new TaskNode(); + node4.setId("4"); + node4.setName("4"); +// node4.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); + taskNodeList.add(node4); + + TaskNode node3 = new TaskNode(); + node3.setId("3"); + node3.setName("3"); + List dep3 = new ArrayList<>(); + dep3.add("2"); + dep3.add("4"); + node3.setDepList(dep3); + taskNodeList.add(node3); + + TaskNode node5 = new TaskNode(); + node5.setId("5"); + node5.setName("5"); + List dep5 = new ArrayList<>(); + dep5.add("3"); + node5.setDepList(dep5); + taskNodeList.add(node5); + + List startNodes = new ArrayList<>(); + List recoveryNodes = new ArrayList<>(); + List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, + startNodes, recoveryNodes, TaskDependType.TASK_POST); + List taskNodeRelations =DagHelper.generateRelationListByFlowNodes(destTaskNodeList); + ProcessDag processDag = new ProcessDag(); + processDag.setEdges(taskNodeRelations); + processDag.setNodes(destTaskNodeList); + + return DagHelper.buildDagGraph(processDag); + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java index 3a79f7073d..41f0abbc3c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java @@ -168,19 +168,23 @@ public class AlertManager { * @param toleranceTaskList */ public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List toleranceTaskList){ - Alert alert = new Alert(); - alert.setTitle("worker fault tolerance"); - alert.setShowType(ShowType.TABLE); - String content = getWorkerToleranceContent(processInstance, toleranceTaskList); - alert.setContent(content); - alert.setAlertType(AlertType.EMAIL); - alert.setCreateTime(new Date()); - alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId()); - alert.setReceivers(processInstance.getProcessDefinition().getReceivers()); - alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc()); - - alertDao.addAlert(alert); - logger.info("add alert to db , alert : {}", alert.toString()); + try{ + Alert alert = new Alert(); + alert.setTitle("worker fault tolerance"); + alert.setShowType(ShowType.TABLE); + String content = getWorkerToleranceContent(processInstance, toleranceTaskList); + alert.setContent(content); + alert.setAlertType(AlertType.EMAIL); + alert.setCreateTime(new Date()); + alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId()); + alert.setReceivers(processInstance.getProcessDefinition().getReceivers()); + alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc()); + alertDao.addAlert(alert); + logger.info("add alert to db , alert : {}", alert.toString()); + + }catch (Exception e){ + logger.error("send alert failed! " + e); + } }