Browse Source

Delete workflow will delete workflow instance (#13336)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
a074f7e2e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
  5. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionLogService.java
  6. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  7. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  8. 72
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  9. 80
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  10. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  11. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  12. 61
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java
  13. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  14. 35
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  15. 45
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  16. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  17. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  18. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java
  19. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java
  20. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.java
  21. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  22. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
  23. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
  24. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  25. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  26. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  27. 23
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java
  28. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
  29. 23
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
  30. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  31. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java
  32. 29
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
  33. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
  34. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java
  35. 36
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java
  36. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
  37. 36
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
  38. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  39. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java
  40. 44
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java
  41. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
  42. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java
  43. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml
  44. 9
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml
  45. 24
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml
  46. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
  47. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
  48. 12
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
  49. 20
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
  50. 14
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -634,14 +634,6 @@ public class ProcessDefinitionController extends BaseController {
return returnDataList(result);
}
/**
* delete process definition by code
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @return delete result code
*/
@Operation(summary = "deleteByCode", description = "DELETE_PROCESS_DEFINITION_BY_ID_NOTES")
@Parameters({
@Parameter(name = "code", description = "PROCESS_DEFINITION_CODE", schema = @Schema(implementation = int.class, example = "100"))
@ -652,8 +644,8 @@ public class ProcessDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteProcessDefinitionByCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("code") long code) {
processDefinitionService.deleteProcessDefinitionByCode(loginUser, code);
@PathVariable("code") long workflowDefinitionCode) {
processDefinitionService.deleteProcessDefinitionByCode(loginUser, workflowDefinitionCode);
return new Result(Status.SUCCESS);
}

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -248,14 +248,7 @@ public interface ProcessDefinitionService {
long projectCode,
String codes);
/**
* delete process definition by code
*
* @param loginUser login user
* @param code process definition code
*/
void deleteProcessDefinitionByCode(User loginUser,
long code);
void deleteProcessDefinitionByCode(User loginUser, long workflowDefinitionCode);
/**
* release process definition: online / offline

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -221,4 +221,9 @@ public interface ProcessInstanceService {
*/
List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode,
int size);
void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode);
void deleteProcessInstanceById(int workflowInstanceId);
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java

@ -158,4 +158,8 @@ public interface ProcessTaskRelationService {
*/
Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode,
long postTaskCode);
List<ProcessTaskRelation> queryByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion);
}

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionLogService.java

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
public interface TaskDefinitionLogService {
void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode);
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -253,4 +253,6 @@ public interface TaskDefinitionService {
long projectCode,
long code,
ReleaseState releaseState);
void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion);
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -110,4 +110,6 @@ public interface TaskInstanceService {
* @return
*/
TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId);
void deleteByWorkflowInstanceId(Integer workflowInstanceId);
}

72
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -58,6 +58,8 @@ import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionLogService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
@ -111,6 +113,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
@ -198,6 +201,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private ProcessDefinitionDao processDefinitionDao;
@Autowired
private ProcessDefinitionLogDao processDefinitionLogDao;
@Lazy
@Autowired
private ProcessInstanceService processInstanceService;
@ -223,6 +228,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;
@Lazy
@Autowired
private TaskDefinitionService taskDefinitionService;
@Autowired
private TaskDefinitionLogService taskDefinitionLogService;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@ -1044,19 +1056,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
}
/**
* delete process definition by code
*
* @param loginUser login user
* @param code process definition code
*/
@Override
@Transactional
public void deleteProcessDefinitionByCode(User loginUser, long code) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
}
ProcessDefinition processDefinition = processDefinitionDao.queryByCode(code)
.orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)));
Project project = projectMapper.queryByCode(processDefinition.getProjectCode());
// check user access for project
@ -1082,38 +1084,22 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
throw new ServiceException(Status.SCHEDULE_STATE_ONLINE, scheduleObj.getId());
}
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByProcessCode(project.getCode(), processDefinition.getCode());
if (CollectionUtils.isNotEmpty(processTaskRelations)) {
Set<Long> taskCodeList = new HashSet<>(processTaskRelations.size() * 2);
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() != 0) {
taskCodeList.add(processTaskRelation.getPreTaskCode());
}
if (processTaskRelation.getPostTaskCode() != 0) {
taskCodeList.add(processTaskRelation.getPostTaskCode());
}
}
if (CollectionUtils.isNotEmpty(taskCodeList)) {
int i = taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(taskCodeList));
if (i != taskCodeList.size()) {
logger.error("Delete task definition error, processDefinitionCode:{}.", code);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
}
}
}
int delete = processDefinitionMapper.deleteById(processDefinition.getId());
if (delete == 0) {
logger.error("Delete process definition error, processDefinitionCode:{}.", code);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (deleteRelation == 0) {
logger.warn(
"The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.",
code);
}
// delete workflow instance, will delete workflow instance, sub workflow instance, task instance, alert
processInstanceService.deleteProcessInstanceByWorkflowDefinitionCode(processDefinition.getCode());
// delete task definition
taskDefinitionService.deleteTaskByWorkflowDefinitionCode(processDefinition.getCode(),
processDefinition.getVersion());
// delete task definition log
taskDefinitionLogService.deleteTaskByWorkflowDefinitionCode(processDefinition.getCode());
// delete workflow definition log
processDefinitionLogDao.deleteByWorkflowDefinitionCode(processDefinition.getCode());
deleteOtherRelation(project, new HashMap<>(), processDefinition);
// we delete the workflow definition at last to avoid using transaction here.
// If delete error, we can call this interface again.
processDefinitionDao.deleteByWorkflowDefinitionCode(processDefinition.getCode());
logger.info("Success delete workflow definition workflowDefinitionCode: {}", code);
}
/**

80
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@ -71,6 +72,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
@ -102,6 +104,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -133,12 +136,19 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
TaskInstanceDao taskInstanceDao;
@Lazy
@Autowired
private TaskInstanceService taskInstanceService;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessInstanceDao processInstanceDao;
@Autowired
private ProcessInstanceMapDao processInstanceMapDao;
@Autowired
ProcessDefinitionMapper processDefineMapper;
@ -816,29 +826,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(),
processInstance.getState(), "delete");
}
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);
// We need to remove the task log file before deleting the task instance
// because the task log file is query from task instance.
// When delete task instance error, the task log file will also be deleted, this may cause data inconsistency.
processService.removeTaskLogFile(processInstanceId);
taskInstanceDao.deleteByWorkflowInstanceId(processInstanceId);
alertDao.deleteByWorkflowInstanceId(processInstanceId);
if (delete > 0) {
logger.info(
"Delete process instance complete, ProcessDefinitionCode{}, processInstanceId:{}.",
processInstance.getProcessDefinitionCode(), processInstanceId);
} else {
logger.error(
"Delete process instance error, ProcessDefinitionCode{}, processInstanceId:{}.",
processInstance.getProcessDefinitionCode(), processInstanceId);
throw new ServiceException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
}
deleteProcessInstanceById(processInstanceId);
}
/**
@ -1032,4 +1020,50 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
public List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode, int size) {
return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size);
}
@Override
public void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode) {
while (true) {
List<ProcessInstance> processInstances =
processInstanceMapper.queryByProcessDefineCode(workflowDefinitionCode, 100);
if (CollectionUtils.isEmpty(processInstances)) {
break;
}
logger.info("Begin to delete workflow instance, workflow definition code: {}", workflowDefinitionCode);
for (ProcessInstance processInstance : processInstances) {
if (!processInstance.getState().isFinished()) {
logger.warn("Workflow instance is not finished cannot delete, process instance id:{}",
processInstance.getId());
throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(),
processInstance.getState(), "delete");
}
deleteProcessInstanceById(processInstance.getId());
}
logger.info("Success delete workflow instance, workflow definition code: {}, size: {}",
workflowDefinitionCode, processInstances.size());
}
}
@Override
public void deleteProcessInstanceById(int workflowInstanceId) {
// delete task instance
taskInstanceService.deleteByWorkflowInstanceId(workflowInstanceId);
// delete sub process instances
deleteSubWorkflowInstanceIfNeeded(workflowInstanceId);
// delete alert
alertDao.deleteByWorkflowInstanceId(workflowInstanceId);
// delete process instance
processInstanceDao.deleteById(workflowInstanceId);
}
private void deleteSubWorkflowInstanceIfNeeded(int workflowInstanceId) {
List<Integer> subWorkflowInstanceIds = processInstanceMapDao.querySubWorkflowInstanceIds(workflowInstanceId);
if (org.apache.commons.collections4.CollectionUtils.isEmpty(subWorkflowInstanceIds)) {
return;
}
for (Integer subWorkflowInstanceId : subWorkflowInstanceIds) {
deleteProcessInstanceById(subWorkflowInstanceId);
}
processInstanceMapDao.deleteByParentId(workflowInstanceId);
}
}

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -913,6 +913,18 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
@Override
public List<ProcessTaskRelation> queryByWorkflowDefinitionCode(long workflowDefinitionCode,
int workflowDefinitionVersion) {
return processTaskRelationMapper.queryProcessTaskRelationsByProcessDefinitionCode(workflowDefinitionCode,
workflowDefinitionVersion);
}
@Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion) {
processTaskRelationMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion);
}
/**
* build task definition
*

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -856,6 +856,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
@Override
public Map<String, Object> queryResourceList(User loginUser, ResourceType type, String fullName) {
Map<String, Object> result = new HashMap<>();
if (storageOperate == null) {
result.put(Constants.DATA_LIST, Collections.emptyList());
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
User user = userMapper.selectById(loginUser.getId());
if (user == null) {

61
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionLogServiceImpl.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.service.TaskDefinitionLogService;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskDefinitionLogServiceImpl implements TaskDefinitionLogService {
@Autowired
private ProcessTaskRelationLogDao processTaskRelationLogDao;
@Autowired
private TaskDefinitionLogDao taskDefinitionLogDao;
@Override
public void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode) {
List<ProcessTaskRelationLog> processTaskRelations =
processTaskRelationLogDao.findByWorkflowDefinitionCode(workflowDefinitionCode);
if (CollectionUtils.isEmpty(processTaskRelations)) {
return;
}
// delete task definition
Set<Long> needToDeleteTaskDefinitionCodes = new HashSet<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
needToDeleteTaskDefinitionCodes.add(processTaskRelation.getPreTaskCode());
needToDeleteTaskDefinitionCodes.add(processTaskRelation.getPostTaskCode());
}
taskDefinitionLogDao.deleteByTaskDefinitionCodes(needToDeleteTaskDefinitionCodes);
// delete task workflow relation
processTaskRelationLogDao.deleteByWorkflowDefinitionCode(workflowDefinitionCode);
}
}

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -63,6 +63,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -77,6 +78,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -111,6 +113,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private TaskDefinitionDao taskDefinitionDao;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@ -1220,4 +1225,22 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.SUCCESS);
return result;
}
@Override
public void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion) {
List<ProcessTaskRelation> processTaskRelations = processTaskRelationService
.queryByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion);
if (CollectionUtils.isEmpty(processTaskRelations)) {
return;
}
// delete task definition
Set<Long> needToDeleteTaskDefinitionCodes = new HashSet<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
needToDeleteTaskDefinitionCodes.add(processTaskRelation.getPreTaskCode());
needToDeleteTaskDefinitionCodes.add(processTaskRelation.getPostTaskCode());
}
taskDefinitionDao.deleteByTaskDefinitionCodes(needToDeleteTaskDefinitionCodes);
// delete task workflow relation
processTaskRelationService.deleteByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion);
}
}

35
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -23,7 +23,6 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.service.UsersService;
@ -40,6 +39,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
@ -91,9 +92,6 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
TaskInstanceDao taskInstanceDao;
@Autowired
ProcessInstanceService processInstanceService;
@Autowired
UsersService usersService;
@ -103,6 +101,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
private StateEventCallbackService stateEventCallbackService;
@Autowired
private LogClient logClient;
@Autowired
private DqExecuteResultDao dqExecuteResultDao;
/**
* query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
*
@ -361,4 +365,27 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
return new TaskInstanceRemoveCacheResponse(result, cacheKey);
}
@Override
public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
List<TaskInstance> needToDeleteTaskInstances =
taskInstanceDao.findTaskInstanceByWorkflowInstanceId(workflowInstanceId);
if (org.apache.commons.collections4.CollectionUtils.isEmpty(needToDeleteTaskInstances)) {
return;
}
for (TaskInstance taskInstance : needToDeleteTaskInstances) {
// delete log
if (StringUtils.isNotEmpty(taskInstance.getLogPath())) {
try {
logClient.removeTaskLog(Host.of(taskInstance.getHost()), taskInstance.getLogPath());
} catch (Exception e) {
logger.error(
"Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}",
taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e);
}
}
}
dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId);
taskInstanceDao.deleteByWorkflowInstanceId(workflowInstanceId);
}
}

45
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -71,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
@ -88,6 +89,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
@ -173,6 +175,15 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
@Mock
private WorkFlowLineageService workFlowLineageService;
@Mock
private TaskDefinitionService taskDefinitionService;
@Mock
private TaskDefinitionLogService taskDefinitionLogService;
@Mock
private ProcessDefinitionLogDao processDefinitionLogDao;
@Mock
private UserMapper userMapper;
@ -476,7 +487,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
// project check auth fail
Mockito.when(processDefinitionMapper.queryByCode(6L)).thenReturn(this.getProcessDefinition());
Mockito.when(processDefinitionDao.queryByCode(6L)).thenReturn(Optional.of(getProcessDefinition()));
Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_FOUND)).when(projectService)
.checkProjectAndAuthThrowException(user, project, WORKFLOW_DEFINITION_DELETE);
exception = Assertions.assertThrows(ServiceException.class,
@ -486,14 +497,14 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
// project check auth success, instance not exist
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project,
WORKFLOW_DEFINITION_DELETE);
Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(null);
Mockito.when(processDefinitionDao.queryByCode(1L)).thenReturn(Optional.empty());
exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(user, 1L));
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
ProcessDefinition processDefinition = getProcessDefinition();
// user no auth
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionDao.queryByCode(46L)).thenReturn(Optional.of(processDefinition));
exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), ((ServiceException) exception).getCode());
@ -501,19 +512,16 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
// process definition online
user.setUserType(UserType.ADMIN_USER);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionDao.queryByCode(46L)).thenReturn(Optional.of(processDefinition));
exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
Assertions.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE.getCode(), ((ServiceException) exception).getCode());
// scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionDao.queryByCode(46L)).thenReturn(Optional.of(processDefinition));
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
Mockito.when(processDefinitionMapper.deleteById(processDefinition.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
.thenReturn(1);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet());
processDefinitionService.deleteProcessDefinitionByCode(user, 46L);
@ -538,24 +546,12 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
// delete success
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processTaskRelationMapper.queryByProcessCode(1, 11))
.thenReturn(getProcessTaskRelation());
Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(2);
Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
.thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet());
Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
// delete fail
Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(1);
exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR.getCode(),
((ServiceException) exception).getCode());
}
@Test
@ -584,7 +580,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
definitionCodes = Lists.newArrayList(singleCodes.split(Constants.COMMA)).stream().map(Long::parseLong)
.collect(Collectors.toSet());
Mockito.when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(process);
Mockito.when(processDefinitionDao.queryByCode(processDefinitionCode)).thenReturn(Optional.of(process));
// process definition online
user.setUserType(UserType.ADMIN_USER);
@ -600,10 +596,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
// delete success
process.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(process);
Mockito.when(processDefinitionMapper.deleteById(process.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), process.getCode()))
.thenReturn(1);
Mockito.when(processDefinitionDao.queryByCode(processDefinitionCode)).thenReturn(Optional.of(process));
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), process.getCode()))
.thenReturn(Collections.emptySet());
putMsg(result, Status.SUCCESS, projectCode);

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@ -150,6 +151,12 @@ public class ProcessInstanceServiceTest {
@Mock
AlertDao alertDao;
@Mock
private TaskInstanceService taskInstanceService;
@Mock
private ProcessInstanceMapDao processInstanceMapDao;
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
@ -682,8 +689,7 @@ public class ProcessInstanceServiceTest {
processInstanceService.deleteProcessInstanceById(loginUser, 1);
when(processService.deleteWorkProcessInstanceById(1)).thenReturn(0);
Assertions.assertThrows(ServiceException.class,
() -> processInstanceService.deleteProcessInstanceById(loginUser, 1));
Assertions.assertDoesNotThrow(() -> processInstanceService.deleteProcessInstanceById(loginUser, 1));
}
@Test

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -338,6 +338,15 @@ public class AlertDao {
if (processInstanceId == null) {
return;
}
List<Alert> alertList = alertMapper.selectByWorkflowInstanceId(processInstanceId);
if (CollectionUtils.isEmpty(alertList)) {
return;
}
alertMapper.deleteByWorkflowInstanceId(processInstanceId);
List<Integer> alertIds = alertList
.stream()
.map(Alert::getId)
.collect(Collectors.toList());
alertSendStatusMapper.deleteByAlertIds(alertIds);
}
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java

@ -46,4 +46,6 @@ public interface AlertMapper extends BaseMapper<Alert> {
@Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime);
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer processInstanceId);
List<Alert> selectByWorkflowInstanceId(@Param("workflowInstanceId") Integer processInstanceId);
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@ -26,4 +28,6 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface AlertSendStatusMapper extends BaseMapper<AlertSendStatus> {
int batchInsert(List<AlertSendStatus> alertSendStatuses);
void deleteByAlertIds(@Param("alertIds") List<Integer> alertIds);
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.java

@ -57,4 +57,6 @@ public interface DqExecuteResultMapper extends BaseMapper<DqExecuteResult> {
* @return DqExecuteResult
*/
DqExecuteResult getExecuteResultById(@Param("taskInstanceId") int taskInstanceId);
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -94,4 +94,6 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
* @return delete result
*/
int deleteByProcessDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);
void deleteByProcessDefinitionCode(@Param("workflowDefinitionCode") long workflowDefinitionCode);
}

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java

@ -60,4 +60,5 @@ public interface ProcessInstanceMapMapper extends BaseMapper<ProcessInstanceMap>
*/
List<Integer> querySubIdListByParentId(@Param("parentInstanceId") int parentInstanceId);
void deleteByParentId(@Param("parentWorkflowInstanceId") int workflowInstanceId);
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java

@ -74,4 +74,8 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper<ProcessTaskRela
* @return process task relation log
*/
ProcessTaskRelationLog queryRelationLogByRelation(@Param("processTaskRelation") ProcessTaskRelation processTaskRelation);
List<ProcessTaskRelationLog> queryByProcessCode(@Param("workflowDefinitionCode") long workflowDefinitionCode);
void deleteByWorkflowDefinitionCode(@Param("workflowDefinitionCode") long workflowDefinitionCode);
}

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -222,4 +222,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
Long queryTaskCodeByTaskName(@Param("workflowCode") Long workflowCode,
@Param("taskName") String taskName);
void deleteByWorkflowDefinitionCode(@Param("workflowDefinitionCode") long workflowDefinitionCode,
@Param("workflowDefinitionVersion") int workflowDefinitionVersion);
}

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -24,6 +24,7 @@ import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
@ -96,4 +97,6 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
*/
IPage<TaskDefinitionLog> queryTaskDefinitionVersionsPaging(Page<TaskDefinitionLog> page, @Param("code") long code,
@Param("projectCode") long projectCode);
void deleteByTaskDefinitionCodes(@Param("taskDefinitionCodes") Set<Long> taskDefinitionCodes);
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -156,4 +156,6 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
* @return deleted row count
*/
int deleteByBatchCodes(@Param("taskCodeList") List<Long> taskCodeList);
void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion);
}

23
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/DqExecuteResultDao.java

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
public interface DqExecuteResultDao {
void deleteByWorkflowInstanceId(Integer workflowInstanceId);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
@ -46,4 +47,10 @@ public interface ProcessDefinitionDao {
* @return
*/
List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances);
Optional<ProcessDefinition> queryByCode(long code);
void deleteById(Integer workflowDefinitionId);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
}

23
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
public interface ProcessDefinitionLogDao {
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import java.util.List;
public interface ProcessInstanceDao {
public int insertProcessInstance(ProcessInstance processInstance);
@ -31,4 +33,9 @@ public interface ProcessInstanceDao {
* @param processInstance processInstance
*/
public int upsertProcessInstance(ProcessInstance processInstance);
void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds);
void deleteById(Integer workflowInstanceId);
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceMapDao.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import java.util.List;
/**
* Process Instance Map DAO
*/
@ -46,4 +48,7 @@ public interface ProcessInstanceMapDao {
*/
ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId);
List<Integer> querySubWorkflowInstanceIds(int workflowInstanceId);
void deleteByParentId(int workflowInstanceId);
}

29
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import java.util.List;
public interface ProcessTaskRelationLogDao {
List<ProcessTaskRelationLog> findByWorkflowDefinitionCode(long workflowDefinitionCode);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import java.util.List;
import java.util.Set;
/**
* Task Instance DAO
@ -41,4 +42,7 @@ public interface TaskDefinitionDao {
*/
TaskDefinition findTaskDefinition(long taskCode, int taskDefinitionVersion);
void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion);
void deleteByTaskDefinitionCodes(Set<Long> needToDeleteTaskDefinitionCodes);
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionLogDao.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import java.util.List;
import java.util.Set;
/**
* Task Definition Log DAO
@ -41,4 +42,5 @@ public interface TaskDefinitionLogDao {
*/
List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations);
void deleteByTaskDefinitionCodes(Set<Long> taskDefinitionCodes);
}

36
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/DqExecuteResultDaoImpl.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper;
import org.apache.dolphinscheduler.dao.repository.DqExecuteResultDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@Repository
public class DqExecuteResultDaoImpl implements DqExecuteResultDao {
@Autowired
private DqExecuteResultMapper dqExecuteResultMapper;
@Override
public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
dqExecuteResultMapper.deleteByWorkflowInstanceId(workflowInstanceId);
}
}

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
@ -79,4 +80,20 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
return processDefinitions;
}
@Override
public Optional<ProcessDefinition> queryByCode(long code) {
return Optional.ofNullable(
processDefinitionMapper.queryByCode(code));
}
@Override
public void deleteById(Integer workflowDefinitionId) {
processDefinitionMapper.deleteById(workflowDefinitionId);
}
@Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processDefinitionMapper.deleteByCode(workflowDefinitionCode);
}
}

36
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@Repository
public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode);
}
}

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -21,6 +21,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@ -52,4 +56,17 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
return insertProcessInstance(processInstance);
}
}
@Override
public void deleteByIds(List<Integer> needToDeleteWorkflowInstanceIds) {
if (CollectionUtils.isEmpty(needToDeleteWorkflowInstanceIds)) {
return;
}
processInstanceMapper.deleteBatchIds(needToDeleteWorkflowInstanceIds);
}
@Override
public void deleteById(Integer workflowInstanceId) {
processInstanceMapper.deleteById(workflowInstanceId);
}
}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceMapDaoImpl.java

@ -21,6 +21,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
import java.util.List;
import lombok.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
@ -49,4 +51,14 @@ public class ProcessInstanceMapDaoImpl implements ProcessInstanceMapDao {
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
return processInstanceMapMapper.queryByParentId(parentWorkProcessId, parentTaskId);
}
@Override
public List<Integer> querySubWorkflowInstanceIds(int workflowInstanceId) {
return processInstanceMapMapper.querySubIdListByParentId(workflowInstanceId);
}
@Override
public void deleteByParentId(int workflowInstanceId) {
processInstanceMapMapper.deleteByParentId(workflowInstanceId);
}
}

44
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
@Repository
public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao {
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Override
public List<ProcessTaskRelationLog> findByWorkflowDefinitionCode(long workflowDefinitionCode) {
return processTaskRelationLogMapper.queryByProcessCode(workflowDefinitionCode);
}
@Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processTaskRelationLogMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode);
}
}

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java

@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -84,4 +87,18 @@ public class TaskDefinitionDaoImpl implements TaskDefinitionDao {
return taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinitionVersion);
}
@Override
public void deleteByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode, int workflowDefinitionVersion) {
taskDefinitionMapper.deleteByWorkflowDefinitionCodeAndVersion(workflowDefinitionCode,
workflowDefinitionVersion);
}
@Override
public void deleteByTaskDefinitionCodes(Set<Long> needToDeleteTaskDefinitionCodes) {
if (CollectionUtils.isEmpty(needToDeleteTaskDefinitionCodes)) {
return;
}
taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(needToDeleteTaskDefinitionCodes));
}
}

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionLogDaoImpl.java

@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -88,4 +90,12 @@ public class TaskDefinitionLogDaoImpl implements TaskDefinitionLogDao {
});
return taskDefinitionLogs;
}
@Override
public void deleteByTaskDefinitionCodes(Set<Long> taskDefinitionCodes) {
if (CollectionUtils.isEmpty(taskDefinitionCodes)) {
return;
}
taskDefinitionLogMapper.deleteByTaskDefinitionCodes(taskDefinitionCodes);
}
}

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml

@ -24,6 +24,12 @@
process_definition_code, process_instance_id, alert_type
</sql>
<select id="selectByWorkflowInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.Alert">
select
<include refid="baseSql"/>
from t_ds_alert
where process_instance_id = #{workflowInstanceId}
</select>
<insert id="insertAlertWhenServerCrash">
insert into t_ds_alert(sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time,

9
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml

@ -32,4 +32,13 @@
</insert>
<delete id="deleteByAlertIds">
delete
from t_ds_alert_send_status
where alert_id in
<foreach collection="alertIds" item="alertId" separator="," open="(" close=")">
#{alertId}
</foreach>
</delete>
</mapper>

24
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DqExecuteResultMapper.xml

@ -89,17 +89,23 @@
a.user_id,
a.comparison_type,
a.error_output_path,
b.name as process_definition_name,
e.name as process_instance_name,
c.name as task_name,
b.name as process_definition_name,
e.name as process_instance_name,
c.name as task_name,
cp.type as comparison_type_name,
d.user_name
FROM t_ds_dq_execute_result a
left join t_ds_process_definition b on a.process_definition_id = b.id
left join t_ds_task_instance c on a.task_instance_id = c.id
left join t_ds_process_instance e on a.process_instance_id = e.id
left join t_ds_user d on d.id = a.user_id
left join t_ds_dq_comparison_type cp on cp.id = a.comparison_type
where task_instance_id = #{taskInstanceId}
left join t_ds_process_definition b on a.process_definition_id = b.id
left join t_ds_task_instance c on a.task_instance_id = c.id
left join t_ds_process_instance e on a.process_instance_id = e.id
left join t_ds_user d on d.id = a.user_id
left join t_ds_dq_comparison_type cp on cp.id = a.comparison_type
where task_instance_id = #{taskInstanceId}
</select>
<delete id="deleteByWorkflowInstanceId">
delete
from t_ds_dq_execute_result
where process_instance_id = #{workflowInstanceId}
</delete>
</mapper>

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml

@ -82,4 +82,10 @@
where code = #{code}
and version = #{version}
</delete>
<delete id="deleteByProcessDefinitionCode">
delete
from t_ds_process_definition_log
where code = #{workflowDefinitionCode}
</delete>
</mapper>

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml

@ -44,4 +44,10 @@
from t_ds_relation_process_instance
where parent_process_instance_id = #{parentInstanceId}
</select>
<delete id="querySubIdListByParentId">
delete
from t_ds_relation_process_instance
where parent_process_instance_id = #{parentWorkflowInstanceId}
</delete>
</mapper>

12
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml

@ -70,4 +70,16 @@
and post_task_code = #{processTaskRelation.postTaskCode}
and post_task_version = #{processTaskRelation.postTaskVersion}
</select>
<select id="queryByProcessCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
select
<include refid="baseSql"/>
from t_ds_process_task_relation_log
WHERE process_definition_code = #{workflowDefinitionCode}
</select>
<delete id="deleteByWorkflowDefinitionCode">
delete
from t_ds_process_task_relation_log
WHERE process_definition_code = #{workflowDefinitionCode}
</delete>
</mapper>

20
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -224,16 +224,20 @@
<select id="queryTaskCodeByTaskName" resultType="java.lang.Long">
select r.post_task_code
from t_ds_process_task_relation r
left join t_ds_task_definition d on d.code = r.post_task_code
left join t_ds_task_definition d on d.code = r.post_task_code
where r.process_definition_code = #{workflowCode}
and d.name = #{taskName}
and d.name = #{taskName}
</select>
<update id="updateProcessTaskRelationTaskVersion">
update t_ds_process_task_relation
set
pre_task_version=#{processTaskRelation.preTaskVersion},
post_task_version=#{processTaskRelation.postTaskVersion}
where
id = #{processTaskRelation.id}
update t_ds_process_task_relation
set pre_task_version=#{processTaskRelation.preTaskVersion},
post_task_version=#{processTaskRelation.postTaskVersion}
where id = #{processTaskRelation.id}
</update>
<delete id="deleteByWorkflowDefinitionCode">
delete
from t_ds_process_task_relation
where process_definition_code = #{workflowDefinitionCode} and process_definition_version = #{workflowDefinitionVersion}
</delete>
</mapper>

14
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -60,7 +60,8 @@
#{taskDefinitionLog.flag},#{taskDefinitionLog.isCache},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},
#{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},
#{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority},
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime},
#{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority},
#{taskDefinitionLog.cpuQuota},#{taskDefinitionLog.memoryMax},#{taskDefinitionLog.taskExecuteType})
</foreach>
</insert>
@ -70,7 +71,8 @@
where code = #{code}
and version = #{version}
</delete>
<select id="queryTaskDefinitionVersionsPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
<select id="queryTaskDefinitionVersionsPaging"
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_task_definition_log
@ -80,4 +82,12 @@
</if>
order by version desc
</select>
<delete id="deleteByTaskDefinitionCodes">
delete
from t_ds_task_definition_log
where code in
<foreach collection="taskDefinitionCodes" item="taskDefinitionCode" separator="," open="(" close=")">
#{taskDefinitionCode}
</foreach>
</delete>
</mapper>

Loading…
Cancel
Save