Browse Source

Validate master/worker config (#10649)

(cherry picked from commit 35b25da863)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
4b224ae2e5
  1. 206
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  2. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  4. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  5. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  6. 10
      dolphinscheduler-master/src/main/resources/application.yaml
  7. 5
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
  8. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  9. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  10. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  11. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
  12. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  13. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  14. 16
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  15. 131
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  16. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  17. 4
      dolphinscheduler-worker/src/main/resources/application.yaml
  18. 13
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

206
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -21,170 +21,108 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelect
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.springframework.boot.context.properties.ConfigurationProperties; import java.time.Duration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
@Component import org.springframework.boot.context.properties.ConfigurationProperties;
@EnableConfigurationProperties import org.springframework.context.annotation.Configuration;
@ConfigurationProperties("master") import org.springframework.validation.Errors;
public class MasterConfig { import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
import lombok.Data;
@Data
@Validated
@Configuration
@ConfigurationProperties(prefix = "master")
public class MasterConfig implements Validator {
/** /**
* The master RPC server listen port. * The master RPC server listen port.
*/ */
private int listenPort; private int listenPort = 5678;
/** /**
* The max batch size used to fetch command from database. * The max batch size used to fetch command from database.
*/ */
private int fetchCommandNum; private int fetchCommandNum = 10;
/** /**
* The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum. * The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum.
*/ */
private int preExecThreads; private int preExecThreads = 10;
/** /**
* todo: We may need to split the process/task into different thread size. * todo: We may need to split the process/task into different thread size.
* The thread number used to handle processInstance and task event. * The thread number used to handle processInstance and task event.
* Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}. * Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}.
*/ */
private int execThreads; private int execThreads = 10;
/** /**
* The task dispatch thread pool size. * The task dispatch thread pool size.
*/ */
private int dispatchTaskNumber; private int dispatchTaskNumber = 3;
/** /**
* Worker select strategy. * Worker select strategy.
*/ */
private HostSelector hostSelector; private HostSelector hostSelector = HostSelector.LOWER_WEIGHT;
/** /**
* Master heart beat task execute interval. * Master heart beat task execute interval.
*/ */
private int heartbeatInterval; private Duration heartbeatInterval = Duration.ofSeconds(10);
/** /**
* task submit max retry times. * task submit max retry times.
*/ */
private int taskCommitRetryTimes; private int taskCommitRetryTimes = 5;
/** /**
* task submit retry interval/ms. * task submit retry interval.
*/ */
private int taskCommitInterval; private Duration taskCommitInterval = Duration.ofSeconds(1);
/** /**
* state wheel check interval/ms, if this value is bigger, may increase the delay of task/processInstance. * state wheel check interval, if this value is bigger, may increase the delay of task/processInstance.
*/ */
private int stateWheelInterval; private Duration stateWheelInterval = Duration.ofMillis(5);
private double maxCpuLoadAvg; private double maxCpuLoadAvg = -1;
private double reservedMemory; private double reservedMemory = 0.3;
private int failoverInterval; private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killYarnJobWhenTaskFailover; private boolean killYarnJobWhenTaskFailover = true;
public int getListenPort() { @Override
return listenPort; public boolean supports(Class<?> clazz) {
} return MasterConfig.class.isAssignableFrom(clazz);
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort; @Override
} public void validate(Object target, Errors errors) {
MasterConfig masterConfig = (MasterConfig) target;
public int getFetchCommandNum() { if (masterConfig.getListenPort() <= 0) {
return fetchCommandNum; errors.rejectValue("listen-port", null, "is invalidated");
} }
if (masterConfig.getFetchCommandNum() <= 0) {
public void setFetchCommandNum(int fetchCommandNum) { errors.rejectValue("fetch-command-num", null, "should be a positive value");
this.fetchCommandNum = fetchCommandNum; }
} if (masterConfig.getPreExecThreads() <= 0) {
errors.rejectValue("per-exec-threads", null, "should be a positive value");
public int getPreExecThreads() { }
return preExecThreads; if (masterConfig.getExecThreads() <= 0) {
} errors.rejectValue("exec-threads", null, "should be a positive value");
}
public void setPreExecThreads(int preExecThreads) { if (masterConfig.getDispatchTaskNumber() <= 0) {
this.preExecThreads = preExecThreads; errors.rejectValue("dispatch-task-number", null, "should be a positive value");
} }
if (masterConfig.getHeartbeatInterval().toMillis() < 0) {
public int getExecThreads() { errors.rejectValue("heartbeat-interval", null, "should be a valid duration");
return execThreads; }
} if (masterConfig.getTaskCommitRetryTimes() <= 0) {
errors.rejectValue("task-commit-retry-times", null, "should be a positive value");
public void setExecThreads(int execThreads) { }
this.execThreads = execThreads; if (masterConfig.getTaskCommitInterval().toMillis() <= 0) {
} errors.rejectValue("task-commit-interval", null, "should be a valid duration");
}
public int getDispatchTaskNumber() { if (masterConfig.getStateWheelInterval().toMillis() <= 0) {
return dispatchTaskNumber; errors.rejectValue("state-wheel-interval", null, "should be a valid duration");
} }
if (masterConfig.getFailoverInterval().toMillis() <= 0) {
public void setDispatchTaskNumber(int dispatchTaskNumber) { errors.rejectValue("failover-interval", null, "should be a valid duration");
this.dispatchTaskNumber = dispatchTaskNumber; }
} if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
public HostSelector getHostSelector() { }
return hostSelector;
}
public void setHostSelector(HostSelector hostSelector) {
this.hostSelector = hostSelector;
}
public int getHeartbeatInterval() {
return heartbeatInterval;
}
public void setHeartbeatInterval(int heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
public int getTaskCommitRetryTimes() {
return taskCommitRetryTimes;
}
public void setTaskCommitRetryTimes(int taskCommitRetryTimes) {
this.taskCommitRetryTimes = taskCommitRetryTimes;
}
public int getTaskCommitInterval() {
return taskCommitInterval;
}
public void setTaskCommitInterval(int taskCommitInterval) {
this.taskCommitInterval = taskCommitInterval;
}
public int getStateWheelInterval() {
return stateWheelInterval;
}
public void setStateWheelInterval(int stateWheelInterval) {
this.stateWheelInterval = stateWheelInterval;
}
public double getMaxCpuLoadAvg() {
return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2;
}
public void setMaxCpuLoadAvg(double maxCpuLoadAvg) {
this.maxCpuLoadAvg = maxCpuLoadAvg;
}
public double getReservedMemory() {
return reservedMemory;
}
public void setReservedMemory(double reservedMemory) {
this.reservedMemory = reservedMemory;
}
public int getFailoverInterval() {
return failoverInterval;
}
public void setFailoverInterval(int failoverInterval) {
this.failoverInterval = failoverInterval;
}
public boolean isKillYarnJobWhenTaskFailover() {
return killYarnJobWhenTaskFailover;
}
public void setKillYarnJobWhenTaskFailover(boolean killYarnJobWhenTaskFailover) {
this.killYarnJobWhenTaskFailover = killYarnJobWhenTaskFailover;
} }
} }

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -187,7 +188,7 @@ public class MasterRegistryClient {
void registry() { void registry() {
logger.info("Master node : {} registering to registry center", masterAddress); logger.info("Master node : {} registering to registry center", masterAddress);
String localNodePath = getCurrentNodePath(); String localNodePath = getCurrentNodePath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(), masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(), masterConfig.getReservedMemory(),
@ -210,7 +211,7 @@ public class MasterRegistryClient {
// delete dead server // delete dead server
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP); registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval); logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
} }

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -67,7 +67,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
} catch (Exception e) { } catch (Exception e) {
logger.error("Master failover thread execute error", e); logger.error("Master failover thread execute error", e);
} finally { } finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60); ThreadUtils.sleep(masterConfig.getFailoverInterval().toMillis());
} }
} }
} }

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -93,7 +93,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
@Override @Override
public void run() { public void run() {
Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); Duration checkInterval = masterConfig.getStateWheelInterval();
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try { try {
checkTask4Timeout(); checkTask4Timeout();

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -63,6 +63,7 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -107,7 +108,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected int maxRetryTimes; protected int maxRetryTimes;
protected int commitInterval; protected long commitInterval;
protected ProcessService processService; protected ProcessService processService;
@ -125,7 +126,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
this.taskInstance = taskInstance; this.taskInstance = taskInstance;
this.processInstance = processInstance; this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
this.commitInterval = masterConfig.getTaskCommitInterval(); this.commitInterval = masterConfig.getTaskCommitInterval().toMillis();
} }
protected javax.sql.DataSource defaultDataSource = protected javax.sql.DataSource defaultDataSource =

10
dolphinscheduler-master/src/main/resources/application.yaml

@ -96,19 +96,19 @@ master:
dispatch-task-number: 3 dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight host-selector: lower_weight
# master heartbeat interval, the unit is second # master heartbeat interval
heartbeat-interval: 10 heartbeat-interval: 10s
# master commit task retry times # master commit task retry times
task-commit-retry-times: 5 task-commit-retry-times: 5
# master commit task interval, the unit is millisecond # master commit task interval
task-commit-interval: 1000 task-commit-interval: 1s
state-wheel-interval: 5 state-wheel-interval: 5
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1 max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3 reserved-memory: 0.3
# failover interval, the unit is minute # failover interval, the unit is minute
failover-interval: 10 failover-interval: 10m
# kill yarn jon when failover taskInstance, default true # kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true kill-yarn-job-when-task-failover: true

5
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java

@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -77,7 +78,7 @@ public class BlockingTaskTest {
config = new MasterConfig(); config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
// mock process service // mock process service
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);
@ -122,7 +123,7 @@ public class BlockingTaskTest {
Mockito.when(processService Mockito.when(processService
.submitTaskWithRetry(Mockito.any(ProcessInstance.class) .submitTaskWithRetry(Mockito.any(ProcessInstance.class)
, Mockito.any(TaskInstance.class) , Mockito.any(TaskInstance.class)
, Mockito.any(Integer.class), Mockito.any(Integer.class))) , Mockito.any(Integer.class), Mockito.any(Long.class)))
.thenReturn(taskInstance); .thenReturn(taskInstance);
return taskInstance; return taskInstance;
} }

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -66,7 +67,7 @@ public class ConditionsTaskTest {
MasterConfig config = new MasterConfig(); MasterConfig config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -85,7 +86,7 @@ public class DependentTaskTest {
MasterConfig config = new MasterConfig(); MasterConfig config = new MasterConfig();
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -44,6 +44,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -69,7 +70,7 @@ public class SubProcessTaskTest {
MasterConfig config = new MasterConfig(); MasterConfig config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
PowerMockito.mockStatic(Stopper.class); PowerMockito.mockStatic(Stopper.class);
PowerMockito.when(Stopper.isRunning()).thenReturn(true); PowerMockito.when(Stopper.isRunning()).thenReturn(true);

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -59,7 +60,7 @@ public class SwitchTaskTest {
MasterConfig config = new MasterConfig(); MasterConfig config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -103,7 +103,7 @@ public interface ProcessService {
void setSubProcessParam(ProcessInstance subProcessInstance); void setSubProcessParam(ProcessInstance subProcessInstance);
TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval); TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval);
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance); TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -1233,7 +1233,7 @@ public class ProcessServiceImpl implements ProcessService {
* retry submit task to db * retry submit task to db
*/ */
@Override @Override
public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) { public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval) {
int retryTimes = 1; int retryTimes = 1;
TaskInstance task = null; TaskInstance task = null;
while (retryTimes <= commitRetryTimes) { while (retryTimes <= commitRetryTimes) {

16
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -114,19 +114,19 @@ master:
dispatch-task-number: 3 dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight host-selector: lower_weight
# master heartbeat interval, the unit is second # master heartbeat interval
heartbeat-interval: 10 heartbeat-interval: 10s
# master commit task retry times # master commit task retry times
task-commit-retry-times: 5 task-commit-retry-times: 5
# master commit task interval, the unit is millisecond # master commit task interval
task-commit-interval: 1000 task-commit-interval: 1s
state-wheel-interval: 5 state-wheel-interval: 5
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1 max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3 reserved-memory: 0.3
# failover interval, the unit is minute # failover interval
failover-interval: 10 failover-interval: 10m
# kill yarn jon when failover taskInstance, default true # kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true kill-yarn-job-when-task-failover: true
@ -135,8 +135,8 @@ worker:
listen-port: 1234 listen-port: 1234
# worker execute thread number to limit task instances in parallel # worker execute thread number to limit task instances in parallel
exec-threads: 10 exec-threads: 10
# worker heartbeat interval, the unit is second # worker heartbeat interval
heartbeat-interval: 10 heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100 # worker host weight to dispatch tasks, default value 100
host-weight: 100 host-weight: 100
# worker tenant auto create # worker tenant auto create

131
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -17,104 +17,53 @@
package org.apache.dolphinscheduler.server.worker.config; package org.apache.dolphinscheduler.server.worker.config;
import java.time.Duration;
import java.util.Set; import java.util.Set;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
@Configuration import com.google.common.collect.Sets;
@EnableConfigurationProperties
@ConfigurationProperties("worker")
public class WorkerConfig {
private int listenPort;
private int execThreads;
private int heartbeatInterval;
private int hostWeight;
private boolean tenantAutoCreate;
private int maxCpuLoadAvg;
private double reservedMemory;
private Set<String> groups;
private String alertListenHost;
private int alertListenPort;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public int getExecThreads() {
return execThreads;
}
public void setExecThreads(int execThreads) {
this.execThreads = execThreads;
}
public int getHeartbeatInterval() {
return heartbeatInterval;
}
public void setHeartbeatInterval(int heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
public int getHostWeight() {
return hostWeight;
}
public void setHostWeight(int hostWeight) {
this.hostWeight = hostWeight;
}
public boolean isTenantAutoCreate() {
return tenantAutoCreate;
}
public void setTenantAutoCreate(boolean tenantAutoCreate) { import lombok.Data;
this.tenantAutoCreate = tenantAutoCreate;
}
public int getMaxCpuLoadAvg() {
return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2;
}
public void setMaxCpuLoadAvg(int maxCpuLoadAvg) {
this.maxCpuLoadAvg = maxCpuLoadAvg;
}
public double getReservedMemory() { @Data
return reservedMemory; @Validated
} @Configuration
@ConfigurationProperties(prefix = "worker")
public void setReservedMemory(double reservedMemory) { public class WorkerConfig implements Validator {
this.reservedMemory = reservedMemory; private int listenPort = 1234;
} private int execThreads = 10;
private Duration heartbeatInterval = Duration.ofSeconds(10);
public Set<String> getGroups() { private int hostWeight = 100;
return groups; private boolean tenantAutoCreate = true;
} private boolean tenantDistributedUser = false;
private int maxCpuLoadAvg = -1;
public void setGroups(Set<String> groups) { private double reservedMemory = 0.3;
this.groups = groups; private Set<String> groups = Sets.newHashSet("default");
} private String alertListenHost = "localhost";
private int alertListenPort = 50052;
public String getAlertListenHost() {
return alertListenHost; @Override
} public boolean supports(Class<?> clazz) {
return WorkerConfig.class.isAssignableFrom(clazz);
public void setAlertListenHost(String alertListenHost) { }
this.alertListenHost = alertListenHost;
} @Override
public void validate(Object target, Errors errors) {
public int getAlertListenPort() { WorkerConfig workerConfig = (WorkerConfig) target;
return alertListenPort; if (workerConfig.getExecThreads() <= 0) {
} errors.rejectValue("exec-threads", null, "should be a positive value");
}
if (workerConfig.getHeartbeatInterval().toMillis() <= 0) {
errors.rejectValue("heartbeat-interval", null, "shoule be a valid duration");
}
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
public void setAlertListenPort(final int alertListenPort) {
this.alertListenPort = alertListenPort;
} }
} }

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -99,7 +99,7 @@ public class WorkerRegistryClient {
public void registry() { public void registry() {
String address = NetUtils.getAddr(workerConfig.getListenPort()); String address = NetUtils.getAddr(workerConfig.getListenPort());
Set<String> workerZkPaths = getWorkerZkPaths(); Set<String> workerZkPaths = getWorkerZkPaths();
int workerHeartbeatInterval = workerConfig.getHeartbeatInterval(); long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getMaxCpuLoadAvg(), workerConfig.getMaxCpuLoadAvg(),

4
dolphinscheduler-worker/src/main/resources/application.yaml

@ -58,8 +58,8 @@ worker:
listen-port: 1234 listen-port: 1234
# worker execute thread number to limit task instances in parallel # worker execute thread number to limit task instances in parallel
exec-threads: 100 exec-threads: 100
# worker heartbeat interval, the unit is second # worker heartbeat interval
heartbeat-interval: 10 heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100 # worker host weight to dispatch tasks, default value 100
host-weight: 100 host-weight: 100
# worker tenant auto create # worker tenant auto create

13
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.time.Duration;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -86,15 +87,15 @@ public class WorkerRegistryClientTest {
@Test @Test
public void testRegistry() { public void testRegistry() {
workerRegistryClient.initWorkRegistry(); workerRegistryClient.initWorkRegistry();
given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1); given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1);
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true); given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
given(workerConfig.getHeartbeatInterval()).willReturn(1); given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
workerRegistryClient.registry(); workerRegistryClient.registry();
Mockito.verify(registryClient, Mockito.times(1)).handleDeadServer(Mockito.anyCollection(), Mockito.any(NodeType.class), Mockito.anyString()); Mockito.verify(registryClient, Mockito.times(1)).handleDeadServer(Mockito.anyCollection(), Mockito.any(NodeType.class), Mockito.anyString());
} }

Loading…
Cancel
Save