From 90ea6ebb6f6e5db0e0e444c32da8ccaa180bd459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A9=E4=BB=87?= <532066967@qq.com> Date: Sun, 16 Jan 2022 22:30:09 +0800 Subject: [PATCH] [Improvement-7907][refactor] Optimization for query task instances list when build dag flow (#7915) Optimization for query task instances list when build dag flow This closes #7907 --- .../master/runner/WorkflowExecuteThread.java | 37 ++++++++----------- .../master/WorkflowExecuteThreadTest.java | 13 +++++-- .../service/process/ProcessService.java | 15 +++++++- .../service/process/ProcessServiceTest.java | 18 +++++++++ 4 files changed, 56 insertions(+), 27 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 911439dcff..73e655f0ab 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -1528,27 +1528,25 @@ public class WorkflowExecuteThread { } /** - * get recovery task instance + * get recovery task instance list * - * @param taskId task id - * @return recovery task instance + * @param taskIdArray task id array + * @return recovery task instance list */ - private TaskInstance getRecoveryTaskInstance(String taskId) { - if (!StringUtils.isNotEmpty(taskId)) { - return null; + private List getRecoverTaskInstanceList(String[] taskIdArray) { + if (taskIdArray == null || taskIdArray.length == 0) { + return new ArrayList<>(); } - try { - Integer intId = Integer.valueOf(taskId); - TaskInstance task = processService.findTaskInstanceById(intId); - if (task == null) { - logger.error("start node id cannot be found: {}", taskId); - } else { - return task; + List taskIdList = new ArrayList<>(taskIdArray.length); + for (String taskId : taskIdArray) { + try { + Integer id = Integer.valueOf(taskId); + taskIdList.add(id); + } catch (Exception e) { + logger.error("get recovery task instance failed ", e); } - } catch (Exception e) { - logger.error("get recovery task instance failed ", e); } - return null; + return processService.findTaskInstanceByIdList(taskIdList); } /** @@ -1564,12 +1562,7 @@ public class WorkflowExecuteThread { if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) { String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA); - for (String nodeId : idList) { - TaskInstance task = getRecoveryTaskInstance(nodeId); - if (task != null) { - instanceList.add(task); - } - } + instanceList = getRecoverTaskInstanceList(idList); } return instanceList; } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 1e3893824d..3d18498f8b 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.text.ParseException; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -154,15 +155,19 @@ public class WorkflowExecuteThreadTest { taskInstance4.setId(4); Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4"); - Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance1); - Mockito.when(processService.findTaskInstanceById(2)).thenReturn(taskInstance2); - Mockito.when(processService.findTaskInstanceById(3)).thenReturn(taskInstance3); - Mockito.when(processService.findTaskInstanceById(4)).thenReturn(taskInstance4); + Mockito.when(processService.findTaskInstanceByIdList( + Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId())) + ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4)); Class masterExecThreadClass = WorkflowExecuteThread.class; Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class); method.setAccessible(true); List taskInstances = (List) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam)); Assert.assertEquals(4, taskInstances.size()); + + cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, ""); + List taskInstanceEmpty = (List) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam)); + Assert.assertTrue(taskInstanceEmpty.isEmpty()); + } catch (Exception e) { Assert.fail(); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index a531470d05..5cebcf7ad7 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1549,12 +1549,25 @@ public class ProcessService { * find task instance by id * * @param taskId task id - * @return task intance + * @return task instance */ public TaskInstance findTaskInstanceById(Integer taskId) { return taskInstanceMapper.selectById(taskId); } + /** + * find task instance list by id list + * + * @param idList task id list + * @return task instance list + */ + public List findTaskInstanceByIdList(List idList) { + if (CollectionUtils.isEmpty(idList)) { + return new ArrayList<>(); + } + return taskInstanceMapper.selectBatchIds(idList); + } + /** * package task instance */ diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 76b5bad47d..c7aab24a59 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -70,6 +70,7 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -790,6 +791,23 @@ public class ProcessServiceTest { } + @Test + public void testFindTaskInstanceByIdList() { + List emptyList = new ArrayList<>(); + Mockito.when(taskInstanceMapper.selectBatchIds(emptyList)).thenReturn(new ArrayList<>()); + Assert.assertEquals(0, processService.findTaskInstanceByIdList(emptyList).size()); + + List idList = Collections.singletonList(1); + TaskInstance instance = new TaskInstance(); + instance.setId(1); + + Mockito.when(taskInstanceMapper.selectBatchIds(idList)).thenReturn(Collections.singletonList(instance)); + List taskInstanceByIdList = processService.findTaskInstanceByIdList(idList); + + Assert.assertEquals(1, taskInstanceByIdList.size()); + Assert.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId()); + } + private TaskGroupQueue getTaskGroupQueue() { TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); taskGroupQueue.setTaskName("task name");