Browse Source

[Improvement][Server] Improve dependent task unit tests (#4911)

pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
ccd8aaebab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 439
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

439
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -14,212 +14,399 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* DependentTaskTest
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class DependentTaskTest {
private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class);
/**
* TaskNode.runFlag : task can be run normally
*/
public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
private ProcessService processService;
private ApplicationContext applicationContext;
private MasterConfig config;
/**
* the dependent task to be tested
* ProcessDefinition id=1
* Task id=task-10, name=D
* ProcessInstance id=100
* TaskInstance id=1000
* notice: must be initialized by setupTaskInstance() on each test case
*/
private ProcessInstance processInstance;
private TaskInstance taskInstance;
@Before
public void before() throws Exception{
public void before() {
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
config = new MasterConfig();
MasterConfig config = new MasterConfig();
config.setMasterTaskCommitRetryTimes(3);
config.setMasterTaskCommitInterval(1000);
processService = Mockito.mock(ProcessService.class);
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0);
Mockito.when(processService
.findLastRunningProcess(4, dateInterval.getStartTime(),
dateInterval.getEndTime()))
.thenReturn(findLastProcessInterval());
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstance = getProcessInstance(100, 1);
// for MasterBaseTaskExecThread.call
// for DependentTaskExecThread.waitTaskQuit
Mockito.when(processService
.getTaskNodeListByDefinitionId(4))
.thenReturn(getTaskNodes());
Mockito.when(processService
.findValidTaskListByProcessId(11))
.thenReturn(getTaskInstances());
.findProcessInstanceById(100))
.thenAnswer(i -> processInstance);
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
.findTaskInstanceById(252612))
.thenReturn(getTaskInstance());
.submitTask(Mockito.argThat(taskInstance -> taskInstance.getId() == 1000)))
.thenAnswer(i -> taskInstance);
Mockito.when(processService.findProcessInstanceById(10111))
.thenReturn(getProcessInstance());
Mockito.when(processService.findProcessDefineById(0))
.thenReturn(getProcessDefinition());
Mockito.when(processService.saveTaskInstance(getTaskInstance()))
// for DependentTaskExecThread.initTaskParameters
Mockito.when(processService
.updateTaskInstance(Mockito.any()))
.thenReturn(true);
// for DependentTaskExecThread.updateTaskState
Mockito.when(processService
.saveTaskInstance(Mockito.any()))
.thenReturn(true);
applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
// for DependentTaskExecThread.waitTaskQuit
Mockito.when(processService
.findTaskInstanceById(1000))
.thenAnswer(i -> taskInstance);
}
@Test
public void testDependAll() throws Exception{
private void testBasicInit() {
TaskNode taskNode = getDependantTaskNode();
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day")
).collect(Collectors.toList()));
TaskInstance taskInstance = getTaskInstance();
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
taskInstance.setDependency(dependString);
DependentParameters dependentParameters = new DependentParameters();
dependentParameters.setRelation(DependentRelation.AND);
dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
Mockito.when(processService.submitTask(taskInstance))
.thenReturn(taskInstance);
DependentTaskExecThread dependentTask =
new DependentTaskExecThread(taskInstance);
// dependence: AND(AND(2-A-day-today))
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
dependentTask.call();
setupTaskInstance(taskNode);
}
Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState());
@Test
public void testBasicSuccess() throws Exception {
testBasicInit();
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "A", dependentProcessInstance),
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "B", dependentProcessInstance)
).collect(Collectors.toList()));
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
}
@Test
public void testBasicFailure() throws Exception {
testBasicInit();
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findLastRunningProcess(4, dateInterval.getStartTime(),
dateInterval.getEndTime()))
.thenReturn(findLastStopProcessInterval());
DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance);
dependentFailure.call();
Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState());
.findValidTaskListByProcessId(200))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", dependentProcessInstance),
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "B", dependentProcessInstance)
).collect(Collectors.toList()));
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.FAILURE, taskExecThread.getTaskInstance().getState());
}
@Test
public void testDependTask() throws Exception{
public void testDependentRelation() throws Exception {
DependentTaskModel dependentTaskModel1 = new DependentTaskModel();
dependentTaskModel1.setRelation(DependentRelation.AND);
dependentTaskModel1.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day"),
getDependentItemFromTaskNode(3, "B", "today", "day")
).collect(Collectors.toList()));
DependentTaskModel dependentTaskModel2 = new DependentTaskModel();
dependentTaskModel2.setRelation(DependentRelation.OR);
dependentTaskModel2.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day"),
getDependentItemFromTaskNode(3, "C", "today", "day")
).collect(Collectors.toList()));
TaskInstance taskInstance = getTaskInstance();
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"D\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}";
taskInstance.setDependency(dependString);
Mockito.when(processService.submitTask(taskInstance))
.thenReturn(taskInstance);
DependentTaskExecThread dependentTask =
new DependentTaskExecThread(taskInstance);
/*
* OR AND 2-A-day-today 3-B-day-today
* OR 2-A-day-today 3-C-day-today
*/
DependentParameters dependentParameters = new DependentParameters();
dependentParameters.setRelation(DependentRelation.OR);
dependentParameters.setDependTaskList(Stream.of(
dependentTaskModel1,
dependentTaskModel2
).collect(Collectors.toList()));
TaskNode taskNode = getDependantTaskNode();
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
setupTaskInstance(taskNode);
ProcessInstance processInstance200 =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE);
ProcessInstance processInstance300 =
getProcessInstanceForFindLastRunningProcess(300, 3, ExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(processInstance200);
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(3), Mockito.any(), Mockito.any()))
.thenReturn(processInstance300);
dependentTask.call();
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", processInstance200)
).collect(Collectors.toList()));
Mockito.when(processService
.findValidTaskListByProcessId(300))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(3000, ExecutionStatus.SUCCESS, "B", processInstance300),
getTaskInstanceForValidTaskList(3001, ExecutionStatus.SUCCESS, "C", processInstance300)
).collect(Collectors.toList()));
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
}
Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState());
/**
* test when dependent on ALL tasks in another process
*/
private void testDependentOnAllInit() {
TaskNode taskNode = getDependantTaskNode();
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, Constants.DEPENDENT_ALL, "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
dependentParameters.setRelation(DependentRelation.AND);
dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
// dependence: AND(AND(2:ALL today day))
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
setupTaskInstance(taskNode);
}
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0);
@Test
public void testDependentOnAllSuccess() throws Exception {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(4, dateInterval.getStartTime(),
dateInterval.getEndTime()))
.thenReturn(findLastStopProcessInterval());
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS));
Mockito.when(processService
.findValidTaskListByProcessId(11))
.thenReturn(getErrorTaskInstances());
DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance);
dependentFailure.call();
Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState());
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
}
private ProcessInstance findLastStopProcessInterval(){
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(11);
processInstance.setProcessDefinitionId(4);
processInstance.setState(ExecutionStatus.STOP);
return processInstance;
@Test
public void testDependentOnAllFailure() throws Exception {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE));
DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance);
dependentTask.call();
Assert.assertEquals(ExecutionStatus.FAILURE, dependentTask.getTaskInstance().getState());
}
private ProcessInstance findLastProcessInterval(){
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(11);
processInstance.setProcessDefinitionId(4);
processInstance.setState(ExecutionStatus.SUCCESS);
return processInstance;
/**
* test whether waitTaskQuit has been well impl
*/
@Test
public void testWaitAndCancel() throws Exception {
// for the poor independence of UT, error on other place may causes the condition happens
if (!Stopper.isRunning()) {
return;
}
private ProcessDefinition getProcessDefinition(){
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(0);
return processDefinition;
TaskNode taskNode = getDependantTaskNode();
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
dependentParameters.setRelation(DependentRelation.AND);
dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
// dependence: AND(AND(2:A today day))
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
setupTaskInstance(taskNode);
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenAnswer(i -> {
processInstance.setState(ExecutionStatus.READY_STOP);
return Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.RUNNING_EXECUTION, "A", dependentProcessInstance)
).collect(Collectors.toList());
})
.thenThrow(new IllegalStateException("have not been stopped as expected"));
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.KILL, taskExecThread.getTaskInstance().getState());
}
private ProcessInstance getProcessInstance(){
private ProcessInstance getProcessInstance(int processInstanceId, int processDefinitionId) {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(10111);
processInstance.setProcessDefinitionId(0);
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
return processInstance;
}
private List<TaskNode> getTaskNodes(){
List<TaskNode> list = new ArrayList<>();
/**
* task that dependent on others (and to be tested here)
* notice: should be filled with setDependence() and be passed to setupTaskInstance()
*/
private TaskNode getDependantTaskNode() {
TaskNode taskNode = new TaskNode();
taskNode.setName("C");
taskNode.setType("SQL");
list.add(taskNode);
return list;
taskNode.setId("tasks-10");
taskNode.setName("D");
taskNode.setType(TaskType.DEPENDENT.toString());
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
return taskNode;
}
private List<TaskInstance> getErrorTaskInstances(){
List<TaskInstance> list = new ArrayList<>();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setName("C");
taskInstance.setState(ExecutionStatus.SUCCESS);
taskInstance.setDependency("1231");
list.add(taskInstance);
return list;
private void setupTaskInstance(TaskNode taskNode) {
taskInstance = new TaskInstance();
taskInstance.setId(1000);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
taskInstance.setTaskType(taskNode.getType());
taskInstance.setName(taskNode.getName());
}
private List<TaskInstance> getTaskInstances(){
List<TaskInstance> list = new ArrayList<>();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setName("D");
taskInstance.setState(ExecutionStatus.SUCCESS);
taskInstance.setDependency("1231");
list.add(taskInstance);
return list;
/**
* DependentItem defines the condition for the dependent
*/
private DependentItem getDependentItemFromTaskNode(
int processDefinitionId, String taskName,
String date, String cycle
) {
DependentItem dependentItem = new DependentItem();
dependentItem.setDefinitionId(processDefinitionId);
dependentItem.setDepTasks(taskName);
dependentItem.setDateValue(date);
dependentItem.setCycle(cycle);
// so far, the following fields have no effect
dependentItem.setDependResult(DependResult.SUCCESS);
dependentItem.setStatus(ExecutionStatus.SUCCESS);
return dependentItem;
}
private TaskInstance getTaskInstance(){
private ProcessInstance getProcessInstanceForFindLastRunningProcess(
int processInstanceId, int processDefinitionId, ExecutionStatus state
) {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(state);
return processInstance;
}
private TaskInstance getTaskInstanceForValidTaskList(
int taskInstanceId, ExecutionStatus state,
String taskName, ProcessInstance processInstance
) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("DEPENDENT");
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setProcessInstanceId(10111);
taskInstance.setId(taskInstanceId);
taskInstance.setName(taskName);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setTaskJson("{}");
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
taskInstance.setState(state);
return taskInstance;
}
}
Loading…
Cancel
Save