From ccd8aaebab635174c0b8dccbd49f7415c43cd405 Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Wed, 3 Mar 2021 08:52:45 +0800 Subject: [PATCH] [Improvement][Server] Improve dependent task unit tests (#4911) --- .../server/master/DependentTaskTest.java | 445 +++++++++++++----- 1 file changed, 316 insertions(+), 129 deletions(-) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 0f6239af0e..48a0750666 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -14,212 +14,399 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.DependentRelation; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.model.DateInterval; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - +/** + * DependentTaskTest + */ @RunWith(MockitoJUnitRunner.Silent.class) public class DependentTaskTest { - private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); + /** + * TaskNode.runFlag : task can be run normally + */ + public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL"; private ProcessService processService; - private ApplicationContext applicationContext; - - private MasterConfig config; + /** + * the dependent task to be tested + * ProcessDefinition id=1 + * Task id=task-10, name=D + * ProcessInstance id=100 + * TaskInstance id=1000 + * notice: must be initialized by setupTaskInstance() on each test case + */ + private ProcessInstance processInstance; + private TaskInstance taskInstance; @Before - public void before() throws Exception{ + public void before() { + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); - config = new MasterConfig(); + MasterConfig config = new MasterConfig(); config.setMasterTaskCommitRetryTimes(3); config.setMasterTaskCommitInterval(1000); - processService = Mockito.mock(ProcessService.class); - DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); - Mockito.when(processService - .findLastRunningProcess(4, dateInterval.getStartTime(), - dateInterval.getEndTime())) - .thenReturn(findLastProcessInterval()); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + processService = Mockito.mock(ProcessService.class); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + processInstance = getProcessInstance(100, 1); + // for MasterBaseTaskExecThread.call + // for DependentTaskExecThread.waitTaskQuit Mockito.when(processService - .getTaskNodeListByDefinitionId(4)) - .thenReturn(getTaskNodes()); - Mockito.when(processService - .findValidTaskListByProcessId(11)) - .thenReturn(getTaskInstances()); + .findProcessInstanceById(100)) + .thenAnswer(i -> processInstance); + // for MasterBaseTaskExecThread.submit Mockito.when(processService - .findTaskInstanceById(252612)) - .thenReturn(getTaskInstance()); + .submitTask(Mockito.argThat(taskInstance -> taskInstance.getId() == 1000))) + .thenAnswer(i -> taskInstance); - - Mockito.when(processService.findProcessInstanceById(10111)) - .thenReturn(getProcessInstance()); - Mockito.when(processService.findProcessDefineById(0)) - .thenReturn(getProcessDefinition()); - Mockito.when(processService.saveTaskInstance(getTaskInstance())) + // for DependentTaskExecThread.initTaskParameters + Mockito.when(processService + .updateTaskInstance(Mockito.any())) + .thenReturn(true); + // for DependentTaskExecThread.updateTaskState + Mockito.when(processService + .saveTaskInstance(Mockito.any())) .thenReturn(true); - applicationContext = Mockito.mock(ApplicationContext.class); - SpringApplicationContext springApplicationContext = new SpringApplicationContext(); - springApplicationContext.setApplicationContext(applicationContext); - Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); - Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + // for DependentTaskExecThread.waitTaskQuit + Mockito.when(processService + .findTaskInstanceById(1000)) + .thenAnswer(i -> taskInstance); } - @Test - public void testDependAll() throws Exception{ + private void testBasicInit() { + TaskNode taskNode = getDependantTaskNode(); + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + dependentTaskModel.setRelation(DependentRelation.AND); + dependentTaskModel.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, "A", "today", "day") + ).collect(Collectors.toList())); - TaskInstance taskInstance = getTaskInstance(); - String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; - taskInstance.setDependency(dependString); + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setRelation(DependentRelation.AND); + dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList())); - Mockito.when(processService.submitTask(taskInstance)) - .thenReturn(taskInstance); - DependentTaskExecThread dependentTask = - new DependentTaskExecThread(taskInstance); + // dependence: AND(AND(2-A-day-today)) + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); - dependentTask.call(); + setupTaskInstance(taskNode); + } - Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); + @Test + public void testBasicSuccess() throws Exception { + testBasicInit(); + ProcessInstance dependentProcessInstance = + getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE); + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(dependentProcessInstance); - DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); + // for DependentExecute.getDependTaskResult + Mockito.when(processService + .findValidTaskListByProcessId(200)) + .thenReturn(Stream.of( + getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "A", dependentProcessInstance), + getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "B", dependentProcessInstance) + ).collect(Collectors.toList())); + + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); + } + @Test + public void testBasicFailure() throws Exception { + testBasicInit(); + ProcessInstance dependentProcessInstance = + getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS); + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(dependentProcessInstance); + // for DependentExecute.getDependTaskResult Mockito.when(processService - .findLastRunningProcess(4, dateInterval.getStartTime(), - dateInterval.getEndTime())) - .thenReturn(findLastStopProcessInterval()); - DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance); - dependentFailure.call(); - Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState()); + .findValidTaskListByProcessId(200)) + .thenReturn(Stream.of( + getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", dependentProcessInstance), + getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "B", dependentProcessInstance) + ).collect(Collectors.toList())); + + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.FAILURE, taskExecThread.getTaskInstance().getState()); } @Test - public void testDependTask() throws Exception{ - - TaskInstance taskInstance = getTaskInstance(); - String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"D\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; - taskInstance.setDependency(dependString); - Mockito.when(processService.submitTask(taskInstance)) - .thenReturn(taskInstance); - DependentTaskExecThread dependentTask = - new DependentTaskExecThread(taskInstance); + public void testDependentRelation() throws Exception { + DependentTaskModel dependentTaskModel1 = new DependentTaskModel(); + dependentTaskModel1.setRelation(DependentRelation.AND); + dependentTaskModel1.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, "A", "today", "day"), + getDependentItemFromTaskNode(3, "B", "today", "day") + ).collect(Collectors.toList())); + + DependentTaskModel dependentTaskModel2 = new DependentTaskModel(); + dependentTaskModel2.setRelation(DependentRelation.OR); + dependentTaskModel2.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, "A", "today", "day"), + getDependentItemFromTaskNode(3, "C", "today", "day") + ).collect(Collectors.toList())); + + /* + * OR AND 2-A-day-today 3-B-day-today + * OR 2-A-day-today 3-C-day-today + */ + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setRelation(DependentRelation.OR); + dependentParameters.setDependTaskList(Stream.of( + dependentTaskModel1, + dependentTaskModel2 + ).collect(Collectors.toList())); + + TaskNode taskNode = getDependantTaskNode(); + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + setupTaskInstance(taskNode); + + ProcessInstance processInstance200 = + getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE); + ProcessInstance processInstance300 = + getProcessInstanceForFindLastRunningProcess(300, 3, ExecutionStatus.SUCCESS); + + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(processInstance200); + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(3), Mockito.any(), Mockito.any())) + .thenReturn(processInstance300); - dependentTask.call(); + // for DependentExecute.getDependTaskResult + Mockito.when(processService + .findValidTaskListByProcessId(200)) + .thenReturn(Stream.of( + getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", processInstance200) + ).collect(Collectors.toList())); + Mockito.when(processService + .findValidTaskListByProcessId(300)) + .thenReturn(Stream.of( + getTaskInstanceForValidTaskList(3000, ExecutionStatus.SUCCESS, "B", processInstance300), + getTaskInstanceForValidTaskList(3001, ExecutionStatus.SUCCESS, "C", processInstance300) + ).collect(Collectors.toList())); + + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); + } - Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); + /** + * test when dependent on ALL tasks in another process + */ + private void testDependentOnAllInit() { + TaskNode taskNode = getDependantTaskNode(); + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + dependentTaskModel.setRelation(DependentRelation.AND); + dependentTaskModel.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, Constants.DEPENDENT_ALL, "today", "day") + ).collect(Collectors.toList())); + + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setRelation(DependentRelation.AND); + dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList())); + + // dependence: AND(AND(2:ALL today day)) + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + + setupTaskInstance(taskNode); + } - DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); + @Test + public void testDependentOnAllSuccess() throws Exception { + testDependentOnAllInit(); + // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(4, dateInterval.getStartTime(), - dateInterval.getEndTime())) - .thenReturn(findLastStopProcessInterval()); + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS)); - Mockito.when(processService - .findValidTaskListByProcessId(11)) - .thenReturn(getErrorTaskInstances()); - DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance); - dependentFailure.call(); - Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState()); + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); } - private ProcessInstance findLastStopProcessInterval(){ - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(11); - processInstance.setProcessDefinitionId(4); - processInstance.setState(ExecutionStatus.STOP); - return processInstance; - } + @Test + public void testDependentOnAllFailure() throws Exception { + testDependentOnAllInit(); + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE)); - private ProcessInstance findLastProcessInterval(){ - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(11); - processInstance.setProcessDefinitionId(4); - processInstance.setState(ExecutionStatus.SUCCESS); - return processInstance; + DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance); + dependentTask.call(); + Assert.assertEquals(ExecutionStatus.FAILURE, dependentTask.getTaskInstance().getState()); } - private ProcessDefinition getProcessDefinition(){ - ProcessDefinition processDefinition = new ProcessDefinition(); - processDefinition.setId(0); - return processDefinition; + /** + * test whether waitTaskQuit has been well impl + */ + @Test + public void testWaitAndCancel() throws Exception { + // for the poor independence of UT, error on other place may causes the condition happens + if (!Stopper.isRunning()) { + return; + } + + TaskNode taskNode = getDependantTaskNode(); + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + dependentTaskModel.setRelation(DependentRelation.AND); + dependentTaskModel.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, "A", "today", "day") + ).collect(Collectors.toList())); + + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setRelation(DependentRelation.AND); + dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList())); + + // dependence: AND(AND(2:A today day)) + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + + setupTaskInstance(taskNode); + + ProcessInstance dependentProcessInstance = + getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION); + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(dependentProcessInstance); + + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + + // for DependentExecute.getDependTaskResult + Mockito.when(processService + .findValidTaskListByProcessId(200)) + .thenAnswer(i -> { + processInstance.setState(ExecutionStatus.READY_STOP); + return Stream.of( + getTaskInstanceForValidTaskList(2000, ExecutionStatus.RUNNING_EXECUTION, "A", dependentProcessInstance) + ).collect(Collectors.toList()); + }) + .thenThrow(new IllegalStateException("have not been stopped as expected")); + + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.KILL, taskExecThread.getTaskInstance().getState()); } - private ProcessInstance getProcessInstance(){ + private ProcessInstance getProcessInstance(int processInstanceId, int processDefinitionId) { ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(10111); - processInstance.setProcessDefinitionId(0); + processInstance.setId(processInstanceId); + processInstance.setProcessDefinitionId(processDefinitionId); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - return processInstance; } - - private List getTaskNodes(){ - List list = new ArrayList<>(); + /** + * task that dependent on others (and to be tested here) + * notice: should be filled with setDependence() and be passed to setupTaskInstance() + */ + private TaskNode getDependantTaskNode() { TaskNode taskNode = new TaskNode(); - taskNode.setName("C"); - taskNode.setType("SQL"); - list.add(taskNode); - return list; + taskNode.setId("tasks-10"); + taskNode.setName("D"); + taskNode.setType(TaskType.DEPENDENT.toString()); + taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL); + return taskNode; } - private List getErrorTaskInstances(){ - List list = new ArrayList<>(); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setName("C"); - taskInstance.setState(ExecutionStatus.SUCCESS); - taskInstance.setDependency("1231"); - list.add(taskInstance); - return list; + private void setupTaskInstance(TaskNode taskNode) { + taskInstance = new TaskInstance(); + taskInstance.setId(1000); + taskInstance.setProcessInstanceId(processInstance.getId()); + taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + taskInstance.setTaskType(taskNode.getType()); + taskInstance.setName(taskNode.getName()); } - private List getTaskInstances(){ - List list = new ArrayList<>(); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setName("D"); - taskInstance.setState(ExecutionStatus.SUCCESS); - taskInstance.setDependency("1231"); - list.add(taskInstance); - return list; + /** + * DependentItem defines the condition for the dependent + */ + private DependentItem getDependentItemFromTaskNode( + int processDefinitionId, String taskName, + String date, String cycle + ) { + DependentItem dependentItem = new DependentItem(); + dependentItem.setDefinitionId(processDefinitionId); + dependentItem.setDepTasks(taskName); + dependentItem.setDateValue(date); + dependentItem.setCycle(cycle); + // so far, the following fields have no effect + dependentItem.setDependResult(DependResult.SUCCESS); + dependentItem.setStatus(ExecutionStatus.SUCCESS); + return dependentItem; + } + + private ProcessInstance getProcessInstanceForFindLastRunningProcess( + int processInstanceId, int processDefinitionId, ExecutionStatus state + ) { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(processInstanceId); + processInstance.setProcessDefinitionId(processDefinitionId); + processInstance.setState(state); + return processInstance; } - private TaskInstance getTaskInstance(){ + private TaskInstance getTaskInstanceForValidTaskList( + int taskInstanceId, ExecutionStatus state, + String taskName, ProcessInstance processInstance + ) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setTaskType("DEPENDENT"); - taskInstance.setId(252612); - taskInstance.setName("C"); - taskInstance.setProcessInstanceId(10111); + taskInstance.setId(taskInstanceId); + taskInstance.setName(taskName); + taskInstance.setProcessInstanceId(processInstance.getId()); + taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); taskInstance.setTaskJson("{}"); - taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + taskInstance.setState(state); return taskInstance; } - -} \ No newline at end of file +}