diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java index 56e7ef2087..a07478315a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; import org.apache.dolphinscheduler.api.exceptions.ApiException; @@ -36,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -127,4 +129,27 @@ public class TaskInstanceController extends BaseController { return returnDataListPaging(result); } + /** + * change one task instance's state from FAILURE to FORCED_SUCCESS + * + * @param loginUser login user + * @param projectName project name + * @param taskInstanceId task instance id + * @return the result code and msg + */ + @ApiOperation(value = "force-success", notes = "FORCE_TASK_SUCCESS") + @ApiImplicitParams({ + @ApiImplicitParam(name = "taskInstanceId", value = "TASK_INSTANCE_ID", required = true, dataType = "Int", example = "12") + }) + @PostMapping(value = "/force-success") + @ResponseStatus(HttpStatus.OK) + @ApiException(FORCE_TASK_SUCCESS_ERROR) + public Result forceTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, + @RequestParam(value = "taskInstanceId") Integer taskInstanceId) { + logger.info("force task success, project: {}, task instance id: {}", projectName, taskInstanceId); + Map result = taskInstanceService.forceTaskSuccess(loginUser, projectName, taskInstanceId); + return returnDataList(result); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java index 6f42ba9f56..41c7abba85 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java @@ -57,6 +57,16 @@ public class TaskCountDto { .sum(); } + // remove the specified state + public void removeStateFromCountList(ExecutionStatus status) { + for (TaskStateCount count : this.taskCountDtos) { + if (count.getTaskStateType().equals(status)) { + this.taskCountDtos.remove(count); + break; + } + } + } + public List getTaskCountDtos() { return taskCountDtos; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java index cc1797295a..f4dac5aecd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java @@ -30,7 +30,7 @@ public enum ExecuteType { * 4 stop * 5 pause */ - NONE,REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE; + NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE; public static ExecuteType getEnum(int value){ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 86b815cf9a..52d246dcb9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -196,6 +196,9 @@ public enum Status { QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"), DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"), + FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"), + TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"), + UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), @@ -247,7 +250,7 @@ public enum Status { BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), - BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"), + BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index 012af8fd38..dd2caff3b6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -145,6 +145,50 @@ public class TaskInstanceService extends BaseService { return result; } + /** + * change one task instance's state from failure to forced success + * + * @param loginUser login user + * @param projectName project name + * @param taskInstanceId task instance id + * @return the result code and msg + */ + public Map forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) { + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + // check user auth + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status status = (Status) checkResult.get(Constants.STATUS); + if (status != Status.SUCCESS) { + return checkResult; + } + + // check whether the task instance can be found + TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); + if (task == null) { + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); + return result; + } + + // check whether the task instance state type is failure + if (!task.getState().typeIsFailure()) { + putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString()); + return result; + } + + // change the state of the task instance + task.setState(ExecutionStatus.FORCED_SUCCESS); + int changedNum = taskInstanceMapper.updateById(task); + if (changedNum > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); + } + + return result; + } + /*** * generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name * @param result exist result map diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java index c9560e1c50..7254fc1a88 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.api.service.DataAnalysisService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -114,12 +115,17 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis * @return process instance state count data */ public Map countProcessInstanceStateByProject(User loginUser, int projectId, String startDate, String endDate) { - return this.countStateByProject( + Map result = this.countStateByProject( loginUser, projectId, startDate, endDate, (start, end, projectIds) -> this.processInstanceMapper.countInstanceStateByUser(start, end, projectIds)); + // process state count needs to remove state of forced success + if (result.containsKey(Constants.STATUS) && result.get(Constants.STATUS).equals(Status.SUCCESS)) { + ((TaskCountDto)result.get(Constants.DATA_LIST)).removeStateFromCountList(ExecutionStatus.FORCED_SUCCESS); + } + return result; } private Map countStateByProject(User loginUser, int projectId, String startDate, String endDate diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java index b97c7c192f..031d7819cb 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java @@ -18,8 +18,13 @@ package org.apache.dolphinscheduler.api.controller; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.TaskInstanceService; @@ -27,24 +32,28 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; import java.util.HashMap; import java.util.Map; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; /** * task instance controller test */ -@RunWith(MockitoJUnitRunner.Silent.class) -public class TaskInstanceControllerTest { +public class TaskInstanceControllerTest extends AbstractControllerTest { @InjectMocks private TaskInstanceController taskInstanceController; @@ -67,7 +76,27 @@ public class TaskInstanceControllerTest { Result taskResult = taskInstanceController.queryTaskListPaging(null, "", 1, "", "", "", "", ExecutionStatus.SUCCESS,"192.168.xx.xx", "2020-01-01 00:00:00", "2020-01-02 00:00:00",pageNo, pageSize); Assert.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode()); - } + @Ignore + @Test + public void testForceTaskSuccess() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("taskInstanceId", "104"); + + Map mockResult = new HashMap<>(5); + mockResult.put(Constants.STATUS, Status.SUCCESS); + mockResult.put(Constants.MSG, Status.SUCCESS.getMsg()); + when(taskInstanceService.forceTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult); + + MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success", "cxc_1113") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index 9f8d1b21f0..09b9f7d2e5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -25,12 +25,14 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -82,12 +84,16 @@ public class ExecutorService2Test { private int processDefinitionId = 1; + private int processInstanceId = 1; + private int tenantId = 1; private int userId = 1; private ProcessDefinition processDefinition = new ProcessDefinition(); + private ProcessInstance processInstance = new ProcessInstance(); + private User loginUser = new User(); private String projectName = "projectName"; @@ -107,6 +113,13 @@ public class ExecutorService2Test { processDefinition.setTenantId(tenantId); processDefinition.setUserId(userId); + // processInstance + processInstance.setId(processInstanceId); + processInstance.setProcessDefinitionId(processDefinitionId); + processInstance.setState(ExecutionStatus.FAILURE); + processInstance.setExecutorId(userId); + processInstance.setTenantId(tenantId); + // project project.setName(projectName); @@ -120,6 +133,8 @@ public class ExecutorService2Test { Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList()); + Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); + Mockito.when(processService.findProcessDefineById(processDefinitionId)).thenReturn(processDefinition); } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 199b34cc1b..b1989b4e31 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -214,4 +214,48 @@ public class TaskInstanceServiceTest { result.put(Constants.MSG, status.getMsg()); } } -} \ No newline at end of file + + @Test + public void forceTaskSuccess() { + User user = getAdminUser(); + String projectName = "test"; + Project project = getProject(projectName); + int taskId = 1; + TaskInstance task = getTaskInstance(); + + Map mockSuccess = new HashMap<>(5); + putMsg(mockSuccess, Status.SUCCESS); + when(projectMapper.queryByName(projectName)).thenReturn(project); + + // user auth failed + Map mockFailure = new HashMap<>(5); + putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectName); + when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockFailure); + Map authFailRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertNotSame(Status.SUCCESS, authFailRes.get(Constants.STATUS)); + + // test task not found + when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockSuccess); + when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null); + Map taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS)); + + // test task instance state error + task.setState(ExecutionStatus.SUCCESS); + when(taskInstanceMapper.selectById(1)).thenReturn(task); + Map taskStateErrorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskStateErrorRes.get(Constants.STATUS)); + + // test error + task.setState(ExecutionStatus.FAILURE); + when(taskInstanceMapper.updateById(task)).thenReturn(0); + Map errorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR, errorRes.get(Constants.STATUS)); + + // test success + task.setState(ExecutionStatus.FAILURE); + when(taskInstanceMapper.updateById(task)).thenReturn(1); + Map successRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index f6ac2cf5ab..2f5c022333 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -41,6 +41,7 @@ public enum ExecutionStatus { * 10 waiting thread * 11 waiting depend node complete * 12 delay execution + * 13 forced success */ SUBMITTED_SUCCESS(0, "submit success"), RUNNING_EXECUTION(1, "running"), @@ -54,7 +55,8 @@ public enum ExecutionStatus { KILL(9, "kill"), WAITTING_THREAD(10, "waiting thread"), WAITTING_DEPEND(11, "waiting depend node complete"), - DELAY_EXECUTION(12, "delay execution"); + DELAY_EXECUTION(12, "delay execution"), + FORCED_SUCCESS(13, "forced success"); ExecutionStatus(int code, String descp) { this.code = code; @@ -79,7 +81,7 @@ public enum ExecutionStatus { * @return status */ public boolean typeIsSuccess() { - return this == SUCCESS; + return this == SUCCESS || this == FORCED_SUCCESS; } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java index 89a8605a99..3d4f65ab50 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java @@ -63,7 +63,7 @@ public class VarPoolUtils { * @throws ParseException ParseException */ public static void convertVarPoolToMap(Map propToValue, String varPool) throws ParseException { - if (varPool == null || propToValue == null) { + if (propToValue == null || StringUtils.isEmpty(varPool)) { return; } String[] splits = varPool.split("\\$VarPool\\$"); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index a225f7654c..4bef7f1b68 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.dao.mapper; +package org.apache.dolphinscheduler.dao.mapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +27,10 @@ import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.Date; +import java.util.List; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,9 +40,6 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.List; - @RunWith(SpringRunner.class) @SpringBootTest @Transactional @@ -55,20 +56,38 @@ public class TaskInstanceMapperTest { @Autowired ProcessInstanceMapper processInstanceMapper; + @Autowired + ProcessInstanceMapMapper processInstanceMapMapper; + /** * insert + * * @return TaskInstance */ - private TaskInstance insertOne(){ + private TaskInstance insertOne() { //insertOne + return insertOne("us task", 1, ExecutionStatus.RUNNING_EXECUTION, TaskType.SHELL.toString()); + } + + /** + * construct a task instance and then insert + * + * @param taskName + * @param processInstanceId + * @param state + * @param taskType + * @return + */ + private TaskInstance insertOne(String taskName, int processInstanceId, ExecutionStatus state, String taskType) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setFlag(Flag.YES); - taskInstance.setName("ut task"); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + taskInstance.setName(taskName); + taskInstance.setState(state); taskInstance.setStartTime(new Date()); taskInstance.setEndTime(new Date()); taskInstance.setTaskJson("{}"); - taskInstance.setTaskType(TaskType.SHELL.toString()); + taskInstance.setProcessInstanceId(processInstanceId); + taskInstance.setTaskType(taskType); taskInstanceMapper.insert(taskInstance); return taskInstance; } @@ -90,7 +109,7 @@ public class TaskInstanceMapperTest { * test delete */ @Test - public void testDelete(){ + public void testDelete() { TaskInstance taskInstance = insertOne(); int delete = taskInstanceMapper.deleteById(taskInstance.getId()); Assert.assertEquals(1, delete); @@ -149,7 +168,7 @@ public class TaskInstanceMapperTest { taskInstanceMapper.deleteById(task2.getId()); taskInstanceMapper.deleteById(task.getId()); Assert.assertNotEquals(taskInstances.size(), 0); - Assert.assertNotEquals(taskInstances1.size(), 0 ); + Assert.assertNotEquals(taskInstances1.size(), 0); } /** @@ -298,4 +317,4 @@ public class TaskInstanceMapperTest { Assert.assertNotEquals(taskInstanceIPage.getTotal(), 0); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index e68f9d5937..f5946d5614 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -987,6 +987,7 @@ public class MasterExecThread implements Runnable { // updateProcessInstance completed task status // failure priority is higher than pause // if a task fails, other suspended tasks need to be reset kill + // check if there exists forced success nodes in errorTaskList if (errorTaskList.size() > 0) { for (Map.Entry entry : completeTaskList.entrySet()) { TaskInstance completeTask = entry.getValue(); @@ -996,6 +997,22 @@ public class MasterExecThread implements Runnable { processService.updateTaskInstance(completeTask); } } + for (Map.Entry entry : errorTaskList.entrySet()) { + TaskInstance errorTask = entry.getValue(); + TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId()); + if (currentTask == null) { + continue; + } + // for nodes that have been forced success + if (errorTask.getState().typeIsFailure() && currentTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + // update state in this thread and remove from errorTaskList + errorTask.setState(currentTask.getState()); + logger.info("task: {} has been forced success, remove it from error task list", errorTask.getName()); + errorTaskList.remove(errorTask.getName()); + // submit post nodes + submitPostNode(errorTask.getName()); + } + } } if (canSubmitTaskToQueue()) { submitStandByTask(); @@ -1096,6 +1113,18 @@ public class MasterExecThread implements Runnable { int length = readyToSubmitTaskQueue.size(); for (int i = 0; i < length; i++) { TaskInstance task = readyToSubmitTaskQueue.peek(); + // stop tasks which is retrying if forced success happens + if (task.taskCanRetry()) { + TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); + if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + task.setState(retryTask.getState()); + logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); + removeTaskFromStandbyList(task); + completeTaskList.put(task.getName(), task); + submitPostNode(task.getName()); + continue; + } + } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) { if (retryTaskIntervalOverTime(task)) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index 69d1a4c503..980a755120 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -17,7 +17,7 @@