Browse Source

Fix task group might be rob by multiple task #13115

3.1.3-release
Wenjun Ruan 2 years ago committed by zhuangchong
parent
commit
1796577b53
  1. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
  2. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
  3. 25
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
  4. 20
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
  5. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
  6. 10
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
  7. 7
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  8. 77
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java

@ -108,11 +108,18 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
putMsg(result, Status.TASK_GROUP_NAME_EXSIT); putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result; return result;
} }
TaskGroup taskGroup = new TaskGroup(name, projectCode, description, Date now = new Date();
groupSize, loginUser.getId(), Flag.YES.getCode()); TaskGroup taskGroup = TaskGroup.builder()
.name(name)
.projectCode(projectCode)
.description(description)
.groupSize(groupSize)
.userId(loginUser.getId())
.status(Flag.YES.getCode())
.createTime(now)
.updateTime(now)
.build();
taskGroup.setCreateTime(new Date());
taskGroup.setUpdateTime(new Date());
if (taskGroupMapper.insert(taskGroup) > 0) { if (taskGroupMapper.insert(taskGroup) > 0) {
permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(), permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(),
Collections.singletonList(taskGroup.getId()), logger); Collections.singletonList(taskGroup.getId()), logger);

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java

@ -100,8 +100,14 @@ public class TaskGroupServiceTest {
} }
private TaskGroup getTaskGroup() { private TaskGroup getTaskGroup() {
TaskGroup taskGroup = new TaskGroup(taskGroupName,0, taskGroupDesc, TaskGroup taskGroup = TaskGroup.builder()
100, 1,1); .name(taskGroupName)
.projectCode(0)
.description(taskGroupDesc)
.groupSize(100)
.userId(1)
.status(Flag.YES.getCode())
.build();
return taskGroup; return taskGroup;
} }

25
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java

@ -20,13 +20,19 @@ package org.apache.dolphinscheduler.dao.entity;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
@Data @Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_ds_task_group") @TableName("t_ds_task_group")
public class TaskGroup implements Serializable { public class TaskGroup implements Serializable {
@ -70,23 +76,4 @@ public class TaskGroup implements Serializable {
*/ */
private long projectCode; private long projectCode;
public TaskGroup(String name, long projectCode, String description, int groupSize, int userId, int status) {
this.name = name;
this.projectCode = projectCode;
this.description = description;
this.groupSize = groupSize;
this.userId = userId;
this.status = status;
init();
}
public TaskGroup() {
init();
}
public void init() {
this.status = 1;
this.useSize = 0;
}
} }

20
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java

@ -22,7 +22,10 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
@ -30,6 +33,9 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
@Data @Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_task_group_queue") @TableName("t_ds_task_group_queue")
public class TaskGroupQueue implements Serializable { public class TaskGroupQueue implements Serializable {
@ -95,18 +101,4 @@ public class TaskGroupQueue implements Serializable {
* update time * update time
*/ */
private Date updateTime; private Date updateTime;
public TaskGroupQueue() {
}
public TaskGroupQueue(int taskId, String taskName, int groupId, int processId, int priority,
TaskGroupQueueStatus status) {
this.taskId = taskId;
this.taskName = taskName;
this.groupId = groupId;
this.processId = processId;
this.priority = priority;
this.status = status;
}
} }

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java

@ -35,14 +35,10 @@ import java.util.List;
*/ */
public interface TaskGroupMapper extends BaseMapper<TaskGroup> { public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
/** int robTaskGroupResource(@Param("id") int id,
* compard and set to update table of task group @Param("currentUseSize") int currentUseSize,
* @Param("queueId") int queueId,
* @param id primary key @Param("queueStatus") int queueStatus);
* @return affected rows
*/
int updateTaskGroupResource(@Param("id") int id, @Param("queueId") int queueId,
@Param("queueStatus") int queueStatus);
/** /**
* update table of task group * update table of task group

10
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml

@ -74,11 +74,13 @@
</select> </select>
<!--modify data by id--> <!--modify data by id-->
<update id="updateTaskGroupResource"> <update id="robTaskGroupResource">
update t_ds_task_group update t_ds_task_group
set use_size = use_size+1 set use_size = use_size + 1
where id = #{id} and use_size &lt; group_size and where id = #{id}
(select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1 and use_size &lt; group_size
and use_size = #{currentUseSize}
and (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus}) = 1
</update> </update>
<!--modify data by id--> <!--modify data by id-->

7
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -281,8 +281,11 @@ public interface ProcessService {
void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status); void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status);
TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
String taskName, Integer groupId, String taskName,
Integer processId, Integer priority, TaskGroupQueueStatus status); Integer groupId,
Integer processId,
Integer priority,
TaskGroupQueueStatus status);
int updateTaskGroupQueueStatus(Integer taskId, int status); int updateTaskGroupQueueStatus(Integer taskId, int status);

77
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -2944,7 +2944,12 @@ public class ProcessServiceImpl implements ProcessService {
// Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId); TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
if (taskGroupQueue == null) { if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, taskGroupQueue = insertIntoTaskGroupQueue(
taskId,
taskName,
groupId,
processId,
priority,
TaskGroupQueueStatus.WAIT_QUEUE); TaskGroupQueueStatus.WAIT_QUEUE);
} else { } else {
logger.info("The task queue is already exist, taskId: {}", taskId); logger.info("The task queue is already exist, taskId: {}", taskId);
@ -2956,7 +2961,9 @@ public class ProcessServiceImpl implements ProcessService {
this.taskGroupQueueMapper.updateById(taskGroupQueue); this.taskGroupQueueMapper.updateById(taskGroupQueue);
} }
// check if there already exist higher priority tasks // check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(
groupId,
priority,
TaskGroupQueueStatus.WAIT_QUEUE.getCode()); TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) { if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
return false; return false;
@ -2977,20 +2984,27 @@ public class ProcessServiceImpl implements ProcessService {
*/ */
@Override @Override
public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) { public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); // set the default max size to avoid dead loop
int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), for (int i = 0; i < 10; i++) {
taskGroupQueue.getId(), TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
if (affectedCount > 0) { logger.info("The current task Group is full, taskGroup: {}", taskGroup);
logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), return false;
taskGroupQueue.getId()); }
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); int affectedCount = taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
this.taskGroupQueueMapper.updateById(taskGroupQueue); taskGroup.getUseSize(),
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); taskGroupQueue.getId(),
return true; TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(),
taskGroupQueue.getId());
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return true;
}
} }
logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), logger.info("Failed to rob taskGroup, taskGroupQueue: {}", taskGroupQueue);
taskGroupQueue.getId());
return false; return false;
} }
@ -3071,23 +3085,24 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueueMapper.updateById(taskGroupQueue); taskGroupQueueMapper.updateById(taskGroupQueue);
} }
/**
* insert into task group queue
*
* @param taskId task id
* @param taskName task name
* @param groupId group id
* @param processId process id
* @param priority priority
* @return inserted task group queue
*/
@Override @Override
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId,
String taskName, Integer groupId, String taskName,
Integer processId, Integer priority, TaskGroupQueueStatus status) { Integer taskGroupId,
TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName, groupId, processId, priority, status); Integer workflowInstanceId,
taskGroupQueue.setCreateTime(new Date()); Integer taskGroupPriority,
taskGroupQueue.setUpdateTime(new Date()); TaskGroupQueueStatus status) {
Date now = new Date();
TaskGroupQueue taskGroupQueue = TaskGroupQueue.builder()
.taskId(taskInstanceId)
.taskName(taskName)
.groupId(taskGroupId)
.processId(workflowInstanceId)
.priority(taskGroupPriority)
.status(status)
.createTime(now)
.updateTime(now)
.build();
taskGroupQueueMapper.insert(taskGroupQueue); taskGroupQueueMapper.insert(taskGroupQueue);
return taskGroupQueue; return taskGroupQueue;
} }

Loading…
Cancel
Save