From 76b1eefb68a7a6333c6b93987e5d3e842eabb2db Mon Sep 17 00:00:00 2001 From: calvin Date: Wed, 5 Jul 2023 14:35:18 +0800 Subject: [PATCH] [Feature-14321][API] Support to complement data in descending or ascending order of date (#14450) --- .../api/controller/ExecutorController.java | 19 +- .../dolphinscheduler/api/enums/Status.java | 3 + .../api/python/PythonGateway.java | 5 +- .../api/service/ExecutorService.java | 4 +- .../api/service/impl/ExecutorServiceImpl.java | 259 ++++++++---------- .../ExecuteFunctionControllerTest.java | 16 +- .../service/ExecuteFunctionServiceTest.java | 47 ++-- .../common/enums/ExecutionOrder.java | 50 ++++ .../src/locales/en_US/project.ts | 3 + .../src/locales/zh_CN/project.ts | 3 + .../definition/components/start-modal.tsx | 15 + .../definition/components/use-form.ts | 3 +- 12 files changed, 256 insertions(+), 171 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionOrder.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 0cd3a59346..5151665b34 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.ExecutionOrder; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.RunMode; @@ -101,6 +102,7 @@ public class ExecutorController extends BaseController { * @param timeout timeout * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode * @param testFlag testFlag + * @param executionOrder complement data in some kind of order * @return start process result code */ @Operation(summary = "startProcessInstance", description = "RUN_PROCESS_INSTANCE_NOTES") @@ -123,7 +125,8 @@ public class ExecutorController extends BaseController { @Parameter(name = "dryRun", description = "DRY_RUN", schema = @Schema(implementation = int.class, example = "0")), @Parameter(name = "testFlag", description = "TEST_FLAG", schema = @Schema(implementation = int.class, example = "0")), @Parameter(name = "complementDependentMode", description = "COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation = ComplementDependentMode.class)), - @Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false")) + @Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false")), + @Parameter(name = "executionOrder", description = "EXECUTION_ORDER", schema = @Schema(implementation = ExecutionOrder.class)) }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) @@ -151,7 +154,8 @@ public class ExecutorController extends BaseController { @RequestParam(value = "testFlag", defaultValue = "0") int testFlag, @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode, @RequestParam(value = "version", required = false) Integer version, - @RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent) { + @RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent, + @RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) { if (timeout == null) { timeout = Constants.MAX_TASK_TIMEOUT; @@ -170,7 +174,7 @@ public class ExecutorController extends BaseController { startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag, - complementDependentMode, version, allLevelDependent); + complementDependentMode, version, allLevelDependent, executionOrder); return returnDataList(result); } @@ -196,6 +200,7 @@ public class ExecutorController extends BaseController { * @param timeout timeout * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode * @param testFlag testFlag + * @param executionOrder complement data in some kind of order * @return start process result code */ @Operation(summary = "batchStartProcessInstance", description = "BATCH_RUN_PROCESS_INSTANCE_NOTES") @@ -218,7 +223,8 @@ public class ExecutorController extends BaseController { @Parameter(name = "dryRun", description = "DRY_RUN", schema = @Schema(implementation = int.class, example = "0")), @Parameter(name = "testFlag", description = "TEST_FLAG", schema = @Schema(implementation = int.class, example = "0")), @Parameter(name = "complementDependentMode", description = "COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation = ComplementDependentMode.class)), - @Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false")) + @Parameter(name = "allLevelDependent", description = "ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example = "false")), + @Parameter(name = "executionOrder", description = "EXECUTION_ORDER", schema = @Schema(implementation = ExecutionOrder.class)) }) @PostMapping(value = "batch-start-process-instance") @ResponseStatus(HttpStatus.OK) @@ -245,7 +251,8 @@ public class ExecutorController extends BaseController { @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun, @RequestParam(value = "testFlag", defaultValue = "0") int testFlag, @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode, - @RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent) { + @RequestParam(value = "allLevelDependent", required = false, defaultValue = "false") boolean allLevelDependent, + @RequestParam(value = "executionOrder", required = false) ExecutionOrder executionOrder) { if (timeout == null) { log.debug("Parameter timeout set to {} due to null.", Constants.MAX_TASK_TIMEOUT); @@ -275,7 +282,7 @@ public class ExecutorController extends BaseController { startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, tenantCode, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag, - complementDependentMode, null, allLevelDependent); + complementDependentMode, null, allLevelDependent, executionOrder); if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) { log.error("Process definition start failed, projectCode:{}, processDefinitionCode:{}.", projectCode, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index eaa6300ac2..7cc5c22c93 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -413,6 +413,9 @@ public enum Status { WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation", "工作流实例未结束,不能执行此操作"), + TASK_PARALLELISM_PARAMS_ERROR(50080, "task parallelism parameter is not valid", "任务并行度参数无效"), + TASK_COMPLEMENT_DATA_DATE_ERROR(50081, "The range of date for complementing date is not valid", "补数选择的日期范围无效"), + HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"), S3_CANNOT_RENAME(60003, "directory cannot be renamed", "S3无法重命名文件夹"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index a40a89db30..fe57f12d16 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.api.service.UsersService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.ExecutionOrder; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; @@ -94,6 +95,7 @@ public class PythonGateway { private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST; private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL; + private static final ExecutionOrder DEFAULT_EXECUTION_ORDER = ExecutionOrder.DESC_ORDER; private static final int DEFAULT_DRY_RUN = 0; private static final int DEFAULT_TEST_FLAG = 0; private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE = ComplementDependentMode.OFF_MODE; @@ -405,7 +407,8 @@ public class PythonGateway { DEFAULT_TEST_FLAG, COMPLEMENT_DEPENDENT_MODE, processDefinition.getVersion(), - false); + false, + DEFAULT_EXECUTION_ORDER); } // side object diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 9e8ee66de0..c67f790c34 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.ExecutionOrder; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.RunMode; @@ -58,6 +59,7 @@ public interface ExecutorService { * @param timeout timeout * @param startParams the global param values which pass to new process instance * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode + * @param executionOrder the execution order when complementing data * @return execute process instance code */ Map execProcessInstance(User loginUser, long projectCode, @@ -72,7 +74,7 @@ public interface ExecutorService { Map startParams, Integer expectedParallelismNumber, int dryRun, int testFlag, ComplementDependentMode complementDependentMode, Integer version, - boolean allLevelDependent); + boolean allLevelDependent, ExecutionOrder executionOrder); /** * check whether the process definition can be executed diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index a707beab0f..ba996a46eb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -48,6 +48,7 @@ import org.apache.dolphinscheduler.common.enums.ApiTriggerType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.CycleEnum; +import org.apache.dolphinscheduler.common.enums.ExecutionOrder; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; @@ -104,10 +105,12 @@ import org.apache.commons.lang3.StringUtils; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -118,7 +121,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import com.google.common.collect.Lists; +import com.google.common.base.Splitter; /** * executor service impl @@ -207,6 +210,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param startParams the global param values which pass to new process instance * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode * @param testFlag testFlag + * @param executionOrder the execution order when complementing data * @return execute process instance code */ @Override @@ -222,7 +226,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Map startParams, Integer expectedParallelismNumber, int dryRun, int testFlag, ComplementDependentMode complementDependentMode, Integer version, - boolean allLevelDependent) { + boolean allLevelDependent, ExecutionOrder executionOrder) { Project project = projectMapper.queryByCode(projectCode); // check user access for project Map result = @@ -236,6 +240,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR); return result; } + + if (Objects.nonNull(expectedParallelismNumber) && expectedParallelismNumber <= 0) { + log.warn("Parameter expectedParallelismNumber is invalid, expectedParallelismNumber:{}.", + expectedParallelismNumber); + putMsg(result, Status.TASK_PARALLELISM_PARAMS_ERROR); + return result; + } + checkValidTenant(tenantCode); ProcessDefinition processDefinition; if (null != version) { @@ -264,7 +276,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, tenantCode, environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, - complementDependentMode, allLevelDependent); + complementDependentMode, allLevelDependent, executionOrder); if (create > 0) { processDefinition.setWarningGroupId(warningGroupId); @@ -738,6 +750,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param testFlag testFlag * @param environmentCode environmentCode * @param allLevelDependent allLevelDependent + * @param executionOrder executionOrder * @return command id */ private int createCommand(Long triggerCode, CommandType commandType, long processDefineCode, TaskDependType nodeDep, @@ -747,7 +760,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Long environmentCode, Map startParams, Integer expectedParallelismNumber, int dryRun, int testFlag, ComplementDependentMode complementDependentMode, - boolean allLevelDependent) { + boolean allLevelDependent, ExecutionOrder executionOrder) { /** * instantiate command schedule instance @@ -806,7 +819,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ log.info("Start to create {} command, processDefinitionCode:{}.", command.getCommandType().getDescp(), processDefineCode); return createComplementCommandList(triggerCode, schedule, runMode, command, expectedParallelismNumber, - complementDependentMode, allLevelDependent); + complementDependentMode, allLevelDependent, executionOrder); } catch (CronParseException cronParseException) { // We catch the exception here just to make compiler happy, since we have already validated the schedule // cron expression before @@ -822,174 +835,138 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } } + private int createComplementCommand(Long triggerCode, Command command, Map cmdParam, + List dateTimeList, List schedules, + ComplementDependentMode complementDependentMode, boolean allLevelDependent) { + + String dateTimeListStr = dateTimeList.stream() + .map(item -> DateUtils.dateToString(item)) + .collect(Collectors.joining(COMMA)); + + cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateTimeListStr); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + + log.info("Creating command, commandInfo:{}.", command); + int createCount = commandService.createCommand(command); + + if (createCount > 0) { + log.info("Create {} command complete, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + } else { + log.error("Create {} command error, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + } + + if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { + log.info( + "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", + command.getProcessDefinitionCode()); + } else { + log.info( + "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", + command.getProcessDefinitionCode()); + createComplementDependentCommand(schedules, command, allLevelDependent); + } + + if (createCount > 0) { + triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId()); + } + return createCount; + } + /** * create complement command * close left and close right * * @param scheduleTimeParam * @param runMode + * @param executionOrder * @return */ protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode, Command command, Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode, - boolean allLevelDependent) throws CronParseException { + boolean allLevelDependent, + ExecutionOrder executionOrder) throws CronParseException { int createCount = 0; - String startDate = null; - String endDate = null; - String dateList = null; int dependentProcessDefinitionCreateCount = 0; runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; Map cmdParam = JSONUtils.toMap(command.getCommandParam()); Map scheduleParam = JSONUtils.toMap(scheduleTimeParam); - if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { - dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); - dateList = removeDuplicates(dateList); + + if (Objects.isNull(executionOrder)) { + executionOrder = ExecutionOrder.DESC_ORDER; } + + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( + command.getProcessDefinitionCode()); + + List listDate = new ArrayList<>(); if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE) && scheduleParam.containsKey( CMD_PARAM_COMPLEMENT_DATA_END_DATE)) { - startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE); - endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE); + String startDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE); + String endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE); + if (startDate != null && endDate != null) { + listDate = CronUtils.getSelfFireDateList( + DateUtils.stringToZoneDateTime(startDate), + DateUtils.stringToZoneDateTime(endDate), + schedules); + } } + + if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { + String dateList = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); + + if (StringUtils.isNotBlank(dateList)) { + listDate = Splitter.on(COMMA).splitToStream(dateList) + .map(item -> DateUtils.stringToZoneDateTime(item.trim())) + .distinct() + .collect(Collectors.toList()); + } + } + + if (CollectionUtils.isEmpty(listDate)) { + throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR); + } + + if (executionOrder.equals(ExecutionOrder.DESC_ORDER)) { + Collections.sort(listDate, Collections.reverseOrder()); + } else { + Collections.sort(listDate); + } + switch (runMode) { case RUN_MODE_SERIAL: { log.info("RunMode of {} command is serial run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - if (StringUtils.isNotEmpty(dateList)) { - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - log.info("Creating command, commandInfo:{}.", command); - createCount = commandService.createCommand(command); - if (createCount > 0) { - log.info("Create {} command complete, processDefinitionCode:{}", - command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - } else { - log.error("Create {} command error, processDefinitionCode:{}", - command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - } - } - if (startDate != null && endDate != null) { - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startDate); - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endDate); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - log.info("Creating command, commandInfo:{}.", command); - createCount = commandService.createCommand(command); - if (createCount > 0) { - log.info("Create {} command complete, processDefinitionCode:{}", - command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - } else { - log.error("Create {} command error, processDefinitionCode:{}", - command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - } - // dependent process definition - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( - command.getProcessDefinitionCode()); - - if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { - log.info( - "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", - command.getProcessDefinitionCode()); - } else { - log.info( - "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", - command.getProcessDefinitionCode()); - dependentProcessDefinitionCreateCount += - createComplementDependentCommand(schedules, command, allLevelDependent); - } - } - if (createCount > 0) { - triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId()); - } + createCount = createComplementCommand(triggerCode, command, cmdParam, listDate, schedules, + complementDependentMode, allLevelDependent); break; } case RUN_MODE_PARALLEL: { log.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - if (startDate != null && endDate != null) { - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( - command.getProcessDefinitionCode()); - List listDate = CronUtils.getSelfFireDateList( - DateUtils.stringToZoneDateTime(startDate), - DateUtils.stringToZoneDateTime(endDate), - schedules); - int listDateSize = listDate.size(); - createCount = listDate.size(); - if (!CollectionUtils.isEmpty(listDate)) { - if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { - createCount = Math.min(createCount, expectedParallelismNumber); - } - log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", - createCount); - - // Distribute the number of tasks equally to each command. - // The last command with insufficient quantity will be assigned to the remaining tasks. - int itemsPerCommand = (listDateSize / createCount); - int remainingItems = (listDateSize % createCount); - int startDateIndex = 0; - int endDateIndex = 0; - - for (int i = 1; i <= createCount; i++) { - int extra = (i <= remainingItems) ? 1 : 0; - int singleCommandItems = (itemsPerCommand + extra); - - if (i == 1) { - endDateIndex += singleCommandItems - 1; - } else { - startDateIndex = endDateIndex + 1; - endDateIndex += singleCommandItems; - } - - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, - DateUtils.dateToString(listDate.get(startDateIndex))); - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, - DateUtils.dateToString(listDate.get(endDateIndex))); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - log.info("Creating command, commandInfo:{}.", command); - if (commandService.createCommand(command) > 0) { - log.info("Create {} command complete, processDefinitionCode:{}", - command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, - command.getId()); - } else { - log.error("Create {} command error, processDefinitionCode:{}", - command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - } - if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { - log.info( - "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", - command.getProcessDefinitionCode()); - } else { - log.info( - "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", - command.getProcessDefinitionCode()); - dependentProcessDefinitionCreateCount += - createComplementDependentCommand(schedules, command, allLevelDependent); - } - } + + int queueNum = 0; + if (CollectionUtils.isNotEmpty(listDate)) { + queueNum = listDate.size(); + if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { + queueNum = Math.min(queueNum, expectedParallelismNumber); } - } - if (StringUtils.isNotEmpty(dateList)) { - List listDate = Arrays.asList(dateList.split(COMMA)); - createCount = listDate.size(); - if (!CollectionUtils.isEmpty(listDate)) { - if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { - createCount = Math.min(createCount, expectedParallelismNumber); - } - log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", - createCount); - for (List stringDate : Lists.partition(listDate, createCount)) { - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate)); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - log.info("Creating command, commandInfo:{}.", command); - if (commandService.createCommand(command) > 0) { - log.info("Create {} command complete, processDefinitionCode:{}", - command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - } else { - log.error("Create {} command error, processDefinitionCode:{}", - command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - } + log.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", + queueNum); + List[] queues = new List[queueNum]; + + for (int i = 0; i < listDate.size(); i++) { + if (Objects.isNull(queues[i % queueNum])) { + queues[i % queueNum] = new ArrayList(); } + queues[i % queueNum].add(listDate.get(i)); + } + for (List queue : queues) { + createCount = createComplementCommand(triggerCode, command, cmdParam, queue, schedules, + complementDependentMode, allLevelDependent); } } break; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java index 83f66dbeac..7cd97a5551 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.ExecutionOrder; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.RunMode; @@ -68,6 +69,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest { final WarningType warningType = WarningType.NONE; final int warningGroupId = 3; final RunMode runMode = RunMode.RUN_MODE_SERIAL; + final ExecutionOrder executionOrder = ExecutionOrder.DESC_ORDER; final Priority processInstancePriority = Priority.HIGH; final String workerGroup = "workerGroup"; final String tenantCode = "root"; @@ -112,6 +114,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest { paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber)); paramsMap.add("dryRun", String.valueOf(dryRun)); paramsMap.add("testFlag", String.valueOf(testFlag)); + paramsMap.add("executionOrder", String.valueOf(executionOrder)); when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), @@ -120,7 +123,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest { eq(environmentCode), eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), eq(complementDependentMode), eq(version), - eq(allLevelDependent))) + eq(allLevelDependent), eq(executionOrder))) .thenReturn(executeServiceResult); // When @@ -158,6 +161,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest { paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber)); paramsMap.add("dryRun", String.valueOf(dryRun)); paramsMap.add("testFlag", String.valueOf(testFlag)); + paramsMap.add("executionOrder", String.valueOf(executionOrder)); when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), @@ -166,7 +170,8 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest { eq(environmentCode), eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), - eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult); + eq(complementDependentMode), eq(version), eq(allLevelDependent), eq(executionOrder))) + .thenReturn(executeServiceResult); // When final MvcResult mvcResult = mockMvc @@ -203,6 +208,7 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest { paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber)); paramsMap.add("dryRun", String.valueOf(dryRun)); paramsMap.add("testFlag", String.valueOf(testFlag)); + paramsMap.add("executionOrder", String.valueOf(executionOrder)); when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), @@ -210,7 +216,8 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest { eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(tenantCode), eq(environmentCode), eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), - eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult); + eq(complementDependentMode), eq(version), eq(allLevelDependent), eq(executionOrder))) + .thenReturn(executeServiceResult); // When final MvcResult mvcResult = mockMvc @@ -239,7 +246,8 @@ public class ExecuteFunctionControllerTest extends AbstractControllerTest { eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType), eq(null), eq(null), eq(null), eq("default"), eq("default"), eq(-1L), eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0), - eq(complementDependentMode), eq(version), eq(allLevelDependent))).thenReturn(executeServiceResult); + eq(complementDependentMode), eq(version), eq(allLevelDependent), eq(null))) + .thenReturn(executeServiceResult); // When final MvcResult mvcResult = mockMvc diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java index eba1f34e17..ff2aad42cc 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.ExecutionOrder; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; @@ -280,10 +281,12 @@ public class ExecuteFunctionServiceTest { null, null, null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 10, null, null, + Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, - false); + false, + ExecutionOrder.DESC_ORDER); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(commandService, times(1)).createCommand(any(Command.class)); @@ -305,10 +308,12 @@ public class ExecuteFunctionServiceTest { null, "123456789,987654321", null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, null, + Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, - false); + false, + ExecutionOrder.DESC_ORDER); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(commandService, times(1)).createCommand(any(Command.class)); @@ -332,7 +337,8 @@ public class ExecuteFunctionServiceTest { Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, - false); + false, + ExecutionOrder.DESC_ORDER); } catch (ServiceException e) { Assertions.assertEquals(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS.getCode(), e.getCode()); } @@ -413,10 +419,11 @@ public class ExecuteFunctionServiceTest { null, null, null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, - false); + false, + ExecutionOrder.DESC_ORDER); Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(commandService, times(0)).createCommand(any(Command.class)); } @@ -437,10 +444,12 @@ public class ExecuteFunctionServiceTest { null, null, null, null, null, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, null, + Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, - false); + false, + ExecutionOrder.DESC_ORDER); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(commandService, times(1)).createCommand(any(Command.class)); } @@ -461,13 +470,14 @@ public class ExecuteFunctionServiceTest { null, null, null, null, null, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, - false); - Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(commandService, times(31)).createCommand(any(Command.class)); + false, + ExecutionOrder.DESC_ORDER); + Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(commandService, times(2)).createCommand(any(Command.class)); } /** @@ -490,7 +500,8 @@ public class ExecuteFunctionServiceTest { Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, - false); + false, + ExecutionOrder.DESC_ORDER); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(commandService, times(15)).createCommand(any(Command.class)); @@ -518,11 +529,12 @@ public class ExecuteFunctionServiceTest { 100L, 110, null, - 0, + null, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE, null, - false)); + false, + ExecutionOrder.DESC_ORDER)); } @Test @@ -555,7 +567,8 @@ public class ExecuteFunctionServiceTest { Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_YES, ComplementDependentMode.OFF_MODE, null, - false); + false, + ExecutionOrder.DESC_ORDER); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionOrder.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionOrder.java new file mode 100644 index 0000000000..4c82d291dc --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionOrder.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +/** + * complement data in some kind of order + */ +public enum ExecutionOrder { + + /** + * 0 complement data in descending order + * 1 complement data in ascending order + */ + DESC_ORDER(0, "descending order"), + ASC_ORDER(1, "ascending order"); + + ExecutionOrder(int code, String desc) { + this.code = code; + this.desc = desc; + } + + @EnumValue + private final int code; + private final String desc; + + public int getCode() { + return code; + } + + public String getDesc() { + return desc; + } +} diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index fa69010413..091af80c99 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -140,6 +140,9 @@ export default { parallelism: 'Parallelism', custom_parallelism: 'Custom Parallelism', please_enter_parallelism: 'Please enter Parallelism', + order_of_execution: 'Order of execution', + ascending_order: 'In ascending order', + descending_order: 'In descending order', please_choose: 'Please Choose', start_time: 'Start Time', end_time: 'End Time', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 20d1bb8993..c62f792bfe 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -141,6 +141,9 @@ export default { parallelism: '并行度', custom_parallelism: '自定义并行度', please_enter_parallelism: '请输入并行度', + order_of_execution: '执行顺序', + ascending_order: '按日期升序执行', + descending_order: '按日期降序执行', please_choose: '请选择', start_time: '开始时间', end_time: '结束时间', diff --git a/dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx b/dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx index 45e24e1506..f2c939a261 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx @@ -432,6 +432,21 @@ export default defineComponent({ /> )} + + + + + {t('project.workflow.descending_order')} + + + {t('project.workflow.ascending_order')} + + + + { dryRun: 0, testFlag: 0, version: null, - allLevelDependent: 'false' + allLevelDependent: 'false', + executionOrder: 'DESC_ORDER', }, saving: false, rules: {