Browse Source

[Improvement-7907][refactor] Optimization for query task instances list when build dag flow (#7915)

Optimization for query task instances list when build dag flow
This closes #7907
3.0.0/version-upgrade
天仇 2 years ago committed by GitHub
parent
commit
90ea6ebb6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  2. 13
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  3. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  4. 18
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

37
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -1528,27 +1528,25 @@ public class WorkflowExecuteThread {
}
/**
* get recovery task instance
* get recovery task instance list
*
* @param taskId task id
* @return recovery task instance
* @param taskIdArray task id array
* @return recovery task instance list
*/
private TaskInstance getRecoveryTaskInstance(String taskId) {
if (!StringUtils.isNotEmpty(taskId)) {
return null;
private List<TaskInstance> getRecoverTaskInstanceList(String[] taskIdArray) {
if (taskIdArray == null || taskIdArray.length == 0) {
return new ArrayList<>();
}
try {
Integer intId = Integer.valueOf(taskId);
TaskInstance task = processService.findTaskInstanceById(intId);
if (task == null) {
logger.error("start node id cannot be found: {}", taskId);
} else {
return task;
List<Integer> taskIdList = new ArrayList<>(taskIdArray.length);
for (String taskId : taskIdArray) {
try {
Integer id = Integer.valueOf(taskId);
taskIdList.add(id);
} catch (Exception e) {
logger.error("get recovery task instance failed ", e);
}
} catch (Exception e) {
logger.error("get recovery task instance failed ", e);
}
return null;
return processService.findTaskInstanceByIdList(taskIdList);
}
/**
@ -1564,12 +1562,7 @@ public class WorkflowExecuteThread {
if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
for (String nodeId : idList) {
TaskInstance task = getRecoveryTaskInstance(nodeId);
if (task != null) {
instanceList.add(task);
}
}
instanceList = getRecoverTaskInstanceList(idList);
}
return instanceList;
}

13
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -154,15 +155,19 @@ public class WorkflowExecuteThreadTest {
taskInstance4.setId(4);
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1,2,3,4");
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance1);
Mockito.when(processService.findTaskInstanceById(2)).thenReturn(taskInstance2);
Mockito.when(processService.findTaskInstanceById(3)).thenReturn(taskInstance3);
Mockito.when(processService.findTaskInstanceById(4)).thenReturn(taskInstance4);
Mockito.when(processService.findTaskInstanceByIdList(
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
method.setAccessible(true);
List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
Assert.assertEquals(4, taskInstances.size());
cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "");
List<TaskInstance> taskInstanceEmpty = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
Assert.assertTrue(taskInstanceEmpty.isEmpty());
} catch (Exception e) {
Assert.fail();
}

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1549,12 +1549,25 @@ public class ProcessService {
* find task instance by id
*
* @param taskId task id
* @return task intance
* @return task instance
*/
public TaskInstance findTaskInstanceById(Integer taskId) {
return taskInstanceMapper.selectById(taskId);
}
/**
* find task instance list by id list
*
* @param idList task id list
* @return task instance list
*/
public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
if (CollectionUtils.isEmpty(idList)) {
return new ArrayList<>();
}
return taskInstanceMapper.selectBatchIds(idList);
}
/**
* package task instance
*/

18
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -70,6 +70,7 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -790,6 +791,23 @@ public class ProcessServiceTest {
}
@Test
public void testFindTaskInstanceByIdList() {
List<Integer> emptyList = new ArrayList<>();
Mockito.when(taskInstanceMapper.selectBatchIds(emptyList)).thenReturn(new ArrayList<>());
Assert.assertEquals(0, processService.findTaskInstanceByIdList(emptyList).size());
List<Integer> idList = Collections.singletonList(1);
TaskInstance instance = new TaskInstance();
instance.setId(1);
Mockito.when(taskInstanceMapper.selectBatchIds(idList)).thenReturn(Collections.singletonList(instance));
List<TaskInstance> taskInstanceByIdList = processService.findTaskInstanceByIdList(idList);
Assert.assertEquals(1, taskInstanceByIdList.size());
Assert.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
}
private TaskGroupQueue getTaskGroupQueue() {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
taskGroupQueue.setTaskName("task name");

Loading…
Cancel
Save