diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java index 56e7ef2087..f80e536ea8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; import org.apache.dolphinscheduler.api.exceptions.ApiException; @@ -36,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -127,4 +129,30 @@ public class TaskInstanceController extends BaseController { return returnDataListPaging(result); } + /** + * change one single task instance's state from FAILURE to FORCED_SUCCESS + * + * @param loginUser login user + * @param projectName project name + * @param taskInstanceId task instance id + * @return the result code and msg + */ + @ApiOperation(value = "force-success", notes = "FORCE_SINGLE_TASK_SUCCESS") + @ApiImplicitParams({ + @ApiImplicitParam(name = "taskInstanceId", value = "TASK_INSTANCE_ID", required = true, dataType = "Int", example = "12") + }) + @PostMapping(value = "/force-success") + @ResponseStatus(HttpStatus.OK) + @ApiException(FORCE_TASK_SUCCESS_ERROR) + public Result forceSingleTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, + @RequestParam(value = "taskInstanceId") Integer taskInstanceId) { + String userNameReplace = StringUtils.replaceNRTtoUnderline(loginUser.getUserName()); + String projectNameReplace = StringUtils.replaceNRTtoUnderline(projectName); + logger.info("force task success, login user: {}, project:{}, task instance id:{}", + userNameReplace, projectNameReplace, taskInstanceId); + Map result = taskInstanceService.forceSingleTaskSuccess(loginUser, projectName, taskInstanceId); + return returnDataList(result); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java index 6f42ba9f56..41c7abba85 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java @@ -57,6 +57,16 @@ public class TaskCountDto { .sum(); } + // remove the specified state + public void removeStateFromCountList(ExecutionStatus status) { + for (TaskStateCount count : this.taskCountDtos) { + if (count.getTaskStateType().equals(status)) { + this.taskCountDtos.remove(count); + break; + } + } + } + public List getTaskCountDtos() { return taskCountDtos; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java index cc1797295a..3076f9fcd0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java @@ -29,8 +29,9 @@ public enum ExecuteType { * 3 resume failure * 4 stop * 5 pause + * 6 resume from forced success */ - NONE,REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE; + NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE, RESUME_FROM_FORCED_SUCCESS; public static ExecuteType getEnum(int value){ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 6e23808c1f..93db6c9570 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -196,7 +196,9 @@ public enum Status { QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"), DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), CHECK_TENANT_CODE_ERROR(10164, "Please enter the English tenant code", "请输入英文租户编码"), - + FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"), + TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"), + UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"), @@ -247,8 +249,9 @@ public enum Status { BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), - BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"), + BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), + NO_VALID_FORCED_SUCCESS_TASK(50030, "there is no valid forced success node in process instance {0}", "工作流实例[{0}]中不包含有效的强制成功的任务实例"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 7a0fd0f845..1f67eedbd9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.CollectionUtils; @@ -283,6 +284,13 @@ public class ExecutorService extends BaseService { result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE); } break; + case RESUME_FROM_FORCED_SUCCESS: + if (!this.checkValidForcedSuccessTask(processInstanceId)) { + putMsg(result, Status.NO_VALID_FORCED_SUCCESS_TASK, processInstance.getName()); + } else { + result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RESUME_FROM_FORCED_SUCCESS); + } + break; default: logger.error("unknown execute type : {}", executeType); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type"); @@ -330,6 +338,7 @@ public class ExecutorService extends BaseService { } break; case START_FAILURE_TASK_PROCESS: + case RESUME_FROM_FORCED_SUCCESS: if (executionStatus.typeIsFailure()) { checkResult = true; } @@ -610,4 +619,26 @@ public class ExecutorService extends BaseService { return null; } + /** + * check if the process instance contains valid forced success task + * + * @param processInstanceId + * @return + */ + private boolean checkValidForcedSuccessTask(int processInstanceId) { + List forcedSuccessList = processService.findTaskIdByInstanceState(processInstanceId, ExecutionStatus.FORCED_SUCCESS); + if (forcedSuccessList != null && !forcedSuccessList.isEmpty()) { + return true; + } + + List failedSubList = processService.findTaskIdByInstanceStatusAndType(processInstanceId, + new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, + TaskType.SUB_PROCESS); + for (int i = 0; i < failedSubList.size(); i++) { + if (processService.haveForcedSuccessInSubProcess(failedSubList.get(i))) { + return true; + } + } + return false; + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index 012af8fd38..b3f810f9b4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -145,6 +145,51 @@ public class TaskInstanceService extends BaseService { return result; } + /** + * change one single task instance's state from failure to forced success + * + * @param loginUser login user + * @param projectName project name + * @param taskInstanceId task instance id + * @return the result code and msg + */ + public Map forceSingleTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) { + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + // check user auth + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status status = (Status) checkResult.get(Constants.STATUS); + if (status != Status.SUCCESS) { + return checkResult; + } + + // check whether the task instance can be found + TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); + if (task == null) { + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); + return result; + } + + // check whether the task instance state type is failure + if (!task.getState().typeIsFailure()) { + putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString()); + return result; + } + + // change the state of the task instance + task.setState(ExecutionStatus.FORCED_SUCCESS); + int changedNum = taskInstanceMapper.updateById(task); + if (changedNum > 0) { + putMsg(result, Status.SUCCESS); + } + else { + putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); + } + + return result; + } + /*** * generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name * @param result exist result map diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java index c9560e1c50..7254fc1a88 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.api.service.DataAnalysisService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -114,12 +115,17 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis * @return process instance state count data */ public Map countProcessInstanceStateByProject(User loginUser, int projectId, String startDate, String endDate) { - return this.countStateByProject( + Map result = this.countStateByProject( loginUser, projectId, startDate, endDate, (start, end, projectIds) -> this.processInstanceMapper.countInstanceStateByUser(start, end, projectIds)); + // process state count needs to remove state of forced success + if (result.containsKey(Constants.STATUS) && result.get(Constants.STATUS).equals(Status.SUCCESS)) { + ((TaskCountDto)result.get(Constants.DATA_LIST)).removeStateFromCountList(ExecutionStatus.FORCED_SUCCESS); + } + return result; } private Map countStateByProject(User loginUser, int projectId, String startDate, String endDate diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java index b97c7c192f..00f14f477e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java @@ -28,6 +28,14 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.User; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import java.util.HashMap; import java.util.Map; @@ -40,11 +48,16 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + /** * task instance controller test */ @RunWith(MockitoJUnitRunner.Silent.class) -public class TaskInstanceControllerTest { +public class TaskInstanceControllerTest extends AbstractControllerTest { @InjectMocks private TaskInstanceController taskInstanceController; @@ -70,4 +83,24 @@ public class TaskInstanceControllerTest { } + @Test + public void testForceSingleTaskSuccess() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("taskInstanceId","104"); + + Map mockResult = new HashMap<>(5); + mockResult.put(Constants.STATUS, Status.SUCCESS); + mockResult.put(Constants.MSG, Status.SUCCESS.getMsg()); + when(taskInstanceService.forceSingleTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult); + + MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success","test") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index 9f8d1b21f0..532fcea581 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -18,19 +18,24 @@ package org.apache.dolphinscheduler.api.service; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -82,12 +87,16 @@ public class ExecutorService2Test { private int processDefinitionId = 1; + private int processInstanceId = 1; + private int tenantId = 1; private int userId = 1; private ProcessDefinition processDefinition = new ProcessDefinition(); + private ProcessInstance processInstance = new ProcessInstance(); + private User loginUser = new User(); private String projectName = "projectName"; @@ -107,6 +116,13 @@ public class ExecutorService2Test { processDefinition.setTenantId(tenantId); processDefinition.setUserId(userId); + // processInstance + processInstance.setId(processInstanceId); + processInstance.setProcessDefinitionId(processDefinitionId); + processInstance.setState(ExecutionStatus.FAILURE); + processInstance.setExecutorId(userId); + processInstance.setTenantId(tenantId); + // project project.setName(projectName); @@ -120,6 +136,8 @@ public class ExecutorService2Test { Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList()); + Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); + Mockito.when(processService.findProcessDefineById(processDefinitionId)).thenReturn(processDefinition); } /** @@ -257,6 +275,39 @@ public class ExecutorService2Test { } + @Test + public void testExecute() { + List mockRes = new ArrayList<>(); + mockRes.add(1); + mockRes.add(2); + Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))) + .thenReturn(true); + + // check execute type error + processInstance.setState(ExecutionStatus.SUCCESS); + Map checkExeTypeRes = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS); + Assert.assertEquals(Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, checkExeTypeRes.get(Constants.STATUS)); + + // no valid forced success task + processInstance.setState(ExecutionStatus.FAILURE); + Map noValidTaskRes = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS); + Assert.assertEquals(Status.NO_VALID_FORCED_SUCCESS_TASK, noValidTaskRes.get(Constants.STATUS)); + + // have forced success in sub-process + Mockito.when(processService.findTaskIdByInstanceStatusAndType(anyInt(), any(ExecutionStatus[].class), any(TaskType.class))) + .thenReturn(mockRes); + Mockito.when(processService.haveForcedSuccessInSubProcess(anyInt())) + .thenReturn(true); + Map successRes1 = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS); + Assert.assertEquals(Status.SUCCESS, successRes1.get(Constants.STATUS)); + + // test success + Mockito.when(processService.findTaskIdByInstanceState(processInstanceId, ExecutionStatus.FORCED_SUCCESS)).thenReturn(mockRes); + Map successRes = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS); + Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + verify(processService, times(2)).createCommand(any(Command.class)); + } + private List getMasterServersList() { List masterServerList = new ArrayList<>(); Server masterServer1 = new Server(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 199b34cc1b..1c77cc0ed1 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -214,4 +214,48 @@ public class TaskInstanceServiceTest { result.put(Constants.MSG, status.getMsg()); } } -} \ No newline at end of file + + @Test + public void forceSingleTaskSuccess() { + User user = getAdminUser(); + String projectName = "test"; + Project project = getProject(projectName); + int taskId = 1; + TaskInstance task = getTaskInstance(); + + Map mockSuccess = new HashMap<>(5); + putMsg(mockSuccess, Status.SUCCESS); + when(projectMapper.queryByName(projectName)).thenReturn(project); + + // user auth failed + Map mockFailure = new HashMap<>(5); + putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectName); + when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockFailure); + Map authFailRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId); + Assert.assertNotSame(Status.SUCCESS, authFailRes.get(Constants.STATUS)); + + // test task not found + when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockSuccess); + when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null); + Map taskNotFoundRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS)); + + // test task instance state error + task.setState(ExecutionStatus.SUCCESS); + when(taskInstanceMapper.selectById(1)).thenReturn(task); + Map taskStateErrorRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskStateErrorRes.get(Constants.STATUS)); + + // test error + task.setState(ExecutionStatus.FAILURE); + when(taskInstanceMapper.updateById(task)).thenReturn(0); + Map errorRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR, errorRes.get(Constants.STATUS)); + + // test success + task.setState(ExecutionStatus.FAILURE); + when(taskInstanceMapper.updateById(task)).thenReturn(1); + Map successRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java index 9682016d6f..3b663bc74d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java @@ -39,6 +39,7 @@ public enum CommandType { * 8 pause a process * 9 stop a process * 10 recover waiting thread + * 11 resume process from forced-success task nodes */ START_PROCESS(0, "start a new process"), START_CURRENT_TASK_PROCESS(1, "start a new process from current nodes"), @@ -50,7 +51,8 @@ public enum CommandType { REPEAT_RUNNING(7, "repeat running a process"), PAUSE(8, "pause a process"), STOP(9, "stop a process"), - RECOVER_WAITTING_THREAD(10, "recover waiting thread"); + RECOVER_WAITTING_THREAD(10, "recover waiting thread"), + RESUME_FROM_FORCED_SUCCESS(11, "resume process from forced-success task nodes"); CommandType(int code, String descp){ this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index f6ac2cf5ab..2f5c022333 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -41,6 +41,7 @@ public enum ExecutionStatus { * 10 waiting thread * 11 waiting depend node complete * 12 delay execution + * 13 forced success */ SUBMITTED_SUCCESS(0, "submit success"), RUNNING_EXECUTION(1, "running"), @@ -54,7 +55,8 @@ public enum ExecutionStatus { KILL(9, "kill"), WAITTING_THREAD(10, "waiting thread"), WAITTING_DEPEND(11, "waiting depend node complete"), - DELAY_EXECUTION(12, "delay execution"); + DELAY_EXECUTION(12, "delay execution"), + FORCED_SUCCESS(13, "forced success"); ExecutionStatus(int code, String descp) { this.code = code; @@ -79,7 +81,7 @@ public enum ExecutionStatus { * @return status */ public boolean typeIsSuccess() { - return this == SUCCESS; + return this == SUCCESS || this == FORCED_SUCCESS; } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java index 23db4b626b..9350612f60 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/BusinessTimeUtils.java @@ -54,6 +54,7 @@ public class BusinessTimeUtils { case RECOVER_SUSPENDED_PROCESS: case START_FAILURE_TASK_PROCESS: case REPEAT_RUNNING: + case RESUME_FROM_FORCED_SUCCESS: case SCHEDULER: default: businessDate = addDays(new Date(), -1); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index b0e9ca7338..6dc38fd294 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -72,4 +72,12 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("startTime") Date startTime, @Param("endTime") Date endTime ); + + List queryTaskByPIdAndStatusAndType(@Param("processInstanceId") Integer processInstanceId, + @Param("states") int[] stateArray, + @Param("taskType") String taskType); + + List queryTaskBySubProcessTaskIdAndStatusAndType(@Param("subProcessTaskId") Integer subProcessTaskId, + @Param("states") int[] stateArray, + @Param("taskType") String taskType); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 71bd251a4a..0f394fea5e 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -152,4 +152,27 @@ order by instance.start_time desc + + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index a225f7654c..2c40f04848 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.dao.mapper; +package org.apache.dolphinscheduler.dao.mapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.junit.Assert; import org.junit.Test; @@ -36,6 +37,7 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -55,20 +57,38 @@ public class TaskInstanceMapperTest { @Autowired ProcessInstanceMapper processInstanceMapper; + @Autowired + ProcessInstanceMapMapper processInstanceMapMapper; + /** * insert + * * @return TaskInstance */ - private TaskInstance insertOne(){ + private TaskInstance insertOne() { //insertOne + return insertOne("us task", 1, ExecutionStatus.RUNNING_EXECUTION, TaskType.SHELL.toString()); + } + + /** + * construct a task instance and then insert + * + * @param taskName + * @param processInstanceId + * @param state + * @param taskType + * @return + */ + private TaskInstance insertOne(String taskName, int processInstanceId, ExecutionStatus state, String taskType) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setFlag(Flag.YES); - taskInstance.setName("ut task"); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + taskInstance.setName(taskName); + taskInstance.setState(state); taskInstance.setStartTime(new Date()); taskInstance.setEndTime(new Date()); taskInstance.setTaskJson("{}"); - taskInstance.setTaskType(TaskType.SHELL.toString()); + taskInstance.setProcessInstanceId(processInstanceId); + taskInstance.setTaskType(taskType); taskInstanceMapper.insert(taskInstance); return taskInstance; } @@ -90,7 +110,7 @@ public class TaskInstanceMapperTest { * test delete */ @Test - public void testDelete(){ + public void testDelete() { TaskInstance taskInstance = insertOne(); int delete = taskInstanceMapper.deleteById(taskInstance.getId()); Assert.assertEquals(1, delete); @@ -149,7 +169,7 @@ public class TaskInstanceMapperTest { taskInstanceMapper.deleteById(task2.getId()); taskInstanceMapper.deleteById(task.getId()); Assert.assertNotEquals(taskInstances.size(), 0); - Assert.assertNotEquals(taskInstances1.size(), 0 ); + Assert.assertNotEquals(taskInstances1.size(), 0); } /** @@ -298,4 +318,53 @@ public class TaskInstanceMapperTest { Assert.assertNotEquals(taskInstanceIPage.getTotal(), 0); } -} \ No newline at end of file + + @Test + public void testQueryTaskByPIdAndStatusAndType() { + // insert three task instances with the same process instance id + List taskList = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + String name = "ut task" + String.valueOf(i); + taskList.add(insertOne(name, 66, ExecutionStatus.FAILURE, TaskType.SUB_PROCESS.toString())); + } + + // test query result + List resultArray = taskInstanceMapper.queryTaskByPIdAndStatusAndType(66, + new int[] {ExecutionStatus.FAILURE.ordinal(), ExecutionStatus.KILL.ordinal(), ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()}, + TaskType.SUB_PROCESS.toString()); + Assert.assertEquals(3, resultArray.size()); + + // delete + for (int i = 0; i < 3; i++) { + taskInstanceMapper.deleteById(taskList.get(i)); + } + } + + @Test + public void testQueryTaskBySubProcessTaskIdAndStatusAndType() { + TaskInstance parentTask = insertOne("parent-task", 66, ExecutionStatus.FAILURE, TaskType.SUB_PROCESS.toString()); + + ProcessInstanceMap processInstanceMap = new ProcessInstanceMap(); + processInstanceMap.setParentProcessInstanceId(66); + processInstanceMap.setParentTaskInstanceId(parentTask.getId()); + processInstanceMap.setProcessInstanceId(67); + processInstanceMapMapper.insert(processInstanceMap); + + TaskInstance subTask1 = insertOne("sub1", 67, ExecutionStatus.SUCCESS, TaskType.SHELL.toString()); + TaskInstance subTask2 = insertOne("sub2", 67, ExecutionStatus.FORCED_SUCCESS, TaskType.SHELL.toString()); + + // test query result + List resultList = taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(parentTask.getId(), + new int[] {ExecutionStatus.FORCED_SUCCESS.ordinal()}, + null); + + Assert.assertEquals(1, resultList.size()); + Assert.assertEquals(subTask2.getId(), resultList.get(0).intValue()); + + // delete + taskInstanceMapper.deleteById(parentTask.getId()); + processInstanceMapMapper.deleteById(processInstanceMap.getId()); + taskInstanceMapper.deleteById(subTask1.getId()); + taskInstanceMapper.deleteById(subTask2.getId()); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index e68f9d5937..a400a7d60a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -414,7 +414,7 @@ public class MasterExecThread implements Runnable { if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getName(), dag)) { continue; } - if (task.getState().typeIsFailure() && !task.taskCanRetry()) { + if (task.getState().typeIsFailure() && !task.taskCanRetry() && !task.isConditionsTask()) { errorTaskList.put(task.getName(), task); } } @@ -987,6 +987,7 @@ public class MasterExecThread implements Runnable { // updateProcessInstance completed task status // failure priority is higher than pause // if a task fails, other suspended tasks need to be reset kill + // check if there exists forced success nodes in errorTaskList if (errorTaskList.size() > 0) { for (Map.Entry entry : completeTaskList.entrySet()) { TaskInstance completeTask = entry.getValue(); @@ -996,6 +997,22 @@ public class MasterExecThread implements Runnable { processService.updateTaskInstance(completeTask); } } + for (Map.Entry entry : errorTaskList.entrySet()) { + TaskInstance errorTask = entry.getValue(); + TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId()); + if (currentTask == null) { + continue; + } + // for nodes that have been forced success + if (errorTask.getState().typeIsFailure() && currentTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + // update state in this thread and remove from errorTaskList + errorTask.setState(currentTask.getState()); + logger.info("task: {} has been forced success, remove it from error task list", errorTask.getName()); + errorTaskList.remove(errorTask.getName()); + // submit post nodes + submitPostNode(errorTask.getName()); + } + } } if (canSubmitTaskToQueue()) { submitStandByTask(); @@ -1096,6 +1113,18 @@ public class MasterExecThread implements Runnable { int length = readyToSubmitTaskQueue.size(); for (int i = 0; i < length; i++) { TaskInstance task = readyToSubmitTaskQueue.peek(); + // stop tasks which is retrying if forced success happens + if (task.taskCanRetry()) { + TaskInstance tmpTask = processService.findTaskInstanceById(task.getId()); + if (tmpTask != null && tmpTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + task.setState(tmpTask.getState()); + logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); + removeTaskFromStandbyList(task); + completeTaskList.put(task.getName(), task); + submitPostNode(task.getName()); + continue; + } + } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) { if (retryTaskIntervalOverTime(task)) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java index 6b8d926c0f..17dc049103 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java @@ -81,6 +81,8 @@ public class AlertManager { return "pause"; case STOP: return "stop"; + case RESUME_FROM_FORCED_SUCCESS: + return "resume from forced success"; default: return "unknown type"; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 14990042bc..47069512f4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; @@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -262,6 +264,7 @@ public class ProcessService { cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1); cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1); cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1); + cmdTypeMap.put(CommandType.RESUME_FROM_FORCED_SUCCESS, 1); CommandType commandType = command.getCommandType(); if (cmdTypeMap.containsKey(commandType)) { @@ -772,6 +775,31 @@ public class ProcessService { break; case SCHEDULER: break; + case RESUME_FROM_FORCED_SUCCESS: + List failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(), + new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, + TaskType.SUB_PROCESS); + + for (int i = 0; i < failedSubList.size(); i++) { + // if there exists forced success in the sub_process + if (haveForcedSuccessInSubProcess(failedSubList.get(i))) { + // change sub_process task's state into submitted_success + TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i)); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + updateTaskInstance(taskInstance); + } + } + /** + * set resume node list to null + * 1. we can have a complete dag in the ExecThread so that it can restore the previous context + * 2. each time the operation is done the state of process will be reasonable as usual + */ + cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING); + cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, + String.join(Constants.COMMA, convertIntListToString(null))); + processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processInstance.setRunTimes(runTime + 1); + break; default: break; } @@ -779,6 +807,30 @@ public class ProcessService { return processInstance; } + /** + * recursively check if a sub process node contains forced success node + * @param taskInstanceId task instance id + * @return true or false + */ + public boolean haveForcedSuccessInSubProcess(int taskInstanceId) { + List forcedSuccessList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId, + new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS}, + null); + if (forcedSuccessList != null && !forcedSuccessList.isEmpty()) { + return true; + } + + List childSubList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId, + new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE}, + TaskType.SUB_PROCESS); + for (Integer child : childSubList) { + if (haveForcedSuccessInSubProcess(child)) { + return true; + } + } + return false; + } + /** * return complement data if the process start with complement data * @@ -1362,6 +1414,40 @@ public class ProcessService { return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal()); } + /** + * get id list by task state and type + * @param processInstanceId process instance id + * @param states task instance state array + * @param taskType task type + * @return task instance id list + */ + public List findTaskIdByInstanceStatusAndType(int processInstanceId, ExecutionStatus[] states, TaskType taskType) { + int[] statesArray = new int[states.length]; + for (int i = 0; i < states.length; i++) { + statesArray[i] = states[i].ordinal(); + } + return taskInstanceMapper.queryTaskByPIdAndStatusAndType(processInstanceId, statesArray, taskType.toString()); + } + + /** + * get tasks in sub_process by sub_process task id and state and type + * if param type is null, it queries all types + * @param taskId task instance id + * @param states task instance state array + * @param taskType task type + * @return task instance id list + */ + public List findTaskIdInSubProcessByStatusAndType(int taskId, ExecutionStatus[] states, TaskType taskType) { + int[] statesArray = new int[states.length]; + for (int i = 0; i < states.length; i++) { + statesArray[i] = states[i].ordinal(); + } + if (taskType == null) { + return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, statesArray, null); + } + return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, statesArray, taskType.toString()); + } + /** * find valid task list by process definition id *