diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java index 1865dfee5d..e0055595c9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java @@ -153,10 +153,11 @@ public class TaskInstanceController extends BaseController { @ResponseStatus(HttpStatus.OK) @ApiException(FORCE_TASK_SUCCESS_ERROR) @OperatorLog(auditType = AuditType.TASK_INSTANCE_FORCE_SUCCESS) - public Result forceTaskSuccess(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Schema(name = "projectCode", required = true) @PathVariable long projectCode, - @PathVariable(value = "id") Integer id) { - return taskInstanceService.forceTaskSuccess(loginUser, projectCode, id); + public Result forceTaskSuccess(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Schema(name = "projectCode", required = true) @PathVariable long projectCode, + @PathVariable(value = "id") Integer id) { + taskInstanceService.forceTaskSuccess(loginUser, projectCode, id); + return Result.success(); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java index 3e3a87681b..ea767f0fa3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java @@ -167,8 +167,8 @@ public class TaskInstanceV2Controller extends BaseController { public TaskInstanceSuccessResponse forceTaskSuccess(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, @PathVariable(value = "id") Integer id) { - Result result = taskInstanceService.forceTaskSuccess(loginUser, projectCode, id); - return new TaskInstanceSuccessResponse(result); + taskInstanceService.forceTaskSuccess(loginUser, projectCode, id); + return new TaskInstanceSuccessResponse(Result.success()); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index 86e5396dbe..bff0518685 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -72,9 +72,9 @@ public interface TaskInstanceService { * @param taskInstanceId task instance id * @return the result code and msg */ - Result forceTaskSuccess(User loginUser, - long projectCode, - Integer taskInstanceId); + void forceTaskSuccess(User loginUser, + long projectCode, + Integer taskInstanceId); /** * task savepoint diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index f06f8115a9..7469d8db13 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; import org.apache.dolphinscheduler.api.service.TaskInstanceService; @@ -33,14 +34,15 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; @@ -107,6 +109,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst @Autowired private TaskGroupQueueService taskGroupQueueService; + @Autowired + private ProcessInstanceDao workflowInstanceDao; + /** * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging * @@ -216,58 +221,39 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst */ @Transactional @Override - public Result forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) { - Result result = new Result(); - Project project = projectMapper.queryByCode(projectCode); + public void forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) { // check user access for project - Map checkResult = - projectService.checkProjectAndAuth(loginUser, project, projectCode, FORCED_SUCCESS); - Status status = (Status) checkResult.get(Constants.STATUS); - if (status != Status.SUCCESS) { - putMsg(result, status); - return result; - } + projectService.checkProjectAndAuthThrowException(loginUser, projectCode, FORCED_SUCCESS); - // check whether the task instance can be found - TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); - if (task == null) { - log.error("Task instance can not be found, projectCode:{}, taskInstanceId:{}.", projectCode, - taskInstanceId); - putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); - return result; + TaskInstance task = taskInstanceDao.queryOptionalById(taskInstanceId) + .orElseThrow(() -> new ServiceException(Status.TASK_INSTANCE_NOT_FOUND)); + + if (task.getProjectCode() != projectCode) { + throw new ServiceException("The task instance is not under the project: " + projectCode); } - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); - if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { - log.error("Task definition can not be found, projectCode:{}, taskDefinitionCode:{}.", projectCode, - task.getTaskCode()); - putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstanceId); - return result; + ProcessInstance processInstance = workflowInstanceDao.queryOptionalById(task.getProcessInstanceId()) + .orElseThrow( + () -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, task.getProcessInstanceId())); + if (!processInstance.getState().isFinished()) { + throw new ServiceException("The workflow instance is not finished: " + processInstance.getState() + + " cannot force start task instance"); } // check whether the task instance state type is failure or cancel if (!task.getState().isFailure() && !task.getState().isKill()) { - log.warn("{} type task instance can not perform force success, projectCode:{}, taskInstanceId:{}.", - task.getState().getDesc(), projectCode, taskInstanceId); - putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString()); - return result; + throw new ServiceException(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState()); } // change the state of the task instance task.setState(TaskExecutionStatus.FORCED_SUCCESS); task.setEndTime(new Date()); int changedNum = taskInstanceMapper.updateById(task); - if (changedNum > 0) { - processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId); - log.info("Task instance performs force success complete, projectCode:{}, taskInstanceId:{}", projectCode, - taskInstanceId); - putMsg(result, Status.SUCCESS); - } else { - log.error("Task instance performs force success complete, projectCode:{}, taskInstanceId:{}", - projectCode, taskInstanceId); - putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); + if (changedNum <= 0) { + throw new ServiceException(Status.FORCE_TASK_SUCCESS_ERROR); } - return result; + processService.forceProcessInstanceSuccessByTaskInstanceId(task); + log.info("Force success task instance:{} success", taskInstanceId); } @Override diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java index d2da5bc638..eae064bb24 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java @@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.api; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import java.text.MessageFormat; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.function.Executable; @@ -30,6 +32,12 @@ public class AssertionsHelper extends Assertions { Assertions.assertEquals(status.getCode(), exception.getCode()); } + public static void assertThrowsServiceException(String message, Executable executable) { + ServiceException exception = Assertions.assertThrows(ServiceException.class, executable); + Assertions.assertEquals(MessageFormat.format(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(), message), + exception.getMessage()); + } + public static void assertDoesNotThrow(Executable executable) { Assertions.assertDoesNotThrow(executable); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java index 7ebe5bf757..b58944537b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java @@ -82,9 +82,7 @@ public class TaskInstanceControllerTest extends AbstractControllerTest { MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("taskInstanceId", "104"); - Result mockResult = new Result(); - putMsg(mockResult, Status.SUCCESS); - when(taskInstanceService.forceTaskSuccess(any(User.class), anyLong(), anyInt())).thenReturn(mockResult); + Mockito.doNothing().when(taskInstanceService).forceTaskSuccess(any(User.class), anyLong(), anyInt()); MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success", "cxc_1113") .header(SESSION_ID, sessionId) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java index cc70f200e1..8e76ec1694 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.api.controller.AbstractControllerTest; import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceQueryRequest; +import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceSuccessResponse; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -85,12 +86,9 @@ public class TaskInstanceV2ControllerTest extends AbstractControllerTest { @Test public void testForceTaskSuccess() { - Result mockResult = new Result(); - putMsg(mockResult, Status.SUCCESS); - - when(taskInstanceService.forceTaskSuccess(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult); + Mockito.doNothing().when(taskInstanceService).forceTaskSuccess(any(), Mockito.anyLong(), Mockito.anyInt()); - Result taskResult = taskInstanceV2Controller.forceTaskSuccess(null, 1L, 1); + TaskInstanceSuccessResponse taskResult = taskInstanceV2Controller.forceTaskSuccess(null, 1L, 1); Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode()); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index aca0d80a6f..dd7acb16d5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; import static org.mockito.ArgumentMatchers.any; @@ -25,7 +26,6 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; -import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; @@ -35,15 +35,16 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -65,7 +66,6 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.springframework.boot.test.context.SpringBootTest; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -74,7 +74,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) -@SpringBootTest(classes = ApiApplicationServer.class) public class TaskInstanceServiceTest { @InjectMocks @@ -100,6 +99,8 @@ public class TaskInstanceServiceTest { @Mock TaskInstanceDao taskInstanceDao; + @Mock + ProcessInstanceDao workflowInstanceDao; @Test public void queryTaskListPaging() { @@ -324,6 +325,7 @@ public class TaskInstanceServiceTest { private TaskInstance getTaskInstance() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); + taskInstance.setProjectCode(1L); taskInstance.setName("test_task_instance"); taskInstance.setStartTime(new Date()); taskInstance.setEndTime(new Date()); @@ -343,64 +345,69 @@ public class TaskInstanceServiceTest { } @Test - public void testForceTaskSuccess() { + public void testForceTaskSuccess_withNoPermission() { + User user = getAdminUser(); + TaskInstance task = getTaskInstance(); + doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService) + .checkProjectAndAuthThrowException(user, task.getProjectCode(), FORCED_SUCCESS); + assertThrowsServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, + () -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId())); + } + + @Test + public void testForceTaskSuccess_withTaskInstanceNotFound() { + User user = getAdminUser(); + TaskInstance task = getTaskInstance(); + doNothing().when(projectService).checkProjectAndAuthThrowException(user, task.getProjectCode(), FORCED_SUCCESS); + when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.empty()); + assertThrowsServiceException(Status.TASK_INSTANCE_NOT_FOUND, + () -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId())); + } + + @Test + public void testForceTaskSuccess_withWorkflowInstanceNotFound() { + User user = getAdminUser(); + TaskInstance task = getTaskInstance(); + doNothing().when(projectService).checkProjectAndAuthThrowException(user, task.getProjectCode(), FORCED_SUCCESS); + when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task)); + when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId())).thenReturn(Optional.empty()); + + assertThrowsServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, + () -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId())); + } + + @Test + public void testForceTaskSuccess_withWorkflowInstanceNotFinished() { User user = getAdminUser(); long projectCode = 1L; - Project project = getProject(projectCode); - int taskId = 1; TaskInstance task = getTaskInstance(); + ProcessInstance processInstance = getProcessInstance(); + processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); + doNothing().when(projectService).checkProjectAndAuthThrowException(user, projectCode, FORCED_SUCCESS); + when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task)); + when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId())) + .thenReturn(Optional.of(processInstance)); - Map mockSuccess = new HashMap<>(5); - putMsg(mockSuccess, Status.SUCCESS); - when(projectMapper.queryByCode(projectCode)).thenReturn(project); + assertThrowsServiceException( + "The workflow instance is not finished: " + processInstance.getState() + + " cannot force start task instance", + () -> taskInstanceService.forceTaskSuccess(user, projectCode, task.getId())); + } - // user auth failed - Map mockFailure = new HashMap<>(5); - putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectCode); - when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(mockFailure); - Result authFailRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId); - Assertions.assertNotSame(Status.SUCCESS.getCode(), authFailRes.getCode()); - - // test task not found - when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(mockSuccess); - when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null); - TaskDefinition taskDefinition = new TaskDefinition(); - taskDefinition.setProjectCode(projectCode); - when(taskDefinitionMapper.queryByCode(task.getTaskCode())).thenReturn(taskDefinition); - Result taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId); - Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), taskNotFoundRes.getCode().intValue()); - - // test task instance state error - task.setState(TaskExecutionStatus.SUCCESS); - when(taskInstanceMapper.selectById(1)).thenReturn(task); - Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(result); - Result taskStateErrorRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId); - Assertions.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR.getCode(), - taskStateErrorRes.getCode().intValue()); - - // test error - task.setState(TaskExecutionStatus.FAILURE); - when(taskInstanceMapper.updateById(task)).thenReturn(0); - putMsg(result, Status.SUCCESS, projectCode); - when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(result); - Result errorRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId); - Assertions.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR.getCode(), errorRes.getCode().intValue()); - - // test success - task.setState(TaskExecutionStatus.FAILURE); - task.setEndTime(null); - when(taskInstanceMapper.updateById(task)).thenReturn(1); - putMsg(result, Status.SUCCESS, projectCode); - when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(result); - Result successRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId); - Assertions.assertEquals(Status.SUCCESS.getCode(), successRes.getCode().intValue()); - Assertions.assertNotNull(task.getEndTime()); + @Test + public void testForceTaskSuccess_withTaskInstanceNotFinished() { + User user = getAdminUser(); + TaskInstance task = getTaskInstance(); + ProcessInstance processInstance = getProcessInstance(); + processInstance.setState(WorkflowExecutionStatus.FAILURE); + doNothing().when(projectService).checkProjectAndAuthThrowException(user, task.getProjectCode(), FORCED_SUCCESS); + when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task)); + when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId())) + .thenReturn(Optional.of(processInstance)); + assertThrowsServiceException( + Status.TASK_INSTANCE_STATE_OPERATION_ERROR, + () -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId())); } @Test diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 101350e90d..8787aabc8b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -189,7 +189,7 @@ public interface ProcessService { public String findConfigYamlByName(String clusterName); - void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); + void forceProcessInstanceSuccessByTaskInstanceId(TaskInstance taskInstance); void saveCommandTrigger(Integer commandId, Integer processInstanceId); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 028ab7651f..3c207ae982 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -276,6 +276,7 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private TriggerRelationService triggerRelationService; + /** * todo: split this method * handle Command (construct ProcessInstance from Command) , wrapped in transaction @@ -621,13 +622,13 @@ public class ProcessServiceImpl implements ProcessService { /** * Get workflow runtime tenant - * + *

* the workflow provides a tenant and uses the provided tenant; * when no tenant is provided or the provided tenant is the default tenant, \ * the user's tenant created by the workflow is used * * @param tenantCode tenantCode - * @param userId userId + * @param userId userId * @return tenant code */ @Override @@ -2114,11 +2115,7 @@ public class ProcessServiceImpl implements ProcessService { } @Override - public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) { - TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); - if (task == null) { - return; - } + public void forceProcessInstanceSuccessByTaskInstanceId(TaskInstance task) { ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null); if (processInstance != null && (processInstance.getState().isFailure() || processInstance.getState().isStop())) { @@ -2139,7 +2136,7 @@ public class ProcessServiceImpl implements ProcessService { List failTaskList = validTaskList.stream() .filter(instance -> instance.getState().isFailure() || instance.getState().isKill()) .map(TaskInstance::getId).collect(Collectors.toList()); - if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { + if (failTaskList.size() == 1 && failTaskList.contains(task.getId())) { processInstance.setStateWithDesc(WorkflowExecutionStatus.SUCCESS, "success by task force success"); processInstanceDao.updateById(processInstance); }