Browse Source

[Feature][API] New restful API for taskInstance (#13070)

3.2.0-release
insist777 2 years ago committed by GitHub
parent
commit
631e3a81d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 71
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java
  2. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 11
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  4. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  5. 38
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java
  6. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

71
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2Controller.java

@ -18,7 +18,10 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_INSTANCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceListPagingResponse;
@ -28,6 +31,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
@ -105,6 +109,50 @@ public class TaskInstanceV2Controller extends BaseController {
return new TaskInstanceListPagingResponse(result);
}
/**
* task savepoint, for stream task
*
* @param loginUser login user
* @param projectCode project code
* @param id task instance id
* @return the result code and msg
*/
@Operation(summary = "savepoint", description = "TASK_SAVEPOINT")
@Parameters({
@Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
})
@PostMapping(value = "/{id}/savepoint")
@ResponseStatus(HttpStatus.OK)
@ApiException(TASK_SAVEPOINT_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<Object> taskSavePoint(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "id") Integer id) {
return taskInstanceService.taskSavePoint(loginUser, projectCode, id);
}
/**
* task stop, for stream task
*
* @param loginUser login user
* @param projectCode project code
* @param id task instance id
* @return the result code and msg
*/
@Operation(summary = "stop", description = "TASK_INSTANCE_STOP")
@Parameters({
@Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
})
@PostMapping(value = "/{id}/stop")
@ResponseStatus(HttpStatus.OK)
@ApiException(TASK_STOP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<Object> stopTask(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "id") Integer id) {
return taskInstanceService.stopTask(loginUser, projectCode, id);
}
/**
* change one task instance's state from FAILURE to FORCED_SUCCESS
*
@ -127,4 +175,27 @@ public class TaskInstanceV2Controller extends BaseController {
Result result = taskInstanceService.forceTaskSuccess(loginUser, projectCode, id);
return new TaskInstanceSuccessResponse(result);
}
/**
* query taskInstance by taskInstanceCode
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId taskInstance Id
* @return the result code and msg
*/
@Operation(summary = "queryOneTaskInstance", description = "QUERY_ONE_TASK_INSTANCE")
@Parameters({
@Parameter(name = "taskInstanceId", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = Long.class), example = "1234567890")
})
@PostMapping(value = "/{taskInstanceId}", consumes = {"application/json"})
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public TaskInstance queryTaskInstanceByCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "taskInstanceId") Long taskInstanceId) {
TaskInstance taskInstance = taskInstanceService.queryTaskInstanceById(loginUser, projectCode, taskInstanceId);
return taskInstance;
}
}

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -260,10 +260,9 @@ public enum Status {
SCHEDULE_NOT_EXISTS(10203, "schedule {0} does not exist", "调度 id {0} 不存在"),
SCHEDULE_ALREADY_EXISTS(10204, "workflow {0} schedule {1} already exist, please update or delete it",
"工作流 {0} 的定时 {1} 已经存在,请更新或删除"),
EXECUTE_NOT_DEFINE_TASK(10204, "please save and try again",
QUERY_TASK_INSTANCE_ERROR(10205, "query task instance error", "查询任务实例错误"),
EXECUTE_NOT_DEFINE_TASK(10206, "please save and try again",
"请先保存后再执行"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),

11
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -89,4 +90,14 @@ public interface TaskInstanceService {
* @return
*/
Result stopTask(User loginUser, long projectCode, Integer taskInstanceId);
/**
* query taskInstance by taskInstanceCode
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId taskInstance id
* @return the result code and msg
*/
TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId);
}

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -306,4 +306,17 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
return result;
}
@Override
public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
projectService.checkProjectAndAuthThrowException(loginUser, project, FORCED_SUCCESS);
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
if (taskInstance == null) {
logger.error("Task instance can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
taskInstanceId);
}
return taskInstance;
}
}

38
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceV2ControllerTest.java

@ -93,4 +93,42 @@ public class TaskInstanceV2ControllerTest extends AbstractControllerTest {
}
@Test
public void testTaskSavePoint() {
Result mockResult = new Result();
putMsg(mockResult, Status.SUCCESS);
when(taskInstanceService.taskSavePoint(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult);
Result taskResult = taskInstanceV2Controller.taskSavePoint(null, 1L, 1);
Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
}
@Test
public void testStopTask() {
Result mockResult = new Result();
putMsg(mockResult, Status.SUCCESS);
when(taskInstanceService.stopTask(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult);
Result taskResult = taskInstanceV2Controller.stopTask(null, 1L, 1);
Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
}
@Test
public void testQueryTaskInstanceById() {
TaskInstance taskInstance = new TaskInstance();
when(taskInstanceService.queryTaskInstanceById(any(), Mockito.anyLong(), Mockito.anyLong()))
.thenReturn(taskInstance);
TaskInstance taskResult = taskInstanceV2Controller.queryTaskInstanceByCode(null, 1L, 1L);
Assertions.assertNotNull(taskResult);
}
}

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -336,7 +336,7 @@
where instance.process_instance_id = #{processInstanceId}
and que.status = #{status}
</select>
<delete id="deleteByWorkflowInstanceId">
delete
from t_ds_task_instance

Loading…
Cancel
Save