Browse Source

Validate master/worker config (#10649)

3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
35b25da863
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 178
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  2. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  4. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  5. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  6. 10
      dolphinscheduler-master/src/main/resources/application.yaml
  7. 5
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
  8. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  9. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  10. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  11. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
  12. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  13. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  14. 16
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  15. 138
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  16. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  17. 4
      dolphinscheduler-worker/src/main/resources/application.yaml
  18. 3
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

178
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -21,170 +21,108 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelect
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.springframework.boot.context.properties.ConfigurationProperties; import java.time.Duration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
@Component import org.springframework.boot.context.properties.ConfigurationProperties;
@EnableConfigurationProperties import org.springframework.context.annotation.Configuration;
@ConfigurationProperties("master") import org.springframework.validation.Errors;
public class MasterConfig { import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
import lombok.Data;
@Data
@Validated
@Configuration
@ConfigurationProperties(prefix = "master")
public class MasterConfig implements Validator {
/** /**
* The master RPC server listen port. * The master RPC server listen port.
*/ */
private int listenPort; private int listenPort = 5678;
/** /**
* The max batch size used to fetch command from database. * The max batch size used to fetch command from database.
*/ */
private int fetchCommandNum; private int fetchCommandNum = 10;
/** /**
* The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum. * The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum.
*/ */
private int preExecThreads; private int preExecThreads = 10;
/** /**
* todo: We may need to split the process/task into different thread size. * todo: We may need to split the process/task into different thread size.
* The thread number used to handle processInstance and task event. * The thread number used to handle processInstance and task event.
* Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}. * Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}.
*/ */
private int execThreads; private int execThreads = 10;
/** /**
* The task dispatch thread pool size. * The task dispatch thread pool size.
*/ */
private int dispatchTaskNumber; private int dispatchTaskNumber = 3;
/** /**
* Worker select strategy. * Worker select strategy.
*/ */
private HostSelector hostSelector; private HostSelector hostSelector = HostSelector.LOWER_WEIGHT;
/** /**
* Master heart beat task execute interval. * Master heart beat task execute interval.
*/ */
private int heartbeatInterval; private Duration heartbeatInterval = Duration.ofSeconds(10);
/** /**
* task submit max retry times. * task submit max retry times.
*/ */
private int taskCommitRetryTimes; private int taskCommitRetryTimes = 5;
/** /**
* task submit retry interval/ms. * task submit retry interval.
*/ */
private int taskCommitInterval; private Duration taskCommitInterval = Duration.ofSeconds(1);
/** /**
* state wheel check interval/ms, if this value is bigger, may increase the delay of task/processInstance. * state wheel check interval, if this value is bigger, may increase the delay of task/processInstance.
*/ */
private int stateWheelInterval; private Duration stateWheelInterval = Duration.ofMillis(5);
private double maxCpuLoadAvg; private double maxCpuLoadAvg = -1;
private double reservedMemory; private double reservedMemory = 0.3;
private int failoverInterval; private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killYarnJobWhenTaskFailover; private boolean killYarnJobWhenTaskFailover = true;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public int getFetchCommandNum() {
return fetchCommandNum;
}
public void setFetchCommandNum(int fetchCommandNum) {
this.fetchCommandNum = fetchCommandNum;
}
public int getPreExecThreads() {
return preExecThreads;
}
public void setPreExecThreads(int preExecThreads) { @Override
this.preExecThreads = preExecThreads; public boolean supports(Class<?> clazz) {
return MasterConfig.class.isAssignableFrom(clazz);
} }
public int getExecThreads() { @Override
return execThreads; public void validate(Object target, Errors errors) {
MasterConfig masterConfig = (MasterConfig) target;
if (masterConfig.getListenPort() <= 0) {
errors.rejectValue("listen-port", null, "is invalidated");
} }
if (masterConfig.getFetchCommandNum() <= 0) {
public void setExecThreads(int execThreads) { errors.rejectValue("fetch-command-num", null, "should be a positive value");
this.execThreads = execThreads;
} }
if (masterConfig.getPreExecThreads() <= 0) {
public int getDispatchTaskNumber() { errors.rejectValue("per-exec-threads", null, "should be a positive value");
return dispatchTaskNumber;
} }
if (masterConfig.getExecThreads() <= 0) {
public void setDispatchTaskNumber(int dispatchTaskNumber) { errors.rejectValue("exec-threads", null, "should be a positive value");
this.dispatchTaskNumber = dispatchTaskNumber;
} }
if (masterConfig.getDispatchTaskNumber() <= 0) {
public HostSelector getHostSelector() { errors.rejectValue("dispatch-task-number", null, "should be a positive value");
return hostSelector;
} }
if (masterConfig.getHeartbeatInterval().toMillis() < 0) {
public void setHostSelector(HostSelector hostSelector) { errors.rejectValue("heartbeat-interval", null, "should be a valid duration");
this.hostSelector = hostSelector;
} }
if (masterConfig.getTaskCommitRetryTimes() <= 0) {
public int getHeartbeatInterval() { errors.rejectValue("task-commit-retry-times", null, "should be a positive value");
return heartbeatInterval;
} }
if (masterConfig.getTaskCommitInterval().toMillis() <= 0) {
public void setHeartbeatInterval(int heartbeatInterval) { errors.rejectValue("task-commit-interval", null, "should be a valid duration");
this.heartbeatInterval = heartbeatInterval;
} }
if (masterConfig.getStateWheelInterval().toMillis() <= 0) {
public int getTaskCommitRetryTimes() { errors.rejectValue("state-wheel-interval", null, "should be a valid duration");
return taskCommitRetryTimes;
} }
if (masterConfig.getFailoverInterval().toMillis() <= 0) {
public void setTaskCommitRetryTimes(int taskCommitRetryTimes) { errors.rejectValue("failover-interval", null, "should be a valid duration");
this.taskCommitRetryTimes = taskCommitRetryTimes;
} }
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
public int getTaskCommitInterval() { masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
return taskCommitInterval;
} }
public void setTaskCommitInterval(int taskCommitInterval) {
this.taskCommitInterval = taskCommitInterval;
}
public int getStateWheelInterval() {
return stateWheelInterval;
}
public void setStateWheelInterval(int stateWheelInterval) {
this.stateWheelInterval = stateWheelInterval;
}
public double getMaxCpuLoadAvg() {
return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2;
}
public void setMaxCpuLoadAvg(double maxCpuLoadAvg) {
this.maxCpuLoadAvg = maxCpuLoadAvg;
}
public double getReservedMemory() {
return reservedMemory;
}
public void setReservedMemory(double reservedMemory) {
this.reservedMemory = reservedMemory;
}
public int getFailoverInterval() {
return failoverInterval;
}
public void setFailoverInterval(int failoverInterval) {
this.failoverInterval = failoverInterval;
}
public boolean isKillYarnJobWhenTaskFailover() {
return killYarnJobWhenTaskFailover;
}
public void setKillYarnJobWhenTaskFailover(boolean killYarnJobWhenTaskFailover) {
this.killYarnJobWhenTaskFailover = killYarnJobWhenTaskFailover;
} }
} }

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -186,7 +187,7 @@ public class MasterRegistryClient {
void registry() { void registry() {
logger.info("Master node : {} registering to registry center", masterAddress); logger.info("Master node : {} registering to registry center", masterAddress);
String localNodePath = getCurrentNodePath(); String localNodePath = getCurrentNodePath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(), masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(), masterConfig.getReservedMemory(),
@ -209,7 +210,7 @@ public class MasterRegistryClient {
// delete dead server // delete dead server
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP); registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval); logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
} }

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -67,7 +67,7 @@ public class FailoverExecuteThread extends BaseDaemonThread {
} catch (Exception e) { } catch (Exception e) {
logger.error("Master failover thread execute error", e); logger.error("Master failover thread execute error", e);
} finally { } finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60); ThreadUtils.sleep(masterConfig.getFailoverInterval().toMillis());
} }
} }
} }

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -93,7 +93,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
@Override @Override
public void run() { public void run() {
Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); Duration checkInterval = masterConfig.getStateWheelInterval();
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try { try {
checkTask4Timeout(); checkTask4Timeout();

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -81,6 +81,7 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -114,7 +115,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected int maxRetryTimes; protected int maxRetryTimes;
protected int commitInterval; protected long commitInterval;
protected ProcessService processService; protected ProcessService processService;
@ -132,7 +133,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
this.taskInstance = taskInstance; this.taskInstance = taskInstance;
this.processInstance = processInstance; this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
this.commitInterval = masterConfig.getTaskCommitInterval(); this.commitInterval = masterConfig.getTaskCommitInterval().toMillis();
} }
protected javax.sql.DataSource defaultDataSource = protected javax.sql.DataSource defaultDataSource =

10
dolphinscheduler-master/src/main/resources/application.yaml

@ -96,19 +96,19 @@ master:
dispatch-task-number: 3 dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight host-selector: lower_weight
# master heartbeat interval, the unit is second # master heartbeat interval
heartbeat-interval: 10 heartbeat-interval: 10s
# master commit task retry times # master commit task retry times
task-commit-retry-times: 5 task-commit-retry-times: 5
# master commit task interval, the unit is millisecond # master commit task interval
task-commit-interval: 1000 task-commit-interval: 1s
state-wheel-interval: 5 state-wheel-interval: 5
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1 max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3 reserved-memory: 0.3
# failover interval, the unit is minute # failover interval, the unit is minute
failover-interval: 10 failover-interval: 10m
# kill yarn jon when failover taskInstance, default true # kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true kill-yarn-job-when-task-failover: true

5
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java

@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -77,7 +78,7 @@ public class BlockingTaskTest {
config = new MasterConfig(); config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
// mock process service // mock process service
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);
@ -122,7 +123,7 @@ public class BlockingTaskTest {
Mockito.when(processService Mockito.when(processService
.submitTaskWithRetry(Mockito.any(ProcessInstance.class) .submitTaskWithRetry(Mockito.any(ProcessInstance.class)
, Mockito.any(TaskInstance.class) , Mockito.any(TaskInstance.class)
, Mockito.any(Integer.class), Mockito.any(Integer.class))) , Mockito.any(Integer.class), Mockito.any(Long.class)))
.thenReturn(taskInstance); .thenReturn(taskInstance);
return taskInstance; return taskInstance;
} }

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -66,7 +67,7 @@ public class ConditionsTaskTest {
MasterConfig config = new MasterConfig(); MasterConfig config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -85,7 +86,7 @@ public class DependentTaskTest {
MasterConfig config = new MasterConfig(); MasterConfig config = new MasterConfig();
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -44,6 +44,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -69,7 +70,7 @@ public class SubProcessTaskTest {
MasterConfig config = new MasterConfig(); MasterConfig config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
PowerMockito.mockStatic(Stopper.class); PowerMockito.mockStatic(Stopper.class);
PowerMockito.when(Stopper.isRunning()).thenReturn(true); PowerMockito.when(Stopper.isRunning()).thenReturn(true);

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -59,7 +60,7 @@ public class SwitchTaskTest {
MasterConfig config = new MasterConfig(); MasterConfig config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3); config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000); config.setTaskCommitInterval(Duration.ofSeconds(1));
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -103,7 +103,7 @@ public interface ProcessService {
void setSubProcessParam(ProcessInstance subProcessInstance); void setSubProcessParam(ProcessInstance subProcessInstance);
TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval); TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval);
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance); TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -1259,7 +1259,7 @@ public class ProcessServiceImpl implements ProcessService {
* retry submit task to db * retry submit task to db
*/ */
@Override @Override
public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) { public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval) {
int retryTimes = 1; int retryTimes = 1;
TaskInstance task = null; TaskInstance task = null;
while (retryTimes <= commitRetryTimes) { while (retryTimes <= commitRetryTimes) {

16
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -131,19 +131,19 @@ master:
dispatch-task-number: 3 dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight host-selector: lower_weight
# master heartbeat interval, the unit is second # master heartbeat interval
heartbeat-interval: 10 heartbeat-interval: 10s
# master commit task retry times # master commit task retry times
task-commit-retry-times: 5 task-commit-retry-times: 5
# master commit task interval, the unit is millisecond # master commit task interval
task-commit-interval: 1000 task-commit-interval: 1s
state-wheel-interval: 5 state-wheel-interval: 5
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1 max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3 reserved-memory: 0.3
# failover interval, the unit is minute # failover interval
failover-interval: 10 failover-interval: 10m
# kill yarn jon when failover taskInstance, default true # kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true kill-yarn-job-when-task-failover: true
@ -152,8 +152,8 @@ worker:
listen-port: 1234 listen-port: 1234
# worker execute thread number to limit task instances in parallel # worker execute thread number to limit task instances in parallel
exec-threads: 10 exec-threads: 10
# worker heartbeat interval, the unit is second # worker heartbeat interval
heartbeat-interval: 10 heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100 # worker host weight to dispatch tasks, default value 100
host-weight: 100 host-weight: 100
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.

138
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -17,113 +17,53 @@
package org.apache.dolphinscheduler.server.worker.config; package org.apache.dolphinscheduler.server.worker.config;
import java.time.Duration;
import java.util.Set; import java.util.Set;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
@Configuration import com.google.common.collect.Sets;
@EnableConfigurationProperties
@ConfigurationProperties("worker")
public class WorkerConfig {
private int listenPort;
private int execThreads;
private int heartbeatInterval;
private int hostWeight;
private boolean tenantAutoCreate;
private boolean tenantDistributedUser;
private int maxCpuLoadAvg;
private double reservedMemory;
private Set<String> groups;
private String alertListenHost;
private int alertListenPort;
public int getListenPort() {
return listenPort;
}
public void setListenPort(int listenPort) { import lombok.Data;
this.listenPort = listenPort;
}
public int getExecThreads() {
return execThreads;
}
public void setExecThreads(int execThreads) { @Data
this.execThreads = execThreads; @Validated
} @Configuration
@ConfigurationProperties(prefix = "worker")
public int getHeartbeatInterval() { public class WorkerConfig implements Validator {
return heartbeatInterval; private int listenPort = 1234;
} private int execThreads = 10;
private Duration heartbeatInterval = Duration.ofSeconds(10);
public void setHeartbeatInterval(int heartbeatInterval) { private int hostWeight = 100;
this.heartbeatInterval = heartbeatInterval; private boolean tenantAutoCreate = true;
} private boolean tenantDistributedUser = false;
private int maxCpuLoadAvg = -1;
public int getHostWeight() { private double reservedMemory = 0.3;
return hostWeight; private Set<String> groups = Sets.newHashSet("default");
} private String alertListenHost = "localhost";
private int alertListenPort = 50052;
public void setHostWeight(int hostWeight) {
this.hostWeight = hostWeight; @Override
} public boolean supports(Class<?> clazz) {
return WorkerConfig.class.isAssignableFrom(clazz);
public boolean isTenantAutoCreate() { }
return tenantAutoCreate;
} @Override
public void validate(Object target, Errors errors) {
public void setTenantAutoCreate(boolean tenantAutoCreate) { WorkerConfig workerConfig = (WorkerConfig) target;
this.tenantAutoCreate = tenantAutoCreate; if (workerConfig.getExecThreads() <= 0) {
} errors.rejectValue("exec-threads", null, "should be a positive value");
}
public int getMaxCpuLoadAvg() { if (workerConfig.getHeartbeatInterval().toMillis() <= 0) {
return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2; errors.rejectValue("heartbeat-interval", null, "shoule be a valid duration");
} }
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
public void setMaxCpuLoadAvg(int maxCpuLoadAvg) { workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
this.maxCpuLoadAvg = maxCpuLoadAvg;
}
public double getReservedMemory() {
return reservedMemory;
}
public void setReservedMemory(double reservedMemory) {
this.reservedMemory = reservedMemory;
}
public Set<String> getGroups() {
return groups;
}
public void setGroups(Set<String> groups) {
this.groups = groups;
}
public String getAlertListenHost() {
return alertListenHost;
}
public void setAlertListenHost(String alertListenHost) {
this.alertListenHost = alertListenHost;
}
public int getAlertListenPort() {
return alertListenPort;
}
public void setAlertListenPort(final int alertListenPort) {
this.alertListenPort = alertListenPort;
}
public boolean isTenantDistributedUser() {
return tenantDistributedUser;
} }
public void setTenantDistributedUser(boolean tenantDistributedUser) {
this.tenantDistributedUser = tenantDistributedUser;
} }
} }

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -46,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -97,7 +98,7 @@ public class WorkerRegistryClient {
public void registry() { public void registry() {
String address = NetUtils.getAddr(workerConfig.getListenPort()); String address = NetUtils.getAddr(workerConfig.getListenPort());
Set<String> workerZkPaths = getWorkerZkPaths(); Set<String> workerZkPaths = getWorkerZkPaths();
int workerHeartbeatInterval = workerConfig.getHeartbeatInterval(); long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getMaxCpuLoadAvg(), workerConfig.getMaxCpuLoadAvg(),

4
dolphinscheduler-worker/src/main/resources/application.yaml

@ -58,8 +58,8 @@ worker:
listen-port: 1234 listen-port: 1234
# worker execute thread number to limit task instances in parallel # worker execute thread number to limit task instances in parallel
exec-threads: 100 exec-threads: 100
# worker heartbeat interval, the unit is second # worker heartbeat interval
heartbeat-interval: 10 heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100 # worker host weight to dispatch tasks, default value 100
host-weight: 100 host-weight: 100
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.

3
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.time.Duration;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -91,7 +92,7 @@ public class WorkerRegistryClientTest {
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true); given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
given(workerConfig.getHeartbeatInterval()).willReturn(1); given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
workerRegistryClient.registry(); workerRegistryClient.registry();

Loading…
Cancel
Save