Browse Source

[1.3.6-prepare][Fix-5115][Server] Fix the registered address of a server is the loopback address 127.0.0.1 (#5117)

Shiwen Cheng 4 years ago committed by GitHub
parent
commit
b14793d4e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 335
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
  3. 52
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  4. 16
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java
  5. 89
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
  6. 29
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
  7. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
  8. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  9. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  10. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  11. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  12. 20
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  13. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  14. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  15. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  16. 5
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
  17. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
  18. 13
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java
  19. 5
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
  20. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
  21. 1
      pom.xml

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -1001,6 +1001,17 @@ public final class Constants {
public static final String END_TIME = "end time";
public static final String START_END_DATE = "startDate,endDate";
/**
* Network system properties
*/
public static final String DOLPHIN_SCHEDULER_PREFERRED_NETWORK_INTERFACE = "dolphin.scheduler.network.interface.preferred";
/**
* Network IP gets priority, default inner outer
*/
public static final String NETWORK_PRIORITY_STRATEGY = "dolphin.scheduler.network.priority.strategy";
/**
* datasource encryption salt
*/

335
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java

@ -0,0 +1,335 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.Constants.DOLPHIN_SCHEDULER_PREFERRED_NETWORK_INTERFACE;
import static java.util.Collections.emptyList;
import org.apache.dolphinscheduler.common.Constants;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* NetUtils
*/
public class NetUtils {
private static final Pattern STS_PATTERN = Pattern.compile("-\\d+$"); // StatefulSet pattern
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static final String NETWORK_PRIORITY_DEFAULT = "default";
private static final String NETWORK_PRIORITY_INNER = "inner";
private static final String NETWORK_PRIORITY_OUTER = "outer";
private static final Logger logger = LoggerFactory.getLogger(NetUtils.class);
private static InetAddress LOCAL_ADDRESS = null;
private static volatile String HOST_ADDRESS;
private NetUtils() {
throw new UnsupportedOperationException("Construct NetUtils");
}
/**
* get addr like host:port
* @return addr
*/
public static String getAddr(String host, int port) {
return String.format("%s:%d", host, port);
}
/**
* get addr like host:port
* @return addr
*/
public static String getAddr(int port) {
return getAddr(getHost(), port);
}
/**
* get host
* @return host
*/
public static String getHost(InetAddress inetAddress) {
if (inetAddress != null) {
if (Constants.KUBERNETES_MODE) {
String canonicalHost = inetAddress.getCanonicalHostName();
if (!canonicalHost.contains(".") || IP_PATTERN.matcher(canonicalHost).matches()) {
String host = inetAddress.getHostName();
if (STS_PATTERN.matcher(host).find()) {
return String.format("%s.%s", host, host.replaceFirst("\\d+$", "headless"));
}
}
return canonicalHost;
}
return inetAddress.getHostAddress();
}
return null;
}
public static String getHost() {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
}
InetAddress address = getLocalAddress();
if (address != null) {
HOST_ADDRESS = getHost(address);
return HOST_ADDRESS;
}
return Constants.KUBERNETES_MODE ? "localhost" : "127.0.0.1";
}
private static InetAddress getLocalAddress() {
if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS;
}
return getLocalAddress0();
}
/**
* Find first valid IP from local network card
*
* @return first valid local IP
*/
private static synchronized InetAddress getLocalAddress0() {
if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = null;
try {
NetworkInterface networkInterface = findNetworkInterface();
if (networkInterface != null) {
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if (addressOp.get().isReachable(100)) {
LOCAL_ADDRESS = addressOp.get();
return LOCAL_ADDRESS;
}
} catch (IOException e) {
logger.warn("test address id reachable io exception", e);
}
}
}
}
localAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
logger.warn("InetAddress get LocalHost exception", e);
}
Optional<InetAddress> addressOp = toValidAddress(localAddress);
if (addressOp.isPresent()) {
LOCAL_ADDRESS = addressOp.get();
}
return LOCAL_ADDRESS;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
if (isPreferIPV6Address()) {
return Optional.ofNullable(normalizeV6Address(v6Address));
}
}
if (isValidV4Address(address)) {
return Optional.of(address);
}
return Optional.empty();
}
private static InetAddress normalizeV6Address(Inet6Address address) {
String addr = address.getHostAddress();
int i = addr.lastIndexOf('%');
if (i > 0) {
try {
return InetAddress.getByName(addr.substring(0, i) + '%' + address.getScopeId());
} catch (UnknownHostException e) {
logger.debug("Unknown IPV6 address: ", e);
}
}
return address;
}
public static boolean isValidV4Address(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
}
String name = address.getHostAddress();
return (name != null
&& IP_PATTERN.matcher(name).matches()
&& !address.isAnyLocalAddress()
&& !address.isLoopbackAddress());
}
/**
* Check if an ipv6 address
*
* @return true if it is reachable
*/
private static boolean isPreferIPV6Address() {
return Boolean.getBoolean("java.net.preferIPv6Addresses");
}
/**
* Get the suitable {@link NetworkInterface}
*
* @return If no {@link NetworkInterface} is available , return <code>null</code>
*/
private static NetworkInterface findNetworkInterface() {
List<NetworkInterface> validNetworkInterfaces = emptyList();
try {
validNetworkInterfaces = getValidNetworkInterfaces();
} catch (SocketException e) {
logger.warn("ValidNetworkInterfaces exception", e);
}
NetworkInterface result = null;
// Try to specify config NetWork Interface
for (NetworkInterface networkInterface : validNetworkInterfaces) {
if (isSpecifyNetworkInterface(networkInterface)) {
result = networkInterface;
break;
}
}
if (null != result) {
return result;
}
return findAddress(validNetworkInterfaces);
}
/**
* Get the valid {@link NetworkInterface network interfaces}
*
* @throws SocketException SocketException if an I/O error occurs.
*/
private static List<NetworkInterface> getValidNetworkInterfaces() throws SocketException {
List<NetworkInterface> validNetworkInterfaces = new LinkedList<>();
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement();
if (ignoreNetworkInterface(networkInterface)) { // ignore
continue;
}
validNetworkInterfaces.add(networkInterface);
}
return validNetworkInterfaces;
}
/**
* @param networkInterface {@link NetworkInterface}
* @return if the specified {@link NetworkInterface} should be ignored, return <code>true</code>
* @throws SocketException SocketException if an I/O error occurs.
*/
public static boolean ignoreNetworkInterface(NetworkInterface networkInterface) throws SocketException {
return networkInterface == null
|| networkInterface.isLoopback()
|| networkInterface.isVirtual()
|| !networkInterface.isUp();
}
private static boolean isSpecifyNetworkInterface(NetworkInterface networkInterface) {
String preferredNetworkInterface = System.getProperty(DOLPHIN_SCHEDULER_PREFERRED_NETWORK_INTERFACE);
return Objects.equals(networkInterface.getDisplayName(), preferredNetworkInterface);
}
private static NetworkInterface findAddress(List<NetworkInterface> validNetworkInterfaces) {
if (validNetworkInterfaces.isEmpty()) {
return null;
}
String networkPriority = PropertyUtils.getString(Constants.NETWORK_PRIORITY_STRATEGY, NETWORK_PRIORITY_DEFAULT);
if (NETWORK_PRIORITY_DEFAULT.equalsIgnoreCase(networkPriority)) {
return findAddressByDefaultPolicy(validNetworkInterfaces);
} else if (NETWORK_PRIORITY_INNER.equalsIgnoreCase(networkPriority)) {
return findInnerAddress(validNetworkInterfaces);
} else if (NETWORK_PRIORITY_OUTER.equalsIgnoreCase(networkPriority)) {
return findOuterAddress(validNetworkInterfaces);
} else {
logger.error("There is no matching network card acquisition policy!");
return null;
}
}
private static NetworkInterface findAddressByDefaultPolicy(List<NetworkInterface> validNetworkInterfaces) {
NetworkInterface networkInterface;
networkInterface = findInnerAddress(validNetworkInterfaces);
if (networkInterface == null) {
networkInterface = findOuterAddress(validNetworkInterfaces);
if (networkInterface == null) {
networkInterface = validNetworkInterfaces.get(0);
}
}
return networkInterface;
}
/**
* Get the Intranet IP
*
* @return If no {@link NetworkInterface} is available , return <code>null</code>
*/
private static NetworkInterface findInnerAddress(List<NetworkInterface> validNetworkInterfaces) {
NetworkInterface networkInterface = null;
for (NetworkInterface ni : validNetworkInterfaces) {
Enumeration<InetAddress> address = ni.getInetAddresses();
while (address.hasMoreElements()) {
InetAddress ip = address.nextElement();
if (ip.isSiteLocalAddress()
&& !ip.isLoopbackAddress()) {
networkInterface = ni;
}
}
}
return networkInterface;
}
private static NetworkInterface findOuterAddress(List<NetworkInterface> validNetworkInterfaces) {
NetworkInterface networkInterface = null;
for (NetworkInterface ni : validNetworkInterfaces) {
Enumeration<InetAddress> address = ni.getInetAddresses();
while (address.hasMoreElements()) {
InetAddress ip = address.nextElement();
if (!ip.isSiteLocalAddress()
&& !ip.isLoopbackAddress()) {
networkInterface = ni;
}
}
}
return networkInterface;
}
}

52
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -29,8 +29,6 @@ import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.math.RoundingMode;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
@ -415,56 +413,6 @@ public class OSUtils {
return Integer.parseInt(runtimeMXBean.getName().split("@")[0]);
}
/**
* get local addr
* @return addr like host:port
*/
public static String getAddr(int port) {
return getAddr(getHost(), port);
}
/**
* get addr
* @return addr like host:port
*/
public static String getAddr(String host, int port) {
return String.format("%s:%d", host, port);
}
/**
* get local host
* @return host
*/
public static String getHost(){
try {
return getHost(InetAddress.getLocalHost());
} catch (UnknownHostException e) {
logger.error(e.getMessage(),e);
}
return null;
}
/**
* get local host
* @return host
*/
public static String getHost(InetAddress inetAddress){
if (inetAddress != null) {
if (Constants.KUBERNETES_MODE) {
String canonicalHost = inetAddress.getCanonicalHostName();
if (!canonicalHost.contains(".") || IP_PATTERN.matcher(canonicalHost).matches()) {
String host = inetAddress.getHostName();
if (STS_PATTERN.matcher(host).find()) {
return String.format("%s.%s", host, host.replaceFirst("\\d+$", "headless"));
}
}
return canonicalHost;
}
return inetAddress.getHostAddress();
}
return null;
}
/**
* whether is macOS
* @return true if mac

16
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java

@ -18,14 +18,15 @@ package org.apache.dolphinscheduler.common.os;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.hardware.GlobalMemory;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.hardware.GlobalMemory;
/**
* OSUtilsTest
@ -34,13 +35,6 @@ public class OSUtilsTest {
private static Logger logger = LoggerFactory.getLogger(OSUtilsTest.class);
@Test
public void getHost(){
logger.info(OSUtils.getHost());
}
@Test
public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239

89
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.CommonTest;
import org.apache.dolphinscheduler.common.Constants;
import java.net.InetAddress;
import org.junit.Test;
/**
* NetUtilsTest
*/
public class NetUtilsTest {
@Test
public void testGetAddr() {
assertEquals(NetUtils.getHost() + ":5678", NetUtils.getAddr(5678));
assertEquals("127.0.0.1:5678", NetUtils.getAddr("127.0.0.1", 5678));
assertEquals("localhost:1234", NetUtils.getAddr("localhost", 1234));
}
@Test
public void testGetHost() throws Exception {
InetAddress address = mock(InetAddress.class);
when(address.getCanonicalHostName()).thenReturn("dolphinscheduler-worker-0.dolphinscheduler-worker-headless.default.svc.cluster.local");
when(address.getHostName()).thenReturn("dolphinscheduler-worker-0");
when(address.getHostAddress()).thenReturn("172.17.0.15");
assertEquals("172.17.0.15", NetUtils.getHost(address));
CommonTest.setFinalStatic(Constants.class.getDeclaredField("KUBERNETES_MODE"), true);
assertEquals("dolphinscheduler-worker-0.dolphinscheduler-worker-headless.default.svc.cluster.local", NetUtils.getHost(address));
address = mock(InetAddress.class);
when(address.getCanonicalHostName()).thenReturn("dolphinscheduler-worker-0");
when(address.getHostName()).thenReturn("dolphinscheduler-worker-0");
CommonTest.setFinalStatic(Constants.class.getDeclaredField("KUBERNETES_MODE"), true);
assertEquals("dolphinscheduler-worker-0.dolphinscheduler-worker-headless", NetUtils.getHost(address));
}
@Test
public void testGetLocalHost() {
assertNotNull(NetUtils.getHost());
}
@Test
public void testIsValidAddress() {
assertFalse(NetUtils.isValidV4Address(null));
InetAddress address = mock(InetAddress.class);
when(address.isLoopbackAddress()).thenReturn(true);
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("localhost");
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("0.0.0.0");
when(address.isAnyLocalAddress()).thenReturn(true);
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("127.0.0.1");
when(address.isLoopbackAddress()).thenReturn(true);
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("1.2.3.4");
assertTrue(NetUtils.isValidV4Address(address));
}
}

29
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java

@ -16,17 +16,12 @@
*/
package org.apache.dolphinscheduler.common.utils;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.CommonTest;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import org.junit.Assert;
@ -95,30 +90,6 @@ public class OSUtilsTest {
Assert.assertNotEquals(0, processId);
}
@Test
public void getAddr(){
Assert.assertEquals(OSUtils.getHost() + ":5678", OSUtils.getAddr(5678));
Assert.assertEquals("127.0.0.1:5678", OSUtils.getAddr("127.0.0.1", 5678));
Assert.assertEquals("localhost:1234", OSUtils.getAddr("localhost", 1234));
}
@Test
public void getHost() throws Exception {
String host = OSUtils.getHost();
Assert.assertNotNull(host);
Assert.assertNotEquals("", host);
InetAddress address = mock(InetAddress.class);
when(address.getCanonicalHostName()).thenReturn("dolphinscheduler-worker-0.dolphinscheduler-worker-headless.default.svc.cluster.local");
when(address.getHostName()).thenReturn("dolphinscheduler-worker-0");
when(address.getHostAddress()).thenReturn("172.17.0.15");
Assert.assertEquals("172.17.0.15", OSUtils.getHost(address));
CommonTest.setFinalStatic(Constants.class.getDeclaredField("KUBERNETES_MODE"), true);
Assert.assertEquals("dolphinscheduler-worker-0.dolphinscheduler-worker-headless.default.svc.cluster.local", OSUtils.getHost(address));
address = mock(InetAddress.class);
when(address.getCanonicalHostName()).thenReturn("dolphinscheduler-worker-0");
when(address.getHostName()).thenReturn("dolphinscheduler-worker-0");
CommonTest.setFinalStatic(Constants.class.getDeclaredField("KUBERNETES_MODE"), true);
Assert.assertEquals("dolphinscheduler-worker-0.dolphinscheduler-worker-headless", OSUtils.getHost(address));
}
@Test
public void checkResource(){
boolean resource = OSUtils.checkResource(100,0);
Assert.assertTrue(resource);

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java

@ -16,12 +16,12 @@
*/
package org.apache.dolphinscheduler.remote.utils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.net.InetSocketAddress;
import io.netty.channel.Channel;
/**
* channel utils
*/
@ -34,7 +34,7 @@ public class ChannelUtils {
* @return local address
*/
public static String getLocalAddress(Channel channel){
return OSUtils.getHost(((InetSocketAddress)channel.localAddress()).getAddress());
return NetUtils.getHost(((InetSocketAddress)channel.localAddress()).getAddress());
}
/**
@ -43,7 +43,7 @@ public class ChannelUtils {
* @return remote address
*/
public static String getRemoteAddress(Channel channel){
return OSUtils.getHost(((InetSocketAddress)channel.remoteAddress()).getAddress());
return NetUtils.getHost(((InetSocketAddress)channel.remoteAddress()).getAddress());
}
/**
@ -53,7 +53,7 @@ public class ChannelUtils {
*/
public static Host toAddress(Channel channel){
InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress());
return new Host(OSUtils.getHost(socketAddress.getAddress()), socketAddress.getPort());
return new Host(NetUtils.getHost(socketAddress.getAddress()), socketAddress.getPort());
}
}

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@ -81,7 +81,7 @@ public class MasterRegistry {
* registry
*/
public void registry() {
String address = OSUtils.getHost();
String address = NetUtils.getHost();
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
@ -133,7 +133,7 @@ public class MasterRegistry {
* @return
*/
private String getLocalAddress() {
return OSUtils.getAddr(masterConfig.getListenPort());
return NetUtils.getAddr(masterConfig.getListenPort());
}
/**

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -25,9 +25,8 @@ import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
@ -35,6 +34,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
/**
@ -122,7 +123,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
private void initTaskParameters() {
this.taskInstance.setLogPath(getTaskLogPath(taskInstance));
this.taskInstance.setHost(OSUtils.getAddr(masterConfig.getListenPort()));
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -25,14 +27,17 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import org.slf4j.LoggerFactory;
public class DependentTaskExecThread extends MasterBaseTaskExecThread {
@ -174,7 +179,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
private void initTaskParameters() {
taskInstance.setLogPath(getTaskLogPath(taskInstance));
taskInstance.setHost(OSUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -171,6 +172,6 @@ public class MasterSchedulerService extends Thread {
}
private String getLocalAddress() {
return OSUtils.getAddr(masterConfig.getListenPort());
return NetUtils.getAddr(masterConfig.getListenPort());
}
}

20
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -17,10 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -28,7 +24,7 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@ -45,13 +41,19 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import io.netty.channel.Channel;
/**
* worker request processor
*/
@ -126,7 +128,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
taskExecutionContext.setHost(OSUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setLogPath(getTaskLogPath(taskExecutionContext));

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -22,7 +22,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@ -98,7 +98,7 @@ public class WorkerRegistry {
* registry
*/
public void registry() {
String address = OSUtils.getHost();
String address = NetUtils.getHost();
Set<String> workerZkPaths = getWorkerZkPaths();
int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
@ -172,7 +172,7 @@ public class WorkerRegistry {
* @return local address
*/
private String getLocalAddress() {
return OSUtils.getAddr(workerConfig.getListenPort());
return NetUtils.getAddr(workerConfig.getListenPort());
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@ -90,7 +90,7 @@ public class ZKMasterClient extends AbstractZKClient {
// init system znode
this.initSystemZNode();
while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)) {
while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)) {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// startup tolerant

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -78,7 +79,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getAddr(serverConfig.getListenPort())));
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute);
nettyRemotingServer.close();
@ -97,7 +98,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getAddr(OSUtils.getHost(), 4444)));
executionContext.setHost(Host.of(NetUtils.getAddr(4444)));
nettyExecutorManager.execute(executionContext);
}

5
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -73,6 +74,6 @@ public class RoundRobinHostManagerTest {
ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
Host host = roundRobinHostManager.select(context);
Assert.assertTrue(StringUtils.isNotEmpty(host.getAddress()));
Assert.assertTrue(host.getAddress().equalsIgnoreCase(OSUtils.getAddr(workerConfig.getListenPort())));
Assert.assertTrue(host.getAddress().equalsIgnoreCase(NetUtils.getAddr(workerConfig.getListenPort())));
}
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@ -60,7 +60,7 @@ public class MasterRegistryTest {
masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String masterNodePath = masterPath + "/" + OSUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort());
String masterNodePath = masterPath + "/" + NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort());
String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
masterRegistry.unRegistry();

13
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java

@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -26,6 +26,10 @@ import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -33,9 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.Map;
import java.util.Set;
/**
* zookeeper node manager test
*/
@ -74,7 +75,7 @@ public class ZookeeperNodeManagerTest {
Set<String> masterNodes = zookeeperNodeManager.getMasterNodes();
Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes));
Assert.assertEquals(1, masterNodes.size());
Assert.assertEquals(OSUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next());
Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next());
}
@Test
@ -102,6 +103,6 @@ public class ZookeeperNodeManagerTest {
Set<String> workerNodes = zookeeperNodeManager.getWorkerGroupNodes("default");
Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes));
Assert.assertEquals(1, workerNodes.size());
Assert.assertEquals(OSUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next());
Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next());
}
}

5
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java

@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.mockito.Mockito;
/**
@ -47,7 +48,7 @@ public class ExecutionContextTestUtils {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getAddr(OSUtils.getHost(), port)));
executionContext.setHost(Host.of(NetUtils.getAddr(port)));
return executionContext;
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -121,7 +121,7 @@ public class WorkerRegistryTest {
int i = 0;
for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (OSUtils.getAddr(workerConfig.getListenPort()));
String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getAddr(workerConfig.getListenPort()));
String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath);
if (0 == i) {
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));

1
pom.xml

@ -769,6 +769,7 @@
<include>**/common/utils/JSONUtilsTest.java</include>
<include>**/common/utils/LoggerUtilsTest.java</include>
<include>**/common/utils/OSUtilsTest.java</include>
<include>**/common/utils/NetUtilsTest.java</include>
<include>**/common/utils/ParameterUtilsTest.java</include>
<include>**/common/utils/TimePlaceholderUtilsTest.java</include>
<include>**/common/utils/PreconditionsTest.java</include>

Loading…
Cancel
Save