Browse Source

Clear the task group data when delete a project/workflowInstance (#13383)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
98dfd64444
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  4. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
  5. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
  6. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
  7. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  8. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
  9. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
  10. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
  11. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
  12. 7
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
  13. 23
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
  14. 31
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -68,7 +69,13 @@ public interface TaskGroupQueueService {
*/ */
boolean deleteByTaskId(int taskId); boolean deleteByTaskId(int taskId);
void deleteByTaskInstanceIds(List<Integer> taskInstanceIds);
void deleteByWorkflowInstanceId(Integer workflowInstanceId);
void forceStartTask(int queueId, int forceStart); void forceStartTask(int queueId, int forceStart);
void modifyPriority(Integer queueId, Integer priority); void modifyPriority(Integer queueId, Integer priority);
void deleteByTaskGroupIds(List<Integer> taskGroupIds);
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java

@ -139,4 +139,6 @@ public interface TaskGroupService {
Map<String, Object> forceStartTask(User loginUser, int taskId); Map<String, Object> forceStartTask(User loginUser, int taskId);
Map<String, Object> modifyPriority(User loginUser, Integer queueId, Integer priority); Map<String, Object> modifyPriority(User loginUser, Integer queueId, Integer priority);
void deleteTaskGroupByProjectCode(long projectCode);
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -104,6 +104,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
@ -135,6 +136,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired @Autowired
private ProcessInstanceMapper processInstanceMapper; private ProcessInstanceMapper processInstanceMapper;
@Lazy()
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java

@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
@ -60,6 +61,7 @@ import lombok.NonNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -74,6 +76,10 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
private static final Logger logger = LoggerFactory.getLogger(ProjectServiceImpl.class); private static final Logger logger = LoggerFactory.getLogger(ProjectServiceImpl.class);
@Lazy
@Autowired
private TaskGroupService taskGroupService;
@Autowired @Autowired
private ProjectMapper projectMapper; private ProjectMapper projectMapper;
@ -446,6 +452,9 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
putMsg(result, Status.DELETE_PROJECT_ERROR_DEFINES_NOT_NULL); putMsg(result, Status.DELETE_PROJECT_ERROR_DEFINES_NOT_NULL);
return result; return result;
} }
// delete the task group
taskGroupService.deleteTaskGroupByProjectCode(project.getCode());
int delete = projectMapper.deleteById(project.getId()); int delete = projectMapper.deleteById(project.getId());
if (delete > 0) { if (delete > 0) {
logger.info("Project is deleted and id is :{}.", project.getId()); logger.info("Project is deleted and id is :{}.", project.getId());

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java

@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.commons.collections4.CollectionUtils;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -145,6 +147,19 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
return taskGroupQueueMapper.deleteByTaskId(taskId) == 1; return taskGroupQueueMapper.deleteByTaskId(taskId) == 1;
} }
@Override
public void deleteByTaskInstanceIds(List<Integer> taskInstanceIds) {
if (CollectionUtils.isEmpty(taskInstanceIds)) {
return;
}
taskGroupQueueMapper.deleteByTaskInstanceIds(taskInstanceIds);
}
@Override
public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
taskGroupQueueMapper.deleteByWorkflowInstanceId(workflowInstanceId);
}
@Override @Override
public void forceStartTask(int queueId, int forceStart) { public void forceStartTask(int queueId, int forceStart) {
taskGroupQueueMapper.updateForceStart(queueId, forceStart); taskGroupQueueMapper.updateForceStart(queueId, forceStart);
@ -154,4 +169,12 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
public void modifyPriority(Integer queueId, Integer priority) { public void modifyPriority(Integer queueId, Integer priority) {
taskGroupQueueMapper.modifyPriority(queueId, priority); taskGroupQueueMapper.modifyPriority(queueId, priority);
} }
@Override
public void deleteByTaskGroupIds(List<Integer> taskGroupIds) {
if (CollectionUtils.isEmpty(taskGroupIds)) {
return;
}
taskGroupQueueMapper.deleteByTaskGroupIds(taskGroupIds);
}
} }

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java

@ -29,8 +29,8 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.TaskGroup; import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
@ -40,6 +40,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -63,9 +64,6 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
@Autowired @Autowired
private TaskGroupQueueService taskGroupQueueService; private TaskGroupQueueService taskGroupQueueService;
@Autowired
private ProcessService processService;
@Autowired @Autowired
private ExecutorService executorService; private ExecutorService executorService;
@ -429,4 +427,17 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
@Override
public void deleteTaskGroupByProjectCode(long projectCode) {
List<TaskGroup> taskGroups = taskGroupMapper.selectByProjectCode(projectCode);
if (CollectionUtils.isEmpty(taskGroups)) {
return;
}
List<Integer> taskGroupIds = taskGroups.stream()
.map(TaskGroup::getId)
.collect(Collectors.toList());
taskGroupQueueService.deleteByTaskGroupIds(taskGroupIds);
taskGroupMapper.deleteBatchIds(taskGroupIds);
}
} }

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.service.UsersService; import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -107,6 +108,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired @Autowired
private DqExecuteResultDao dqExecuteResultDao; private DqExecuteResultDao dqExecuteResultDao;
@Autowired
private TaskGroupQueueService taskGroupQueueService;
/** /**
* query task list by project, process instance, task name, task start time, task end time, task status, keyword paging * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
* *
@ -384,7 +388,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
} }
} }
} }
dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId); dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId);
taskGroupQueueService.deleteByWorkflowInstanceId(workflowInstanceId);
taskInstanceDao.deleteByWorkflowInstanceId(workflowInstanceId); taskInstanceDao.deleteByWorkflowInstanceId(workflowInstanceId);
} }

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java

@ -96,6 +96,9 @@ public class ProjectServiceTest {
@Mock @Mock
private ResourcePermissionCheckService resourcePermissionCheckService; private ResourcePermissionCheckService resourcePermissionCheckService;
@Mock
private TaskGroupService taskGroupService;
private String projectName = "ProjectServiceTest"; private String projectName = "ProjectServiceTest";
private String userName = "ProjectServiceTest"; private String userName = "ProjectServiceTest";

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java

@ -72,7 +72,7 @@ public class TaskGroup implements Serializable {
*/ */
private Date updateTime; private Date updateTime;
/** /**
* project Id * project code
*/ */
private long projectCode; private long projectCode;

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java

@ -82,8 +82,11 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
/** /**
* listAuthorizedResource * listAuthorizedResource
*
* @param userId * @param userId
* @return * @return
*/ */
List<TaskGroup> listAuthorizedResource(@Param("userId") int userId); List<TaskGroup> listAuthorizedResource(@Param("userId") int userId);
List<TaskGroup> selectByProjectCode(@Param("projectCode") long projectCode);
} }

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java

@ -99,4 +99,10 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
@Param("status") Integer status, @Param("status") Integer status,
@Param("groupId") int groupId, @Param("groupId") int groupId,
@Param("projects") List<Project> projects); @Param("projects") List<Project> projects);
void deleteByTaskInstanceIds(@Param("taskInstanceIds") List<Integer> taskInstanceIds);
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
void deleteByTaskGroupIds(@Param("taskGroupIds") List<Integer> taskGroupIds);
} }

7
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml

@ -73,6 +73,13 @@
order by update_time desc order by update_time desc
</select> </select>
<select id="selectByProjectCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
select
<include refid="baseSql"/>
from t_ds_task_group
where project_code = #{projectCode}
</select>
<!--modify data by id--> <!--modify data by id-->
<update id="robTaskGroupResource"> <update id="robTaskGroupResource">
update t_ds_task_group update t_ds_task_group

23
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml

@ -157,4 +157,27 @@
order by queue.update_time desc order by queue.update_time desc
</select> </select>
<delete id="deleteByTaskIds">
delete from t_ds_task_group_queue
where task_id in
<foreach collection="taskInstanceIds" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</delete>
<delete id="deleteByWorkflowInstanceId">
delete
from t_ds_task_group_queue
where process_id = #{workflowInstanceId}
</delete>
<delete id="deleteByTaskGroupIds">
delete
from t_ds_task_group_queue
where group_id in
<foreach collection="taskGroupIds" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</delete>
</mapper> </mapper>

31
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -2433,8 +2433,12 @@ public class ProcessServiceImpl implements ProcessService {
* @param taskId task id * @param taskId task id
*/ */
@Override @Override
public boolean acquireTaskGroup(int taskId, String taskName, int groupId, int processId, int priority) { public boolean acquireTaskGroup(int taskInstanceId,
TaskGroup taskGroup = taskGroupMapper.selectById(groupId); String taskName,
int taskGroupId,
int workflowInstanceId,
int taskGroupPriority) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId);
if (taskGroup == null) { if (taskGroup == null) {
// we don't throw exception here, to avoid the task group has been deleted during workflow running // we don't throw exception here, to avoid the task group has been deleted during workflow running
return true; return true;
@ -2444,17 +2448,17 @@ public class ProcessServiceImpl implements ProcessService {
return true; return true;
} }
// Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId); TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstanceId);
if (taskGroupQueue == null) { if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue( taskGroupQueue = insertIntoTaskGroupQueue(
taskId, taskInstanceId,
taskName, taskName,
groupId, taskGroupId,
processId, workflowInstanceId,
priority, taskGroupPriority,
TaskGroupQueueStatus.WAIT_QUEUE); TaskGroupQueueStatus.WAIT_QUEUE);
} else { } else {
logger.info("The task queue is already exist, taskId: {}", taskId); logger.info("The task queue is already exist, taskId: {}", taskInstanceId);
if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) { if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
return true; return true;
} }
@ -2464,19 +2468,19 @@ public class ProcessServiceImpl implements ProcessService {
} }
// check if there already exist higher priority tasks // check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks( List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(
groupId, taskGroupId,
priority, taskGroupPriority,
TaskGroupQueueStatus.WAIT_QUEUE.getCode()); TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) { if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
return false; return false;
} }
// try to get taskGroup // try to get taskGroup
int count = taskGroupMapper.selectAvailableCountById(groupId); int count = taskGroupMapper.selectAvailableCountById(taskGroupId);
if (count == 1 && robTaskGroupResource(taskGroupQueue)) { if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
logger.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId); logger.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskInstanceId, taskGroupId);
return true; return true;
} }
logger.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId); logger.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskInstanceId, taskGroupId);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false; return false;
} }
@ -2608,6 +2612,7 @@ public class ProcessServiceImpl implements ProcessService {
.processId(workflowInstanceId) .processId(workflowInstanceId)
.priority(taskGroupPriority) .priority(taskGroupPriority)
.status(status) .status(status)
.inQueue(Flag.NO.getCode())
.createTime(now) .createTime(now)
.updateTime(now) .updateTime(now)
.build(); .build();

Loading…
Cancel
Save