Browse Source

[feat] Add batch rerun and stop for process instance (#10013)

* [feat] Add batch rerun and stop for process instance

* fix execute type pass

* error message changed

* correct error message

* add note to i18n ignore

* enhance the note of process instance ids
3.1.0-release
Jiajie Zhong 2 years ago committed by GitHub
parent
commit
3e471f8b4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 60
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  3. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  4. 1
      dolphinscheduler-api/src/main/resources/i18n/messages.properties
  5. 3
      dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
  6. 3
      dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties

60
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.BATCH_EXECUTE_PROCESS_INSTANCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR;
@ -38,6 +39,8 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
@ -55,6 +58,9 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
import org.apache.commons.lang.StringUtils;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -70,6 +76,8 @@ import java.util.stream.Collectors;
@RequestMapping("projects/{projectCode}/executors")
public class ExecutorController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceController.class);
@Autowired
private ExecutorService execService;
@ -259,7 +267,7 @@ public class ExecutorController extends BaseController {
}
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
* do action to process instance: pause, stop, repeat, recover from pause, recover from stop
*
* @param loginUser login user
* @param projectCode project code
@ -285,6 +293,56 @@ public class ExecutorController extends BaseController {
return returnDataList(result);
}
/**
* batch execute and do action to process instance
*
* @param loginUser login user
* @param projectCode project code
* @param processInstanceIds process instance ids, delimiter by "," if more than one id
* @param executeType execute type
* @return execute result code
*/
@ApiOperation(value = "batchExecute", notes = "BATCH_EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, dataType = "Int"),
@ApiImplicitParam(name = "processInstanceIds", value = "PROCESS_INSTANCE_IDS", required = true, dataType = "String"),
@ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
})
@PostMapping(value = "/batch-execute")
@ResponseStatus(HttpStatus.OK)
@ApiException(BATCH_EXECUTE_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result batchExecute(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable long projectCode,
@RequestParam("processInstanceIds") String processInstanceIds,
@RequestParam("executeType") ExecuteType executeType
) {
Map<String, Object> result = new HashMap<>();
List<String> executeFailedIdList = new ArrayList<>();
if (!StringUtils.isEmpty(processInstanceIds)) {
String[] processInstanceIdArray = processInstanceIds.split(Constants.COMMA);
for (String strProcessInstanceId : processInstanceIdArray) {
int processInstanceId = Integer.parseInt(strProcessInstanceId);
try {
Map<String, Object> singleResult = execService.execute(loginUser, projectCode, processInstanceId, executeType);
if (!Status.SUCCESS.equals(singleResult.get(Constants.STATUS))) {
executeFailedIdList.add((String) singleResult.get(Constants.MSG));
logger.error((String) singleResult.get(Constants.MSG));
}
} catch (Exception e) {
executeFailedIdList.add(MessageFormat.format(Status.PROCESS_INSTANCE_ERROR.getMsg(), strProcessInstanceId));
}
}
}
if (!executeFailedIdList.isEmpty()) {
putMsg(result, Status.BATCH_EXECUTE_PROCESS_INSTANCE_ERROR, String.join("\n", executeFailedIdList));
} else {
putMsg(result, Status.SUCCESS);
}
return returnDataList(result);
}
/**
* check process definition and all the son process definitions is online.
*

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -380,7 +380,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "batchDeleteProcessInstanceByIds", notes = "BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectName", value = "PROJECT_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, dataType = "Int"),
@ApiImplicitParam(name = "processInstanceIds", value = "PROCESS_INSTANCE_IDS", required = true, dataType = "String"),
})
@PostMapping(value = "/batch-delete")
@ -394,7 +394,7 @@ public class ProcessInstanceController extends BaseController {
Map<String, Object> result = new HashMap<>();
List<String> deleteFailedIdList = new ArrayList<>();
if (!StringUtils.isEmpty(processInstanceIds)) {
String[] processInstanceIdArray = processInstanceIds.split(",");
String[] processInstanceIdArray = processInstanceIds.split(Constants.COMMA);
for (String strProcessInstanceId : processInstanceIdArray) {
int processInstanceId = Integer.parseInt(strProcessInstanceId);

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -300,6 +300,7 @@ public enum Status {
DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy", "不支持复制的任务类型[{0}]"),
BATCH_EXECUTE_PROCESS_INSTANCE_ERROR(50058, "change process instance status error: {0}", "修改工作实例状态错误: {0}"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),
S3_CANNOT_RENAME(60003, "directory cannot be renamed", "S3无法重命名文件夹"),

1
dolphinscheduler-api/src/main/resources/i18n/messages.properties

@ -20,6 +20,7 @@ EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance
BATCH_EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=batch change execute state for process instance
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type

3
dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties

@ -19,6 +19,7 @@ EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance(If any processDefinitionCode cannot be found, the failure information is returned and the status is set to failed. The successful task will run normally and will not stop)
BATCH_EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=batch change state for muliple process instances(Will raise error with specific id when any it cannot be found, and will only show detail error message when some instances change state not as expected)
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
@ -259,7 +260,7 @@ BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_NOTES=batch delete process instance by proc
QUERY_PROCESS_INSTANCE_BY_ID_NOTES=query process instance by process instance id
DELETE_PROCESS_INSTANCE_BY_ID_NOTES=delete process instance by process instance id
TASK_ID=task instance id
PROCESS_INSTANCE_IDS=process_instance ids
PROCESS_INSTANCE_IDS=process_instance ids, delimiter by "," if more than one id
SKIP_LINE_NUM=skip line num
QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log
DOWNLOAD_TASK_INSTANCE_LOG_NOTES=download task instance log

3
dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties

@ -20,6 +20,7 @@ UI_PLUGINS_TAG=UI插件相关操作
WORK_FLOW_LINEAGE_TAG=工作流血缘相关操作
RUN_PROCESS_INSTANCE_NOTES=运行流程实例
BATCH_RUN_PROCESS_INSTANCE_NOTES=批量运行流程实例(其中有任意一个processDefinitionCode找不到,则返回失败信息并且状态置为失败,成功的任务会正常运行,不会停止)
BATCH_EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=批量修改工作流实例状态(当实例id找不到时会报对应id的错误,当部分实例更改状态不符合预期会说明这部分的具体原因)
START_NODE_LIST=开始节点列表(节点name)
TASK_DEPEND_TYPE=任务依赖类型
COMMAND_TYPE=指令类型
@ -203,7 +204,7 @@ QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=分页查询流程定义列表
QUERY_ALL_DEFINITION_LIST_NOTES=查询所有流程定义
PAGE_NO=页码号
PROCESS_INSTANCE_ID=流程实例ID
PROCESS_INSTANCE_IDS=流程实例ID集合
PROCESS_INSTANCE_IDS=流程实例ID集合,如果有多个流程实例则用 "," 分隔
PROCESS_INSTANCE_JSON=流程实例信息(json格式)
PREVIEW_SCHEDULE_NOTES=定时调度预览
SCHEDULE_TIME=定时时间,空字符串表示当前天

Loading…
Cancel
Save