Browse Source

[DSIP-32][Master] Add command fetcher strategy for master fetch command (#15900)

3.2.2-prepare
Wenjun Ruan 8 months ago committed by GitHub
parent
commit
647cbae400
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      .github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh
  2. 1
      .github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh
  3. 1
      .github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh
  4. 1
      .github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh
  5. 4
      docs/docs/en/architecture/configuration.md
  6. 1
      docs/docs/en/guide/installation/pseudo-cluster.md
  7. 47
      docs/docs/zh/architecture/configuration.md
  8. 1
      docs/docs/zh/guide/installation/pseudo-cluster.md
  9. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
  10. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java
  11. 39
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
  12. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java
  13. 41
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
  14. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
  15. 13
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
  16. 88
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
  17. 49
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java
  18. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java
  19. 73
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java
  20. 63
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java
  21. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  22. 38
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
  23. 9
      dolphinscheduler-master/src/main/resources/application.yaml
  24. 12
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
  25. 9
      dolphinscheduler-master/src/test/resources/application.yaml
  26. 11
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
  27. 9
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
  28. 10
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
  29. 9
      dolphinscheduler-standalone-server/src/main/resources/application.yaml

1
.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh

@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=123456
# DolphinScheduler server related configuration
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-jdbc}

1
.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh

@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=123456
# DolphinScheduler server related configuration
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}

1
.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh

@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=postgres
# DolphinScheduler server related configuration
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=jdbc

1
.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh

@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=postgres
# DolphinScheduler server related configuration
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}

4
docs/docs/en/architecture/configuration.md

@ -286,7 +286,6 @@ Location: `master-server/conf/application.yaml`
| Parameters | Default value | Description |
|-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| master.listen-port | 5678 | master listen port |
| master.fetch-command-num | 10 | the number of commands fetched by master |
| master.pre-exec-threads | 10 | master prepare execute thread number to limit handle commands in parallel |
| master.exec-threads | 100 | master execute thread number to limit process instances in parallel |
| master.dispatch-task-number | 3 | master dispatch task number per batch |
@ -305,6 +304,9 @@ Location: `master-server/conf/application.yaml`
| master.registry-disconnect-strategy.strategy | stop | Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting |
| master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely |
| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory |
| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` |
| master.command-fetch-strategy.config.id-step | 1 | The id auto incremental step of t_ds_command in db |
| master.command-fetch-strategy.config.fetch-size | 10 | The number of commands fetched by master |
### Worker Server related configuration

1
docs/docs/en/guide/installation/pseudo-cluster.md

@ -123,7 +123,6 @@ export SPRING_DATASOURCE_PASSWORD={password}
# DolphinScheduler server related configuration
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}

47
docs/docs/zh/architecture/configuration.md

@ -281,29 +281,30 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
位置:`master-server/conf/application.yaml`
| 参数 | 默认值 | 描述 |
|-----------------------------------------------------------------------------|--------------|-----------------------------------------------------------------------------------------|
| master.listen-port | 5678 | master监听端口 |
| master.fetch-command-num | 10 | master拉取command数量 |
| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command |
| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 |
| master.dispatch-task-number | 3 | master每个批次的派发任务数量 |
| master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight |
| master.max-heartbeat-interval | 10s | master最大心跳间隔 |
| master.task-commit-retry-times | 5 | 任务重试次数 |
| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 |
| master.state-wheel-interval | 5 | 轮询检查状态时间 |
| master.server-load-protection.enabled | true | 是否开启系统保护策略 |
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU |
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU |
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 |
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 |
| master.failover-interval | 10 | failover间隔,单位为分钟 |
| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application |
| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting |
| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, |
| 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 |
| 参数 | 默认值 | 描述 |
|-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------|
| master.listen-port | 5678 | master监听端口 |
| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command |
| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 |
| master.dispatch-task-number | 3 | master每个批次的派发任务数量 |
| master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight |
| master.max-heartbeat-interval | 10s | master最大心跳间隔 |
| master.task-commit-retry-times | 5 | 任务重试次数 |
| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 |
| master.state-wheel-interval | 5 | 轮询检查状态时间 |
| master.server-load-protection.enabled | true | 是否开启系统保护策略 |
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU |
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU |
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 |
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 |
| master.failover-interval | 10 | failover间隔,单位为分钟 |
| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application |
| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting |
| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 |
| master.command-fetch-strategy.type | ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED` |
| master.command-fetch-strategy.config.id-step | 1 | 数据库中t_ds_command的id自增步长 |
| master.command-fetch-strategy.config.fetch-size | 10 | master拉取command数量 |
## Worker Server相关配置

1
docs/docs/zh/guide/installation/pseudo-cluster.md

@ -118,7 +118,6 @@ export SPRING_DATASOURCE_PASSWORD={password}
# DolphinScheduler server related configuration
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
# Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java

@ -52,14 +52,10 @@ public interface CommandMapper extends BaseMapper<Command> {
*/
List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset") int offset);
/**
* query command page by slot
*
* @return command list
*/
List<Command> queryCommandPageBySlot(@Param("limit") int limit,
@Param("masterCount") int masterCount,
@Param("thisMasterSlot") int thisMasterSlot);
List<Command> queryCommandByIdSlot(@Param("currentSlotIndex") int currentSlotIndex,
@Param("totalSlot") int totalSlot,
@Param("idStep") int idStep,
@Param("fetchNumber") int fetchNum);
void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List<Integer> workflowInstanceIds);
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java

@ -56,6 +56,11 @@ public abstract class BaseDao<ENTITY, MYBATIS_MAPPER extends BaseMapper<ENTITY>>
return mybatisMapper.selectBatchIds(ids);
}
@Override
public List<ENTITY> queryAll() {
return mybatisMapper.selectList(null);
}
@Override
public List<ENTITY> queryByCondition(ENTITY queryCondition) {
if (queryCondition == null) {

39
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.Command;
import java.util.List;
public interface CommandDao extends IDao<Command> {
/**
* Query command by command id and server slot, return the command which match (commandId / step) %s totalSlot = currentSlotIndex
*
* @param currentSlotIndex current slot index
* @param totalSlot total slot number
* @param idStep id step in db
* @param fetchNum fetch number
* @return command list
*/
List<Command> queryCommandByIdSlot(int currentSlotIndex,
int totalSlot,
int idStep,
int fetchNum);
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java

@ -41,6 +41,11 @@ public interface IDao<Entity> {
*/
List<Entity> queryByIds(Collection<? extends Serializable> ids);
/**
* Query all entities.
*/
List<Entity> queryAll();
/**
* Query the entity by condition.
*/

41
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import java.util.List;
import org.springframework.stereotype.Repository;
@Repository
public class CommandDaoImpl extends BaseDao<Command, CommandMapper> implements CommandDao {
public CommandDaoImpl(CommandMapper commandMapper) {
super(commandMapper);
}
@Override
public List<Command> queryCommandByIdSlot(int currentSlotIndex, int totalSlot, int idStep, int fetchNum) {
return mybatisMapper.queryCommandByIdSlot(currentSlotIndex, totalSlot, idStep, fetchNum);
}
}

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml

@ -40,12 +40,12 @@
limit #{limit} offset #{offset}
</select>
<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
<select id="queryCommandByIdSlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select *
from t_ds_command
where id % #{masterCount} = #{thisMasterSlot}
where (id / #{idStep}) % #{totalSlot} = #{currentSlotIndex}
order by process_instance_priority, id asc
limit #{limit}
limit #{fetchNumber}
</select>
<delete id="deleteByWorkflowInstanceIds" >
delete from t_ds_command

13
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@ -187,7 +187,7 @@ public class CommandMapperTest extends BaseDaoTest {
Command command = createCommand();
Integer id = command.getId();
boolean hit = id % masterCount == thisMasterSlot;
List<Command> commandList = commandMapper.queryCommandPageBySlot(1, masterCount, thisMasterSlot);
List<Command> commandList = commandMapper.queryCommandByIdSlot(thisMasterSlot, masterCount, 1, 1);
if (hit) {
Assertions.assertEquals(id, commandList.get(0).getId());
} else {
@ -201,8 +201,9 @@ public class CommandMapperTest extends BaseDaoTest {
/**
* create command map
* @param count map count
* @param commandType comman type
*
* @param count map count
* @param commandType comman type
* @param processDefinitionCode process definition code
* @return command map
*/
@ -223,7 +224,8 @@ public class CommandMapperTest extends BaseDaoTest {
}
/**
* create process definition
* create process definition
*
* @return process definition
*/
private ProcessDefinition createProcessDefinition() {
@ -243,6 +245,7 @@ public class CommandMapperTest extends BaseDaoTest {
/**
* create command map
*
* @param count map count
* @return command map
*/
@ -258,6 +261,7 @@ public class CommandMapperTest extends BaseDaoTest {
/**
* create command
*
* @return
*/
private Command createCommand() {
@ -266,6 +270,7 @@ public class CommandMapperTest extends BaseDaoTest {
/**
* create command
*
* @return Command
*/
private Command createCommand(CommandType commandType, long processDefinitionCode) {

88
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import static com.google.common.truth.Truth.assertThat;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import org.apache.commons.lang3.RandomUtils;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
class CommandDaoImplTest extends BaseDaoTest {
@Autowired
private CommandDao commandDao;
@Test
void fetchCommandByIdSlot() {
int commandSize = RandomUtils.nextInt(1, 1000);
for (int i = 0; i < commandSize; i++) {
createCommand(CommandType.START_PROCESS, 0);
}
int totalSlot = RandomUtils.nextInt(1, 10);
int currentSlotIndex = RandomUtils.nextInt(0, totalSlot);
int fetchSize = RandomUtils.nextInt(10, 100);
for (int i = 1; i < 5; i++) {
int idStep = i;
List<Command> commands = commandDao.queryCommandByIdSlot(currentSlotIndex, totalSlot, idStep, fetchSize);
assertThat(commands.size()).isGreaterThan(0);
assertThat(commands.size())
.isEqualTo(commandDao.queryAll()
.stream()
.filter(command -> (command.getId() / idStep) % totalSlot == currentSlotIndex)
.limit(fetchSize)
.count());
}
}
private void createCommand(CommandType commandType, int processDefinitionCode) {
Command command = new Command();
command.setCommandType(commandType);
command.setProcessDefinitionCode(processDefinitionCode);
command.setExecutorId(4);
command.setCommandParam("test command param");
command.setTaskDependType(TaskDependType.TASK_ONLY);
command.setFailureStrategy(FailureStrategy.CONTINUE);
command.setWarningType(WarningType.ALL);
command.setWarningGroupId(1);
command.setScheduleTime(DateUtils.stringToDate("2019-12-29 12:10:00"));
command.setProcessInstancePriority(Priority.MEDIUM);
command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(0);
commandDao.insert(command);
}
}

49
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.command;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import org.apache.dolphinscheduler.server.master.config.CommandFetchStrategy;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CommandFetcherConfiguration {
@Bean
public ICommandFetcher commandFetcher(MasterConfig masterConfig,
MasterSlotManager masterSlotManager,
CommandDao commandDao) {
CommandFetchStrategy commandFetchStrategy =
checkNotNull(masterConfig.getCommandFetchStrategy(), "command fetch strategy is null");
switch (commandFetchStrategy.getType()) {
case ID_SLOT_BASED:
CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig =
(CommandFetchStrategy.IdSlotBasedFetchConfig) commandFetchStrategy.getConfig();
return new IdSlotBasedCommandFetcher(idSlotBasedFetchConfig, masterSlotManager, commandDao);
default:
throw new IllegalArgumentException(
"unsupported command fetch strategy type: " + commandFetchStrategy.getType());
}
}
}

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.command;
import org.apache.dolphinscheduler.dao.entity.Command;
import java.util.List;
/**
* The command fetcher used to fetch commands
*/
public interface ICommandFetcher {
/**
* Fetch commands
*
* @return command list which need to be handled
*/
List<Command> fetchCommands();
}

73
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.command;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import org.apache.dolphinscheduler.server.master.config.CommandFetchStrategy;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
/**
* The command fetcher which is fetch commands by command id and slot.
*/
@Slf4j
public class IdSlotBasedCommandFetcher implements ICommandFetcher {
private final CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig;
private final CommandDao commandDao;
private final MasterSlotManager masterSlotManager;
public IdSlotBasedCommandFetcher(CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig,
MasterSlotManager masterSlotManager,
CommandDao commandDao) {
this.idSlotBasedFetchConfig = idSlotBasedFetchConfig;
this.masterSlotManager = masterSlotManager;
this.commandDao = commandDao;
}
@Override
public List<Command> fetchCommands() {
long scheduleStartTime = System.currentTimeMillis();
int currentSlotIndex = masterSlotManager.getSlot();
int totalSlot = masterSlotManager.getMasterSize();
if (totalSlot <= 0 || currentSlotIndex < 0) {
log.warn("Slot is validated, current master slots: {}, the current slot index is {}", totalSlot,
currentSlotIndex);
return Collections.emptyList();
}
List<Command> commands = commandDao.queryCommandByIdSlot(
currentSlotIndex,
totalSlot,
idSlotBasedFetchConfig.getIdStep(),
idSlotBasedFetchConfig.getFetchSize());
long cost = System.currentTimeMillis() - scheduleStartTime;
log.info("Fetch commands: {} success, cost: {}ms, totalSlot: {}, currentSlotIndex: {}", commands.size(), cost,
totalSlot, currentSlotIndex);
ProcessInstanceMetrics.recordCommandQueryTime(cost);
return commands;
}
}

63
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.config;
import lombok.Data;
import org.springframework.validation.Errors;
@Data
public class CommandFetchStrategy {
private CommandFetchStrategyType type = CommandFetchStrategyType.ID_SLOT_BASED;
private CommandFetchConfig config = new IdSlotBasedFetchConfig();
public void validate(Errors errors) {
config.validate(errors);
}
public enum CommandFetchStrategyType {
ID_SLOT_BASED,
;
}
public interface CommandFetchConfig {
void validate(Errors errors);
}
@Data
public static class IdSlotBasedFetchConfig implements CommandFetchConfig {
private int idStep = 1;
private int fetchSize = 10;
@Override
public void validate(Errors errors) {
if (idStep <= 0) {
errors.rejectValue("step", null, "step must be greater than 0");
}
if (fetchSize <= 0) {
errors.rejectValue("fetchSize", null, "fetchSize must be greater than 0");
}
}
}
}

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -48,10 +48,6 @@ public class MasterConfig implements Validator {
* The master RPC server listen port.
*/
private int listenPort = 5678;
/**
* The max batch size used to fetch command from database.
*/
private int fetchCommandNum = 10;
/**
* The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum.
*/
@ -98,6 +94,8 @@ public class MasterConfig implements Validator {
private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
private CommandFetchStrategy commandFetchStrategy = new CommandFetchStrategy();
// ip:listenPort
private String masterAddress;
@ -115,9 +113,6 @@ public class MasterConfig implements Validator {
if (masterConfig.getListenPort() <= 0) {
errors.rejectValue("listen-port", null, "is invalidated");
}
if (masterConfig.getFetchCommandNum() <= 0) {
errors.rejectValue("fetch-command-num", null, "should be a positive value");
}
if (masterConfig.getPreExecThreads() <= 0) {
errors.rejectValue("per-exec-threads", null, "should be a positive value");
}
@ -149,6 +144,7 @@ public class MasterConfig implements Validator {
if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
commandFetchStrategy.validate(errors);
masterConfig.setMasterRegistryPath(
RegistryNodeType.MASTER.getRegistryPath() + "/" + masterConfig.getMasterAddress());
@ -159,7 +155,6 @@ public class MasterConfig implements Validator {
String config =
"\n****************************Master Configuration**************************************" +
"\n listen-port -> " + listenPort +
"\n fetch-command-num -> " + fetchCommandNum +
"\n pre-exec-threads -> " + preExecThreads +
"\n exec-threads -> " + execThreads +
"\n dispatch-task-number -> " + dispatchTaskNumber +
@ -175,6 +170,7 @@ public class MasterConfig implements Validator {
"\n master-address -> " + masterAddress +
"\n master-registry-path: " + masterRegistryPath +
"\n worker-group-refresh-interval: " + workerGroupRefreshInterval +
"\n command-fetch-strategy: " + commandFetchStrategy +
"\n****************************Master Configuration**************************************";
log.info(config);
}

38
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@ -26,21 +26,18 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.command.ICommandFetcher;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import org.apache.dolphinscheduler.server.master.exception.MasterException;
import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -56,6 +53,9 @@ import org.springframework.stereotype.Service;
@Slf4j
public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable {
@Autowired
private ICommandFetcher commandFetcher;
@Autowired
private CommandService commandService;
@ -74,9 +74,6 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private WorkflowEventLooper workflowEventLooper;
@Autowired
private MasterSlotManager masterSlotManager;
@Autowired
private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap;
@ -125,7 +122,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
List<Command> commands = findCommands();
List<Command> commands = commandFetcher.fetchCommands();
if (CollectionUtils.isEmpty(commands)) {
// indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
@ -170,29 +167,4 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
}
}
private List<Command> findCommands() throws MasterException {
try {
long scheduleStartTime = System.currentTimeMillis();
int thisMasterSlot = masterSlotManager.getSlot();
int masterCount = masterSlotManager.getMasterSize();
if (masterCount <= 0) {
log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
return Collections.emptyList();
}
int pageSize = masterConfig.getFetchCommandNum();
final List<Command> result =
commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
long cost = System.currentTimeMillis() - scheduleStartTime;
log.info(
"Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}",
result.size(), cost, thisMasterSlot, masterCount);
ProcessInstanceMetrics.recordCommandQueryTime(cost);
}
return result;
} catch (Exception ex) {
throw new MasterException("Master loop command from database error", ex);
}
}
}

9
dolphinscheduler-master/src/main/resources/application.yaml

@ -83,8 +83,6 @@ registry:
master:
listen-port: 5678
# master fetch command num
fetch-command-num: 10
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
@ -121,6 +119,13 @@ master:
# The max waiting time to reconnect to registry if you set the strategy to waiting
max-waiting-time: 100s
worker-group-refresh-interval: 10s
command-fetch-strategy:
type: ID_SLOT_BASED
config:
# The incremental id step
id-step: 1
# master fetch command num
fetch-size: 10
server:
port: 5679

12
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.config;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -47,6 +48,17 @@ public class MasterConfigTest {
assertEquals(0.77, serverLoadProtection.getMaxJvmCpuUsagePercentageThresholds());
assertEquals(0.77, serverLoadProtection.getMaxSystemMemoryUsagePercentageThresholds());
assertEquals(0.77, serverLoadProtection.getMaxDiskUsagePercentageThresholds());
}
@Test
public void getCommandFetchStrategy() {
CommandFetchStrategy commandFetchStrategy = masterConfig.getCommandFetchStrategy();
assertThat(commandFetchStrategy.getType())
.isEqualTo(CommandFetchStrategy.CommandFetchStrategyType.ID_SLOT_BASED);
CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig =
(CommandFetchStrategy.IdSlotBasedFetchConfig) commandFetchStrategy.getConfig();
assertThat(idSlotBasedFetchConfig.getIdStep()).isEqualTo(3);
assertThat(idSlotBasedFetchConfig.getFetchSize()).isEqualTo(11);
}
}

9
dolphinscheduler-master/src/test/resources/application.yaml

@ -89,8 +89,6 @@ registry:
master:
listen-port: 5678
# master fetch command num
fetch-command-num: 10
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
@ -127,6 +125,13 @@ master:
# The max waiting time to reconnect to registry if you set the strategy to waiting
max-waiting-time: 100s
worker-group-refresh-interval: 10s
command-fetch-strategy:
type: ID_SLOT_BASED
config:
# The incremental id step
id-step: 3
# master fetch command num
fetch-size: 11
server:
port: 5679

11
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java

@ -22,8 +22,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.List;
/**
* Command Service
*/
@ -44,15 +42,6 @@ public interface CommandService {
*/
int createCommand(Command command);
/**
* Get command page
* @param pageSize page size
* @param masterCount master count
* @param thisMasterSlot master slot
* @return command page
*/
List<Command> findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot);
/**
* check the input command exists in queue list
*

9
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java

@ -57,7 +57,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
/**
@ -107,14 +106,6 @@ public class CommandServiceImpl implements CommandService {
return result;
}
@Override
public List<Command> findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot) {
if (masterCount <= 0) {
return Lists.newArrayList();
}
return commandMapper.queryCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
}
@Override
public boolean verifyIsNeedCreateCommand(Command command) {
boolean isNeedCreate = true;

10
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java

@ -214,14 +214,4 @@ class MessageServiceImplTest {
Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
}
@Test
public void testFindCommandPageBySlot() {
int pageSize = 1;
int masterCount = 0;
int thisMasterSlot = 2;
List<Command> commandList =
commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
Assertions.assertEquals(0, commandList.size());
}
}

9
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -160,8 +160,6 @@ casdoor:
master:
listen-port: 5678
# master fetch command num
fetch-command-num: 10
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
@ -192,6 +190,13 @@ master:
# kill yarn/k8s application when failover taskInstance, default true
kill-application-when-task-failover: true
worker-group-refresh-interval: 10s
command-fetch-strategy:
type: ID_SLOT_BASED
config:
# The incremental id step
id-step: 1
# master fetch command num
fetch-size: 10
worker:
# worker listener port

Loading…
Cancel
Save