diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 24e9242d11..87a70428da 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -83,22 +83,24 @@ public class ExecutorController extends BaseController { * @param processInstancePriority process instance priority * @param workerGroup worker group * @param timeout timeout + * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode * @return start process result code */ @ApiOperation(value = "startProcessInstance", notes = "RUN_PROCESS_INSTANCE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"), - @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"), - @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"), - @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"), - @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"), - @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"), - @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"), - @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"), - @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), - @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"), + @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"), + @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"), + @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"), + @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), + @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"), }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) @@ -118,7 +120,9 @@ public class ExecutorController extends BaseController { @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @RequestParam(value = "timeout", required = false) Integer timeout, - @RequestParam(value = "startParams", required = false) String startParams) { + @RequestParam(value = "startParams", required = false) String startParams, + @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber + ) { if (timeout == null) { timeout = Constants.MAX_TASK_TIMEOUT; @@ -129,7 +133,7 @@ public class ExecutorController extends BaseController { } Map result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy, startNodeList, taskDependType, warningType, - warningGroupId, runMode, processInstancePriority, workerGroup, timeout, startParamMap); + warningGroupId, runMode, processInstancePriority, workerGroup, timeout, startParamMap, expectedParallelismNumber); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 72a0089bc1..910f2235a4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -52,6 +52,7 @@ public interface ExecutorService { * @param runMode run mode * @param timeout timeout * @param startParams the global param values which pass to new process instance + * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode * @return execute process instance code */ Map execProcessInstance(User loginUser, String projectName, @@ -60,7 +61,7 @@ public interface ExecutorService { TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Integer timeout, - Map startParams); + Map startParams, Integer expectedParallelismNumber); /** * check whether the process definition can be executed diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index e8813b31ad..a87e7aed61 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -116,6 +116,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param runMode run mode * @param timeout timeout * @param startParams the global param values which pass to new process instance + * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode * @return execute process instance code */ @Override @@ -125,7 +126,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Integer timeout, - Map startParams) { + Map startParams, Integer expectedParallelismNumber) { Map result = new HashMap<>(); // timeout is invalid if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) { @@ -162,7 +163,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ */ int create = this.createCommand(commandType, processDefinitionId, taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode, processInstancePriority, workerGroup, startParams); + warningGroupId, runMode, processInstancePriority, workerGroup, startParams, expectedParallelismNumber); if (create > 0) { processDefinition.setWarningGroupId(warningGroupId); @@ -392,10 +393,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution * - * @param loginUser login user - * @param instanceId instance id + * @param loginUser login user + * @param instanceId instance id * @param processDefinitionId process definition id - * @param commandType command type + * @param commandType command type * @return insert result code */ private Map insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType, String startParams) { @@ -489,7 +490,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ String startNodeList, String schedule, WarningType warningType, int executorId, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, - Map startParams) { + Map startParams, Integer expectedParallelismNumber) { /** * instantiate command schedule instance @@ -534,7 +535,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ end = DateUtils.getScheduleDate(interval[1]); } } - // determine whether to complement if (commandType == CommandType.COMPLEMENT_DATA) { runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; @@ -546,21 +546,31 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return processService.createCommand(command); } else if (runMode == RunMode.RUN_MODE_PARALLEL) { List schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId); - List listDate = new LinkedList<>(); + LinkedList listDate = new LinkedList<>(); if (!CollectionUtils.isEmpty(schedules)) { for (Schedule item : schedules) { listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab())); } } if (!CollectionUtils.isEmpty(listDate)) { - // loop by schedule date - for (Date date : listDate) { - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date)); + int effectThreadsCount = expectedParallelismNumber == null ? listDate.size() : Math.min(listDate.size(), expectedParallelismNumber); + logger.info("In parallel mode, current expectedParallelismNumber:{}", effectThreadsCount); + + int chunkSize = listDate.size() / effectThreadsCount; + listDate.addFirst(start); + listDate.addLast(end); + + for (int i = 0; i < effectThreadsCount; i++) { + int rangeStart = i == 0 ? i : (i * chunkSize); + int rangeEnd = i == effectThreadsCount - 1 ? listDate.size() - 1 + : rangeStart + chunkSize + 1; + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(listDate.get(rangeStart))); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(rangeEnd))); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); processService.createCommand(command); } - return listDate.size(); + + return effectThreadsCount; } else { // loop by day int runCunt = 0; diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index a7c8b84324..db705be6ab 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -171,6 +171,7 @@ PROCESS_INSTANCE_START_TIME=process instance start time PROCESS_INSTANCE_END_TIME=process instance end time PROCESS_INSTANCE_SIZE=process instance size PROCESS_INSTANCE_PRIORITY=process instance priority +EXPECTED_PARALLELISM_NUMBER=custom parallelism to set the complement task threads UPDATE_SCHEDULE_NOTES=update schedule SCHEDULE_ID=schedule id ONLINE_SCHEDULE_NOTES=online schedule diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index acc5be8631..ec88f74fb5 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -157,6 +157,7 @@ RECEIVERS=收件人 RECEIVERS_CC=收件人(抄送) WORKER_GROUP_ID=Worker Server分组ID PROCESS_INSTANCE_PRIORITY=流程实例优先级 +EXPECTED_PARALLELISM_NUMBER=补数任务自定义并行度 UPDATE_SCHEDULE_NOTES=更新定时 SCHEDULE_ID=定时ID ONLINE_SCHEDULE_NOTES=定时上线 diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index e389d0b621..b7d9fe1827 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -153,7 +153,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -171,7 +171,7 @@ public class ExecutorService2Test { null, "n1,n2", null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -190,7 +190,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null); Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); } @@ -207,7 +207,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -225,7 +225,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); @@ -243,9 +243,9 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(15)).createCommand(any(Command.class)); + verify(processService, times(4)).createCommand(any(Command.class)); } @@ -258,7 +258,7 @@ public class ExecutorService2Test { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null, 4); Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); } @@ -266,7 +266,7 @@ public class ExecutorService2Test { @Test public void testExecuteRepeatRunning() { Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); - + Map result = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.REPEAT_RUNNING); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue index 2b478be062..982a15664b 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue @@ -116,12 +116,31 @@ {{$t('Mode of execution')}}
- + {{$t('Serial execution')}} {{$t('Parallel execution')}}
+
+
+ + {{$t('Parallelism')}} +
+
+ {{$t('Custom Parallelism')}} + + + +
+
{{$t('Schedule date')}} @@ -164,6 +183,7 @@