diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java index 9cf7024aec..1d722e1fd9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java @@ -112,11 +112,18 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe putMsg(result, Status.TASK_GROUP_NAME_EXSIT); return result; } - TaskGroup taskGroup = new TaskGroup(name, projectCode, description, - groupSize, loginUser.getId(), Flag.YES.getCode()); + Date now = new Date(); + TaskGroup taskGroup = TaskGroup.builder() + .name(name) + .projectCode(projectCode) + .description(description) + .groupSize(groupSize) + .userId(loginUser.getId()) + .status(Flag.YES.getCode()) + .createTime(now) + .updateTime(now) + .build(); - taskGroup.setCreateTime(new Date()); - taskGroup.setUpdateTime(new Date()); if (taskGroupMapper.insert(taskGroup) > 0) { permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(), Collections.singletonList(taskGroup.getId()), logger); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java index 7d475a621e..1f8f2abb4a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java @@ -103,8 +103,15 @@ public class TaskGroupServiceTest { } private TaskGroup getTaskGroup() { - TaskGroup taskGroup = new TaskGroup(taskGroupName, 0, taskGroupDesc, - 100, 1, 1); + TaskGroup taskGroup = TaskGroup.builder() + .name(taskGroupName) + .projectCode(0) + .description(taskGroupDesc) + .groupSize(100) + .userId(1) + .status(Flag.YES.getCode()) + .build(); + return taskGroup; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java index dcc9ad3147..0d42d8480f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java @@ -20,13 +20,19 @@ package org.apache.dolphinscheduler.dao.entity; import java.io.Serializable; import java.util.Date; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; @Data +@Builder +@AllArgsConstructor +@NoArgsConstructor @TableName("t_ds_task_group") public class TaskGroup implements Serializable { @@ -70,23 +76,4 @@ public class TaskGroup implements Serializable { */ private long projectCode; - public TaskGroup(String name, long projectCode, String description, int groupSize, int userId, int status) { - this.name = name; - this.projectCode = projectCode; - this.description = description; - this.groupSize = groupSize; - this.userId = userId; - this.status = status; - init(); - - } - - public TaskGroup() { - init(); - } - - public void init() { - this.status = 1; - this.useSize = 0; - } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java index be674856c0..9302d141e3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java @@ -22,7 +22,10 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import java.io.Serializable; import java.util.Date; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; @@ -30,6 +33,9 @@ import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor @TableName("t_ds_task_group_queue") public class TaskGroupQueue implements Serializable { @@ -95,18 +101,4 @@ public class TaskGroupQueue implements Serializable { * update time */ private Date updateTime; - - public TaskGroupQueue() { - - } - - public TaskGroupQueue(int taskId, String taskName, int groupId, int processId, int priority, - TaskGroupQueueStatus status) { - this.taskId = taskId; - this.taskName = taskName; - this.groupId = groupId; - this.processId = processId; - this.priority = priority; - this.status = status; - } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java index 819e70dcfb..9893726873 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java @@ -35,14 +35,10 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; */ public interface TaskGroupMapper extends BaseMapper { - /** - * compard and set to update table of task group - * - * @param id primary key - * @return affected rows - */ - int updateTaskGroupResource(@Param("id") int id, @Param("queueId") int queueId, - @Param("queueStatus") int queueStatus); + int robTaskGroupResource(@Param("id") int id, + @Param("currentUseSize") int currentUseSize, + @Param("queueId") int queueId, + @Param("queueStatus") int queueStatus); /** * update table of task group diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml index 7d3fa96b11..969077309e 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml @@ -74,11 +74,13 @@ - + update t_ds_task_group - set use_size = use_size+1 - where id = #{id} and use_size < group_size and - (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1 + set use_size = use_size + 1 + where id = #{id} + and use_size < group_size + and use_size = #{currentUseSize} + and (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus}) = 1 diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index bca4f0f7dc..e321ce5441 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -219,8 +219,11 @@ public interface ProcessService { void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status); TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, - String taskName, Integer groupId, - Integer processId, Integer priority, TaskGroupQueueStatus status); + String taskName, + Integer groupId, + Integer processId, + Integer priority, + TaskGroupQueueStatus status); int updateTaskGroupQueueStatus(Integer taskId, int status); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 55f6bde34c..8c171185d2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -2424,7 +2424,12 @@ public class ProcessServiceImpl implements ProcessService { // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId); if (taskGroupQueue == null) { - taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, + taskGroupQueue = insertIntoTaskGroupQueue( + taskId, + taskName, + groupId, + processId, + priority, TaskGroupQueueStatus.WAIT_QUEUE); } else { logger.info("The task queue is already exist, taskId: {}", taskId); @@ -2436,7 +2441,9 @@ public class ProcessServiceImpl implements ProcessService { this.taskGroupQueueMapper.updateById(taskGroupQueue); } // check if there already exist higher priority tasks - List highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, + List highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks( + groupId, + priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (CollectionUtils.isNotEmpty(highPriorityTasks)) { return false; @@ -2457,20 +2464,27 @@ public class ProcessServiceImpl implements ProcessService { */ @Override public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) { - TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); - int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), - taskGroupQueue.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode()); - if (affectedCount > 0) { - logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), - taskGroupQueue.getId()); - taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); - this.taskGroupQueueMapper.updateById(taskGroupQueue); - this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - return true; + // set the default max size to avoid dead loop + for (int i = 0; i < 10; i++) { + TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); + if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) { + logger.info("The current task Group is full, taskGroup: {}", taskGroup); + return false; + } + int affectedCount = taskGroupMapper.robTaskGroupResource(taskGroup.getId(), + taskGroup.getUseSize(), + taskGroupQueue.getId(), + TaskGroupQueueStatus.WAIT_QUEUE.getCode()); + if (affectedCount > 0) { + logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), + taskGroupQueue.getId()); + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + this.taskGroupQueueMapper.updateById(taskGroupQueue); + this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); + return true; + } } - logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), - taskGroupQueue.getId()); + logger.info("Failed to rob taskGroup, taskGroupQueue: {}", taskGroupQueue); return false; } @@ -2551,23 +2565,24 @@ public class ProcessServiceImpl implements ProcessService { taskGroupQueueMapper.updateById(taskGroupQueue); } - /** - * insert into task group queue - * - * @param taskId task id - * @param taskName task name - * @param groupId group id - * @param processId process id - * @param priority priority - * @return inserted task group queue - */ - @Override - public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, - String taskName, Integer groupId, - Integer processId, Integer priority, TaskGroupQueueStatus status) { - TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName, groupId, processId, priority, status); - taskGroupQueue.setCreateTime(new Date()); - taskGroupQueue.setUpdateTime(new Date()); + @Override + public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId, + String taskName, + Integer taskGroupId, + Integer workflowInstanceId, + Integer taskGroupPriority, + TaskGroupQueueStatus status) { + Date now = new Date(); + TaskGroupQueue taskGroupQueue = TaskGroupQueue.builder() + .taskId(taskInstanceId) + .taskName(taskName) + .groupId(taskGroupId) + .processId(workflowInstanceId) + .priority(taskGroupPriority) + .status(status) + .createTime(now) + .updateTime(now) + .build(); taskGroupQueueMapper.insert(taskGroupQueue); return taskGroupQueue; }