
Merge pull request #14 from apache/dev

update from code
pull/3/MERGE
BoYiZhang 4 years ago committed by GitHub
parent
commit a26724cebb
  1. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (5 lines changed)
  2. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java (45 lines changed)
  3. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java (121 lines changed)
  4. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java (3 lines changed)
  5. dolphinscheduler-common/src/main/resources/common.properties (3 lines changed)
  6. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java (74 lines changed)
  7. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java (103 lines changed)
  8. dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js (2 lines changed)
  9. dolphinscheduler-ui/src/js/conf/home/pages/user/pages/password/_source/info.vue (3 lines changed)
  10. pom.xml (3 lines changed)
  11. tools/dependencies/known-dependencies.txt (2 lines changed)

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (5 lines changed)

@@ -1000,4 +1000,9 @@ public final class Constants {
public static final String DATASOURCE_ENCRYPTION_SALT_DEFAULT = "!@#$%^&*";
public static final String DATASOURCE_ENCRYPTION_ENABLE = "datasource.encryption.enable";
public static final String DATASOURCE_ENCRYPTION_SALT = "datasource.encryption.salt";
/**
* Network IP gets priority, default inner outer
*/
public static final String NETWORK_PRIORITY_STRATEGY = "dolphin.scheduler.network.priority.strategy";
}
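
For orientation only (not part of this diff): the new key is read through PropertyUtils in the NetUtils change below, falling back to the "default" policy when the property is unset.

// illustrative fragment, assuming dolphinscheduler-common on the classpath
String strategy = PropertyUtils.getString(Constants.NETWORK_PRIORITY_STRATEGY, "default"); // "default" | "inner" | "outer"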

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java (45 lines changed)

@@ -14,26 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.io.IOUtils;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
@@ -43,7 +48,13 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
* hadoop utils
@@ -102,7 +113,6 @@ public class HadoopUtils implements Closeable {
}
}
/**
* init hadoop configuration
*/
@@ -167,7 +177,6 @@ public class HadoopUtils implements Closeable {
fs = FileSystem.get(configuration);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
@@ -232,11 +241,11 @@ public class HadoopUtils implements Closeable {
return new byte[0];
}
FSDataInputStream fsDataInputStream = fs.open(new Path(hdfsFilePath));
return IOUtils.toByteArray(fsDataInputStream);
try (FSDataInputStream fsDataInputStream = fs.open(new Path(hdfsFilePath))) {
return IOUtils.toByteArray(fsDataInputStream);
}
}
/**
* cat file on hdfs
*
@@ -527,7 +536,6 @@ public class HadoopUtils implements Closeable {
return String.format("%s/udfs", getHdfsTenantDir(tenantCode));
}
/**
* get hdfs file name
*
@@ -579,7 +587,6 @@ public class HadoopUtils implements Closeable {
return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
}
/**
* getAppAddress
*
@@ -610,7 +617,6 @@ public class HadoopUtils implements Closeable {
return start + activeRM + end;
}
@Override
public void close() throws IOException {
if (fs != null) {
@@ -623,7 +629,6 @@ public class HadoopUtils implements Closeable {
}
}
/**
* yarn ha admin utils
*/
@@ -669,7 +674,6 @@ public class HadoopUtils implements Closeable {
return null;
}
/**
* get ResourceManager state
*
@@ -694,4 +698,5 @@ public class HadoopUtils implements Closeable {
}
}
}

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java (121 lines changed)

@@ -21,6 +21,8 @@ import static org.apache.dolphinscheduler.common.Constants.DOLPHIN_SCHEDULER_PRE
import static java.util.Collections.emptyList;
import org.apache.dolphinscheduler.common.Constants;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
@@ -42,22 +44,20 @@ import org.slf4j.LoggerFactory;
*/
public class NetUtils {
private NetUtils() {
throw new UnsupportedOperationException("Construct NetUtils");
}
private static Logger logger = LoggerFactory.getLogger(NetUtils.class);
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static String ANY_HOST_VALUE = "0.0.0.0";
private static String LOCAL_HOST_VALUE = "127.0.0.1";
private static final String NETWORK_PRIORITY_DEFAULT = "default";
private static final String NETWORK_PRIORITY_INNER = "inner";
private static final String NETWORK_PRIORITY_OUTER = "outer";
private static final Logger logger = LoggerFactory.getLogger(NetUtils.class);
private static final String ANY_HOST_VALUE = "0.0.0.0";
private static final String LOCAL_HOST_VALUE = "127.0.0.1";
private static InetAddress LOCAL_ADDRESS = null;
private static volatile String HOST_ADDRESS;
private NetUtils() {
throw new UnsupportedOperationException("Construct NetUtils");
}
public static String getHost() {
if (HOST_ADDRESS != null) {
return HOST_ADDRESS;
@@ -87,24 +87,27 @@ public class NetUtils {
if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = null;
NetworkInterface networkInterface = findNetworkInterface();
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if (addressOp.get().isReachable(100)) {
LOCAL_ADDRESS = addressOp.get();
return LOCAL_ADDRESS;
try {
NetworkInterface networkInterface = findNetworkInterface();
if (networkInterface != null) {
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if (addressOp.get().isReachable(100)) {
LOCAL_ADDRESS = addressOp.get();
return LOCAL_ADDRESS;
}
} catch (IOException e) {
logger.warn("test address id reachable io exception", e);
}
}
} catch (IOException e) {
logger.warn("test address id reachable io exception", e);
}
}
}
try {
localAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
logger.warn("InetAddress get LocalHost exception", e);
@@ -190,7 +193,7 @@ public class NetUtils {
if (null != result) {
return result;
}
return validNetworkInterfaces.get(0);
return findAddress(validNetworkInterfaces);
}
/**
@@ -227,4 +230,70 @@ public class NetUtils {
String preferredNetworkInterface = System.getProperty(DOLPHIN_SCHEDULER_PREFERRED_NETWORK_INTERFACE);
return Objects.equals(networkInterface.getDisplayName(), preferredNetworkInterface);
}
private static NetworkInterface findAddress(List<NetworkInterface> validNetworkInterfaces) {
if (validNetworkInterfaces.isEmpty()) {
return null;
}
String networkPriority = PropertyUtils.getString(Constants.NETWORK_PRIORITY_STRATEGY, NETWORK_PRIORITY_DEFAULT);
if (NETWORK_PRIORITY_DEFAULT.equalsIgnoreCase(networkPriority)) {
return findAddressByDefaultPolicy(validNetworkInterfaces);
} else if (NETWORK_PRIORITY_INNER.equalsIgnoreCase(networkPriority)) {
return findInnerAddress(validNetworkInterfaces);
} else if (NETWORK_PRIORITY_OUTER.equalsIgnoreCase(networkPriority)) {
return findOuterAddress(validNetworkInterfaces);
} else {
logger.error("There is no matching network card acquisition policy!");
return null;
}
}
private static NetworkInterface findAddressByDefaultPolicy(List<NetworkInterface> validNetworkInterfaces) {
NetworkInterface networkInterface;
networkInterface = findInnerAddress(validNetworkInterfaces);
if (networkInterface == null) {
networkInterface = findOuterAddress(validNetworkInterfaces);
if (networkInterface == null) {
networkInterface = validNetworkInterfaces.get(0);
}
}
return networkInterface;
}
/**
* Get the Intranet IP
*
* @return If no {@link NetworkInterface} is available , return <code>null</code>
*/
private static NetworkInterface findInnerAddress(List<NetworkInterface> validNetworkInterfaces) {
NetworkInterface networkInterface = null;
for (NetworkInterface ni : validNetworkInterfaces) {
Enumeration<InetAddress> address = ni.getInetAddresses();
while (address.hasMoreElements()) {
InetAddress ip = address.nextElement();
if (ip.isSiteLocalAddress()
&& !ip.isLoopbackAddress()) {
networkInterface = ni;
}
}
}
return networkInterface;
}
private static NetworkInterface findOuterAddress(List<NetworkInterface> validNetworkInterfaces) {
NetworkInterface networkInterface = null;
for (NetworkInterface ni : validNetworkInterfaces) {
Enumeration<InetAddress> address = ni.getInetAddresses();
while (address.hasMoreElements()) {
InetAddress ip = address.nextElement();
if (!ip.isSiteLocalAddress()
&& !ip.isLoopbackAddress()) {
networkInterface = ni;
}
}
}
return networkInterface;
}
}
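
A usage sketch, not part of this commit: with dolphin.scheduler.network.priority.strategy set in common.properties, host resolution chooses among the valid network interfaces according to the policies implemented by findAddress above.

// illustrative fragment; policy behavior as implemented by the find*Address methods above:
//   "inner"   -> prefer a site-local (intranet) interface
//   "outer"   -> prefer a non-site-local interface
//   "default" -> inner first, then outer, then the first valid interface
String host = NetUtils.getHost();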

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java (3 lines changed)

@@ -59,9 +59,8 @@ public class TaskParametersUtils {
switch (EnumUtils.getEnum(TaskType.class, taskType)) {
case SUB_PROCESS:
return JSONUtils.parseObject(parameter, SubProcessParameters.class);
case WATERDROP:
return JSONUtils.parseObject(parameter, ShellParameters.class);
case SHELL:
case WATERDROP:
return JSONUtils.parseObject(parameter, ShellParameters.class);
case PROCEDURE:
return JSONUtils.parseObject(parameter, ProcedureParameters.class);

dolphinscheduler-common/src/main/resources/common.properties (3 lines changed)

@@ -72,3 +72,6 @@ kerberos.expire.time=2
# datasource encryption salt
datasource.encryption.enable=false
datasource.encryption.salt=!@#$%^&*
# Network IP gets priority, default inner outer
#dolphin.scheduler.network.priority.strategy=default

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java (74 lines changed)

@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;
import org.slf4j.Logger;
/**
@@ -37,42 +38,39 @@ import org.slf4j.Logger;
*/
public class TaskManager {
/**
* create new task
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* @return AbstractTask
* @throws IllegalArgumentException illegal argument exception
*/
public static AbstractTask newTask(TaskExecutionContext taskExecutionContext,
Logger logger)
throws IllegalArgumentException {
switch (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
case SHELL:
return new ShellTask(taskExecutionContext, logger);
case WATERDROP:
return new ShellTask(taskExecutionContext, logger);
case PROCEDURE:
return new ProcedureTask(taskExecutionContext, logger);
case SQL:
return new SqlTask(taskExecutionContext, logger);
case MR:
return new MapReduceTask(taskExecutionContext, logger);
case SPARK:
return new SparkTask(taskExecutionContext, logger);
case FLINK:
return new FlinkTask(taskExecutionContext, logger);
case PYTHON:
return new PythonTask(taskExecutionContext, logger);
case HTTP:
return new HttpTask(taskExecutionContext, logger);
case DATAX:
return new DataxTask(taskExecutionContext, logger);
case SQOOP:
return new SqoopTask(taskExecutionContext, logger);
default:
logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
throw new IllegalArgumentException("not support task type");
/**
* create new task
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* @return AbstractTask
* @throws IllegalArgumentException illegal argument exception
*/
public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger) throws IllegalArgumentException {
switch (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
case SHELL:
case WATERDROP:
return new ShellTask(taskExecutionContext, logger);
case PROCEDURE:
return new ProcedureTask(taskExecutionContext, logger);
case SQL:
return new SqlTask(taskExecutionContext, logger);
case MR:
return new MapReduceTask(taskExecutionContext, logger);
case SPARK:
return new SparkTask(taskExecutionContext, logger);
case FLINK:
return new FlinkTask(taskExecutionContext, logger);
case PYTHON:
return new PythonTask(taskExecutionContext, logger);
case HTTP:
return new HttpTask(taskExecutionContext, logger);
case DATAX:
return new DataxTask(taskExecutionContext, logger);
case SQOOP:
return new SqoopTask(taskExecutionContext, logger);
default:
logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
throw new IllegalArgumentException("not support task type");
}
}
}
}

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java (103 lines changed)

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class TaskManagerTest {
private TaskExecutionContext taskExecutionContext;
private Logger taskLogger;
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
@Before
public void before() {
// init task execution context, logger
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("");
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
taskExecutionContext.setHost("localhost");
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()
));
taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
}
@Test
public void testNewTask() {
taskExecutionContext.setTaskType("SHELL");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("WATERDROP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("HTTP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("MR");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("SPARK");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("FLINK");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("PYTHON");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("DATAX");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
taskExecutionContext.setTaskType("SQOOP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
//taskExecutionContext.setTaskType(null);
//Assert.assertNull(TaskManager.newTask(taskExecutionContext,taskLogger));
//taskExecutionContext.setTaskType("XXX");
//Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
}
}

dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js (2 lines changed)

@@ -37,7 +37,7 @@ const warningTypeList = [
]
const isEmial = (val) => {
let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,}$/ // eslint-disable-line
return regEmail.test(val)
}
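
Illustrative check only, not part of this commit (written in Java to match the rest of this diff; the regex semantics are the same as in the JS above): widening the TLD quantifier from {2,3} to {2,} lets addresses with longer top-level domains pass validation.

// hypothetical harness demonstrating the effect of the quantifier change
import java.util.regex.Pattern;

public class EmailTldCheck {
    public static void main(String[] args) {
        Pattern oldRule = Pattern.compile("^([a-zA-Z0-9]+[_|\\-|\\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\\-|\\.]?)*[a-zA-Z0-9]+\\.[a-zA-Z]{2,3}$");
        Pattern newRule = Pattern.compile("^([a-zA-Z0-9]+[_|\\-|\\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\\-|\\.]?)*[a-zA-Z0-9]+\\.[a-zA-Z]{2,}$");
        String addr = "user@example.info"; // four-letter TLD
        System.out.println(oldRule.matcher(addr).matches()); // false: {2,3} caps the TLD at three letters
        System.out.println(newRule.matcher(addr).matches()); // true with the widened {2,}
    }
}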

dolphinscheduler-ui/src/js/conf/home/pages/user/pages/password/_source/info.vue (3 lines changed)

@@ -85,7 +85,8 @@
userPassword: this.userPassword,
tenantId: this.userInfo.tenantId,
email: this.userInfo.email,
phone: this.userInfo.phone
phone: this.userInfo.phone,
state: this.userInfo.state
}
this.spinnerLoading = true
this.updateUser(param).then(res => {

pom.xml (3 lines changed)

@@ -69,7 +69,7 @@
<mybatis-plus.version>3.2.0</mybatis-plus.version>
<mybatis.spring.version>2.0.1</mybatis.spring.version>
<cron.utils.version>5.0.5</cron.utils.version>
<druid.version>1.1.14</druid.version>
<druid.version>1.1.22</druid.version>
<h2.version>1.4.200</h2.version>
<commons.codec.version>1.6</commons.codec.version>
<commons.logging.version>1.1.1</commons.logging.version>
@@ -847,6 +847,7 @@
<!--<include>**/server/worker/task/datax/DataxTaskTest.java</include>-->
<!--<include>**/server/worker/task/http/HttpTaskTest.java</include>-->
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>

tools/dependencies/known-dependencies.txt (2 lines changed)

@@ -43,7 +43,7 @@ datanucleus-api-jdo-4.2.1.jar
datanucleus-core-4.1.6.jar
datanucleus-rdbms-4.1.7.jar
derby-10.14.2.0.jar
druid-1.1.14.jar
druid-1.1.22.jar
gson-2.8.5.jar
guava-20.0.jar
guice-3.0.jar
