diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index 27c6dd3a02..5a5d8613b7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -634,14 +634,6 @@ public class ProcessDefinitionController extends BaseController { return returnDataList(result); } - /** - * delete process definition by code - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @return delete result code - */ @Operation(summary = "deleteByCode", description = "DELETE_PROCESS_DEFINITION_BY_ID_NOTES") @Parameters({ @Parameter(name = "code", description = "PROCESS_DEFINITION_CODE", schema = @Schema(implementation = int.class, example = "100")) @@ -652,8 +644,8 @@ public class ProcessDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result deleteProcessDefinitionByCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable("code") long code) { - processDefinitionService.deleteProcessDefinitionByCode(loginUser, code); + @PathVariable("code") long workflowDefinitionCode) { + processDefinitionService.deleteProcessDefinitionByCode(loginUser, workflowDefinitionCode); return new Result(Status.SUCCESS); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 64df82f8c9..0fc682d403 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -248,14 +248,7 @@ public interface ProcessDefinitionService { long projectCode, String codes); - /** - * delete process definition by code - * - * @param loginUser login user - * @param code process definition code - */ - void deleteProcessDefinitionByCode(User loginUser, - long code); + void deleteProcessDefinitionByCode(User loginUser, long workflowDefinitionCode); /** * release process definition: online / offline diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 20bea9b600..5b8bab32c0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -221,4 +221,9 @@ public interface ProcessInstanceService { */ List queryByProcessDefineCode(Long processDefinitionCode, int size); + + void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode); + + void deleteProcessInstanceById(int workflowInstanceId); + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java index f5a1f7a082..57502afae8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java @@ -158,4 +158,8 @@ public interface ProcessTaskRelationService { */ Map deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode); + + List queryByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion); + + void deleteByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionLogService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionLogService.java new file mode 100644 index 0000000000..72c5890d9a --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionLogService.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +public interface TaskDefinitionLogService { + + void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index 8e76925667..00ebbc64e5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -253,4 +253,6 @@ public interface TaskDefinitionService { long projectCode, long code, ReleaseState releaseState); + + void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index ed23d3695e..b9d7e91e98 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -110,4 +110,6 @@ public interface TaskInstanceService { * @return */ TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId); + + void deleteByWorkflowInstanceId(Integer workflowInstanceId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 5b87f20069..5afc8b32ea 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -58,6 +58,8 @@ import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.SchedulerService; +import org.apache.dolphinscheduler.api.service.TaskDefinitionLogService; +import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.FileUtils; @@ -111,6 +113,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.model.PageListingResult; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; @@ -198,6 +201,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired private ProcessDefinitionDao processDefinitionDao; + @Autowired + private ProcessDefinitionLogDao processDefinitionLogDao; @Lazy @Autowired private ProcessInstanceService processInstanceService; @@ -223,6 +228,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired TaskDefinitionLogMapper taskDefinitionLogMapper; + @Lazy + @Autowired + private TaskDefinitionService taskDefinitionService; + + @Autowired + private TaskDefinitionLogService taskDefinitionLogService; + @Autowired private TaskDefinitionMapper taskDefinitionMapper; @@ -1044,19 +1056,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } } - /** - * delete process definition by code - * - * @param loginUser login user - * @param code process definition code - */ - @Override - @Transactional public void deleteProcessDefinitionByCode(User loginUser, long code) { - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null) { - throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); - } + ProcessDefinition processDefinition = processDefinitionDao.queryByCode(code) + .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, String.valueOf(code))); Project project = projectMapper.queryByCode(processDefinition.getProjectCode()); // check user access for project @@ -1082,38 +1084,22 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro throw new ServiceException(Status.SCHEDULE_STATE_ONLINE, scheduleObj.getId()); } } - List processTaskRelations = processTaskRelationMapper - .queryByProcessCode(project.getCode(), processDefinition.getCode()); - if (CollectionUtils.isNotEmpty(processTaskRelations)) { - Set taskCodeList = new HashSet<>(processTaskRelations.size() * 2); - for (ProcessTaskRelation processTaskRelation : processTaskRelations) { - if (processTaskRelation.getPreTaskCode() != 0) { - taskCodeList.add(processTaskRelation.getPreTaskCode()); - } - if (processTaskRelation.getPostTaskCode() != 0) { - taskCodeList.add(processTaskRelation.getPostTaskCode()); - } - } - if (CollectionUtils.isNotEmpty(taskCodeList)) { - int i = taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(taskCodeList)); - if (i != taskCodeList.size()) { - logger.error("Delete task definition error, processDefinitionCode:{}.", code); - throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); - } - } - } - int delete = processDefinitionMapper.deleteById(processDefinition.getId()); - if (delete == 0) { - logger.error("Delete process definition error, processDefinitionCode:{}.", code); - throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); - } - int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); - if (deleteRelation == 0) { - logger.warn( - "The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.", - code); - } + + // delete workflow instance, will delete workflow instance, sub workflow instance, task instance, alert + processInstanceService.deleteProcessInstanceByWorkflowDefinitionCode(processDefinition.getCode()); + // delete task definition + taskDefinitionService.deleteTaskByWorkflowDefinitionCode(processDefinition.getCode(), + processDefinition.getVersion()); + // delete task definition log + taskDefinitionLogService.deleteTaskByWorkflowDefinitionCode(processDefinition.getCode()); + // delete workflow definition log + processDefinitionLogDao.deleteByWorkflowDefinitionCode(processDefinition.getCode()); deleteOtherRelation(project, new HashMap<>(), processDefinition); + + // we delete the workflow definition at last to avoid using transaction here. + // If delete error, we can call this interface again. + processDefinitionDao.deleteByWorkflowDefinitionCode(processDefinition.getCode()); + logger.info("Success delete workflow definition workflowDefinitionCode: {}", code); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 5b8cfb0eeb..0adf53cd89 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; +import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.UsersService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; @@ -71,6 +72,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.WorkflowUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; @@ -102,6 +104,7 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -133,12 +136,19 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired TaskInstanceDao taskInstanceDao; + @Lazy + @Autowired + private TaskInstanceService taskInstanceService; + @Autowired ProcessInstanceMapper processInstanceMapper; @Autowired ProcessInstanceDao processInstanceDao; + @Autowired + private ProcessInstanceMapDao processInstanceMapDao; + @Autowired ProcessDefinitionMapper processDefineMapper; @@ -816,29 +826,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState(), "delete"); } - - // delete database cascade - int delete = processService.deleteWorkProcessInstanceById(processInstanceId); - - processService.deleteAllSubWorkProcessByParentId(processInstanceId); - processService.deleteWorkProcessMapByParentId(processInstanceId); - // We need to remove the task log file before deleting the task instance - // because the task log file is query from task instance. - // When delete task instance error, the task log file will also be deleted, this may cause data inconsistency. - processService.removeTaskLogFile(processInstanceId); - taskInstanceDao.deleteByWorkflowInstanceId(processInstanceId); - alertDao.deleteByWorkflowInstanceId(processInstanceId); - - if (delete > 0) { - logger.info( - "Delete process instance complete, ProcessDefinitionCode{}, processInstanceId:{}.", - processInstance.getProcessDefinitionCode(), processInstanceId); - } else { - logger.error( - "Delete process instance error, ProcessDefinitionCode{}, processInstanceId:{}.", - processInstance.getProcessDefinitionCode(), processInstanceId); - throw new ServiceException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); - } + deleteProcessInstanceById(processInstanceId); } /** @@ -1032,4 +1020,50 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce public List queryByProcessDefineCode(Long processDefinitionCode, int size) { return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size); } + + @Override + public void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode) { + while (true) { + List processInstances = + processInstanceMapper.queryByProcessDefineCode(workflowDefinitionCode, 100); + if (CollectionUtils.isEmpty(processInstances)) { + break; + } + logger.info("Begin to delete workflow instance, workflow definition code: {}", workflowDefinitionCode); + for (ProcessInstance processInstance : processInstances) { + if (!processInstance.getState().isFinished()) { + logger.warn("Workflow instance is not finished cannot delete, process instance id:{}", + processInstance.getId()); + throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), + processInstance.getState(), "delete"); + } + deleteProcessInstanceById(processInstance.getId()); + } + logger.info("Success delete workflow instance, workflow definition code: {}, size: {}", + workflowDefinitionCode, processInstances.size()); + } + } + + @Override + public void deleteProcessInstanceById(int workflowInstanceId) { + // delete task instance + taskInstanceService.deleteByWorkflowInstanceId(workflowInstanceId); + // delete sub process instances + deleteSubWorkflowInstanceIfNeeded(workflowInstanceId); + // delete alert + alertDao.deleteByWorkflowInstanceId(workflowInstanceId); + // delete process instance + processInstanceDao.deleteById(workflowInstanceId); + } + + private void deleteSubWorkflowInstanceIfNeeded(int workflowInstanceId) { + List subWorkflowInstanceIds = processInstanceMapDao.querySubWorkflowInstanceIds(workflowInstanceId); + if (org.apache.commons.collections4.CollectionUtils.isEmpty(subWorkflowInstanceIds)) { + return; + } + for (Integer subWorkflowInstanceId : subWorkflowInstanceIds) { + deleteProcessInstanceById(subWorkflowInstanceId); + } + processInstanceMapDao.deleteByParentId(workflowInstanceId); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index 4feff61e4a..dbbcedb465 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -913,6 +913,18 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } + @Override + public List queryByWorkflowDefinitionCode(long workflowDefinitionCode, + int workflowDefinitionVersion) { + return processTaskRelationMapper.queryProcessTaskRelationsByProcessDefinitionCode(workflowDefinitionCode, + workflowDefinitionVersion); + } + + @Override + public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion) { + processTaskRelationMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion); + } + /** * build task definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index 3ef1d18b1f..f1821fcb63 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -856,6 +856,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe @Override public Map queryResourceList(User loginUser, ResourceType type, String fullName) { Map result = new HashMap<>(); + if (storageOperate == null) { + result.put(Constants.DATA_LIST, Collections.emptyList()); + result.put(Constants.STATUS, Status.SUCCESS); + return result; + } User user = userMapper.selectById(loginUser.getId()); if (user == null) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java new file mode 100644 index 0000000000..44c57c07f0 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.service.TaskDefinitionLogService; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskDefinitionLogServiceImpl implements TaskDefinitionLogService { + + @Autowired + private ProcessTaskRelationLogDao processTaskRelationLogDao; + + @Autowired + private TaskDefinitionLogDao taskDefinitionLogDao; + + @Override + public void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode) { + List processTaskRelations = + processTaskRelationLogDao.findByWorkflowDefinitionCode(workflowDefinitionCode); + if (CollectionUtils.isEmpty(processTaskRelations)) { + return; + } + // delete task definition + Set needToDeleteTaskDefinitionCodes = new HashSet<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + needToDeleteTaskDefinitionCodes.add(processTaskRelation.getPreTaskCode()); + needToDeleteTaskDefinitionCodes.add(processTaskRelation.getPostTaskCode()); + } + taskDefinitionLogDao.deleteByTaskDefinitionCodes(needToDeleteTaskDefinitionCodes); + // delete task workflow relation + processTaskRelationLogDao.deleteByWorkflowDefinitionCode(workflowDefinitionCode); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 6e10493200..7dd38bba05 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -63,6 +63,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -77,6 +78,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -111,6 +113,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Autowired private TaskDefinitionMapper taskDefinitionMapper; + @Autowired + private TaskDefinitionDao taskDefinitionDao; + @Autowired private TaskDefinitionLogMapper taskDefinitionLogMapper; @@ -1220,4 +1225,22 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.SUCCESS); return result; } + + @Override + public void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion) { + List processTaskRelations = processTaskRelationService + .queryByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion); + if (CollectionUtils.isEmpty(processTaskRelations)) { + return; + } + // delete task definition + Set needToDeleteTaskDefinitionCodes = new HashSet<>(); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + needToDeleteTaskDefinitionCodes.add(processTaskRelation.getPreTaskCode()); + needToDeleteTaskDefinitionCodes.add(processTaskRelation.getPostTaskCode()); + } + taskDefinitionDao.deleteByTaskDefinitionCodes(needToDeleteTaskDefinitionCodes); + // delete task workflow relation + processTaskRelationService.deleteByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index d0bee2ae39..d8e8de5b90 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -23,7 +23,6 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.UsersService; @@ -40,6 +39,7 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang3.StringUtils; @@ -91,9 +92,6 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst @Autowired TaskInstanceDao taskInstanceDao; - @Autowired - ProcessInstanceService processInstanceService; - @Autowired UsersService usersService; @@ -103,6 +101,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst @Autowired private StateEventCallbackService stateEventCallbackService; + @Autowired + private LogClient logClient; + + @Autowired + private DqExecuteResultDao dqExecuteResultDao; + /** * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging * @@ -361,4 +365,27 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst return new TaskInstanceRemoveCacheResponse(result, cacheKey); } + @Override + public void deleteByWorkflowInstanceId(Integer workflowInstanceId) { + List needToDeleteTaskInstances = + taskInstanceDao.findTaskInstanceByWorkflowInstanceId(workflowInstanceId); + if (org.apache.commons.collections4.CollectionUtils.isEmpty(needToDeleteTaskInstances)) { + return; + } + for (TaskInstance taskInstance : needToDeleteTaskInstances) { + // delete log + if (StringUtils.isNotEmpty(taskInstance.getLogPath())) { + try { + logClient.removeTaskLog(Host.of(taskInstance.getHost()), taskInstance.getLogPath()); + } catch (Exception e) { + logger.error( + "Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}", + taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e); + } + } + } + dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId); + taskInstanceDao.deleteByWorkflowInstanceId(workflowInstanceId); + } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 11407c5979..1d59ba8c58 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -71,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.model.PageListingResult; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -88,6 +89,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.zip.ZipEntry; @@ -173,6 +175,15 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { @Mock private WorkFlowLineageService workFlowLineageService; + @Mock + private TaskDefinitionService taskDefinitionService; + + @Mock + private TaskDefinitionLogService taskDefinitionLogService; + + @Mock + private ProcessDefinitionLogDao processDefinitionLogDao; + @Mock private UserMapper userMapper; @@ -476,7 +487,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); // project check auth fail - Mockito.when(processDefinitionMapper.queryByCode(6L)).thenReturn(this.getProcessDefinition()); + Mockito.when(processDefinitionDao.queryByCode(6L)).thenReturn(Optional.of(getProcessDefinition())); Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_FOUND)).when(projectService) .checkProjectAndAuthThrowException(user, project, WORKFLOW_DEFINITION_DELETE); exception = Assertions.assertThrows(ServiceException.class, @@ -486,14 +497,14 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { // project check auth success, instance not exist Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, WORKFLOW_DEFINITION_DELETE); - Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(null); + Mockito.when(processDefinitionDao.queryByCode(1L)).thenReturn(Optional.empty()); exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService.deleteProcessDefinitionByCode(user, 1L)); Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); ProcessDefinition processDefinition = getProcessDefinition(); // user no auth - Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition); + Mockito.when(processDefinitionDao.queryByCode(46L)).thenReturn(Optional.of(processDefinition)); exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), ((ServiceException) exception).getCode()); @@ -501,19 +512,16 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { // process definition online user.setUserType(UserType.ADMIN_USER); processDefinition.setReleaseState(ReleaseState.ONLINE); - Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition); + Mockito.when(processDefinitionDao.queryByCode(46L)).thenReturn(Optional.of(processDefinition)); exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); Assertions.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE.getCode(), ((ServiceException) exception).getCode()); // scheduler list elements > 1 processDefinition.setReleaseState(ReleaseState.OFFLINE); - Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition); + Mockito.when(processDefinitionDao.queryByCode(46L)).thenReturn(Optional.of(processDefinition)); Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule()); Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1); - Mockito.when(processDefinitionMapper.deleteById(processDefinition.getId())).thenReturn(1); - Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())) - .thenReturn(1); Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) .thenReturn(Collections.emptySet()); processDefinitionService.deleteProcessDefinitionByCode(user, 46L); @@ -538,24 +546,12 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { // delete success schedule.setReleaseState(ReleaseState.OFFLINE); - Mockito.when(processTaskRelationMapper.queryByProcessCode(1, 11)) - .thenReturn(getProcessTaskRelation()); - Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(2); - Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1); - Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1); - Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())) - .thenReturn(1); Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule()); + Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1); Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) .thenReturn(Collections.emptySet()); Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); - // delete fail - Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(1); - exception = Assertions.assertThrows(ServiceException.class, - () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); - Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR.getCode(), - ((ServiceException) exception).getCode()); } @Test @@ -584,7 +580,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { definitionCodes = Lists.newArrayList(singleCodes.split(Constants.COMMA)).stream().map(Long::parseLong) .collect(Collectors.toSet()); Mockito.when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); - Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(process); + Mockito.when(processDefinitionDao.queryByCode(processDefinitionCode)).thenReturn(Optional.of(process)); // process definition online user.setUserType(UserType.ADMIN_USER); @@ -600,10 +596,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { // delete success process.setReleaseState(ReleaseState.OFFLINE); - Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(process); - Mockito.when(processDefinitionMapper.deleteById(process.getId())).thenReturn(1); - Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), process.getCode())) - .thenReturn(1); + Mockito.when(processDefinitionDao.queryByCode(processDefinitionCode)).thenReturn(Optional.of(process)); Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), process.getCode())) .thenReturn(Collections.emptySet()); putMsg(result, Status.SUCCESS, projectCode); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index ba62a9c78e..61d94c5067 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; @@ -150,6 +151,12 @@ public class ProcessInstanceServiceTest { @Mock AlertDao alertDao; + @Mock + private TaskInstanceService taskInstanceService; + + @Mock + private ProcessInstanceMapDao processInstanceMapDao; + private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789," + "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]"; @@ -682,8 +689,7 @@ public class ProcessInstanceServiceTest { processInstanceService.deleteProcessInstanceById(loginUser, 1); when(processService.deleteWorkProcessInstanceById(1)).thenReturn(0); - Assertions.assertThrows(ServiceException.class, - () -> processInstanceService.deleteProcessInstanceById(loginUser, 1)); + Assertions.assertDoesNotThrow(() -> processInstanceService.deleteProcessInstanceById(loginUser, 1)); } @Test diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 58bf333f2e..a094647861 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -338,6 +338,15 @@ public class AlertDao { if (processInstanceId == null) { return; } + List alertList = alertMapper.selectByWorkflowInstanceId(processInstanceId); + if (CollectionUtils.isEmpty(alertList)) { + return; + } alertMapper.deleteByWorkflowInstanceId(processInstanceId); + List alertIds = alertList + .stream() + .map(Alert::getId) + .collect(Collectors.toList()); + alertSendStatusMapper.deleteByAlertIds(alertIds); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java index d96101101e..c30c1c9043 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java @@ -46,4 +46,6 @@ public interface AlertMapper extends BaseMapper { @Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime); void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer processInstanceId); + + List selectByWorkflowInstanceId(@Param("workflowInstanceId") Integer processInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java index b1a5fea153..3377e3d682 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.AlertSendStatus; +import org.apache.ibatis.annotations.Param; + import java.util.List; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -26,4 +28,6 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; public interface AlertSendStatusMapper extends BaseMapper { int batchInsert(List alertSendStatuses); + + void deleteByAlertIds(@Param("alertIds") List alertIds); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.java index c522019edf..d320f4bd0d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.java @@ -57,4 +57,6 @@ public interface DqExecuteResultMapper extends BaseMapper { * @return DqExecuteResult */ DqExecuteResult getExecuteResultById(@Param("taskInstanceId") int taskInstanceId); + + void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java index ba2d6cb9e3..3886b27f5c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java @@ -94,4 +94,6 @@ public interface ProcessDefinitionLogMapper extends BaseMapper */ List querySubIdListByParentId(@Param("parentInstanceId") int parentInstanceId); + void deleteByParentId(@Param("parentWorkflowInstanceId") int workflowInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java index e4ffa49955..3e73fd1f56 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java @@ -74,4 +74,8 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper queryByProcessCode(@Param("workflowDefinitionCode") long workflowDefinitionCode); + + void deleteByWorkflowDefinitionCode(@Param("workflowDefinitionCode") long workflowDefinitionCode); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index b2592fa72a..9b8dfc8736 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -222,4 +222,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper { */ IPage queryTaskDefinitionVersionsPaging(Page page, @Param("code") long code, @Param("projectCode") long projectCode); + + void deleteByTaskDefinitionCodes(@Param("taskDefinitionCodes") Set taskDefinitionCodes); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index 4539256e02..105cf04700 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -156,4 +156,6 @@ public interface TaskDefinitionMapper extends BaseMapper { * @return deleted row count */ int deleteByBatchCodes(@Param("taskCodeList") List taskCodeList); + + void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java new file mode 100644 index 0000000000..843548c866 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +public interface DqExecuteResultDao { + + void deleteByWorkflowInstanceId(Integer workflowInstanceId); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java index e89adf3367..8e009f6bfb 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.model.PageListingResult; import java.util.List; +import java.util.Optional; import javax.annotation.Nullable; @@ -46,4 +47,10 @@ public interface ProcessDefinitionDao { * @return */ List queryProcessDefinitionsByCodesAndVersions(List processInstances); + + Optional queryByCode(long code); + + void deleteById(Integer workflowDefinitionId); + + void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java new file mode 100644 index 0000000000..7ba19d6892 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +public interface ProcessDefinitionLogDao { + + void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index 08eba0a4cc..f9002c823d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import java.util.List; + public interface ProcessInstanceDao { public int insertProcessInstance(ProcessInstance processInstance); @@ -31,4 +33,9 @@ public interface ProcessInstanceDao { * @param processInstance processInstance */ public int upsertProcessInstance(ProcessInstance processInstance); + + void deleteByIds(List needToDeleteWorkflowInstanceIds); + + void deleteById(Integer workflowInstanceId); + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java index 5465c6d40d..7f74eaf313 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import java.util.List; + /** * Process Instance Map DAO */ @@ -46,4 +48,7 @@ public interface ProcessInstanceMapDao { */ ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId); + List querySubWorkflowInstanceIds(int workflowInstanceId); + + void deleteByParentId(int workflowInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java new file mode 100644 index 0000000000..1c1ba25783 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; + +import java.util.List; + +public interface ProcessTaskRelationLogDao { + + List findByWorkflowDefinitionCode(long workflowDefinitionCode); + + void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java index 9aaed9e576..4dce211eb8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import java.util.List; +import java.util.Set; /** * Task Instance DAO @@ -41,4 +42,7 @@ public interface TaskDefinitionDao { */ TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion); + void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion); + + void deleteByTaskDefinitionCodes(Set needToDeleteTaskDefinitionCodes); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java index 8b6f290f49..b942196c32 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import java.util.List; +import java.util.Set; /** * Task Definition Log DAO @@ -41,4 +42,5 @@ public interface TaskDefinitionLogDao { */ List getTaskDefineLogListByRelation(List processTaskRelations); + void deleteByTaskDefinitionCodes(Set taskDefinitionCodes); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java new file mode 100644 index 0000000000..f496d92f88 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper; +import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +@Repository +public class DqExecuteResultDaoImpl implements DqExecuteResultDao { + + @Autowired + private DqExecuteResultMapper dqExecuteResultMapper; + + @Override + public void deleteByWorkflowInstanceId(Integer workflowInstanceId) { + dqExecuteResultMapper.deleteByWorkflowInstanceId(workflowInstanceId); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java index 8a459991ea..d7398453a0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; @@ -79,4 +80,20 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao { return processDefinitions; } + + @Override + public Optional queryByCode(long code) { + return Optional.ofNullable( + processDefinitionMapper.queryByCode(code)); + } + + @Override + public void deleteById(Integer workflowDefinitionId) { + processDefinitionMapper.deleteById(workflowDefinitionId); + } + + @Override + public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { + processDefinitionMapper.deleteByCode(workflowDefinitionCode); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java new file mode 100644 index 0000000000..47a1747c4f --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +@Repository +public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao { + + @Autowired + private ProcessDefinitionLogMapper processDefinitionLogMapper; + + @Override + public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { + processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index 51f993bfae..2832e0a3f4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -21,6 +21,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; + import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -52,4 +56,17 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao { return insertProcessInstance(processInstance); } } + + @Override + public void deleteByIds(List needToDeleteWorkflowInstanceIds) { + if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) { + return; + } + processInstanceMapper.deleteBatchIds(needToDeleteWorkflowInstanceIds); + } + + @Override + public void deleteById(Integer workflowInstanceId) { + processInstanceMapper.deleteById(workflowInstanceId); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java index 94dbc075bd..7d30c81ded 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java @@ -21,6 +21,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao; +import java.util.List; + import lombok.NonNull; import org.springframework.beans.factory.annotation.Autowired; @@ -49,4 +51,14 @@ public class ProcessInstanceMapDaoImpl implements ProcessInstanceMapDao { public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId); } + + @Override + public List querySubWorkflowInstanceIds(int workflowInstanceId) { + return processInstanceMapMapper.querySubIdListByParentId(workflowInstanceId); + } + + @Override + public void deleteByParentId(int workflowInstanceId) { + processInstanceMapMapper.deleteByParentId(workflowInstanceId); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java new file mode 100644 index 0000000000..d64f67ad7d --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +@Repository +public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao { + + @Autowired + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + + @Override + public List findByWorkflowDefinitionCode(long workflowDefinitionCode) { + return processTaskRelationLogMapper.queryByProcessCode(workflowDefinitionCode); + } + + @Override + public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { + processTaskRelationLogMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java index 33b6033454..06b960eb2f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java @@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -84,4 +87,18 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao { return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion); } + @Override + public void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion) { + taskDefinitionMapper.deleteByWorkflowDefinitionCodeAndVersion(workflowDefinitionCode, + workflowDefinitionVersion); + } + + @Override + public void deleteByTaskDefinitionCodes(Set needToDeleteTaskDefinitionCodes) { + if (CollectionUtils.isEmpty(needToDeleteTaskDefinitionCodes)) { + return; + } + taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(needToDeleteTaskDefinitionCodes)); + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java index efd000a410..8daef54308 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java @@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.commons.collections4.CollectionUtils; + import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -88,4 +90,12 @@ public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao { }); return taskDefinitionLogs; } + + @Override + public void deleteByTaskDefinitionCodes(Set taskDefinitionCodes) { + if (CollectionUtils.isEmpty(taskDefinitionCodes)) { + return; + } + taskDefinitionLogMapper.deleteByTaskDefinitionCodes(taskDefinitionCodes); + } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml index a9054986d3..e56afa830e 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml @@ -24,6 +24,12 @@ process_definition_code, process_instance_id, alert_type + insert into t_ds_alert(sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml index 248d7b0573..f27c55d70b 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml @@ -32,4 +32,13 @@ + + delete + from t_ds_alert_send_status + where alert_id in + + #{alertId} + + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml index 5ca517e616..b19da7e0b3 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml @@ -89,17 +89,23 @@ a.user_id, a.comparison_type, a.error_output_path, - b.name as process_definition_name, - e.name as process_instance_name, - c.name as task_name, + b.name as process_definition_name, + e.name as process_instance_name, + c.name as task_name, cp.type as comparison_type_name, d.user_name FROM t_ds_dq_execute_result a - left join t_ds_process_definition b on a.process_definition_id = b.id - left join t_ds_task_instance c on a.task_instance_id = c.id - left join t_ds_process_instance e on a.process_instance_id = e.id - left join t_ds_user d on d.id = a.user_id - left join t_ds_dq_comparison_type cp on cp.id = a.comparison_type - where task_instance_id = #{taskInstanceId} + left join t_ds_process_definition b on a.process_definition_id = b.id + left join t_ds_task_instance c on a.task_instance_id = c.id + left join t_ds_process_instance e on a.process_instance_id = e.id + left join t_ds_user d on d.id = a.user_id + left join t_ds_dq_comparison_type cp on cp.id = a.comparison_type + where task_instance_id = #{taskInstanceId} + + + delete + from t_ds_dq_execute_result + where process_instance_id = #{workflowInstanceId} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml index 75a35ad1a3..9d93e858a7 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml @@ -82,4 +82,10 @@ where code = #{code} and version = #{version} + + + delete + from t_ds_process_definition_log + where code = #{workflowDefinitionCode} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml index 249fb8669f..2aec7739b9 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml @@ -44,4 +44,10 @@ from t_ds_relation_process_instance where parent_process_instance_id = #{parentInstanceId} + + + delete + from t_ds_relation_process_instance + where parent_process_instance_id = #{parentWorkflowInstanceId} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml index c3c0579d76..6fae94d3a8 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml @@ -70,4 +70,16 @@ and post_task_code = #{processTaskRelation.postTaskCode} and post_task_version = #{processTaskRelation.postTaskVersion} + + + + delete + from t_ds_process_task_relation_log + WHERE process_definition_code = #{workflowDefinitionCode} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index c1ad2fac34..2da8514c55 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -224,16 +224,20 @@ - update t_ds_process_task_relation - set - pre_task_version=#{processTaskRelation.preTaskVersion}, - post_task_version=#{processTaskRelation.postTaskVersion} - where - id = #{processTaskRelation.id} + update t_ds_process_task_relation + set pre_task_version=#{processTaskRelation.preTaskVersion}, + post_task_version=#{processTaskRelation.postTaskVersion} + where id = #{processTaskRelation.id} + + + delete + from t_ds_process_task_relation + where process_definition_code = #{workflowDefinitionCode} and process_definition_version = #{workflowDefinitionVersion} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index 756213a336..14c71f83c5 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -60,7 +60,8 @@ #{taskDefinitionLog.flag},#{taskDefinitionLog.isCache},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode}, #{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy}, #{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime}, - #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority}, + #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, + #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority}, #{taskDefinitionLog.cpuQuota},#{taskDefinitionLog.memoryMax},#{taskDefinitionLog.taskExecuteType}) @@ -70,7 +71,8 @@ where code = #{code} and version = #{version} - select from t_ds_task_definition_log @@ -80,4 +82,12 @@ order by version desc + + delete + from t_ds_task_definition_log + where code in + + #{taskDefinitionCode} + +