From 3e471f8b4c9ea781aed152ea4b9c3913503d5909 Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Mon, 16 May 2022 11:01:32 +0800 Subject: [PATCH] [feat] Add batch rerun and stop for process instance (#10013) * [feat] Add batch rerun and stop for process instance * fix execute type pass * error message changed * correct error message * add note to i18n ignore * enhance the note of process instance ids --- .../api/controller/ExecutorController.java | 60 ++++++++++++++++++- .../controller/ProcessInstanceController.java | 4 +- .../dolphinscheduler/api/enums/Status.java | 1 + .../main/resources/i18n/messages.properties | 1 + .../resources/i18n/messages_en_US.properties | 3 +- .../resources/i18n/messages_zh_CN.properties | 3 +- 6 files changed, 67 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 0a6fed5072..47c2744d87 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.BATCH_EXECUTE_PROCESS_INSTANCE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR; @@ -38,6 +39,8 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.User; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.PathVariable; @@ -55,6 +58,9 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; +import org.apache.commons.lang.StringUtils; + +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -70,6 +76,8 @@ import java.util.stream.Collectors; @RequestMapping("projects/{projectCode}/executors") public class ExecutorController extends BaseController { + private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceController.class); + @Autowired private ExecutorService execService; @@ -259,7 +267,7 @@ public class ExecutorController extends BaseController { } /** - * do action to process instance:pause, stop, repeat, recover from pause, recover from stop + * do action to process instance: pause, stop, repeat, recover from pause, recover from stop * * @param loginUser login user * @param projectCode project code @@ -285,6 +293,56 @@ public class ExecutorController extends BaseController { return returnDataList(result); } + /** + * batch execute and do action to process instance + * + * @param loginUser login user + * @param projectCode project code + * @param processInstanceIds process instance ids, delimiter by "," if more than one id + * @param executeType execute type + * @return execute result code + */ + @ApiOperation(value = "batchExecute", notes = "BATCH_EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, dataType = "Int"), + @ApiImplicitParam(name = "processInstanceIds", value = "PROCESS_INSTANCE_IDS", required = true, dataType = "String"), + @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") + }) + @PostMapping(value = "/batch-execute") + @ResponseStatus(HttpStatus.OK) + @ApiException(BATCH_EXECUTE_PROCESS_INSTANCE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result batchExecute(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable long projectCode, + @RequestParam("processInstanceIds") String processInstanceIds, + @RequestParam("executeType") ExecuteType executeType + ) { + Map result = new HashMap<>(); + List executeFailedIdList = new ArrayList<>(); + if (!StringUtils.isEmpty(processInstanceIds)) { + String[] processInstanceIdArray = processInstanceIds.split(Constants.COMMA); + + for (String strProcessInstanceId : processInstanceIdArray) { + int processInstanceId = Integer.parseInt(strProcessInstanceId); + try { + Map singleResult = execService.execute(loginUser, projectCode, processInstanceId, executeType); + if (!Status.SUCCESS.equals(singleResult.get(Constants.STATUS))) { + executeFailedIdList.add((String) singleResult.get(Constants.MSG)); + logger.error((String) singleResult.get(Constants.MSG)); + } + } catch (Exception e) { + executeFailedIdList.add(MessageFormat.format(Status.PROCESS_INSTANCE_ERROR.getMsg(), strProcessInstanceId)); + } + } + } + if (!executeFailedIdList.isEmpty()) { + putMsg(result, Status.BATCH_EXECUTE_PROCESS_INSTANCE_ERROR, String.join("\n", executeFailedIdList)); + } else { + putMsg(result, Status.SUCCESS); + } + return returnDataList(result); + } + /** * check process definition and all the son process definitions is online. * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index aee4740d5c..5043d81923 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -380,7 +380,7 @@ public class ProcessInstanceController extends BaseController { */ @ApiOperation(value = "batchDeleteProcessInstanceByIds", notes = "BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "projectName", value = "PROJECT_NAME", required = true, dataType = "String"), + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, dataType = "Int"), @ApiImplicitParam(name = "processInstanceIds", value = "PROCESS_INSTANCE_IDS", required = true, dataType = "String"), }) @PostMapping(value = "/batch-delete") @@ -394,7 +394,7 @@ public class ProcessInstanceController extends BaseController { Map result = new HashMap<>(); List deleteFailedIdList = new ArrayList<>(); if (!StringUtils.isEmpty(processInstanceIds)) { - String[] processInstanceIdArray = processInstanceIds.split(","); + String[] processInstanceIdArray = processInstanceIds.split(Constants.COMMA); for (String strProcessInstanceId : processInstanceIdArray) { int processInstanceId = Integer.parseInt(strProcessInstanceId); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index a755a75e1c..6108a6ed76 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -300,6 +300,7 @@ public enum Status { DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"), NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"), NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy", "不支持复制的任务类型[{0}]"), + BATCH_EXECUTE_PROCESS_INSTANCE_ERROR(50058, "change process instance status error: {0}", "修改工作实例状态错误: {0}"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"), S3_CANNOT_RENAME(60003, "directory cannot be renamed", "S3无法重命名文件夹"), diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties index 6bfcaf92cd..5069bb7d33 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties @@ -20,6 +20,7 @@ EXECUTE_PROCESS_TAG=execute process related operation PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation RUN_PROCESS_INSTANCE_NOTES=run process instance BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance +BATCH_EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=batch change execute state for process instance START_NODE_LIST=start node list(node name) TASK_DEPEND_TYPE=task depend type COMMAND_TYPE=command type diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index 39bc227a21..17c34d052f 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -19,6 +19,7 @@ EXECUTE_PROCESS_TAG=execute process related operation PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation RUN_PROCESS_INSTANCE_NOTES=run process instance BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance(If any processDefinitionCode cannot be found, the failure information is returned and the status is set to failed. The successful task will run normally and will not stop) +BATCH_EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=batch change state for muliple process instances(Will raise error with specific id when any it cannot be found, and will only show detail error message when some instances change state not as expected) START_NODE_LIST=start node list(node name) TASK_DEPEND_TYPE=task depend type COMMAND_TYPE=command type @@ -259,7 +260,7 @@ BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_NOTES=batch delete process instance by proc QUERY_PROCESS_INSTANCE_BY_ID_NOTES=query process instance by process instance id DELETE_PROCESS_INSTANCE_BY_ID_NOTES=delete process instance by process instance id TASK_ID=task instance id -PROCESS_INSTANCE_IDS=process_instance ids +PROCESS_INSTANCE_IDS=process_instance ids, delimiter by "," if more than one id SKIP_LINE_NUM=skip line num QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log DOWNLOAD_TASK_INSTANCE_LOG_NOTES=download task instance log diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index 1e779863c6..223a853307 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -20,6 +20,7 @@ UI_PLUGINS_TAG=UI插件相关操作 WORK_FLOW_LINEAGE_TAG=工作流血缘相关操作 RUN_PROCESS_INSTANCE_NOTES=运行流程实例 BATCH_RUN_PROCESS_INSTANCE_NOTES=批量运行流程实例(其中有任意一个processDefinitionCode找不到,则返回失败信息并且状态置为失败,成功的任务会正常运行,不会停止) +BATCH_EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=批量修改工作流实例状态(当实例id找不到时会报对应id的错误,当部分实例更改状态不符合预期会说明这部分的具体原因) START_NODE_LIST=开始节点列表(节点name) TASK_DEPEND_TYPE=任务依赖类型 COMMAND_TYPE=指令类型 @@ -203,7 +204,7 @@ QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=分页查询流程定义列表 QUERY_ALL_DEFINITION_LIST_NOTES=查询所有流程定义 PAGE_NO=页码号 PROCESS_INSTANCE_ID=流程实例ID -PROCESS_INSTANCE_IDS=流程实例ID集合 +PROCESS_INSTANCE_IDS=流程实例ID集合,如果有多个流程实例则用 "," 分隔 PROCESS_INSTANCE_JSON=流程实例信息(json格式) PREVIEW_SCHEDULE_NOTES=定时调度预览 SCHEDULE_TIME=定时时间,空字符串表示当前天