Browse Source

In the case of multiple network cards, a valid host is returned (#2924)

* In the case of multiple network cards, a valid host is returned

* In the case of multiple network cards, a valid host is returned

* In the case of multiple network cards, a valid host is returned

* In the case of multiple network cards, a valid host is returned

* fix LOCAL_ADDRESS not assigned  value

* fix LOCAL_ADDRESS not assigned  value

* fix LOCAL_ADDRESS not assigned  value

* fix lock error

* fix lock error

* end

* end

* end

* end

* code smell

* code smell

* code smell

* code smell

* code smell

* code smell

* code smell

* lowering

* messy

* small change

* compile error

Co-authored-by: dailidong <dailidong66@gmail.com>
Co-authored-by: gabry.wu <gabrywu@apache.org>
pull/3/MERGE
CalvinKirs 5 years ago committed by GitHub
parent
commit
d1282fdd96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 209
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
  2. 15
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  3. 6
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java
  4. 57
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
  5. 2
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
  6. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  7. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  8. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  9. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  10. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  11. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  12. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  13. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  14. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
  15. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java
  16. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
  17. 5
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

209
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.*;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import static java.util.Collections.emptyList;
/**
* NetUtils
*/
public class NetUtils {
private NetUtils() {
throw new IllegalStateException("Utility class");
}
private static Logger logger = LoggerFactory.getLogger(NetUtils.class);
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static String ANY_HOST_VALUE = "0.0.0.0";
private static String LOCAL_HOST_VALUE = "127.0.0.1";
private static InetAddress LOCAL_ADDRESS = null;
private static volatile String HOST_ADDRESS;
public static String getHost() {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
}
InetAddress address = getLocalAddress();
if (address != null) {
HOST_ADDRESS = address.getHostAddress();
return HOST_ADDRESS;
}
return LOCAL_HOST_VALUE;
}
private static InetAddress getLocalAddress() {
if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS;
}
return getLocalAddress0();
}
/**
* Find first valid IP from local network card
*
* @return first valid local IP
*/
private static synchronized InetAddress getLocalAddress0() {
if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = null;
NetworkInterface networkInterface = findNetworkInterface();
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if (addressOp.get().isReachable(100)) {
LOCAL_ADDRESS = addressOp.get();
return LOCAL_ADDRESS;
}
} catch (IOException e) {
logger.warn("test address id reachable io exception", e);
}
}
}
try {
localAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
logger.warn("InetAddress get LocalHost exception", e);
}
Optional<InetAddress> addressOp = toValidAddress(localAddress);
if (addressOp.isPresent()) {
LOCAL_ADDRESS = addressOp.get();
}
return LOCAL_ADDRESS;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
if (isPreferIPV6Address()) {
return Optional.ofNullable(normalizeV6Address(v6Address));
}
}
if (isValidV4Address(address)) {
return Optional.of(address);
}
return Optional.empty();
}
private static InetAddress normalizeV6Address(Inet6Address address) {
String addr = address.getHostAddress();
int i = addr.lastIndexOf('%');
if (i > 0) {
try {
return InetAddress.getByName(addr.substring(0, i) + '%' + address.getScopeId());
} catch (UnknownHostException e) {
logger.debug("Unknown IPV6 address: ", e);
}
}
return address;
}
public static boolean isValidV4Address(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
}
String name = address.getHostAddress();
return (name != null
&& IP_PATTERN.matcher(name).matches()
&& !ANY_HOST_VALUE.equals(name)
&& !LOCAL_HOST_VALUE.equals(name));
}
/**
* Check if an ipv6 address
*
* @return true if it is reachable
*/
private static boolean isPreferIPV6Address() {
return Boolean.getBoolean("java.net.preferIPv6Addresses");
}
/**
* Get the suitable {@link NetworkInterface}
*
* @return If no {@link NetworkInterface} is available , return <code>null</code>
*/
private static NetworkInterface findNetworkInterface() {
List<NetworkInterface> validNetworkInterfaces = emptyList();
try {
validNetworkInterfaces = getValidNetworkInterfaces();
} catch (SocketException e) {
logger.warn("ValidNetworkInterfaces exception", e);
}
return validNetworkInterfaces.get(0);
}
/**
* Get the valid {@link NetworkInterface network interfaces}
*
* @throws SocketException SocketException if an I/O error occurs.
*/
private static List<NetworkInterface> getValidNetworkInterfaces() throws SocketException {
List<NetworkInterface> validNetworkInterfaces = new LinkedList<>();
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement();
if (ignoreNetworkInterface(networkInterface)) { // ignore
continue;
}
validNetworkInterfaces.add(networkInterface);
}
return validNetworkInterfaces;
}
/**
* @param networkInterface {@link NetworkInterface}
* @return if the specified {@link NetworkInterface} should be ignored, return <code>true</code>
* @throws SocketException SocketException if an I/O error occurs.
*/
public static boolean ignoreNetworkInterface(NetworkInterface networkInterface) throws SocketException {
return networkInterface == null
|| networkInterface.isLoopback()
|| networkInterface.isVirtual()
|| !networkInterface.isUp();
}
}

15
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -23,8 +23,6 @@ import java.io.InputStreamReader;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
import java.math.RoundingMode; import java.math.RoundingMode;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -409,19 +407,6 @@ public class OSUtils {
return Integer.parseInt(runtimeMXBean.getName().split("@")[0]); return Integer.parseInt(runtimeMXBean.getName().split("@")[0]);
} }
/**
* get local host
* @return host
*/
public static String getHost(){
try {
return InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.error(e.getMessage(),e);
}
return null;
}
/** /**
* whether is macOS * whether is macOS
* @return true if mac * @return true if mac

6
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java

@ -35,12 +35,6 @@ public class OSUtilsTest {
private static Logger logger = LoggerFactory.getLogger(OSUtilsTest.class); private static Logger logger = LoggerFactory.getLogger(OSUtilsTest.class);
@Test
public void getHost(){
logger.info(OSUtils.getHost());
}
@Test @Test
public void memoryUsage() { public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239 logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239

57
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.junit.Test;
import java.net.InetAddress;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* NetUtilsTest
*/
public class NetUtilsTest {
@Test
public void testGetLocalHost() {
assertNotNull(NetUtils.getHost());
}
@Test
public void testIsValidAddress() {
assertFalse(NetUtils.isValidV4Address(null));
InetAddress address = mock(InetAddress.class);
when(address.isLoopbackAddress()).thenReturn(true);
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("localhost");
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("0.0.0.0");
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("127.0.0.1");
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("1.2.3.4");
assertTrue(NetUtils.isValidV4Address(address));
}
}

2
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java

@ -93,7 +93,7 @@ public class OSUtilsTest {
} }
@Test @Test
public void getHost(){ public void getHost(){
String host = OSUtils.getHost(); String host = NetUtils.getHost();
Assert.assertNotNull(host); Assert.assertNotNull(host);
Assert.assertNotEquals("", host); Assert.assertNotEquals("", host);
} }

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -21,7 +21,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@ -80,7 +80,7 @@ public class MasterRegistry {
* registry * registry
*/ */
public void registry() { public void registry() {
String address = OSUtils.getHost(); String address = NetUtils.getHost();
String localNodePath = getMasterPath(); String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@ -132,7 +132,9 @@ public class MasterRegistry {
* @return * @return
*/ */
private String getLocalAddress(){ private String getLocalAddress(){
return OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort();
return NetUtils.getHost() + ":" + masterConfig.getListenPort();
} }
} }

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -123,7 +123,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
private void initTaskParameters() { private void initTaskParameters() {
this.taskInstance.setLogPath(getTaskLogPath(taskInstance)); this.taskInstance.setLogPath(getTaskLogPath(taskInstance));
this.taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION); taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance); this.processService.saveTaskInstance(taskInstance);

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -172,7 +172,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
private void initTaskParameters() { private void initTaskParameters() {
taskInstance.setLogPath(getTaskLogPath(taskInstance)); taskInstance.setLogPath(getTaskLogPath(taskInstance));
taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION); taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance); processService.updateTaskInstance(taskInstance);

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -21,6 +21,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -158,6 +159,6 @@ public class MasterSchedulerService extends Thread {
} }
private String getLocalAddress(){ private String getLocalAddress(){
return OSUtils.getHost() + ":" + masterConfig.getListenPort(); return NetUtils.getHost() + ":" + masterConfig.getListenPort();
} }
} }

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -18,21 +18,15 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import java.util.Date; import ch.qos.logback.classic.LoggerContext;
import java.util.Optional; import ch.qos.logback.classic.sift.SiftingAppender;
import java.util.concurrent.ExecutionException; import com.github.rholder.retry.RetryException;
import java.util.concurrent.ExecutorService; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -47,11 +41,10 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException; import java.util.Date;
import java.util.Optional;
import ch.qos.logback.classic.LoggerContext; import java.util.concurrent.ExecutionException;
import ch.qos.logback.classic.sift.SiftingAppender; import java.util.concurrent.ExecutorService;
import io.netty.channel.Channel;
/** /**
* worker request processor * worker request processor
@ -98,12 +91,13 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String contextJson = taskRequestCommand.getTaskExecutionContext(); String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
if(taskExecutionContext == null){ if(taskExecutionContext == null){
logger.error("task execution context is null"); logger.error("task execution context is null");
return; return;
} }
taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort());
// custom logger // custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@ -197,4 +191,4 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
} }
} }

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -21,7 +21,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@ -87,7 +87,7 @@ public class WorkerRegistry {
* registry * registry
*/ */
public void registry() { public void registry() {
String address = OSUtils.getHost(); String address = NetUtils.getHost();
String localNodePath = getWorkerPath(); String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@ -149,7 +149,7 @@ public class WorkerRegistry {
* @return * @return
*/ */
private String getLocalAddress(){ private String getLocalAddress(){
return OSUtils.getHost() + Constants.COLON + workerConfig.getListenPort(); return NetUtils.getHost() + ":" + workerConfig.getListenPort();
} }
} }

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/** /**
@ -75,7 +75,7 @@ public class ZKMasterClient extends AbstractZKClient {
// init system znode // init system znode
this.initSystemZNode(); this.initSystemZNode();
while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)){ while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
ThreadUtils.sleep(SLEEP_TIME_MILLIS); ThreadUtils.sleep(SLEEP_TIME_MILLIS);
} }
@ -155,7 +155,7 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception * @throws Exception exception
*/ */
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(OSUtils.getHost())){ if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(NetUtils.getHost())){
return ; return ;
} }
switch (zkNodeType){ switch (zkNodeType){

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.executor; package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -78,7 +78,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition) .buildProcessDefinitionRelatedInfo(processDefinition)
.create(); .create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":" + serverConfig.getListenPort())); executionContext.setHost(Host.of(NetUtils.getHost() + ":" + serverConfig.getListenPort()));
Boolean execute = nettyExecutorManager.execute(executionContext); Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute); Assert.assertTrue(execute);
nettyRemotingServer.close(); nettyRemotingServer.close();
@ -97,7 +97,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition) .buildProcessDefinitionRelatedInfo(processDefinition)
.create(); .create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":4444")); executionContext.setHost(Host.of(NetUtils.getHost() + ":4444"));
nettyExecutorManager.execute(executionContext); nettyExecutorManager.execute(executionContext);
} }

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host; package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -73,6 +73,6 @@ public class RoundRobinHostManagerTest {
ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000); ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
Host host = roundRobinHostManager.select(context); Host host = roundRobinHostManager.select(context);
Assert.assertTrue(StringUtils.isNotEmpty(host.getAddress())); Assert.assertTrue(StringUtils.isNotEmpty(host.getAddress()));
Assert.assertTrue(host.getAddress().equalsIgnoreCase(OSUtils.getHost() + ":" + workerConfig.getListenPort())); Assert.assertTrue(host.getAddress().equalsIgnoreCase(NetUtils.getHost() + ":" + workerConfig.getListenPort()));
} }
} }

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java

@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -74,7 +74,7 @@ public class ZookeeperNodeManagerTest {
Set<String> masterNodes = zookeeperNodeManager.getMasterNodes(); Set<String> masterNodes = zookeeperNodeManager.getMasterNodes();
Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes)); Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes));
Assert.assertEquals(1, masterNodes.size()); Assert.assertEquals(1, masterNodes.size());
Assert.assertEquals(OSUtils.getHost() + ":" + masterConfig.getListenPort(), masterNodes.iterator().next()); Assert.assertEquals(NetUtils.getHost() + ":" + masterConfig.getListenPort(), masterNodes.iterator().next());
} }
@Test @Test
@ -102,6 +102,6 @@ public class ZookeeperNodeManagerTest {
Set<String> workerNodes = zookeeperNodeManager.getWorkerGroupNodes("default"); Set<String> workerNodes = zookeeperNodeManager.getWorkerGroupNodes("default");
Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes)); Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes));
Assert.assertEquals(1, workerNodes.size()); Assert.assertEquals(1, workerNodes.size());
Assert.assertEquals(OSUtils.getHost() + ":" + workerConfig.getListenPort(), workerNodes.iterator().next()); Assert.assertEquals(NetUtils.getHost() + ":" + workerConfig.getListenPort(), workerNodes.iterator().next());
} }
} }

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java

@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -47,7 +47,7 @@ public class ExecutionContextTestUtils {
.buildProcessDefinitionRelatedInfo(processDefinition) .buildProcessDefinitionRelatedInfo(processDefinition)
.create(); .create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":" + port)); executionContext.setHost(Host.of(NetUtils.getHost() + ":" + port));
return executionContext; return executionContext;
} }

5
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

@ -17,8 +17,7 @@
package org.apache.dolphinscheduler.server.worker.registry; package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
@ -60,7 +59,7 @@ public class WorkerRegistryTest {
workerRegistry.registry(); workerRegistry.registry();
String workerPath = zookeeperRegistryCenter.getWorkerPath(); String workerPath = zookeeperRegistryCenter.getWorkerPath();
Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim()); Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim());
String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (OSUtils.getHost() + ":" + workerConfig.getListenPort()); String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (NetUtils.getHost() + ":" + workerConfig.getListenPort());
TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath); String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);

Loading…
Cancel
Save