Browse Source

[Improvement-16612][Master] For logical tasks on the Master, there should be support for dry run (#16616)

dev
veli.yang 1 month ago committed by GitHub
parent
commit
ccedd2ed95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
  2. 5
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java
  3. 90
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner.execute;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import static org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -103,6 +104,15 @@ public abstract class MasterTaskExecutor implements Runnable {
TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
beforeExecute();
if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis());
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
logicTaskInstanceExecutionEventSenderManager.successEventSender().sendMessage(taskExecutionContext);
log.info(
"The current execute mode is dry run, will stop the logic task and set the taskInstance status to success");
return;
}
TaskInstanceLogHeader.printExecuteTaskHeader();
executeTask();

5
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.integration;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@ -61,6 +62,7 @@ public class WorkflowOperator {
.workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion())
.startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes())
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
.dryRun(workflowTriggerDTO.dryRun)
.build();
final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
@ -139,6 +141,9 @@ public class WorkflowOperator {
private final WorkflowDefinition workflowDefinition;
private final RunWorkflowCommandParam runWorkflowCommandParam;
@Builder.Default
private Flag dryRun = Flag.NO;
}
@Data

90
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

@ -87,12 +87,50 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
});
});
assertThat(workflowRepository.getAll()).isEmpty();
}
@Test
@DisplayName("Test start a workflow with one fake task(A) dry run success")
public void testStartWorkflow_with_oneSuccessTaskDryRun() {
final String yaml = "/it/start/workflow_with_one_fake_task_success.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getWorkflows().get(0);
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.dryRun(Flag.YES)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode());
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
});
});
@ -121,7 +159,9 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO);
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
final List<WorkflowInstance> subWorkflowInstance =
repository.queryWorkflowInstance(context.getWorkflows().get(1));
@ -131,6 +171,7 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
.satisfiesExactly(workflowInstance -> {
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
assertThat(workflowInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
});
Assertions
@ -151,6 +192,51 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
assertThat(workflowRepository.getAll()).isEmpty();
}
@Test
@DisplayName("Test start a workflow with one sub workflow task(A) dry run, will not execute")
public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() {
final String yaml = "/it/start/workflow_with_sub_workflow_task_success.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getWorkflows().get(0);
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.dryRun(Flag.YES)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode());
final List<WorkflowInstance> subWorkflowInstance =
repository.queryWorkflowInstance(context.getWorkflows().get(1));
Assertions
.assertThat(subWorkflowInstance)
.isEmpty();
Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("sub_logic_task");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
});
});
assertThat(workflowRepository.getAll()).isEmpty();
}
@Test
@DisplayName("Test start a workflow with one sub workflow task(A) failed")
public void testStartWorkflow_with_subWorkflowTask_failed() {

Loading…
Cancel
Save