diff --git a/deploy/docker/docker-compose.yml b/deploy/docker/docker-compose.yml index 046e03070b..6a5f0adcc2 100644 --- a/deploy/docker/docker-compose.yml +++ b/deploy/docker/docker-compose.yml @@ -98,6 +98,9 @@ services: interval: 30s timeout: 5s retries: 3 + depends_on: + dolphinscheduler-zookeeper: + condition: service_healthy volumes: - dolphinscheduler-logs:/opt/dolphinscheduler/logs networks: diff --git a/deploy/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml b/deploy/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml index eb06883a45..bae447c2cd 100644 --- a/deploy/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml +++ b/deploy/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml @@ -65,8 +65,6 @@ spec: value: {{ .Values.timezone }} - name: SPRING_JACKSON_TIME_ZONE value: {{ .Values.timezone }} - - name: WORKER_ALERT_LISTEN_HOST - value: {{ include "dolphinscheduler.fullname" . }}-alert {{- include "dolphinscheduler.database.env_vars" . | nindent 12 }} {{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }} {{ range $key, $value := .Values.worker.env }} diff --git a/deploy/terraform/aws/dolphinscheduler-worker.tf b/deploy/terraform/aws/dolphinscheduler-worker.tf index 563764eb1d..97f402b5fb 100644 --- a/deploy/terraform/aws/dolphinscheduler-worker.tf +++ b/deploy/terraform/aws/dolphinscheduler-worker.tf @@ -15,9 +15,6 @@ # specific language governing permissions and limitations # under the License. -locals { - alert_server_ip = var.ds_component_replicas.alert > 0 ? aws_instance.alert[0].private_ip : aws_instance.standalone_server[0].private_ip -} resource "aws_security_group" "worker" { name = "worker_server_sg" @@ -60,7 +57,6 @@ data "template_file" "worker_user_data" { "database_username" = aws_db_instance.database.username "database_password" = aws_db_instance.database.password "zookeeper_connect_string" = var.zookeeper_connect_string != "" ? var.zookeeper_connect_string : aws_instance.zookeeper[0].private_ip - "alert_server_host" = local.alert_server_ip "s3_access_key_id" = aws_iam_access_key.s3.id "s3_secret_access_key" = aws_iam_access_key.s3.secret "s3_region" = var.aws_region diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/pom.xml b/dolphinscheduler-alert/dolphinscheduler-alert-server/pom.xml index fcdf991d9b..e4cf95bac6 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/pom.xml +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/pom.xml @@ -46,6 +46,11 @@ dolphinscheduler-dao + + org.apache.dolphinscheduler + dolphinscheduler-registry-all + + com.google.guava guava diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index dd1e53bc17..f95e71a10c 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -17,16 +17,13 @@ package org.apache.dolphinscheduler.alert; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient; +import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer; +import org.apache.dolphinscheduler.alert.service.AlertBootstrapService; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.dao.PluginDao; -import org.apache.dolphinscheduler.remote.NettyRemotingServer; -import org.apache.dolphinscheduler.remote.factory.NettyRemotingServerFactory; -import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; - -import java.io.Closeable; -import java.util.List; import javax.annotation.PreDestroy; @@ -42,18 +39,16 @@ import org.springframework.context.event.EventListener; @SpringBootApplication @ComponentScan("org.apache.dolphinscheduler") @Slf4j -public class AlertServer implements Closeable { +public class AlertServer { @Autowired - private PluginDao pluginDao; - + private AlertBootstrapService alertBootstrapService; @Autowired - private AlertSenderService alertSenderService; + private AlertRpcServer alertRpcServer; @Autowired - private List nettyRequestProcessors; + private AlertPluginManager alertPluginManager; @Autowired - private AlertConfig alertConfig; - private NettyRemotingServer nettyRemotingServer; + private AlertRegistryClient alertRegistryClient; public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER); @@ -63,14 +58,13 @@ public class AlertServer implements Closeable { @EventListener public void run(ApplicationReadyEvent readyEvent) { log.info("Alert server is staring ..."); - - checkTable(); - startServer(); - alertSenderService.start(); + alertPluginManager.start(); + alertRegistryClient.start(); + alertBootstrapService.start(); + alertRpcServer.start(); log.info("Alert server is started ..."); } - @Override @PreDestroy public void close() { destroy("alert server destroy"); @@ -90,33 +84,18 @@ public class AlertServer implements Closeable { log.warn("AlterServer is already stopped"); return; } - log.info("Alert server is stopping, cause: {}", cause); - + try ( + AlertRpcServer closedAlertRpcServer = alertRpcServer; + AlertBootstrapService closedAlertBootstrapService = alertBootstrapService; + AlertRegistryClient closedAlertRegistryClient = alertRegistryClient) { + // close resource + } // thread sleep 3 seconds for thread quietly stop ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); - - // close - this.nettyRemotingServer.close(); log.info("Alter server stopped, cause: {}", cause); } catch (Exception e) { log.error("Alert server stop failed, cause: {}", cause, e); } } - - protected void checkTable() { - if (!pluginDao.checkPluginDefineTableExist()) { - log.error("Plugin Define Table t_ds_plugin_define Not Exist . Please Create it First !"); - System.exit(1); - } - } - - protected void startServer() { - nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(alertConfig.getPort()); - for (NettyRequestProcessor nettyRequestProcessor : nettyRequestProcessors) { - nettyRemotingServer.registerProcessor(nettyRequestProcessor); - log.info("Success register netty processor: {}", nettyRequestProcessor.getClass().getName()); - } - nettyRemotingServer.start(); - } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java new file mode 100644 index 0000000000..0b1f5ee6e8 --- /dev/null +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/config/AlertConfig.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.config; + +import org.apache.dolphinscheduler.common.utils.NetUtils; + +import java.time.Duration; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import org.springframework.validation.Errors; +import org.springframework.validation.Validator; + +@Slf4j +@Data +@Component +@ConfigurationProperties("alert") +public final class AlertConfig implements Validator { + + private int port; + + private int waitTimeout; + + private Duration heartbeatInterval = Duration.ofSeconds(60); + + private String alertServerAddress; + + @Override + public boolean supports(Class clazz) { + return AlertConfig.class.isAssignableFrom(clazz); + } + + @Override + public void validate(Object target, Errors errors) { + AlertConfig alertConfig = (AlertConfig) target; + + if (heartbeatInterval.getSeconds() <= 0) { + errors.rejectValue("heartbeat-interval", null, "should be a valid duration"); + } + + alertConfig.setAlertServerAddress(NetUtils.getAddr(port)); + printConfig(); + } + + private void printConfig() { + log.info("Alert config: port -> {}", port); + log.info("Alert config: alertServerAddress -> {}", alertServerAddress); + log.info("Alert config: heartbeatInterval -> {}", heartbeatInterval); + } +} diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServerMetrics.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java similarity index 97% rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServerMetrics.java rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java index 7a8b89a52d..4784aa3b62 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServerMetrics.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.alert; +package org.apache.dolphinscheduler.alert.metrics; import java.util.function.Supplier; diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java similarity index 81% rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java index 047df6e3b0..81ebbe0ac4 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.alert; +package org.apache.dolphinscheduler.alert.plugin; import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertChannelFactory; @@ -39,8 +39,6 @@ import java.util.Optional; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @Component @@ -53,24 +51,32 @@ public final class AlertPluginManager { this.pluginDao = pluginDao; } - private final Map channelKeyedById = new HashMap<>(); + private final Map alertPluginMap = new HashMap<>(); - private final PluginParams warningTypeParams = getWarningTypeParams(); + public void start() { + log.info("AlertPluginManager start ..."); + checkAlertPluginExist(); + installAlertPlugin(); + log.info("AlertPluginManager started ..."); + } - public PluginParams getWarningTypeParams() { - return RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE) - .addParamsOptions( - new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false)) - .addParamsOptions( - new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false)) - .addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false)) - .setValue(WarningType.ALL.getDescp()) - .addValidate(Validate.newBuilder().setRequired(true).build()) - .build(); + public Optional getAlertChannel(int id) { + return Optional.ofNullable(alertPluginMap.get(id)); + } + + public int size() { + return alertPluginMap.size(); + } + + private void checkAlertPluginExist() { + if (!pluginDao.checkPluginDefineTableExist()) { + log.error("Plugin Define Table t_ds_plugin_define Not Exist . Please Create it First !"); + System.exit(1); + } } - @EventListener - public void installPlugin(ApplicationReadyEvent readyEvent) { + private void installAlertPlugin() { + final PluginParams warningTypeParams = getWarningTypeParams(); PrioritySPIFactory prioritySPIFactory = new PrioritySPIFactory<>(AlertChannelFactory.class); @@ -92,15 +98,19 @@ public final class AlertPluginManager { final PluginDefine pluginDefine = new PluginDefine(name, PluginType.ALERT.getDesc(), paramsJson); final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine); - channelKeyedById.put(id, alertChannel); + alertPluginMap.put(id, alertChannel); } } - public Optional getAlertChannel(int id) { - return Optional.ofNullable(channelKeyedById.get(id)); - } - - public int size() { - return channelKeyedById.size(); + private PluginParams getWarningTypeParams() { + return RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE) + .addParamsOptions( + new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false)) + .addParamsOptions( + new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false)) + .addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false)) + .setValue(WarningType.ALL.getDescp()) + .addValidate(Validate.newBuilder().setRequired(true).build()) + .build(); } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java new file mode 100644 index 0000000000..a5a0056d84 --- /dev/null +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.registry; + +import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat; +import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class AlertHeartbeatTask extends BaseHeartBeatTask { + + private final AlertConfig alertConfig; + private final Integer processId; + private final RegistryClient registryClient; + private final String heartBeatPath; + private final long startupTime; + + public AlertHeartbeatTask(AlertConfig alertConfig, + RegistryClient registryClient) { + super("AlertHeartbeatTask", alertConfig.getHeartbeatInterval().toMillis()); + this.startupTime = System.currentTimeMillis(); + this.alertConfig = alertConfig; + this.registryClient = registryClient; + this.heartBeatPath = + RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress(); + this.processId = OSUtils.getProcessID(); + } + + @Override + public AlertServerHeartBeat getHeartBeat() { + return AlertServerHeartBeat.builder() + .processId(processId) + .startupTime(startupTime) + .reportTime(System.currentTimeMillis()) + .cpuUsage(OSUtils.cpuUsage()) + .memoryUsage(OSUtils.memoryUsage()) + .loadAverage(OSUtils.loadAverage()) + .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) + .alertServerAddress(alertConfig.getAlertServerAddress()) + .build(); + } + + @Override + public void writeHeartBeat(AlertServerHeartBeat heartBeat) { + String heartBeatJson = JSONUtils.toJsonString(heartBeat); + registryClient.persistEphemeral(heartBeatPath, heartBeatJson); + log.debug("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}", + heartBeatPath, heartBeatJson); + } +} diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java new file mode 100644 index 0000000000..a93742e54b --- /dev/null +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertRegistryClient.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.registry; + +import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class AlertRegistryClient implements AutoCloseable { + + @Autowired + private RegistryClient registryClient; + + @Autowired + private AlertConfig alertConfig; + + private AlertHeartbeatTask alertHeartbeatTask; + + public void start() { + log.info("AlertRegistryClient starting..."); + registryClient.getLock(RegistryNodeType.ALERT_LOCK.getRegistryPath()); + alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, registryClient); + alertHeartbeatTask.start(); + // start heartbeat task + log.info("AlertRegistryClient started..."); + } + + @Override + public void close() { + log.info("AlertRegistryClient closing..."); + alertHeartbeatTask.shutdown(); + registryClient.releaseLock(RegistryNodeType.ALERT_LOCK.getRegistryPath()); + log.info("AlertRegistryClient closed..."); + } +} diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRequestProcessor.java similarity index 83% rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRequestProcessor.java index 9a9caa9f82..f0141a4108 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRequestProcessor.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.alert; +package org.apache.dolphinscheduler.alert.rpc; +import org.apache.dolphinscheduler.alert.service.AlertBootstrapService; import org.apache.dolphinscheduler.remote.command.Message; import org.apache.dolphinscheduler.remote.command.MessageType; import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequest; @@ -34,10 +35,10 @@ import io.netty.channel.Channel; @Slf4j public final class AlertRequestProcessor implements NettyRequestProcessor { - private final AlertSenderService alertSenderService; + private final AlertBootstrapService alertBootstrapService; - public AlertRequestProcessor(AlertSenderService alertSenderService) { - this.alertSenderService = alertSenderService; + public AlertRequestProcessor(AlertBootstrapService alertBootstrapService) { + this.alertBootstrapService = alertBootstrapService; } @Override @@ -46,7 +47,7 @@ public final class AlertRequestProcessor implements NettyRequestProcessor { log.info("Received command : {}", alertSendRequest); - AlertSendResponse alertSendResponse = alertSenderService.syncHandler( + AlertSendResponse alertSendResponse = alertBootstrapService.syncHandler( alertSendRequest.getGroupId(), alertSendRequest.getTitle(), alertSendRequest.getContent(), diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java new file mode 100644 index 0000000000..f5bd470359 --- /dev/null +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/rpc/AlertRpcServer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.rpc; + +import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.remote.NettyRemotingServer; +import org.apache.dolphinscheduler.remote.factory.NettyRemotingServerFactory; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; + +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class AlertRpcServer implements AutoCloseable { + + @Autowired + private List nettyRequestProcessors; + @Autowired + private AlertConfig alertConfig; + + private NettyRemotingServer nettyRemotingServer; + + public void start() { + log.info("Starting alert rpc server..."); + nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(alertConfig.getPort()); + for (NettyRequestProcessor nettyRequestProcessor : nettyRequestProcessors) { + nettyRemotingServer.registerProcessor(nettyRequestProcessor); + log.info("Success register netty processor: {}", nettyRequestProcessor.getClass().getName()); + } + nettyRemotingServer.start(); + log.info("Started alert rpc server..."); + } + + @Override + public void close() throws Exception { + log.info("Closing alert rpc server..."); + nettyRemotingServer.close(); + log.info("Closed alert rpc server..."); + } +} diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java similarity index 94% rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java index 20ce32f7ff..3394f83097 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/service/AlertBootstrapService.java @@ -15,18 +15,22 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.alert; +package org.apache.dolphinscheduler.alert.service; import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertConstants; import org.apache.dolphinscheduler.alert.api.AlertData; import org.apache.dolphinscheduler.alert.api.AlertInfo; import org.apache.dolphinscheduler.alert.api.AlertResult; +import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; @@ -50,28 +54,24 @@ import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.google.common.collect.Lists; @Service @Slf4j -public final class AlertSenderService extends Thread { +public final class AlertBootstrapService extends BaseDaemonThread implements AutoCloseable { - private final AlertDao alertDao; - private final AlertPluginManager alertPluginManager; - private final AlertConfig alertConfig; + @Autowired + private AlertDao alertDao; + @Autowired + private AlertPluginManager alertPluginManager; + @Autowired + private AlertConfig alertConfig; - public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager, AlertConfig alertConfig) { - this.alertDao = alertDao; - this.alertPluginManager = alertPluginManager; - this.alertConfig = alertConfig; - } - - @Override - public synchronized void start() { - super.setName("AlertSenderService"); - super.start(); + public AlertBootstrapService() { + super("AlertBootstrapService"); } @Override @@ -301,4 +301,10 @@ public final class AlertSenderService extends Thread { return new AlertResult("false", e.getMessage()); } } + + @Override + public void close() { + log.info("Closed AlertBootstrapService..."); + } + } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml index 2bf44e1b36..a82718ec23 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml @@ -64,6 +64,21 @@ alert: # Mark each alert of alert server if late after x milliseconds as failed. # Define value is (0 = infinite), and alert server would be waiting alert result. wait-timeout: 0 + heartbeat-interval: 60s + +registry: + type: zookeeper + zookeeper: + namespace: dolphinscheduler + connect-string: localhost:2181 + retry-policy: + base-sleep-time: 60ms + max-sleep: 300ms + max-retries: 5 + session-timeout: 30s + connection-timeout: 9s + block-until-connected: 600ms + digest: ~ metrics: enabled: true diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertPluginManagerTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertPluginManagerTest.java deleted file mode 100644 index aa505e1e68..0000000000 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertPluginManagerTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.alert; - -import static org.mockito.ArgumentMatchers.any; - -import org.apache.dolphinscheduler.dao.PluginDao; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -public class AlertPluginManagerTest { - - @Mock - private PluginDao pluginDao; - - @InjectMocks - private AlertPluginManager alertPluginManager; - - @Test - public void testAlertPluginManager() { - Mockito.when(pluginDao.addOrUpdatePluginDefine(any())).thenReturn(0); - - alertPluginManager.installPlugin(null); - - Assertions.assertEquals(1, alertPluginManager.size()); - - Assertions.assertNotNull(alertPluginManager.getAlertChannel(0)); - } -} diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertBootstrapServiceTest.java similarity index 89% rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertBootstrapServiceTest.java index eb4183c0e0..745dbc9865 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertBootstrapServiceTest.java @@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.alert.runner; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.dolphinscheduler.alert.AlertConfig; -import org.apache.dolphinscheduler.alert.AlertPluginManager; -import org.apache.dolphinscheduler.alert.AlertSenderService; import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertResult; +import org.apache.dolphinscheduler.alert.config.AlertConfig; +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.alert.service.AlertBootstrapService; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.PluginDao; @@ -47,9 +47,9 @@ import org.mockito.MockitoAnnotations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AlertSenderServiceTest { +public class AlertBootstrapServiceTest { - private static final Logger logger = LoggerFactory.getLogger(AlertSenderServiceTest.class); + private static final Logger logger = LoggerFactory.getLogger(AlertBootstrapServiceTest.class); @Mock private AlertDao alertDao; @@ -61,7 +61,7 @@ public class AlertSenderServiceTest { private AlertConfig alertConfig; @InjectMocks - private AlertSenderService alertSenderService; + private AlertBootstrapService alertBootstrapService; @BeforeEach public void before() { @@ -80,7 +80,7 @@ public class AlertSenderServiceTest { when(alertConfig.getWaitTimeout()).thenReturn(0); AlertSendResponse alertSendResponse = - alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assertions.assertFalse(alertSendResponse.isSuccess()); alertSendResponse.getResResults().forEach(result -> logger .info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage())); @@ -101,7 +101,7 @@ public class AlertSenderServiceTest { when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine); alertSendResponse = - alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assertions.assertFalse(alertSendResponse.isSuccess()); alertSendResponse.getResResults().forEach(result -> logger .info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage())); @@ -113,7 +113,7 @@ public class AlertSenderServiceTest { when(alertConfig.getWaitTimeout()).thenReturn(0); alertSendResponse = - alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assertions.assertFalse(alertSendResponse.isSuccess()); alertSendResponse.getResResults().forEach(result -> logger .info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage())); @@ -126,7 +126,7 @@ public class AlertSenderServiceTest { when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); alertSendResponse = - alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assertions.assertFalse(alertSendResponse.isSuccess()); alertSendResponse.getResResults().forEach(result -> logger .info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage())); @@ -140,7 +140,7 @@ public class AlertSenderServiceTest { when(alertConfig.getWaitTimeout()).thenReturn(5000); alertSendResponse = - alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertBootstrapService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assertions.assertTrue(alertSendResponse.isSuccess()); alertSendResponse.getResResults().forEach(result -> logger .info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage())); @@ -184,6 +184,6 @@ public class AlertSenderServiceTest { when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); Assertions.assertTrue(Boolean.parseBoolean(alertResult.getStatus())); when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(new ArrayList<>()); - alertSenderService.send(alertList); + alertBootstrapService.send(alertList); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java index 20ac139065..70c75aca69 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java @@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.rpc.ApiRpcClient; import org.apache.dolphinscheduler.api.service.MetricsCleanUpService; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.remote.command.workflow.WorkflowMetricsCleanUpRequest; import org.apache.dolphinscheduler.remote.utils.Host; @@ -46,7 +46,7 @@ public class MetricsCleanUpServiceImpl implements MetricsCleanUpService { public void cleanUpWorkflowMetricsByDefinitionCode(String workflowDefinitionCode) { WorkflowMetricsCleanUpRequest workflowMetricsCleanUpRequest = new WorkflowMetricsCleanUpRequest(); workflowMetricsCleanUpRequest.setProcessDefinitionCode(workflowDefinitionCode); - List masterNodeList = registryClient.getServerList(NodeType.MASTER); + List masterNodeList = registryClient.getServerList(RegistryNodeType.MASTER); for (Server server : masterNodeList) { try { final String host = String.format("%s:%s", server.getHost(), server.getPort()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java index 2a0b89c6e1..945fabe72f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java @@ -20,13 +20,13 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.MonitorService; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.WorkerServerModel; import org.apache.dolphinscheduler.dao.MonitorDBDao; import org.apache.dolphinscheduler.dao.entity.MonitorRecord; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import java.util.HashMap; import java.util.List; @@ -130,8 +130,8 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic @Override public List getServerListFromRegistry(boolean isMaster) { return isMaster - ? registryClient.getServerList(NodeType.MASTER) - : registryClient.getServerList(NodeType.WORKER); + ? registryClient.getServerList(RegistryNodeType.MASTER) + : registryClient.getServerList(RegistryNodeType.WORKER); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index afbc75a03c..34aa50bb18 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -39,6 +38,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; @@ -187,7 +187,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro if (Strings.isNullOrEmpty(workerGroup.getAddrList())) { return null; } - Map serverMaps = registryClient.getServerMaps(NodeType.WORKER); + Map serverMaps = registryClient.getServerMaps(RegistryNodeType.WORKER); for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) { if (!serverMaps.containsKey(addr)) { return addr; @@ -296,7 +296,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro .anyMatch(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName())); if (!containDefaultWorkerGroups) { // there doesn't exist a default WorkerGroup, we will add all worker to the default worker group. - Set activeWorkerNodes = registryClient.getServerNodeSet(NodeType.WORKER); + Set activeWorkerNodes = registryClient.getServerNodeSet(RegistryNodeType.WORKER); WorkerGroup defaultWorkerGroup = new WorkerGroup(); defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP); defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes)); @@ -363,7 +363,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro @Override public Map getWorkerAddressList() { Map result = new HashMap<>(); - Set serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER); + Set serverNodeList = registryClient.getServerNodeSet(RegistryNodeType.WORKER); result.put(Constants.DATA_LIST, serverNodeList); putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java index 73abb63fa0..8cda4eb81a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java @@ -24,12 +24,12 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import java.util.HashMap; import java.util.Map; @@ -66,7 +66,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest { Map serverMaps = new HashMap<>(); serverMaps.put("192.168.0.1", "192.168.0.1"); serverMaps.put("192.168.0.2", "192.168.0.2"); - Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps); + Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps); MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("name", "cxc_work_group"); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java index 0d80d288f7..3aba92d24b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java @@ -24,13 +24,13 @@ import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl; import org.apache.dolphinscheduler.api.service.impl.MonitorServiceImpl; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.MonitorDBDao; import org.apache.dolphinscheduler.dao.entity.MonitorRecord; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.commons.collections4.CollectionUtils; @@ -103,7 +103,7 @@ public class MonitorServiceTest { @Test public void testQueryMaster() { mockPermissionCheck(ApiFuncIdentificationConstant.MONITOR_MASTER_VIEW, true); - Mockito.when(registryClient.getServerList(NodeType.MASTER)).thenReturn(getServerList()); + Mockito.when(registryClient.getServerList(RegistryNodeType.MASTER)).thenReturn(getServerList()); Map result = monitorService.queryMaster(user); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); @@ -115,7 +115,7 @@ public class MonitorServiceTest { @Test public void testQueryWorker() { mockPermissionCheck(ApiFuncIdentificationConstant.MONITOR_WORKER_VIEW, true); - Mockito.when(registryClient.getServerList(NodeType.WORKER)).thenReturn(getServerList()); + Mockito.when(registryClient.getServerList(RegistryNodeType.WORKER)).thenReturn(getServerList()); Map result = monitorService.queryWorker(user); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 4393a42c07..bf2b9363b2 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -38,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapp import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; @@ -155,7 +155,7 @@ public class WorkerGroupServiceTest { Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null); Map serverMaps = new HashMap<>(); serverMaps.put("localhost1:0000", ""); - Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps); + Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps); Map result = workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); @@ -175,7 +175,7 @@ public class WorkerGroupServiceTest { Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null); Map serverMaps = new HashMap<>(); serverMaps.put("localhost:0000", ""); - Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps); + Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps); Mockito.when(workerGroupMapper.insert(any())).thenReturn(1); Map result = @@ -197,7 +197,7 @@ public class WorkerGroupServiceTest { Set activeWorkerNodes = new HashSet<>(); activeWorkerNodes.add("localhost:12345"); activeWorkerNodes.add("localhost:23456"); - Mockito.when(registryClient.getServerNodeSet(NodeType.WORKER)).thenReturn(activeWorkerNodes); + Mockito.when(registryClient.getServerNodeSet(RegistryNodeType.WORKER)).thenReturn(activeWorkerNodes); Result result = workerGroupService.queryAllGroupPaging(loginUser, 1, 1, null); Assertions.assertEquals(result.getCode(), Status.SUCCESS.getCode()); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index 9fb85c9dd0..ec28208dd3 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -35,14 +35,8 @@ public final class Constants { */ public static final String COMMON_PROPERTIES_PATH = "/common.properties"; - /** - * registry properties - */ public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master"; public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker"; - public static final String REGISTRY_DOLPHINSCHEDULER_NODE = "/nodes"; - public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters"; - public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters"; public static final String FORMAT_SS = "%s%s"; public static final String FORMAT_S_S = "%s/%s"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java similarity index 59% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java index 548b951a30..a89c903187 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java @@ -15,8 +15,25 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.common.enums; +package org.apache.dolphinscheduler.common.model; -public enum NodeType { - MASTER, WORKER, DEAD_SERVER +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AlertServerHeartBeat implements HeartBeat { + + private int processId; + private long startupTime; + private long reportTime; + private double cpuUsage; + private double memoryUsage; + private double loadAverage; + private double availablePhysicalMemorySize; + private String alertServerAddress; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java index e703ce0f17..9bdddc25ae 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java @@ -39,9 +39,9 @@ public abstract class BaseHeartBeatTask extends BaseDaemonThread { @Override public synchronized void start() { - log.info("Starting {}", threadName); + log.info("Starting {}...", threadName); super.start(); - log.info("Started {}, heartBeatInterval: {}", threadName, heartBeatInterval); + log.info("Started {}, heartBeatInterval: {}...", threadName, heartBeatInterval); } @Override @@ -68,8 +68,8 @@ public abstract class BaseHeartBeatTask extends BaseDaemonThread { } public void shutdown() { - log.warn("{} task finished", threadName); runningFlag = false; + log.warn("{} finished...", threadName); } private void handleInterruptException(InterruptedException ex) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index bbd90eb477..5cbdd4d310 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -17,10 +17,9 @@ package org.apache.dolphinscheduler.server.master.config; -import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; - import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector; import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; @@ -145,7 +144,8 @@ public class MasterConfig implements Validator { } masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort())); - masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress()); + masterConfig.setMasterRegistryPath( + RegistryNodeType.MASTER.getRegistryPath() + "/" + masterConfig.getMasterAddress()); printConfig(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 14e4fa24e9..f2aaf417ed 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -17,16 +17,15 @@ package org.apache.dolphinscheduler.server.master.registry; -import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.common.IStoppable; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryException; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.service.FailoverService; import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask; @@ -67,7 +66,7 @@ public class MasterRegistryClient implements AutoCloseable { registry(); registryClient.addConnectionStateListener( new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy)); - registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener()); + registryClient.subscribe(RegistryNodeType.ALL_SERVERS.getRegistryPath(), new MasterRegistryDataListener()); } catch (Exception e) { throw new RegistryException("Master registry client start up error", e); } @@ -86,11 +85,11 @@ public class MasterRegistryClient implements AutoCloseable { /** * remove master node path * - * @param path node path + * @param path node path * @param nodeType node type * @param failover is failover */ - public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) { + public void removeMasterNodePath(String path, RegistryNodeType nodeType, boolean failover) { log.info("{} node deleted : {}", nodeType, path); if (StringUtils.isEmpty(path)) { @@ -124,7 +123,7 @@ public class MasterRegistryClient implements AutoCloseable { * @param nodeType node type * @param failover is failover */ - public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) { + public void removeWorkerNodePath(String path, RegistryNodeType nodeType, boolean failover) { log.info("{} node deleted : {}", nodeType, path); try { String serverHost = null; @@ -158,7 +157,7 @@ public class MasterRegistryClient implements AutoCloseable { registryClient.remove(masterRegistryPath); registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat())); - while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) { + while (!registryClient.checkNodeExists(NetUtils.getHost(), RegistryNodeType.MASTER)) { log.warn("The current master server node:{} cannot find in registry", NetUtils.getHost()); ThreadUtils.sleep(SLEEP_TIME_MILLIS); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java index 513db20059..84d484f74a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java @@ -17,13 +17,10 @@ package org.apache.dolphinscheduler.server.master.registry; -import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; -import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.SubscribeListener; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import lombok.extern.slf4j.Slf4j; @@ -46,9 +43,9 @@ public class MasterRegistryDataListener implements SubscribeListener { return; } // monitor master - if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) { + if (path.startsWith(RegistryNodeType.MASTER.getRegistryPath() + Constants.SINGLE_SLASH)) { handleMasterEvent(event); - } else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) { + } else if (path.startsWith(RegistryNodeType.WORKER.getRegistryPath() + Constants.SINGLE_SLASH)) { // monitor worker handleWorkerEvent(event); } @@ -61,7 +58,7 @@ public class MasterRegistryDataListener implements SubscribeListener { log.info("master node added : {}", path); break; case REMOVE: - masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true); + masterRegistryClient.removeMasterNodePath(path, RegistryNodeType.MASTER, true); break; default: @@ -77,7 +74,7 @@ public class MasterRegistryDataListener implements SubscribeListener { break; case REMOVE: log.info("worker node deleted : {}", path); - masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true); + masterRegistryClient.removeWorkerNodePath(path, RegistryNodeType.WORKER, true); break; default: break; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index d04b7d0510..3e2ba337ea 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -17,11 +17,7 @@ package org.apache.dolphinscheduler.server.master.registry; -import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; -import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -32,6 +28,7 @@ import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.Event.Type; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.SubscribeListener; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException; @@ -140,11 +137,8 @@ public class ServerNodeManager implements InitializingBean { masterConfig.getWorkerGroupRefreshInterval().getSeconds(), TimeUnit.SECONDS); - // init MasterNodeListener listener - registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener()); - - // init WorkerNodeListener listener - registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener()); + registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), new MasterDataListener()); + registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), new WorkerDataListener()); } class WorkerNodeInfoAndGroupDbSyncTask implements Runnable { @@ -238,11 +232,11 @@ public class ServerNodeManager implements InitializingBean { currentSlot = 0; totalSlot = 0; this.masterNodes.clear(); - String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS; + String nodeLock = RegistryNodeType.MASTER_NODE_LOCK.getRegistryPath(); try { registryClient.getLock(nodeLock); Collection currentNodes = registryClient.getMasterNodesDirectly(); - List masterNodeList = registryClient.getServerList(NodeType.MASTER); + List masterNodeList = registryClient.getServerList(RegistryNodeType.MASTER); syncMasterNodes(currentNodes, masterNodeList); } catch (Exception e) { log.error("update master nodes error", e); @@ -255,7 +249,7 @@ public class ServerNodeManager implements InitializingBean { private void updateWorkerNodes() { workerGroupWriteLock.lock(); try { - Map workerNodeMaps = registryClient.getServerMaps(NodeType.WORKER); + Map workerNodeMaps = registryClient.getServerMaps(RegistryNodeType.WORKER); for (Map.Entry entry : workerNodeMaps.entrySet()) { workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java index 46188f1719..b90251c2ff 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java @@ -17,16 +17,13 @@ package org.apache.dolphinscheduler.server.master.service; -import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -/** - * failover service - */ @Component @Slf4j public class FailoverService { @@ -46,7 +43,7 @@ public class FailoverService { * @param serverHost server host * @param nodeType node type */ - public void failoverServerWhenDown(String serverHost, NodeType nodeType) { + public void failoverServerWhenDown(String serverHost, RegistryNodeType nodeType) { switch (nodeType) { case MASTER: log.info("Master failover starting, masterServer: {}", serverHost); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index a95cb9debb..45504a9a86 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -18,11 +18,11 @@ package org.apache.dolphinscheduler.server.master.service; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; @@ -76,7 +76,8 @@ public class MasterFailoverService { List needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost() .stream() // failover myself || dead server - .filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER)) + .filter(host -> localAddress.equals(host) + || !registryClient.checkNodeExists(host, RegistryNodeType.MASTER)) .distinct() .collect(Collectors.toList()); if (CollectionUtils.isEmpty(needFailoverMasterHosts)) { @@ -90,7 +91,7 @@ public class MasterFailoverService { } public void failoverMaster(String masterHost) { - String failoverPath = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + masterHost; + String failoverPath = RegistryNodeType.MASTER_FAILOVER_LOCK + "/" + masterHost; try { registryClient.getLock(failoverPath); doFailoverMaster(masterHost); @@ -111,8 +112,9 @@ public class MasterFailoverService { private void doFailoverMaster(@NonNull String masterHost) { StopWatch failoverTimeCost = StopWatch.createStarted(); - Optional masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER), - masterHost); + Optional masterStartupTimeOptional = + getServerStartupTime(registryClient.getServerList(RegistryNodeType.MASTER), + masterHost); List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances( masterHost); if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index 0c9e4d3102..9fa001a222 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.service; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -29,6 +28,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -105,7 +105,7 @@ public class WorkerFailoverService { // we query the task instance from cache, so that we can directly update the cache final Optional needFailoverWorkerStartTime = - getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost); + getServerStartupTime(registryClient.getServerList(RegistryNodeType.WORKER), workerHost); final List needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost); if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) { diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index cf894a0fcb..269650741d 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.mockito.BDDMockito.given; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -94,9 +94,9 @@ public class MasterRegistryClientTest { @Test public void removeNodePathTest() { - masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false); - masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, true); + masterRegistryClient.removeMasterNodePath("/path", RegistryNodeType.MASTER, false); + masterRegistryClient.removeMasterNodePath("/path", RegistryNodeType.MASTER, true); // Cannot mock static methods - masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, true); + masterRegistryClient.removeWorkerNodePath("/path", RegistryNodeType.WORKER, true); } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index 030221ef6e..d32224c7b1 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.doNothing; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.event.StateEvent; @@ -166,8 +166,10 @@ public class FailoverServiceTest { workerServer.setPort(workerPort); workerServer.setCreateTime(new Date()); - given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer))); - given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer))); + given(registryClient.getServerList(RegistryNodeType.WORKER)) + .willReturn(new ArrayList<>(Arrays.asList(workerServer))); + given(registryClient.getServerList(RegistryNodeType.MASTER)) + .willReturn(new ArrayList<>(Arrays.asList(masterServer))); doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class)); } @@ -176,17 +178,17 @@ public class FailoverServiceTest { public void failoverMasterTest() { processInstance.setHost(Constants.NULL); masterTaskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); - failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); + failoverService.failoverServerWhenDown(testMasterHost, RegistryNodeType.MASTER); Assertions.assertNotEquals(masterTaskInstance.getState(), TaskExecutionStatus.NEED_FAULT_TOLERANCE); processInstance.setHost(testMasterHost); masterTaskInstance.setState(TaskExecutionStatus.SUCCESS); - failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); + failoverService.failoverServerWhenDown(testMasterHost, RegistryNodeType.MASTER); Assertions.assertNotEquals(masterTaskInstance.getState(), TaskExecutionStatus.NEED_FAULT_TOLERANCE); processInstance.setHost(testMasterHost); masterTaskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); - failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER); + failoverService.failoverServerWhenDown(testMasterHost, RegistryNodeType.MASTER); Assertions.assertEquals(masterTaskInstance.getState(), TaskExecutionStatus.RUNNING_EXECUTION); } @@ -200,7 +202,7 @@ public class FailoverServiceTest { Mockito.when(cacheManager.getAll()).thenReturn(Lists.newArrayList(workflowExecuteRunnable)); Mockito.when(cacheManager.getByProcessInstanceId(Mockito.anyInt())).thenReturn(workflowExecuteRunnable); - failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER); + failoverService.failoverServerWhenDown(testWorkerHost, RegistryNodeType.WORKER); Assertions.assertEquals(TaskExecutionStatus.NEED_FAULT_TOLERANCE, workerTaskInstance.getState()); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java index 3153b4c1f0..4423a0ffd1 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java @@ -21,11 +21,12 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat; import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.commons.lang3.StringUtils; @@ -71,22 +72,8 @@ public class RegistryClient { registry.connectUntilTimeout(duration); } - public int getActiveMasterNum() { - Collection childrenList = new ArrayList<>(); - try { - // read master node parent path from conf - if (exists(rootNodePath(NodeType.MASTER))) { - childrenList = getChildrenKeys(rootNodePath(NodeType.MASTER)); - } - } catch (Exception e) { - log.error("getActiveMasterNum error", e); - } - return childrenList.size(); - } - - public List getServerList(NodeType nodeType) { - Map serverMaps = getServerMaps(nodeType); - String parentPath = rootNodePath(nodeType); + public List getServerList(RegistryNodeType registryNodeType) { + Map serverMaps = getServerMaps(registryNodeType); List serverList = new ArrayList<>(); for (Map.Entry entry : serverMaps.entrySet()) { @@ -97,7 +84,7 @@ public class RegistryClient { continue; } Server server = new Server(); - switch (nodeType) { + switch (registryNodeType) { case MASTER: MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(heartBeatJson, MasterHeartBeat.class); server.setCreateTime(new Date(masterHeartBeat.getStartupTime())); @@ -110,11 +97,17 @@ public class RegistryClient { server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime())); server.setId(workerHeartBeat.getProcessId()); break; + case ALERT_SERVER: + AlertServerHeartBeat alertServerHeartBeat = + JSONUtils.parseObject(heartBeatJson, AlertServerHeartBeat.class); + server.setCreateTime(new Date(alertServerHeartBeat.getStartupTime())); + server.setLastHeartbeatTime(new Date(alertServerHeartBeat.getReportTime())); + server.setId(alertServerHeartBeat.getProcessId()); } server.setResInfo(heartBeatJson); // todo: add host, port in heartBeat Info, so that we don't need to parse this again - server.setZkDirectory(parentPath + "/" + serverPath); + server.setZkDirectory(registryNodeType.getRegistryPath() + "/" + serverPath); // set host and port String[] hostAndPort = serverPath.split(Constants.COLON); // fetch the last one @@ -128,13 +121,12 @@ public class RegistryClient { /** * Return server host:port -> value */ - public Map getServerMaps(NodeType nodeType) { + public Map getServerMaps(RegistryNodeType nodeType) { Map serverMap = new HashMap<>(); try { - String path = rootNodePath(nodeType); Collection serverList = getServerNodes(nodeType); for (String server : serverList) { - serverMap.putIfAbsent(server, get(path + Constants.SINGLE_SLASH + server)); + serverMap.putIfAbsent(server, get(nodeType.getRegistryPath() + Constants.SINGLE_SLASH + server)); } } catch (Exception e) { log.error("get server list failed", e); @@ -143,14 +135,14 @@ public class RegistryClient { return serverMap; } - public boolean checkNodeExists(String host, NodeType nodeType) { + public boolean checkNodeExists(String host, RegistryNodeType nodeType) { return getServerMaps(nodeType).keySet() .stream() .anyMatch(it -> it.contains(host)); } public Collection getMasterNodesDirectly() { - return getChildrenKeys(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS); + return getChildrenKeys(RegistryNodeType.MASTER.getRegistryPath()); } /** @@ -214,18 +206,18 @@ public class RegistryClient { } public boolean isMasterPath(String path) { - return path != null && path.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS); + return path != null && path.startsWith(RegistryNodeType.MASTER.getRegistryPath()); } public boolean isWorkerPath(String path) { - return path != null && path.startsWith(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS); + return path != null && path.startsWith(RegistryNodeType.WORKER.getRegistryPath()); } public Collection getChildrenKeys(final String key) { return registry.children(key); } - public Set getServerNodeSet(NodeType nodeType) { + public Set getServerNodeSet(RegistryNodeType nodeType) { try { return new HashSet<>(getServerNodes(nodeType)); } catch (Exception e) { @@ -234,24 +226,13 @@ public class RegistryClient { } private void initNodes() { - registry.put(Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false); - registry.put(Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false); - } - - private String rootNodePath(NodeType type) { - switch (type) { - case MASTER: - return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; - case WORKER: - return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - default: - throw new IllegalStateException("Should not reach here"); - } + registry.put(RegistryNodeType.MASTER.getRegistryPath(), EMPTY, false); + registry.put(RegistryNodeType.WORKER.getRegistryPath(), EMPTY, false); + registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false); } - private Collection getServerNodes(NodeType nodeType) { - final String path = rootNodePath(nodeType); - return getChildrenKeys(path); + private Collection getServerNodes(RegistryNodeType nodeType) { + return getChildrenKeys(nodeType.getRegistryPath()); } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java similarity index 57% rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java rename to dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java index 534fedd358..f5c07e66f6 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java @@ -15,32 +15,26 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.alert; +package org.apache.dolphinscheduler.registry.api.enums; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; +import lombok.AllArgsConstructor; +import lombok.Getter; -@Component -@ConfigurationProperties("alert") -public final class AlertConfig { +@Getter +@AllArgsConstructor +public enum RegistryNodeType { - private int port; + ALL_SERVERS("nodes", "/nodes"), + MASTER("Master", "/nodes/master"), + MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"), + MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"), + WORKER("Worker", "/nodes/worker"), + ALERT_SERVER("AlertServer", "/nodes/alert-server"), + ALERT_LOCK("AlertNodeLock", "/lock/alert"), + ; - private int waitTimeout; + private final String name; - public int getPort() { - return port; - } + private final String registryPath; - public void setPort(final int port) { - this.port = port; - } - - public int getWaitTimeout() { - return waitTimeout; - } - - public void setWaitTimeout(final int waitTimeout) { - this.waitTimeout = waitTimeout; - } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java index 94e4054b26..f48820dc80 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.service.cache.impl; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Message; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; @@ -114,7 +114,7 @@ public class CacheNotifyServiceImpl implements CacheNotifyService { public void notifyMaster(Message message) { log.info("send result, command:{}", message.toString()); try { - List serverList = registryClient.getServerList(NodeType.MASTER); + List serverList = registryClient.getServerList(RegistryNodeType.MASTER); if (CollectionUtils.isEmpty(serverList)) { return; } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java index bf1dc575f8..6714ad2a29 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java @@ -18,10 +18,10 @@ package org.apache.dolphinscheduler.service.cache; import org.apache.dolphinscheduler.common.enums.CacheType; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.Message; import org.apache.dolphinscheduler.remote.command.MessageType; @@ -84,7 +84,7 @@ public class CacheNotifyServiceTest { server.setPort(serverConfig.getListenPort()); serverList.add(server); - Mockito.when(registryClient.getServerList(NodeType.MASTER)).thenReturn(serverList); + Mockito.when(registryClient.getServerList(RegistryNodeType.MASTER)).thenReturn(serverList); cacheNotifyService.notifyMaster(cacheExpireMessage); diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index afacc8f7d7..404b30ad69 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -141,9 +141,9 @@ master: task-commit-interval: 1s state-wheel-interval: 5s # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 - max-cpu-load-avg: 50 + max-cpu-load-avg: 80 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G - reserved-memory: 0.03 + reserved-memory: 0.1 # failover interval failover-interval: 10m # kill yarn/k8s application when failover taskInstance, default true @@ -164,15 +164,9 @@ worker: #Scenes to be used for distributed users.For example,users created by FreeIpa are stored in LDAP.This parameter only applies to Linux, When this parameter is true, worker.tenant.auto.create has no effect and will not automatically create tenants. tenant-distributed-user: false # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2 - max-cpu-load-avg: -1 + max-cpu-load-avg: 80 # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G - reserved-memory: 0.3 - # default worker groups separated by comma, like 'worker.groups=default,test' - groups: - - default - # alert server listen host - alert-listen-host: localhost - alert-listen-port: 50052 + reserved-memory: 0.1 task-execute-threads-full-policy: REJECT alert: @@ -180,6 +174,7 @@ alert: # Mark each alert of alert server if late after x milliseconds as failed. # Define value is (0 = infinite), and alert server would be waiting alert result. wait-timeout: 0 + heartbeat-interval: 60s python-gateway: # Weather enable python gateway server or not. The default value is true. diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 93b705aa22..7154dc3878 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -48,8 +48,6 @@ public class WorkerConfig implements Validator { private boolean tenantDistributedUser = false; private int maxCpuLoadAvg = -1; private double reservedMemory = 0.3; - private String alertListenHost = "localhost"; - private int alertListenPort = 50052; private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); /** @@ -92,8 +90,6 @@ public class WorkerConfig implements Validator { log.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser); log.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg); log.info("Worker config: reservedMemory -> {}", reservedMemory); - log.info("Worker config: alertListenHost -> {}", alertListenHost); - log.info("Worker config: alertListenPort -> {}", alertListenPort); log.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy); log.info("Worker config: workerAddress -> {}", registryDisconnectStrategy); log.info("Worker config: workerRegistryPath: {}", workerRegistryPath); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java index 7aa8a23033..4883beca40 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.task.TaskDispatchMessage; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable; @@ -71,6 +72,9 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { @Autowired(required = false) private StorageOperate storageOperate; + @Autowired + private WorkerRegistryClient workerRegistryClient; + @Counted(value = "ds.task.execution.count", description = "task execute total count") @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Override @@ -115,7 +119,8 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { workerMessageSender, workerRpcClient, taskPluginManager, - storageOperate) + storageOperate, + workerRegistryClient) .createWorkerTaskExecuteRunnable(); // submit task to manager boolean offer = workerManager.offer(workerTaskExecuteRunnable); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 4a67307144..4a049f7d90 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -21,23 +21,30 @@ import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_ import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryException; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask; +import org.apache.commons.collections4.CollectionUtils; + import java.io.IOException; +import java.util.List; +import java.util.Optional; import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @Slf4j @@ -54,6 +61,7 @@ public class WorkerRegistryClient implements AutoCloseable { private RegistryClient registryClient; @Autowired + @Lazy private WorkerConnectStrategy workerConnectStrategy; private WorkerHeartBeatTask workerHeartBeatTask; @@ -87,7 +95,7 @@ public class WorkerRegistryClient implements AutoCloseable { registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat)); log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath); - while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) { + while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), RegistryNodeType.WORKER)) { ThreadUtils.sleep(SLEEP_TIME_MILLIS); } @@ -98,6 +106,15 @@ public class WorkerRegistryClient implements AutoCloseable { log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress()); } + public Optional getAlertServerAddress() { + List serverList = registryClient.getServerList(RegistryNodeType.ALERT_SERVER); + if (CollectionUtils.isEmpty(serverList)) { + return Optional.empty(); + } + Server server = serverList.get(0); + return Optional.of(new Host(server.getHost(), server.getPort())); + } + public void setRegistryStoppable(IStoppable stoppable) { registryClient.setStoppable(stoppable); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java index 1f5c021717..b8b2328774 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; @@ -37,13 +38,15 @@ public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecut @NonNull WorkerMessageSender workerMessageSender, @NonNull WorkerRpcClient workerRpcClient, @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate) { + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { super(taskExecutionContext, workerConfig, workerMessageSender, workerRpcClient, taskPluginManager, - storageOperate); + storageOperate, + workerRegistryClient); } @Override diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java index 58f406a725..5fd6ae7b53 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; @@ -37,13 +38,15 @@ public class DefaultWorkerDelayTaskExecuteRunnableFactory @NonNull WorkerMessageSender workerMessageSender, @NonNull WorkerRpcClient workerRpcClient, @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate) { + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { super(taskExecutionContext, workerConfig, workerMessageSender, workerRpcClient, taskPluginManager, - storageOperate); + storageOperate, + workerRegistryClient); } @Override @@ -54,6 +57,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableFactory workerMessageSender, workerRpcClient, taskPluginManager, - storageOperate); + storageOperate, + workerRegistryClient); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java index bed6ab4d90..2bb09ac07d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; @@ -39,13 +40,15 @@ public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRu @NonNull WorkerMessageSender workerMessageSender, @NonNull WorkerRpcClient workerRpcClient, @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate) { + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { super(taskExecutionContext, workerConfig, workerMessageSender, workerRpcClient, taskPluginManager, - storageOperate); + storageOperate, + workerRegistryClient); } @Override diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java index c16dff395b..ce8635ed48 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; @@ -38,6 +39,7 @@ public abstract class WorkerDelayTaskExecuteRunnableFactory alertServerAddressOptional = workerRegistryClient.getAlertServerAddress(); + if (!alertServerAddressOptional.isPresent()) { + log.error("Cannot get alert server address, please check the alert server is running"); + return; + } + Host alertServerAddress = alertServerAddressOptional.get(); + log.info("The current task need to send alert, begin to send alert"); TaskExecutionStatus status = task.getExitStatus(); TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo(); @@ -238,10 +252,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskAlertInfo.getContent(), strategy); try { - workerRpcClient.send(Host.of(workerConfig.getAlertListenHost()), alertCommand.convert2Command()); - log.info("Success send alert"); + workerRpcClient.send(alertServerAddress, alertCommand.convert2Command()); + log.info("Success send alert to : {}", alertServerAddress); } catch (RemotingException e) { - log.error("Send alert failed, alertCommand: {}", alertCommand, e); + log.error("Send alert to: {} failed, alertCommand: {}", alertServerAddress, alertCommand, e); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java index a89882d961..33b1a9b755 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; @@ -37,13 +38,15 @@ public class WorkerTaskExecuteRunnableFactoryBuilder { @NonNull WorkerMessageSender workerMessageSender, @NonNull WorkerRpcClient workerRpcClient, @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate) { + @Nullable StorageOperate storageOperate, + @NonNull WorkerRegistryClient workerRegistryClient) { return new DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext, workerConfig, workerMessageSender, workerRpcClient, taskPluginManager, - storageOperate); + storageOperate, + workerRegistryClient); } } diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml index a73f829070..21fbf4c293 100644 --- a/dolphinscheduler-worker/src/main/resources/application.yaml +++ b/dolphinscheduler-worker/src/main/resources/application.yaml @@ -52,9 +52,6 @@ worker: max-cpu-load-avg: -1 # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G reserved-memory: 0.3 - # alert server listen host - alert-listen-host: localhost - alert-listen-port: 50052 registry-disconnect-strategy: # The disconnect strategy: stop, waiting strategy: waiting diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index 9e864a13a7..d55ccc25af 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.mockito.BDDMockito.given; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; @@ -83,7 +83,8 @@ public class WorkerRegistryClientTest { given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234)); given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); - given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true); + given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(RegistryNodeType.class))) + .willReturn(true); workerRegistryClient.initWorkRegistry(); workerRegistryClient.start(); diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java index 74b8d122d4..29d30167d3 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; @@ -46,6 +47,8 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest { private StorageOperate storageOperate = Mockito.mock(StorageOperate.class); + private WorkerRegistryClient workerRegistryClient = Mockito.mock(WorkerRegistryClient.class); + @Test public void testDryRun() { TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder() @@ -60,7 +63,8 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest { workerMessageSender, alertClientService, taskPluginManager, - storageOperate); + storageOperate, + workerRegistryClient); Assertions.assertAll(workerTaskExecuteRunnable::run); Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus()); @@ -84,7 +88,8 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest { workerMessageSender, alertClientService, taskPluginManager, - storageOperate); + storageOperate, + workerRegistryClient); Assertions.assertAll(workerTaskExecuteRunnable::run); Assertions.assertEquals(TaskExecutionStatus.FAILURE, taskExecutionContext.getCurrentExecutionStatus());