From 647cbae4002c0ab3758d57827460ad7125a2c853 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 29 Apr 2024 16:14:23 +0800 Subject: [PATCH] [DSIP-32][Master] Add command fetcher strategy for master fetch command (#15900) --- .../dolphinscheduler_env.sh | 1 - .../dolphinscheduler_env.sh | 1 - .../dolphinscheduler_env.sh | 1 - .../dolphinscheduler_env.sh | 1 - docs/docs/en/architecture/configuration.md | 4 +- .../en/guide/installation/pseudo-cluster.md | 1 - docs/docs/zh/architecture/configuration.md | 47 +++++----- .../zh/guide/installation/pseudo-cluster.md | 1 - .../dao/mapper/CommandMapper.java | 12 +-- .../dao/repository/BaseDao.java | 5 ++ .../dao/repository/CommandDao.java | 39 ++++++++ .../dolphinscheduler/dao/repository/IDao.java | 5 ++ .../dao/repository/impl/CommandDaoImpl.java | 41 +++++++++ .../dao/mapper/CommandMapper.xml | 6 +- .../dao/mapper/CommandMapperTest.java | 13 ++- .../repository/impl/CommandDaoImplTest.java | 88 +++++++++++++++++++ .../command/CommandFetcherConfiguration.java | 49 +++++++++++ .../master/command/ICommandFetcher.java | 36 ++++++++ .../command/IdSlotBasedCommandFetcher.java | 73 +++++++++++++++ .../master/config/CommandFetchStrategy.java | 63 +++++++++++++ .../server/master/config/MasterConfig.java | 12 +-- .../runner/MasterSchedulerBootstrap.java | 38 ++------ .../src/main/resources/application.yaml | 9 +- .../master/config/MasterConfigTest.java | 12 +++ .../src/test/resources/application.yaml | 9 +- .../service/command/CommandService.java | 11 --- .../service/command/CommandServiceImpl.java | 9 -- .../command/MessageServiceImplTest.java | 10 --- .../src/main/resources/application.yaml | 9 +- 29 files changed, 484 insertions(+), 122 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java create mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java diff --git a/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh b/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh index 58937e740c..8536eb0905 100755 --- a/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh +++ b/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh @@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=123456 # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-jdbc} diff --git a/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh b/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh index 671c70a5bb..f64e59b768 100755 --- a/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh +++ b/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh @@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=123456 # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} diff --git a/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh b/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh index e7fd1b7204..29f8570319 100644 --- a/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh +++ b/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh @@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=postgres # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=jdbc diff --git a/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh b/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh index 1dbd63254e..6851716058 100644 --- a/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh +++ b/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh @@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=postgres # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 13d8932943..fe0b7851ba 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -286,7 +286,6 @@ Location: `master-server/conf/application.yaml` | Parameters | Default value | Description | |-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | master.listen-port | 5678 | master listen port | -| master.fetch-command-num | 10 | the number of commands fetched by master | | master.pre-exec-threads | 10 | master prepare execute thread number to limit handle commands in parallel | | master.exec-threads | 100 | master execute thread number to limit process instances in parallel | | master.dispatch-task-number | 3 | master dispatch task number per batch | @@ -305,6 +304,9 @@ Location: `master-server/conf/application.yaml` | master.registry-disconnect-strategy.strategy | stop | Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting | | master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely | | master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | +| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` | +| master.command-fetch-strategy.config.id-step | 1 | The id auto incremental step of t_ds_command in db | +| master.command-fetch-strategy.config.fetch-size | 10 | The number of commands fetched by master | ### Worker Server related configuration diff --git a/docs/docs/en/guide/installation/pseudo-cluster.md b/docs/docs/en/guide/installation/pseudo-cluster.md index e63436f203..7a3b43b00e 100644 --- a/docs/docs/en/guide/installation/pseudo-cluster.md +++ b/docs/docs/en/guide/installation/pseudo-cluster.md @@ -123,7 +123,6 @@ export SPRING_DATASOURCE_PASSWORD={password} # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index 08fded19e0..d8d1d42d1e 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -281,29 +281,30 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId 位置:`master-server/conf/application.yaml` -| 参数 | 默认值 | 描述 | -|-----------------------------------------------------------------------------|--------------|-----------------------------------------------------------------------------------------| -| master.listen-port | 5678 | master监听端口 | -| master.fetch-command-num | 10 | master拉取command数量 | -| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command | -| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 | -| master.dispatch-task-number | 3 | master每个批次的派发任务数量 | -| master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight | -| master.max-heartbeat-interval | 10s | master最大心跳间隔 | -| master.task-commit-retry-times | 5 | 任务重试次数 | -| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 | -| master.state-wheel-interval | 5 | 轮询检查状态时间 | -| master.server-load-protection.enabled | true | 是否开启系统保护策略 | -| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU | -| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU | -| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 | -| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 | -| master.failover-interval | 10 | failover间隔,单位为分钟 | -| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application | -| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting | -| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, | -| 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 | -| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 | +| 参数 | 默认值 | 描述 | +|-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------| +| master.listen-port | 5678 | master监听端口 | +| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command | +| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 | +| master.dispatch-task-number | 3 | master每个批次的派发任务数量 | +| master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight | +| master.max-heartbeat-interval | 10s | master最大心跳间隔 | +| master.task-commit-retry-times | 5 | 任务重试次数 | +| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 | +| master.state-wheel-interval | 5 | 轮询检查状态时间 | +| master.server-load-protection.enabled | true | 是否开启系统保护策略 | +| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU | +| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU | +| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 | +| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 | +| master.failover-interval | 10 | failover间隔,单位为分钟 | +| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application | +| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting | +| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 | +| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 | +| master.command-fetch-strategy.type | ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED` | +| master.command-fetch-strategy.config.id-step | 1 | 数据库中t_ds_command的id自增步长 | +| master.command-fetch-strategy.config.fetch-size | 10 | master拉取command数量 | ## Worker Server相关配置 diff --git a/docs/docs/zh/guide/installation/pseudo-cluster.md b/docs/docs/zh/guide/installation/pseudo-cluster.md index 13479e0d9e..a199167e04 100644 --- a/docs/docs/zh/guide/installation/pseudo-cluster.md +++ b/docs/docs/zh/guide/installation/pseudo-cluster.md @@ -118,7 +118,6 @@ export SPRING_DATASOURCE_PASSWORD={password} # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index 9fb6643227..8c8314e7cc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -52,14 +52,10 @@ public interface CommandMapper extends BaseMapper { */ List queryCommandPage(@Param("limit") int limit, @Param("offset") int offset); - /** - * query command page by slot - * - * @return command list - */ - List queryCommandPageBySlot(@Param("limit") int limit, - @Param("masterCount") int masterCount, - @Param("thisMasterSlot") int thisMasterSlot); + List queryCommandByIdSlot(@Param("currentSlotIndex") int currentSlotIndex, + @Param("totalSlot") int totalSlot, + @Param("idStep") int idStep, + @Param("fetchNumber") int fetchNum); void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List workflowInstanceIds); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java index 2937957dbd..664b56ee47 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java @@ -56,6 +56,11 @@ public abstract class BaseDao> return mybatisMapper.selectBatchIds(ids); } + @Override + public List queryAll() { + return mybatisMapper.selectList(null); + } + @Override public List queryByCondition(ENTITY queryCondition) { if (queryCondition == null) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java new file mode 100644 index 0000000000..daa52b8318 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.Command; + +import java.util.List; + +public interface CommandDao extends IDao { + + /** + * Query command by command id and server slot, return the command which match (commandId / step) %s totalSlot = currentSlotIndex + * + * @param currentSlotIndex current slot index + * @param totalSlot total slot number + * @param idStep id step in db + * @param fetchNum fetch number + * @return command list + */ + List queryCommandByIdSlot(int currentSlotIndex, + int totalSlot, + int idStep, + int fetchNum); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java index c566d9b904..ab77419600 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java @@ -41,6 +41,11 @@ public interface IDao { */ List queryByIds(Collection ids); + /** + * Query all entities. + */ + List queryAll(); + /** * Query the entity by condition. */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java new file mode 100644 index 0000000000..0b510d15b5 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.CommandDao; + +import java.util.List; + +import org.springframework.stereotype.Repository; + +@Repository +public class CommandDaoImpl extends BaseDao implements CommandDao { + + public CommandDaoImpl(CommandMapper commandMapper) { + super(commandMapper); + } + + @Override + public List queryCommandByIdSlot(int currentSlotIndex, int totalSlot, int idStep, int fetchNum) { + return mybatisMapper.queryCommandByIdSlot(currentSlotIndex, totalSlot, idStep, fetchNum); + } + +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index 56db890ef0..16f7c05f25 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -40,12 +40,12 @@ limit #{limit} offset #{offset} - select * from t_ds_command - where id % #{masterCount} = #{thisMasterSlot} + where (id / #{idStep}) % #{totalSlot} = #{currentSlotIndex} order by process_instance_priority, id asc - limit #{limit} + limit #{fetchNumber} delete from t_ds_command diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index 2d367e46e4..560b68754a 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -187,7 +187,7 @@ public class CommandMapperTest extends BaseDaoTest { Command command = createCommand(); Integer id = command.getId(); boolean hit = id % masterCount == thisMasterSlot; - List commandList = commandMapper.queryCommandPageBySlot(1, masterCount, thisMasterSlot); + List commandList = commandMapper.queryCommandByIdSlot(thisMasterSlot, masterCount, 1, 1); if (hit) { Assertions.assertEquals(id, commandList.get(0).getId()); } else { @@ -201,8 +201,9 @@ public class CommandMapperTest extends BaseDaoTest { /** * create command map - * @param count map count - * @param commandType comman type + * + * @param count map count + * @param commandType comman type * @param processDefinitionCode process definition code * @return command map */ @@ -223,7 +224,8 @@ public class CommandMapperTest extends BaseDaoTest { } /** - * create process definition + * create process definition + * * @return process definition */ private ProcessDefinition createProcessDefinition() { @@ -243,6 +245,7 @@ public class CommandMapperTest extends BaseDaoTest { /** * create command map + * * @param count map count * @return command map */ @@ -258,6 +261,7 @@ public class CommandMapperTest extends BaseDaoTest { /** * create command + * * @return */ private Command createCommand() { @@ -266,6 +270,7 @@ public class CommandMapperTest extends BaseDaoTest { /** * create command + * * @return Command */ private Command createCommand(CommandType commandType, long processDefinitionCode) { diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java new file mode 100644 index 0000000000..85867ef3b5 --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import static com.google.common.truth.Truth.assertThat; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.BaseDaoTest; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.repository.CommandDao; + +import org.apache.commons.lang3.RandomUtils; + +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +class CommandDaoImplTest extends BaseDaoTest { + + @Autowired + private CommandDao commandDao; + + @Test + void fetchCommandByIdSlot() { + int commandSize = RandomUtils.nextInt(1, 1000); + for (int i = 0; i < commandSize; i++) { + createCommand(CommandType.START_PROCESS, 0); + } + int totalSlot = RandomUtils.nextInt(1, 10); + int currentSlotIndex = RandomUtils.nextInt(0, totalSlot); + int fetchSize = RandomUtils.nextInt(10, 100); + for (int i = 1; i < 5; i++) { + int idStep = i; + List commands = commandDao.queryCommandByIdSlot(currentSlotIndex, totalSlot, idStep, fetchSize); + assertThat(commands.size()).isGreaterThan(0); + assertThat(commands.size()) + .isEqualTo(commandDao.queryAll() + .stream() + .filter(command -> (command.getId() / idStep) % totalSlot == currentSlotIndex) + .limit(fetchSize) + .count()); + + } + + } + + private void createCommand(CommandType commandType, int processDefinitionCode) { + Command command = new Command(); + command.setCommandType(commandType); + command.setProcessDefinitionCode(processDefinitionCode); + command.setExecutorId(4); + command.setCommandParam("test command param"); + command.setTaskDependType(TaskDependType.TASK_ONLY); + command.setFailureStrategy(FailureStrategy.CONTINUE); + command.setWarningType(WarningType.ALL); + command.setWarningGroupId(1); + command.setScheduleTime(DateUtils.stringToDate("2019-12-29 12:10:00")); + command.setProcessInstancePriority(Priority.MEDIUM); + command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00")); + command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00")); + command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + command.setProcessInstanceId(0); + command.setProcessDefinitionVersion(0); + commandDao.insert(command); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java new file mode 100644 index 0000000000..4a4d3c1efc --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.command; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.dao.repository.CommandDao; +import org.apache.dolphinscheduler.server.master.config.CommandFetchStrategy; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class CommandFetcherConfiguration { + + @Bean + public ICommandFetcher commandFetcher(MasterConfig masterConfig, + MasterSlotManager masterSlotManager, + CommandDao commandDao) { + CommandFetchStrategy commandFetchStrategy = + checkNotNull(masterConfig.getCommandFetchStrategy(), "command fetch strategy is null"); + switch (commandFetchStrategy.getType()) { + case ID_SLOT_BASED: + CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig = + (CommandFetchStrategy.IdSlotBasedFetchConfig) commandFetchStrategy.getConfig(); + return new IdSlotBasedCommandFetcher(idSlotBasedFetchConfig, masterSlotManager, commandDao); + default: + throw new IllegalArgumentException( + "unsupported command fetch strategy type: " + commandFetchStrategy.getType()); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java new file mode 100644 index 0000000000..c315a9b294 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.command; + +import org.apache.dolphinscheduler.dao.entity.Command; + +import java.util.List; + +/** + * The command fetcher used to fetch commands + */ +public interface ICommandFetcher { + + /** + * Fetch commands + * + * @return command list which need to be handled + */ + List fetchCommands(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java new file mode 100644 index 0000000000..a417820093 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.command; + +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.repository.CommandDao; +import org.apache.dolphinscheduler.server.master.config.CommandFetchStrategy; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; + +import java.util.Collections; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +/** + * The command fetcher which is fetch commands by command id and slot. + */ +@Slf4j +public class IdSlotBasedCommandFetcher implements ICommandFetcher { + + private final CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig; + + private final CommandDao commandDao; + + private final MasterSlotManager masterSlotManager; + + public IdSlotBasedCommandFetcher(CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig, + MasterSlotManager masterSlotManager, + CommandDao commandDao) { + this.idSlotBasedFetchConfig = idSlotBasedFetchConfig; + this.masterSlotManager = masterSlotManager; + this.commandDao = commandDao; + } + + @Override + public List fetchCommands() { + long scheduleStartTime = System.currentTimeMillis(); + int currentSlotIndex = masterSlotManager.getSlot(); + int totalSlot = masterSlotManager.getMasterSize(); + if (totalSlot <= 0 || currentSlotIndex < 0) { + log.warn("Slot is validated, current master slots: {}, the current slot index is {}", totalSlot, + currentSlotIndex); + return Collections.emptyList(); + } + List commands = commandDao.queryCommandByIdSlot( + currentSlotIndex, + totalSlot, + idSlotBasedFetchConfig.getIdStep(), + idSlotBasedFetchConfig.getFetchSize()); + long cost = System.currentTimeMillis() - scheduleStartTime; + log.info("Fetch commands: {} success, cost: {}ms, totalSlot: {}, currentSlotIndex: {}", commands.size(), cost, + totalSlot, currentSlotIndex); + ProcessInstanceMetrics.recordCommandQueryTime(cost); + return commands; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java new file mode 100644 index 0000000000..e61941677c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.config; + +import lombok.Data; + +import org.springframework.validation.Errors; + +@Data +public class CommandFetchStrategy { + + private CommandFetchStrategyType type = CommandFetchStrategyType.ID_SLOT_BASED; + + private CommandFetchConfig config = new IdSlotBasedFetchConfig(); + + public void validate(Errors errors) { + config.validate(errors); + } + + public enum CommandFetchStrategyType { + ID_SLOT_BASED, + ; + } + + public interface CommandFetchConfig { + + void validate(Errors errors); + + } + + @Data + public static class IdSlotBasedFetchConfig implements CommandFetchConfig { + + private int idStep = 1; + private int fetchSize = 10; + + @Override + public void validate(Errors errors) { + if (idStep <= 0) { + errors.rejectValue("step", null, "step must be greater than 0"); + } + if (fetchSize <= 0) { + errors.rejectValue("fetchSize", null, "fetchSize must be greater than 0"); + } + } + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 02c0dcb819..20d3cccef3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -48,10 +48,6 @@ public class MasterConfig implements Validator { * The master RPC server listen port. */ private int listenPort = 5678; - /** - * The max batch size used to fetch command from database. - */ - private int fetchCommandNum = 10; /** * The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum. */ @@ -98,6 +94,8 @@ public class MasterConfig implements Validator { private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L); + private CommandFetchStrategy commandFetchStrategy = new CommandFetchStrategy(); + // ip:listenPort private String masterAddress; @@ -115,9 +113,6 @@ public class MasterConfig implements Validator { if (masterConfig.getListenPort() <= 0) { errors.rejectValue("listen-port", null, "is invalidated"); } - if (masterConfig.getFetchCommandNum() <= 0) { - errors.rejectValue("fetch-command-num", null, "should be a positive value"); - } if (masterConfig.getPreExecThreads() <= 0) { errors.rejectValue("per-exec-threads", null, "should be a positive value"); } @@ -149,6 +144,7 @@ public class MasterConfig implements Validator { if (StringUtils.isEmpty(masterConfig.getMasterAddress())) { masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort())); } + commandFetchStrategy.validate(errors); masterConfig.setMasterRegistryPath( RegistryNodeType.MASTER.getRegistryPath() + "/" + masterConfig.getMasterAddress()); @@ -159,7 +155,6 @@ public class MasterConfig implements Validator { String config = "\n****************************Master Configuration**************************************" + "\n listen-port -> " + listenPort + - "\n fetch-command-num -> " + fetchCommandNum + "\n pre-exec-threads -> " + preExecThreads + "\n exec-threads -> " + execThreads + "\n dispatch-task-number -> " + dispatchTaskNumber + @@ -175,6 +170,7 @@ public class MasterConfig implements Validator { "\n master-address -> " + masterAddress + "\n master-registry-path: " + masterRegistryPath + "\n worker-group-refresh-interval: " + workerGroupRefreshInterval + + "\n command-fetch-strategy: " + commandFetchStrategy + "\n****************************Master Configuration**************************************"; log.info(config); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 2fddd94384..c1b5d0ffab 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -26,21 +26,18 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.command.ICommandFetcher; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection; import org.apache.dolphinscheduler.server.master.event.WorkflowEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue; import org.apache.dolphinscheduler.server.master.event.WorkflowEventType; -import org.apache.dolphinscheduler.server.master.exception.MasterException; import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; -import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; -import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.commons.collections4.CollectionUtils; -import java.util.Collections; import java.util.List; import java.util.Optional; @@ -56,6 +53,9 @@ import org.springframework.stereotype.Service; @Slf4j public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable { + @Autowired + private ICommandFetcher commandFetcher; + @Autowired private CommandService commandService; @@ -74,9 +74,6 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl @Autowired private WorkflowEventLooper workflowEventLooper; - @Autowired - private MasterSlotManager masterSlotManager; - @Autowired private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap; @@ -125,7 +122,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } - List commands = findCommands(); + List commands = commandFetcher.fetchCommands(); if (CollectionUtils.isEmpty(commands)) { // indicate that no command ,sleep for 1s Thread.sleep(Constants.SLEEP_TIME_MILLIS); @@ -170,29 +167,4 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl } } - private List findCommands() throws MasterException { - try { - long scheduleStartTime = System.currentTimeMillis(); - int thisMasterSlot = masterSlotManager.getSlot(); - int masterCount = masterSlotManager.getMasterSize(); - if (masterCount <= 0) { - log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot); - return Collections.emptyList(); - } - int pageSize = masterConfig.getFetchCommandNum(); - final List result = - commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot); - if (CollectionUtils.isNotEmpty(result)) { - long cost = System.currentTimeMillis() - scheduleStartTime; - log.info( - "Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}", - result.size(), cost, thisMasterSlot, masterCount); - ProcessInstanceMetrics.recordCommandQueryTime(cost); - } - return result; - } catch (Exception ex) { - throw new MasterException("Master loop command from database error", ex); - } - } - } diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index f18c6ef61d..17b1e41a71 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -83,8 +83,6 @@ registry: master: listen-port: 5678 - # master fetch command num - fetch-command-num: 10 # master prepare execute thread number to limit handle commands in parallel pre-exec-threads: 10 # master execute thread number to limit process instances in parallel @@ -121,6 +119,13 @@ master: # The max waiting time to reconnect to registry if you set the strategy to waiting max-waiting-time: 100s worker-group-refresh-interval: 10s + command-fetch-strategy: + type: ID_SLOT_BASED + config: + # The incremental id step + id-step: 1 + # master fetch command num + fetch-size: 10 server: port: 5679 diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java index faab44cf85..9d26aa81f4 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.config; +import static com.google.common.truth.Truth.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -47,6 +48,17 @@ public class MasterConfigTest { assertEquals(0.77, serverLoadProtection.getMaxJvmCpuUsagePercentageThresholds()); assertEquals(0.77, serverLoadProtection.getMaxSystemMemoryUsagePercentageThresholds()); assertEquals(0.77, serverLoadProtection.getMaxDiskUsagePercentageThresholds()); + } + + @Test + public void getCommandFetchStrategy() { + CommandFetchStrategy commandFetchStrategy = masterConfig.getCommandFetchStrategy(); + assertThat(commandFetchStrategy.getType()) + .isEqualTo(CommandFetchStrategy.CommandFetchStrategyType.ID_SLOT_BASED); + CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig = + (CommandFetchStrategy.IdSlotBasedFetchConfig) commandFetchStrategy.getConfig(); + assertThat(idSlotBasedFetchConfig.getIdStep()).isEqualTo(3); + assertThat(idSlotBasedFetchConfig.getFetchSize()).isEqualTo(11); } } diff --git a/dolphinscheduler-master/src/test/resources/application.yaml b/dolphinscheduler-master/src/test/resources/application.yaml index f4827d4b3c..15f9199609 100644 --- a/dolphinscheduler-master/src/test/resources/application.yaml +++ b/dolphinscheduler-master/src/test/resources/application.yaml @@ -89,8 +89,6 @@ registry: master: listen-port: 5678 - # master fetch command num - fetch-command-num: 10 # master prepare execute thread number to limit handle commands in parallel pre-exec-threads: 10 # master execute thread number to limit process instances in parallel @@ -127,6 +125,13 @@ master: # The max waiting time to reconnect to registry if you set the strategy to waiting max-waiting-time: 100s worker-group-refresh-interval: 10s + command-fetch-strategy: + type: ID_SLOT_BASED + config: + # The incremental id step + id-step: 3 + # master fetch command num + fetch-size: 11 server: port: 5679 diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java index cff73c503f..43b81c4e5c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java @@ -22,8 +22,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import java.util.List; - /** * Command Service */ @@ -44,15 +42,6 @@ public interface CommandService { */ int createCommand(Command command); - /** - * Get command page - * @param pageSize page size - * @param masterCount master count - * @param thisMasterSlot master slot - * @return command page - */ - List findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot); - /** * check the input command exists in queue list * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java index 483899446b..ee833a80b0 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java @@ -57,7 +57,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import io.micrometer.core.annotation.Counted; /** @@ -107,14 +106,6 @@ public class CommandServiceImpl implements CommandService { return result; } - @Override - public List findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot) { - if (masterCount <= 0) { - return Lists.newArrayList(); - } - return commandMapper.queryCommandPageBySlot(pageSize, masterCount, thisMasterSlot); - } - @Override public boolean verifyIsNeedCreateCommand(Command command) { boolean isNeedCreate = true; diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java index 0cde76bdfe..f60320fc63 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java @@ -214,14 +214,4 @@ class MessageServiceImplTest { Mockito.verify(commandMapper, Mockito.times(1)).insert(command); } - @Test - public void testFindCommandPageBySlot() { - int pageSize = 1; - int masterCount = 0; - int thisMasterSlot = 2; - List commandList = - commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot); - Assertions.assertEquals(0, commandList.size()); - } - } diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 5122eea2a1..6757718929 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -160,8 +160,6 @@ casdoor: master: listen-port: 5678 - # master fetch command num - fetch-command-num: 10 # master prepare execute thread number to limit handle commands in parallel pre-exec-threads: 10 # master execute thread number to limit process instances in parallel @@ -192,6 +190,13 @@ master: # kill yarn/k8s application when failover taskInstance, default true kill-application-when-task-failover: true worker-group-refresh-interval: 10s + command-fetch-strategy: + type: ID_SLOT_BASED + config: + # The incremental id step + id-step: 1 + # master fetch command num + fetch-size: 10 worker: # worker listener port