Browse Source

[Feature] Support run all level dependent when complement (#11778)

* ADD: support all level dependent
* MOD: delete log
* MOD: format
* FIX: ut
* ADD: ut
* format
3.2.0-release
Stalary 2 years ago committed by GitHub
parent
commit
d2f83ee8cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  3. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  4. 62
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  5. 11
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
  6. 46
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
  7. 1
      dolphinscheduler-ui/src/locales/en_US/project.ts
  8. 1
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  9. 19
      dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx
  10. 3
      dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -122,7 +122,8 @@ public class ExecutorController extends BaseController {
@Parameter(name = "expectedParallelismNumber", description = "EXPECTED_PARALLELISM_NUMBER", schema = @Schema(implementation = int.class, example = "8")), @Parameter(name = "expectedParallelismNumber", description = "EXPECTED_PARALLELISM_NUMBER", schema = @Schema(implementation = int.class, example = "8")),
@Parameter(name = "dryRun", description = "DRY_RUN", schema = @Schema(implementation = int.class, example = "0")), @Parameter(name = "dryRun", description = "DRY_RUN", schema = @Schema(implementation = int.class, example = "0")),
@Parameter(name = "testFlag", description = "TEST_FLAG", schema = @Schema(implementation = int.class, example = "0")), @Parameter(name = "testFlag", description = "TEST_FLAG", schema = @Schema(implementation = int.class, example = "0")),
@Parameter(name = "complementDependentMode", description = "COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation = ComplementDependentMode.class)) @Parameter(name = "complementDependentMode", description = "COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation = ComplementDependentMode.class)),
@Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false"))
}) })
@PostMapping(value = "start-process-instance") @PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ -149,7 +150,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun, @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag, @RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode, @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "version", required = false) Integer version) { @RequestParam(value = "version", required = false) Integer version,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent) {
if (timeout == null) { if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT; timeout = Constants.MAX_TASK_TIMEOUT;
@ -168,7 +170,7 @@ public class ExecutorController extends BaseController {
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
testFlag, testFlag,
complementDependentMode, version); complementDependentMode, version, allLevelDependent);
return returnDataList(result); return returnDataList(result);
} }
@ -215,7 +217,8 @@ public class ExecutorController extends BaseController {
@Parameter(name = "expectedParallelismNumber", description = "EXPECTED_PARALLELISM_NUMBER", schema = @Schema(implementation = int.class, example = "8")), @Parameter(name = "expectedParallelismNumber", description = "EXPECTED_PARALLELISM_NUMBER", schema = @Schema(implementation = int.class, example = "8")),
@Parameter(name = "dryRun", description = "DRY_RUN", schema = @Schema(implementation = int.class, example = "0")), @Parameter(name = "dryRun", description = "DRY_RUN", schema = @Schema(implementation = int.class, example = "0")),
@Parameter(name = "testFlag", description = "TEST_FLAG", schema = @Schema(implementation = int.class, example = "0")), @Parameter(name = "testFlag", description = "TEST_FLAG", schema = @Schema(implementation = int.class, example = "0")),
@Parameter(name = "complementDependentMode", description = "COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation = ComplementDependentMode.class)) @Parameter(name = "complementDependentMode", description = "COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation = ComplementDependentMode.class)),
@Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false"))
}) })
@PostMapping(value = "batch-start-process-instance") @PostMapping(value = "batch-start-process-instance")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ -241,7 +244,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber, @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun, @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag, @RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) { @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode,
@RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent) {
if (timeout == null) { if (timeout == null) {
log.debug("Parameter timeout set to {} due to null.", Constants.MAX_TASK_TIMEOUT); log.debug("Parameter timeout set to {} due to null.", Constants.MAX_TASK_TIMEOUT);
@ -271,7 +275,7 @@ public class ExecutorController extends BaseController {
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
testFlag, testFlag,
complementDependentMode, null); complementDependentMode, null, allLevelDependent);
if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) { if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
log.error("Process definition start failed, projectCode:{}, processDefinitionCode:{}.", projectCode, log.error("Process definition start failed, projectCode:{}, processDefinitionCode:{}.", projectCode,

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -404,7 +404,8 @@ public class PythonGateway {
DEFAULT_DRY_RUN, DEFAULT_DRY_RUN,
DEFAULT_TEST_FLAG, DEFAULT_TEST_FLAG,
COMPLEMENT_DEPENDENT_MODE, COMPLEMENT_DEPENDENT_MODE,
processDefinition.getVersion()); processDefinition.getVersion(),
false);
} }
// side object // side object

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -71,7 +71,8 @@ public interface ExecutorService {
Integer timeout, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber, Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun, int testFlag, int dryRun, int testFlag,
ComplementDependentMode complementDependentMode, Integer version); ComplementDependentMode complementDependentMode, Integer version,
boolean allLevelDependent);
/** /**
* check whether the process definition can be executed * check whether the process definition can be executed

62
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -222,7 +222,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Long environmentCode, Integer timeout, Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber, Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun, int testFlag, int dryRun, int testFlag,
ComplementDependentMode complementDependentMode, Integer version) { ComplementDependentMode complementDependentMode, Integer version,
boolean allLevelDependent) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
// check user access for project // check user access for project
Map<String, Object> result = Map<String, Object> result =
@ -264,7 +265,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
workerGroup, tenantCode, workerGroup, tenantCode,
environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
complementDependentMode); complementDependentMode, allLevelDependent);
if (create > 0) { if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId); processDefinition.setWarningGroupId(warningGroupId);
@ -427,8 +428,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/** /**
* do action to workflow instancepause, stop, repeat, recover from pause, recover from stoprerun failed task * do action to workflow instancepause, stop, repeat, recover from pause, recover from stoprerun failed task
* *
* @param loginUser login user * @param loginUser login user
* @param workflowInstanceId workflow instance id * @param workflowInstanceId workflow instance id
@ -740,6 +739,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param workerGroup workerGroup * @param workerGroup workerGroup
* @param testFlag testFlag * @param testFlag testFlag
* @param environmentCode environmentCode * @param environmentCode environmentCode
* @param allLevelDependent allLevelDependent
* @return command id * @return command id
*/ */
private int createCommand(Long triggerCode, CommandType commandType, long processDefineCode, TaskDependType nodeDep, private int createCommand(Long triggerCode, CommandType commandType, long processDefineCode, TaskDependType nodeDep,
@ -748,7 +748,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Priority processInstancePriority, String workerGroup, String tenantCode, Priority processInstancePriority, String workerGroup, String tenantCode,
Long environmentCode, Long environmentCode,
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun,
int testFlag, ComplementDependentMode complementDependentMode) { int testFlag, ComplementDependentMode complementDependentMode,
boolean allLevelDependent) {
/** /**
* instantiate command schedule instance * instantiate command schedule instance
@ -807,7 +808,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
log.info("Start to create {} command, processDefinitionCode:{}.", log.info("Start to create {} command, processDefinitionCode:{}.",
command.getCommandType().getDescp(), processDefineCode); command.getCommandType().getDescp(), processDefineCode);
return createComplementCommandList(triggerCode, schedule, runMode, command, expectedParallelismNumber, return createComplementCommandList(triggerCode, schedule, runMode, command, expectedParallelismNumber,
complementDependentMode); complementDependentMode, allLevelDependent);
} catch (CronParseException cronParseException) { } catch (CronParseException cronParseException) {
// We catch the exception here just to make compiler happy, since we have already validated the schedule // We catch the exception here just to make compiler happy, since we have already validated the schedule
// cron expression before // cron expression before
@ -834,7 +835,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode, protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode,
Command command, Command command,
Integer expectedParallelismNumber, Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode) throws CronParseException { ComplementDependentMode complementDependentMode,
boolean allLevelDependent) throws CronParseException {
int createCount = 0; int createCount = 0;
String startDate = null; String startDate = null;
String endDate = null; String endDate = null;
@ -894,7 +896,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
log.info( log.info(
"Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode()); command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command, allLevelDependent);
} }
} }
if (createCount > 0) { if (createCount > 0) {
@ -963,7 +966,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
"Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode()); command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount += dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command); createComplementDependentCommand(schedules, command, allLevelDependent);
} }
} }
} }
@ -1004,7 +1007,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/** /**
* create complement dependent command * create complement dependent command
*/ */
public int createComplementDependentCommand(List<Schedule> schedules, Command command) { public int createComplementDependentCommand(List<Schedule> schedules, Command command, boolean allLevelDependent) {
int dependentProcessDefinitionCreateCount = 0; int dependentProcessDefinitionCreateCount = 0;
Command dependentCommand; Command dependentCommand;
@ -1017,7 +1020,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
List<DependentProcessDefinition> dependentProcessDefinitionList = List<DependentProcessDefinition> dependentProcessDefinitionList =
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup()); CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup(),
allLevelDependent);
dependentCommand.setTaskDependType(TaskDependType.TASK_POST); dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
// If the id is Integer, the auto-increment id will be obtained by mybatis-plus // If the id is Integer, the auto-increment id will be obtained by mybatis-plus
@ -1041,12 +1045,38 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*/ */
private List<DependentProcessDefinition> getComplementDependentDefinitionList(long processDefinitionCode, private List<DependentProcessDefinition> getComplementDependentDefinitionList(long processDefinitionCode,
CycleEnum processDefinitionCycle, CycleEnum processDefinitionCycle,
String workerGroup) { String workerGroup,
boolean allLevelDependent) {
List<DependentProcessDefinition> dependentProcessDefinitionList = List<DependentProcessDefinition> dependentProcessDefinitionList =
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode); checkDependentProcessDefinitionValid(
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode),
return checkDependentProcessDefinitionValid(dependentProcessDefinitionList, processDefinitionCycle, processDefinitionCycle, workerGroup,
workerGroup, processDefinitionCode); processDefinitionCode);
if (dependentProcessDefinitionList.isEmpty()) {
return dependentProcessDefinitionList;
}
if (allLevelDependent) {
List<DependentProcessDefinition> childList = new ArrayList<>(dependentProcessDefinitionList);
while (true) {
List<DependentProcessDefinition> childDependentList = childList
.stream()
.flatMap(dependentProcessDefinition -> checkDependentProcessDefinitionValid(
processService.queryDependentProcessDefinitionByProcessDefinitionCode(
dependentProcessDefinition.getProcessDefinitionCode()),
processDefinitionCycle,
workerGroup,
dependentProcessDefinition.getProcessDefinitionCode()).stream())
.collect(Collectors.toList());
if (childDependentList.isEmpty()) {
break;
}
dependentProcessDefinitionList.addAll(childDependentList);
childList = new ArrayList<>(childDependentList);
}
}
return dependentProcessDefinitionList;
} }
/** /**

11
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java

@ -79,7 +79,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
final int testFlag = 0; final int testFlag = 0;
final ComplementDependentMode complementDependentMode = ComplementDependentMode.OFF_MODE; final ComplementDependentMode complementDependentMode = ComplementDependentMode.OFF_MODE;
final Integer version = null; final Integer version = null;
final boolean allLevelDependent = false;
final JsonObject expectResponseContent = gson final JsonObject expectResponseContent = gson
.fromJson("{\"code\":0,\"msg\":\"success\",\"data\":\"Test Data\",\"success\":true,\"failed\":false}", .fromJson("{\"code\":0,\"msg\":\"success\",\"data\":\"Test Data\",\"success\":true,\"failed\":false}",
JsonObject.class); JsonObject.class);
@ -119,7 +119,8 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(tenantCode), eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(tenantCode),
eq(environmentCode), eq(environmentCode),
eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag),
eq(complementDependentMode), eq(version))) eq(complementDependentMode), eq(version),
eq(allLevelDependent)))
.thenReturn(executeServiceResult); .thenReturn(executeServiceResult);
// When // When
@ -165,7 +166,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
eq(environmentCode), eq(environmentCode),
eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun),
eq(testFlag), eq(testFlag),
eq(complementDependentMode), eq(version))).thenReturn(executeServiceResult); eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult);
// When // When
final MvcResult mvcResult = mockMvc final MvcResult mvcResult = mockMvc
@ -209,7 +210,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(tenantCode), eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(tenantCode),
eq(environmentCode), eq(environmentCode),
eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag),
eq(complementDependentMode), eq(version))).thenReturn(executeServiceResult); eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult);
// When // When
final MvcResult mvcResult = mockMvc final MvcResult mvcResult = mockMvc
@ -238,7 +239,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest {
eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType), eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
eq(null), eq(null), eq(null), eq("default"), eq("default"), eq(-1L), eq(null), eq(null), eq(null), eq("default"), eq("default"), eq(-1L),
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0), eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0),
eq(complementDependentMode), eq(version))).thenReturn(executeServiceResult); eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult);
// When // When
final MvcResult mvcResult = mockMvc final MvcResult mvcResult = mockMvc

46
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java

@ -282,7 +282,8 @@ public class ExecuteFunctionServiceTest {
RunMode.RUN_MODE_SERIAL, RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null); ComplementDependentMode.OFF_MODE, null,
false);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(1)).createCommand(any(Command.class)); verify(commandService, times(1)).createCommand(any(Command.class));
@ -306,7 +307,8 @@ public class ExecuteFunctionServiceTest {
RunMode.RUN_MODE_SERIAL, RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null); ComplementDependentMode.OFF_MODE, null,
false);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(1)).createCommand(any(Command.class)); verify(commandService, times(1)).createCommand(any(Command.class));
@ -329,7 +331,8 @@ public class ExecuteFunctionServiceTest {
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0,
Constants.DRY_RUN_FLAG_NO, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null); ComplementDependentMode.OFF_MODE, null,
false);
} catch (ServiceException e) { } catch (ServiceException e) {
Assertions.assertEquals(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS.getCode(), e.getCode()); Assertions.assertEquals(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS.getCode(), e.getCode());
} }
@ -375,8 +378,23 @@ public class ExecuteFunctionServiceTest {
command.setProcessDefinitionCode(processDefinitionCode); command.setProcessDefinitionCode(processDefinitionCode);
command.setExecutorId(1); command.setExecutorId(1);
int count = executorService.createComplementDependentCommand(schedules, command); // not enable allLevelDependent
int count = executorService.createComplementDependentCommand(schedules, command, false);
Assertions.assertEquals(1, count); Assertions.assertEquals(1, count);
// enable allLevelDependent
DependentProcessDefinition childDependent = new DependentProcessDefinition();
childDependent.setProcessDefinitionCode(3);
childDependent.setProcessDefinitionVersion(1);
childDependent.setTaskDefinitionCode(4);
childDependent.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
childDependent.setTaskParams(
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":3,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(
dependentProcessDefinition.getProcessDefinitionCode())).thenReturn(Lists.newArrayList(childDependent))
.thenReturn(Lists.newArrayList());
int allLevelDependentCount = executorService.createComplementDependentCommand(schedules, command, true);
Assertions.assertEquals(2, allLevelDependentCount);
} }
/** /**
@ -397,7 +415,8 @@ public class ExecuteFunctionServiceTest {
RunMode.RUN_MODE_SERIAL, RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null); ComplementDependentMode.OFF_MODE, null,
false);
Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(commandService, times(0)).createCommand(any(Command.class)); verify(commandService, times(0)).createCommand(any(Command.class));
} }
@ -420,7 +439,8 @@ public class ExecuteFunctionServiceTest {
RunMode.RUN_MODE_SERIAL, RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null); ComplementDependentMode.OFF_MODE, null,
false);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(1)).createCommand(any(Command.class)); verify(commandService, times(1)).createCommand(any(Command.class));
} }
@ -443,7 +463,8 @@ public class ExecuteFunctionServiceTest {
RunMode.RUN_MODE_PARALLEL, RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null); ComplementDependentMode.OFF_MODE, null,
false);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(31)).createCommand(any(Command.class)); verify(commandService, times(31)).createCommand(any(Command.class));
@ -468,7 +489,8 @@ public class ExecuteFunctionServiceTest {
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 15, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 15,
Constants.DRY_RUN_FLAG_NO, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null); ComplementDependentMode.OFF_MODE, null,
false);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(15)).createCommand(any(Command.class)); verify(commandService, times(15)).createCommand(any(Command.class));
@ -499,9 +521,8 @@ public class ExecuteFunctionServiceTest {
0, 0,
Constants.DRY_RUN_FLAG_NO, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, ComplementDependentMode.OFF_MODE, null,
null)); false));
} }
@Test @Test
@ -533,7 +554,8 @@ public class ExecuteFunctionServiceTest {
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 15, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 15,
Constants.DRY_RUN_FLAG_NO, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_YES, Constants.TEST_FLAG_YES,
ComplementDependentMode.OFF_MODE, null); ComplementDependentMode.OFF_MODE, null,
false);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} }

1
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -206,6 +206,7 @@ export default {
cancel_full_screen: 'Cancel full screen', cancel_full_screen: 'Cancel full screen',
task_state: 'Task status', task_state: 'Task status',
mode_of_dependent: 'Mode of dependent', mode_of_dependent: 'Mode of dependent',
all_level_dependent: 'all level dependent',
open: 'Open', open: 'Open',
project_name_required: 'Project name is required', project_name_required: 'Project name is required',
related_items: 'Related items', related_items: 'Related items',

1
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -207,6 +207,7 @@ export default {
cancel_full_screen: '取消全屏', cancel_full_screen: '取消全屏',
task_state: '任务状态', task_state: '任务状态',
mode_of_dependent: '依赖模式', mode_of_dependent: '依赖模式',
all_level_dependent: '所有层级依赖',
open: '打开', open: '打开',
project_name_required: '项目名称必填', project_name_required: '项目名称必填',
related_items: '关联项目', related_items: '关联项目',

19
dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx

@ -380,6 +380,25 @@ export default defineComponent({
</NSpace> </NSpace>
</NRadioGroup> </NRadioGroup>
</NFormItem> </NFormItem>
{this.startForm.complementDependentMode === 'ALL_DEPENDENT' && (
<NFormItem
label={t('project.workflow.all_level_dependent')}
path='allLevelDependent'
>
<NRadioGroup
v-model:value={this.startForm.allLevelDependent}
>
<NSpace>
<NRadio value={'false'}>
{t('project.workflow.close')}
</NRadio>
<NRadio value={'true'}>
{t('project.workflow.open')}
</NRadio>
</NSpace>
</NRadioGroup>
</NFormItem>
)}
<NFormItem <NFormItem
label={t('project.workflow.mode_of_execution')} label={t('project.workflow.mode_of_execution')}
path='runMode' path='runMode'

3
dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts

@ -69,7 +69,8 @@ export const useForm = () => {
expectedParallelismNumber: '', expectedParallelismNumber: '',
dryRun: 0, dryRun: 0,
testFlag: 0, testFlag: 0,
version: null version: null,
allLevelDependent: false
}, },
saving: false, saving: false,
rules: { rules: {

Loading…
Cancel
Save