|
|
|
@ -15,7 +15,7 @@
|
|
|
|
|
* limitations under the License. |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
package org.apache.dolphinscheduler.server.master.it.cases; |
|
|
|
|
package org.apache.dolphinscheduler.server.master.integration.cases; |
|
|
|
|
|
|
|
|
|
import static org.awaitility.Awaitility.await; |
|
|
|
|
|
|
|
|
@ -24,12 +24,12 @@ import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
|
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTest; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTestCase; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.it.Repository; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.it.WorkflowITContext; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.it.WorkflowITContextFactory; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.integration.Repository; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContext; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.integration.WorkflowTestCaseContextFactory; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
|
|
|
|
|
@ -41,10 +41,10 @@ import org.assertj.core.api.Assertions;
|
|
|
|
|
import org.junit.jupiter.api.Test; |
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
|
|
|
|
|
|
public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest { |
|
|
|
|
public class WorkflowInstanceFailoverTestCase extends AbstractMasterIntegrationTestCase { |
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
|
private WorkflowITContextFactory workflowITContextFactory; |
|
|
|
|
private WorkflowTestCaseContextFactory workflowTestCaseContextFactory; |
|
|
|
|
|
|
|
|
|
@Autowired |
|
|
|
|
private SystemEventBus systemEventBus; |
|
|
|
@ -55,7 +55,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_runningWorkflow_withSubmittedTasks() { |
|
|
|
|
final String yaml = "/it/failover/running_workflowInstance_with_one_submitted_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -91,7 +91,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_runningWorkflow_withDispatchTasks() { |
|
|
|
|
final String yaml = "/it/failover/running_workflowInstance_with_one_dispatched_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -131,7 +131,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_runningWorkflow_withRunningTasks() { |
|
|
|
|
final String yaml = "/it/failover/running_workflowInstance_with_one_running_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -171,7 +171,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_runningWorkflow_withSuccessTasks() { |
|
|
|
|
final String yaml = "/it/failover/running_workflowInstance_with_one_success_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -205,7 +205,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_runningWorkflow_withFailedTasks() { |
|
|
|
|
final String yaml = "/it/failover/running_workflowInstance_with_one_failed_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -240,7 +240,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyPauseWorkflow_withSubmittedTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyPause_workflowInstance_with_one_submitted_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -275,11 +275,14 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyPauseWorkflow_withDispatchedTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyPause_workflowInstance_with_one_dispatched_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
|
|
|
|
|
|
// Since the task take over failed
|
|
|
|
|
// So will create a new task instance and trigger it, but the workflow instance is ready pause
|
|
|
|
|
// The task will be paused.
|
|
|
|
|
await() |
|
|
|
|
.atMost(Duration.ofMinutes(1)) |
|
|
|
|
.untilAsserted(() -> { |
|
|
|
@ -289,7 +292,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
.anySatisfy(workflowInstance -> { |
|
|
|
|
Assertions |
|
|
|
|
.assertThat(workflowInstance.getState()) |
|
|
|
|
.isEqualTo(WorkflowExecutionStatus.SUCCESS); |
|
|
|
|
.isEqualTo(WorkflowExecutionStatus.PAUSE); |
|
|
|
|
Assertions |
|
|
|
|
.assertThat(workflowInstance.getName()) |
|
|
|
|
.isEqualTo("workflow_with_one_fake_task_success-20240816071251690"); |
|
|
|
@ -306,7 +309,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
|
|
|
|
|
Assertions |
|
|
|
|
.assertThat(taskInstances.get(1)) |
|
|
|
|
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) |
|
|
|
|
.matches(t -> t.getState() == TaskExecutionStatus.PAUSE) |
|
|
|
|
.matches(t -> t.getFlag() == Flag.YES); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
@ -315,7 +318,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyPauseWorkflow_withSuccessTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyPause_workflowInstance_with_one_success_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -350,7 +353,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyPauseWorkflow_withFailedTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyPause_workflowInstance_with_one_failed_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -385,7 +388,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyPauseWorkflow_withPausedTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyPause_workflowInstance_with_one_paused_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -420,7 +423,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyStopWorkflow_withSubmittedTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyStop_workflowInstance_with_one_submitted_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -455,11 +458,14 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyStopWorkflow_withDispatchedTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyStop_workflowInstance_with_one_dispatched_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
|
|
|
|
|
|
// Since the task take over failed
|
|
|
|
|
// So will create a new task instance and trigger it, but the workflow instance is ready stop
|
|
|
|
|
// The task will be killed.
|
|
|
|
|
await() |
|
|
|
|
.atMost(Duration.ofMinutes(1)) |
|
|
|
|
.untilAsserted(() -> { |
|
|
|
@ -469,7 +475,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
.anySatisfy(workflowInstance -> { |
|
|
|
|
Assertions |
|
|
|
|
.assertThat(workflowInstance.getState()) |
|
|
|
|
.isEqualTo(WorkflowExecutionStatus.SUCCESS); |
|
|
|
|
.isEqualTo(WorkflowExecutionStatus.STOP); |
|
|
|
|
Assertions |
|
|
|
|
.assertThat(workflowInstance.getName()) |
|
|
|
|
.isEqualTo("workflow_with_one_fake_task_success-20240816071251690"); |
|
|
|
@ -486,7 +492,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
|
|
|
|
|
Assertions |
|
|
|
|
.assertThat(taskInstances.get(1)) |
|
|
|
|
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) |
|
|
|
|
.matches(t -> t.getState() == TaskExecutionStatus.KILL) |
|
|
|
|
.matches(t -> t.getFlag() == Flag.YES); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
@ -495,7 +501,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyStopWorkflow_withSuccessTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyStop_workflowInstance_with_one_success_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -530,7 +536,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyStopWorkflow_withFailedTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyStop_workflowInstance_with_one_failed_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |
|
|
|
@ -565,7 +571,7 @@ public class WorkflowInstanceFailoverIT extends AbstractMasterIntegrationTest {
|
|
|
|
|
@Test |
|
|
|
|
public void testGlobalFailover_readyStopWorkflow_withKilledTasks() { |
|
|
|
|
final String yaml = "/it/failover/readyStop_workflowInstance_with_one_killed_fake_task.yaml"; |
|
|
|
|
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); |
|
|
|
|
final WorkflowDefinition workflow = context.getWorkflows().get(0); |
|
|
|
|
|
|
|
|
|
systemEventBus.publish(GlobalMasterFailoverEvent.of(new Date())); |