diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java index cada1c7092..8b8241a2ad 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java @@ -122,4 +122,18 @@ public interface TaskGroupQueueMapper extends BaseMapper { @Param("status") int status, @Param("inQueue") int inQueue, @Param("forceStart") int forceStart); + + int countUsingTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId, + @Param("status") int status, + @Param("inQueue") int inQueue, + @Param("forceStart") int forceStart); + + List queryInQueueTaskGroupQueue(@Param("inQueue") int inQueue, + @Param("minTaskGroupQueueId") int minTaskGroupQueueId, + @Param("limit") int limit); + + List queryWaitNotifyForceStartTaskGroupQueue(@Param("inQueue") int inQueue, + @Param("forceStart") int forceStart, + @Param("minTaskGroupQueueId") int minTaskGroupQueueId, + @Param("limit") int limit); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java index a788b29bb1..468b7b758c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java @@ -38,6 +38,17 @@ public interface TaskGroupQueueDao extends IDao { */ List queryAllInQueueTaskGroupQueue(); + /** + * Query all {@link TaskGroupQueue} which + * in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} + * and id > minTaskGroupQueueId + * ordered by id asc + * limit #{limit} + * + * @return TaskGroupQueue 
ordered by id asc + */ + List queryInQueueTaskGroupQueue(int minTaskGroupQueueId, int limit); + /** * Query all {@link TaskGroupQueue} which in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} and taskGroupId is taskGroupId * @@ -61,4 +72,24 @@ public interface TaskGroupQueueDao extends IDao { * @return TaskGroupQueue */ List queryAcquiredTaskGroupQueueByGroupId(Integer taskGroupId); + + /** + * Count all {@link TaskGroupQueue} which status is TaskGroupQueueStatus.ACQUIRE_SUCCESS and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#NO}. + * + * @param taskGroupId taskGroupId + * @return the number of matching TaskGroupQueue + */ + int countUsingTaskGroupQueueByGroupId(Integer taskGroupId); + + /** + * Query all {@link TaskGroupQueue} which + * in_queue is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} + * and forceStart is {@link org.apache.dolphinscheduler.common.enums.Flag#YES} + * and id > minTaskGroupQueueId + * order by id asc + * limit #{limit} + * + * @return TaskGroupQueue ordered by id asc + */ + List queryWaitNotifyForceStartTaskGroupQueue(int minTaskGroupQueueId, int limit); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java index a1808a9091..5fd50deaae 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java @@ -52,6 +52,11 @@ public class TaskGroupQueueDaoImpl extends BaseDao queryInQueueTaskGroupQueue(int minTaskGroupQueueId, int limit) { + return mybatisMapper.queryInQueueTaskGroupQueue(Flag.YES.getCode(), minTaskGroupQueueId, limit); + } + @Override public List queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId) { return 
mybatisMapper.queryAllInQueueTaskGroupQueueByGroupId(taskGroupId, Flag.YES.getCode()); @@ -70,4 +75,21 @@ public class TaskGroupQueueDaoImpl extends BaseDao queryWaitNotifyForceStartTaskGroupQueue(int minTaskGroupQueueId, int limit) { + return mybatisMapper.queryWaitNotifyForceStartTaskGroupQueue( + Flag.YES.getCode(), + Flag.YES.getCode(), + minTaskGroupQueueId, + limit); + } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml index 790ad7bfae..800e82e847 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml @@ -219,6 +219,16 @@ where in_queue = #{inQueue} order by priority desc + + + + + + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java index 17c1537184..13dcf91f55 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImplTest.java @@ -27,7 +27,11 @@ import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.RandomUtils; + import java.util.Date; +import java.util.List; import org.assertj.core.util.Lists; import org.junit.jupiter.api.Test; @@ -55,6 +59,35 @@ class TaskGroupQueueDaoImplTest extends BaseDaoTest { assertEquals(1, 
taskGroupQueueDao.queryAllInQueueTaskGroupQueue().size()); } + @Test + void queryInQueueTaskGroupQueue_withMinId() { + // Insert a random number of records in [10000, 100000) + int insertCount = RandomUtils.nextInt(10000, 100000); + List insertTaskGroupQueue = Lists.newArrayList(); + for (int i = 0; i < insertCount; i++) { + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS); + insertTaskGroupQueue.add(taskGroupQueue); + } + taskGroupQueueDao.insertBatch(insertTaskGroupQueue); + + int minTaskGroupQueueId = -1; + int limit = 1000; + int queryCount = 0; + while (true) { + List taskGroupQueues = + taskGroupQueueDao.queryInQueueTaskGroupQueue(minTaskGroupQueueId, limit); + if (CollectionUtils.isEmpty(taskGroupQueues)) { + break; + } + queryCount += taskGroupQueues.size(); + if (taskGroupQueues.size() < limit) { + break; + } + minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId(); + } + assertEquals(insertCount, queryCount); + } + @Test void queryAllInQueueTaskGroupQueueByGroupId() { TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS); @@ -91,6 +124,49 @@ class TaskGroupQueueDaoImplTest extends BaseDaoTest { assertEquals(1, taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(1).size()); } + @Test + void countUsingTaskGroupQueueByGroupId() { + assertEquals(0, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1)); + + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.NO, TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueueDao.insert(taskGroupQueue); + assertEquals(1, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1)); + + taskGroupQueue = createTaskGroupQueue(Flag.YES, TaskGroupQueueStatus.WAIT_QUEUE); + taskGroupQueueDao.insert(taskGroupQueue); + assertEquals(1, taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(1)); + } + + @Test + void queryWaitNotifyForceStartTaskGroupQueue() { + // Insert a random number of records in [10000, 20000) + int insertCount = RandomUtils.nextInt(10000, 20000); + 
List insertTaskGroupQueue = Lists.newArrayList(); + for (int i = 0; i < insertCount; i++) { + TaskGroupQueue taskGroupQueue = createTaskGroupQueue(Flag.YES, TaskGroupQueueStatus.ACQUIRE_SUCCESS); + + insertTaskGroupQueue.add(taskGroupQueue); + } + taskGroupQueueDao.insertBatch(insertTaskGroupQueue); + + int beginTaskGroupQueueId = -1; + int limit = 1000; + int queryCount = 0; + while (true) { + List taskGroupQueues = + taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(beginTaskGroupQueueId, limit); + if (CollectionUtils.isEmpty(taskGroupQueues)) { + break; + } + queryCount += taskGroupQueues.size(); + if (taskGroupQueues.size() < limit) { + break; + } + beginTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId(); + } + assertEquals(insertCount, queryCount); + } + private TaskGroupQueue createTaskGroupQueue(Flag forceStart, TaskGroupQueueStatus taskGroupQueueStatus) { return TaskGroupQueue.builder() .taskId(1) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java index fae0d2b91f..bd7af94611 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java @@ -96,6 +96,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread { @Autowired private ProcessInstanceDao processInstanceDao; + private static int DEFAULT_LIMIT = 1000; + public TaskGroupCoordinator() { super("TaskGroupCoordinator"); } @@ -147,10 +149,10 @@ public class TaskGroupCoordinator extends BaseDaemonThread { if (CollectionUtils.isEmpty(taskGroups)) { return; } + StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted(); + for (TaskGroup taskGroup : 
taskGroups) { - List taskGroupQueues = - taskGroupQueueDao.queryAcquiredTaskGroupQueueByGroupId(taskGroup.getId()); - int actualUseSize = taskGroupQueues.size(); + int actualUseSize = taskGroupQueueDao.countUsingTaskGroupQueueByGroupId(taskGroup.getId()); if (taskGroup.getUseSize() == actualUseSize) { continue; } @@ -160,13 +162,35 @@ public class TaskGroupCoordinator extends BaseDaemonThread { taskGroup.setUseSize(actualUseSize); taskGroupDao.updateById(taskGroup); } + log.info("Success amend TaskGroup useSize cost: {}/ms", taskGroupCoordinatorRoundTimeCost.getTime()); } /** * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished. */ private void amendTaskGroupQueueStatus() { - List taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue(); + int minTaskGroupQueueId = -1; + int limit = DEFAULT_LIMIT; + StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted(); + while (true) { + List taskGroupQueues = + taskGroupQueueDao.queryInQueueTaskGroupQueue(minTaskGroupQueueId, limit); + if (CollectionUtils.isEmpty(taskGroupQueues)) { + break; + } + amendTaskGroupQueueStatus(taskGroupQueues); + if (taskGroupQueues.size() < limit) { + break; + } + minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId(); + } + log.info("Success amend TaskGroupQueue status cost: {}/ms", taskGroupCoordinatorRoundTimeCost.getTime()); + } + + /** + * Make sure the TaskGroupQueue status is {@link TaskGroupQueueStatus#RELEASE} when the related {@link TaskInstance} is not exist or status is finished. 
+ */ + private void amendTaskGroupQueueStatus(List taskGroupQueues) { List taskInstanceIds = taskGroupQueues.stream() .map(TaskGroupQueue::getTaskId) .collect(Collectors.toList()); @@ -198,10 +222,30 @@ public class TaskGroupCoordinator extends BaseDaemonThread { // Find the force start task group queue(Which is inQueue and forceStart is YES) // Notify the related waiting task instance // Set the taskGroupQueue status to RELEASE and remove it from queue - List taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue() - .stream() - .filter(taskGroupQueue -> Flag.YES.getCode() == taskGroupQueue.getForceStart()) - .collect(Collectors.toList()); + // We use limit here to avoid OOM, and we will retry to notify force start queue at next time + int minTaskGroupQueueId = -1; + int limit = DEFAULT_LIMIT; + StopWatch taskGroupCoordinatorRoundTimeCost = StopWatch.createStarted(); + while (true) { + List taskGroupQueues = + taskGroupQueueDao.queryWaitNotifyForceStartTaskGroupQueue(minTaskGroupQueueId, limit); + if (CollectionUtils.isEmpty(taskGroupQueues)) { + break; + } + dealWithForceStartTaskGroupQueue(taskGroupQueues); + if (taskGroupQueues.size() < limit) { + break; + } + minTaskGroupQueueId = taskGroupQueues.get(taskGroupQueues.size() - 1).getId(); + } + log.info("Success deal with force start TaskGroupQueue cost: {}/ms", + taskGroupCoordinatorRoundTimeCost.getTime()); + } + + private void dealWithForceStartTaskGroupQueue(List taskGroupQueues) { + // Find the force start task group queue(Which is inQueue and forceStart is YES) + // Notify the related waiting task instance + // Set the taskGroupQueue status to RELEASE and remove it from queue for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { try { LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId());