|
|
|
@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
|
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
import java.util.stream.Stream; |
|
|
|
|
|
|
|
|
|
import org.junit.Assert; |
|
|
|
|
import org.junit.Before; |
|
|
|
|
import org.junit.Test; |
|
|
|
|
import org.junit.runner.RunWith; |
|
|
|
@ -60,6 +59,10 @@ public class DependentTaskTest {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public static final Long TASK_CODE = 1111L; |
|
|
|
|
public static final Long DEPEND_TASK_CODE_A = 110L; |
|
|
|
|
public static final Long DEPEND_TASK_CODE_B = 111L; |
|
|
|
|
public static final Long DEPEND_TASK_CODE_C = 112L; |
|
|
|
|
public static final Long DEPEND_TASK_CODE_D = 113L; |
|
|
|
|
public static final int TASK_VERSION = 1; |
|
|
|
|
|
|
|
|
|
private ProcessService processService; |
|
|
|
@ -125,7 +128,7 @@ public class DependentTaskTest {
|
|
|
|
|
DependentTaskModel dependentTaskModel = new DependentTaskModel(); |
|
|
|
|
dependentTaskModel.setRelation(DependentRelation.AND); |
|
|
|
|
dependentTaskModel.setDependItemList(Stream.of( |
|
|
|
|
getDependentItemFromTaskNode(2L, "A", "today", "day") |
|
|
|
|
getDependentItemFromTaskNode(2L, DEPEND_TASK_CODE_A, "today", "day") |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
|
|
|
|
|
DependentParameters dependentParameters = new DependentParameters(); |
|
|
|
@ -152,8 +155,8 @@ public class DependentTaskTest {
|
|
|
|
|
Mockito.when(processService |
|
|
|
|
.findValidTaskListByProcessId(200)) |
|
|
|
|
.thenReturn(Stream.of( |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "A", dependentProcessInstance), |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "B", dependentProcessInstance) |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, DEPEND_TASK_CODE_A, dependentProcessInstance), |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, DEPEND_TASK_CODE_B, dependentProcessInstance) |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
@ -172,8 +175,8 @@ public class DependentTaskTest {
|
|
|
|
|
Mockito.when(processService |
|
|
|
|
.findValidTaskListByProcessId(200)) |
|
|
|
|
.thenReturn(Stream.of( |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", dependentProcessInstance), |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "B", dependentProcessInstance) |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, DEPEND_TASK_CODE_A, dependentProcessInstance), |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B, dependentProcessInstance) |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -182,15 +185,15 @@ public class DependentTaskTest {
|
|
|
|
|
DependentTaskModel dependentTaskModel1 = new DependentTaskModel(); |
|
|
|
|
dependentTaskModel1.setRelation(DependentRelation.AND); |
|
|
|
|
dependentTaskModel1.setDependItemList(Stream.of( |
|
|
|
|
getDependentItemFromTaskNode(2L, "A", "today", "day"), |
|
|
|
|
getDependentItemFromTaskNode(3L, "B", "today", "day") |
|
|
|
|
getDependentItemFromTaskNode(2L, DEPEND_TASK_CODE_A, "today", "day"), |
|
|
|
|
getDependentItemFromTaskNode(3L, DEPEND_TASK_CODE_B, "today", "day") |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
|
|
|
|
|
DependentTaskModel dependentTaskModel2 = new DependentTaskModel(); |
|
|
|
|
dependentTaskModel2.setRelation(DependentRelation.OR); |
|
|
|
|
dependentTaskModel2.setDependItemList(Stream.of( |
|
|
|
|
getDependentItemFromTaskNode(2L, "A", "today", "day"), |
|
|
|
|
getDependentItemFromTaskNode(3L, "C", "today", "day") |
|
|
|
|
getDependentItemFromTaskNode(2L, DEPEND_TASK_CODE_A, "today", "day"), |
|
|
|
|
getDependentItemFromTaskNode(3L, DEPEND_TASK_CODE_C, "today", "day") |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
|
|
|
|
|
/* |
|
|
|
@ -225,13 +228,13 @@ public class DependentTaskTest {
|
|
|
|
|
Mockito.when(processService |
|
|
|
|
.findValidTaskListByProcessId(200)) |
|
|
|
|
.thenReturn(Stream.of( |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", processInstance200) |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, DEPEND_TASK_CODE_A, processInstance200) |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
Mockito.when(processService |
|
|
|
|
.findValidTaskListByProcessId(300)) |
|
|
|
|
.thenReturn(Stream.of( |
|
|
|
|
getTaskInstanceForValidTaskList(3000, ExecutionStatus.SUCCESS, "B", processInstance300), |
|
|
|
|
getTaskInstanceForValidTaskList(3001, ExecutionStatus.SUCCESS, "C", processInstance300) |
|
|
|
|
getTaskInstanceForValidTaskList(3000, ExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B, processInstance300), |
|
|
|
|
getTaskInstanceForValidTaskList(3001, ExecutionStatus.SUCCESS, DEPEND_TASK_CODE_C, processInstance300) |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
|
|
|
|
|
//DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
|
|
|
|
@ -247,7 +250,7 @@ public class DependentTaskTest {
|
|
|
|
|
DependentTaskModel dependentTaskModel = new DependentTaskModel(); |
|
|
|
|
dependentTaskModel.setRelation(DependentRelation.AND); |
|
|
|
|
dependentTaskModel.setDependItemList(Stream.of( |
|
|
|
|
getDependentItemFromTaskNode(2L, Constants.DEPENDENT_ALL, "today", "day") |
|
|
|
|
getDependentItemFromTaskNode(2L, Constants.DEPENDENT_ALL_TASK_CODE, "today", "day") |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
|
|
|
|
|
DependentParameters dependentParameters = new DependentParameters(); |
|
|
|
@ -300,7 +303,7 @@ public class DependentTaskTest {
|
|
|
|
|
DependentTaskModel dependentTaskModel = new DependentTaskModel(); |
|
|
|
|
dependentTaskModel.setRelation(DependentRelation.AND); |
|
|
|
|
dependentTaskModel.setDependItemList(Stream.of( |
|
|
|
|
getDependentItemFromTaskNode(2L, "A", "today", "day") |
|
|
|
|
getDependentItemFromTaskNode(2L, DEPEND_TASK_CODE_A, "today", "day") |
|
|
|
|
).collect(Collectors.toList())); |
|
|
|
|
|
|
|
|
|
DependentParameters dependentParameters = new DependentParameters(); |
|
|
|
@ -327,7 +330,7 @@ public class DependentTaskTest {
|
|
|
|
|
.thenAnswer(i -> { |
|
|
|
|
processInstance.setState(ExecutionStatus.READY_STOP); |
|
|
|
|
return Stream.of( |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.RUNNING_EXECUTION, "A", dependentProcessInstance) |
|
|
|
|
getTaskInstanceForValidTaskList(2000, ExecutionStatus.RUNNING_EXECUTION, DEPEND_TASK_CODE_A, dependentProcessInstance) |
|
|
|
|
).collect(Collectors.toList()); |
|
|
|
|
}) |
|
|
|
|
.thenThrow(new IllegalStateException("have not been stopped as expected")); |
|
|
|
@ -351,6 +354,7 @@ public class DependentTaskTest {
|
|
|
|
|
TaskNode taskNode = new TaskNode(); |
|
|
|
|
taskNode.setId("tasks-10"); |
|
|
|
|
taskNode.setName("D"); |
|
|
|
|
taskNode.setCode(DEPEND_TASK_CODE_D); |
|
|
|
|
taskNode.setType(TaskType.DEPENDENT.getDesc()); |
|
|
|
|
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL); |
|
|
|
|
return taskNode; |
|
|
|
@ -380,10 +384,10 @@ public class DependentTaskTest {
|
|
|
|
|
/** |
|
|
|
|
* DependentItem defines the condition for the dependent |
|
|
|
|
*/ |
|
|
|
|
private DependentItem getDependentItemFromTaskNode(Long processDefinitionCode, String taskName, String date, String cycle) { |
|
|
|
|
private DependentItem getDependentItemFromTaskNode(Long processDefinitionCode, long taskCode, String date, String cycle) { |
|
|
|
|
DependentItem dependentItem = new DependentItem(); |
|
|
|
|
dependentItem.setDefinitionCode(processDefinitionCode); |
|
|
|
|
dependentItem.setDepTasks(taskName); |
|
|
|
|
dependentItem.setDepTaskCode(taskCode); |
|
|
|
|
dependentItem.setDateValue(date); |
|
|
|
|
dependentItem.setCycle(cycle); |
|
|
|
|
// so far, the following fields have no effect
|
|
|
|
@ -401,12 +405,12 @@ public class DependentTaskTest {
|
|
|
|
|
|
|
|
|
|
private TaskInstance getTaskInstanceForValidTaskList( |
|
|
|
|
int taskInstanceId, ExecutionStatus state, |
|
|
|
|
String taskName, ProcessInstance processInstance |
|
|
|
|
long taskCode, ProcessInstance processInstance |
|
|
|
|
) { |
|
|
|
|
TaskInstance taskInstance = new TaskInstance(); |
|
|
|
|
taskInstance.setTaskType(TaskType.DEPENDENT.getDesc()); |
|
|
|
|
taskInstance.setId(taskInstanceId); |
|
|
|
|
taskInstance.setName(taskName); |
|
|
|
|
taskInstance.setTaskCode(taskCode); |
|
|
|
|
taskInstance.setProcessInstanceId(processInstance.getId()); |
|
|
|
|
taskInstance.setState(state); |
|
|
|
|
return taskInstance; |
|
|
|
|