From cbd354d134ca32194c91c85f8ea3ffe615bd5536 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 20 Jul 2023 21:26:35 +0800 Subject: [PATCH] Add host/port in heartbeat (#14591) --- docs/docs/en/architecture/configuration.md | 1 + docs/docs/zh/architecture/configuration.md | 2 + .../alert/registry/AlertHeartbeatTask.java | 4 +- .../common/model/AlertServerHeartBeat.java | 4 +- .../common/model/HeartBeat.java | 5 + .../common/model/MasterHeartBeat.java | 3 + .../common/model/WorkerHeartBeat.java | 3 + .../common/utils/NetUtils.java | 284 ++++++++++-------- .../common/utils/NetUtilsTest.java | 19 +- .../master/task/MasterHeartBeatTask.java | 3 + .../registry/api/RegistryClient.java | 14 +- .../worker/task/WorkerHeartBeatTask.java | 3 + 12 files changed, 200 insertions(+), 145 deletions(-) diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index a9316aa9f8..08b6ac4011 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -129,6 +129,7 @@ export DOLPHINSCHEDULER_OPTS=" ``` > "-XX:DisableExplicitGC" is not recommended due to may lead to memory link (DolphinScheduler dependent on Netty to communicate). +> If add "-Djava.net.preferIPv6Addresses=true" will use ipv6 address, if add "-Djava.net.preferIPv4Addresses=true" will use ipv4 address, if doesn't set the two parameter will use ipv4 or ipv6. ### Database connection related configuration diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index f50dae2f8b..5a39ff725c 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -129,6 +129,8 @@ export DOLPHINSCHEDULER_OPTS=" ``` > 不建议设置"-XX:DisableExplicitGC" , DolphinScheduler使用Netty进行通讯,设置该参数,可能会导致内存泄漏. +> +>> 如果设置"-Djava.net.preferIPv6Addresses=true" 将会使用ipv6的IP地址, 如果设置"-Djava.net.preferIPv4Addresses=true"将会使用ipv4的IP地址, 如果都不设置,将会随机使用ipv4或者ipv6. ## 数据库连接相关配置 diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java index 31b30ae42b..150b8dbe0c 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.alert.config.AlertConfig; import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; @@ -59,7 +60,8 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask .cpuUsage(OSUtils.cpuUsagePercentage()) .memoryUsage(OSUtils.memoryUsagePercentage()) .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) - .alertServerAddress(alertConfig.getAlertServerAddress()) + .host(NetUtils.getHost()) + .port(alertConfig.getPort()) .build(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java index 4d3610b7fe..9273b77a1c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java @@ -34,5 +34,7 @@ public class AlertServerHeartBeat implements HeartBeat { private double cpuUsage; private double memoryUsage; private double availablePhysicalMemorySize; - private String alertServerAddress; + + private String host; + private int port; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java index 1a1d1610e4..218afa245e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java @@ -18,4 +18,9 @@ package org.apache.dolphinscheduler.common.model; public interface HeartBeat { + + String getHost(); + + int getPort(); + } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java index b6086b9bbe..52c96defc6 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java @@ -36,4 +36,7 @@ public class MasterHeartBeat implements HeartBeat { private double reservedMemory; private double diskAvailable; private int processId; + + private String host; + private int port; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java index dcacd75c6c..d3843d2783 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java @@ -39,6 +39,9 @@ public class WorkerHeartBeat implements HeartBeat { private int serverStatus; private int processId; + private String host; + private int port; + private int workerHostWeight; // worker host weight private int workerWaitingTaskCount; // worker waiting task count private int workerExecThreadCount; // worker thread pool thread count diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java index da265219d8..161ee4698d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java @@ -17,24 +17,25 @@ package org.apache.dolphinscheduler.common.utils; -import static java.util.Collections.emptyList; - import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.http.conn.util.InetAddressUtils; import java.io.IOException; +import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; import java.util.Enumeration; import java.util.LinkedList; import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -106,7 +107,8 @@ public class NetUtils { if (null != LOCAL_ADDRESS) { return LOCAL_ADDRESS; } - return getLocalAddress0(); + LOCAL_ADDRESS = getLocalAddress0(); + return LOCAL_ADDRESS; } /** @@ -115,55 +117,12 @@ public class NetUtils { * @return first valid local IP */ private static synchronized InetAddress getLocalAddress0() { - if (null != LOCAL_ADDRESS) { - return LOCAL_ADDRESS; + List suitableNetworkInterface = findSuitableNetworkInterface(); + List suitableInetAddress = findSuitableInetAddress(suitableNetworkInterface); + if (CollectionUtils.isEmpty(suitableInetAddress)) { + return null; } - - InetAddress localAddress = null; - try { - Optional networkInterface = findNetworkInterface(); - if (networkInterface.isPresent()) { - Enumeration addresses = networkInterface.get().getInetAddresses(); - while (addresses.hasMoreElements()) { - Optional addressOp = toValidAddress(addresses.nextElement()); - if (addressOp.isPresent()) { - try { - if (addressOp.get().isReachable(200)) { - LOCAL_ADDRESS = addressOp.get(); - return LOCAL_ADDRESS; - } - } catch (IOException e) { - log.warn("test address id reachable io exception", e); - } - } - } - - } - localAddress = InetAddress.getLocalHost(); - } catch (UnknownHostException e) { - log.warn("InetAddress get LocalHost exception", e); - } - Optional addressOp = toValidAddress(localAddress); - if (addressOp.isPresent()) { - LOCAL_ADDRESS = addressOp.get(); - } - return LOCAL_ADDRESS; - } - - private static Optional toValidAddress(InetAddress address) { - if (address instanceof Inet6Address) { - Inet6Address v6Address = (Inet6Address) address; - if (isPreferIPV6Address()) { - InetAddress inetAddress = normalizeV6Address(v6Address); - log.debug("The host prefer ipv6 address, will use ipv6 address: {} directly", inetAddress); - return Optional.ofNullable(inetAddress); - } - } - if (isValidV4Address(address)) { - return Optional.of(address); - } - log.warn("The address of the host is invalid, address: {}", address); - return Optional.empty(); + return suitableInetAddress.get(0); } private static InetAddress normalizeV6Address(Inet6Address address) { @@ -179,9 +138,8 @@ public class NetUtils { return address; } - public static boolean isValidV4Address(InetAddress address) { - - if (address == null || address.isLoopbackAddress()) { + protected static boolean isValidV4Address(InetAddress address) { + if (!(address instanceof Inet4Address)) { return false; } String name = address.getHostAddress(); @@ -191,6 +149,17 @@ public class NetUtils { && !address.isLoopbackAddress()); } + protected static boolean isValidV6Address(InetAddress address) { + if (!(address instanceof Inet6Address)) { + return false; + } + String name = address.getHostAddress(); + return (name != null + && InetAddressUtils.isIPv6Address(name) + && !address.isAnyLocalAddress() + && !address.isLoopbackAddress()); + } + /** * Check if an ipv6 address * @@ -200,79 +169,142 @@ public class NetUtils { return Boolean.getBoolean("java.net.preferIPv6Addresses"); } + private static boolean isPreferIPV4Address() { + return Boolean.getBoolean("java.net.preferIPv4Addresses"); + } + /** * Get the suitable {@link NetworkInterface} - * - * @return If no {@link NetworkInterface} is available , return null */ - private static Optional findNetworkInterface() { - - List validNetworkInterfaces = emptyList(); + private static List findSuitableNetworkInterface() { + // Find all network interfaces + List networkInterfaces = Collections.emptyList(); try { - validNetworkInterfaces = getValidNetworkInterfaces(); + networkInterfaces = getAllNetworkInterfaces(); } catch (SocketException e) { log.warn("ValidNetworkInterfaces exception", e); } - if (CollectionUtils.isEmpty(validNetworkInterfaces)) { - log.warn("ValidNetworkInterfaces is empty"); - return Optional.empty(); - } - // Try to specify config NetWork Interface - Optional specifyNetworkInterface = - validNetworkInterfaces.stream().filter(NetUtils::isSpecifyNetworkInterface).findFirst(); - if (specifyNetworkInterface.isPresent()) { - log.info("Use specified NetworkInterface: {}", specifyNetworkInterface.get()); - return specifyNetworkInterface; + // Filter the loopback/virtual/ network interfaces + List validNetworkInterfaces = networkInterfaces + .stream() + .filter(networkInterface -> { + try { + return !(networkInterface == null + || networkInterface.isLoopback() + || networkInterface.isVirtual() + || !networkInterface.isUp()); + } catch (SocketException e) { + log.warn("ValidNetworkInterfaces exception", e); + return false; + } + }) + .collect(Collectors.toList()); + + // Use the specified network interface if set + if (StringUtils.isNotBlank(specifyNetworkInterfaceName())) { + String specifyNetworkInterfaceName = specifyNetworkInterfaceName(); + validNetworkInterfaces = validNetworkInterfaces.stream() + .filter(networkInterface -> specifyNetworkInterfaceName.equals(networkInterface.getDisplayName())) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(validNetworkInterfaces)) { + throw new IllegalArgumentException( + "The specified network interface: " + specifyNetworkInterfaceName + " is not found"); + } } - return findAddress(validNetworkInterfaces); + return filterByNetworkPriority(validNetworkInterfaces); } /** - * Get the valid {@link NetworkInterface network interfaces} - * - * @throws SocketException SocketException if an I/O error occurs. + * Get the suitable {@link InetAddress} */ - private static List getValidNetworkInterfaces() throws SocketException { + private static List findSuitableInetAddress(List networkInterfaces) { + if (CollectionUtils.isEmpty(networkInterfaces)) { + return Collections.emptyList(); + } + List allInetAddresses = new LinkedList<>(); + for (NetworkInterface networkInterface : networkInterfaces) { + Enumeration addresses = networkInterface.getInetAddresses(); + while (addresses.hasMoreElements()) { + allInetAddresses.add(addresses.nextElement()); + } + } + // Get prefer addresses + List preferInetAddress = new ArrayList<>(); + if (!isPreferIPV6Address() && !isPreferIPV4Address()) { + // no prefer, will use all addresses + preferInetAddress.addAll(getIpv4Addresses(allInetAddresses)); + preferInetAddress.addAll(getIpv6Addresses(allInetAddresses)); + } + if (isPreferIPV4Address()) { + preferInetAddress.addAll(getIpv4Addresses(allInetAddresses)); + } + if (isPreferIPV6Address()) { + preferInetAddress.addAll(getIpv6Addresses(allInetAddresses)); + } + // Get reachable addresses + return preferInetAddress.stream() + .filter(inetAddress -> { + try { + return inetAddress.isReachable(100); + } catch (IOException e) { + log.warn("InetAddress isReachable exception", e); + return false; + } + }).collect(Collectors.toList()); + } + + private static List getIpv4Addresses(List allInetAddresses) { + if (CollectionUtils.isEmpty(allInetAddresses)) { + return Collections.emptyList(); + } + List validIpv4Addresses = new ArrayList<>(); + for (InetAddress inetAddress : allInetAddresses) { + if (isValidV4Address(inetAddress)) { + validIpv4Addresses.add(inetAddress); + } + } + return validIpv4Addresses; + } + + private static List getIpv6Addresses(List allInetAddresses) { + if (CollectionUtils.isEmpty(allInetAddresses)) { + return Collections.emptyList(); + } + List validIpv6Addresses = new ArrayList<>(); + for (InetAddress inetAddress : allInetAddresses) { + if (!isValidV6Address(inetAddress)) { + continue; + } + Inet6Address v6Address = (Inet6Address) inetAddress; + InetAddress normalizedV6Address = normalizeV6Address(v6Address); + validIpv6Addresses.add(normalizedV6Address); + } + return validIpv6Addresses; + } + + private static List getAllNetworkInterfaces() throws SocketException { List validNetworkInterfaces = new LinkedList<>(); Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); while (interfaces.hasMoreElements()) { NetworkInterface networkInterface = interfaces.nextElement(); - // ignore - if (ignoreNetworkInterface(networkInterface)) { - log.debug("Info NetworkInterface: {}", networkInterface); - continue; - } - log.info("Found valid NetworkInterface: {}", networkInterface); + log.info("Found NetworkInterface: {}", networkInterface); validNetworkInterfaces.add(networkInterface); } return validNetworkInterfaces; } - /** - * @param networkInterface {@link NetworkInterface} - * @return if the specified {@link NetworkInterface} should be ignored, return true - * @throws SocketException SocketException if an I/O error occurs. - */ - public static boolean ignoreNetworkInterface(NetworkInterface networkInterface) throws SocketException { - return networkInterface == null - || networkInterface.isLoopback() - || networkInterface.isVirtual() - || !networkInterface.isUp(); - } - - private static boolean isSpecifyNetworkInterface(NetworkInterface networkInterface) { - String preferredNetworkInterface = - PropertyUtils.getString(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED, - System.getProperty(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED)); - return Objects.equals(networkInterface.getDisplayName(), preferredNetworkInterface); + private static String specifyNetworkInterfaceName() { + return PropertyUtils.getString( + Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED, + System.getProperty(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED)); } - private static Optional findAddress(List validNetworkInterfaces) { + private static List filterByNetworkPriority(List validNetworkInterfaces) { if (CollectionUtils.isEmpty(validNetworkInterfaces)) { - return Optional.empty(); + return Collections.emptyList(); } String networkPriority = PropertyUtils.getString(Constants.DOLPHIN_SCHEDULER_NETWORK_PRIORITY_STRATEGY, NETWORK_PRIORITY_DEFAULT); @@ -282,28 +314,21 @@ public class NetUtils { return findAddressByDefaultPolicy(validNetworkInterfaces); case NETWORK_PRIORITY_INNER: log.debug("Use inner NetworkInterface acquisition policy"); - return findInnerAddress(validNetworkInterfaces); + return findInnerAddressNetWorkInterface(validNetworkInterfaces); case NETWORK_PRIORITY_OUTER: log.debug("Use outer NetworkInterface acquisition policy"); - return findOuterAddress(validNetworkInterfaces); + return findOuterAddressNetworkInterface(validNetworkInterfaces); default: log.error("There is no matching network card acquisition policy!"); - return Optional.empty(); + return Collections.emptyList(); } } - private static Optional findAddressByDefaultPolicy(List validNetworkInterfaces) { - Optional innerAddress = findInnerAddress(validNetworkInterfaces); - if (innerAddress.isPresent()) { - log.debug("Found inner NetworkInterface: {}", innerAddress.get()); - return innerAddress; - } - Optional outerAddress = findOuterAddress(validNetworkInterfaces); - if (outerAddress.isPresent()) { - log.debug("Found outer NetworkInterface: {}", outerAddress.get()); - return outerAddress; - } - return Optional.empty(); + private static List findAddressByDefaultPolicy(List validNetworkInterfaces) { + List allAddress = new ArrayList<>(); + allAddress.addAll(findInnerAddressNetWorkInterface(validNetworkInterfaces)); + allAddress.addAll(findOuterAddressNetworkInterface(validNetworkInterfaces)); + return allAddress; } /** @@ -311,39 +336,40 @@ public class NetUtils { * * @return If no {@link NetworkInterface} is available , return null */ - private static Optional findInnerAddress(List validNetworkInterfaces) { + private static List findInnerAddressNetWorkInterface(List validNetworkInterfaces) { if (CollectionUtils.isEmpty(validNetworkInterfaces)) { - return Optional.empty(); + return Collections.emptyList(); } + List innerNetworkInterfaces = new ArrayList<>(); for (NetworkInterface ni : validNetworkInterfaces) { Enumeration address = ni.getInetAddresses(); while (address.hasMoreElements()) { InetAddress ip = address.nextElement(); - if (ip.isSiteLocalAddress() - && !ip.isLoopbackAddress()) { - return Optional.of(ni); + if (ip.isSiteLocalAddress() && !ip.isLoopbackAddress()) { + innerNetworkInterfaces.add(ni); } } } - return Optional.empty(); + return innerNetworkInterfaces; } - private static Optional findOuterAddress(List validNetworkInterfaces) { + private static List findOuterAddressNetworkInterface(List validNetworkInterfaces) { if (CollectionUtils.isEmpty(validNetworkInterfaces)) { - return Optional.empty(); + return Collections.emptyList(); } + + List outerNetworkInterfaces = new ArrayList<>(); for (NetworkInterface ni : validNetworkInterfaces) { Enumeration address = ni.getInetAddresses(); while (address.hasMoreElements()) { InetAddress ip = address.nextElement(); - if (!ip.isSiteLocalAddress() - && !ip.isLoopbackAddress()) { - return Optional.of(ni); + if (!ip.isSiteLocalAddress() && !ip.isLoopbackAddress()) { + outerNetworkInterfaces.add(ni); } } } - return Optional.empty(); + return outerNetworkInterfaces; } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java index 8a17395784..f3e759e585 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; +import java.net.Inet4Address; import java.net.InetAddress; import org.junit.jupiter.api.Assertions; @@ -84,33 +85,33 @@ public class NetUtilsTest { @Test public void testIsValidAddress() { Assertions.assertFalse(NetUtils.isValidV4Address(null)); - InetAddress address = mock(InetAddress.class); + Inet4Address address = mock(Inet4Address.class); when(address.isLoopbackAddress()).thenReturn(true); Assertions.assertFalse(NetUtils.isValidV4Address(address)); - address = mock(InetAddress.class); + address = mock(Inet4Address.class); when(address.getHostAddress()).thenReturn("localhost"); Assertions.assertFalse(NetUtils.isValidV4Address(address)); - address = mock(InetAddress.class); + address = mock(Inet4Address.class); when(address.getHostAddress()).thenReturn("0.0.0.0"); when(address.isAnyLocalAddress()).thenReturn(true); Assertions.assertFalse(NetUtils.isValidV4Address(address)); - address = mock(InetAddress.class); + address = mock(Inet4Address.class); when(address.getHostAddress()).thenReturn("127.0.0.1"); when(address.isLoopbackAddress()).thenReturn(true); Assertions.assertFalse(NetUtils.isValidV4Address(address)); - address = mock(InetAddress.class); + address = mock(Inet4Address.class); when(address.getHostAddress()).thenReturn("1.2.3.4"); Assertions.assertTrue(NetUtils.isValidV4Address(address)); - address = mock(InetAddress.class); + address = mock(Inet4Address.class); when(address.getHostAddress()).thenReturn("1.2.3.4:80"); Assertions.assertFalse(NetUtils.isValidV4Address(address)); - address = mock(InetAddress.class); + address = mock(Inet4Address.class); when(address.getHostAddress()).thenReturn("256.0.0.1"); Assertions.assertFalse(NetUtils.isValidV4Address(address)); - address = mock(InetAddress.class); + address = mock(Inet4Address.class); when(address.getHostAddress()).thenReturn("127.0.0.0.1"); Assertions.assertFalse(NetUtils.isValidV4Address(address)); - address = mock(InetAddress.class); + address = mock(Inet4Address.class); when(address.getHostAddress()).thenReturn("-1.2.3.4"); Assertions.assertFalse(NetUtils.isValidV4Address(address)); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java index a28b026192..824d4778ab 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -59,6 +60,8 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask { .memoryUsage(OSUtils.memoryUsagePercentage()) .diskAvailable(OSUtils.diskAvailable()) .processId(processId) + .host(NetUtils.getHost()) + .port(masterConfig.getListenPort()) .build(); } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java index 6b8046b6e2..ddd80d94a1 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java @@ -90,12 +90,16 @@ public class RegistryClient { server.setCreateTime(new Date(masterHeartBeat.getStartupTime())); server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime())); server.setId(masterHeartBeat.getProcessId()); + server.setHost(masterHeartBeat.getHost()); + server.setPort(masterHeartBeat.getPort()); break; case WORKER: WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(heartBeatJson, WorkerHeartBeat.class); server.setCreateTime(new Date(workerHeartBeat.getStartupTime())); server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime())); server.setId(workerHeartBeat.getProcessId()); + server.setHost(workerHeartBeat.getHost()); + server.setPort(workerHeartBeat.getPort()); break; case ALERT_SERVER: AlertServerHeartBeat alertServerHeartBeat = @@ -103,16 +107,16 @@ public class RegistryClient { server.setCreateTime(new Date(alertServerHeartBeat.getStartupTime())); server.setLastHeartbeatTime(new Date(alertServerHeartBeat.getReportTime())); server.setId(alertServerHeartBeat.getProcessId()); + server.setHost(alertServerHeartBeat.getHost()); + server.setPort(alertServerHeartBeat.getPort()); + break; + default: + log.warn("unknown registry node type: {}", registryNodeType); } server.setResInfo(heartBeatJson); // todo: add host, port in heartBeat Info, so that we don't need to parse this again server.setZkDirectory(registryNodeType.getRegistryPath() + "/" + serverPath); - // set host and port - int lastColonIndex = serverPath.lastIndexOf(":"); - // fetch the last one - server.setHost(serverPath.substring(0, lastColonIndex)); - server.setPort(Integer.parseInt(serverPath.substring(lastColonIndex + 1))); serverList.add(server); } return serverList; diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java index 7b0dbe60e7..bb9c77e849 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -75,6 +76,8 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask { .workerWaitingTaskCount(this.workerWaitingTaskCount.get()) .workerExecThreadCount(workerConfig.getExecThreads()) .serverStatus(serverStatus) + .host(NetUtils.getHost()) + .port(workerConfig.getListenPort()) .build(); }