Browse Source

Add host/port in heartbeat (#14591)

3.2.1-prepare
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
cbd354d134
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      docs/docs/en/architecture/configuration.md
  2. 2
      docs/docs/zh/architecture/configuration.md
  3. 4
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java
  4. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java
  5. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
  6. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
  7. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
  8. 278
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
  9. 19
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
  10. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
  11. 14
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
  12. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

1
docs/docs/en/architecture/configuration.md

@ -129,6 +129,7 @@ export DOLPHINSCHEDULER_OPTS="
``` ```
> "-XX:DisableExplicitGC" is not recommended due to may lead to memory link (DolphinScheduler dependent on Netty to communicate). > "-XX:DisableExplicitGC" is not recommended due to may lead to memory link (DolphinScheduler dependent on Netty to communicate).
> If add "-Djava.net.preferIPv6Addresses=true" will use ipv6 address, if add "-Djava.net.preferIPv4Addresses=true" will use ipv4 address, if doesn't set the two parameter will use ipv4 or ipv6.
### Database connection related configuration ### Database connection related configuration

2
docs/docs/zh/architecture/configuration.md

@ -129,6 +129,8 @@ export DOLPHINSCHEDULER_OPTS="
``` ```
> 不建议设置"-XX:DisableExplicitGC" , DolphinScheduler使用Netty进行通讯,设置该参数,可能会导致内存泄漏. > 不建议设置"-XX:DisableExplicitGC" , DolphinScheduler使用Netty进行通讯,设置该参数,可能会导致内存泄漏.
>
>> 如果设置"-Djava.net.preferIPv6Addresses=true" 将会使用ipv6的IP地址, 如果设置"-Djava.net.preferIPv4Addresses=true"将会使用ipv4的IP地址, 如果都不设置,将会随机使用ipv4或者ipv6.
## 数据库连接相关配置 ## 数据库连接相关配置

4
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat; import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@ -59,7 +60,8 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat>
.cpuUsage(OSUtils.cpuUsagePercentage()) .cpuUsage(OSUtils.cpuUsagePercentage())
.memoryUsage(OSUtils.memoryUsagePercentage()) .memoryUsage(OSUtils.memoryUsagePercentage())
.availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize()) .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
.alertServerAddress(alertConfig.getAlertServerAddress()) .host(NetUtils.getHost())
.port(alertConfig.getPort())
.build(); .build();
} }

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java

@ -34,5 +34,7 @@ public class AlertServerHeartBeat implements HeartBeat {
private double cpuUsage; private double cpuUsage;
private double memoryUsage; private double memoryUsage;
private double availablePhysicalMemorySize; private double availablePhysicalMemorySize;
private String alertServerAddress;
private String host;
private int port;
} }

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java

@ -18,4 +18,9 @@
package org.apache.dolphinscheduler.common.model; package org.apache.dolphinscheduler.common.model;
public interface HeartBeat { public interface HeartBeat {
String getHost();
int getPort();
} }

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java

@ -36,4 +36,7 @@ public class MasterHeartBeat implements HeartBeat {
private double reservedMemory; private double reservedMemory;
private double diskAvailable; private double diskAvailable;
private int processId; private int processId;
private String host;
private int port;
} }

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java

@ -39,6 +39,9 @@ public class WorkerHeartBeat implements HeartBeat {
private int serverStatus; private int serverStatus;
private int processId; private int processId;
private String host;
private int port;
private int workerHostWeight; // worker host weight private int workerHostWeight; // worker host weight
private int workerWaitingTaskCount; // worker waiting task count private int workerWaitingTaskCount; // worker waiting task count
private int workerExecThreadCount; // worker thread pool thread count private int workerExecThreadCount; // worker thread pool thread count

278
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java

@ -17,24 +17,25 @@
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import static java.util.Collections.emptyList;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.conn.util.InetAddressUtils; import org.apache.http.conn.util.InetAddressUtils;
import java.io.IOException; import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address; import java.net.Inet6Address;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.NetworkInterface; import java.net.NetworkInterface;
import java.net.SocketException; import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.stream.Collectors;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -106,7 +107,8 @@ public class NetUtils {
if (null != LOCAL_ADDRESS) { if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS; return LOCAL_ADDRESS;
} }
return getLocalAddress0(); LOCAL_ADDRESS = getLocalAddress0();
return LOCAL_ADDRESS;
} }
/** /**
@ -115,55 +117,12 @@ public class NetUtils {
* @return first valid local IP * @return first valid local IP
*/ */
private static synchronized InetAddress getLocalAddress0() { private static synchronized InetAddress getLocalAddress0() {
if (null != LOCAL_ADDRESS) { List<NetworkInterface> suitableNetworkInterface = findSuitableNetworkInterface();
return LOCAL_ADDRESS; List<InetAddress> suitableInetAddress = findSuitableInetAddress(suitableNetworkInterface);
} if (CollectionUtils.isEmpty(suitableInetAddress)) {
return null;
InetAddress localAddress = null;
try {
Optional<NetworkInterface> networkInterface = findNetworkInterface();
if (networkInterface.isPresent()) {
Enumeration<InetAddress> addresses = networkInterface.get().getInetAddresses();
while (addresses.hasMoreElements()) {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if (addressOp.get().isReachable(200)) {
LOCAL_ADDRESS = addressOp.get();
return LOCAL_ADDRESS;
}
} catch (IOException e) {
log.warn("test address id reachable io exception", e);
}
}
}
}
localAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
log.warn("InetAddress get LocalHost exception", e);
}
Optional<InetAddress> addressOp = toValidAddress(localAddress);
if (addressOp.isPresent()) {
LOCAL_ADDRESS = addressOp.get();
}
return LOCAL_ADDRESS;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
if (isPreferIPV6Address()) {
InetAddress inetAddress = normalizeV6Address(v6Address);
log.debug("The host prefer ipv6 address, will use ipv6 address: {} directly", inetAddress);
return Optional.ofNullable(inetAddress);
}
}
if (isValidV4Address(address)) {
return Optional.of(address);
} }
log.warn("The address of the host is invalid, address: {}", address); return suitableInetAddress.get(0);
return Optional.empty();
} }
private static InetAddress normalizeV6Address(Inet6Address address) { private static InetAddress normalizeV6Address(Inet6Address address) {
@ -179,9 +138,8 @@ public class NetUtils {
return address; return address;
} }
public static boolean isValidV4Address(InetAddress address) { protected static boolean isValidV4Address(InetAddress address) {
if (!(address instanceof Inet4Address)) {
if (address == null || address.isLoopbackAddress()) {
return false; return false;
} }
String name = address.getHostAddress(); String name = address.getHostAddress();
@ -191,6 +149,17 @@ public class NetUtils {
&& !address.isLoopbackAddress()); && !address.isLoopbackAddress());
} }
protected static boolean isValidV6Address(InetAddress address) {
if (!(address instanceof Inet6Address)) {
return false;
}
String name = address.getHostAddress();
return (name != null
&& InetAddressUtils.isIPv6Address(name)
&& !address.isAnyLocalAddress()
&& !address.isLoopbackAddress());
}
/** /**
* Check if an ipv6 address * Check if an ipv6 address
* *
@ -200,79 +169,142 @@ public class NetUtils {
return Boolean.getBoolean("java.net.preferIPv6Addresses"); return Boolean.getBoolean("java.net.preferIPv6Addresses");
} }
private static boolean isPreferIPV4Address() {
return Boolean.getBoolean("java.net.preferIPv4Addresses");
}
/** /**
* Get the suitable {@link NetworkInterface} * Get the suitable {@link NetworkInterface}
*
* @return If no {@link NetworkInterface} is available , return <code>null</code>
*/ */
private static Optional<NetworkInterface> findNetworkInterface() { private static List<NetworkInterface> findSuitableNetworkInterface() {
List<NetworkInterface> validNetworkInterfaces = emptyList(); // Find all network interfaces
List<NetworkInterface> networkInterfaces = Collections.emptyList();
try {
networkInterfaces = getAllNetworkInterfaces();
} catch (SocketException e) {
log.warn("ValidNetworkInterfaces exception", e);
}
// Filter the loopback/virtual/ network interfaces
List<NetworkInterface> validNetworkInterfaces = networkInterfaces
.stream()
.filter(networkInterface -> {
try { try {
validNetworkInterfaces = getValidNetworkInterfaces(); return !(networkInterface == null
|| networkInterface.isLoopback()
|| networkInterface.isVirtual()
|| !networkInterface.isUp());
} catch (SocketException e) { } catch (SocketException e) {
log.warn("ValidNetworkInterfaces exception", e); log.warn("ValidNetworkInterfaces exception", e);
return false;
} }
})
.collect(Collectors.toList());
// Use the specified network interface if set
if (StringUtils.isNotBlank(specifyNetworkInterfaceName())) {
String specifyNetworkInterfaceName = specifyNetworkInterfaceName();
validNetworkInterfaces = validNetworkInterfaces.stream()
.filter(networkInterface -> specifyNetworkInterfaceName.equals(networkInterface.getDisplayName()))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(validNetworkInterfaces)) { if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
log.warn("ValidNetworkInterfaces is empty"); throw new IllegalArgumentException(
return Optional.empty(); "The specified network interface: " + specifyNetworkInterfaceName + " is not found");
} }
// Try to specify config NetWork Interface
Optional<NetworkInterface> specifyNetworkInterface =
validNetworkInterfaces.stream().filter(NetUtils::isSpecifyNetworkInterface).findFirst();
if (specifyNetworkInterface.isPresent()) {
log.info("Use specified NetworkInterface: {}", specifyNetworkInterface.get());
return specifyNetworkInterface;
} }
return findAddress(validNetworkInterfaces); return filterByNetworkPriority(validNetworkInterfaces);
} }
/** /**
* Get the valid {@link NetworkInterface network interfaces} * Get the suitable {@link InetAddress}
*
* @throws SocketException SocketException if an I/O error occurs.
*/ */
private static List<NetworkInterface> getValidNetworkInterfaces() throws SocketException { private static List<InetAddress> findSuitableInetAddress(List<NetworkInterface> networkInterfaces) {
if (CollectionUtils.isEmpty(networkInterfaces)) {
return Collections.emptyList();
}
List<InetAddress> allInetAddresses = new LinkedList<>();
for (NetworkInterface networkInterface : networkInterfaces) {
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
allInetAddresses.add(addresses.nextElement());
}
}
// Get prefer addresses
List<InetAddress> preferInetAddress = new ArrayList<>();
if (!isPreferIPV6Address() && !isPreferIPV4Address()) {
// no prefer, will use all addresses
preferInetAddress.addAll(getIpv4Addresses(allInetAddresses));
preferInetAddress.addAll(getIpv6Addresses(allInetAddresses));
}
if (isPreferIPV4Address()) {
preferInetAddress.addAll(getIpv4Addresses(allInetAddresses));
}
if (isPreferIPV6Address()) {
preferInetAddress.addAll(getIpv6Addresses(allInetAddresses));
}
// Get reachable addresses
return preferInetAddress.stream()
.filter(inetAddress -> {
try {
return inetAddress.isReachable(100);
} catch (IOException e) {
log.warn("InetAddress isReachable exception", e);
return false;
}
}).collect(Collectors.toList());
}
private static List<InetAddress> getIpv4Addresses(List<InetAddress> allInetAddresses) {
if (CollectionUtils.isEmpty(allInetAddresses)) {
return Collections.emptyList();
}
List<InetAddress> validIpv4Addresses = new ArrayList<>();
for (InetAddress inetAddress : allInetAddresses) {
if (isValidV4Address(inetAddress)) {
validIpv4Addresses.add(inetAddress);
}
}
return validIpv4Addresses;
}
private static List<InetAddress> getIpv6Addresses(List<InetAddress> allInetAddresses) {
if (CollectionUtils.isEmpty(allInetAddresses)) {
return Collections.emptyList();
}
List<InetAddress> validIpv6Addresses = new ArrayList<>();
for (InetAddress inetAddress : allInetAddresses) {
if (!isValidV6Address(inetAddress)) {
continue;
}
Inet6Address v6Address = (Inet6Address) inetAddress;
InetAddress normalizedV6Address = normalizeV6Address(v6Address);
validIpv6Addresses.add(normalizedV6Address);
}
return validIpv6Addresses;
}
private static List<NetworkInterface> getAllNetworkInterfaces() throws SocketException {
List<NetworkInterface> validNetworkInterfaces = new LinkedList<>(); List<NetworkInterface> validNetworkInterfaces = new LinkedList<>();
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) { while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement(); NetworkInterface networkInterface = interfaces.nextElement();
// ignore log.info("Found NetworkInterface: {}", networkInterface);
if (ignoreNetworkInterface(networkInterface)) {
log.debug("Info NetworkInterface: {}", networkInterface);
continue;
}
log.info("Found valid NetworkInterface: {}", networkInterface);
validNetworkInterfaces.add(networkInterface); validNetworkInterfaces.add(networkInterface);
} }
return validNetworkInterfaces; return validNetworkInterfaces;
} }
/** private static String specifyNetworkInterfaceName() {
* @param networkInterface {@link NetworkInterface} return PropertyUtils.getString(
* @return if the specified {@link NetworkInterface} should be ignored, return <code>true</code> Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED,
* @throws SocketException SocketException if an I/O error occurs.
*/
public static boolean ignoreNetworkInterface(NetworkInterface networkInterface) throws SocketException {
return networkInterface == null
|| networkInterface.isLoopback()
|| networkInterface.isVirtual()
|| !networkInterface.isUp();
}
private static boolean isSpecifyNetworkInterface(NetworkInterface networkInterface) {
String preferredNetworkInterface =
PropertyUtils.getString(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED,
System.getProperty(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED)); System.getProperty(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED));
return Objects.equals(networkInterface.getDisplayName(), preferredNetworkInterface);
} }
private static Optional<NetworkInterface> findAddress(List<NetworkInterface> validNetworkInterfaces) { private static List<NetworkInterface> filterByNetworkPriority(List<NetworkInterface> validNetworkInterfaces) {
if (CollectionUtils.isEmpty(validNetworkInterfaces)) { if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
return Optional.empty(); return Collections.emptyList();
} }
String networkPriority = PropertyUtils.getString(Constants.DOLPHIN_SCHEDULER_NETWORK_PRIORITY_STRATEGY, String networkPriority = PropertyUtils.getString(Constants.DOLPHIN_SCHEDULER_NETWORK_PRIORITY_STRATEGY,
NETWORK_PRIORITY_DEFAULT); NETWORK_PRIORITY_DEFAULT);
@ -282,28 +314,21 @@ public class NetUtils {
return findAddressByDefaultPolicy(validNetworkInterfaces); return findAddressByDefaultPolicy(validNetworkInterfaces);
case NETWORK_PRIORITY_INNER: case NETWORK_PRIORITY_INNER:
log.debug("Use inner NetworkInterface acquisition policy"); log.debug("Use inner NetworkInterface acquisition policy");
return findInnerAddress(validNetworkInterfaces); return findInnerAddressNetWorkInterface(validNetworkInterfaces);
case NETWORK_PRIORITY_OUTER: case NETWORK_PRIORITY_OUTER:
log.debug("Use outer NetworkInterface acquisition policy"); log.debug("Use outer NetworkInterface acquisition policy");
return findOuterAddress(validNetworkInterfaces); return findOuterAddressNetworkInterface(validNetworkInterfaces);
default: default:
log.error("There is no matching network card acquisition policy!"); log.error("There is no matching network card acquisition policy!");
return Optional.empty(); return Collections.emptyList();
} }
} }
private static Optional<NetworkInterface> findAddressByDefaultPolicy(List<NetworkInterface> validNetworkInterfaces) { private static List<NetworkInterface> findAddressByDefaultPolicy(List<NetworkInterface> validNetworkInterfaces) {
Optional<NetworkInterface> innerAddress = findInnerAddress(validNetworkInterfaces); List<NetworkInterface> allAddress = new ArrayList<>();
if (innerAddress.isPresent()) { allAddress.addAll(findInnerAddressNetWorkInterface(validNetworkInterfaces));
log.debug("Found inner NetworkInterface: {}", innerAddress.get()); allAddress.addAll(findOuterAddressNetworkInterface(validNetworkInterfaces));
return innerAddress; return allAddress;
}
Optional<NetworkInterface> outerAddress = findOuterAddress(validNetworkInterfaces);
if (outerAddress.isPresent()) {
log.debug("Found outer NetworkInterface: {}", outerAddress.get());
return outerAddress;
}
return Optional.empty();
} }
/** /**
@ -311,39 +336,40 @@ public class NetUtils {
* *
* @return If no {@link NetworkInterface} is available , return <code>null</code> * @return If no {@link NetworkInterface} is available , return <code>null</code>
*/ */
private static Optional<NetworkInterface> findInnerAddress(List<NetworkInterface> validNetworkInterfaces) { private static List<NetworkInterface> findInnerAddressNetWorkInterface(List<NetworkInterface> validNetworkInterfaces) {
if (CollectionUtils.isEmpty(validNetworkInterfaces)) { if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
return Optional.empty(); return Collections.emptyList();
} }
List<NetworkInterface> innerNetworkInterfaces = new ArrayList<>();
for (NetworkInterface ni : validNetworkInterfaces) { for (NetworkInterface ni : validNetworkInterfaces) {
Enumeration<InetAddress> address = ni.getInetAddresses(); Enumeration<InetAddress> address = ni.getInetAddresses();
while (address.hasMoreElements()) { while (address.hasMoreElements()) {
InetAddress ip = address.nextElement(); InetAddress ip = address.nextElement();
if (ip.isSiteLocalAddress() if (ip.isSiteLocalAddress() && !ip.isLoopbackAddress()) {
&& !ip.isLoopbackAddress()) { innerNetworkInterfaces.add(ni);
return Optional.of(ni);
} }
} }
} }
return Optional.empty(); return innerNetworkInterfaces;
} }
private static Optional<NetworkInterface> findOuterAddress(List<NetworkInterface> validNetworkInterfaces) { private static List<NetworkInterface> findOuterAddressNetworkInterface(List<NetworkInterface> validNetworkInterfaces) {
if (CollectionUtils.isEmpty(validNetworkInterfaces)) { if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
return Optional.empty(); return Collections.emptyList();
} }
List<NetworkInterface> outerNetworkInterfaces = new ArrayList<>();
for (NetworkInterface ni : validNetworkInterfaces) { for (NetworkInterface ni : validNetworkInterfaces) {
Enumeration<InetAddress> address = ni.getInetAddresses(); Enumeration<InetAddress> address = ni.getInetAddresses();
while (address.hasMoreElements()) { while (address.hasMoreElements()) {
InetAddress ip = address.nextElement(); InetAddress ip = address.nextElement();
if (!ip.isSiteLocalAddress() if (!ip.isSiteLocalAddress() && !ip.isLoopbackAddress()) {
&& !ip.isLoopbackAddress()) { outerNetworkInterfaces.add(ni);
return Optional.of(ni);
} }
} }
} }
return Optional.empty(); return outerNetworkInterfaces;
} }
} }

19
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java

@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.net.Inet4Address;
import java.net.InetAddress; import java.net.InetAddress;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
@ -84,33 +85,33 @@ public class NetUtilsTest {
@Test @Test
public void testIsValidAddress() { public void testIsValidAddress() {
Assertions.assertFalse(NetUtils.isValidV4Address(null)); Assertions.assertFalse(NetUtils.isValidV4Address(null));
InetAddress address = mock(InetAddress.class); Inet4Address address = mock(Inet4Address.class);
when(address.isLoopbackAddress()).thenReturn(true); when(address.isLoopbackAddress()).thenReturn(true);
Assertions.assertFalse(NetUtils.isValidV4Address(address)); Assertions.assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class); address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("localhost"); when(address.getHostAddress()).thenReturn("localhost");
Assertions.assertFalse(NetUtils.isValidV4Address(address)); Assertions.assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class); address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("0.0.0.0"); when(address.getHostAddress()).thenReturn("0.0.0.0");
when(address.isAnyLocalAddress()).thenReturn(true); when(address.isAnyLocalAddress()).thenReturn(true);
Assertions.assertFalse(NetUtils.isValidV4Address(address)); Assertions.assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class); address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("127.0.0.1"); when(address.getHostAddress()).thenReturn("127.0.0.1");
when(address.isLoopbackAddress()).thenReturn(true); when(address.isLoopbackAddress()).thenReturn(true);
Assertions.assertFalse(NetUtils.isValidV4Address(address)); Assertions.assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class); address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("1.2.3.4"); when(address.getHostAddress()).thenReturn("1.2.3.4");
Assertions.assertTrue(NetUtils.isValidV4Address(address)); Assertions.assertTrue(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class); address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("1.2.3.4:80"); when(address.getHostAddress()).thenReturn("1.2.3.4:80");
Assertions.assertFalse(NetUtils.isValidV4Address(address)); Assertions.assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class); address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("256.0.0.1"); when(address.getHostAddress()).thenReturn("256.0.0.1");
Assertions.assertFalse(NetUtils.isValidV4Address(address)); Assertions.assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class); address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("127.0.0.0.1"); when(address.getHostAddress()).thenReturn("127.0.0.0.1");
Assertions.assertFalse(NetUtils.isValidV4Address(address)); Assertions.assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class); address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("-1.2.3.4"); when(address.getHostAddress()).thenReturn("-1.2.3.4");
Assertions.assertFalse(NetUtils.isValidV4Address(address)); Assertions.assertFalse(NetUtils.isValidV4Address(address));
} }

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -59,6 +60,8 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
.memoryUsage(OSUtils.memoryUsagePercentage()) .memoryUsage(OSUtils.memoryUsagePercentage())
.diskAvailable(OSUtils.diskAvailable()) .diskAvailable(OSUtils.diskAvailable())
.processId(processId) .processId(processId)
.host(NetUtils.getHost())
.port(masterConfig.getListenPort())
.build(); .build();
} }

14
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java

@ -90,12 +90,16 @@ public class RegistryClient {
server.setCreateTime(new Date(masterHeartBeat.getStartupTime())); server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime())); server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime()));
server.setId(masterHeartBeat.getProcessId()); server.setId(masterHeartBeat.getProcessId());
server.setHost(masterHeartBeat.getHost());
server.setPort(masterHeartBeat.getPort());
break; break;
case WORKER: case WORKER:
WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(heartBeatJson, WorkerHeartBeat.class); WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(heartBeatJson, WorkerHeartBeat.class);
server.setCreateTime(new Date(workerHeartBeat.getStartupTime())); server.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime())); server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime()));
server.setId(workerHeartBeat.getProcessId()); server.setId(workerHeartBeat.getProcessId());
server.setHost(workerHeartBeat.getHost());
server.setPort(workerHeartBeat.getPort());
break; break;
case ALERT_SERVER: case ALERT_SERVER:
AlertServerHeartBeat alertServerHeartBeat = AlertServerHeartBeat alertServerHeartBeat =
@ -103,16 +107,16 @@ public class RegistryClient {
server.setCreateTime(new Date(alertServerHeartBeat.getStartupTime())); server.setCreateTime(new Date(alertServerHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(alertServerHeartBeat.getReportTime())); server.setLastHeartbeatTime(new Date(alertServerHeartBeat.getReportTime()));
server.setId(alertServerHeartBeat.getProcessId()); server.setId(alertServerHeartBeat.getProcessId());
server.setHost(alertServerHeartBeat.getHost());
server.setPort(alertServerHeartBeat.getPort());
break;
default:
log.warn("unknown registry node type: {}", registryNodeType);
} }
server.setResInfo(heartBeatJson); server.setResInfo(heartBeatJson);
// todo: add host, port in heartBeat Info, so that we don't need to parse this again // todo: add host, port in heartBeat Info, so that we don't need to parse this again
server.setZkDirectory(registryNodeType.getRegistryPath() + "/" + serverPath); server.setZkDirectory(registryNodeType.getRegistryPath() + "/" + serverPath);
// set host and port
int lastColonIndex = serverPath.lastIndexOf(":");
// fetch the last one
server.setHost(serverPath.substring(0, lastColonIndex));
server.setPort(Integer.parseInt(serverPath.substring(lastColonIndex + 1)));
serverList.add(server); serverList.add(server);
} }
return serverList; return serverList;

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -75,6 +76,8 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
.workerWaitingTaskCount(this.workerWaitingTaskCount.get()) .workerWaitingTaskCount(this.workerWaitingTaskCount.get())
.workerExecThreadCount(workerConfig.getExecThreads()) .workerExecThreadCount(workerConfig.getExecThreads())
.serverStatus(serverStatus) .serverStatus(serverStatus)
.host(NetUtils.getHost())
.port(workerConfig.getListenPort())
.build(); .build();
} }

Loading…
Cancel
Save