Browse Source

Merge branch 'dev' into dev

pull/3/MERGE
dailidong 4 years ago committed by GitHub
parent
commit
5fec36c129
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  2. 209
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
  3. 15
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  4. 6
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java
  5. 57
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
  6. 2
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
  7. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  8. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  9. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  10. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  11. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  12. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  13. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  14. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  15. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
  16. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java
  17. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java
  18. 5
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
  19. 2
      dolphinscheduler-ui/build/config.js
  20. 0
      dolphinscheduler-ui/favicon.png
  21. 3
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  22. 23
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  23. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/dragZoom.js
  24. 20
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
  25. 67
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/multiDrag.js
  26. 16
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/util.js
  27. 4
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue

42
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -16,33 +16,22 @@
*/
package org.apache.dolphinscheduler.common.utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.type.CollectionType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import static com.fasterxml.jackson.databind.DeserializationFeature.*;
/**
@ -55,17 +44,16 @@ public class JSONUtils {
/**
* can use static singleton, inject: just make sure to reuse!
*/
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final ObjectMapper objectMapper = new ObjectMapper()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.setTimeZone(TimeZone.getDefault())
;
private JSONUtils() {
}
static {
//Feature that determines whether encountering of unknown properties, false means not analyzer unknown properties
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).setTimeZone(TimeZone.getDefault());
objectMapper.configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true).setTimeZone(TimeZone.getDefault());
}
public static ArrayNode createArrayNode() {
return objectMapper.createArrayNode();

209
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.*;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import static java.util.Collections.emptyList;
/**
* NetUtils
*/
public class NetUtils {
private NetUtils() {
throw new IllegalStateException("Utility class");
}
private static Logger logger = LoggerFactory.getLogger(NetUtils.class);
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static String ANY_HOST_VALUE = "0.0.0.0";
private static String LOCAL_HOST_VALUE = "127.0.0.1";
private static InetAddress LOCAL_ADDRESS = null;
private static volatile String HOST_ADDRESS;
public static String getHost() {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
}
InetAddress address = getLocalAddress();
if (address != null) {
HOST_ADDRESS = address.getHostAddress();
return HOST_ADDRESS;
}
return LOCAL_HOST_VALUE;
}
private static InetAddress getLocalAddress() {
if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS;
}
return getLocalAddress0();
}
/**
* Find first valid IP from local network card
*
* @return first valid local IP
*/
private static synchronized InetAddress getLocalAddress0() {
if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = null;
NetworkInterface networkInterface = findNetworkInterface();
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if (addressOp.get().isReachable(100)) {
LOCAL_ADDRESS = addressOp.get();
return LOCAL_ADDRESS;
}
} catch (IOException e) {
logger.warn("test address id reachable io exception", e);
}
}
}
try {
localAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
logger.warn("InetAddress get LocalHost exception", e);
}
Optional<InetAddress> addressOp = toValidAddress(localAddress);
if (addressOp.isPresent()) {
LOCAL_ADDRESS = addressOp.get();
}
return LOCAL_ADDRESS;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
if (isPreferIPV6Address()) {
return Optional.ofNullable(normalizeV6Address(v6Address));
}
}
if (isValidV4Address(address)) {
return Optional.of(address);
}
return Optional.empty();
}
private static InetAddress normalizeV6Address(Inet6Address address) {
String addr = address.getHostAddress();
int i = addr.lastIndexOf('%');
if (i > 0) {
try {
return InetAddress.getByName(addr.substring(0, i) + '%' + address.getScopeId());
} catch (UnknownHostException e) {
logger.debug("Unknown IPV6 address: ", e);
}
}
return address;
}
public static boolean isValidV4Address(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
}
String name = address.getHostAddress();
return (name != null
&& IP_PATTERN.matcher(name).matches()
&& !ANY_HOST_VALUE.equals(name)
&& !LOCAL_HOST_VALUE.equals(name));
}
/**
* Check if an ipv6 address
*
* @return true if it is reachable
*/
private static boolean isPreferIPV6Address() {
return Boolean.getBoolean("java.net.preferIPv6Addresses");
}
/**
* Get the suitable {@link NetworkInterface}
*
* @return If no {@link NetworkInterface} is available , return <code>null</code>
*/
private static NetworkInterface findNetworkInterface() {
List<NetworkInterface> validNetworkInterfaces = emptyList();
try {
validNetworkInterfaces = getValidNetworkInterfaces();
} catch (SocketException e) {
logger.warn("ValidNetworkInterfaces exception", e);
}
return validNetworkInterfaces.get(0);
}
/**
* Get the valid {@link NetworkInterface network interfaces}
*
* @throws SocketException SocketException if an I/O error occurs.
*/
private static List<NetworkInterface> getValidNetworkInterfaces() throws SocketException {
List<NetworkInterface> validNetworkInterfaces = new LinkedList<>();
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement();
if (ignoreNetworkInterface(networkInterface)) { // ignore
continue;
}
validNetworkInterfaces.add(networkInterface);
}
return validNetworkInterfaces;
}
/**
* @param networkInterface {@link NetworkInterface}
* @return if the specified {@link NetworkInterface} should be ignored, return <code>true</code>
* @throws SocketException SocketException if an I/O error occurs.
*/
public static boolean ignoreNetworkInterface(NetworkInterface networkInterface) throws SocketException {
return networkInterface == null
|| networkInterface.isLoopback()
|| networkInterface.isVirtual()
|| !networkInterface.isUp();
}
}

15
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -23,8 +23,6 @@ import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.math.RoundingMode;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
@ -409,19 +407,6 @@ public class OSUtils {
return Integer.parseInt(runtimeMXBean.getName().split("@")[0]);
}
/**
* get local host
* @return host
*/
public static String getHost(){
try {
return InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.error(e.getMessage(),e);
}
return null;
}
/**
* whether is macOS
* @return true if mac

6
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OSUtilsTest.java

@ -35,12 +35,6 @@ public class OSUtilsTest {
private static Logger logger = LoggerFactory.getLogger(OSUtilsTest.class);
@Test
public void getHost(){
logger.info(OSUtils.getHost());
}
@Test
public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239

57
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.junit.Test;
import java.net.InetAddress;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* NetUtilsTest
*/
public class NetUtilsTest {
@Test
public void testGetLocalHost() {
assertNotNull(NetUtils.getHost());
}
@Test
public void testIsValidAddress() {
assertFalse(NetUtils.isValidV4Address(null));
InetAddress address = mock(InetAddress.class);
when(address.isLoopbackAddress()).thenReturn(true);
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("localhost");
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("0.0.0.0");
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("127.0.0.1");
assertFalse(NetUtils.isValidV4Address(address));
address = mock(InetAddress.class);
when(address.getHostAddress()).thenReturn("1.2.3.4");
assertTrue(NetUtils.isValidV4Address(address));
}
}

2
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java

@ -93,7 +93,7 @@ public class OSUtilsTest {
}
@Test
public void getHost(){
String host = OSUtils.getHost();
String host = NetUtils.getHost();
Assert.assertNotNull(host);
Assert.assertNotEquals("", host);
}

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -21,7 +21,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@ -80,7 +80,7 @@ public class MasterRegistry {
* registry
*/
public void registry() {
String address = OSUtils.getHost();
String address = NetUtils.getHost();
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@ -132,7 +132,9 @@ public class MasterRegistry {
* @return
*/
private String getLocalAddress(){
return OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort();
return NetUtils.getHost() + ":" + masterConfig.getListenPort();
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.slf4j.LoggerFactory;
@ -123,7 +123,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
private void initTaskParameters() {
this.taskInstance.setLogPath(getTaskLogPath(taskInstance));
this.taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.slf4j.LoggerFactory;
@ -172,7 +172,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
private void initTaskParameters() {
taskInstance.setLogPath(getTaskLogPath(taskInstance));
taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -21,6 +21,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -158,6 +159,6 @@ public class MasterSchedulerService extends Thread {
}
private String getLocalAddress(){
return OSUtils.getHost() + ":" + masterConfig.getListenPort();
return NetUtils.getHost() + ":" + masterConfig.getListenPort();
}
}

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -18,21 +18,15 @@
package org.apache.dolphinscheduler.server.worker.processor;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.github.rholder.retry.RetryException;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -47,11 +41,10 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import io.netty.channel.Channel;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
/**
* worker request processor
@ -98,12 +91,13 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
if(taskExecutionContext == null){
logger.error("task execution context is null");
return;
}
taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort());
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@ -197,4 +191,4 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
}
}

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -21,7 +21,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@ -87,7 +87,7 @@ public class WorkerRegistry {
* registry
*/
public void registry() {
String address = OSUtils.getHost();
String address = NetUtils.getHost();
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@ -149,7 +149,7 @@ public class WorkerRegistry {
* @return
*/
private String getLocalAddress(){
return OSUtils.getHost() + Constants.COLON + workerConfig.getListenPort();
return NetUtils.getHost() + ":" + workerConfig.getListenPort();
}
}

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
import static org.apache.dolphinscheduler.common.Constants.*;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
@ -75,7 +75,7 @@ public class ZKMasterClient extends AbstractZKClient {
// init system znode
this.initSystemZNode();
while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)){
while (!checkZKNodeExists(NetUtils.getHost(), ZKNodeType.MASTER)){
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
@ -155,7 +155,7 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception
*/
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(OSUtils.getHost())){
if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(NetUtils.getHost())){
return ;
}
switch (zkNodeType){

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -78,7 +78,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":" + serverConfig.getListenPort()));
executionContext.setHost(Host.of(NetUtils.getHost() + ":" + serverConfig.getListenPort()));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute);
nettyRemotingServer.close();
@ -97,7 +97,7 @@ public class NettyExecutorManagerTest {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":4444"));
executionContext.setHost(Host.of(NetUtils.getHost() + ":4444"));
nettyExecutorManager.execute(executionContext);
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -73,6 +73,6 @@ public class RoundRobinHostManagerTest {
ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
Host host = roundRobinHostManager.select(context);
Assert.assertTrue(StringUtils.isNotEmpty(host.getAddress()));
Assert.assertTrue(host.getAddress().equalsIgnoreCase(OSUtils.getHost() + ":" + workerConfig.getListenPort()));
Assert.assertTrue(host.getAddress().equalsIgnoreCase(NetUtils.getHost() + ":" + workerConfig.getListenPort()));
}
}

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java

@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -74,7 +74,7 @@ public class ZookeeperNodeManagerTest {
Set<String> masterNodes = zookeeperNodeManager.getMasterNodes();
Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes));
Assert.assertEquals(1, masterNodes.size());
Assert.assertEquals(OSUtils.getHost() + ":" + masterConfig.getListenPort(), masterNodes.iterator().next());
Assert.assertEquals(NetUtils.getHost() + ":" + masterConfig.getListenPort(), masterNodes.iterator().next());
}
@Test
@ -102,6 +102,6 @@ public class ZookeeperNodeManagerTest {
Set<String> workerNodes = zookeeperNodeManager.getWorkerGroupNodes("default");
Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes));
Assert.assertEquals(1, workerNodes.size());
Assert.assertEquals(OSUtils.getHost() + ":" + workerConfig.getListenPort(), workerNodes.iterator().next());
Assert.assertEquals(NetUtils.getHost() + ":" + workerConfig.getListenPort(), workerNodes.iterator().next());
}
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ExecutionContextTestUtils.java

@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -47,7 +47,7 @@ public class ExecutionContextTestUtils {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":" + port));
executionContext.setHost(Host.of(NetUtils.getHost() + ":" + port));
return executionContext;
}

5
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

@ -17,8 +17,7 @@
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
@ -60,7 +59,7 @@ public class WorkerRegistryTest {
workerRegistry.registry();
String workerPath = zookeeperRegistryCenter.getWorkerPath();
Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim());
String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (OSUtils.getHost() + ":" + workerConfig.getListenPort());
String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (NetUtils.getHost() + ":" + workerConfig.getListenPort());
TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);

2
dolphinscheduler-ui/build/config.js

@ -109,7 +109,7 @@ const pages = glob.sync(['*/!(_*).html'], { cwd: viewDir }).map(p => {
filename: newPagePath || path.join('view', p),
template: `${path.join('src/view', p)}`,
cache: true,
favicon:'./favicon.ico',
favicon:'./favicon.png',
inject: true,
hash: version,
chunks: chunks,

0
dolphinscheduler-ui/favicon.ico → dolphinscheduler-ui/favicon.png

Before

Width:  |  Height:  |  Size: 4.2 KiB

After

Width:  |  Height:  |  Size: 4.2 KiB

3
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -563,7 +563,8 @@
cacheTaskInfo({item, fromThis}) {
self.cacheTasks(item)
},
close ({ flag, fromThis }) {
close ({ item,flag, fromThis }) {
self.addTasks(item)
// Edit status does not allow deletion of nodes
if (flag) {
jsPlumb.remove(id)

23
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -305,6 +305,7 @@
description: '',
// Node echo data
backfillItem: {},
cacheBackfillItem: {},
// Resource(list)
resourcesList: [],
successNode: 'success',
@ -580,17 +581,34 @@
this.isContentBox = false
// flag Whether to delete a node this.$destroy()
this.$emit('close', {
item: {
type: this.cacheBackfillItem.type,
id: this.cacheBackfillItem.id,
name: this.cacheBackfillItem.name,
params: this.cacheBackfillItem.params,
description: this.cacheBackfillItem.description,
runFlag: this.cacheBackfillItem.runFlag,
conditionResult: this.cacheBackfillItem.conditionResult,
dependence: this.cacheBackfillItem.dependence,
maxRetryTimes: this.cacheBackfillItem.maxRetryTimes,
retryInterval: this.cacheBackfillItem.retryInterval,
timeout: this.cacheBackfillItem.timeout,
taskInstancePriority: this.cacheBackfillItem.taskInstancePriority,
workerGroup: this.cacheBackfillItem.workerGroup,
status: this.cacheBackfillItem.status,
branch: this.cacheBackfillItem.branch
},
flag: flag,
fromThis: this
})
}
},
},
watch: {
/**
* Watch the item change, cache the value it changes
**/
_item (val) {
this._cacheItem()
// this._cacheItem()
}
},
created () {
@ -653,6 +671,7 @@
} else {
this.workerGroup = this.store.state.security.workerGroupsListAll[0].id
}
this.cacheBackfillItem = o
this.isContentBox = true
},
mounted () {

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/dragZoom.js

@ -29,7 +29,7 @@ DragZoom.prototype.init = function () {
.scaleExtent([0.5, 2])
.on('zoom', () => {
this.scale = d3.event.scale
$canvas.css('transform', 'translate(' + d3.event.translate[0] + 'px,' + d3.event.translate[1] + 'px) scale(' + this.scale + ')')
$canvas.css('transform', 'scale(' + this.scale + ')')
$canvas.css('transform-origin', '0 0')
})
this.element.call(this.zoom).on('dblclick.zoom', null)

20
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js

@ -31,9 +31,11 @@ import {
rtTasksTpl,
setSvgColor,
saveTargetarr,
rtTargetarrArr
rtTargetarrArr,
computeScale
} from './util'
import mStart from '@/conf/home/pages/projects/pages/definition/pages/list/_source/start'
import multiDrag from './multiDrag'
const JSP = function () {
this.dag = {}
@ -91,6 +93,9 @@ JSP.prototype.init = function ({ dag, instance, options }) {
if (this.config.isNewNodes) {
DragZoom.init()
}
// support multi drag
multiDrag()
}
/**
@ -144,12 +149,13 @@ JSP.prototype.draggable = function () {
scope: 'plant',
drop: function (ev, ui) {
let id = 'tasks-' + Math.ceil(Math.random() * 100000) // eslint-disable-line
// Get mouse coordinates
const left = parseInt(ui.offset.left - $(this).offset().left)
let top = parseInt(ui.offset.top - $(this).offset().top) - 10
if (top < 25) {
top = 25
}
let scale = computeScale($(this))
scale = scale || 1
// Get mouse coordinates and after scale coordinate
const left = parseInt(ui.offset.left - $(this).offset().left) / scale
const top = parseInt(ui.offset.top - $(this).offset().top) / scale
// Generate template node
$('#canvas').append(rtTasksTpl({
id: id,

67
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/multiDrag.js

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import JSP from './jsPlumbHandle'
/**
* when and only ctrl or meta key pressing, we can select one or more dags to drag
*/
export default function () {
// init
let selectableObjects = []
JSP.JspInstance.clearDragSelection()
let ctrlPress = false
let nodes = null
const $window = $(window)
$window.bind('keydown', function (event) {
if (event.ctrlKey || event.metaKey) {
if (nodes) {
nodes.unbind('mousedown', select)
}
nodes = $('.jtk-draggable')
nodes.bind('mousedown', select)
ctrlPress = true
}
})
$window.bind('keyup', function (event) {
clear()
})
function select (event) {
if (ctrlPress && event.button === 0) {
let index = null
if ((index = selectableObjects.indexOf(this)) !== -1) {
selectableObjects.splice(index, 1)
JSP.JspInstance.removeFromDragSelection(this)
$(this).css('border-color', '')
} else {
selectableObjects.push(this)
JSP.JspInstance.addToDragSelection(this)
$(this).css('border-color', '#4af')
}
}
}
function clear () {
ctrlPress = false
selectableObjects.map(item => {
$(item).css('border-color', '')
})
selectableObjects = []
JSP.JspInstance.clearDragSelection()
}
}

16
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/util.js

@ -130,6 +130,19 @@ const allNodesId = () => {
})
return idArr
}
/**
* compute scalebecause it cant get from jquery directly
* @param el element
* @returns {boolean|number}
*/
const computeScale = function (el) {
const matrix = el.css('transform')
if (!matrix || matrix === 'none') {
return false
}
const values = matrix.split('(')[1].split(')')[0].split(',')
return Math.sqrt(values[0] * values[0] + values[1] * values[1])
}
export {
rtTargetarrArr,
@ -139,5 +152,6 @@ export {
isNameExDag,
setSvgColor,
allNodesId,
rtBantpl
rtBantpl,
computeScale
}

4
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue

@ -32,7 +32,7 @@
<span>{{$t('Executor')}}</span>
</th>
<th scope="col" style="min-width: 70px">
<span>{{$t('Node Type')}}</span>
<span style="margin-left: 5px">{{$t('Node Type')}}</span>
</th>
<th scope="col" style="min-width: 30px">
<span>{{$t('State')}}</span>
@ -73,7 +73,7 @@
<span v-if="item.executorName">{{item.executorName}}</span>
<span v-else>-</span>
</td>
<td><span>{{item.taskType}}</span></td>
<td><span style="margin-left: 5px">{{item.taskType}}</span></td>
<td><span v-html="_rtState(item.state)" style="cursor: pointer;"></span></td>
<td>
<span v-if="item.submitTime">{{item.submitTime | formatDate}}</span>

Loading…
Cancel
Save