|
|
@ -19,16 +19,16 @@ package org.apache.dolphinscheduler.server.master; |
|
|
|
|
|
|
|
|
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; |
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING; |
|
|
|
|
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
|
|
|
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; |
|
|
|
import org.apache.dolphinscheduler.common.model.TaskNode; |
|
|
|
import org.apache.dolphinscheduler.common.model.TaskNode; |
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; |
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; |
|
|
@ -47,14 +47,12 @@ import java.util.stream.Collectors; |
|
|
|
import java.util.stream.Stream; |
|
|
|
import java.util.stream.Stream; |
|
|
|
|
|
|
|
|
|
|
|
import org.junit.Assert; |
|
|
|
import org.junit.Assert; |
|
|
|
import org.junit.Before; |
|
|
|
import org.junit.jupiter.api.AfterEach; |
|
|
|
import org.junit.Test; |
|
|
|
import org.junit.jupiter.api.BeforeEach; |
|
|
|
import org.junit.runner.RunWith; |
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
|
|
|
|
import org.mockito.MockedStatic; |
|
|
|
import org.mockito.Mockito; |
|
|
|
import org.mockito.Mockito; |
|
|
|
import org.mockito.junit.MockitoJUnitRunner; |
|
|
|
|
|
|
|
import org.springframework.context.ApplicationContext; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@RunWith(MockitoJUnitRunner.Silent.class) |
|
|
|
|
|
|
|
public class BlockingTaskTest { |
|
|
|
public class BlockingTaskTest { |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
@ -68,22 +66,22 @@ public class BlockingTaskTest { |
|
|
|
|
|
|
|
|
|
|
|
private MasterConfig config; |
|
|
|
private MasterConfig config; |
|
|
|
|
|
|
|
|
|
|
|
@Before |
|
|
|
private MockedStatic<SpringApplicationContext> mockedStaticSpringApplicationContext; |
|
|
|
public void before() { |
|
|
|
|
|
|
|
// init spring context
|
|
|
|
|
|
|
|
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); |
|
|
|
|
|
|
|
SpringApplicationContext springApplicationContext = new SpringApplicationContext(); |
|
|
|
|
|
|
|
springApplicationContext.setApplicationContext(applicationContext); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@BeforeEach |
|
|
|
|
|
|
|
public void before() { |
|
|
|
// mock master
|
|
|
|
// mock master
|
|
|
|
config = new MasterConfig(); |
|
|
|
config = new MasterConfig(); |
|
|
|
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); |
|
|
|
|
|
|
|
config.setTaskCommitRetryTimes(3); |
|
|
|
config.setTaskCommitRetryTimes(3); |
|
|
|
config.setTaskCommitInterval(Duration.ofSeconds(1)); |
|
|
|
config.setTaskCommitInterval(Duration.ofSeconds(1)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mockedStaticSpringApplicationContext = Mockito.mockStatic(SpringApplicationContext.class); |
|
|
|
|
|
|
|
Mockito.when(SpringApplicationContext.getBean(MasterConfig.class)).thenReturn(config); |
|
|
|
|
|
|
|
|
|
|
|
// mock process service
|
|
|
|
// mock process service
|
|
|
|
processService = Mockito.mock(ProcessService.class); |
|
|
|
processService = Mockito.mock(ProcessService.class); |
|
|
|
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); |
|
|
|
|
|
|
|
|
|
|
|
Mockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService); |
|
|
|
|
|
|
|
|
|
|
|
// mock process instance
|
|
|
|
// mock process instance
|
|
|
|
processInstance = getProcessInstance(); |
|
|
|
processInstance = getProcessInstance(); |
|
|
@ -99,6 +97,11 @@ public class BlockingTaskTest { |
|
|
|
.thenReturn(taskDefinition); |
|
|
|
.thenReturn(taskDefinition); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@AfterEach |
|
|
|
|
|
|
|
public void after() { |
|
|
|
|
|
|
|
mockedStaticSpringApplicationContext.close(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private ProcessInstance getProcessInstance() { |
|
|
|
private ProcessInstance getProcessInstance() { |
|
|
|
// mock process instance
|
|
|
|
// mock process instance
|
|
|
|
ProcessInstance processInstance = new ProcessInstance(); |
|
|
|
ProcessInstance processInstance = new ProcessInstance(); |
|
|
@ -200,8 +203,8 @@ public class BlockingTaskTest { |
|
|
|
|
|
|
|
|
|
|
|
// for BlockingTaskExecThread.waitTaskQuit
|
|
|
|
// for BlockingTaskExecThread.waitTaskQuit
|
|
|
|
List<TaskInstance> conditions = getTaskInstanceForValidTaskList(expectResults); |
|
|
|
List<TaskInstance> conditions = getTaskInstanceForValidTaskList(expectResults); |
|
|
|
Mockito.when(processService. |
|
|
|
Mockito.when( |
|
|
|
findValidTaskListByProcessId(processInstance.getId(),processInstance.getTestFlag())) |
|
|
|
processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())) |
|
|
|
.thenReturn(conditions); |
|
|
|
.thenReturn(conditions); |
|
|
|
return taskInstance; |
|
|
|
return taskInstance; |
|
|
|
} |
|
|
|
} |
|
|
|