Browse Source

[Feature-7769][API] Add batch start process api (#7771)

* add notes
Co-authored-by: SbloodyS <sbloodys@qq.com>
3.0.0/version-upgrade
xiangzihao 3 years ago committed by GitHub
parent
commit
f2541248eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 104
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 5
      dolphinscheduler-api/src/main/resources/i18n/messages.properties
  4. 2
      dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
  5. 1
      dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties

104
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANC
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.utils.Result;
@ -36,8 +37,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
@ -55,6 +54,13 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* executor controller
*/
@ -138,6 +144,100 @@ public class ExecutorController extends BaseController {
return returnDataList(result);
}
/**
* batch execute process instance
* If any processDefinitionCode cannot be found, the failure information is returned and the status is set to
* failed. The successful task will run normally and will not stop
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCodes process definition codes
* @param scheduleTime schedule time
* @param failureStrategy failure strategy
* @param startNodeList start nodes list
* @param taskDependType task depend type
* @param execType execute type
* @param warningType warning type
* @param warningGroupId warning group id
* @param runMode run mode
* @param processInstancePriority process instance priority
* @param workerGroup worker group
* @param timeout timeout
* @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
* @return start process result code
*/
@ApiOperation(value = "batchStartProcessInstance", notes = "BATCH_RUN_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "1,2,3"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
})
@PostMapping(value = "batch-start-process-instance")
@ResponseStatus(HttpStatus.OK)
@ApiException(START_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result batchStartProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCodes") String processDefinitionCodes,
@RequestParam(value = "scheduleTime", required = false) String scheduleTime,
@RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType", required = true) WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) int warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
}
Map<String, String> startParamMap = null;
if (startParams != null) {
startParamMap = JSONUtils.toMap(startParams);
}
Map<String, Object> result = new HashMap<>();
List<String> processDefinitionCodeArray = Arrays.asList(processDefinitionCodes.split(Constants.COMMA));
List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
processDefinitionCodeArray = processDefinitionCodeArray.stream().distinct().collect(Collectors.toList());
for (String strProcessDefinitionCode : processDefinitionCodeArray) {
long processDefinitionCode = Long.parseLong(strProcessDefinitionCode);
result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun);
if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
}
}
if (!startFailedProcessDefinitionCodeList.isEmpty()) {
putMsg(result, Status.BATCH_START_PROCESS_INSTANCE_ERROR, String.join(Constants.COMMA, startFailedProcessDefinitionCodeList));
}
return returnDataList(result);
}
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
*

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -251,6 +251,7 @@ public enum Status {
COUNT_PROCESS_INSTANCE_STATE_ERROR(50012, "count process instance state error", "查询各状态流程实例数错误"),
COUNT_PROCESS_DEFINITION_USER_ERROR(50013, "count process definition user error", "查询各用户流程定义数错误"),
START_PROCESS_INSTANCE_ERROR(50014, "start process instance error", "运行工作流实例错误"),
BATCH_START_PROCESS_INSTANCE_ERROR(50014, "batch start process instance error: {0}", "批量运行工作流实例错误: {0}"),
EXECUTE_PROCESS_INSTANCE_ERROR(50015, "execute process instance error", "操作工作流实例错误"),
CHECK_PROCESS_DEFINITION_ERROR(50016, "check process definition error", "工作流定义错误"),
QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017, "query recipients and copyers by process definition error", "查询收件人和抄送人错误"),

5
dolphinscheduler-api/src/main/resources/i18n/messages.properties

@ -18,13 +18,14 @@
QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
RUN_PROCESS_INSTANCE_NOTES=run process instance
BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
RUN_MODE=run mode
TIMEOUT=timeout
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
EXECUTE_TYPE=execute type
START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition
GET_RECEIVER_CC_NOTES=query receiver cc

2
dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties

@ -18,6 +18,8 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
BATCH_RUN_PROCESS_INSTANCE_NOTES=batch run process instance(If any processDefinitionCode cannot be found, the failure\
\ information is returned and the status is set to failed. The successful task will run normally and will not stop)
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type

1
dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties

@ -19,6 +19,7 @@ PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作
UI_PLUGINS_TAG=UI插件相关操作
WORK_FLOW_LINEAGE_TAG=工作流血缘相关操作
RUN_PROCESS_INSTANCE_NOTES=运行流程实例
BATCH_RUN_PROCESS_INSTANCE_NOTES=批量运行流程实例(其中有任意一个processDefinitionCode找不到,则返回失败信息并且状态置为失败,成功的任务会正常运行,不会停止)
START_NODE_LIST=开始节点列表(节点name)
TASK_DEPEND_TYPE=任务依赖类型
COMMAND_TYPE=指令类型

Loading…
Cancel
Save