diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 3cff1988fc..4f651c0159 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANC import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.utils.Result; @@ -36,8 +37,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.User; -import java.util.Map; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.PathVariable; @@ -55,6 +54,13 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + /** * executor controller */ @@ -138,6 +144,100 @@ public class ExecutorController extends BaseController { return returnDataList(result); } + /** + * batch execute process instance + * If any processDefinitionCode cannot be found, the failure information is returned and the status is set to + * failed. The successful task will run normally and will not stop + * + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCodes process definition codes + * @param scheduleTime schedule time + * @param failureStrategy failure strategy + * @param startNodeList start nodes list + * @param taskDependType task depend type + * @param execType execute type + * @param warningType warning type + * @param warningGroupId warning group id + * @param runMode run mode + * @param processInstancePriority process instance priority + * @param workerGroup worker group + * @param timeout timeout + * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode + * @return start process result code + */ + @ApiOperation(value = "batchStartProcessInstance", notes = "BATCH_RUN_PROCESS_INSTANCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "1,2,3"), + @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"), + @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"), + @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"), + @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"), + @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), + @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"), + @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8") + }) + @PostMapping(value = "batch-start-process-instance") + @ResponseStatus(HttpStatus.OK) + @ApiException(START_PROCESS_INSTANCE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "processDefinitionCodes") String processDefinitionCodes, + @RequestParam(value = "scheduleTime", required = false) String scheduleTime, + @RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy, + @RequestParam(value = "startNodeList", required = false) String startNodeList, + @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType, + @RequestParam(value = "execType", required = false) CommandType execType, + @RequestParam(value = "warningType", required = true) WarningType warningType, + @RequestParam(value = "warningGroupId", required = false) int warningGroupId, + @RequestParam(value = "runMode", required = false) RunMode runMode, + @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, + @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, + @RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode, + @RequestParam(value = "timeout", required = false) Integer timeout, + @RequestParam(value = "startParams", required = false) String startParams, + @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber, + @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) { + + if (timeout == null) { + timeout = Constants.MAX_TASK_TIMEOUT; + } + + Map startParamMap = null; + if (startParams != null) { + startParamMap = JSONUtils.toMap(startParams); + } + + Map result = new HashMap<>(); + List processDefinitionCodeArray = Arrays.asList(processDefinitionCodes.split(Constants.COMMA)); + List startFailedProcessDefinitionCodeList = new ArrayList<>(); + + processDefinitionCodeArray = processDefinitionCodeArray.stream().distinct().collect(Collectors.toList()); + + for (String strProcessDefinitionCode : processDefinitionCodeArray) { + long processDefinitionCode = Long.parseLong(strProcessDefinitionCode); + result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy, + startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun); + + if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) { + startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode)); + } + } + + if (!startFailedProcessDefinitionCodeList.isEmpty()) { + putMsg(result, Status.BATCH_START_PROCESS_INSTANCE_ERROR, String.join(Constants.COMMA, startFailedProcessDefinitionCodeList)); + } + + return returnDataList(result); + } + /** * do action to process instance:pause, stop, repeat, recover from pause, recover from stop * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index ec4931b7d6..b5202c5c68 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -251,6 +251,7 @@ public enum Status { COUNT_PROCESS_INSTANCE_STATE_ERROR(50012, "count process instance state error", "查询各状态流程实例数错误"), COUNT_PROCESS_DEFINITION_USER_ERROR(50013, "count process definition user error", "查询各用户流程定义数错误"), START_PROCESS_INSTANCE_ERROR(50014, "start process instance error", "运行工作流实例错误"), + BATCH_START_PROCESS_INSTANCE_ERROR(50014, "batch start process instance error: {0}", "批量运行工作流实例错误: {0}"), EXECUTE_PROCESS_INSTANCE_ERROR(50015, "execute process instance error", "操作工作流实例错误"), CHECK_PROCESS_DEFINITION_ERROR(50016, "check process definition error", "工作流定义错误"), QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017, "query recipients and copyers by process definition error", "查询收件人和抄送人错误"), diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties index 7a4282a63c..928e608880 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties @@ -18,13 +18,14 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list EXECUTE_PROCESS_TAG=execute process related operation PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation -RUN_PROCESS_INSTANCE_NOTES=run process instance +RUN_PROCESS_INSTANCE_NOTES=run process instance +BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance START_NODE_LIST=start node list(node name) TASK_DEPEND_TYPE=task depend type COMMAND_TYPE=command type RUN_MODE=run mode TIMEOUT=timeout -EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance +EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance EXECUTE_TYPE=execute type START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition GET_RECEIVER_CC_NOTES=query receiver cc diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index 56800f29e6..a6ef33ae22 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -18,6 +18,8 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list EXECUTE_PROCESS_TAG=execute process related operation PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation RUN_PROCESS_INSTANCE_NOTES=run process instance +BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance(If any processDefinitionCode cannot be found, the failure\ + \ information is returned and the status is set to failed. The successful task will run normally and will not stop) START_NODE_LIST=start node list(node name) TASK_DEPEND_TYPE=task depend type COMMAND_TYPE=command type diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index 6343d89f30..e5e7476c7b 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -19,6 +19,7 @@ PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作 UI_PLUGINS_TAG=UI插件相关操作 WORK_FLOW_LINEAGE_TAG=工作流血缘相关操作 RUN_PROCESS_INSTANCE_NOTES=运行流程实例 +BATCH_RUN_PROCESS_INSTANCE_NOTES=批量运行流程实例(其中有任意一个processDefinitionCode找不到,则返回失败信息并且状态置为失败,成功的任务会正常运行,不会停止) START_NODE_LIST=开始节点列表(节点name) TASK_DEPEND_TYPE=任务依赖类型 COMMAND_TYPE=指令类型