From d99ba29b66aa08398c2be6fe166c66c4fccb0d34 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 16 Nov 2022 10:20:22 +0800 Subject: [PATCH] Fix master cluster may loop command unbalanced (#12891) (cherry picked from commit 3b2b86661be76b7c1404a910c865d78b7936313d) --- .../dolphinscheduler/dao/mapper/CommandMapper.java | 2 +- .../dolphinscheduler/dao/mapper/CommandMapper.xml | 2 +- .../dolphinscheduler/dao/mapper/CommandMapperTest.java | 2 +- .../server/master/runner/MasterSchedulerBootstrap.java | 10 +++++----- .../service/command/CommandService.java | 3 +-- .../service/command/CommandServiceImpl.java | 4 ++-- .../service/command/CommandServiceImplTest.java | 3 +-- 7 files changed, 12 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index 877f7442e2..d3cdb1a3ab 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -54,7 +54,7 @@ public interface CommandMapper extends BaseMapper { * query command page by slot * @return command list */ - List queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset, + List queryCommandPageBySlot(@Param("limit") int limit, @Param("masterCount") int masterCount, @Param("thisMasterSlot") int thisMasterSlot); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index aa2bf13bdc..0cbf7ea0ff 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -45,6 +45,6 @@ from t_ds_command where id % #{masterCount} = #{thisMasterSlot} order by process_instance_priority, id asc - limit #{limit} offset #{offset} + limit #{limit} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index bd465fbaea..beaf75cf8c 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -176,7 +176,7 @@ public class CommandMapperTest extends BaseDaoTest { Command command = createCommand(); Integer id = command.getId(); boolean hit = id % masterCount == thisMasterSlot; - List commandList = commandMapper.queryCommandPageBySlot(1, 0, masterCount, thisMasterSlot); + List commandList = commandMapper.queryCommandPageBySlot(1, masterCount, thisMasterSlot); if (hit) { Assertions.assertEquals(id, commandList.get(0).getId()); } else { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 32b4ec9b96..42bbe075ab 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -269,16 +269,16 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot); return Collections.emptyList(); } - int pageNumber = 0; int pageSize = masterConfig.getFetchCommandNum(); final List result = - commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); + commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot); if (CollectionUtils.isNotEmpty(result)) { + long cost = System.currentTimeMillis() - scheduleStartTime; logger.info( - "Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}", - result.size(), thisMasterSlot, masterCount); + "Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}", + result.size(), cost, thisMasterSlot, masterCount); + ProcessInstanceMetrics.recordCommandQueryTime(cost); } - ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime); return result; } catch (Exception ex) { throw new MasterException("Master loop command from database error", ex); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java index 48d0a1bbae..cff73c503f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java @@ -47,12 +47,11 @@ public interface CommandService { /** * Get command page * @param pageSize page size - * @param pageNumber page number * @param masterCount master count * @param thisMasterSlot master slot * @return command page */ - List findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot); + List findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot); /** * check the input command exists in queue list diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java index 5d04013ba2..4b778af7ca 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java @@ -109,11 +109,11 @@ public class CommandServiceImpl implements CommandService { } @Override - public List findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) { + public List findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot) { if (masterCount <= 0) { return Lists.newArrayList(); } - return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot); + return commandMapper.queryCommandPageBySlot(pageSize, masterCount, thisMasterSlot); } @Override diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java index 64266827f7..1139bcf1da 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java @@ -217,11 +217,10 @@ class CommandServiceImplTest { @Test public void testFindCommandPageBySlot() { int pageSize = 1; - int pageNumber = 0; int masterCount = 0; int thisMasterSlot = 2; List commandList = - commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); + commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot); Assertions.assertEquals(0, commandList.size()); }