Browse Source

Support alert server HA (#13865)

* Support alert server cluster

* Remove WORKER_ALERT_LISTEN_HOST in statefulset-dolphinscheduler-worker.yaml
3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
41a8ba9aab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      deploy/docker/docker-compose.yml
  2. 2
      deploy/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
  3. 4
      deploy/terraform/aws/dolphinscheduler-worker.tf
  4. 5
      dolphinscheduler-alert/dolphinscheduler-alert-server/pom.xml
  5. 59
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  6. 68
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java
  7. 2
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
  8. 58
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java
  9. 74
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java
  10. 57
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java
  11. 11
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRequestProcessor.java
  12. 60
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java
  13. 36
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java
  14. 15
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml
  15. 51
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertPluginManagerTest.java
  16. 24
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertBootstrapServiceTest.java
  17. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java
  18. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
  19. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  20. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
  21. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
  22. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  23. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
  24. 23
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java
  25. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java
  26. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  27. 11
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  28. 13
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
  29. 18
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  30. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
  31. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  32. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
  33. 8
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  34. 16
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  35. 67
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
  36. 38
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
  37. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
  38. 4
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
  39. 15
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  40. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  41. 7
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
  42. 21
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  43. 7
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
  44. 10
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
  45. 7
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
  46. 6
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
  47. 22
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
  48. 7
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
  49. 3
      dolphinscheduler-worker/src/main/resources/application.yaml
  50. 5
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
  51. 9
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java

3
deploy/docker/docker-compose.yml

@ -98,6 +98,9 @@ services:
interval: 30s
timeout: 5s
retries: 3
depends_on:
dolphinscheduler-zookeeper:
condition: service_healthy
volumes:
- dolphinscheduler-logs:/opt/dolphinscheduler/logs
networks:

2
deploy/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml

@ -65,8 +65,6 @@ spec:
value: {{ .Values.timezone }}
- name: SPRING_JACKSON_TIME_ZONE
value: {{ .Values.timezone }}
- name: WORKER_ALERT_LISTEN_HOST
value: {{ include "dolphinscheduler.fullname" . }}-alert
{{- include "dolphinscheduler.database.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }}
{{ range $key, $value := .Values.worker.env }}

4
deploy/terraform/aws/dolphinscheduler-worker.tf

@ -15,9 +15,6 @@
# specific language governing permissions and limitations
# under the License.
locals {
alert_server_ip = var.ds_component_replicas.alert > 0 ? aws_instance.alert[0].private_ip : aws_instance.standalone_server[0].private_ip
}
resource "aws_security_group" "worker" {
name = "worker_server_sg"
@ -60,7 +57,6 @@ data "template_file" "worker_user_data" {
"database_username" = aws_db_instance.database.username
"database_password" = aws_db_instance.database.password
"zookeeper_connect_string" = var.zookeeper_connect_string != "" ? var.zookeeper_connect_string : aws_instance.zookeeper[0].private_ip
"alert_server_host" = local.alert_server_ip
"s3_access_key_id" = aws_iam_access_key.s3.id
"s3_secret_access_key" = aws_iam_access_key.s3.secret
"s3_region" = var.aws_region

5
dolphinscheduler-alert/dolphinscheduler-alert-server/pom.xml

@ -46,6 +46,11 @@
<artifactId>dolphinscheduler-dao</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>

59
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -17,16 +17,13 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.factory.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import java.io.Closeable;
import java.util.List;
import javax.annotation.PreDestroy;
@ -42,18 +39,16 @@ import org.springframework.context.event.EventListener;
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@Slf4j
public class AlertServer implements Closeable {
public class AlertServer {
@Autowired
private PluginDao pluginDao;
private AlertBootstrapService alertBootstrapService;
@Autowired
private AlertSenderService alertSenderService;
private AlertRpcServer alertRpcServer;
@Autowired
private List<NettyRequestProcessor> nettyRequestProcessors;
private AlertPluginManager alertPluginManager;
@Autowired
private AlertConfig alertConfig;
private NettyRemotingServer nettyRemotingServer;
private AlertRegistryClient alertRegistryClient;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
@ -63,14 +58,13 @@ public class AlertServer implements Closeable {
@EventListener
public void run(ApplicationReadyEvent readyEvent) {
log.info("Alert server is staring ...");
checkTable();
startServer();
alertSenderService.start();
alertPluginManager.start();
alertRegistryClient.start();
alertBootstrapService.start();
alertRpcServer.start();
log.info("Alert server is started ...");
}
@Override
@PreDestroy
public void close() {
destroy("alert server destroy");
@ -90,33 +84,18 @@ public class AlertServer implements Closeable {
log.warn("AlterServer is already stopped");
return;
}
log.info("Alert server is stopping, cause: {}", cause);
try (
AlertRpcServer closedAlertRpcServer = alertRpcServer;
AlertBootstrapService closedAlertBootstrapService = alertBootstrapService;
AlertRegistryClient closedAlertRegistryClient = alertRegistryClient) {
// close resource
}
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.nettyRemotingServer.close();
log.info("Alter server stopped, cause: {}", cause);
} catch (Exception e) {
log.error("Alert server stop failed, cause: {}", cause, e);
}
}
protected void checkTable() {
if (!pluginDao.checkPluginDefineTableExist()) {
log.error("Plugin Define Table t_ds_plugin_define Not Exist . Please Create it First !");
System.exit(1);
}
}
protected void startServer() {
nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(alertConfig.getPort());
for (NettyRequestProcessor nettyRequestProcessor : nettyRequestProcessors) {
nettyRemotingServer.registerProcessor(nettyRequestProcessor);
log.info("Success register netty processor: {}", nettyRequestProcessor.getClass().getName());
}
nettyRemotingServer.start();
}
}

68
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.config;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.time.Duration;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
/**
 * Configuration of the alert server, bound from properties under the {@code alert} prefix.
 *
 * <p>The class also acts as its own {@link Validator}: {@link #validate(Object, Errors)} checks the
 * heartbeat interval and derives {@code alertServerAddress} from the configured port.
 */
@Slf4j
@Data
@Component
@ConfigurationProperties("alert")
public final class AlertConfig implements Validator {

    /** Port handed to {@code NetUtils.getAddr} to build the alert server address. */
    private int port;

    private int waitTimeout;

    /** Interval between two heartbeat reports; defaults to 60 seconds when not configured. */
    private Duration heartbeatInterval = Duration.ofSeconds(60);

    /** Derived in {@link #validate(Object, Errors)}; not expected to be configured directly. */
    private String alertServerAddress;

    @Override
    public boolean supports(Class<?> clazz) {
        return AlertConfig.class.isAssignableFrom(clazz);
    }

    /**
     * Validates the given {@link AlertConfig} and fills in its derived address.
     *
     * <p>All reads and writes go through {@code target} rather than {@code this}, so the instance
     * that is checked, populated and logged is always the same one.
     *
     * @param target the {@link AlertConfig} instance to validate
     * @param errors collector for validation failures
     */
    @Override
    public void validate(Object target, Errors errors) {
        AlertConfig alertConfig = (AlertConfig) target;
        Duration interval = alertConfig.heartbeatInterval;
        // A missing interval is reported as a validation error instead of surfacing as an NPE.
        if (interval == null || interval.getSeconds() <= 0) {
            errors.rejectValue("heartbeat-interval", null, "should be a valid duration");
        }
        alertConfig.setAlertServerAddress(NetUtils.getAddr(alertConfig.port));
        alertConfig.printConfig();
    }

    /** Logs the effective configuration values of this instance. */
    private void printConfig() {
        log.info("Alert config: port -> {}", port);
        log.info("Alert config: alertServerAddress -> {}", alertServerAddress);
        log.info("Alert config: heartbeatInterval -> {}", heartbeatInterval);
    }
}

2
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServerMetrics.java → dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert;
package org.apache.dolphinscheduler.alert.metrics;
import java.util.function.Supplier;

58
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java → dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert;
package org.apache.dolphinscheduler.alert.plugin;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertChannelFactory;
@ -39,8 +39,6 @@ import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@ -53,24 +51,32 @@ public final class AlertPluginManager {
this.pluginDao = pluginDao;
}
private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
private final Map<Integer, AlertChannel> alertPluginMap = new HashMap<>();
private final PluginParams warningTypeParams = getWarningTypeParams();
public void start() {
log.info("AlertPluginManager start ...");
checkAlertPluginExist();
installAlertPlugin();
log.info("AlertPluginManager started ...");
}
public PluginParams getWarningTypeParams() {
return RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
.addParamsOptions(
new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
.addParamsOptions(
new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
.setValue(WarningType.ALL.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
public Optional<AlertChannel> getAlertChannel(int id) {
return Optional.ofNullable(alertPluginMap.get(id));
}
public int size() {
return alertPluginMap.size();
}
private void checkAlertPluginExist() {
if (!pluginDao.checkPluginDefineTableExist()) {
log.error("Plugin Define Table t_ds_plugin_define Not Exist . Please Create it First !");
System.exit(1);
}
}
@EventListener
public void installPlugin(ApplicationReadyEvent readyEvent) {
private void installAlertPlugin() {
final PluginParams warningTypeParams = getWarningTypeParams();
PrioritySPIFactory<AlertChannelFactory> prioritySPIFactory =
new PrioritySPIFactory<>(AlertChannelFactory.class);
@ -92,15 +98,19 @@ public final class AlertPluginManager {
final PluginDefine pluginDefine = new PluginDefine(name, PluginType.ALERT.getDesc(), paramsJson);
final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine);
channelKeyedById.put(id, alertChannel);
alertPluginMap.put(id, alertChannel);
}
}
public Optional<AlertChannel> getAlertChannel(int id) {
return Optional.ofNullable(channelKeyedById.get(id));
}
public int size() {
return channelKeyedById.size();
private PluginParams getWarningTypeParams() {
return RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
.addParamsOptions(
new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
.addParamsOptions(
new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
.setValue(WarningType.ALL.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
}
}

74
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.registry;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Heartbeat task of the alert server.
 *
 * <p>Builds an {@link AlertServerHeartBeat} snapshot of this process and writes it as JSON to the
 * registry path {@code <ALERT_SERVER registry path>/<alert server address>}. The interval handed
 * to the base task is {@code alertConfig.getHeartbeatInterval()}.
 */
@Slf4j
@Component
public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat> {

    private final AlertConfig alertConfig;

    private final Integer processId;

    private final RegistryClient registryClient;

    /** Registry path this server's heartbeat is written to. */
    private final String heartBeatPath;

    /** Wall-clock time (epoch millis) at which this task instance was created. */
    private final long startupTime;

    /**
     * @param alertConfig    alert server configuration; supplies the heartbeat interval and the
     *                       server address used in the registry path
     * @param registryClient client used to write the heartbeat into the registry
     */
    public AlertHeartbeatTask(AlertConfig alertConfig,
                              RegistryClient registryClient) {
        super("AlertHeartbeatTask", alertConfig.getHeartbeatInterval().toMillis());
        this.startupTime = System.currentTimeMillis();
        this.alertConfig = alertConfig;
        this.registryClient = registryClient;
        this.heartBeatPath =
                RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress();
        this.processId = OSUtils.getProcessID();
    }

    /**
     * Collects the current heartbeat data of this alert server.
     *
     * @return a heartbeat carrying process id, startup/report time, resource usage figures read
     *         from {@code OSUtils}, and the alert server address
     */
    @Override
    public AlertServerHeartBeat getHeartBeat() {
        return AlertServerHeartBeat.builder()
                .processId(processId)
                .startupTime(startupTime)
                .reportTime(System.currentTimeMillis())
                .cpuUsage(OSUtils.cpuUsage())
                .memoryUsage(OSUtils.memoryUsage())
                .loadAverage(OSUtils.loadAverage())
                .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
                .alertServerAddress(alertConfig.getAlertServerAddress())
                .build();
    }

    /**
     * Serializes the heartbeat to JSON and writes it to this server's registry path.
     *
     * @param heartBeat the heartbeat to publish
     */
    @Override
    public void writeHeartBeat(AlertServerHeartBeat heartBeat) {
        String heartBeatJson = JSONUtils.toJsonString(heartBeat);
        registryClient.persistEphemeral(heartBeatPath, heartBeatJson);
        // The message names the alert server explicitly so it cannot be mistaken for a master heartbeat.
        log.debug("Success write alert server heartBeatInfo into registry, alertRegistryPath: {}, heartBeatInfo: {}",
                heartBeatPath, heartBeatJson);
    }
}

57
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.registry;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Registers the alert server in the registry.
 *
 * <p>{@link #start()} requests the alert lock and then starts an {@link AlertHeartbeatTask};
 * {@link #close()} stops the heartbeat and releases the lock again.
 */
@Slf4j
@Service
public class AlertRegistryClient implements AutoCloseable {

    @Autowired
    private RegistryClient registryClient;

    @Autowired
    private AlertConfig alertConfig;

    /** Created in {@link #start()}; stays {@code null} until then. */
    private AlertHeartbeatTask alertHeartbeatTask;

    /** Set once the {@code getLock} call in {@link #start()} has returned without throwing. */
    private boolean lockObtained;

    /**
     * Requests the alert lock from the registry and starts the heartbeat task.
     */
    public void start() {
        log.info("AlertRegistryClient starting...");
        registryClient.getLock(RegistryNodeType.ALERT_LOCK.getRegistryPath());
        // NOTE(review): the result of getLock is not inspected here, same as before — confirm
        // whether the registry client can return without actually holding the lock.
        lockObtained = true;
        // start heartbeat task
        alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, registryClient);
        alertHeartbeatTask.start();
        log.info("AlertRegistryClient started...");
    }

    /**
     * Stops the heartbeat task (if one was started) and releases the alert lock (if it was requested).
     *
     * <p>Safe to call when {@link #start()} never ran or failed part-way: the heartbeat task is
     * only shut down when it exists, and the lock release runs even if that shutdown throws.
     */
    @Override
    public void close() {
        log.info("AlertRegistryClient closing...");
        try {
            if (alertHeartbeatTask != null) {
                alertHeartbeatTask.shutdown();
            }
        } finally {
            if (lockObtained) {
                registryClient.releaseLock(RegistryNodeType.ALERT_LOCK.getRegistryPath());
                lockObtained = false;
            }
        }
        log.info("AlertRegistryClient closed...");
    }
}

11
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java → dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRequestProcessor.java

@ -15,8 +15,9 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert;
package org.apache.dolphinscheduler.alert.rpc;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.MessageType;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequest;
@ -34,10 +35,10 @@ import io.netty.channel.Channel;
@Slf4j
public final class AlertRequestProcessor implements NettyRequestProcessor {
private final AlertSenderService alertSenderService;
private final AlertBootstrapService alertBootstrapService;
public AlertRequestProcessor(AlertSenderService alertSenderService) {
this.alertSenderService = alertSenderService;
public AlertRequestProcessor(AlertBootstrapService alertBootstrapService) {
this.alertBootstrapService = alertBootstrapService;
}
@Override
@ -46,7 +47,7 @@ public final class AlertRequestProcessor implements NettyRequestProcessor {
log.info("Received command : {}", alertSendRequest);
AlertSendResponse alertSendResponse = alertSenderService.syncHandler(
AlertSendResponse alertSendResponse = alertBootstrapService.syncHandler(
alertSendRequest.getGroupId(),
alertSendRequest.getTitle(),
alertSendRequest.getContent(),

60
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.rpc;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.factory.NettyRemotingServerFactory;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * RPC endpoint of the alert server.
 *
 * <p>{@link #start()} builds a {@link NettyRemotingServer} on the configured alert port, registers
 * every injected {@link NettyRequestProcessor} with it, and starts it. {@link #close()} closes
 * that server again.
 */
@Slf4j
@Service
public class AlertRpcServer implements AutoCloseable {

    /** All request processors available in the application context. */
    @Autowired
    private List<NettyRequestProcessor> nettyRequestProcessors;

    @Autowired
    private AlertConfig alertConfig;

    /** Created in {@link #start()}; stays {@code null} until then. */
    private NettyRemotingServer nettyRemotingServer;

    /**
     * Builds the netty server for {@code alertConfig.getPort()}, registers the processors and
     * starts it.
     */
    public void start() {
        log.info("Starting alert rpc server...");
        nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(alertConfig.getPort());
        for (NettyRequestProcessor nettyRequestProcessor : nettyRequestProcessors) {
            nettyRemotingServer.registerProcessor(nettyRequestProcessor);
            log.info("Success register netty processor: {}", nettyRequestProcessor.getClass().getName());
        }
        nettyRemotingServer.start();
        log.info("Started alert rpc server...");
    }

    /**
     * Closes the netty server if it was created.
     *
     * <p>Does nothing beyond logging when {@link #start()} has not run, so a shutdown after a
     * failed or skipped startup does not fail with a {@link NullPointerException}.
     *
     * @throws Exception declared for {@link AutoCloseable} compatibility
     */
    @Override
    public void close() throws Exception {
        log.info("Closing alert rpc server...");
        if (nettyRemotingServer != null) {
            nettyRemotingServer.close();
        }
        log.info("Closed alert rpc server...");
    }
}

36
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java → dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java

@ -15,18 +15,22 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert;
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
@ -50,28 +54,24 @@ import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;
@Service
@Slf4j
public final class AlertSenderService extends Thread {
public final class AlertBootstrapService extends BaseDaemonThread implements AutoCloseable {
private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;
private final AlertConfig alertConfig;
@Autowired
private AlertDao alertDao;
@Autowired
private AlertPluginManager alertPluginManager;
@Autowired
private AlertConfig alertConfig;
public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager, AlertConfig alertConfig) {
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
this.alertConfig = alertConfig;
}
@Override
public synchronized void start() {
super.setName("AlertSenderService");
super.start();
public AlertBootstrapService() {
super("AlertBootstrapService");
}
@Override
@ -301,4 +301,10 @@ public final class AlertSenderService extends Thread {
return new AlertResult("false", e.getMessage());
}
}
@Override
public void close() {
log.info("Closed AlertBootstrapService...");
}
}

15
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml

@ -64,6 +64,21 @@ alert:
# Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0
heartbeat-interval: 60s
registry:
type: zookeeper
zookeeper:
namespace: dolphinscheduler
connect-string: localhost:2181
retry-policy:
base-sleep-time: 60ms
max-sleep: 300ms
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 600ms
digest: ~
metrics:
enabled: true

51
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertPluginManagerTest.java

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert;
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
/**
 * Unit test for {@link AlertPluginManager}, run with the Mockito JUnit 5 extension and a mocked
 * {@link PluginDao}.
 */
@ExtendWith(MockitoExtension.class)
public class AlertPluginManagerTest {
// Mocked DAO; injected into the manager under test through @InjectMocks.
@Mock
private PluginDao pluginDao;
// Instance under test, created by Mockito with the mocks above.
@InjectMocks
private AlertPluginManager alertPluginManager;
/**
 * Stubs {@code pluginDao.addOrUpdatePluginDefine} to return 0, calls {@code installPlugin(null)},
 * then expects {@code size()} to be 1 and {@code getAlertChannel(0)} to return a non-null value.
 */
@Test
public void testAlertPluginManager() {
// The stubbed return value 0 is the same id that is looked up via getAlertChannel(0) below.
Mockito.when(pluginDao.addOrUpdatePluginDefine(any())).thenReturn(0);
alertPluginManager.installPlugin(null);
Assertions.assertEquals(1, alertPluginManager.size());
Assertions.assertNotNull(alertPluginManager.getAlertChannel(0));
}
}

24
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java → dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertBootstrapServiceTest.java

@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.alert.runner;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.alert.AlertConfig;
import org.apache.dolphinscheduler.alert.AlertPluginManager;
import org.apache.dolphinscheduler.alert.AlertSenderService;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
@ -47,9 +47,9 @@ import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlertSenderServiceTest {
public class AlertBootstrapServiceTest {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderServiceTest.class);
private static final Logger logger = LoggerFactory.getLogger(AlertBootstrapServiceTest.class);
@Mock
private AlertDao alertDao;
@ -61,7 +61,7 @@ public class AlertSenderServiceTest {
private AlertConfig alertConfig;
@InjectMocks
private AlertSenderService alertSenderService;
private AlertBootstrapService alertBootstrapService;
@BeforeEach
public void before() {
@ -80,7 +80,7 @@ public class AlertSenderServiceTest {
when(alertConfig.getWaitTimeout()).thenReturn(0);
AlertSendResponse alertSendResponse =
alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assertions.assertFalse(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
@ -101,7 +101,7 @@ public class AlertSenderServiceTest {
when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
alertSendResponse =
alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assertions.assertFalse(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
@ -113,7 +113,7 @@ public class AlertSenderServiceTest {
when(alertConfig.getWaitTimeout()).thenReturn(0);
alertSendResponse =
alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assertions.assertFalse(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
@ -126,7 +126,7 @@ public class AlertSenderServiceTest {
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
alertSendResponse =
alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assertions.assertFalse(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
@ -140,7 +140,7 @@ public class AlertSenderServiceTest {
when(alertConfig.getWaitTimeout()).thenReturn(5000);
alertSendResponse =
alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assertions.assertTrue(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
@ -184,6 +184,6 @@ public class AlertSenderServiceTest {
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
Assertions.assertTrue(Boolean.parseBoolean(alertResult.getStatus()));
when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(new ArrayList<>());
alertSenderService.send(alertList);
alertBootstrapService.send(alertList);
}
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java

@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.api.service.MetricsCleanUpService;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.remote.command.workflow.WorkflowMetricsCleanUpRequest;
import org.apache.dolphinscheduler.remote.utils.Host;
@ -46,7 +46,7 @@ public class MetricsCleanUpServiceImpl implements MetricsCleanUpService {
public void cleanUpWorkflowMetricsByDefinitionCode(String workflowDefinitionCode) {
WorkflowMetricsCleanUpRequest workflowMetricsCleanUpRequest = new WorkflowMetricsCleanUpRequest();
workflowMetricsCleanUpRequest.setProcessDefinitionCode(workflowDefinitionCode);
List<Server> masterNodeList = registryClient.getServerList(NodeType.MASTER);
List<Server> masterNodeList = registryClient.getServerList(RegistryNodeType.MASTER);
for (Server server : masterNodeList) {
try {
final String host = String.format("%s:%s", server.getHost(), server.getPort());

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java

@ -20,13 +20,13 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import java.util.HashMap;
import java.util.List;
@ -130,8 +130,8 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
@Override
public List<Server> getServerListFromRegistry(boolean isMaster) {
return isMaster
? registryClient.getServerList(NodeType.MASTER)
: registryClient.getServerList(NodeType.WORKER);
? registryClient.getServerList(RegistryNodeType.MASTER)
: registryClient.getServerList(RegistryNodeType.WORKER);
}
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -39,6 +38,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
@ -187,7 +187,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
}
Map<String, String> serverMaps = registryClient.getServerMaps(NodeType.WORKER);
Map<String, String> serverMaps = registryClient.getServerMaps(RegistryNodeType.WORKER);
for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) {
if (!serverMaps.containsKey(addr)) {
return addr;
@ -296,7 +296,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
.anyMatch(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName()));
if (!containDefaultWorkerGroups) {
// No default WorkerGroup exists, so add all active worker nodes to the default worker group.
Set<String> activeWorkerNodes = registryClient.getServerNodeSet(NodeType.WORKER);
Set<String> activeWorkerNodes = registryClient.getServerNodeSet(RegistryNodeType.WORKER);
WorkerGroup defaultWorkerGroup = new WorkerGroup();
defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP);
defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes));
@ -363,7 +363,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Override
public Map<String, Object> getWorkerAddressList() {
Map<String, Object> result = new HashMap<>();
Set<String> serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER);
Set<String> serverNodeList = registryClient.getServerNodeSet(RegistryNodeType.WORKER);
result.put(Constants.DATA_LIST, serverNodeList);
putMsg(result, Status.SUCCESS);
return result;

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java

@ -24,12 +24,12 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import java.util.HashMap;
import java.util.Map;
@ -66,7 +66,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("192.168.0.1", "192.168.0.1");
serverMaps.put("192.168.0.2", "192.168.0.2");
Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name", "cxc_work_group");

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java

@ -24,13 +24,13 @@ import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.MonitorServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.CollectionUtils;
@ -103,7 +103,7 @@ public class MonitorServiceTest {
@Test
public void testQueryMaster() {
mockPermissionCheck(ApiFuncIdentificationConstant.MONITOR_MASTER_VIEW, true);
Mockito.when(registryClient.getServerList(NodeType.MASTER)).thenReturn(getServerList());
Mockito.when(registryClient.getServerList(RegistryNodeType.MASTER)).thenReturn(getServerList());
Map<String, Object> result = monitorService.queryMaster(user);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
@ -115,7 +115,7 @@ public class MonitorServiceTest {
@Test
public void testQueryWorker() {
mockPermissionCheck(ApiFuncIdentificationConstant.MONITOR_WORKER_VIEW, true);
Mockito.when(registryClient.getServerList(NodeType.WORKER)).thenReturn(getServerList());
Mockito.when(registryClient.getServerList(RegistryNodeType.WORKER)).thenReturn(getServerList());
Map<String, Object> result = monitorService.queryWorker(user);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -38,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapp
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
@ -155,7 +155,7 @@ public class WorkerGroupServiceTest {
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost1:0000", "");
Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
@ -175,7 +175,7 @@ public class WorkerGroupServiceTest {
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost:0000", "");
Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
Mockito.when(workerGroupMapper.insert(any())).thenReturn(1);
Map<String, Object> result =
@ -197,7 +197,7 @@ public class WorkerGroupServiceTest {
Set<String> activeWorkerNodes = new HashSet<>();
activeWorkerNodes.add("localhost:12345");
activeWorkerNodes.add("localhost:23456");
Mockito.when(registryClient.getServerNodeSet(NodeType.WORKER)).thenReturn(activeWorkerNodes);
Mockito.when(registryClient.getServerNodeSet(RegistryNodeType.WORKER)).thenReturn(activeWorkerNodes);
Result result = workerGroupService.queryAllGroupPaging(loginUser, 1, 1, null);
Assertions.assertEquals(result.getCode(), Status.SUCCESS.getCode());

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@ -35,14 +35,8 @@ public final class Constants {
*/
public static final String COMMON_PROPERTIES_PATH = "/common.properties";
/**
* registry properties
*/
public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master";
public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker";
public static final String REGISTRY_DOLPHINSCHEDULER_NODE = "/nodes";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";

23
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java → dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java

@ -15,8 +15,25 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
package org.apache.dolphinscheduler.common.model;
public enum NodeType {
MASTER, WORKER, DEAD_SERVER
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Heartbeat payload of an alert server; implements {@link HeartBeat}.
 * <p>
 * Accessors, the builder and the no-args / all-args constructors are generated by the Lombok
 * annotations on this class.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AlertServerHeartBeat implements HeartBeat {
private int processId;
// NOTE(review): time fields are presumably epoch milliseconds — confirm against the task that fills them.
private long startupTime;
private long reportTime;
// NOTE(review): units/ranges of the resource metrics below are not visible here — confirm with the producer.
private double cpuUsage;
private double memoryUsage;
private double loadAverage;
private double availablePhysicalMemorySize;
private String alertServerAddress;
}

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java

@ -39,9 +39,9 @@ public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread {
@Override
public synchronized void start() {
log.info("Starting {}", threadName);
log.info("Starting {}...", threadName);
super.start();
log.info("Started {}, heartBeatInterval: {}", threadName, heartBeatInterval);
log.info("Started {}, heartBeatInterval: {}...", threadName, heartBeatInterval);
}
@Override
@ -68,8 +68,8 @@ public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread {
}
public void shutdown() {
log.warn("{} task finished", threadName);
runningFlag = false;
log.warn("{} finished...", threadName);
}
private void handleInterruptException(InterruptedException ex) {

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -17,10 +17,9 @@
package org.apache.dolphinscheduler.server.master.config;
import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@ -145,7 +144,8 @@ public class MasterConfig implements Validator {
}
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
masterConfig.setMasterRegistryPath(
RegistryNodeType.MASTER.getRegistryPath() + "/" + masterConfig.getMasterAddress());
printConfig();
}

11
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -17,16 +17,15 @@
package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
@ -67,7 +66,7 @@ public class MasterRegistryClient implements AutoCloseable {
registry();
registryClient.addConnectionStateListener(
new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
registryClient.subscribe(RegistryNodeType.ALL_SERVERS.getRegistryPath(), new MasterRegistryDataListener());
} catch (Exception e) {
throw new RegistryException("Master registry client start up error", e);
}
@ -90,7 +89,7 @@ public class MasterRegistryClient implements AutoCloseable {
* @param nodeType node type
* @param failover is failover
*/
public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) {
public void removeMasterNodePath(String path, RegistryNodeType nodeType, boolean failover) {
log.info("{} node deleted : {}", nodeType, path);
if (StringUtils.isEmpty(path)) {
@ -124,7 +123,7 @@ public class MasterRegistryClient implements AutoCloseable {
* @param nodeType node type
* @param failover is failover
*/
public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
public void removeWorkerNodePath(String path, RegistryNodeType nodeType, boolean failover) {
log.info("{} node deleted : {}", nodeType, path);
try {
String serverHost = null;
@ -158,7 +157,7 @@ public class MasterRegistryClient implements AutoCloseable {
registryClient.remove(masterRegistryPath);
registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
while (!registryClient.checkNodeExists(NetUtils.getHost(), RegistryNodeType.MASTER)) {
log.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}

13
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java

@ -17,13 +17,10 @@
package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import lombok.extern.slf4j.Slf4j;
@ -46,9 +43,9 @@ public class MasterRegistryDataListener implements SubscribeListener {
return;
}
// monitor master
if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
if (path.startsWith(RegistryNodeType.MASTER.getRegistryPath() + Constants.SINGLE_SLASH)) {
handleMasterEvent(event);
} else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
} else if (path.startsWith(RegistryNodeType.WORKER.getRegistryPath() + Constants.SINGLE_SLASH)) {
// monitor worker
handleWorkerEvent(event);
}
@ -61,7 +58,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
log.info("master node added : {}", path);
break;
case REMOVE:
masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
masterRegistryClient.removeMasterNodePath(path, RegistryNodeType.MASTER, true);
break;
default:
@ -77,7 +74,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
break;
case REMOVE:
log.info("worker node deleted : {}", path);
masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
masterRegistryClient.removeWorkerNodePath(path, RegistryNodeType.WORKER, true);
break;
default:
break;

18
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -17,11 +17,7 @@
package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -32,6 +28,7 @@ import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Event.Type;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
@ -140,11 +137,8 @@ public class ServerNodeManager implements InitializingBean {
masterConfig.getWorkerGroupRefreshInterval().getSeconds(),
TimeUnit.SECONDS);
// init MasterNodeListener listener
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
// init WorkerNodeListener listener
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), new MasterDataListener());
registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), new WorkerDataListener());
}
class WorkerNodeInfoAndGroupDbSyncTask implements Runnable {
@ -238,11 +232,11 @@ public class ServerNodeManager implements InitializingBean {
currentSlot = 0;
totalSlot = 0;
this.masterNodes.clear();
String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
String nodeLock = RegistryNodeType.MASTER_NODE_LOCK.getRegistryPath();
try {
registryClient.getLock(nodeLock);
Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
List<Server> masterNodeList = registryClient.getServerList(NodeType.MASTER);
List<Server> masterNodeList = registryClient.getServerList(RegistryNodeType.MASTER);
syncMasterNodes(currentNodes, masterNodeList);
} catch (Exception e) {
log.error("update master nodes error", e);
@ -255,7 +249,7 @@ public class ServerNodeManager implements InitializingBean {
private void updateWorkerNodes() {
workerGroupWriteLock.lock();
try {
Map<String, String> workerNodeMaps = registryClient.getServerMaps(NodeType.WORKER);
Map<String, String> workerNodeMaps = registryClient.getServerMaps(RegistryNodeType.WORKER);
for (Map.Entry<String, String> entry : workerNodeMaps.entrySet()) {
workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
}

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java

@ -17,16 +17,13 @@
package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* failover service
*/
@Component
@Slf4j
public class FailoverService {
@ -46,7 +43,7 @@ public class FailoverService {
* @param serverHost server host
* @param nodeType node type
*/
public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
public void failoverServerWhenDown(String serverHost, RegistryNodeType nodeType) {
switch (nodeType) {
case MASTER:
log.info("Master failover starting, masterServer: {}", serverHost);

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -18,11 +18,11 @@
package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@ -76,7 +76,8 @@ public class MasterFailoverService {
List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
.stream()
// failover myself || dead server
.filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
.filter(host -> localAddress.equals(host)
|| !registryClient.checkNodeExists(host, RegistryNodeType.MASTER))
.distinct()
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
@ -90,7 +91,7 @@ public class MasterFailoverService {
}
public void failoverMaster(String masterHost) {
String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost;
// Build the failover lock path from the enum's registry path, as the other RegistryNodeType
// usages do; concatenating the enum constant itself would use its string form, not the path.
String failoverPath = RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + masterHost;
try {
registryClient.getLock(failoverPath);
doFailoverMaster(masterHost);
@ -111,7 +112,8 @@ public class MasterFailoverService {
private void doFailoverMaster(@NonNull String masterHost) {
StopWatch failoverTimeCost = StopWatch.createStarted();
Optional<Date> masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
Optional<Date> masterStartupTimeOptional =
getServerStartupTime(registryClient.getServerList(RegistryNodeType.MASTER),
masterHost);
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
masterHost);

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -29,6 +28,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -105,7 +105,7 @@ public class WorkerFailoverService {
// we query the task instance from cache, so that we can directly update the cache
final Optional<Date> needFailoverWorkerStartTime =
getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);
getServerStartupTime(registryClient.getServerList(RegistryNodeType.WORKER), workerHost);
final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {

8
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.mockito.BDDMockito.given;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -94,9 +94,9 @@ public class MasterRegistryClientTest {
@Test
public void removeNodePathTest() {
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, true);
masterRegistryClient.removeMasterNodePath("/path", RegistryNodeType.MASTER, false);
masterRegistryClient.removeMasterNodePath("/path", RegistryNodeType.MASTER, true);
// Cannot mock static methods
masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, true);
masterRegistryClient.removeWorkerNodePath("/path", RegistryNodeType.WORKER, true);
}
}

16
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -24,7 +24,6 @@ import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
@ -166,8 +166,10 @@ public class FailoverServiceTest {
workerServer.setPort(workerPort);
workerServer.setCreateTime(new Date());
given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer)));
given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer)));
given(registryClient.getServerList(RegistryNodeType.WORKER))
.willReturn(new ArrayList<>(Arrays.asList(workerServer)));
given(registryClient.getServerList(RegistryNodeType.MASTER))
.willReturn(new ArrayList<>(Arrays.asList(masterServer)));
doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
}
@ -176,17 +178,17 @@ public class FailoverServiceTest {
public void failoverMasterTest() {
processInstance.setHost(Constants.NULL);
masterTaskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER);
failoverService.failoverServerWhenDown(testMasterHost, RegistryNodeType.MASTER);
Assertions.assertNotEquals(masterTaskInstance.getState(), TaskExecutionStatus.NEED_FAULT_TOLERANCE);
processInstance.setHost(testMasterHost);
masterTaskInstance.setState(TaskExecutionStatus.SUCCESS);
failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER);
failoverService.failoverServerWhenDown(testMasterHost, RegistryNodeType.MASTER);
Assertions.assertNotEquals(masterTaskInstance.getState(), TaskExecutionStatus.NEED_FAULT_TOLERANCE);
processInstance.setHost(testMasterHost);
masterTaskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);
failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER);
failoverService.failoverServerWhenDown(testMasterHost, RegistryNodeType.MASTER);
Assertions.assertEquals(masterTaskInstance.getState(), TaskExecutionStatus.RUNNING_EXECUTION);
}
@ -200,7 +202,7 @@ public class FailoverServiceTest {
Mockito.when(cacheManager.getAll()).thenReturn(Lists.newArrayList(workflowExecuteRunnable));
Mockito.when(cacheManager.getByProcessInstanceId(Mockito.anyInt())).thenReturn(workflowExecuteRunnable);
failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER);
failoverService.failoverServerWhenDown(testWorkerHost, RegistryNodeType.WORKER);
Assertions.assertEquals(TaskExecutionStatus.NEED_FAULT_TOLERANCE, workerTaskInstance.getState());
}
}

67
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java

@ -21,11 +21,12 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.commons.lang3.StringUtils;
@ -71,22 +72,8 @@ public class RegistryClient {
registry.connectUntilTimeout(duration);
}
public int getActiveMasterNum() {
Collection<String> childrenList = new ArrayList<>();
try {
// read master node parent path from conf
if (exists(rootNodePath(NodeType.MASTER))) {
childrenList = getChildrenKeys(rootNodePath(NodeType.MASTER));
}
} catch (Exception e) {
log.error("getActiveMasterNum error", e);
}
return childrenList.size();
}
public List<Server> getServerList(NodeType nodeType) {
Map<String, String> serverMaps = getServerMaps(nodeType);
String parentPath = rootNodePath(nodeType);
public List<Server> getServerList(RegistryNodeType registryNodeType) {
Map<String, String> serverMaps = getServerMaps(registryNodeType);
List<Server> serverList = new ArrayList<>();
for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
@ -97,7 +84,7 @@ public class RegistryClient {
continue;
}
Server server = new Server();
switch (nodeType) {
switch (registryNodeType) {
case MASTER:
MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(heartBeatJson, MasterHeartBeat.class);
server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
@ -110,11 +97,17 @@ public class RegistryClient {
server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime()));
server.setId(workerHeartBeat.getProcessId());
break;
case ALERT_SERVER:
AlertServerHeartBeat alertServerHeartBeat =
JSONUtils.parseObject(heartBeatJson, AlertServerHeartBeat.class);
server.setCreateTime(new Date(alertServerHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(alertServerHeartBeat.getReportTime()));
server.setId(alertServerHeartBeat.getProcessId());
}
server.setResInfo(heartBeatJson);
// todo: add host, port in heartBeat Info, so that we don't need to parse this again
server.setZkDirectory(parentPath + "/" + serverPath);
server.setZkDirectory(registryNodeType.getRegistryPath() + "/" + serverPath);
// set host and port
String[] hostAndPort = serverPath.split(Constants.COLON);
// fetch the last one
@ -128,13 +121,12 @@ public class RegistryClient {
/**
* Return server host:port -> value
*/
public Map<String, String> getServerMaps(NodeType nodeType) {
public Map<String, String> getServerMaps(RegistryNodeType nodeType) {
Map<String, String> serverMap = new HashMap<>();
try {
String path = rootNodePath(nodeType);
Collection<String> serverList = getServerNodes(nodeType);
for (String server : serverList) {
serverMap.putIfAbsent(server, get(path + Constants.SINGLE_SLASH + server));
serverMap.putIfAbsent(server, get(nodeType.getRegistryPath() + Constants.SINGLE_SLASH + server));
}
} catch (Exception e) {
log.error("get server list failed", e);
@ -143,14 +135,14 @@ public class RegistryClient {
return serverMap;
}
public boolean checkNodeExists(String host, NodeType nodeType) {
public boolean checkNodeExists(String host, RegistryNodeType nodeType) {
return getServerMaps(nodeType).keySet()
.stream()
.anyMatch(it -> it.contains(host));
}
public Collection<String> getMasterNodesDirectly() {
return getChildrenKeys(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS);
return getChildrenKeys(RegistryNodeType.MASTER.getRegistryPath());
}
/**
@ -214,18 +206,18 @@ public class RegistryClient {
}
public boolean isMasterPath(String path) {
return path != null && path.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS);
return path != null && path.startsWith(RegistryNodeType.MASTER.getRegistryPath());
}
public boolean isWorkerPath(String path) {
return path != null && path.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS);
return path != null && path.startsWith(RegistryNodeType.WORKER.getRegistryPath());
}
/**
 * Lists the child keys stored under the given registry key.
 *
 * @param key the registry key whose children are requested
 * @return whatever the underlying registry's {@code children} call returns for {@code key}
 */
public Collection<String> getChildrenKeys(final String key) {
    final Collection<String> childKeys = registry.children(key);
    return childKeys;
}
public Set<String> getServerNodeSet(NodeType nodeType) {
public Set<String> getServerNodeSet(RegistryNodeType nodeType) {
try {
return new HashSet<>(getServerNodes(nodeType));
} catch (Exception e) {
@ -234,24 +226,13 @@ public class RegistryClient {
}
private void initNodes() {
registry.put(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false);
registry.put(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false);
}
private String rootNodePath(NodeType type) {
switch (type) {
case MASTER:
return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
case WORKER:
return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
default:
throw new IllegalStateException("Should not reach here");
}
registry.put(RegistryNodeType.MASTER.getRegistryPath(), EMPTY, false);
registry.put(RegistryNodeType.WORKER.getRegistryPath(), EMPTY, false);
registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false);
}
private Collection<String> getServerNodes(NodeType nodeType) {
final String path = rootNodePath(nodeType);
return getChildrenKeys(path);
private Collection<String> getServerNodes(RegistryNodeType nodeType) {
return getChildrenKeys(nodeType.getRegistryPath());
}
}

38
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java → dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java

@ -15,32 +15,26 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert;
package org.apache.dolphinscheduler.registry.api.enums;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Component
@ConfigurationProperties("alert")
public final class AlertConfig {
@Getter
@AllArgsConstructor
public enum RegistryNodeType {
private int port;
ALL_SERVERS("nodes", "/nodes"),
MASTER("Master", "/nodes/master"),
MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
WORKER("Worker", "/nodes/worker"),
ALERT_SERVER("AlertServer", "/nodes/alert-server"),
ALERT_LOCK("AlertNodeLock", "/lock/alert"),
;
private int waitTimeout;
private final String name;
public int getPort() {
return port;
}
public void setPort(final int port) {
this.port = port;
}
private final String registryPath;
public int getWaitTimeout() {
return waitTimeout;
}
public void setWaitTimeout(final int waitTimeout) {
this.waitTimeout = waitTimeout;
}
}

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java vendored

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.service.cache.impl;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
@ -114,7 +114,7 @@ public class CacheNotifyServiceImpl implements CacheNotifyService {
public void notifyMaster(Message message) {
log.info("send result, command:{}", message.toString());
try {
List<Server> serverList = registryClient.getServerList(NodeType.MASTER);
List<Server> serverList = registryClient.getServerList(RegistryNodeType.MASTER);
if (CollectionUtils.isEmpty(serverList)) {
return;
}

4
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java vendored

@ -18,10 +18,10 @@
package org.apache.dolphinscheduler.service.cache;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Message;
import org.apache.dolphinscheduler.remote.command.MessageType;
@ -84,7 +84,7 @@ public class CacheNotifyServiceTest {
server.setPort(serverConfig.getListenPort());
serverList.add(server);
Mockito.when(registryClient.getServerList(NodeType.MASTER)).thenReturn(serverList);
Mockito.when(registryClient.getServerList(RegistryNodeType.MASTER)).thenReturn(serverList);
cacheNotifyService.notifyMaster(cacheExpireMessage);

15
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -141,9 +141,9 @@ master:
task-commit-interval: 1s
state-wheel-interval: 5s
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: 50
max-cpu-load-avg: 80
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.03
reserved-memory: 0.1
# failover interval
failover-interval: 10m
# kill yarn/k8s application when failover taskInstance, default true
@ -164,15 +164,9 @@ worker:
#Scenes to be used for distributed users.For example,users created by FreeIpa are stored in LDAP.This parameter only applies to Linux, When this parameter is true, worker.tenant.auto.create has no effect and will not automatically create tenants.
tenant-distributed-user: false
# worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1
max-cpu-load-avg: 80
# worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
reserved-memory: 0.3
# default worker groups separated by comma, like 'worker.groups=default,test'
groups:
- default
# alert server listen host
alert-listen-host: localhost
alert-listen-port: 50052
reserved-memory: 0.1
task-execute-threads-full-policy: REJECT
alert:
@ -180,6 +174,7 @@ alert:
# Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0
heartbeat-interval: 60s
python-gateway:
# Whether to enable the python gateway server or not. The default value is true.

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -48,8 +48,6 @@ public class WorkerConfig implements Validator {
private boolean tenantDistributedUser = false;
private int maxCpuLoadAvg = -1;
private double reservedMemory = 0.3;
private String alertListenHost = "localhost";
private int alertListenPort = 50052;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
/**
@ -92,8 +90,6 @@ public class WorkerConfig implements Validator {
log.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser);
log.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg);
log.info("Worker config: reservedMemory -> {}", reservedMemory);
log.info("Worker config: alertListenHost -> {}", alertListenHost);
log.info("Worker config: alertListenPort -> {}", alertListenPort);
log.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
log.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
log.info("Worker config: workerRegistryPath: {}", workerRegistryPath);

7
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.task.TaskDispatchMessage;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable;
@ -71,6 +72,9 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
@Autowired(required = false)
private StorageOperate storageOperate;
@Autowired
private WorkerRegistryClient workerRegistryClient;
@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
@ -115,7 +119,8 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
workerMessageSender,
workerRpcClient,
taskPluginManager,
storageOperate)
storageOperate,
workerRegistryClient)
.createWorkerTaskExecuteRunnable();
// submit task to manager
boolean offer = workerManager.offer(workerTaskExecuteRunnable);

21
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -21,23 +21,30 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@Slf4j
@ -54,6 +61,7 @@ public class WorkerRegistryClient implements AutoCloseable {
private RegistryClient registryClient;
@Autowired
@Lazy
private WorkerConnectStrategy workerConnectStrategy;
private WorkerHeartBeatTask workerHeartBeatTask;
@ -87,7 +95,7 @@ public class WorkerRegistryClient implements AutoCloseable {
registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), RegistryNodeType.WORKER)) {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
@ -98,6 +106,15 @@ public class WorkerRegistryClient implements AutoCloseable {
log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
/**
 * Looks up an alert server address from the registry.
 *
 * @return the host and port of the first server the registry client lists under
 *         {@link RegistryNodeType#ALERT_SERVER}, or an empty {@link Optional} when
 *         no alert server is listed
 */
public Optional<Host> getAlertServerAddress() {
    final List<Server> alertServers = registryClient.getServerList(RegistryNodeType.ALERT_SERVER);
    if (!CollectionUtils.isEmpty(alertServers)) {
        // No selection policy is applied: the first listed entry is used as-is.
        final Server firstAlertServer = alertServers.get(0);
        final Host alertServerAddress = new Host(firstAlertServer.getHost(), firstAlertServer.getPort());
        return Optional.of(alertServerAddress);
    }
    return Optional.empty();
}
public void setRegistryStoppable(IStoppable stoppable) {
registryClient.setStoppable(stoppable);
}

7
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@ -37,13 +38,15 @@ public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecut
@NonNull WorkerMessageSender workerMessageSender,
@NonNull WorkerRpcClient workerRpcClient,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
super(taskExecutionContext,
workerConfig,
workerMessageSender,
workerRpcClient,
taskPluginManager,
storageOperate);
storageOperate,
workerRegistryClient);
}
@Override

10
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@ -37,13 +38,15 @@ public class DefaultWorkerDelayTaskExecuteRunnableFactory
@NonNull WorkerMessageSender workerMessageSender,
@NonNull WorkerRpcClient workerRpcClient,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
super(taskExecutionContext,
workerConfig,
workerMessageSender,
workerRpcClient,
taskPluginManager,
storageOperate);
storageOperate,
workerRegistryClient);
}
@Override
@ -54,6 +57,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableFactory
workerMessageSender,
workerRpcClient,
taskPluginManager,
storageOperate);
storageOperate,
workerRegistryClient);
}
}

7
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@ -39,13 +40,15 @@ public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRu
@NonNull WorkerMessageSender workerMessageSender,
@NonNull WorkerRpcClient workerRpcClient,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
super(taskExecutionContext,
workerConfig,
workerMessageSender,
workerRpcClient,
taskPluginManager,
storageOperate);
storageOperate,
workerRegistryClient);
}
@Override

6
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@ -38,6 +39,7 @@ public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDela
protected final @NonNull WorkerRpcClient workerRpcClient;
protected final @NonNull TaskPluginManager taskPluginManager;
protected final @Nullable StorageOperate storageOperate;
protected final @NonNull WorkerRegistryClient workerRegistryClient;
protected WorkerDelayTaskExecuteRunnableFactory(
@NonNull TaskExecutionContext taskExecutionContext,
@ -45,13 +47,15 @@ public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDela
@NonNull WorkerMessageSender workerMessageSender,
@NonNull WorkerRpcClient workerRpcClient,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.workerMessageSender = workerMessageSender;
this.workerRpcClient = workerRpcClient;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
this.workerRegistryClient = workerRegistryClient;
}
public abstract T createWorkerTaskExecuteRunnable();

22
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.log.TaskInstanceLogHeader;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
@ -52,6 +53,7 @@ import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Optional;
import javax.annotation.Nullable;
@ -72,6 +74,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
protected final TaskPluginManager taskPluginManager;
protected final @Nullable StorageOperate storageOperate;
protected final WorkerRpcClient workerRpcClient;
protected final WorkerRegistryClient workerRegistryClient;
protected @Nullable AbstractTask task;
@ -81,13 +84,15 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
@NonNull WorkerMessageSender workerMessageSender,
@NonNull WorkerRpcClient workerRpcClient,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.workerMessageSender = workerMessageSender;
this.workerRpcClient = workerRpcClient;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
this.workerRegistryClient = workerRegistryClient;
}
protected abstract void executeTask(TaskCallBack taskCallBack);
@ -227,6 +232,15 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
if (!task.getNeedAlert()) {
return;
}
// todo: We need to send the alert to the master rather than directly send to the alert server
Optional<Host> alertServerAddressOptional = workerRegistryClient.getAlertServerAddress();
if (!alertServerAddressOptional.isPresent()) {
log.error("Cannot get alert server address, please check the alert server is running");
return;
}
Host alertServerAddress = alertServerAddressOptional.get();
log.info("The current task need to send alert, begin to send alert");
TaskExecutionStatus status = task.getExitStatus();
TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo();
@ -238,10 +252,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
taskAlertInfo.getContent(),
strategy);
try {
workerRpcClient.send(Host.of(workerConfig.getAlertListenHost()), alertCommand.convert2Command());
log.info("Success send alert");
workerRpcClient.send(alertServerAddress, alertCommand.convert2Command());
log.info("Success send alert to : {}", alertServerAddress);
} catch (RemotingException e) {
log.error("Send alert failed, alertCommand: {}", alertCommand, e);
log.error("Send alert to: {} failed, alertCommand: {}", alertServerAddress, alertCommand, e);
}
}

7
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@ -37,13 +38,15 @@ public class WorkerTaskExecuteRunnableFactoryBuilder {
@NonNull WorkerMessageSender workerMessageSender,
@NonNull WorkerRpcClient workerRpcClient,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
return new DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext,
workerConfig,
workerMessageSender,
workerRpcClient,
taskPluginManager,
storageOperate);
storageOperate,
workerRegistryClient);
}
}

3
dolphinscheduler-worker/src/main/resources/application.yaml

@ -52,9 +52,6 @@ worker:
max-cpu-load-avg: -1
# worker reserved memory: tasks can be dispatched to the worker server only while the system's available memory is higher than this value. Default value is 0.3, the unit is G
reserved-memory: 0.3
# alert server listen host
alert-listen-host: localhost
alert-listen-port: 50052
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: waiting

5
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.mockito.BDDMockito.given;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@ -83,7 +83,8 @@ public class WorkerRegistryClientTest {
given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(RegistryNodeType.class)))
.willReturn(true);
workerRegistryClient.initWorkRegistry();
workerRegistryClient.start();

9
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@ -46,6 +47,8 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
private StorageOperate storageOperate = Mockito.mock(StorageOperate.class);
private WorkerRegistryClient workerRegistryClient = Mockito.mock(WorkerRegistryClient.class);
@Test
public void testDryRun() {
TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
@ -60,7 +63,8 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate);
storageOperate,
workerRegistryClient);
Assertions.assertAll(workerTaskExecuteRunnable::run);
Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
@ -84,7 +88,8 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate);
storageOperate,
workerRegistryClient);
Assertions.assertAll(workerTaskExecuteRunnable::run);
Assertions.assertEquals(TaskExecutionStatus.FAILURE, taskExecutionContext.getCurrentExecutionStatus());

Loading…
Cancel
Save