From d3251c9bcc5744905723df7fab11a676029e7d6e Mon Sep 17 00:00:00 2001 From: worry <7039986@qq.com> Date: Mon, 28 Mar 2022 09:52:12 +0800 Subject: [PATCH] [Improvement-9227][master]implement use the slot to scan the database (#9228) when the master assigns tasks by slot,implement use the slot to scan the database. This closes #9227 --- .../dao/mapper/CommandMapper.java | 6 ++++ .../dao/mapper/CommandMapper.xml | 8 +++++ .../dao/mapper/CommandMapperTest.java | 33 +++++++++++++++++++ .../master/runner/MasterSchedulerService.java | 25 +++----------- .../service/process/ProcessService.java | 10 ++++++ .../service/process/ProcessServiceTest.java | 10 ++++++ 6 files changed, 72 insertions(+), 20 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index c1e30fdfb5..8305ae2965 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -52,4 +52,10 @@ public interface CommandMapper extends BaseMapper { */ List queryCommandPage(@Param("limit") int limit, @Param("offset") int offset); + + /** + * query command page by slot + * @return command list + */ + List queryCommandPageBySlot(@Param("limit") int limit, @Param("offset") int offset, @Param("masterCount") int masterCount, @Param("thisMasterSlot") int thisMasterSlot); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index b0ea477431..aa2bf13bdc 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -39,4 +39,12 @@ order by process_instance_priority, id asc limit #{limit} offset #{offset} + + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index b056d91e24..c937eabf7b 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -166,6 +167,38 @@ public class CommandMapperTest extends BaseDaoTest { assertThat(actualCommandCounts.size(),greaterThanOrEqualTo(1)); } + /** + * test query command page by slot + */ + @Test + public void testQueryCommandPageBySlot() { + int masterCount = 4; + int thisMasterSlot = 2; + // for hit or miss + toTestQueryCommandPageBySlot(masterCount,thisMasterSlot); + toTestQueryCommandPageBySlot(masterCount,thisMasterSlot); + toTestQueryCommandPageBySlot(masterCount,thisMasterSlot); + toTestQueryCommandPageBySlot(masterCount,thisMasterSlot); + } + + private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot) { + Command command = createCommand(); + int id = command.getId(); + boolean hit = id % masterCount == thisMasterSlot; + List commandList = commandMapper.queryCommandPageBySlot(1, 0, masterCount, thisMasterSlot); + if (hit) { + assertEquals(id,commandList.get(0).getId()); + } else { + commandList.forEach(o -> { + assertNotEquals(id, o.getId()); + assertEquals(thisMasterSlot, o.getId() % masterCount); + }); + } + return hit; + } + + + /** * create command map * @param count map count diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 56b6aacaed..f448f0fa7c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -226,27 +226,12 @@ public class MasterSchedulerService extends Thread { int pageNumber = 0; int pageSize = masterConfig.getFetchCommandNum(); List result = new ArrayList<>(); - while (Stopper.isRunning()) { - // todo: Can we use the slot to scan database? - List commandList = processService.findCommandPage(pageSize, pageNumber); - if (commandList.size() == 0) { - return result; - } - for (Command command : commandList) { - SlotCheckState slotCheckState = slotCheck(command); - if (slotCheckState.equals(SlotCheckState.CHANGE)) { - // return and wait next scan, don't reset param, waste resources of cpu - return new ArrayList<>(); - } - if (slotCheckState.equals(SlotCheckState.PASS)) { - result.add(command); - } - } - if (CollectionUtils.isNotEmpty(result)) { - logger.info("find {} commands, slot:{}", result.size(), ServerNodeManager.getSlot()); - break; + if (Stopper.isRunning()) { + int thisMasterSlot = ServerNodeManager.getSlot(); + int masterCount = ServerNodeManager.getMasterSize(); + if (masterCount > 0) { + result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); } - pageNumber += 1; } return result; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index deaf56816b..cf12b05960 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -407,6 +407,16 @@ public class ProcessService { return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize); } + /** + * get command page + */ + public List findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) { + if (masterCount <= 0) { + return Lists.newArrayList(); + } + return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot); + } + /** * check the input command exists in queue list * diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 37c12f36a8..c57e6ba5c5 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -844,6 +844,16 @@ public class ProcessServiceTest { Assert.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId()); } + @Test + public void testFindCommandPageBySlot() { + int pageSize = 1; + int pageNumber = 0; + int masterCount = 0; + int thisMasterSlot = 2; + List commandList = processService.findCommandPageBySlot(pageSize,pageNumber,masterCount,thisMasterSlot); + Assert.assertEquals(0,commandList.size()); + } + private TaskGroupQueue getTaskGroupQueue() { TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); taskGroupQueue.setTaskName("task name");