Browse Source

Forbidden forcess success a task instance in a running workflow instance (#15855)

3.2.2-release-bak
Wenjun Ruan 8 months ago committed by GitHub
parent
commit
76d059810a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
  2. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java
  3. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  4. 62
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  5. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java
  6. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
  7. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java
  8. 119
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  9. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  10. 13
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java

@ -153,10 +153,11 @@ public class TaskInstanceController extends BaseController {
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(FORCE_TASK_SUCCESS_ERROR) @ApiException(FORCE_TASK_SUCCESS_ERROR)
@OperatorLog(auditType = AuditType.TASK_INSTANCE_FORCE_SUCCESS) @OperatorLog(auditType = AuditType.TASK_INSTANCE_FORCE_SUCCESS)
public Result forceTaskSuccess(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result<Void> forceTaskSuccess(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Schema(name = "projectCode", required = true) @PathVariable long projectCode, @Schema(name = "projectCode", required = true) @PathVariable long projectCode,
@PathVariable(value = "id") Integer id) { @PathVariable(value = "id") Integer id) {
return taskInstanceService.forceTaskSuccess(loginUser, projectCode, id); taskInstanceService.forceTaskSuccess(loginUser, projectCode, id);
return Result.success();
} }
/** /**

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2Controller.java

@ -167,8 +167,8 @@ public class TaskInstanceV2Controller extends BaseController {
public TaskInstanceSuccessResponse forceTaskSuccess(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public TaskInstanceSuccessResponse forceTaskSuccess(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "id") Integer id) { @PathVariable(value = "id") Integer id) {
Result result = taskInstanceService.forceTaskSuccess(loginUser, projectCode, id); taskInstanceService.forceTaskSuccess(loginUser, projectCode, id);
return new TaskInstanceSuccessResponse(result); return new TaskInstanceSuccessResponse(Result.success());
} }
/** /**

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -72,9 +72,9 @@ public interface TaskInstanceService {
* @param taskInstanceId task instance id * @param taskInstanceId task instance id
* @return the result code and msg * @return the result code and msg
*/ */
Result forceTaskSuccess(User loginUser, void forceTaskSuccess(User loginUser,
long projectCode, long projectCode,
Integer taskInstanceId); Integer taskInstanceId);
/** /**
* task savepoint * task savepoint

62
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.TaskInstanceService;
@ -33,14 +34,15 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao; import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
@ -107,6 +109,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired @Autowired
private TaskGroupQueueService taskGroupQueueService; private TaskGroupQueueService taskGroupQueueService;
@Autowired
private ProcessInstanceDao workflowInstanceDao;
/** /**
* query task list by project, process instance, task name, task start time, task end time, task status, keyword paging * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
* *
@ -216,58 +221,39 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
*/ */
@Transactional @Transactional
@Override @Override
public Result forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) { public void forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
// check user access for project // check user access for project
Map<String, Object> checkResult = projectService.checkProjectAndAuthThrowException(loginUser, projectCode, FORCED_SUCCESS);
projectService.checkProjectAndAuth(loginUser, project, projectCode, FORCED_SUCCESS);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
putMsg(result, status);
return result;
}
// check whether the task instance can be found TaskInstance task = taskInstanceDao.queryOptionalById(taskInstanceId)
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); .orElseThrow(() -> new ServiceException(Status.TASK_INSTANCE_NOT_FOUND));
if (task == null) {
log.error("Task instance can not be found, projectCode:{}, taskInstanceId:{}.", projectCode, if (task.getProjectCode() != projectCode) {
taskInstanceId); throw new ServiceException("The task instance is not under the project: " + projectCode);
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
} }
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); ProcessInstance processInstance = workflowInstanceDao.queryOptionalById(task.getProcessInstanceId())
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { .orElseThrow(
log.error("Task definition can not be found, projectCode:{}, taskDefinitionCode:{}.", projectCode, () -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, task.getProcessInstanceId()));
task.getTaskCode()); if (!processInstance.getState().isFinished()) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstanceId); throw new ServiceException("The workflow instance is not finished: " + processInstance.getState()
return result; + " cannot force start task instance");
} }
// check whether the task instance state type is failure or cancel // check whether the task instance state type is failure or cancel
if (!task.getState().isFailure() && !task.getState().isKill()) { if (!task.getState().isFailure() && !task.getState().isKill()) {
log.warn("{} type task instance can not perform force success, projectCode:{}, taskInstanceId:{}.", throw new ServiceException(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState());
task.getState().getDesc(), projectCode, taskInstanceId);
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
} }
// change the state of the task instance // change the state of the task instance
task.setState(TaskExecutionStatus.FORCED_SUCCESS); task.setState(TaskExecutionStatus.FORCED_SUCCESS);
task.setEndTime(new Date()); task.setEndTime(new Date());
int changedNum = taskInstanceMapper.updateById(task); int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) { if (changedNum <= 0) {
processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId); throw new ServiceException(Status.FORCE_TASK_SUCCESS_ERROR);
log.info("Task instance performs force success complete, projectCode:{}, taskInstanceId:{}", projectCode,
taskInstanceId);
putMsg(result, Status.SUCCESS);
} else {
log.error("Task instance performs force success complete, projectCode:{}, taskInstanceId:{}",
projectCode, taskInstanceId);
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
} }
return result; processService.forceProcessInstanceSuccessByTaskInstanceId(task);
log.info("Force success task instance:{} success", taskInstanceId);
} }
@Override @Override

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/AssertionsHelper.java

@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.api;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import java.text.MessageFormat;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.function.Executable; import org.junit.jupiter.api.function.Executable;
@ -30,6 +32,12 @@ public class AssertionsHelper extends Assertions {
Assertions.assertEquals(status.getCode(), exception.getCode()); Assertions.assertEquals(status.getCode(), exception.getCode());
} }
public static void assertThrowsServiceException(String message, Executable executable) {
ServiceException exception = Assertions.assertThrows(ServiceException.class, executable);
Assertions.assertEquals(MessageFormat.format(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(), message),
exception.getMessage());
}
public static void assertDoesNotThrow(Executable executable) { public static void assertDoesNotThrow(Executable executable) {
Assertions.assertDoesNotThrow(executable); Assertions.assertDoesNotThrow(executable);
} }

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java

@ -82,9 +82,7 @@ public class TaskInstanceControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>(); MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("taskInstanceId", "104"); paramsMap.add("taskInstanceId", "104");
Result mockResult = new Result(); Mockito.doNothing().when(taskInstanceService).forceTaskSuccess(any(User.class), anyLong(), anyInt());
putMsg(mockResult, Status.SUCCESS);
when(taskInstanceService.forceTaskSuccess(any(User.class), anyLong(), anyInt())).thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success", "cxc_1113") MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success", "cxc_1113")
.header(SESSION_ID, sessionId) .header(SESSION_ID, sessionId)

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/TaskInstanceV2ControllerTest.java

@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.controller.AbstractControllerTest; import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceQueryRequest; import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceQueryRequest;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceSuccessResponse;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -85,12 +86,9 @@ public class TaskInstanceV2ControllerTest extends AbstractControllerTest {
@Test @Test
public void testForceTaskSuccess() { public void testForceTaskSuccess() {
Result mockResult = new Result(); Mockito.doNothing().when(taskInstanceService).forceTaskSuccess(any(), Mockito.anyLong(), Mockito.anyInt());
putMsg(mockResult, Status.SUCCESS);
when(taskInstanceService.forceTaskSuccess(any(), Mockito.anyLong(), Mockito.anyInt())).thenReturn(mockResult);
Result taskResult = taskInstanceV2Controller.forceTaskSuccess(null, 1L, 1); TaskInstanceSuccessResponse taskResult = taskInstanceV2Controller.forceTaskSuccess(null, 1L, 1);
Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode()); Assertions.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
} }

119
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
@ -25,7 +26,6 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.exceptions.ServiceException;
@ -35,15 +35,16 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -65,7 +66,6 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness; import org.mockito.quality.Strictness;
import org.springframework.boot.test.context.SpringBootTest;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -74,7 +74,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
*/ */
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT) @MockitoSettings(strictness = Strictness.LENIENT)
@SpringBootTest(classes = ApiApplicationServer.class)
public class TaskInstanceServiceTest { public class TaskInstanceServiceTest {
@InjectMocks @InjectMocks
@ -100,6 +99,8 @@ public class TaskInstanceServiceTest {
@Mock @Mock
TaskInstanceDao taskInstanceDao; TaskInstanceDao taskInstanceDao;
@Mock
ProcessInstanceDao workflowInstanceDao;
@Test @Test
public void queryTaskListPaging() { public void queryTaskListPaging() {
@ -324,6 +325,7 @@ public class TaskInstanceServiceTest {
private TaskInstance getTaskInstance() { private TaskInstance getTaskInstance() {
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1); taskInstance.setId(1);
taskInstance.setProjectCode(1L);
taskInstance.setName("test_task_instance"); taskInstance.setName("test_task_instance");
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date()); taskInstance.setEndTime(new Date());
@ -343,64 +345,69 @@ public class TaskInstanceServiceTest {
} }
@Test @Test
public void testForceTaskSuccess() { public void testForceTaskSuccess_withNoPermission() {
User user = getAdminUser();
TaskInstance task = getTaskInstance();
doThrow(new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM)).when(projectService)
.checkProjectAndAuthThrowException(user, task.getProjectCode(), FORCED_SUCCESS);
assertThrowsServiceException(Status.USER_NO_OPERATION_PROJECT_PERM,
() -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId()));
}
@Test
public void testForceTaskSuccess_withTaskInstanceNotFound() {
User user = getAdminUser();
TaskInstance task = getTaskInstance();
doNothing().when(projectService).checkProjectAndAuthThrowException(user, task.getProjectCode(), FORCED_SUCCESS);
when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.empty());
assertThrowsServiceException(Status.TASK_INSTANCE_NOT_FOUND,
() -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId()));
}
@Test
public void testForceTaskSuccess_withWorkflowInstanceNotFound() {
User user = getAdminUser();
TaskInstance task = getTaskInstance();
doNothing().when(projectService).checkProjectAndAuthThrowException(user, task.getProjectCode(), FORCED_SUCCESS);
when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task));
when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId())).thenReturn(Optional.empty());
assertThrowsServiceException(Status.PROCESS_INSTANCE_NOT_EXIST,
() -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId()));
}
@Test
public void testForceTaskSuccess_withWorkflowInstanceNotFinished() {
User user = getAdminUser(); User user = getAdminUser();
long projectCode = 1L; long projectCode = 1L;
Project project = getProject(projectCode);
int taskId = 1;
TaskInstance task = getTaskInstance(); TaskInstance task = getTaskInstance();
ProcessInstance processInstance = getProcessInstance();
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
doNothing().when(projectService).checkProjectAndAuthThrowException(user, projectCode, FORCED_SUCCESS);
when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task));
when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId()))
.thenReturn(Optional.of(processInstance));
Map<String, Object> mockSuccess = new HashMap<>(5); assertThrowsServiceException(
putMsg(mockSuccess, Status.SUCCESS); "The workflow instance is not finished: " + processInstance.getState()
when(projectMapper.queryByCode(projectCode)).thenReturn(project); + " cannot force start task instance",
() -> taskInstanceService.forceTaskSuccess(user, projectCode, task.getId()));
}
// user auth failed @Test
Map<String, Object> mockFailure = new HashMap<>(5); public void testForceTaskSuccess_withTaskInstanceNotFinished() {
putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectCode); User user = getAdminUser();
when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(mockFailure); TaskInstance task = getTaskInstance();
Result authFailRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId); ProcessInstance processInstance = getProcessInstance();
Assertions.assertNotSame(Status.SUCCESS.getCode(), authFailRes.getCode()); processInstance.setState(WorkflowExecutionStatus.FAILURE);
doNothing().when(projectService).checkProjectAndAuthThrowException(user, task.getProjectCode(), FORCED_SUCCESS);
// test task not found when(taskInstanceDao.queryOptionalById(task.getId())).thenReturn(Optional.of(task));
when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(mockSuccess); when(workflowInstanceDao.queryOptionalById(task.getProcessInstanceId()))
when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null); .thenReturn(Optional.of(processInstance));
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
when(taskDefinitionMapper.queryByCode(task.getTaskCode())).thenReturn(taskDefinition);
Result taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId);
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), taskNotFoundRes.getCode().intValue());
// test task instance state error
task.setState(TaskExecutionStatus.SUCCESS);
when(taskInstanceMapper.selectById(1)).thenReturn(task);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(result);
Result taskStateErrorRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId);
Assertions.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR.getCode(),
taskStateErrorRes.getCode().intValue());
// test error
task.setState(TaskExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(0);
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(result);
Result errorRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId);
Assertions.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR.getCode(), errorRes.getCode().intValue());
// test success
task.setState(TaskExecutionStatus.FAILURE);
task.setEndTime(null);
when(taskInstanceMapper.updateById(task)).thenReturn(1);
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(user, project, projectCode, FORCED_SUCCESS)).thenReturn(result);
Result successRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId);
Assertions.assertEquals(Status.SUCCESS.getCode(), successRes.getCode().intValue());
Assertions.assertNotNull(task.getEndTime());
assertThrowsServiceException(
Status.TASK_INSTANCE_STATE_OPERATION_ERROR,
() -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId()));
} }
@Test @Test

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -189,7 +189,7 @@ public interface ProcessService {
public String findConfigYamlByName(String clusterName); public String findConfigYamlByName(String clusterName);
void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); void forceProcessInstanceSuccessByTaskInstanceId(TaskInstance taskInstance);
void saveCommandTrigger(Integer commandId, Integer processInstanceId); void saveCommandTrigger(Integer commandId, Integer processInstanceId);

13
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -276,6 +276,7 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired @Autowired
private TriggerRelationService triggerRelationService; private TriggerRelationService triggerRelationService;
/** /**
* todo: split this method * todo: split this method
* handle Command (construct ProcessInstance from Command) , wrapped in transaction * handle Command (construct ProcessInstance from Command) , wrapped in transaction
@ -621,13 +622,13 @@ public class ProcessServiceImpl implements ProcessService {
/** /**
* Get workflow runtime tenant * Get workflow runtime tenant
* * <p>
* the workflow provides a tenant and uses the provided tenant; * the workflow provides a tenant and uses the provided tenant;
* when no tenant is provided or the provided tenant is the default tenant, \ * when no tenant is provided or the provided tenant is the default tenant, \
* the user's tenant created by the workflow is used * the user's tenant created by the workflow is used
* *
* @param tenantCode tenantCode * @param tenantCode tenantCode
* @param userId userId * @param userId userId
* @return tenant code * @return tenant code
*/ */
@Override @Override
@ -2114,11 +2115,7 @@ public class ProcessServiceImpl implements ProcessService {
} }
@Override @Override
public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) { public void forceProcessInstanceSuccessByTaskInstanceId(TaskInstance task) {
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
return;
}
ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null); ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
if (processInstance != null if (processInstance != null
&& (processInstance.getState().isFailure() || processInstance.getState().isStop())) { && (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
@ -2139,7 +2136,7 @@ public class ProcessServiceImpl implements ProcessService {
List<Integer> failTaskList = validTaskList.stream() List<Integer> failTaskList = validTaskList.stream()
.filter(instance -> instance.getState().isFailure() || instance.getState().isKill()) .filter(instance -> instance.getState().isFailure() || instance.getState().isKill())
.map(TaskInstance::getId).collect(Collectors.toList()); .map(TaskInstance::getId).collect(Collectors.toList());
if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { if (failTaskList.size() == 1 && failTaskList.contains(task.getId())) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SUCCESS, "success by task force success"); processInstance.setStateWithDesc(WorkflowExecutionStatus.SUCCESS, "success by task force success");
processInstanceDao.updateById(processInstance); processInstanceDao.updateById(processInstance);
} }

Loading…
Cancel
Save