Browse Source

Merge pull request #4267 from chengshiwen/dev

[Feature-3575][*] Force Task Success
pull/3/MERGE
dailidong 4 years ago committed by GitHub
parent
commit
21d17783c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
  2. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
  4. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  5. 44
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  6. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
  7. 39
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
  8. 15
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  9. 44
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  10. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  11. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
  12. 39
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  13. 29
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  14. 65
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
  15. 17
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue
  16. 6
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue
  17. 8
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/index.vue

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.controller; package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.exceptions.ApiException;
@ -36,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
@ -127,4 +129,27 @@ public class TaskInstanceController extends BaseController {
return returnDataListPaging(result); return returnDataListPaging(result);
} }
/**
* change one task instance's state from FAILURE to FORCED_SUCCESS
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
@ApiOperation(value = "force-success", notes = "FORCE_TASK_SUCCESS")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_INSTANCE_ID", required = true, dataType = "Int", example = "12")
})
@PostMapping(value = "/force-success")
@ResponseStatus(HttpStatus.OK)
@ApiException(FORCE_TASK_SUCCESS_ERROR)
public Result<Object> forceTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "taskInstanceId") Integer taskInstanceId) {
logger.info("force task success, project: {}, task instance id: {}", projectName, taskInstanceId);
Map<String, Object> result = taskInstanceService.forceTaskSuccess(loginUser, projectName, taskInstanceId);
return returnDataList(result);
}
} }

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java

@ -57,6 +57,16 @@ public class TaskCountDto {
.sum(); .sum();
} }
// remove the specified state
public void removeStateFromCountList(ExecutionStatus status) {
for (TaskStateCount count : this.taskCountDtos) {
if (count.getTaskStateType().equals(status)) {
this.taskCountDtos.remove(count);
break;
}
}
}
public List<TaskStateCount> getTaskCountDtos() { public List<TaskStateCount> getTaskCountDtos() {
return taskCountDtos; return taskCountDtos;
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java

@ -30,7 +30,7 @@ public enum ExecuteType {
* 4 stop * 4 stop
* 5 pause * 5 pause
*/ */
NONE,REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE; NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE;
public static ExecuteType getEnum(int value){ public static ExecuteType getEnum(int value){

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -196,6 +196,9 @@ public enum Status {
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"), QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"),
DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"), CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
@ -247,7 +250,7 @@ public enum Status {
BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"),
TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"),
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"), BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),

44
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -145,6 +145,50 @@ public class TaskInstanceService extends BaseService {
return result; return result;
} }
/**
* change one task instance's state from failure to forced success
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
// check user auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
// check whether the task instance can be found
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
// check whether the task instance state type is failure
if (!task.getState().typeIsFailure()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
}
// change the state of the task instance
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
}
return result;
}
/*** /***
* generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name * generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name
* @param result exist result map * @param result exist result map

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.api.service.DataAnalysisService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -114,12 +115,17 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
* @return process instance state count data * @return process instance state count data
*/ */
public Map<String, Object> countProcessInstanceStateByProject(User loginUser, int projectId, String startDate, String endDate) { public Map<String, Object> countProcessInstanceStateByProject(User loginUser, int projectId, String startDate, String endDate) {
return this.countStateByProject( Map<String, Object> result = this.countStateByProject(
loginUser, loginUser,
projectId, projectId,
startDate, startDate,
endDate, endDate,
(start, end, projectIds) -> this.processInstanceMapper.countInstanceStateByUser(start, end, projectIds)); (start, end, projectIds) -> this.processInstanceMapper.countInstanceStateByUser(start, end, projectIds));
// process state count needs to remove state of forced success
if (result.containsKey(Constants.STATUS) && result.get(Constants.STATUS).equals(Status.SUCCESS)) {
((TaskCountDto)result.get(Constants.DATA_LIST)).removeStateFromCountList(ExecutionStatus.FORCED_SUCCESS);
}
return result;
} }
private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate

39
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java

@ -18,8 +18,13 @@
package org.apache.dolphinscheduler.api.controller; package org.apache.dolphinscheduler.api.controller;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.TaskInstanceService;
@ -27,24 +32,28 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/** /**
* task instance controller test * task instance controller test
*/ */
@RunWith(MockitoJUnitRunner.Silent.class) public class TaskInstanceControllerTest extends AbstractControllerTest {
public class TaskInstanceControllerTest {
@InjectMocks @InjectMocks
private TaskInstanceController taskInstanceController; private TaskInstanceController taskInstanceController;
@ -67,7 +76,27 @@ public class TaskInstanceControllerTest {
Result taskResult = taskInstanceController.queryTaskListPaging(null, "", 1, "", "", Result taskResult = taskInstanceController.queryTaskListPaging(null, "", 1, "", "",
"", "", ExecutionStatus.SUCCESS,"192.168.xx.xx", "2020-01-01 00:00:00", "2020-01-02 00:00:00",pageNo, pageSize); "", "", ExecutionStatus.SUCCESS,"192.168.xx.xx", "2020-01-01 00:00:00", "2020-01-02 00:00:00",pageNo, pageSize);
Assert.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode()); Assert.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
} }
@Ignore
@Test
public void testForceTaskSuccess() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("taskInstanceId", "104");
Map<String, Object> mockResult = new HashMap<>(5);
mockResult.put(Constants.STATUS, Status.SUCCESS);
mockResult.put(Constants.MSG, Status.SUCCESS.getMsg());
when(taskInstanceService.forceTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success", "cxc_1113")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
}
} }

15
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -25,12 +25,14 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -82,12 +84,16 @@ public class ExecutorService2Test {
private int processDefinitionId = 1; private int processDefinitionId = 1;
private int processInstanceId = 1;
private int tenantId = 1; private int tenantId = 1;
private int userId = 1; private int userId = 1;
private ProcessDefinition processDefinition = new ProcessDefinition(); private ProcessDefinition processDefinition = new ProcessDefinition();
private ProcessInstance processInstance = new ProcessInstance();
private User loginUser = new User(); private User loginUser = new User();
private String projectName = "projectName"; private String projectName = "projectName";
@ -107,6 +113,13 @@ public class ExecutorService2Test {
processDefinition.setTenantId(tenantId); processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId); processDefinition.setUserId(userId);
// processInstance
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);
// project // project
project.setName(projectName); project.setName(projectName);
@ -120,6 +133,8 @@ public class ExecutorService2Test {
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList()); Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefineById(processDefinitionId)).thenReturn(processDefinition);
} }
/** /**

44
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -214,4 +214,48 @@ public class TaskInstanceServiceTest {
result.put(Constants.MSG, status.getMsg()); result.put(Constants.MSG, status.getMsg());
} }
} }
@Test
public void forceTaskSuccess() {
User user = getAdminUser();
String projectName = "test";
Project project = getProject(projectName);
int taskId = 1;
TaskInstance task = getTaskInstance();
Map<String, Object> mockSuccess = new HashMap<>(5);
putMsg(mockSuccess, Status.SUCCESS);
when(projectMapper.queryByName(projectName)).thenReturn(project);
// user auth failed
Map<String, Object> mockFailure = new HashMap<>(5);
putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectName);
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockFailure);
Map<String, Object> authFailRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertNotSame(Status.SUCCESS, authFailRes.get(Constants.STATUS));
// test task not found
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockSuccess);
when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null);
Map<String, Object> taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS));
// test task instance state error
task.setState(ExecutionStatus.SUCCESS);
when(taskInstanceMapper.selectById(1)).thenReturn(task);
Map<String, Object> taskStateErrorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskStateErrorRes.get(Constants.STATUS));
// test error
task.setState(ExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(0);
Map<String, Object> errorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR, errorRes.get(Constants.STATUS));
// test success
task.setState(ExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(1);
Map<String, Object> successRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
} }

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -41,6 +41,7 @@ public enum ExecutionStatus {
* 10 waiting thread * 10 waiting thread
* 11 waiting depend node complete * 11 waiting depend node complete
* 12 delay execution * 12 delay execution
* 13 forced success
*/ */
SUBMITTED_SUCCESS(0, "submit success"), SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"), RUNNING_EXECUTION(1, "running"),
@ -54,7 +55,8 @@ public enum ExecutionStatus {
KILL(9, "kill"), KILL(9, "kill"),
WAITTING_THREAD(10, "waiting thread"), WAITTING_THREAD(10, "waiting thread"),
WAITTING_DEPEND(11, "waiting depend node complete"), WAITTING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution"); DELAY_EXECUTION(12, "delay execution"),
FORCED_SUCCESS(13, "forced success");
ExecutionStatus(int code, String descp) { ExecutionStatus(int code, String descp) {
this.code = code; this.code = code;
@ -79,7 +81,7 @@ public enum ExecutionStatus {
* @return status * @return status
*/ */
public boolean typeIsSuccess() { public boolean typeIsSuccess() {
return this == SUCCESS; return this == SUCCESS || this == FORCED_SUCCESS;
} }
/** /**

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java

@ -63,7 +63,7 @@ public class VarPoolUtils {
* @throws ParseException ParseException * @throws ParseException ParseException
*/ */
public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException { public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException {
if (varPool == null || propToValue == null) { if (propToValue == null || StringUtils.isEmpty(varPool)) {
return; return;
} }
String[] splits = varPool.split("\\$VarPool\\$"); String[] splits = varPool.split("\\$VarPool\\$");

39
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.mapper;
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -27,6 +27,10 @@ import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Date;
import java.util.List;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -36,9 +40,6 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
@RunWith(SpringRunner.class) @RunWith(SpringRunner.class)
@SpringBootTest @SpringBootTest
@Transactional @Transactional
@ -55,20 +56,38 @@ public class TaskInstanceMapperTest {
@Autowired @Autowired
ProcessInstanceMapper processInstanceMapper; ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessInstanceMapMapper processInstanceMapMapper;
/** /**
* insert * insert
*
* @return TaskInstance * @return TaskInstance
*/ */
private TaskInstance insertOne(){ private TaskInstance insertOne() {
//insertOne //insertOne
return insertOne("us task", 1, ExecutionStatus.RUNNING_EXECUTION, TaskType.SHELL.toString());
}
/**
* construct a task instance and then insert
*
* @param taskName
* @param processInstanceId
* @param state
* @param taskType
* @return
*/
private TaskInstance insertOne(String taskName, int processInstanceId, ExecutionStatus state, String taskType) {
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
taskInstance.setFlag(Flag.YES); taskInstance.setFlag(Flag.YES);
taskInstance.setName("ut task"); taskInstance.setName(taskName);
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setState(state);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date()); taskInstance.setEndTime(new Date());
taskInstance.setTaskJson("{}"); taskInstance.setTaskJson("{}");
taskInstance.setTaskType(TaskType.SHELL.toString()); taskInstance.setProcessInstanceId(processInstanceId);
taskInstance.setTaskType(taskType);
taskInstanceMapper.insert(taskInstance); taskInstanceMapper.insert(taskInstance);
return taskInstance; return taskInstance;
} }
@ -90,7 +109,7 @@ public class TaskInstanceMapperTest {
* test delete * test delete
*/ */
@Test @Test
public void testDelete(){ public void testDelete() {
TaskInstance taskInstance = insertOne(); TaskInstance taskInstance = insertOne();
int delete = taskInstanceMapper.deleteById(taskInstance.getId()); int delete = taskInstanceMapper.deleteById(taskInstance.getId());
Assert.assertEquals(1, delete); Assert.assertEquals(1, delete);
@ -149,7 +168,7 @@ public class TaskInstanceMapperTest {
taskInstanceMapper.deleteById(task2.getId()); taskInstanceMapper.deleteById(task2.getId());
taskInstanceMapper.deleteById(task.getId()); taskInstanceMapper.deleteById(task.getId());
Assert.assertNotEquals(taskInstances.size(), 0); Assert.assertNotEquals(taskInstances.size(), 0);
Assert.assertNotEquals(taskInstances1.size(), 0 ); Assert.assertNotEquals(taskInstances1.size(), 0);
} }
/** /**

29
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -987,6 +987,7 @@ public class MasterExecThread implements Runnable {
// updateProcessInstance completed task status // updateProcessInstance completed task status
// failure priority is higher than pause // failure priority is higher than pause
// if a task fails, other suspended tasks need to be reset kill // if a task fails, other suspended tasks need to be reset kill
// check if there exists forced success nodes in errorTaskList
if (errorTaskList.size() > 0) { if (errorTaskList.size() > 0) {
for (Map.Entry<String, TaskInstance> entry : completeTaskList.entrySet()) { for (Map.Entry<String, TaskInstance> entry : completeTaskList.entrySet()) {
TaskInstance completeTask = entry.getValue(); TaskInstance completeTask = entry.getValue();
@ -996,6 +997,22 @@ public class MasterExecThread implements Runnable {
processService.updateTaskInstance(completeTask); processService.updateTaskInstance(completeTask);
} }
} }
for (Map.Entry<String, TaskInstance> entry : errorTaskList.entrySet()) {
TaskInstance errorTask = entry.getValue();
TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId());
if (currentTask == null) {
continue;
}
// for nodes that have been forced success
if (errorTask.getState().typeIsFailure() && currentTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
// update state in this thread and remove from errorTaskList
errorTask.setState(currentTask.getState());
logger.info("task: {} has been forced success, remove it from error task list", errorTask.getName());
errorTaskList.remove(errorTask.getName());
// submit post nodes
submitPostNode(errorTask.getName());
}
}
} }
if (canSubmitTaskToQueue()) { if (canSubmitTaskToQueue()) {
submitStandByTask(); submitStandByTask();
@ -1096,6 +1113,18 @@ public class MasterExecThread implements Runnable {
int length = readyToSubmitTaskQueue.size(); int length = readyToSubmitTaskQueue.size();
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek(); TaskInstance task = readyToSubmitTaskQueue.peek();
// stop tasks which is retrying if forced success happens
if (task.taskCanRetry()) {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
continue;
}
}
DependResult dependResult = getDependResultForTask(task); DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) { if (DependResult.SUCCESS == dependResult) {
if (retryTaskIntervalOverTime(task)) { if (retryTaskIntervalOverTime(task)) {

65
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue

@ -17,7 +17,7 @@
<template> <template>
<div class="list-model" style="position: relative;"> <div class="list-model" style="position: relative;">
<div class="table-box"> <div class="table-box">
<el-table :data="list" size="mini" style="width: 100%" @selection-change="_arrDelChange"> <el-table class="fixed" :data="list" size="mini" style="width: 100%" @selection-change="_arrDelChange">
<el-table-column type="selection" width="50"></el-table-column> <el-table-column type="selection" width="50"></el-table-column>
<el-table-column type="index" :label="$t('#')" width="50"></el-table-column> <el-table-column type="index" :label="$t('#')" width="50"></el-table-column>
<el-table-column :label="$t('Process Name')" min-width="200"> <el-table-column :label="$t('Process Name')" min-width="200">
@ -102,35 +102,34 @@
<div v-show="!scope.row.disabled"> <div v-show="!scope.row.disabled">
<!--Edit--> <!--Edit-->
<el-button <el-button
type="info" type="info"
size="mini" size="mini"
icon="el-icon-edit-outline" icon="el-icon-edit-outline"
disabled="true" disabled="true"
circle> circle>
</el-button> </el-button>
<!--Rerun--> <!--Rerun-->
<el-tooltip :content="$t('Rerun')" placement="top" :enterable="false"> <span>
<span> <el-button
<el-button v-show="buttonType === 'run'"
v-show="buttonType === 'run'" type="info"
type="info" size="mini"
size="mini" disabled="true"
disabled="true" circle>
circle> <span style="padding: 0 2px">{{scope.row.count}}</span>
<span style="padding: 0 2px">{{scope.row.count}}</span> </el-button>
</el-button> </span>
</span>
</el-tooltip>
<el-button <el-button
v-show="buttonType !== 'run'" v-show="buttonType !== 'run'"
type="info" type="info"
size="mini" size="mini"
icon="el-icon-refresh" icon="el-icon-refresh"
disabled="true" disabled="true"
circle> circle>
</el-button> </el-button>
<!--Store-->
<span> <span>
<el-button <el-button
v-show="buttonType === 'store'" v-show="buttonType === 'store'"
@ -153,13 +152,12 @@
<!--Recovery Suspend/Pause--> <!--Recovery Suspend/Pause-->
<span> <span>
<el-button <el-button
style="padding: 0 3px"
v-show="(scope.row.state === 'PAUSE' || scope.row.state === 'STOP') && buttonType === 'suspend'" v-show="(scope.row.state === 'PAUSE' || scope.row.state === 'STOP') && buttonType === 'suspend'"
type="warning" type="warning"
size="mini" size="mini"
circle circle
disabled="true"> disabled="true">
{{scope.row.count}} <span style="padding: 0 3px">{{scope.row.count}}</span>
</el-button> </el-button>
</span> </span>
@ -185,7 +183,19 @@
</el-button> </el-button>
</span> </span>
<!--delete--> <!--Stop-->
<span>
<el-button
v-show="scope.row.state !== 'STOP'"
type="warning"
size="mini"
circle
icon="el-icon-video-pause"
disabled="true">
</el-button>
</span>
<!--Delete-->
<el-button <el-button
type="danger" type="danger"
circle circle
@ -286,7 +296,6 @@
* @param REPEAT_RUNNING * @param REPEAT_RUNNING
*/ */
_reRun (item, index) { _reRun (item, index) {
console.log(index)
this._countDownFn({ this._countDownFn({
id: item.id, id: item.id,
executeType: 'REPEAT_RUNNING', executeType: 'REPEAT_RUNNING',

17
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue

@ -213,21 +213,10 @@
.fixed { .fixed {
table-layout: auto; table-layout: auto;
tr { tr {
th:last-child,td:last-child {
background: inherit;
width: 230px;
height: 40px;
line-height: 40px;
border-left:1px solid #ecf3ff;
position: absolute;
right: 0;
z-index: 2;
}
td:last-child { td:last-child {
border-bottom:1px solid #ecf3ff; .el-button+.el-button {
} margin-left: 0;
th:nth-last-child(2) { }
padding-right: 260px;
} }
} }
} }

6
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue

@ -117,15 +117,14 @@
this.logDialog = true this.logDialog = true
}, },
ok () {}, ok () {},
close () { close () {
this.logDialog = false this.logDialog = false
}, },
_forceSuccess (item) { _forceSuccess (item) {
this.forceTaskSuccess({ taskInstanceId: item.id }).then(res => { this.forceTaskSuccess({ taskInstanceId: item.id }).then(res => {
if (res.code === 0) { if (res.code === 0) {
this.$message.success(res.msg) this.$message.success(res.msg)
setTimeout(this._onUpdate, 1000)
} else { } else {
this.$message.error(res.msg) this.$message.error(res.msg)
} }
@ -133,6 +132,9 @@
this.$message.error(e.msg) this.$message.error(e.msg)
}) })
}, },
_onUpdate () {
this.$emit('on-update')
},
_go (item) { _go (item) {
this.$router.push({ path: `/projects/instance/list/${item.processInstanceId}` }) this.$router.push({ path: `/projects/instance/list/${item.processInstanceId}` })
} }

8
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/index.vue

@ -23,7 +23,7 @@
<template slot="content"> <template slot="content">
<template v-if="taskInstanceList.length"> <template v-if="taskInstanceList.length">
<m-list :task-instance-list="taskInstanceList" :page-no="searchParams.pageNo" :page-size="searchParams.pageSize"> <m-list :task-instance-list="taskInstanceList" @on-update="_onUpdate" :page-no="searchParams.pageNo" :page-size="searchParams.pageSize">
</m-list> </m-list>
<div class="page-box"> <div class="page-box">
<el-pagination <el-pagination
@ -126,6 +126,12 @@
this.isLoading = false this.isLoading = false
}) })
}, },
/**
* update
*/
_onUpdate () {
this._debounceGET()
},
/** /**
* Anti shake request interface * Anti shake request interface
* @desc Prevent functions from being called multiple times * @desc Prevent functions from being called multiple times

Loading…
Cancel
Save