Browse Source

[Feature-7024] Add waiting strategy to support master/worker can recover from registry lost (#11368)

* Add waiting strategy to support master/worker can recover from registry lost

* throw exception when zookeeper registry start failed due to interrupted
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
7ff34c3947
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docs/docs/en/architecture/configuration.md
  2. 4
      docs/docs/zh/architecture/configuration.md
  3. 20
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
  4. 5
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  5. 513
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  6. 11
      dolphinscheduler-bom/pom.xml
  7. 29
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  8. 29
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleException.java
  9. 75
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleManager.java
  10. 45
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerStatus.java
  11. 58
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
  12. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  13. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
  14. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
  15. 53
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  16. 28
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  17. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  18. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
  19. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  20. 21
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
  21. 24
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java
  22. 27
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
  23. 88
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  24. 58
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
  25. 134
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
  26. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
  27. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  28. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  29. 50
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
  30. 11
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  31. 19
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
  32. 7
      dolphinscheduler-master/src/main/resources/application.yaml
  33. 13
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  34. 8
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  35. 4
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  36. 4
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
  37. 9
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  38. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  39. 31
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java
  40. 31
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java
  41. 11
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
  42. 25
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java
  43. 40
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
  44. 9
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
  45. 45
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
  46. 34
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  47. 55
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java
  48. 81
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
  49. 35
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  50. 43
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  51. 23
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
  52. 24
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java
  53. 61
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
  54. 90
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  55. 55
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
  56. 135
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
  57. 10
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
  58. 29
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  59. 7
      dolphinscheduler-worker/src/main/resources/application.yaml
  60. 17
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

4
docs/docs/en/architecture/configuration.md

@ -268,6 +268,8 @@ Location: `master-server/conf/application.yaml`
|master.reserved-memory|0.3|master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G|
|master.failover-interval|10|failover interval, the unit is minute|
|master.kill-yarn-job-when-task-failover|true|whether to kill yarn job when failover taskInstance|
|master.registry-disconnect-strategy.strategy|stop|Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will waitting infinitely|
### Worker Server related configuration
@ -285,6 +287,8 @@ Location: `worker-server/conf/application.yaml`
|worker.groups|default|worker groups separated by comma, e.g., 'worker.groups=default,test' <br> worker will join corresponding group according to this config when startup|
|worker.alert-listen-host|localhost|the alert listen host of worker|
|worker.alert-listen-port|50052|the alert listen port of worker|
|worker.registry-disconnect-strategy.strategy|stop|Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting|
|worker.registry-disconnect-strategy.max-waiting-time|100s|Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will waitting infinitely |
### Alert Server related configuration
Location: `alert-server/conf/application.yaml`

4
docs/docs/zh/architecture/configuration.md

@ -255,6 +255,8 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
|master.reserved-memory|0.3|master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G|
|master.failover-interval|10|failover间隔,单位为分钟|
|master.kill-yarn-job-when-task-failover|true|当任务实例failover时,是否kill掉yarn job|
|master.registry-disconnect-strategy.strategy|stop|当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
## Worker Server相关配置
位置:`worker-server/conf/application.yaml`
@ -270,6 +272,8 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
|worker.groups|default|worker分组配置,逗号分隔,例如'worker.groups=default,test' <br> worker启动时会根据该配置自动加入对应的分组|
|worker.alert-listen-host|localhost|alert监听host|
|worker.alert-listen-port|50052|alert监听端口|
|worker.registry-disconnect-strategy.strategy|stop|当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
|worker.registry-disconnect-strategy.max-waiting-time|100s|当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 |
## Alert Server相关配置

20
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.alert;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
@ -26,7 +28,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
@ -34,9 +36,11 @@ import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.apache.commons.collections.CollectionUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -44,14 +48,6 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;
@Service
public final class AlertSenderService extends Thread {
@ -76,7 +72,7 @@ public final class AlertSenderService extends Thread {
@Override
public void run() {
logger.info("alert sender started");
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
AlertServerMetrics.registerPendingAlertGauge(alerts::size);

5
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
@ -40,6 +40,7 @@ import org.springframework.context.event.EventListener;
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
public class AlertServer implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);
private final PluginDao pluginDao;
@ -94,7 +95,7 @@ public class AlertServer implements Closeable {
try {
// set stop signal is true
// execute only once
if (!Stopper.stop()) {
if (!ServerLifeCycleManager.toStopped()) {
logger.warn("AlterServer is already stopped");
return;
}

513
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

File diff suppressed because it is too large Load Diff

11
dolphinscheduler-bom/pom.xml

@ -15,7 +15,6 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@ -25,8 +24,8 @@
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-bom</artifactId>
<name>${project.artifactId}</name>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<properties>
<netty.version>4.1.53.Final</netty.version>
@ -99,8 +98,8 @@
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>${netty.version}</version>
<scope>import</scope>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@ -199,8 +198,8 @@
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.spotbugs</groupId>
@ -395,12 +394,12 @@
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<artifactId>com.sun.jersey</artifactId>
<groupId>jersey-json</groupId>
<artifactId>com.sun.jersey</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>

29
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -25,9 +25,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import java.time.Duration;
import java.util.regex.Pattern;
/**
* Constants
*/
public final class Constants {
private Constants() {
@ -44,13 +41,9 @@ public final class Constants {
*/
public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master";
public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker";
public static final String REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS = "/dead-servers";
public static final String REGISTRY_DOLPHINSCHEDULER_NODE = "/nodes";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS =
"/lock/failover/startup-masters";
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
@ -639,20 +632,20 @@ public final class Constants {
public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
public static final int[] NOT_TERMINATED_STATES = new int[]{
WorkflowExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
TaskExecutionStatus.DISPATCH.ordinal(),
WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(),
WorkflowExecutionStatus.DELAY_EXECUTION.ordinal(),
WorkflowExecutionStatus.READY_PAUSE.ordinal(),
WorkflowExecutionStatus.READY_STOP.ordinal(),
TaskExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
WorkflowExecutionStatus.SUBMITTED_SUCCESS.getCode(),
TaskExecutionStatus.DISPATCH.getCode(),
WorkflowExecutionStatus.RUNNING_EXECUTION.getCode(),
WorkflowExecutionStatus.DELAY_EXECUTION.getCode(),
WorkflowExecutionStatus.READY_PAUSE.getCode(),
WorkflowExecutionStatus.READY_STOP.getCode(),
TaskExecutionStatus.NEED_FAULT_TOLERANCE.getCode(),
};
public static final int[] RUNNING_PROCESS_STATE = new int[]{
TaskExecutionStatus.RUNNING_EXECUTION.ordinal(),
TaskExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
TaskExecutionStatus.DISPATCH.ordinal(),
WorkflowExecutionStatus.SERIAL_WAIT.ordinal()
TaskExecutionStatus.RUNNING_EXECUTION.getCode(),
TaskExecutionStatus.SUBMITTED_SUCCESS.getCode(),
TaskExecutionStatus.DISPATCH.getCode(),
WorkflowExecutionStatus.SERIAL_WAIT.getCode()
};
/**

29
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleException.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.lifecycle;
public class ServerLifeCycleException extends Exception {
public ServerLifeCycleException(String message) {
super(message);
}
public ServerLifeCycleException(String message, Throwable throwable) {
super(message, throwable);
}
}

75
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleManager.java

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.lifecycle;
import lombok.experimental.UtilityClass;
@UtilityClass
public class ServerLifeCycleManager {
private static volatile ServerStatus serverStatus = ServerStatus.RUNNING;
public static boolean isRunning() {
return serverStatus == ServerStatus.RUNNING;
}
public static boolean isStopped() {
return serverStatus == ServerStatus.STOPPED;
}
public static ServerStatus getServerStatus() {
return serverStatus;
}
/**
* Change the current server state to {@link ServerStatus#WAITING}, only {@link ServerStatus#RUNNING} can change to {@link ServerStatus#WAITING}.
*
* @throws ServerLifeCycleException if change failed.
*/
public static synchronized void toWaiting() throws ServerLifeCycleException {
if (isStopped()) {
throw new ServerLifeCycleException("The current server is already stopped, cannot change to waiting");
}
if (serverStatus != ServerStatus.RUNNING) {
throw new ServerLifeCycleException("The current server is not at running status, cannot change to waiting");
}
serverStatus = ServerStatus.WAITING;
}
/**
* Recover from {@link ServerStatus#WAITING} to {@link ServerStatus#RUNNING}.
*
* @throws ServerLifeCycleException if change failed
*/
public static synchronized void recoverFromWaiting() throws ServerLifeCycleException {
if (serverStatus != ServerStatus.WAITING) {
throw new ServerLifeCycleException("The current server status is not waiting, cannot recover form waiting");
}
serverStatus = ServerStatus.RUNNING;
}
public static synchronized boolean toStopped() {
if (serverStatus == ServerStatus.STOPPED) {
return false;
}
serverStatus = ServerStatus.STOPPED;
return true;
}
}

45
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerStatus.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.lifecycle;
/**
* This enum is used to represent the server status, include master/worker.
*/
public enum ServerStatus {
RUNNING(0, "The current server is running"),
WAITING(1, "The current server is waiting, this means it cannot work"),
STOPPED(2, "The current server is stopped"),
;
private final int code;
private final String desc;
ServerStatus(int code, String desc) {
this.code = code;
this.desc = desc;
}
public int getCode() {
return code;
}
public String getDesc() {
return desc;
}
}

58
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.thread;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.experimental.UtilityClass;
/**
* If the process closes, a signal is placed as true, and all threads get this flag to stop working.
*/
@UtilityClass
public class Stopper {
private static final AtomicBoolean stoppedSignal = new AtomicBoolean(false);
/**
* Return the flag if the Server is stopped.
*
* @return True, if the server is stopped; False, the server is still running.
*/
public static boolean isStopped() {
return stoppedSignal.get();
}
/**
* Return the flag if the Server is stopped.
*
* @return True, if the server is running, False, the server is stopped.
*/
public static boolean isRunning() {
return !stoppedSignal.get();
}
/**
* Stop the server
*
* @return True, if the server stopped success. False, if the server is already stopped.
*/
public static boolean stop() {
return stoppedSignal.compareAndSet(false, true);
}
}

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
@ -29,9 +29,6 @@ import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.PostConstruct;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,11 +39,14 @@ import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.PostConstruct;
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@EnableTransactionManagement
@EnableCaching
public class MasterServer implements IStoppable {
private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
@Autowired
@ -90,7 +90,6 @@ public class MasterServer implements IStoppable {
this.taskPluginManager.loadPlugin();
// self tolerant
this.masterRegistryClient.init();
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
@ -103,7 +102,7 @@ public class MasterServer implements IStoppable {
this.schedulerApi.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
if (!ServerLifeCycleManager.isStopped()) {
close("MasterServer shutdownHook");
}
}));
@ -117,13 +116,14 @@ public class MasterServer implements IStoppable {
public void close(String cause) {
// set stop signal is true
// execute only once
if (!Stopper.stop()) {
if (!ServerLifeCycleManager.toStopped()) {
logger.warn("MasterServer is already stopped, current cause: {}", cause);
return;
}
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
try (SchedulerApi closedSchedulerApi = schedulerApi;
try (
SchedulerApi closedSchedulerApi = schedulerApi;
MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap;
MasterRPCServer closedRpcServer = masterRPCServer;
MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java vendored

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.cache;
import lombok.NonNull;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
@ -65,4 +66,6 @@ public interface ProcessInstanceExecCacheManager {
* @return all WorkflowExecuteThread in cache
*/
Collection<WorkflowExecuteRunnable> getAll();
void clearCache();
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java vendored

@ -38,7 +38,8 @@ import lombok.NonNull;
@Component
public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager {
private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps =
new ConcurrentHashMap<>();
@PostConstruct
public void registerMetrics() {
@ -69,4 +70,9 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
public Collection<WorkflowExecuteRunnable> getAll() {
return ImmutableList.copyOf(processInstanceExecMaps.values());
}
@Override
public void clearCache() {
processInstanceExecMaps.clear();
}
}

53
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -17,26 +17,32 @@
package org.apache.dolphinscheduler.server.master.config;
import lombok.Data;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
import lombok.Data;
import java.time.Duration;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
@Data
@Validated
@Configuration
@ConfigurationProperties(prefix = "master")
public class MasterConfig implements Validator {
private Logger logger = LoggerFactory.getLogger(MasterConfig.class);
/**
* The master RPC server listen port.
*/
@ -67,10 +73,6 @@ public class MasterConfig implements Validator {
* Master heart beat task execute interval.
*/
private Duration heartbeatInterval = Duration.ofSeconds(10);
/**
* Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
*/
private int heartbeatErrorThreshold = 5;
/**
* task submit max retry times.
*/
@ -87,11 +89,14 @@ public class MasterConfig implements Validator {
private double reservedMemory = 0.3;
private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killYarnJobWhenTaskFailover = true;
/**
* ip:listenPort
*/
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
// ip:listenPort
private String masterAddress;
// /nodes/master/ip:listenPort
private String masterRegistryNodePath;
@Override
public boolean supports(Class<?> clazz) {
return MasterConfig.class.isAssignableFrom(clazz);
@ -133,9 +138,29 @@ public class MasterConfig implements Validator {
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
if (masterConfig.getHeartbeatErrorThreshold() <= 0) {
errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
}
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
masterConfig
.setMasterRegistryNodePath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
printConfig();
}
private void printConfig() {
logger.info("Master config: listenPort -> {} ", listenPort);
logger.info("Master config: fetchCommandNum -> {} ", fetchCommandNum);
logger.info("Master config: preExecThreads -> {} ", preExecThreads);
logger.info("Master config: execThreads -> {} ", execThreads);
logger.info("Master config: dispatchTaskNumber -> {} ", dispatchTaskNumber);
logger.info("Master config: hostSelector -> {} ", hostSelector);
logger.info("Master config: heartbeatInterval -> {} ", heartbeatInterval);
logger.info("Master config: taskCommitRetryTimes -> {} ", taskCommitRetryTimes);
logger.info("Master config: taskCommitInterval -> {} ", taskCommitInterval);
logger.info("Master config: stateWheelInterval -> {} ", stateWheelInterval);
logger.info("Master config: maxCpuLoadAvg -> {} ", maxCpuLoadAvg);
logger.info("Master config: reservedMemory -> {} ", reservedMemory);
logger.info("Master config: failoverInterval -> {} ", failoverInterval);
logger.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover);
logger.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy);
logger.info("Master config: masterAddress -> {} ", masterAddress);
logger.info("Master config: masterRegistryNodePath -> {} ", masterRegistryNodePath);
}
}

28
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -17,9 +17,10 @@
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -39,9 +40,12 @@ import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.apache.commons.collections.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -51,13 +55,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* TaskUpdateQueue consumer
*/
@ -116,7 +113,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
@PostConstruct
public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils
.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
logger.info("Task priority queue consume thread staring");
super.start();
logger.info("Task priority queue consume thread started");
@ -125,7 +123,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
@Override
public void run() {
int fetchTaskNum = masterConfig.getDispatchTaskNumber();
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
@ -205,7 +203,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
TaskInstance taskInstance = taskInstanceOptional.get();
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext =
new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);
new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(),
taskInstance);
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
@ -236,7 +235,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
* add dispatch event
*/
private void addDispatchEvent(TaskExecutionContext context, ExecutionContext executionContext) {
TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(), executionContext.getHost().getAddress());
TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(),
executionContext.getHost().getAddress());
taskEventService.addEvent(taskEvent);
}

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -61,15 +61,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
@Autowired
private ServerNodeManager serverNodeManager;
@Autowired
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
@Autowired
private TaskRecallProcessor taskRecallProcessor;
@ -115,7 +109,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
doExecute(host, command);
success = true;
context.setHost(host);
// We set the host to taskInstance to avoid when the worker down, this taskInstance may not be failovered, due to the taskInstance's host
// We set the host to taskInstance to avoid when the worker down, this taskInstance may not be
// failovered, due to the taskInstance's host
// is not belongs to the down worker ISSUE-10842.
context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
@ -197,7 +192,4 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
return nodes;
}
public NettyRemotingClient getNettyRemotingClient() {
return nettyRemotingClient;
}
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java

@ -45,4 +45,7 @@ public class WorkflowEventQueue {
return workflowEventQueue.take();
}
public void clearWorkflowEventQueue() {
workflowEventQueue.clear();
}
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -112,7 +112,7 @@ public class StateEventResponseService {
@Override
public void run() {
logger.info("State event loop service started");
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
// if not task , blocking here
StateEvent stateEvent = eventQueue.take();

21
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java

@ -18,23 +18,21 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* task manager
*/
@ -105,13 +103,14 @@ public class TaskEventService {
* Dispatch event to target task runnable.
*/
class TaskEventDispatchThread extends BaseDaemonThread {
protected TaskEventDispatchThread() {
super("TaskEventLoopThread");
}
@Override
public void run() {
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
// if not task event, blocking here
TaskEvent taskEvent = eventQueue.take();
@ -139,7 +138,7 @@ public class TaskEventService {
@Override
public void run() {
logger.info("event handler thread started");
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
taskExecuteThreadPool.eventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);

24
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.registry.api.ConnectStrategy;
public interface MasterConnectStrategy extends ConnectStrategy {
}

27
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java

@ -17,44 +17,45 @@
package org.apache.dolphinscheduler.server.master.registry;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.NonNull;
public class MasterConnectionStateListener implements ConnectionListener {
private static final Logger logger = LoggerFactory.getLogger(MasterConnectionStateListener.class);
private final String masterNodePath;
private final MasterConfig masterConfig;
private final RegistryClient registryClient;
private final MasterConnectStrategy masterConnectStrategy;
public MasterConnectionStateListener(@NonNull String masterNodePath, @NonNull RegistryClient registryClient) {
this.masterNodePath = masterNodePath;
public MasterConnectionStateListener(@NonNull MasterConfig masterConfig,
@NonNull RegistryClient registryClient,
@NonNull MasterConnectStrategy masterConnectStrategy) {
this.masterConfig = masterConfig;
this.registryClient = registryClient;
this.masterConnectStrategy = masterConnectStrategy;
}
@Override
public void onUpdate(ConnectionState state) {
logger.info("Master received a {} event from registry, the current server state is {}", state,
ServerLifeCycleManager.getServerStatus());
switch (state) {
case CONNECTED:
logger.debug("registry connection state is {}", state);
break;
case SUSPENDED:
logger.warn("registry connection state is {}, ready to retry connection", state);
break;
case RECONNECTED:
logger.debug("registry connection state is {}, clean the node info", state);
registryClient.remove(masterNodePath);
registryClient.persistEphemeral(masterNodePath, "");
masterConnectStrategy.reconnect();
break;
case DISCONNECTED:
logger.warn("registry connection state is {}, ready to stop myself", state);
registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
masterConnectStrategy.disconnect();
break;
default:
}

88
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -17,11 +17,8 @@
package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.Constants;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -32,21 +29,18 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.collect.Sets;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
* <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
@ -55,48 +49,36 @@ import com.google.common.collect.Sets;
@Component
public class MasterRegistryClient implements AutoCloseable {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);
/**
* failover service
*/
@Autowired
private FailoverService failoverService;
@Autowired
private RegistryClient registryClient;
/**
* master config
*/
@Autowired
private MasterConfig masterConfig;
/**
* heartbeat executor
*/
@Autowired
private MasterConnectStrategy masterConnectStrategy;
private ScheduledExecutorService heartBeatExecutor;
/**
* master startup time, ms
*/
private long startupTime;
private String masterAddress;
public void init() {
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
public void start() {
try {
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
// master registry
registry();
registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(), registryClient));
registryClient.addConnectionStateListener(
new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
throw new RegistryException("Master registry client start up error", e);
@ -137,11 +119,8 @@ public class MasterRegistryClient implements AutoCloseable {
try {
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
// handle dead server
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
//failover server
// failover server
if (failover) {
failoverService.failoverServerWhenDown(serverHost, nodeType);
}
@ -169,11 +148,9 @@ public class MasterRegistryClient implements AutoCloseable {
}
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
// handle dead server
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
}
//failover server
// failover server
if (failover) {
failoverService.failoverServerWhenDown(serverHost, nodeType);
}
@ -186,16 +163,14 @@ public class MasterRegistryClient implements AutoCloseable {
* Registry the current master server itself to registry.
*/
void registry() {
logger.info("Master node : {} registering to registry center", masterAddress);
String localNodePath = getCurrentNodePath();
logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
String localNodePath = masterConfig.getMasterRegistryNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
Constants.MASTER_TYPE,
registryClient,
masterConfig.getHeartbeatErrorThreshold());
registryClient);
// remove before persist
registryClient.remove(localNodePath);
@ -209,19 +184,17 @@ public class MasterRegistryClient implements AutoCloseable {
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
// delete dead server
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(),
TimeUnit.SECONDS);
logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s",
masterConfig.getMasterAddress(), masterHeartbeatInterval);
}
public void deregister() {
try {
String localNodePath = getCurrentNodePath();
registryClient.remove(localNodePath);
logger.info("Master node : {} unRegistry to register center.", masterAddress);
registryClient.remove(masterConfig.getMasterRegistryNodePath());
logger.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress());
heartBeatExecutor.shutdown();
logger.info("MasterServer heartbeat executor shutdown");
registryClient.close();
@ -230,11 +203,4 @@ public class MasterRegistryClient implements AutoCloseable {
}
}
/**
* get master path
*/
private String getCurrentNodePath() {
return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterAddress;
}
}

58
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* This strategy will stop the master server, when disconnected from {@link org.apache.dolphinscheduler.registry.api.Registry}.
*/
@Service
@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true)
public class MasterStopStrategy implements MasterConnectStrategy {
private final Logger logger = LoggerFactory.getLogger(MasterStopStrategy.class);
@Autowired
private RegistryClient registryClient;
@Autowired
private MasterConfig masterConfig;
@Override
public void disconnect() {
registryClient.getStoppable()
.stop("Master disconnected from registry, will stop myself due to the stop strategy");
}
@Override
public void reconnect() {
logger.warn("The current connect strategy is stop, so the master will not reconnect to registry");
}
@Override
public StrategyType getStrategyType() {
return StrategyType.STOP;
}
}

134
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.lifecycle.ServerStatus;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import java.time.Duration;
/**
* This strategy will change the server status to {@link ServerStatus#WAITING} when disconnect from {@link Registry}.
*/
@Service
@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "waiting")
public class MasterWaitingStrategy implements MasterConnectStrategy {
private final Logger logger = LoggerFactory.getLogger(MasterWaitingStrategy.class);
@Autowired
private MasterConfig masterConfig;
@Autowired
private RegistryClient registryClient;
@Autowired
private MasterRPCServer masterRPCServer;
@Autowired
private WorkflowEventQueue workflowEventQueue;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
@Override
public void disconnect() {
try {
ServerLifeCycleManager.toWaiting();
// todo: clear the current resource
clearMasterResource();
Duration maxWaitingTime = masterConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
try {
logger.info("Master disconnect from registry will try to reconnect in {} s",
maxWaitingTime.getSeconds());
registryClient.connectUntilTimeout(maxWaitingTime);
} catch (RegistryException ex) {
throw new ServerLifeCycleException(
String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
}
} catch (ServerLifeCycleException e) {
String errorMessage = String.format(
"Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
ServerLifeCycleManager.getServerStatus());
logger.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
} catch (RegistryException ex) {
String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server";
logger.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
} catch (Exception ex) {
String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server";
logger.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
}
}
@Override
public void reconnect() {
try {
ServerLifeCycleManager.recoverFromWaiting();
reStartMasterResource();
// reopen the resource
logger.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
String errorMessage =
String.format("Recover from waiting failed, the current server status is %s, will stop the server",
ServerLifeCycleManager.getServerStatus());
logger.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
}
}
@Override
public StrategyType getStrategyType() {
return StrategyType.WAITING;
}
private void clearMasterResource() {
// close the worker resource, if close failed should stop the worker server
masterRPCServer.close();
logger.warn("Master closed RPC server due to lost registry connection");
workflowEventQueue.clearWorkflowEventQueue();
logger.warn("Master clear workflow event queue due to lost registry connection");
processInstanceExecCacheManager.clearCache();
logger.warn("Master clear process instance cache due to lost registry connection");
stateWheelExecuteThread.clearAllTasks();
logger.warn("Master clear all state wheel task due to lost registry connection");
}
private void reStartMasterResource() {
// reopen the resource, if reopen failed should stop the worker server
masterRPCServer.start();
logger.warn("Master restarted RPC server due to reconnect to registry");
}
}

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java

@ -82,8 +82,8 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired
private TaskExecuteStartProcessor taskExecuteStartProcessor;
@PostConstruct
private void init() {
public void start() {
logger.info("Starting Master RPC Server...");
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
@ -106,11 +106,6 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
}
public void start() {
logger.info("Starting Master RPC Server...");
this.nettyRemotingServer.start();
logger.info("Started Master RPC Server...");
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager;
@ -61,7 +61,7 @@ public class EventExecuteService extends BaseDaemonThread {
@Override
public void run() {
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
workflowEventHandler();
streamTaskEventHandler();

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -18,13 +18,11 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -60,8 +58,11 @@ public class FailoverExecuteThread extends BaseDaemonThread {
// when startup, wait 10s for ready
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
continue;
}
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
masterFailoverService.checkMasterFailover();

50
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@ -17,10 +17,11 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@ -40,8 +41,10 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
@ -49,11 +52,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@ -104,7 +102,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
* constructor of MasterSchedulerService
*/
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils
.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
@ -127,8 +126,12 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
*/
@Override
public void run() {
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
// the current server is not at running status, cannot consume command.
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
// todo: if the workflow event queue is much, we need to handle the back pressure
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
@ -156,7 +159,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
logger.error("The workflow instance is already been cached, this case shouldn't be happened");
logger.error(
"The workflow instance is already been cached, this case shouldn't be happened");
}
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
processService,
@ -186,24 +190,28 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
long commandTransformStartTime = System.currentTimeMillis();
logger.info("Master schedule bootstrap transforming command to ProcessInstance, commandSize: {}", commands.size());
logger.info("Master schedule bootstrap transforming command to ProcessInstance, commandSize: {}",
commands.size());
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
masterPrepareExecService.execute(() -> {
try {
// Note: this check is not safe, the slot may change after command transform.
// We use the database transaction in `handleCommand` so that we can guarantee the command will always be executed
// We use the database transaction in `handleCommand` so that we can guarantee the command will
// always be executed
// by only one master
SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
logger.info("Master handle command {} skip, slot check state: {}", command.getId(),
slotCheckState);
return;
}
ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
if (processInstance != null) {
processInstances.add(processInstance);
logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
logger.info("Master handle command {} end, create process instance {}", command.getId(),
processInstance.getId());
}
} catch (Exception e) {
logger.error("Master handle command {} error ", command.getId(), e);
@ -216,9 +224,11 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
// make sure to finish handling command each time before next scan
latch.await();
logger.info("Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
logger.info(
"Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
ProcessInstanceMetrics
.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
return processInstances;
}
@ -233,9 +243,11 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
}
int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum();
final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
final List<Command> result =
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
logger.info("Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
logger.info(
"Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
}
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);

11
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -21,9 +21,9 @@ import lombok.NonNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -97,7 +97,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
@Override
public void run() {
final long checkInterval = masterConfig.getStateWheelInterval().toMillis();
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
checkTask4Timeout();
checkTask4Retry();
@ -235,6 +235,13 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
logger.info("Removed task instance from state check list");
}
public void clearAllTasks() {
processInstanceTimeoutCheckList.clear();
taskInstanceTimeoutCheckList.clear();
taskInstanceRetryCheckList.clear();
taskInstanceStateCheckList.clear();
}
private void checkTask4Timeout() {
if (taskInstanceTimeoutCheckList.isEmpty()) {
return;

19
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java

@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
@ -28,18 +28,16 @@ import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleExcept
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class WorkflowEventLooper extends BaseDaemonThread {
@ -59,7 +57,8 @@ public class WorkflowEventLooper extends BaseDaemonThread {
@PostConstruct
public void init() {
workflowEventHandlerList.forEach(workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(),
workflowEventHandlerList.forEach(
workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(),
workflowEventHandler));
}
@ -72,7 +71,7 @@ public class WorkflowEventLooper extends BaseDaemonThread {
public void run() {
WorkflowEvent workflowEvent = null;
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
workflowEvent = workflowEventQueue.poolEvent();
LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());

7
dolphinscheduler-master/src/main/resources/application.yaml

@ -98,8 +98,6 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
# Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
@ -113,6 +111,11 @@ master:
failover-interval: 10m
# kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: waiting
# The max waiting time to reconnect to registry if you set the strategy to waiting
max-waiting-time: 100s
server:
port: 5679

13
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -35,11 +35,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameter
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -47,6 +42,10 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ApplicationContext;
import java.time.Duration;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* DependentTaskTest
*/
@ -298,7 +297,7 @@ public class DependentTaskTest {
@Test
public void testWaitAndCancel() {
// for the poor independence of UT, error on other place may causes the condition happens
if (!Stopper.isRunning()) {
if (!ServerLifeCycleManager.isRunning()) {
return;
}

8
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -18,9 +18,9 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -50,7 +50,7 @@ import java.util.ArrayList;
import java.util.List;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Stopper.class})
@PrepareForTest({ServerLifeCycleManager.class})
public class SubProcessTaskTest {
/**
@ -73,8 +73,8 @@ public class SubProcessTaskTest {
config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(Duration.ofSeconds(1));
PowerMockito.mockStatic(Stopper.class);
PowerMockito.when(Stopper.isRunning()).thenReturn(true);
PowerMockito.mockStatic(ServerLifeCycleManager.class);
PowerMockito.when(ServerLifeCycleManager.isStopped()).thenReturn(false);
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);

4
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -347,7 +347,7 @@ public class TaskPriorityQueueConsumerTest {
@After
public void close() {
Stopper.stop();
ServerLifeCycleManager.toStopped();
}
}

4
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java

@ -66,11 +66,11 @@ public class ExecutorDispatcherTest {
nettyRemotingServer.start();
//
workerConfig.setListenPort(port);
workerRegistryClient.registry();
workerRegistryClient.start();
ExecutionContext executionContext = ExecutionContextTestUtils.getExecutionContext(port);
executorDispatcher.dispatch(executionContext);
workerRegistryClient.unRegistry();
workerRegistryClient.close();
}
}

9
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -80,7 +80,6 @@ public class MasterRegistryClientTest {
given(registryClient.getStoppable()).willReturn(cause -> {
});
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
ProcessInstance processInstance = new ProcessInstance();
@ -89,13 +88,15 @@ public class MasterRegistryClientTest {
processInstance.setRestartTime(new Date());
processInstance.setHistoryCmd("xxx");
processInstance.setCommandType(CommandType.STOP);
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance));
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString()))
.willReturn(Arrays.asList(processInstance));
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setStartTime(new Date());
taskInstance.setHost("127.0.0.1:8080");
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance));
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString()))
.willReturn(Arrays.asList(taskInstance));
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
Server server = new Server();
@ -115,7 +116,7 @@ public class MasterRegistryClientTest {
public void removeNodePathTest() {
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, true);
//Cannot mock static methods
// Cannot mock static methods
masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, true);
}
}

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -129,8 +129,6 @@ public class FailoverServiceTest {
given(registryClient.getStoppable()).willReturn(cause -> {
});
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class),
Mockito.anyString());
processInstance = new ProcessInstance();
processInstance.setId(1);

31
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api;
/**
* This interface defined a method to be executed when the server disconnected from registry.
*/
public interface ConnectStrategy {
void disconnect();
void reconnect();
StrategyType getStrategyType();
}

31
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api;
import lombok.Data;
import java.time.Duration;
@Data
public class ConnectStrategyProperties {
private StrategyType strategy = StrategyType.STOP;
private Duration maxWaitingTime = Duration.ofSeconds(0);
}

11
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java

@ -19,11 +19,22 @@
package org.apache.dolphinscheduler.registry.api;
import lombok.NonNull;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
public interface Registry extends Closeable {
/**
* Connect to the registry, will wait in the given timeout
*
* @param timeout max timeout, if timeout <= 0 will wait indefinitely.
* @throws RegistryException cannot connect in the given timeout
*/
void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException;
boolean subscribe(String path, SubscribeListener listener);
void unsubscribe(String path);

25
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api;
public enum StrategyType {
STOP,
WAITING,
;
}

40
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java

@ -17,25 +17,24 @@
package org.apache.dolphinscheduler.plugin.registry.mysql;
import lombok.NonNull;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.EphemeralDateManager;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.RegistryLockManager;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.SubscribeDataManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
/**
* This is one of the implementation of {@link Registry}, with this implementation, you need to rely on mysql database to
@ -47,6 +46,7 @@ public class MysqlRegistry implements Registry {
private static Logger LOGGER = LoggerFactory.getLogger(MysqlRegistry.class);
private final MysqlRegistryProperties mysqlRegistryProperties;
private final EphemeralDateManager ephemeralDateManager;
private final SubscribeDataManager subscribeDataManager;
private final RegistryLockManager registryLockManager;
@ -56,6 +56,7 @@ public class MysqlRegistry implements Registry {
this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties);
mysqlOperator.clearExpireLock();
mysqlOperator.clearExpireEphemeralDate();
this.mysqlRegistryProperties = mysqlRegistryProperties;
this.ephemeralDateManager = new EphemeralDateManager(mysqlRegistryProperties, mysqlOperator);
this.subscribeDataManager = new SubscribeDataManager(mysqlRegistryProperties, mysqlOperator);
this.registryLockManager = new RegistryLockManager(mysqlRegistryProperties, mysqlOperator);
@ -72,6 +73,27 @@ public class MysqlRegistry implements Registry {
LOGGER.info("Started Mysql Registry...");
}
@Override
public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException {
long beginTimeMillis = System.currentTimeMillis();
long endTimeMills = timeout.getSeconds() <= 0 ? Long.MAX_VALUE : beginTimeMillis + timeout.toMillis();
while (true) {
if (System.currentTimeMillis() > endTimeMills) {
throw new RegistryException(
String.format("Cannot connect to mysql registry in %s s", timeout.getSeconds()));
}
if (ephemeralDateManager.getConnectionState() == ConnectionState.CONNECTED) {
return;
}
try {
Thread.sleep(mysqlRegistryProperties.getTermRefreshInterval().toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("Cannot connect to mysql registry due to interrupted exception", e);
}
}
}
@Override
public boolean subscribe(String path, SubscribeListener listener) {
// new a schedule thread to query the path, if the path
@ -156,12 +178,12 @@ public class MysqlRegistry implements Registry {
return true;
}
@Override
public void close() {
LOGGER.info("Closing Mysql Registry...");
// remove the current Ephemeral node, if can connect to mysql
try (EphemeralDateManager closed1 = ephemeralDateManager;
try (
EphemeralDateManager closed1 = ephemeralDateManager;
SubscribeDataManager close2 = subscribeDataManager;
RegistryLockManager close3 = registryLockManager;
MysqlOperator closed4 = mysqlOperator) {

9
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java

@ -46,6 +46,7 @@ public class EphemeralDateManager implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(EphemeralDateManager.class);
private ConnectionState connectionState;
private final MysqlOperator mysqlOperator;
private final MysqlRegistryProperties registryProperties;
private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
@ -78,6 +79,10 @@ public class EphemeralDateManager implements AutoCloseable {
return ephemeralId;
}
public ConnectionState getConnectionState() {
return connectionState;
}
@Override
public void close() throws SQLException {
ephemeralDateIds.clear();
@ -89,11 +94,11 @@ public class EphemeralDateManager implements AutoCloseable {
}
// Use this task to refresh ephemeral term and check the connect state.
static class EphemeralDateTermRefreshTask implements Runnable {
class EphemeralDateTermRefreshTask implements Runnable {
private final List<ConnectionListener> connectionListeners;
private final Set<Long> ephemeralDateIds;
private final MysqlOperator mysqlOperator;
private ConnectionState connectionState;
private EphemeralDateTermRefreshTask(MysqlOperator mysqlOperator,
List<ConnectionListener> connectionListeners,

45
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java

@ -17,14 +17,8 @@
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@ -34,11 +28,19 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Comparator;
@ -47,16 +49,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import com.google.common.base.Strings;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper")
public final class ZookeeperRegistry implements Registry {
private final ZookeeperRegistryProperties.ZookeeperProperties properties;
private final CuratorFramework client;
@ -90,6 +88,7 @@ public final class ZookeeperRegistry implements Registry {
private void buildDigest(CuratorFrameworkFactory.Builder builder, String digest) {
builder.authorization("digest", digest.getBytes(StandardCharsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
@ -112,6 +111,7 @@ public final class ZookeeperRegistry implements Registry {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("Zookeeper registry start failed", e);
}
}
@ -120,6 +120,22 @@ public final class ZookeeperRegistry implements Registry {
client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener));
}
@Override
public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException {
try {
if (!client.blockUntilConnected((int) timeout.toMillis(), MILLISECONDS)) {
throw new RegistryException(
String.format("Cannot connect to the Zookeeper registry in %s s", timeout.getSeconds()));
}
} catch (RegistryException e) {
throw e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException(
String.format("Cannot connect to the Zookeeper registry in %s s", timeout.getSeconds()), e);
}
}
@Override
public boolean subscribe(String path, SubscribeListener listener) {
final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path));
@ -239,6 +255,7 @@ public final class ZookeeperRegistry implements Registry {
}
static final class EventAdaptor extends Event {
public EventAdaptor(TreeCacheEvent event, String key) {
key(key);

34
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -17,15 +17,15 @@
package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Heart beat task
*/
@ -36,25 +36,18 @@ public class HeartBeatTask implements Runnable {
private final Set<String> heartBeatPaths;
private final RegistryClient registryClient;
private int workerWaitingTaskCount;
private final String serverType;
private final HeartBeat heartBeat;
private final int heartBeatErrorThreshold;
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient,
int heartBeatErrorThreshold) {
RegistryClient registryClient) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public HeartBeatTask(long startupTime,
@ -62,17 +55,13 @@ public class HeartBeatTask implements Runnable {
double reservedMemory,
int hostWeight,
Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient,
int workerThreadCount,
int workerWaitingTaskCount,
int heartBeatErrorThreshold) {
int workerWaitingTaskCount) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.workerWaitingTaskCount = workerWaitingTaskCount;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public String getHeartBeatInfo() {
@ -82,14 +71,9 @@ public class HeartBeatTask implements Runnable {
@Override
public void run() {
try {
// check dead or not in zookeeper
for (String heartBeatPath : heartBeatPaths) {
if (registryClient.checkIsDeadServer(heartBeatPath, serverType)) {
registryClient.getStoppable().stop("i was judged to death, release resources and stop myself");
if (!ServerLifeCycleManager.isRunning()) {
return;
}
}
// update waiting task count
heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
@ -98,11 +82,7 @@ public class HeartBeatTask implements Runnable {
}
heartBeatErrorTimes.set(0);
} catch (Throwable ex) {
logger.error("HeartBeat task execute failed", ex);
if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) {
registryClient.getStoppable()
.stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes);
}
logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
}
}
}

55
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java

@ -17,26 +17,21 @@
package org.apache.dolphinscheduler.service.cron;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.day;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.hour;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.min;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.month;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.week;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.year;
import static com.cronutils.model.CronType.QUARTZ;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import lombok.NonNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.time.ZonedDateTime;
@ -49,29 +44,30 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import lombok.NonNull;
import static com.cronutils.model.CronType.QUARTZ;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.day;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.hour;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.min;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.month;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.week;
import static org.apache.dolphinscheduler.service.cron.CycleFactory.year;
/**
* // todo: this utils is heavy, it rely on quartz and corn-utils.
* cron utils
*/
public class CronUtils {
private CronUtils() {
throw new IllegalStateException("CronUtils class");
}
private static final Logger logger = LoggerFactory.getLogger(CronUtils.class);
private static final CronParser QUARTZ_CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
private static final CronParser QUARTZ_CRON_PARSER =
new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
/**
* parse to cron
@ -94,7 +90,8 @@ public class CronUtils {
* @return CycleEnum
*/
public static CycleEnum getMaxCycle(Cron cron) {
return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).addCycle(year(cron)).getCycle();
return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron))
.addCycle(year(cron)).getCycle();
}
/**
@ -126,7 +123,6 @@ public class CronUtils {
}
}
public static List<ZonedDateTime> getFireDateList(@NonNull ZonedDateTime startTime,
@NonNull ZonedDateTime endTime,
@NonNull String cron) throws CronParseException {
@ -147,7 +143,7 @@ public class CronUtils {
List<ZonedDateTime> dateList = new ArrayList<>();
ExecutionTime executionTime = ExecutionTime.forCron(cron);
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
Optional<ZonedDateTime> nextExecutionTimeOptional = executionTime.nextExecution(startTime);
if (!nextExecutionTimeOptional.isPresent()) {
break;
@ -208,8 +204,7 @@ public class CronUtils {
*/
public static List<ZonedDateTime> getSelfFireDateList(@NonNull final ZonedDateTime startTime,
@NonNull final ZonedDateTime endTime,
@NonNull final List<Schedule> schedules)
throws CronParseException {
@NonNull final List<Schedule> schedules) throws CronParseException {
List<ZonedDateTime> result = new ArrayList<>();
if (startTime.equals(endTime)) {
result.add(startTime);

81
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java

@ -17,19 +17,8 @@
package org.apache.dolphinscheduler.service.registry;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
@ -40,7 +29,11 @@ import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@ -52,16 +45,16 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.google.common.base.Strings;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
@Component
public class RegistryClient {
private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class);
private static final String EMPTY = "";
@ -78,6 +71,10 @@ public class RegistryClient {
initNodes();
}
public void connectUntilTimeout(@NonNull Duration duration) throws RegistryException {
registry.connectUntilTimeout(duration);
}
public int getActiveMasterNum() {
Collection<String> childrenList = new ArrayList<>();
try {
@ -146,31 +143,6 @@ public class RegistryClient {
.anyMatch(it -> it.contains(host));
}
public void handleDeadServer(Collection<String> nodes, NodeType nodeType, String opType) {
nodes.forEach(node -> {
final String host = getHostByEventDataPath(node);
final String type = nodeType == NodeType.MASTER ? MASTER_TYPE : WORKER_TYPE;
if (opType.equals(DELETE_OP)) {
removeDeadServerByHost(host, type);
} else if (opType.equals(ADD_OP)) {
String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + type + UNDERLINE + host;
// Add dead server info to zk dead server path : /dead-servers/
registry.put(deadServerPath, type + UNDERLINE + host, false);
logger.info("{} server dead , and {} added to zk dead server path success", nodeType, node);
}
});
}
public boolean checkIsDeadServer(String node, String serverType) {
// ip_sequence_no
String[] zNodesPath = node.split("/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;
return !exists(node) || exists(deadServerPath);
}
public Collection<String> getMasterNodesDirectly() {
return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
@ -271,7 +243,6 @@ public class RegistryClient {
private void initNodes() {
registry.put(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false);
registry.put(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false);
registry.put(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY, false);
}
private String rootNodePath(NodeType type) {
@ -280,8 +251,6 @@ public class RegistryClient {
return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
case WORKER:
return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
case DEAD_SERVER:
return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
default:
throw new IllegalStateException("Should not reach here");
}
@ -293,21 +262,9 @@ public class RegistryClient {
if (nodeType != NodeType.WORKER) {
return serverList;
}
return serverList.stream().flatMap(group ->
getChildrenKeys(path + SINGLE_SLASH + group)
return serverList.stream().flatMap(group -> getChildrenKeys(path + SINGLE_SLASH + group)
.stream()
.map(it -> group + SINGLE_SLASH + it)
).collect(Collectors.toList());
.map(it -> group + SINGLE_SLASH + it)).collect(Collectors.toList());
}
private void removeDeadServerByHost(String host, String serverType) {
Collection<String> deadServers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS);
for (String serverPath : deadServers) {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverPath;
remove(server);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
}
}
}
}

35
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -17,9 +17,10 @@
package org.apache.dolphinscheduler.server.worker;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
@ -33,13 +34,6 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Collection;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -49,16 +43,17 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.PostConstruct;
import java.util.Collection;
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan(basePackages = "org.apache.dolphinscheduler",
excludeFilters = {
@ComponentScan(basePackages = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"org.apache.dolphinscheduler.service.process.*",
"org.apache.dolphinscheduler.service.queue.*",
})
}
)
})
public class WorkerServer implements IStoppable {
/**
@ -116,9 +111,8 @@ public class WorkerServer implements IStoppable {
this.workerRpcClient.start();
this.taskPluginManager.loadPlugin();
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.handleDeadServer();
this.workerRegistryClient.start();
this.workerManagerThread.start();
@ -128,20 +122,21 @@ public class WorkerServer implements IStoppable {
* registry hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
if (!ServerLifeCycleManager.isStopped()) {
close("WorkerServer shutdown hook");
}
}));
}
public void close(String cause) {
if (!Stopper.stop()) {
if (!ServerLifeCycleManager.toStopped()) {
logger.warn("WorkerServer is already stopped, current cause: {}", cause);
return;
}
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
try (
WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
WorkerRegistryClient closedRegistryClient = workerRegistryClient;
AlertClientService closedAlertClientService = alertClientService;
SpringApplicationContext closedSpringContext = springApplicationContext;) {
@ -173,7 +168,8 @@ public class WorkerServer implements IStoppable {
for (TaskExecutionContext taskRequest : taskRequests) {
// kill task when it's not finished yet
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), taskRequest.getTaskInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
taskRequest.getTaskInstanceId());
if (ProcessUtils.kill(taskRequest)) {
killNumber++;
}
@ -181,6 +177,7 @@ public class WorkerServer implements IStoppable {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(), killNumber);
logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(),
killNumber);
}
}

43
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -17,33 +17,32 @@
package org.apache.dolphinscheduler.server.worker.config;
import com.google.common.collect.Sets;
import lombok.Data;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.time.Duration;
import java.util.Set;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
import com.google.common.collect.Sets;
import lombok.Data;
import java.time.Duration;
import java.util.Set;
@Data
@Validated
@Configuration
@ConfigurationProperties(prefix = "worker")
public class WorkerConfig implements Validator {
private Logger logger = LoggerFactory.getLogger(WorkerConfig.class);
private int listenPort = 1234;
private int execThreads = 10;
private Duration heartbeatInterval = Duration.ofSeconds(10);
/**
* Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
*/
private int heartbeatErrorThreshold = 5;
private int hostWeight = 100;
private boolean tenantAutoCreate = true;
private boolean tenantDistributedUser = false;
@ -52,6 +51,8 @@ public class WorkerConfig implements Validator {
private Set<String> groups = Sets.newHashSet("default");
private String alertListenHost = "localhost";
private int alertListenPort = 50052;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
/**
* This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
*/
@ -74,9 +75,23 @@ public class WorkerConfig implements Validator {
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
if (workerConfig.getHeartbeatErrorThreshold() <= 0) {
errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
}
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
printConfig();
}
private void printConfig() {
logger.info("Worker config: listenPort -> {}", listenPort);
logger.info("Worker config: execThreads -> {}", execThreads);
logger.info("Worker config: heartbeatInterval -> {}", heartbeatInterval);
logger.info("Worker config: hostWeight -> {}", hostWeight);
logger.info("Worker config: tenantAutoCreate -> {}", tenantAutoCreate);
logger.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser);
logger.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg);
logger.info("Worker config: reservedMemory -> {}", reservedMemory);
logger.info("Worker config: groups -> {}", groups);
logger.info("Worker config: alertListenHost -> {}", alertListenHost);
logger.info("Worker config: alertListenPort -> {}", alertListenPort);
logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
}
}

23
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

@ -17,27 +17,24 @@
package org.apache.dolphinscheduler.server.worker.message;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.BaseCommand;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import lombok.NonNull;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class MessageRetryRunner extends BaseDaemonThread {
@ -99,7 +96,7 @@ public class MessageRetryRunner extends BaseDaemonThread {
}
public void run() {
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
if (needToRetryMessages.isEmpty()) {
Thread.sleep(MESSAGE_RETRY_WINDOW);
@ -136,4 +133,8 @@ public class MessageRetryRunner extends BaseDaemonThread {
}
}
}
public void clearMessage() {
needToRetryMessages.clear();
}
}

24
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.registry.api.ConnectStrategy;
public interface WorkerConnectStrategy extends ConnectStrategy {
}

61
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WorkerConnectionStateListener implements ConnectionListener {
private final Logger logger = LoggerFactory.getLogger(WorkerConnectionStateListener.class);
private final WorkerConfig workerConfig;
private final RegistryClient registryClient;
private final WorkerConnectStrategy workerConnectStrategy;
public WorkerConnectionStateListener(@NonNull WorkerConfig workerConfig,
@NonNull RegistryClient registryClient,
@NonNull WorkerConnectStrategy workerConnectStrategy) {
this.workerConfig = workerConfig;
this.registryClient = registryClient;
this.workerConnectStrategy = workerConnectStrategy;
}
@Override
public void onUpdate(ConnectionState state) {
logger.info("Worker received a {} event from registry, the current server state is {}", state,
ServerLifeCycleManager.getServerStatus());
switch (state) {
case CONNECTED:
break;
case SUSPENDED:
break;
case RECONNECTED:
workerConnectStrategy.reconnect();
break;
case DISCONNECTED:
workerConnectStrategy.disconnect();
default:
}
}
}

90
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -17,22 +17,25 @@
package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Set;
import java.util.StringJoiner;
@ -40,15 +43,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
* worker registry
@ -78,6 +76,9 @@ public class WorkerRegistryClient implements AutoCloseable {
@Autowired
private RegistryClient registryClient;
@Autowired
private WorkerConnectStrategy workerConnectStrategy;
/**
* worker startup time, ms
*/
@ -89,13 +90,24 @@ public class WorkerRegistryClient implements AutoCloseable {
public void initWorkRegistry() {
this.workerGroups = workerConfig.getGroups();
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
this.heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
public void start() {
try {
registry();
registryClient.addConnectionStateListener(
new WorkerConnectionStateListener(workerConfig, registryClient, workerConnectStrategy));
} catch (Exception ex) {
throw new RegistryException("Worker registry client start up error", ex);
}
}
/**
* registry
*/
public void registry() {
private void registry() {
String address = NetUtils.getAddr(workerConfig.getListenPort());
Set<String> workerZkPaths = getWorkerZkPaths();
long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
@ -105,11 +117,9 @@ public class WorkerRegistryClient implements AutoCloseable {
workerConfig.getReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_TYPE,
registryClient,
workerConfig.getExecThreads(),
workerManagerThread.getThreadPoolQueueSize(),
workerConfig.getHeartbeatErrorThreshold());
workerManagerThread.getThreadPoolQueueSize());
for (String workerZKPath : workerZkPaths) {
// remove before persist
@ -125,37 +135,11 @@ public class WorkerRegistryClient implements AutoCloseable {
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
// delete dead server
registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval,
TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
}
/**
* remove registry info
*/
public void unRegistry() throws IOException {
try {
String address = getLocalAddress();
Set<String> workerZkPaths = getWorkerZkPaths();
for (String workerZkPath : workerZkPaths) {
registryClient.remove(workerZkPath);
logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath);
}
} catch (Exception ex) {
logger.error("remove worker zk path exception", ex);
}
if (heartBeatExecutor != null) {
heartBeatExecutor.shutdownNow();
logger.info("Heartbeat executor shutdown");
}
registryClient.close();
logger.info("registry client closed");
}
/**
* get worker path
*/
@ -177,11 +161,6 @@ public class WorkerRegistryClient implements AutoCloseable {
return workerPaths;
}
public void handleDeadServer() {
Set<String> workerZkPaths = getWorkerZkPaths();
registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
}
/**
* get local address
*/
@ -195,7 +174,12 @@ public class WorkerRegistryClient implements AutoCloseable {
@Override
public void close() throws IOException {
unRegistry();
if (heartBeatExecutor != null) {
heartBeatExecutor.shutdownNow();
logger.info("Heartbeat executor shutdown");
}
registryClient.close();
logger.info("registry client closed");
}
}

55
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@Service
@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true)
public class WorkerStopStrategy implements WorkerConnectStrategy {
private final Logger logger = LoggerFactory.getLogger(WorkerStopStrategy.class);
@Autowired
public RegistryClient registryClient;
@Autowired
private WorkerConfig workerConfig;
@Override
public void disconnect() {
registryClient.getStoppable()
.stop("Worker disconnected from registry, will stop myself due to the stop strategy");
}
@Override
public void reconnect() {
logger.warn("The current connect strategy is stop, so the worker will not reconnect to registry");
}
@Override
public StrategyType getStrategyType() {
return StrategyType.STOP;
}
}

135
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import java.time.Duration;
@Service
@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = "strategy", havingValue = "waiting")
public class WorkerWaitingStrategy implements WorkerConnectStrategy {
private final Logger logger = LoggerFactory.getLogger(WorkerWaitingStrategy.class);
@Autowired
private WorkerConfig workerConfig;
@Autowired
private RegistryClient registryClient;
@Autowired
private WorkerRpcServer workerRpcServer;
@Autowired
private WorkerRpcClient workerRpcClient;
@Autowired
private MessageRetryRunner messageRetryRunner;
@Autowired
private WorkerManagerThread workerManagerThread;
@Override
public void disconnect() {
try {
ServerLifeCycleManager.toWaiting();
clearWorkerResource();
Duration maxWaitingTime = workerConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
try {
logger.info("Worker disconnect from registry will try to reconnect in {} s",
maxWaitingTime.getSeconds());
registryClient.connectUntilTimeout(maxWaitingTime);
} catch (RegistryException ex) {
throw new ServerLifeCycleException(
String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
}
} catch (ServerLifeCycleException e) {
String errorMessage = String.format(
"Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
ServerLifeCycleManager.getServerStatus());
logger.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
} catch (RegistryException ex) {
String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server";
logger.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
} catch (Exception ex) {
String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server";
logger.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
}
}
@Override
public void reconnect() {
try {
ServerLifeCycleManager.recoverFromWaiting();
reStartWorkerResource();
logger.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
String errorMessage =
String.format("Recover from waiting failed, the current server status is %s, will stop the server",
ServerLifeCycleManager.getServerStatus());
logger.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
}
}
@Override
public StrategyType getStrategyType() {
return StrategyType.WAITING;
}
private void clearWorkerResource() {
// close the worker resource, if close failed should stop the worker server
workerRpcServer.close();
logger.warn("Worker server close the RPC server due to lost connection from registry");
workerRpcClient.close();
logger.warn("Worker server close the RPC client due to lost connection from registry");
workerManagerThread.clearTask();
logger.warn("Worker server clear the tasks due to lost connection from registry");
messageRetryRunner.clearMessage();
logger.warn("Worker server clear the retry message due to lost connection from registry");
}
private void reStartWorkerResource() {
// reopen the resource, if reopen failed should stop the worker server
workerRpcServer.start();
logger.warn("Worker server restart PRC server due to reconnect to registry");
workerRpcClient.start();
logger.warn("Worker server restart PRC client due to reconnect to registry");
}
}

10
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@ -33,6 +34,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class WorkerExecService {
/**
* logger of WorkerExecService
*/
@ -50,7 +52,8 @@ public class WorkerExecService {
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
public WorkerExecService(ExecutorService execService,
ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
this.execService = execService;
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
this.taskExecuteThreadMap = taskExecuteThreadMap;
@ -61,6 +64,7 @@ public class WorkerExecService {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
@ -87,4 +91,8 @@ public class WorkerExecService {
return ((ThreadPoolExecutor) this.execService).getQueue().size();
}
public Map<Integer, TaskExecuteThread> getTaskExecuteThreadMap() {
return taskExecuteThreadMap;
}
}

29
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -18,22 +18,21 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
/**
* Manage tasks
*/
@ -73,8 +72,7 @@ public class WorkerManagerThread implements Runnable {
this.waitSubmitQueue = new DelayQueue<>();
workerExecService = new WorkerExecService(
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
taskExecuteThreadMap
);
taskExecuteThreadMap);
}
public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
@ -105,11 +103,11 @@ public class WorkerManagerThread implements Runnable {
*/
public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
waitSubmitQueue.stream()
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext()
.getTaskInstanceId() == taskInstanceId)
.forEach(waitSubmitQueue::remove);
}
/**
* submit task
*
@ -140,8 +138,11 @@ public class WorkerManagerThread implements Runnable {
public void run() {
Thread.currentThread().setName("Worker-Execute-Manager-Thread");
TaskExecuteThread taskExecuteThread;
while (Stopper.isRunning()) {
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
if (this.getThreadPoolQueueSize() <= workerExecThreads) {
taskExecuteThread = waitSubmitQueue.take();
workerExecService.submit(taskExecuteThread);
@ -157,4 +158,10 @@ public class WorkerManagerThread implements Runnable {
}
}
}
public void clearTask() {
waitSubmitQueue.clear();
workerExecService.getTaskExecuteThreadMap().values().forEach(TaskExecuteThread::kill);
workerExecService.getTaskExecuteThreadMap().clear();
}
}

7
dolphinscheduler-worker/src/main/resources/application.yaml

@ -60,8 +60,6 @@ worker:
exec-threads: 100
# worker heartbeat interval
heartbeat-interval: 10s
# Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.
@ -78,6 +76,11 @@ worker:
# alert server listen host
alert-listen-host: localhost
alert-listen-port: 50052
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: waiting
# The max waiting time to reconnect to registry if you set the strategy to waiting
max-waiting-time: 100s
server:
port: 1235

17
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

@ -69,7 +69,10 @@ public class WorkerRegistryClientTest {
@Mock
private WorkerManagerThread workerManagerThread;
//private static final Set<String> workerGroups;
@Mock
private WorkerConnectStrategy workerConnectStrategy;
// private static final Set<String> workerGroups;
static {
// workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
@ -78,14 +81,15 @@ public class WorkerRegistryClientTest {
@Before
public void before() {
given(workerConfig.getGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
//given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
//scheduleAtFixedRate
given(heartBeatExecutor.scheduleAtFixedRate(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(TimeUnit.class))).willReturn(null);
// given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
// scheduleAtFixedRate
given(heartBeatExecutor.scheduleAtFixedRate(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(),
Mockito.any(TimeUnit.class))).willReturn(null);
}
@Test
public void testRegistry() {
public void testStart() {
workerRegistryClient.initWorkRegistry();
given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1);
@ -94,9 +98,8 @@ public class WorkerRegistryClientTest {
given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
workerRegistryClient.registry();
workerRegistryClient.start();
Mockito.verify(registryClient, Mockito.times(1)).handleDeadServer(Mockito.anyCollection(), Mockito.any(NodeType.class), Mockito.anyString());
}
@Test

Loading…
Cancel
Save