Browse Source

Fix task group might be rob by multiple task (#13115)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
f5801a0e1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
  2. 11
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
  3. 25
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
  4. 20
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
  5. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
  6. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
  7. 7
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  8. 59
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java

@ -112,11 +112,18 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
TaskGroup taskGroup = new TaskGroup(name, projectCode, description,
groupSize, loginUser.getId(), Flag.YES.getCode());
Date now = new Date();
TaskGroup taskGroup = TaskGroup.builder()
.name(name)
.projectCode(projectCode)
.description(description)
.groupSize(groupSize)
.userId(loginUser.getId())
.status(Flag.YES.getCode())
.createTime(now)
.updateTime(now)
.build();
taskGroup.setCreateTime(new Date());
taskGroup.setUpdateTime(new Date());
if (taskGroupMapper.insert(taskGroup) > 0) {
permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(),
Collections.singletonList(taskGroup.getId()), logger);

11
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java

@ -103,8 +103,15 @@ public class TaskGroupServiceTest {
}
private TaskGroup getTaskGroup() {
TaskGroup taskGroup = new TaskGroup(taskGroupName, 0, taskGroupDesc,
100, 1, 1);
TaskGroup taskGroup = TaskGroup.builder()
.name(taskGroupName)
.projectCode(0)
.description(taskGroupDesc)
.groupSize(100)
.userId(1)
.status(Flag.YES.getCode())
.build();
return taskGroup;
}

25
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java

@ -20,13 +20,19 @@ package org.apache.dolphinscheduler.dao.entity;
import java.io.Serializable;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_ds_task_group")
public class TaskGroup implements Serializable {
@ -70,23 +76,4 @@ public class TaskGroup implements Serializable {
*/
private long projectCode;
public TaskGroup(String name, long projectCode, String description, int groupSize, int userId, int status) {
this.name = name;
this.projectCode = projectCode;
this.description = description;
this.groupSize = groupSize;
this.userId = userId;
this.status = status;
init();
}
public TaskGroup() {
init();
}
public void init() {
this.status = 1;
this.useSize = 0;
}
}

20
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java

@ -22,7 +22,10 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import java.io.Serializable;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@ -30,6 +33,9 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_task_group_queue")
public class TaskGroupQueue implements Serializable {
@ -95,18 +101,4 @@ public class TaskGroupQueue implements Serializable {
* update time
*/
private Date updateTime;
public TaskGroupQueue() {
}
public TaskGroupQueue(int taskId, String taskName, int groupId, int processId, int priority,
TaskGroupQueueStatus status) {
this.taskId = taskId;
this.taskName = taskName;
this.groupId = groupId;
this.processId = processId;
this.priority = priority;
this.status = status;
}
}

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java

@ -35,13 +35,9 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
*/
public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
/**
* compard and set to update table of task group
*
* @param id primary key
* @return affected rows
*/
int updateTaskGroupResource(@Param("id") int id, @Param("queueId") int queueId,
int robTaskGroupResource(@Param("id") int id,
@Param("currentUseSize") int currentUseSize,
@Param("queueId") int queueId,
@Param("queueStatus") int queueStatus);
/**

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml

@ -74,11 +74,13 @@
</select>
<!--modify data by id-->
<update id="updateTaskGroupResource">
<update id="robTaskGroupResource">
update t_ds_task_group
set use_size = use_size + 1
where id = #{id} and use_size &lt; group_size and
(select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1
where id = #{id}
and use_size &lt; group_size
and use_size = #{currentUseSize}
and (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus}) = 1
</update>
<!--modify data by id-->

7
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -219,8 +219,11 @@ public interface ProcessService {
void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status);
TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
String taskName, Integer groupId,
Integer processId, Integer priority, TaskGroupQueueStatus status);
String taskName,
Integer groupId,
Integer processId,
Integer priority,
TaskGroupQueueStatus status);
int updateTaskGroupQueueStatus(Integer taskId, int status);

59
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -2424,7 +2424,12 @@ public class ProcessServiceImpl implements ProcessService {
// Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority,
taskGroupQueue = insertIntoTaskGroupQueue(
taskId,
taskName,
groupId,
processId,
priority,
TaskGroupQueueStatus.WAIT_QUEUE);
} else {
logger.info("The task queue is already exist, taskId: {}", taskId);
@ -2436,7 +2441,9 @@ public class ProcessServiceImpl implements ProcessService {
this.taskGroupQueueMapper.updateById(taskGroupQueue);
}
// check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority,
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(
groupId,
priority,
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
return false;
@ -2457,8 +2464,15 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
// set the default max size to avoid dead loop
for (int i = 0; i < 10; i++) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
logger.info("The current task Group is full, taskGroup: {}", taskGroup);
return false;
}
int affectedCount = taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize(),
taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
@ -2469,8 +2483,8 @@ public class ProcessServiceImpl implements ProcessService {
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return true;
}
logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(),
taskGroupQueue.getId());
}
logger.info("Failed to rob taskGroup, taskGroupQueue: {}", taskGroupQueue);
return false;
}
@ -2551,23 +2565,24 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueueMapper.updateById(taskGroupQueue);
}
/**
* insert into task group queue
*
* @param taskId task id
* @param taskName task name
* @param groupId group id
* @param processId process id
* @param priority priority
* @return inserted task group queue
*/
@Override
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
String taskName, Integer groupId,
Integer processId, Integer priority, TaskGroupQueueStatus status) {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue(taskId, taskName, groupId, processId, priority, status);
taskGroupQueue.setCreateTime(new Date());
taskGroupQueue.setUpdateTime(new Date());
@Override
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId,
String taskName,
Integer taskGroupId,
Integer workflowInstanceId,
Integer taskGroupPriority,
TaskGroupQueueStatus status) {
Date now = new Date();
TaskGroupQueue taskGroupQueue = TaskGroupQueue.builder()
.taskId(taskInstanceId)
.taskName(taskName)
.groupId(taskGroupId)
.processId(workflowInstanceId)
.priority(taskGroupPriority)
.status(status)
.createTime(now)
.updateTime(now)
.build();
taskGroupQueueMapper.insert(taskGroupQueue);
return taskGroupQueue;
}

Loading…
Cancel
Save