Browse Source

Remove alert when delete workflow instance (#13281)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
fd3afd84ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/BaseController.java
  2. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  3. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/WorkflowInstanceV2Controller.java
  5. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  6. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  7. 60
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  8. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  9. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/Result.java
  10. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
  11. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/WorkflowInstanceV2ControllerTest.java
  12. 59
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  13. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
  14. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  15. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java
  16. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml
  17. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/BaseController.java

@ -47,8 +47,8 @@ public class BaseController {
* @return check result code * @return check result code
*/ */
// todo: directly throw exception // todo: directly throw exception
public Result checkPageParams(int pageNo, int pageSize) { public <T> Result<T> checkPageParams(int pageNo, int pageSize) {
Result result = new Result(); Result<T> result = new Result<>();
Status resultEnum = Status.SUCCESS; Status resultEnum = Status.SUCCESS;
String msg = Status.SUCCESS.getMsg(); String msg = Status.SUCCESS.getMsg();
if (pageNo <= 0) { if (pageNo <= 0) {

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -267,11 +267,11 @@ public class ProcessInstanceController extends BaseController {
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR) @ApiException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser") @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<ProcessInstance> deleteProcessInstanceById(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result<Void> deleteProcessInstanceById(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) { @PathVariable("id") Integer id) {
Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectCode, id); processInstanceService.deleteProcessInstanceById(loginUser, id);
return returnDataList(result); return Result.success();
} }
/** /**
@ -396,13 +396,9 @@ public class ProcessInstanceController extends BaseController {
for (String strProcessInstanceId : processInstanceIdArray) { for (String strProcessInstanceId : processInstanceIdArray) {
int processInstanceId = Integer.parseInt(strProcessInstanceId); int processInstanceId = Integer.parseInt(strProcessInstanceId);
try { try {
Map<String, Object> deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, processInstanceId);
processInstanceService.deleteProcessInstanceById(loginUser, projectCode, processInstanceId);
if (!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))) {
deleteFailedIdList.add((String) deleteResult.get(Constants.MSG));
logger.error((String) deleteResult.get(Constants.MSG));
}
} catch (Exception e) { } catch (Exception e) {
logger.error("Delete workflow instance: {} error", strProcessInstanceId, e);
deleteFailedIdList deleteFailedIdList
.add(MessageFormat.format(Status.PROCESS_INSTANCE_ERROR.getMsg(), strProcessInstanceId)); .add(MessageFormat.format(Status.PROCESS_INSTANCE_ERROR.getMsg(), strProcessInstanceId));
} }

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java

@ -46,11 +46,13 @@ import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ResourcesService; import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.service.UdfFuncService; import org.apache.dolphinscheduler.api.service.UdfFuncService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
@ -233,14 +235,14 @@ public class ResourcesController extends BaseController {
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_RESOURCES_LIST_PAGING) @ApiException(QUERY_RESOURCES_LIST_PAGING)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser") @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<Object> queryResourceListPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result<PageInfo<StorageEntity>> queryResourceListPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "fullName") String fullName, @RequestParam(value = "fullName") String fullName,
@RequestParam(value = "tenantCode") String tenantCode, @RequestParam(value = "tenantCode") String tenantCode,
@RequestParam(value = "type") ResourceType type, @RequestParam(value = "type") ResourceType type,
@RequestParam("pageNo") Integer pageNo, @RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "searchVal", required = false) String searchVal, @RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam("pageSize") Integer pageSize) { @RequestParam("pageSize") Integer pageSize) {
Result<Object> result = checkPageParams(pageNo, pageSize); Result<PageInfo<StorageEntity>> result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) { if (!result.checkResult()) {
return result; return result;
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/v2/WorkflowInstanceV2Controller.java

@ -122,7 +122,7 @@ public class WorkflowInstanceV2Controller extends BaseController {
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR) @ApiException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser") @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteWorkflowInstance(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result<Void> deleteWorkflowInstance(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("workflowInstanceId") Integer workflowInstanceId) { @PathVariable("workflowInstanceId") Integer workflowInstanceId) {
processInstanceService.deleteProcessInstanceById(loginUser, workflowInstanceId); processInstanceService.deleteProcessInstanceById(loginUser, workflowInstanceId);
return Result.success(); return Result.success();

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -176,24 +176,12 @@ public interface ProcessInstanceService {
* delete process instance by id, at the same timedelete task instance and their mapping relation data * delete process instance by id, at the same timedelete task instance and their mapping relation data
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code
* @param processInstanceId process instance id * @param processInstanceId process instance id
* @return delete result code * @return delete result code
*/ */
Map<String, Object> deleteProcessInstanceById(User loginUser, void deleteProcessInstanceById(User loginUser,
long projectCode,
Integer processInstanceId); Integer processInstanceId);
/**
* delete process instance by id, at the same timedelete task instance and their mapping relation data
*
* @param loginUser login user
* @param workflowInstanceId work instance id
* @return delete result code
*/
Map<String, Object> deleteProcessInstanceById(User loginUser,
Integer workflowInstanceId);
/** /**
* view process instance variables * view process instance variables
* *

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse; import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
@ -97,8 +98,9 @@ public interface ResourcesService {
* @param pageSize page size * @param pageSize page size
* @return resource list page * @return resource list page
*/ */
Result queryResourceListPaging(User loginUser, String fullName, String resTenantCode, Result<PageInfo<StorageEntity>> queryResourceListPaging(User loginUser, String fullName, String resTenantCode,
ResourceType type, String searchVal, Integer pageNo, Integer pageSize); ResourceType type, String searchVal, Integer pageNo,
Integer pageSize);
/** /**
* query resource list * query resource list

60
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -50,6 +50,7 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@ -174,6 +175,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired @Autowired
private ScheduleMapper scheduleMapper; private ScheduleMapper scheduleMapper;
@Autowired
private AlertDao alertDao;
@Autowired @Autowired
private CuringParamsService curingGlobalParamsService; private CuringParamsService curingGlobalParamsService;
@ -781,23 +785,21 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
* delete process instance by id, at the same timedelete task instance and their mapping relation data * delete process instance by id, at the same timedelete task instance and their mapping relation data
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code
* @param processInstanceId process instance id * @param processInstanceId process instance id
* @return delete result code * @return delete result code
*/ */
@Override @Override
@Transactional @Transactional
public Map<String, Object> deleteProcessInstanceById(User loginUser, long projectCode, Integer processInstanceId) { public void deleteProcessInstanceById(User loginUser, Integer processInstanceId) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.INSTANCE_DELETE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId) ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
ProcessDefinition processDefinition = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
Project project = projectMapper.queryByCode(processDefinition.getProjectCode());
// check user access for project
projectService.checkProjectAndAuthThrowException(loginUser, project,
ApiFuncIdentificationConstant.INSTANCE_DELETE);
// check process instance status // check process instance status
if (!processInstance.getState().isFinished()) { if (!processInstance.getState().isFinished()) {
logger.warn("Process Instance state is {} so can not delete process instance, processInstanceId:{}.", logger.warn("Process Instance state is {} so can not delete process instance, processInstanceId:{}.",
@ -806,14 +808,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getState(), "delete"); processInstance.getState(), "delete");
} }
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
logger.error("Process definition does not exist, projectCode:{}, ProcessDefinitionCode:{}.",
projectCode, processInstance.getProcessDefinitionCode());
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
}
// delete database cascade // delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId); int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
@ -824,38 +818,18 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
// When delete task instance error, the task log file will also be deleted, this may cause data inconsistency. // When delete task instance error, the task log file will also be deleted, this may cause data inconsistency.
processService.removeTaskLogFile(processInstanceId); processService.removeTaskLogFile(processInstanceId);
taskInstanceDao.deleteByWorkflowInstanceId(processInstanceId); taskInstanceDao.deleteByWorkflowInstanceId(processInstanceId);
alertDao.deleteByWorkflowInstanceId(processInstanceId);
if (delete > 0) { if (delete > 0) {
logger.info( logger.info(
"Delete process instance complete, projectCode:{}, ProcessDefinitionCode{}, processInstanceId:{}.", "Delete process instance complete, ProcessDefinitionCode{}, processInstanceId:{}.",
projectCode, processInstance.getProcessDefinitionCode(), processInstanceId); processInstance.getProcessDefinitionCode(), processInstanceId);
putMsg(result, Status.SUCCESS);
} else { } else {
logger.error( logger.error(
"Delete process instance error, projectCode:{}, ProcessDefinitionCode{}, processInstanceId:{}.", "Delete process instance error, ProcessDefinitionCode{}, processInstanceId:{}.",
projectCode, processInstance.getProcessDefinitionCode(), processInstanceId); processInstance.getProcessDefinitionCode(), processInstanceId);
putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); throw new ServiceException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
} }
return result;
}
/**
* delete workflow instance by id, at the same timedelete task instance and their mapping relation data
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @return delete result code
*/
@Override
public Map<String, Object> deleteProcessInstanceById(User loginUser, Integer workflowInstanceId) {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(workflowInstanceId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId));
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
return deleteProcessInstanceById(loginUser, processDefinition.getProjectCode(), workflowInstanceId);
} }
/** /**

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -679,10 +679,16 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
* @return resource list page * @return resource list page
*/ */
@Override @Override
public Result queryResourceListPaging(User loginUser, String fullName, String resTenantCode, public Result<PageInfo<StorageEntity>> queryResourceListPaging(User loginUser, String fullName,
ResourceType type, String searchVal, Integer pageNo, Integer pageSize) { String resTenantCode,
Result<Object> result = new Result<>(); ResourceType type, String searchVal, Integer pageNo,
Integer pageSize) {
Result<PageInfo<StorageEntity>> result = new Result<>();
PageInfo<StorageEntity> pageInfo = new PageInfo<>(pageNo, pageSize); PageInfo<StorageEntity> pageInfo = new PageInfo<>(pageNo, pageSize);
if (storageOperate == null) {
logger.warn("The resource storage is not opened.");
return Result.success(pageInfo);
}
User user = userMapper.selectById(loginUser.getId()); User user = userMapper.selectById(loginUser.getId());
if (user == null) { if (user == null) {

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/Result.java

@ -75,7 +75,7 @@ public class Result<T> {
return new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg(), data); return new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg(), data);
} }
public static Result success() { public static <T> Result<T> success() {
return success(null); return success(null);
} }

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java

@ -218,9 +218,7 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest {
public void testDeleteProcessInstanceById() throws Exception { public void testDeleteProcessInstanceById() throws Exception {
Map<String, Object> mockResult = new HashMap<>(); Map<String, Object> mockResult = new HashMap<>();
mockResult.put(Constants.STATUS, Status.SUCCESS); mockResult.put(Constants.STATUS, Status.SUCCESS);
Mockito.when( Mockito.doNothing().when(processInstanceService).deleteProcessInstanceById(Mockito.any(), Mockito.anyInt());
processInstanceService.deleteProcessInstanceById(Mockito.any(), Mockito.anyLong(), Mockito.anyInt()))
.thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(delete("/projects/{projectCode}/process-instances/{id}", "1113", "123") MvcResult mvcResult = mockMvc.perform(delete("/projects/{projectCode}/process-instances/{id}", "1113", "123")
.header(SESSION_ID, sessionId)) .header(SESSION_ID, sessionId))
@ -238,9 +236,7 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest {
Map<String, Object> mockResult = new HashMap<>(); Map<String, Object> mockResult = new HashMap<>();
mockResult.put(Constants.STATUS, Status.PROCESS_INSTANCE_NOT_EXIST); mockResult.put(Constants.STATUS, Status.PROCESS_INSTANCE_NOT_EXIST);
Mockito.when( Mockito.doNothing().when(processInstanceService).deleteProcessInstanceById(Mockito.any(), Mockito.anyInt());
processInstanceService.deleteProcessInstanceById(Mockito.any(), Mockito.anyLong(), Mockito.anyInt()))
.thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/process-instances/batch-delete", "1113") MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/process-instances/batch-delete", "1113")
.header(SESSION_ID, sessionId) .header(SESSION_ID, sessionId)
.param("processInstanceIds", "1205,1206")) .param("processInstanceIds", "1205,1206"))
@ -250,6 +246,6 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest {
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assertions.assertNotNull(result); Assertions.assertNotNull(result);
Assertions.assertEquals(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getCode(), result.getCode().intValue()); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
} }
} }

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/v2/WorkflowInstanceV2ControllerTest.java

@ -95,7 +95,7 @@ public class WorkflowInstanceV2ControllerTest extends AbstractControllerTest {
public void testDeleteWorkflowInstanceById() { public void testDeleteWorkflowInstanceById() {
User loginUser = getLoginUser(); User loginUser = getLoginUser();
Mockito.when(processInstanceService.deleteProcessInstanceById(any(), eq(1))).thenReturn(null); Mockito.doNothing().when(processInstanceService).deleteProcessInstanceById(any(), eq(1));
Result result = workflowInstanceV2Controller.deleteWorkflowInstance(loginUser, 1); Result result = workflowInstanceV2Controller.deleteWorkflowInstance(loginUser, 1);
Assertions.assertTrue(result.isSuccess()); Assertions.assertTrue(result.isSuccess());
} }

59
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -146,6 +147,9 @@ public class ProcessInstanceServiceTest {
@Mock @Mock
CuringParamsService curingGlobalParamsService; CuringParamsService curingGlobalParamsService;
@Mock
AlertDao alertDao;
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789," + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]"; + "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
@ -522,15 +526,10 @@ public class ProcessInstanceServiceTest {
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty()); when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
try { Assertions.assertThrows(ServiceException.class, () -> {
Map<String, Object> processInstanceNullRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1, processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, ""); shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
Assertions.fail(); });
} catch (ServiceException ex) {
Assertions.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
}
// process instance not finish // process instance not finish
when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance)); when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
@ -598,13 +597,9 @@ public class ProcessInstanceServiceTest {
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty()); when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
try { Assertions.assertThrows(ServiceException.class, () -> {
Map<String, Object> processInstanceNullRes =
processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1); processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
});
} catch (ServiceException ex) {
Assertions.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
}
// not sub process // not sub process
ProcessInstance processInstance = getProcessInstance(); ProcessInstance processInstance = getProcessInstance();
@ -642,9 +637,9 @@ public class ProcessInstanceServiceTest {
// project auth fail // project auth fail
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_DELETE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_DELETE)).thenReturn(result);
Map<String, Object> projectAuthFailRes =
processInstanceService.deleteProcessInstanceById(loginUser, projectCode, 1); Assertions.assertThrows(ServiceException.class,
Assertions.assertEquals(Status.PROJECT_NOT_FOUND, projectAuthFailRes.get(Constants.STATUS)); () -> processInstanceService.deleteProcessInstanceById(loginUser, 1));
// not sub process // not sub process
ProcessInstance processInstance = getProcessInstance(); ProcessInstance processInstance = getProcessInstance();
@ -652,12 +647,10 @@ public class ProcessInstanceServiceTest {
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance)); when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
try { when(processDefinitionLogMapper.queryByDefinitionCodeAndVersion(Mockito.anyLong(), Mockito.anyInt()))
processInstanceService.deleteProcessInstanceById(loginUser, projectCode, 1); .thenReturn(new ProcessDefinitionLog());
Assertions.fail(); Assertions.assertThrows(ServiceException.class,
} catch (ServiceException ex) { () -> processInstanceService.deleteProcessInstanceById(loginUser, 1));
Assertions.assertEquals(Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR.getCode(), ex.getCode());
}
processInstance.setState(WorkflowExecutionStatus.SUCCESS); processInstance.setState(WorkflowExecutionStatus.SUCCESS);
processInstance.setState(WorkflowExecutionStatus.SUCCESS); processInstance.setState(WorkflowExecutionStatus.SUCCESS);
@ -670,26 +663,18 @@ public class ProcessInstanceServiceTest {
processDefinition.setUserId(1); processDefinition.setUserId(1);
processDefinition.setProjectCode(0L); processDefinition.setProjectCode(0L);
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
try { when(processService.findProcessInstanceDetailById(Mockito.anyInt())).thenReturn(Optional.empty());
processInstanceService.deleteProcessInstanceById(loginUser, projectCode, 1); Assertions.assertThrows(ServiceException.class,
Assertions.fail(); () -> processInstanceService.deleteProcessInstanceById(loginUser, 1));
} catch (ServiceException ex) {
Assertions.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
}
processDefinition.setProjectCode(projectCode); processDefinition.setProjectCode(projectCode);
when(processService.findProcessInstanceDetailById(Mockito.anyInt())).thenReturn(Optional.of(processInstance));
when(processService.deleteWorkProcessInstanceById(1)).thenReturn(1); when(processService.deleteWorkProcessInstanceById(1)).thenReturn(1);
Map<String, Object> successRes = processInstanceService.deleteProcessInstanceById(loginUser, 1);
processInstanceService.deleteProcessInstanceById(loginUser, projectCode, 1);
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
when(processService.deleteWorkProcessInstanceById(1)).thenReturn(0); when(processService.deleteWorkProcessInstanceById(1)).thenReturn(0);
try { Assertions.assertThrows(ServiceException.class,
processInstanceService.deleteProcessInstanceById(loginUser, projectCode, 1); () -> processInstanceService.deleteProcessInstanceById(loginUser, 1));
Assertions.fail();
} catch (ServiceException ex) {
Assertions.assertEquals(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getCode(), ex.getCode());
}
} }
@Test @Test

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@ -28,6 +28,8 @@ public final class Constants {
throw new UnsupportedOperationException("Construct Constants"); throw new UnsupportedOperationException("Construct Constants");
} }
public static final String AUTO_CLOSE_ALERT = "alert.auto-close";
/** /**
* common properties path * common properties path
*/ */

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -333,4 +333,11 @@ public class AlertDao {
public void setCrashAlarmSuppression(Integer crashAlarmSuppression) { public void setCrashAlarmSuppression(Integer crashAlarmSuppression) {
this.crashAlarmSuppression = crashAlarmSuppression; this.crashAlarmSuppression = crashAlarmSuppression;
} }
public void deleteByWorkflowInstanceId(Integer processInstanceId) {
if (processInstanceId == null) {
return;
}
alertMapper.deleteByWorkflowInstanceId(processInstanceId);
}
} }

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java

@ -45,4 +45,5 @@ public interface AlertMapper extends BaseMapper<Alert> {
void insertAlertWhenServerCrash(@Param("alert") Alert alert, void insertAlertWhenServerCrash(@Param("alert") Alert alert,
@Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime); @Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime);
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer processInstanceId);
} }

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml

@ -52,4 +52,10 @@
where alert_status = #{alertStatus} where alert_status = #{alertStatus}
limit #{limit} limit #{limit}
</select> </select>
<delete id="deleteByWorkflowInstanceId">
delete
from t_ds_alert
where process_instance_id = #{workflowInstanceId}
</delete>
</mapper> </mapper>

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.service.alert; package org.apache.dolphinscheduler.service.alert;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
@ -275,6 +277,9 @@ public class ProcessAlertManager {
* @param processInstance success process instance * @param processInstance success process instance
*/ */
public void closeAlert(ProcessInstance processInstance) { public void closeAlert(ProcessInstance processInstance) {
if (!PropertyUtils.getBoolean(Constants.AUTO_CLOSE_ALERT, false)) {
return;
}
List<Alert> alerts = alertDao.listAlerts(processInstance.getId()); List<Alert> alerts = alertDao.listAlerts(processInstance.getId());
if (CollectionUtils.isEmpty(alerts)) { if (CollectionUtils.isEmpty(alerts)) {
// no need to close alert // no need to close alert

Loading…
Cancel
Save