diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java index 8d416da1bb..521ba5bc10 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java @@ -18,7 +18,10 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_INSTANCE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceListPagingResponse; @@ -28,6 +31,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; @@ -105,6 +109,50 @@ public class TaskInstanceV2Controller extends BaseController { return new TaskInstanceListPagingResponse(result); } + /** + * task savepoint, for stream task + * + * @param loginUser login user + * @param projectCode project code + * @param id task instance id + * @return the result code and msg + */ + @Operation(summary = "savepoint", description = "TASK_SAVEPOINT") + @Parameters({ + @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12")) + }) + @PostMapping(value = "/{id}/savepoint") + @ResponseStatus(HttpStatus.OK) + @ApiException(TASK_SAVEPOINT_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result taskSavePoint(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "id") Integer id) { + return taskInstanceService.taskSavePoint(loginUser, projectCode, id); + } + + /** + * task stop, for stream task + * + * @param loginUser login user + * @param projectCode project code + * @param id task instance id + * @return the result code and msg + */ + @Operation(summary = "stop", description = "TASK_INSTANCE_STOP") + @Parameters({ + @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12")) + }) + @PostMapping(value = "/{id}/stop") + @ResponseStatus(HttpStatus.OK) + @ApiException(TASK_STOP_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result stopTask(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "id") Integer id) { + return taskInstanceService.stopTask(loginUser, projectCode, id); + } + /** * change one task instance's state from FAILURE to FORCED_SUCCESS * @@ -127,4 +175,27 @@ public class TaskInstanceV2Controller extends BaseController { Result result = taskInstanceService.forceTaskSuccess(loginUser, projectCode, id); return new TaskInstanceSuccessResponse(result); } + + /** + * query taskInstance by taskInstanceCode + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstanceId taskInstance Id + * @return the result code and msg + */ + @Operation(summary = "queryOneTaskInstance", description = "QUERY_ONE_TASK_INSTANCE") + @Parameters({ + @Parameter(name = "taskInstanceId", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = Long.class), example = "1234567890") + }) + @PostMapping(value = "/{taskInstanceId}", consumes = {"application/json"}) + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_INSTANCE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public TaskInstance queryTaskInstanceByCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "taskInstanceId") Long taskInstanceId) { + TaskInstance taskInstance = taskInstanceService.queryTaskInstanceById(loginUser, projectCode, taskInstanceId); + return taskInstance; + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index d82732d620..d0f82b3e99 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -260,10 +260,9 @@ public enum Status { SCHEDULE_NOT_EXISTS(10203, "schedule {0} does not exist", "调度 id {0} 不存在"), SCHEDULE_ALREADY_EXISTS(10204, "workflow {0} schedule {1} already exist, please update or delete it", "工作流 {0} 的定时 {1} 已经存在,请更新或删除"), - - EXECUTE_NOT_DEFINE_TASK(10204, "please save and try again", + QUERY_TASK_INSTANCE_ERROR(10205, "query task instance error", "查询任务实例错误"), + EXECUTE_NOT_DEFINE_TASK(10206, "please save and try again", "请先保存后再执行"), - UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index d5d05faa14..9fa259eb2b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -89,4 +90,14 @@ public interface TaskInstanceService { * @return */ Result stopTask(User loginUser, long projectCode, Integer taskInstanceId); + + /** + * query taskInstance by taskInstanceCode + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstanceId taskInstance id + * @return the result code and msg + */ + TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index d830db5bf2..a212cb8299 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -306,4 +306,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst return result; } + + @Override + public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId) { + Project project = projectMapper.queryByCode(projectCode); + // check user access for project + projectService.checkProjectAndAuthThrowException(loginUser, project, FORCED_SUCCESS); + TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId); + if (taskInstance == null) { + logger.error("Task instance can not be found, projectCode:{}, taskInstanceId:{}.", projectCode, + taskInstanceId); + } + return taskInstance; + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java index 7624255087..bf1b5c5ac6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java @@ -93,4 +93,42 @@ public class TaskInstanceV2ControllerTest extends AbstractControllerTest { } + @Test + public void testTaskSavePoint() { + + Result mockResult = new Result(); + putMsg(mockResult, Status.SUCCESS); + + when(taskInstanceService.taskSavePoint(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult); + + Result taskResult = taskInstanceV2Controller.taskSavePoint(null, 1L, 1); + Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode()); + + } + + @Test + public void testStopTask() { + + Result mockResult = new Result(); + putMsg(mockResult, Status.SUCCESS); + + when(taskInstanceService.stopTask(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult); + + Result taskResult = taskInstanceV2Controller.stopTask(null, 1L, 1); + Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode()); + + } + + @Test + public void testQueryTaskInstanceById() { + TaskInstance taskInstance = new TaskInstance(); + + when(taskInstanceService.queryTaskInstanceById(any(), Mockito.anyLong(), Mockito.anyLong())) + .thenReturn(taskInstance); + + TaskInstance taskResult = taskInstanceV2Controller.queryTaskInstanceByCode(null, 1L, 1L); + Assertions.assertNotNull(taskResult); + + } + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index d3726ebf95..fc454a6841 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -336,7 +336,7 @@ where instance.process_instance_id = #{processInstanceId} and que.status = #{status} - + delete from t_ds_task_instance