diff --git a/.github/workflows/ci_e2e.yml b/.github/workflows/ci_e2e.yml index 82c81ef4e1..eaffc04e62 100644 --- a/.github/workflows/ci_e2e.yml +++ b/.github/workflows/ci_e2e.yml @@ -66,9 +66,9 @@ jobs: run: cd ./e2e && mvn -B clean test - name: Collect logs if: failure() - uses: actions/upload-artifact@v1 + uses: actions/upload-artifact@v2 with: name: dslogs - path: /var/lib/docker/volumes/docker-swarm_dolphinscheduler-logs/_data + path: ${{ github.workspace }}/docker/docker-swarm/dolphinscheduler-logs diff --git a/LICENSE b/LICENSE index dba3bc5288..9a0c6aa66a 100644 --- a/LICENSE +++ b/LICENSE @@ -215,3 +215,4 @@ The following components are provided under the Apache License. See project link The text of each license is the standard Apache 2.0 license. ScriptRunner from https://github.com/mybatis/mybatis-3 Apache 2.0 mvnw files from https://github.com/takari/maven-wrapper Apache 2.0 + PropertyPlaceholderHelper from https://github.com/spring-projects/spring-framework Apache 2.0 diff --git a/NOTICE b/NOTICE index 63b57af8c8..3e5695849b 100644 --- a/NOTICE +++ b/NOTICE @@ -72,4 +72,16 @@ Refactored SqlBuilder class (SQL, AbstractSQL) distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. \ No newline at end of file + limitations under the License. + +Spring Framework ${version} +Copyright (c) 2002-${copyright} Pivotal, Inc. + +This product is licensed to you under the Apache License, Version 2.0 +(the "License"). You may not use this product except in compliance with +the License. + +This product may include a number of subcomponents with separate +copyright notices and license terms. Your use of the source code for +these subcomponents is subject to the terms and conditions of the +subcomponent's license, as noted in the license.txt file. \ No newline at end of file diff --git a/ambari_plugin/common-services/DOLPHIN/1.3.0/package/scripts/params.py b/ambari_plugin/common-services/DOLPHIN/1.3.0/package/scripts/params.py index 08942df621..3ca46f2182 100644 --- a/ambari_plugin/common-services/DOLPHIN/1.3.0/package/scripts/params.py +++ b/ambari_plugin/common-services/DOLPHIN/1.3.0/package/scripts/params.py @@ -145,6 +145,11 @@ if len(zookeeperHosts) > 0 and "clientPort" in config['configurations']['zoo.cfg zookeeperPort = ":" + clientPort + "," dolphin_zookeeper_map['zookeeper.quorum'] = zookeeperPort.join(zookeeperHosts) + ":" + clientPort dolphin_zookeeper_map.update(config['configurations']['dolphin-zookeeper']) - +if 'spring.servlet.multipart.max-file-size' in dolphin_app_api_map: + file_size = dolphin_app_api_map['spring.servlet.multipart.max-file-size'] + dolphin_app_api_map['spring.servlet.multipart.max-file-size'] = file_size + "MB" +if 'spring.servlet.multipart.max-request-size' in dolphin_app_api_map: + request_size = dolphin_app_api_map['spring.servlet.multipart.max-request-size'] + dolphin_app_api_map['spring.servlet.multipart.max-request-size'] = request_size + "MB" diff --git a/docker/docker-swarm/docker-compose.yml b/docker/docker-swarm/docker-compose.yml index ee8be2570d..5cb6717f15 100644 --- a/docker/docker-swarm/docker-compose.yml +++ b/docker/docker-swarm/docker-compose.yml @@ -72,7 +72,7 @@ services: - dolphinscheduler-postgresql - dolphinscheduler-zookeeper volumes: - - dolphinscheduler-logs:/opt/dolphinscheduler/logs + - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs networks: - dolphinscheduler @@ -95,7 +95,7 @@ services: depends_on: - dolphinscheduler-api volumes: - - dolphinscheduler-logs:/var/log/nginx + - ./dolphinscheduler-logs:/var/log/nginx networks: - dolphinscheduler @@ -133,7 +133,7 @@ services: depends_on: - dolphinscheduler-postgresql volumes: - - dolphinscheduler-logs:/opt/dolphinscheduler/logs + - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs networks: - dolphinscheduler @@ -168,7 +168,7 @@ services: - dolphinscheduler-postgresql - dolphinscheduler-zookeeper volumes: - - dolphinscheduler-logs:/opt/dolphinscheduler/logs + - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs networks: - dolphinscheduler @@ -210,9 +210,7 @@ services: - type: volume source: dolphinscheduler-worker-data target: /tmp/dolphinscheduler - - type: volume - source: dolphinscheduler-logs - target: /opt/dolphinscheduler/logs + - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs networks: - dolphinscheduler @@ -225,7 +223,6 @@ volumes: dolphinscheduler-postgresql-initdb: dolphinscheduler-zookeeper: dolphinscheduler-worker-data: - dolphinscheduler-logs: configs: dolphinscheduler-worker-task-env: diff --git a/dolphinscheduler-alert/pom.xml b/dolphinscheduler-alert/pom.xml index 08ccc3a246..215916ddf7 100644 --- a/dolphinscheduler-alert/pom.xml +++ b/dolphinscheduler-alert/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT dolphinscheduler-alert ${project.artifactId} diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml index f45f854abe..035551e669 100644 --- a/dolphinscheduler-api/pom.xml +++ b/dolphinscheduler-api/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT dolphinscheduler-api ${project.artifactId} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java index 289d5060bf..2d06e1b6ef 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/Directory.java @@ -1,5 +1,3 @@ -package org.apache.dolphinscheduler.api.dto.resources; - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -16,6 +14,8 @@ package org.apache.dolphinscheduler.api.dto.resources; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.dolphinscheduler.api.dto.resources; + /** * directory */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java index b9b91821f4..92aaf13320 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/FileLeaf.java @@ -1,5 +1,3 @@ -package org.apache.dolphinscheduler.api.dto.resources; - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -16,6 +14,8 @@ package org.apache.dolphinscheduler.api.dto.resources; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.dolphinscheduler.api.dto.resources; + /** * file leaf */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java index 6a5b6e3353..999fc88b04 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java @@ -1,11 +1,3 @@ -package org.apache.dolphinscheduler.api.dto.resources; - -import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import org.apache.dolphinscheduler.common.enums.ResourceType; - -import java.util.ArrayList; -import java.util.List; - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -22,6 +14,14 @@ import java.util.List; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.dolphinscheduler.api.dto.resources; + +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import org.apache.dolphinscheduler.common.enums.ResourceType; + +import java.util.ArrayList; +import java.util.List; + /** * resource component */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java index 3dfce7c7c1..b2ef79c398 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/visitor/Visitor.java @@ -1,8 +1,3 @@ -package org.apache.dolphinscheduler.api.dto.resources.visitor; - - -import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -19,6 +14,10 @@ import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.dolphinscheduler.api.dto.resources.visitor; + +import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; + /** * Visitor */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java index af29cb67fb..cb7a8e653f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java @@ -50,20 +50,10 @@ public class LoginHandlerInterceptor implements HandlerInterceptor { /** * Intercept the execution of a handler. Called after HandlerMapping determined - * an appropriate handler object, but before HandlerAdapter invokes the handler. - *

DispatcherServlet processes a handler in an execution chain, consisting - * of any number of interceptors, with the handler itself at the end. - * With this method, each interceptor can decide to abort the execution chain, - * typically sending a HTTP error or writing a custom response. - *

Note: special considerations apply for asynchronous - * request processing. For more details see - * {@link org.springframework.web.servlet.AsyncHandlerInterceptor}. - * @param request current HTTP request - * @param response current HTTP response - * @param handler chosen handler to execute, for type and/or instance evaluation - * @return {@code true} if the execution chain should proceed with the - * next interceptor or the handler itself. Else, DispatcherServlet assumes - * that this interceptor has already dealt with the response itself. + * @param request current HTTP request + * @param response current HTTP response + * @param handler chosen handler to execute, for type and/or instance evaluation + * @return boolean true or false */ @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java index 3370961fd4..55c4fa113b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java @@ -16,29 +16,33 @@ */ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; -import org.apache.dolphinscheduler.dao.MonitorDBDao; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.model.WorkerServerModel; +import org.apache.dolphinscheduler.dao.MonitorDBDao; import org.apache.dolphinscheduler.dao.entity.MonitorRecord; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.dolphinscheduler.common.utils.Preconditions.*; +import com.google.common.collect.Sets; /** * monitor service */ @Service -public class MonitorService extends BaseService{ +public class MonitorService extends BaseService { @Autowired private ZookeeperMonitor zookeeperMonitor; @@ -108,15 +112,41 @@ public class MonitorService extends BaseService{ public Map queryWorker(User loginUser) { Map result = new HashMap<>(5); - List masterServers = getServerListFromZK(false); - - result.put(Constants.DATA_LIST, masterServers); + List workerServers = getServerListFromZK(false) + .stream() + .map((Server server) -> { + WorkerServerModel model = new WorkerServerModel(); + model.setId(server.getId()); + model.setHost(server.getHost()); + model.setPort(server.getPort()); + model.setZkDirectories(Sets.newHashSet(server.getZkDirectory())); + model.setResInfo(server.getResInfo()); + model.setCreateTime(server.getCreateTime()); + model.setLastHeartbeatTime(server.getLastHeartbeatTime()); + return model; + }) + .collect(Collectors.toList()); + + Map workerHostPortServerMapping = workerServers + .stream() + .collect(Collectors.toMap( + (WorkerServerModel worker) -> { + String[] s = worker.getZkDirectories().iterator().next().split("/"); + return s[s.length - 1]; + } + , Function.identity() + , (WorkerServerModel oldOne, WorkerServerModel newOne) -> { + oldOne.getZkDirectories().addAll(newOne.getZkDirectories()); + return oldOne; + })); + + result.put(Constants.DATA_LIST, workerHostPortServerMapping.values()); putMsg(result,Status.SUCCESS); return result; } - public List getServerListFromZK(boolean isMaster){ + public List getServerListFromZK(boolean isMaster) { checkNotNull(zookeeperMonitor); ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 3b0f82f5b0..130f9dfa99 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT dolphinscheduler-common dolphinscheduler-common @@ -84,12 +84,14 @@ com.fasterxml.jackson.core jackson-databind - org.apache.commons commons-collections4 - + + commons-beanutils + commons-beanutils + org.apache.hadoop hadoop-common diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 2432b7ee93..57491c96b7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -978,5 +978,10 @@ public final class Constants { public static final int NORAML_NODE_STATUS = 0; public static final int ABNORMAL_NODE_STATUS = 1; + /** + * net system properties + */ + public static final String DOLPHIN_SCHEDULER_PREFERRED_NETWORK_INTERFACE = "dolphin.scheduler.network.interface.preferred"; + } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java new file mode 100644 index 0000000000..984124b872 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.model; + + +import java.util.Date; +import java.util.Set; + +import com.fasterxml.jackson.annotation.JsonFormat; + +/** + * server + */ +public class WorkerServerModel { + + /** + * id + */ + private int id; + + /** + * host + */ + private String host; + + /** + * port + */ + private int port; + + /** + * worker directories in zookeeper + */ + private Set zkDirectories; + + /** + * resource info about CPU and memory + */ + private String resInfo; + + /** + * create time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date createTime; + + /** + * last heart beat time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date lastHeartbeatTime; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Set getZkDirectories() { + return zkDirectories; + } + + public void setZkDirectories(Set zkDirectories) { + this.zkDirectories = zkDirectories; + } + + public Date getLastHeartbeatTime() { + return lastHeartbeatTime; + } + + public void setLastHeartbeatTime(Date lastHeartbeatTime) { + this.lastHeartbeatTime = lastHeartbeatTime; + } + + public String getResInfo() { + return resInfo; + } + + public void setResInfo(String resInfo) { + this.resInfo = resInfo; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java index 22c58640cc..d900f0f6bf 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java @@ -16,8 +16,7 @@ */ package org.apache.dolphinscheduler.common.utils; - -import org.apache.commons.collections.BeanMap; +import org.apache.commons.beanutils.BeanMap; import org.apache.commons.lang.StringUtils; import java.util.*; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 4111ef9714..fce59d6548 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -408,7 +408,7 @@ public class HadoopUtils implements Closeable { * @param applicationId application id * @return the return may be null or there may be other parse exceptions */ - public ExecutionStatus getApplicationStatus(String applicationId) throws Exception{ + public ExecutionStatus getApplicationStatus(String applicationId) throws Exception { if (StringUtils.isEmpty(applicationId)) { return null; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java index 3bc80c7bfc..13a25dc636 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java @@ -21,13 +21,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.*; -import java.util.Enumeration; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.regex.Pattern; import static java.util.Collections.emptyList; +import static org.apache.dolphinscheduler.common.Constants.DOLPHIN_SCHEDULER_PREFERRED_NETWORK_INTERFACE; /** * NetUtils @@ -171,9 +169,19 @@ public class NetUtils { logger.warn("ValidNetworkInterfaces exception", e); } + NetworkInterface result = null; + // Try to specify config NetWork Interface + for (NetworkInterface networkInterface : validNetworkInterfaces) { + if (isSpecifyNetworkInterface(networkInterface)) { + result = networkInterface; + break; + } + } + if (null != result) { + return result; + } return validNetworkInterfaces.get(0); - } /** @@ -206,4 +214,8 @@ public class NetUtils { || !networkInterface.isUp(); } + private static boolean isSpecifyNetworkInterface(NetworkInterface networkInterface) { + String preferredNetworkInterface = System.getProperty(DOLPHIN_SCHEDULER_PREFERRED_NETWORK_INTERFACE); + return Objects.equals(networkInterface.getDisplayName(), preferredNetworkInterface); + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java index 32fd298a7d..1fe40b97e3 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java @@ -18,266 +18,55 @@ package org.apache.dolphinscheduler.common.utils; /** - * A collection of static utility methods to validate input. + * utility methods for validating input * - *

This class is modelled after Google Guava's Preconditions class, and partly takes code - * from that class. We add this code to here base in order to reduce external - * dependencies. */ public final class Preconditions { - // ------------------------------------------------------------------------ - // Null checks - // ------------------------------------------------------------------------ + private Preconditions() {} /** - * Ensures that the given object reference is not null. - * Upon violation, a {@code NullPointerException} with no message is thrown. + * if obj is null will throw NPE * - * @param reference reference + * @param obj obj * @param T * @return T */ - public static T checkNotNull(T reference) { - if (reference == null) { + public static T checkNotNull(T obj) { + if (obj == null) { throw new NullPointerException(); } - return reference; + return obj; } /** - * Ensures that the given object reference is not null. - * Upon violation, a {@code NullPointerException} with the given message is thrown. - * @param reference reference - * @param errorMessage errorMessage + * if obj is null will throw NullPointerException with error message + * @param obj obj + * @param errorMsg error message * @param T * @return T */ - public static T checkNotNull(T reference, String errorMessage) { - if (reference == null) { - throw new NullPointerException(String.valueOf(errorMessage)); - } - return reference; - } - - /** - * Ensures that the given object reference is not null. - * Upon violation, a {@code NullPointerException} with the given message is thrown. - * - *

The error message is constructed from a template and an arguments array, after - * a similar fashion as {@link String#format(String, Object...)}, but supporting only - * {@code %s} as a placeholder. - * - * @param reference The object reference - * @param errorMessageTemplate The message template for the {@code NullPointerException} - * that is thrown if the check fails. The template substitutes its - * {@code %s} placeholders with the error message arguments. - * @param errorMessageArgs The arguments for the error message, to be inserted into the - * message template for the {@code %s} placeholders. - * - * @param T - * @return The object reference itself (generically typed). - */ - public static T checkNotNull(T reference, - String errorMessageTemplate, - Object... errorMessageArgs) { - - if (reference == null) { - throw new NullPointerException(format(errorMessageTemplate, errorMessageArgs)); + public static T checkNotNull(T obj, String errorMsg) { + if (obj == null) { + throw new NullPointerException(errorMsg); } - return reference; + return obj; } - // ------------------------------------------------------------------------ - // Boolean Condition Checking (Argument) - // ------------------------------------------------------------------------ /** - * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if - * the condition is not met (evaluates to {@code false}). + * if condition is false will throw an IllegalArgumentException with the given message * - * @param condition The condition to check + * @param condition condition + * @param errorMsg error message * * @throws IllegalArgumentException Thrown, if the condition is violated. */ - public static void checkArgument(boolean condition) { - if (!condition) { - throw new IllegalArgumentException(); - } - } - - /** - * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if - * the condition is not met (evaluates to {@code false}). The exception will have the - * given error message. - * - * @param condition The condition to check - * @param errorMessage The message for the {@code IllegalArgumentException} that is thrown if the check fails. - * - * @throws IllegalArgumentException Thrown, if the condition is violated. - */ - public static void checkArgument(boolean condition, Object errorMessage) { - if (!condition) { - throw new IllegalArgumentException(String.valueOf(errorMessage)); - } - } - - /** - * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if - * the condition is not met (evaluates to {@code false}). - * - * @param condition The condition to check - * @param errorMessageTemplate The message template for the {@code IllegalArgumentException} - * that is thrown if the check fails. The template substitutes its - * {@code %s} placeholders with the error message arguments. - * @param errorMessageArgs The arguments for the error message, to be inserted into the - * message template for the {@code %s} placeholders. - * - * @throws IllegalArgumentException Thrown, if the condition is violated. - */ - public static void checkArgument(boolean condition, - String errorMessageTemplate, - Object... errorMessageArgs) { - - if (!condition) { - throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs)); - } - } - - // ------------------------------------------------------------------------ - // Boolean Condition Checking (State) - // ------------------------------------------------------------------------ - - /** - * Checks the given boolean condition, and throws an {@code IllegalStateException} if - * the condition is not met (evaluates to {@code false}). - * - * @param condition The condition to check - * - * @throws IllegalStateException Thrown, if the condition is violated. - */ - public static void checkState(boolean condition) { - if (!condition) { - throw new IllegalStateException(); - } - } - - /** - * Checks the given boolean condition, and throws an {@code IllegalStateException} if - * the condition is not met (evaluates to {@code false}). The exception will have the - * given error message. - * - * @param condition The condition to check - * @param errorMessage The message for the {@code IllegalStateException} that is thrown if the check fails. - * - * @throws IllegalStateException Thrown, if the condition is violated. - */ - public static void checkState(boolean condition, Object errorMessage) { + public static void checkArgument(boolean condition, Object errorMsg) { if (!condition) { - throw new IllegalStateException(String.valueOf(errorMessage)); + throw new IllegalArgumentException(String.valueOf(errorMsg)); } } - /** - * Checks the given boolean condition, and throws an {@code IllegalStateException} if - * the condition is not met (evaluates to {@code false}). - * - * @param condition The condition to check - * @param errorMessageTemplate The message template for the {@code IllegalStateException} - * that is thrown if the check fails. The template substitutes its - * {@code %s} placeholders with the error message arguments. - * @param errorMessageArgs The arguments for the error message, to be inserted into the - * message template for the {@code %s} placeholders. - * - * @throws IllegalStateException Thrown, if the condition is violated. - */ - public static void checkState(boolean condition, - String errorMessageTemplate, - Object... errorMessageArgs) { - - if (!condition) { - throw new IllegalStateException(format(errorMessageTemplate, errorMessageArgs)); - } - } - - /** - * Ensures that the given index is valid for an array, list or string of the given size. - * - * @param index index to check - * @param size size of the array, list or string - * - * @throws IllegalArgumentException Thrown, if size is negative. - * @throws IndexOutOfBoundsException Thrown, if the index negative or greater than or equal to size - */ - public static void checkElementIndex(int index, int size) { - checkArgument(size >= 0, "Size was negative."); - if (index < 0 || index >= size) { - throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size); - } - } - /** - * Ensures that the given index is valid for an array, list or string of the given size. - * - * @param index index to check - * @param size size of the array, list or string - * @param errorMessage The message for the {@code IndexOutOfBoundsException} that is thrown if the check fails. - * - * @throws IllegalArgumentException Thrown, if size is negative. - * @throws IndexOutOfBoundsException Thrown, if the index negative or greater than or equal to size - */ - public static void checkElementIndex(int index, int size, String errorMessage) { - checkArgument(size >= 0, "Size was negative."); - if (index < 0 || index >= size) { - throw new IndexOutOfBoundsException(String.valueOf(errorMessage) + " Index: " + index + ", Size: " + size); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** - * A simplified formatting method. Similar to {@link String#format(String, Object...)}, but - * with lower overhead (only String parameters, no locale, no format validation). - * - *

This method is taken quasi verbatim from the Guava Preconditions class. - */ - private static String format( String template, Object... args) { - final int numArgs = args == null ? 0 : args.length; - template = String.valueOf(template); // null -> "null" - - // start substituting the arguments into the '%s' placeholders - StringBuilder builder = new StringBuilder(template.length() + 16 * numArgs); - int templateStart = 0; - int i = 0; - while (i < numArgs) { - int placeholderStart = template.indexOf("%s", templateStart); - if (placeholderStart == -1) { - break; - } - builder.append(template.substring(templateStart, placeholderStart)); - builder.append(args[i++]); - templateStart = placeholderStart + 2; - } - builder.append(template.substring(templateStart)); - - // if we run out of placeholders, append the extra args in square braces - if (i < numArgs) { - builder.append(" ["); - builder.append(args[i++]); - while (i < numArgs) { - builder.append(", "); - builder.append(args[i++]); - } - builder.append(']'); - } - - return builder.toString(); - } - - // ------------------------------------------------------------------------ - - /** Private constructor to prevent instantiation. */ - private Preconditions() {} } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java index 47b24bb93c..3bf13aa9f3 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java @@ -33,11 +33,9 @@ public class PreconditionsTest { */ @Test public void testCheckNotNull() throws Exception { - String testReference = "test reference"; - //test reference is not null + String testReference = "test object"; Assert.assertEquals(testReference, Preconditions.checkNotNull(testReference)); - Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference,"reference is null")); - Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference,"%s is null",testReference)); + Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference,"object is null")); //test reference is null try { @@ -51,120 +49,19 @@ public class PreconditionsTest { } catch (NullPointerException ex) { assertNull(ex.getMessage()); } - //test reference is null ,expect contains errorMessage - try { - Preconditions.checkNotNull(null,"reference is null"); - } catch (NullPointerException ex) { - assertThat(ex.getMessage(), containsString("reference is null")); - } - - try { - Preconditions.checkNotNull("","reference is null"); - } catch (NullPointerException ex) { - assertThat(ex.getMessage(), containsString("reference is null")); - } - //test reference is null ,expect contains errorMessageTemplate and errorMessageArgs try { - Preconditions.checkNotNull(null,"%s is null",testReference); + Preconditions.checkNotNull(null,"object is null"); } catch (NullPointerException ex) { - assertThat(ex.getMessage(), containsString(testReference + " is null")); + assertThat(ex.getMessage(), containsString("object is null")); } try { - Preconditions.checkNotNull("","%s is null",testReference); + Preconditions.checkNotNull("","object is null"); } catch (NullPointerException ex) { - assertThat(ex.getMessage(), containsString(testReference + " is null")); - } - } - - /** - * Test checkArgument - */ - @Test - public void testCheckArgument() throws Exception { - - int argument = 100; - //boolean condition is true - Preconditions.checkArgument(argument > 0 && argument < 200); - - //boolean condition is false - try { - Preconditions.checkArgument(argument > 0 && argument < 50); - } catch (IllegalArgumentException ex) { - assertNull(ex.getMessage()); - } - - //boolean condition is false ,expect contains errorMessage - try { - Preconditions.checkArgument(argument > 300, "argument is error"); - } catch (IllegalArgumentException ex) { - assertThat(ex.getMessage(), containsString("argument is error")); - } - - //boolean condition is false,expect contains errorMessageTemplate and errorMessageArgs - try { - Preconditions.checkArgument(argument > 0 && argument < 99, "argument %s is error",argument); - } catch (IllegalArgumentException ex) { - assertThat(ex.getMessage(), containsString( "argument " + argument + " is error")); - } - } - - /** - * Test checkState - */ - @Test - public void testCheckState() throws Exception { - int state = 1; - //boolean condition is true - Preconditions.checkState(state == 1); - Preconditions.checkState(state > -1); - - //boolean condition is false - try { - Preconditions.checkState(state > 2); - } catch (IllegalStateException ex) { - assertNull(ex.getMessage()); - } - - //boolean condition is false ,expect contains errorMessage - try { - Preconditions.checkState(state < 1, "state is error"); - } catch (IllegalStateException ex) { - assertThat(ex.getMessage(), containsString("state is error")); + assertThat(ex.getMessage(), containsString("object is null")); } - //boolean condition is false,expect contains errorMessageTemplate and errorMessageArgs - try { - Preconditions.checkState(state < -1 , "state %s is error",state); - } catch (IllegalStateException ex) { - assertThat(ex.getMessage(), containsString( "state " + state + " is error")); - } } - /** - * Test checkElementIndex - */ - @Test - public void testCheckElementIndex() throws Exception { - int index = 2; - int size = 30; - - //boolean condition is true - Preconditions.checkElementIndex(index, size); - - //boolean condition is false - try { - Preconditions.checkElementIndex(-1, 10); - } catch (IndexOutOfBoundsException ex) { - assertThat(ex.getMessage(), containsString("Index: -1, Size: 10")); - } - - //boolean condition is false ,expect contains errorMessage - try { - Preconditions.checkElementIndex(100, 50, "index is greater than size"); - } catch (IndexOutOfBoundsException ex) { - assertThat(ex.getMessage(), containsString("index is greater than size Index: 100, Size: 50")); - } - } } diff --git a/dolphinscheduler-dao/pom.xml b/dolphinscheduler-dao/pom.xml index 433bb70047..c474f6d992 100644 --- a/dolphinscheduler-dao/pom.xml +++ b/dolphinscheduler-dao/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT dolphinscheduler-dao ${project.artifactId} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java index 055937b49c..e64b0395ec 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java @@ -14,10 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.datasource; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; /** * data source of hive @@ -40,4 +46,51 @@ public class HiveDataSource extends BaseDataSource { public DbType dbTypeSelector() { return DbType.HIVE; } + + /** + * build hive jdbc params,append : ?hive_conf_list + * + * hive jdbc url template: + * + * jdbc:hive2://:,:/dbName;initFile=;sess_var_list?hive_conf_list#hive_var_list + * + * @param otherParams otherParams + * @return filter otherParams + */ + @Override + protected String filterOther(String otherParams) { + if (StringUtils.isBlank(otherParams)) { + return ""; + } + + StringBuilder hiveConfListSb = new StringBuilder(); + hiveConfListSb.append("?"); + StringBuilder sessionVarListSb = new StringBuilder(); + + String[] otherArray = otherParams.split(";", -1); + + // get the default hive conf var name + Set hiveConfSet = Stream.of(ConfVars.values()).map(confVars -> confVars.varname) + .collect(Collectors.toSet()); + + for (String conf : otherArray) { + if (hiveConfSet.contains(conf.split("=")[0])) { + hiveConfListSb.append(conf).append(";"); + } else { + sessionVarListSb.append(conf).append(";"); + } + } + + // remove the last ";" + if (sessionVarListSb.length() > 0) { + sessionVarListSb.deleteCharAt(sessionVarListSb.length() - 1); + } + + if (hiveConfListSb.length() > 0) { + hiveConfListSb.deleteCharAt(hiveConfListSb.length() - 1); + } + + return sessionVarListSb.toString() + hiveConfListSb.toString(); + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index eb51fc50cb..a90d927154 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.dao.entity; +import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.fasterxml.jackson.annotation.JsonFormat; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -27,10 +28,11 @@ import org.apache.dolphinscheduler.common.utils.*; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; import java.util.Date; -import java.util.List; +import java.util.Map; /** * task instance @@ -213,7 +215,7 @@ public class TaskInstance implements Serializable { @TableField(exist = false) - private List resources; + private Map resources; @@ -455,10 +457,14 @@ public class TaskInstance implements Serializable { || (this.getState().typeIsFailure() && !taskCanRetry()); } - public List getResources() { + public Map getResources() { return resources; } + public void setResources(Map resources) { + this.resources = resources; + } + public boolean isSubProcess(){ return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType)); } @@ -471,9 +477,7 @@ public class TaskInstance implements Serializable { return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType)); } - public void setResources(List resources) { - this.resources = resources; - } + /** * determine if you can try again diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java new file mode 100644 index 0000000000..36a505872f --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.datasource; + +import org.junit.Assert; +import org.junit.Test; + +/** + * test data source of hive + */ +public class HiveDataSourceTest { + + @Test + public void testfilterOther() { + BaseDataSource hiveDataSource = new HiveDataSource(); + + // not contain hive_site_conf + String other = hiveDataSource.filterOther("charset=UTF-8"); + Assert.assertEquals("charset=UTF-8", other); + + // not contain + other = hiveDataSource.filterOther(""); + Assert.assertEquals("", other); + + // only contain hive_site_conf + other = hiveDataSource.filterOther("hive.mapred.mode=strict"); + Assert.assertEquals("?hive.mapred.mode=strict", other); + + // contain hive_site_conf at the first + other = hiveDataSource.filterOther("hive.mapred.mode=strict;charset=UTF-8"); + Assert.assertEquals("charset=UTF-8?hive.mapred.mode=strict", other); + + // contain hive_site_conf in the middle + other = hiveDataSource.filterOther("charset=UTF-8;hive.mapred.mode=strict;foo=bar"); + Assert.assertEquals("charset=UTF-8;foo=bar?hive.mapred.mode=strict", other); + + // contain hive_site_conf at the end + other = hiveDataSource.filterOther("charset=UTF-8;foo=bar;hive.mapred.mode=strict"); + Assert.assertEquals("charset=UTF-8;foo=bar?hive.mapred.mode=strict", other); + + // contain multi hive_site_conf + other = hiveDataSource.filterOther("charset=UTF-8;foo=bar;hive.mapred.mode=strict;hive.exec.parallel=true"); + Assert.assertEquals("charset=UTF-8;foo=bar?hive.mapred.mode=strict;hive.exec.parallel=true", other); + } + + @Test + public void testGetHiveJdbcUrlOther() { + + BaseDataSource hiveDataSource = new HiveDataSource(); + hiveDataSource.setAddress("jdbc:hive2://127.0.0.1:10000"); + hiveDataSource.setDatabase("test"); + hiveDataSource.setPassword("123456"); + hiveDataSource.setUser("test"); + Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test", hiveDataSource.getJdbcUrl()); + + hiveDataSource.setOther("charset=UTF-8;hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2"); + + Assert.assertEquals( + "jdbc:hive2://127.0.0.1:10000/test;charset=UTF-8?hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2", + hiveDataSource.getJdbcUrl()); + + hiveDataSource.setOther("hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2"); + + Assert.assertEquals( + "jdbc:hive2://127.0.0.1:10000/test;?hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2", + hiveDataSource.getJdbcUrl()); + + } + +} \ No newline at end of file diff --git a/dolphinscheduler-dist/pom.xml b/dolphinscheduler-dist/pom.xml index 166edc9f5f..ca3834e90b 100644 --- a/dolphinscheduler-dist/pom.xml +++ b/dolphinscheduler-dist/pom.xml @@ -20,7 +20,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index 20ca383a2a..10279872c7 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -228,6 +228,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. byte-buddy 1.9.10: https://mvnrepository.com/artifact/net.bytebuddy/byte-buddy/1.9.10, Apache 2.0 classmate 1.4.0: https://mvnrepository.com/artifact/com.fasterxml/classmate/1.4.0, Apache 2.0 clickhouse-jdbc 0.1.52: https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc/0.1.52, Apache 2.0 + commons-beanutils 1.7.0 https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils/1.7.0, Apache 2.0 commons-cli 1.2: https://mvnrepository.com/artifact/commons-cli/commons-cli/1.2, Apache 2.0 commons-codec 1.6: https://mvnrepository.com/artifact/commons-codec/commons-codec/1.6, Apache 2.0 commons-collections 3.2.2: https://mvnrepository.com/artifact/commons-collections/commons-collections/3.2.2, Apache 2.0 diff --git a/dolphinscheduler-dist/release-docs/NOTICE b/dolphinscheduler-dist/release-docs/NOTICE index b72d2ed23d..6ce789c7fb 100644 --- a/dolphinscheduler-dist/release-docs/NOTICE +++ b/dolphinscheduler-dist/release-docs/NOTICE @@ -132,18 +132,6 @@ granted provided that the copyright notice appears in all copies. ======================================================================== -Apache Log4j NOTICE - -======================================================================== -Apache log4j -Copyright 2007 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - - Joda Time NOTICE ======================================================================== @@ -438,17 +426,6 @@ under the Apache License 2.0 (see: StringUtils.containsWhitespace()) ======================================================================== -Apache Jakarta Commons Lang NOTICE - -======================================================================== -Apache Jakarta Commons Lang -Copyright 2001-2007 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). -======================================================================== - - Apache ZooKeeper NOTICE ======================================================================== @@ -672,109 +649,6 @@ The licenses for these third party components are included in LICENSE.txt ======================================================================== -Apache Commons CLI NOTICE - -======================================================================== -Apache Commons CLI -Copyright 2001-2009 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - - -Apache Commons Collections NOTICE - -======================================================================== -Apache Commons Collections -Copyright 2001-2015 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). -======================================================================== - - -Apache Commons Collections4 NOTICE - -======================================================================== -Apache Commons Collections -Copyright 2001-2019 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - - -Apache Commons Compress NOTICE - -======================================================================== -Apache Commons Compress -Copyright 2002-2012 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - - -Apache Commons Configuration NOTICE - -======================================================================== -Apache Commons Configuration -Copyright 2001-2013 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - - -Apache Commons Daemon NOTICE - -======================================================================== -Copyright 1999-2019 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - - -Apache Commons Email NOTICE - -======================================================================== -Apache Commons Email -Copyright 2001-2017 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - - -Apache HttpComponents Client NOTICE - -======================================================================== -Copyright 1999-2018 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - -Apache Commons IO NOTICE - -======================================================================== -Copyright 2002-2019 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -======================================================================== - - Commons Logging NOTICE ======================================================================== @@ -872,18 +746,6 @@ file. ======================================================================== -Apache Commons Pool NOTICE - -======================================================================== -Apache Commons Pool -Copyright 2001-2012 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). - -========================================================================= - - Apache Derby NOTICE =========================================================================== @@ -1176,18 +1038,6 @@ No other notice covers that jar file. ========================================================================= -Apache HttpClient NOTICE - -========================================================================= -Apache HttpComponents Client -Copyright 1999-2015 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -========================================================================= - - Apache Httpcomponents core NOTICE ========================================================================= @@ -1229,28 +1079,6 @@ from the source code management (SCM) system project uses. ========================================================================= -Apache Avro NOTICE - -========================================================================= -Apache Avro -Copyright 2009-2013 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -Apach Commons IO NOTICE - -========================================================================= -Apache Commons IO -Copyright 2002-2012 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - Junit4 NOTICE ========================================================================= @@ -1335,39 +1163,6 @@ Junit4 NOTICE WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - ========================================================================= - - -Apache Thrift NOTICE - -========================================================================= - Apache Thrift - Copyright 2006-2010 The Apache Software Foundation. - - This product includes software developed at - The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - - Apache Commons DBCP NOTICE - -========================================================================= - Apache Commons DBCP - Copyright 2001-2010 The Apache Software Foundation - - This product includes software developed by - The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - - Apache Commons Daemon NOTICE - -========================================================================= - Apache Commons Daemon - Copyright 1999-2013 The Apache Software Foundation - - This product includes software developed by - The Apache Software Foundation (http://www.apache.org/). ========================================================================= Bonecp NOTICE @@ -1500,28 +1295,6 @@ Licensed under the Apache License, Version 2.0 (the "License"); ========================================================================= - Apache EL NOTICE - -========================================================================= - Apache Tomcat - Copyright 1999-2018 The Apache Software Foundation - - This product includes software developed at - The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -Apache Commons Net NOTICE - -========================================================================= -Apache Commons Net -Copyright 2001-2012 The Apache Software Foundation - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - Apache Logging Log4j2 NOTICE ========================================================================= @@ -1586,50 +1359,6 @@ Portions of this software were originally based on the following: ========================================================================= -Apache Parquet Hadoop Bundle NOTICE - -========================================================================= -Apache Parquet Hadoop Bundle -Copyright 2015 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -Apache Log4j 1.x NOTICE - -========================================================================= -Apache Log4j 1.x Compatibility API -Copyright 1999-2019 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -Apache log4j NOTICE - -========================================================================= -Apache log4j -Copyright 2007 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -Apache Java Data Objects (JDO) NOTICE - -========================================================================= -Apache Java Data Objects (JDO) -Copyright 2005-2006 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - Jackson XC NOTICE ========================================================================= @@ -1656,17 +1385,6 @@ see CREDITS file. ========================================================================= -Apache HttpClient Mime NOTICE - -========================================================================= -Apache HttpClient Mime -Copyright 1999-2019 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - Apache HttpCore NOTICE ========================================================================= @@ -1693,17 +1411,6 @@ The Apache Software Foundation (http://www.apache.org/). ========================================================================= -Hive Storage API NOTICE - -========================================================================= -Apache Hive -Copyright 2016 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - Hadoop NOTICE ========================================================================= @@ -2190,60 +1897,6 @@ Other developers who have contributed code are: ========================================================================= - -Apache Yetus NOTICE - -========================================================================= -Apache Yetus - Audience Annotations -Copyright 2015-2017 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -Apache Directory LDAP API Utilities NOTICE - -========================================================================= -Apache Directory LDAP API Utilities -Copyright 2003-2013 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -Apache Directory API ASN.1 API NOTICE - -========================================================================= -Apache Directory API ASN.1 API -Copyright 2003-2013 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -ApacheDS Protocol Kerberos Codec NOTICE - -========================================================================= -ApacheDS Protocol Kerberos Codec -Copyright 2003-2013 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). -========================================================================= - - -ApacheDS I18n NOTICE - -========================================================================= -ApacheDS I18n -Copyright 2003-2013 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - ANT NOTICE ========================================================================= diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-commons-beanutils.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-commons-beanutils.txt new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-commons-beanutils.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/dolphinscheduler-microbench/pom.xml b/dolphinscheduler-microbench/pom.xml index 4d9775fa70..6b11b2e2d6 100644 --- a/dolphinscheduler-microbench/pom.xml +++ b/dolphinscheduler-microbench/pom.xml @@ -21,7 +21,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-plugin-api/pom.xml b/dolphinscheduler-plugin-api/pom.xml index 5c9a138671..7db15e73c3 100644 --- a/dolphinscheduler-plugin-api/pom.xml +++ b/dolphinscheduler-plugin-api/pom.xml @@ -23,7 +23,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT dolphinscheduler-plugin-api ${project.artifactId} diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml index a20a02e90d..4d398f3069 100644 --- a/dolphinscheduler-remote/pom.xml +++ b/dolphinscheduler-remote/pom.xml @@ -20,7 +20,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java index 29d48db8f8..d774dc8b9e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java @@ -26,69 +26,34 @@ public class RemotingException extends Exception { super(); } - /** Constructs a new runtime exception with the specified detail message. - * The cause is not initialized, and may subsequently be initialized by a - * call to {@link #initCause}. + /** + * Construct a new runtime exception with the detail message * - * @param message the detail message. The detail message is saved for - * later retrieval by the {@link #getMessage()} method. + * @param message detail message */ public RemotingException(String message) { super(message); } /** - * Constructs a new runtime exception with the specified detail message and - * cause.

Note that the detail message associated with - * {@code cause} is not automatically incorporated in - * this runtime exception's detail message. + * Construct a new runtime exception with the detail message and cause * - * @param message the detail message (which is saved for later retrieval - * by the {@link #getMessage()} method). - * @param cause the cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is - * permitted, and indicates that the cause is nonexistent or - * unknown.) + * @param message the detail message + * @param cause the cause * @since 1.4 */ public RemotingException(String message, Throwable cause) { super(message, cause); } - /** Constructs a new runtime exception with the specified cause and a - * detail message of (cause==null ? null : cause.toString()) - * (which typically contains the class and detail message of - * cause). This constructor is useful for runtime exceptions - * that are little more than wrappers for other throwables. + /** + * Construct a new runtime exception with throwable * - * @param cause the cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is - * permitted, and indicates that the cause is nonexistent or - * unknown.) - * @since 1.4 + * @param cause the cause */ public RemotingException(Throwable cause) { super(cause); } - /** - * Constructs a new runtime exception with the specified detail - * message, cause, suppression enabled or disabled, and writable - * stack trace enabled or disabled. - * - * @param message the detail message. - * @param cause the cause. (A {@code null} value is permitted, - * and indicates that the cause is nonexistent or unknown.) - * @param enableSuppression whether or not suppression is enabled - * or disabled - * @param writableStackTrace whether or not the stack trace should - * be writable - * - * @since 1.7 - */ - protected RemotingException(String message, Throwable cause, - boolean enableSuppression, - boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java index 2e3954f4bc..bbb32c76d1 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java @@ -170,21 +170,6 @@ public class ResponseFuture { } } - @Override - public String toString() { - return "ResponseFuture{" + - "opaque=" + opaque + - ", timeoutMillis=" + timeoutMillis + - ", invokeCallback=" + invokeCallback + - ", releaseSemaphore=" + releaseSemaphore + - ", latch=" + latch + - ", beginTimestamp=" + beginTimestamp + - ", responseCommand=" + responseCommand + - ", sendOk=" + sendOk + - ", cause=" + cause + - '}'; - } - /** * scan future table */ @@ -209,4 +194,19 @@ public class ResponseFuture { } } } + + @Override + public String toString() { + return "ResponseFuture{" + + "opaque=" + opaque + + ", timeoutMillis=" + timeoutMillis + + ", invokeCallback=" + invokeCallback + + ", releaseSemaphore=" + releaseSemaphore + + ", latch=" + latch + + ", beginTimestamp=" + beginTimestamp + + ", responseCommand=" + responseCommand + + ", sendOk=" + sendOk + + ", cause=" + cause + + '}'; + } } diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml index b2daacf5db..4cbce0ab47 100644 --- a/dolphinscheduler-server/pom.xml +++ b/dolphinscheduler-server/pom.xml @@ -21,7 +21,7 @@ org.apache.dolphinscheduler dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT dolphinscheduler-server dolphinscheduler-server diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java index 97afb4f6d9..210db5c4c4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.entity; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import java.io.Serializable; -import java.util.List; +import java.util.Map; /** * SQL Task ExecutionContext @@ -38,9 +38,9 @@ public class SQLTaskExecutionContext implements Serializable { */ private String connectionParams; /** - * udf function list + * udf function tenant code map */ - private List udfFuncList; + private Map udfFuncTenantCodeMap; public int getWarningGroupId() { @@ -51,12 +51,12 @@ public class SQLTaskExecutionContext implements Serializable { this.warningGroupId = warningGroupId; } - public List getUdfFuncList() { - return udfFuncList; + public Map getUdfFuncTenantCodeMap() { + return udfFuncTenantCodeMap; } - public void setUdfFuncList(List udfFuncList) { - this.udfFuncList = udfFuncList; + public void setUdfFuncTenantCodeMap(Map udfFuncTenantCodeMap) { + this.udfFuncTenantCodeMap = udfFuncTenantCodeMap; } public String getConnectionParams() { @@ -72,7 +72,7 @@ public class SQLTaskExecutionContext implements Serializable { return "SQLTaskExecutionContext{" + "warningGroupId=" + warningGroupId + ", connectionParams='" + connectionParams + '\'' + - ", udfFuncList=" + udfFuncList + + ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 3fc65c1853..81488fb134 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; import java.util.Date; -import java.util.List; import java.util.Map; /** @@ -171,9 +170,9 @@ public class TaskExecutionContext implements Serializable{ private String workerGroup; /** - * resources full name + * resources full name and tenant code */ - private List resources; + private Map resources; /** * sql TaskExecutionContext @@ -446,11 +445,11 @@ public class TaskExecutionContext implements Serializable{ this.dependenceTaskExecutionContext = dependenceTaskExecutionContext; } - public List getResources() { + public Map getResources() { return resources; } - public void setResources(List resources) { + public void setResources(Map resources) { this.resources = resources; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 68db1f2061..21995c3867 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.consumer; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.SqoopJobType; +import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -49,14 +50,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; - /** * TaskUpdateQueue consumer */ @@ -330,7 +327,13 @@ public class TaskPriorityQueueConsumer extends Thread{ } List udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray); - sqlTaskExecutionContext.setUdfFuncList(udfFuncList); + Map udfFuncMap = new HashMap<>(); + for(UdfFunc udfFunc : udfFuncList) { + String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF); + udfFuncMap.put(udfFunc,tenantCode); + } + + sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap); } } @@ -364,20 +367,23 @@ public class TaskPriorityQueueConsumer extends Thread{ } /** - * get resource full name list + * get resource map key is full name and value is tenantCode */ - private List getResourceFullNames(TaskNode taskNode) { - List resourceFullNameList = new ArrayList<>(); + private Map getResourceFullNames(TaskNode taskNode) { + Map resourceMap = new HashMap<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { List projectResourceFiles = baseParam.getResourceFilesList(); - if (projectResourceFiles != null) { + if (CollectionUtils.isNotEmpty(projectResourceFiles)) { // filter the resources that the resource id equals 0 Set oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet()); if (CollectionUtils.isNotEmpty(oldVersionResources)) { - resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet())); + + oldVersionResources.forEach( + (t)->resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)) + ); } // get the resource id in order to get the resource names in batch @@ -388,13 +394,13 @@ public class TaskPriorityQueueConsumer extends Thread{ Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); List resources = processService.listResourceByIds(resourceIds); - resourceFullNameList.addAll(resources.stream() - .map(resourceInfo -> resourceInfo.getFullName()) - .collect(Collectors.toList())); + resources.forEach( + (t)->resourceMap.put(t.getFullName(),processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)) + ); } } } - return resourceFullNameList; + return resourceMap; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java index 8a441b9de1..eaf614c023 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java @@ -26,68 +26,21 @@ public class ExecuteException extends Exception{ super(); } - /** - * Constructs a new exception with the specified detail message. The - * cause is not initialized, and may subsequently be initialized by - * a call to {@link #initCause}. - * - * @param message the detail message. The detail message is saved for - * later retrieval by the {@link #getMessage()} method. - */ + public ExecuteException(String message) { super(message); } - /** - * Constructs a new exception with the specified detail message and - * cause.

Note that the detail message associated with - * {@code cause} is not automatically incorporated in - * this exception's detail message. - * - * @param message the detail message (which is saved for later retrieval - * by the {@link #getMessage()} method). - * @param cause the cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is - * permitted, and indicates that the cause is nonexistent or - * unknown.) - * @since 1.4 - */ + public ExecuteException(String message, Throwable cause) { super(message, cause); } - /** - * Constructs a new exception with the specified cause and a detail - * message of (cause==null ? null : cause.toString()) (which - * typically contains the class and detail message of cause). - * This constructor is useful for exceptions that are little more than - * wrappers for other throwables (for example, {@link - * java.security.PrivilegedActionException}). - * - * @param cause the cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is - * permitted, and indicates that the cause is nonexistent or - * unknown.) - * @since 1.4 - */ + public ExecuteException(Throwable cause) { super(cause); } - /** - * Constructs a new exception with the specified detail message, - * cause, suppression enabled or disabled, and writable stack - * trace enabled or disabled. - * - * @param message the detail message. - * @param cause the cause. (A {@code null} value is permitted, - * and indicates that the cause is nonexistent or unknown.) - * @param enableSuppression whether or not suppression is enabled - * or disabled - * @param writableStackTrace whether or not the stack trace should - * be writable - * @since 1.7 - */ protected ExecuteException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 6fc3f45ec4..ee9b86babb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; +import com.github.rholder.retry.RetryException; +import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -36,6 +38,7 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.*; +import java.util.concurrent.ExecutionException; /** * netty executor manager @@ -133,26 +136,13 @@ public class NettyExecutorManager extends AbstractExecutorManager{ * @throws ExecuteException if error throws ExecuteException */ private void doExecute(final Host host, final Command command) throws ExecuteException { - /** - * retry count,default retry 3 - */ - int retryCount = 3; - boolean success = false; - do { - try { + try { + RetryerUtils.retryCall(() -> { nettyRemotingClient.send(host, command); - success = true; - } catch (Exception ex) { - logger.error(String.format("send command : %s to %s error", command, host), ex); - retryCount--; - try { - Thread.sleep(100); - } catch (InterruptedException ignore) {} - } - } while (retryCount >= 0 && !success); - - if (!success) { - throw new ExecuteException(String.format("send command : %s to %s error", command, host)); + return Boolean.TRUE; + }); + } catch (ExecutionException | RetryException e) { + throw new ExecuteException(String.format("send command : %s to %s error", command, host), e); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index 1668f8ae92..040ea5a43f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -16,10 +16,16 @@ */ package org.apache.dolphinscheduler.server.master.registry; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; @@ -31,13 +37,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import java.util.Date; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; +import com.google.common.collect.Sets; /** * master registry @@ -100,7 +100,7 @@ public class MasterRegistry { HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, masterConfig.getMasterReservedMemory(), masterConfig.getMasterMaxCpuloadAvg(), - getMasterPath(), + Sets.newHashSet(getMasterPath()), zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); @@ -113,6 +113,7 @@ public class MasterRegistry { public void unRegistry() { String address = getLocalAddress(); String localNodePath = getMasterPath(); + heartBeatExecutor.shutdownNow(); zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); logger.info("master node : {} unRegistry to ZK.", address); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index 2345ce9533..bd8c79cce9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -17,50 +17,50 @@ package org.apache.dolphinscheduler.server.registry; +import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; + +import java.util.Date; +import java.util.Set; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; - -import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; - -public class HeartBeatTask extends Thread{ +public class HeartBeatTask extends Thread { private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); private String startTime; private double reservedMemory; private double maxCpuloadAvg; - private String heartBeatPath; + private Set heartBeatPaths; private ZookeeperRegistryCenter zookeeperRegistryCenter; public HeartBeatTask(String startTime, double reservedMemory, double maxCpuloadAvg, - String heartBeatPath, - ZookeeperRegistryCenter zookeeperRegistryCenter){ + Set heartBeatPaths, + ZookeeperRegistryCenter zookeeperRegistryCenter) { this.startTime = startTime; this.reservedMemory = reservedMemory; this.maxCpuloadAvg = maxCpuloadAvg; - this.heartBeatPath = heartBeatPath; + this.heartBeatPaths = heartBeatPaths; this.zookeeperRegistryCenter = zookeeperRegistryCenter; } @Override public void run() { try { - double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double loadAverage = OSUtils.loadAverage(); int status = Constants.NORAML_NODE_STATUS; - if(availablePhysicalMemorySize < reservedMemory - || loadAverage > maxCpuloadAvg){ - logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage); + if (availablePhysicalMemorySize < reservedMemory + || loadAverage > maxCpuloadAvg) { + logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage); status = Constants.ABNORMAL_NODE_STATUS; } @@ -76,8 +76,11 @@ public class HeartBeatTask extends Thread{ builder.append(status).append(COMMA); //save process id builder.append(OSUtils.getProcessID()); - zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); - } catch (Throwable ex){ + + for (String heartBeatPath : heartBeatPaths) { + zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); + } + } catch (Throwable ex) { logger.error("error write heartbeat info", ex); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java index 63efb24a3e..3a8c8fe7d6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.server.utils; +import org.apache.commons.collections.MapUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; @@ -24,10 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.slf4j.Logger; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; +import java.util.stream.Collectors; import static org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty; @@ -43,53 +42,44 @@ public class UDFUtils { /** * create function list - * @param udfFuncs udf functions - * @param tenantCode tenant code - * @param logger logger + * @param udfFuncTenantCodeMap key is udf function,value is tenant code + * @param logger logger * @return create function list */ - public static List createFuncs(List udfFuncs, String tenantCode,Logger logger){ + public static List createFuncs(Map udfFuncTenantCodeMap, Logger logger){ - if (CollectionUtils.isEmpty(udfFuncs)){ + if (MapUtils.isEmpty(udfFuncTenantCodeMap)){ logger.info("can't find udf function resource"); return null; } - // get hive udf jar path - String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode); - logger.info("hive udf jar path : {}" , hiveUdfJarPath); - - // is the root directory of udf defined - if (StringUtils.isEmpty(hiveUdfJarPath)) { - logger.error("not define hive udf jar path"); - throw new RuntimeException("hive udf jar base path not defined "); - } - Set resources = getFuncResouces(udfFuncs); List funcList = new ArrayList<>(); // build jar sql - buildJarSql(funcList, resources, hiveUdfJarPath); + buildJarSql(funcList, udfFuncTenantCodeMap); // build temp function sql - buildTempFuncSql(funcList, udfFuncs); + buildTempFuncSql(funcList, udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList())); return funcList; } /** * build jar sql - * @param sqls sql list - * @param resources resource set - * @param uploadPath upload path + * @param sqls sql list + * @param udfFuncTenantCodeMap key is udf function,value is tenant code */ - private static void buildJarSql(List sqls, Set resources, String uploadPath) { + private static void buildJarSql(List sqls, Map udfFuncTenantCodeMap) { String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS); - if (!uploadPath.startsWith("hdfs:")) { - uploadPath = defaultFS + uploadPath; - } - for (String resource : resources) { - sqls.add(String.format("add jar %s/%s", uploadPath, resource)); + Set> entries = udfFuncTenantCodeMap.entrySet(); + for (Map.Entry entry:entries){ + String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue()); + if (!uploadPath.startsWith("hdfs:")) { + uploadPath = defaultFS + uploadPath; + } + sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().getResourceName())); } + } /** @@ -106,20 +96,5 @@ public class UDFUtils { } } - /** - * get the resource names of all functions - * @param udfFuncs udf function list - * @return - */ - private static Set getFuncResouces(List udfFuncs) { - Set resources = new HashSet<>(); - - for (UdfFunc udfFunc : udfFuncs) { - resources.add(udfFunc.getResourceName()); - } - - return resources; - } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 1a31fa09fe..2dedaf8e1b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -17,6 +17,8 @@ */ package org.apache.dolphinscheduler.server.worker.config; +import java.util.Set; + import org.apache.dolphinscheduler.common.Constants; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.PropertySource; @@ -41,8 +43,8 @@ public class WorkerConfig { @Value("${worker.reserved.memory:0.3}") private double workerReservedMemory; - @Value("${worker.group: default}") - private String workerGroup; + @Value("#{'${worker.groups:default}'.split(',')}") + private Set workerGroups; @Value("${worker.listen.port: 1234}") private int listenPort; @@ -55,12 +57,12 @@ public class WorkerConfig { this.listenPort = listenPort; } - public String getWorkerGroup() { - return workerGroup; + public Set getWorkerGroups() { + return workerGroups; } - public void setWorkerGroup(String workerGroup) { - this.workerGroup = workerGroup; + public void setWorkerGroups(Set workerGroups) { + this.workerGroups = workerGroups; } public int getWorkerExecThreads() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index e1349ea9fe..5e400e1e1f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -16,10 +16,20 @@ */ package org.apache.dolphinscheduler.server.worker.registry; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.SLASH; + +import java.util.Date; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -32,15 +42,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import java.util.Date; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.apache.dolphinscheduler.common.Constants.COMMA; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.Constants.SLASH; +import com.google.common.collect.Sets; /** @@ -74,11 +76,11 @@ public class WorkerRegistry { private String startTime; - private String workerGroup; + private Set workerGroups; @PostConstruct - public void init(){ - this.workerGroup = workerConfig.getWorkerGroup(); + public void init() { + this.workerGroups = workerConfig.getWorkerGroups(); this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -88,31 +90,35 @@ public class WorkerRegistry { */ public void registry() { String address = NetUtils.getHost(); - String localNodePath = getWorkerPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); - zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - if(newState == ConnectionState.LOST){ - logger.error("worker : {} connection lost from zookeeper", address); - } else if(newState == ConnectionState.RECONNECTED){ - logger.info("worker : {} reconnected to zookeeper", address); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); - } else if(newState == ConnectionState.SUSPENDED){ - logger.warn("worker : {} connection SUSPENDED ", address); - } - } - }); + Set workerZkPaths = getWorkerZkPaths(); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); - HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, - workerConfig.getWorkerReservedMemory(), - workerConfig.getWorkerMaxCpuloadAvg(), - getWorkerPath(), - zookeeperRegistryCenter); - this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); - logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval); + for (String workerZKPath : workerZkPaths) { + zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); + zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.LOST) { + logger.error("worker : {} connection lost from zookeeper", address); + } else if (newState == ConnectionState.RECONNECTED) { + logger.info("worker : {} reconnected to zookeeper", address); + zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); + } else if (newState == ConnectionState.SUSPENDED) { + logger.warn("worker : {} connection SUSPENDED ", address); + } + } + }); + logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); + } + HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, + this.workerConfig.getWorkerReservedMemory(), + this.workerConfig.getWorkerMaxCpuloadAvg(), + workerZkPaths, + this.zookeeperRegistryCenter); + + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); + logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); } /** @@ -120,36 +126,41 @@ public class WorkerRegistry { */ public void unRegistry() { String address = getLocalAddress(); - String localNodePath = getWorkerPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); + Set workerZkPaths = getWorkerZkPaths(); + for (String workerZkPath : workerZkPaths) { + zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath); + logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath); + } this.heartBeatExecutor.shutdownNow(); - logger.info("worker node : {} unRegistry to ZK.", address); } /** * get worker path - * @return */ - private String getWorkerPath() { + private Set getWorkerZkPaths() { + Set workerZkPaths = Sets.newHashSet(); + String address = getLocalAddress(); - StringBuilder builder = new StringBuilder(100); - String workerPath = this.zookeeperRegistryCenter.getWorkerPath(); - builder.append(workerPath).append(SLASH); - if(StringUtils.isEmpty(workerGroup)){ - workerGroup = DEFAULT_WORKER_GROUP; + String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); + + for (String workGroup : this.workerGroups) { + StringBuilder workerZkPathBuilder = new StringBuilder(100); + workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH); + if (StringUtils.isEmpty(workGroup)) { + workGroup = DEFAULT_WORKER_GROUP; + } + // trim and lower case is need + workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); + workerZkPathBuilder.append(address); + workerZkPaths.add(workerZkPathBuilder.toString()); } - //trim and lower case is need - builder.append(workerGroup.trim().toLowerCase()).append(SLASH); - builder.append(address); - return builder.toString(); + return workerZkPaths; } /** * get local address - * @return */ - private String getLocalAddress(){ + private String getLocalAddress() { return NetUtils.getHost() + ":" + workerConfig.getListenPort(); } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index bf4c46f1b3..26494bc77b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -22,7 +22,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.Set; +import org.apache.commons.collections.MapUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -106,7 +108,6 @@ public class TaskExecuteThread implements Runnable { // copy hdfs/minio file to local downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), - taskExecutionContext.getTenantCode(), logger); taskExecutionContext.setTaskParams(taskNode.getParams()); @@ -227,22 +228,25 @@ public class TaskExecuteThread implements Runnable { * @param logger */ private void downloadResource(String execLocalPath, - List projectRes, - String tenantCode, + Map projectRes, Logger logger) throws Exception { - if (CollectionUtils.isEmpty(projectRes)){ + if (MapUtils.isEmpty(projectRes)){ return; } - for (String resource : projectRes) { - File resFile = new File(execLocalPath, resource); + Set> resEntries = projectRes.entrySet(); + + for (Map.Entry resource : resEntries) { + String fullName = resource.getKey(); + String tenantCode = resource.getValue(); + File resFile = new File(execLocalPath, fullName); if (!resFile.exists()) { try { // query the tenant code of the resource according to the name of the resource - String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resource); + String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, fullName); logger.info("get resource file from hdfs :{}", resHdfsPath); - HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true); + HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true); }catch (Exception e){ logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 0afeb8a3c8..acc75d70d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -131,8 +131,7 @@ public class SqlTask extends AbstractTask { .map(this::getSqlAndSqlParamsMap) .collect(Collectors.toList()); - List createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), - taskExecutionContext.getTenantCode(), + List createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(), logger); // execute sql task diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties index eb01bbb3ab..0365c8a9c9 100644 --- a/dolphinscheduler-server/src/main/resources/worker.properties +++ b/dolphinscheduler-server/src/main/resources/worker.properties @@ -31,4 +31,4 @@ #worker.listen.port: 1234 # default worker group -worker.group=default \ No newline at end of file +#worker.groups=default diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java index b34ba8bee9..7fc9d2bf79 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java @@ -17,62 +17,154 @@ package org.apache.dolphinscheduler.server.worker.registry; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; + +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; -import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringRunner; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; - - -import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; /** * worker registry test */ -@RunWith(SpringRunner.class) -@ContextConfiguration(classes={SpringZKServer.class, WorkerRegistry.class,ZookeeperRegistryCenter.class, WorkerConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) - +@RunWith(MockitoJUnitRunner.Silent.class) public class WorkerRegistryTest { - @Autowired + private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryTest.class); + + private static final String TEST_WORKER_GROUP = "test"; + + @InjectMocks private WorkerRegistry workerRegistry; - @Autowired + @Mock private ZookeeperRegistryCenter zookeeperRegistryCenter; - @Autowired + @Mock + private ZookeeperCachedOperator zookeeperCachedOperator; + + @Mock + private CuratorFrameworkImpl zkClient; + + @Mock private WorkerConfig workerConfig; + @Before + public void before() { + Set workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP); + Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups); + + Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker"); + Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator); + Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient); + Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn( + new Listenable() { + @Override + public void addListener(ConnectionStateListener connectionStateListener) { + LOGGER.info("add listener"); + } + + @Override + public void addListener(ConnectionStateListener connectionStateListener, Executor executor) { + LOGGER.info("add listener executor"); + } + + @Override + public void removeListener(ConnectionStateListener connectionStateListener) { + LOGGER.info("remove listener"); + } + }); + + Mockito.when(workerConfig.getWorkerHeartbeatInterval()).thenReturn(10); + + Mockito.when(workerConfig.getWorkerReservedMemory()).thenReturn(1.1); + + Mockito.when(workerConfig.getWorkerMaxCpuloadAvg()).thenReturn(1); + } + @Test - public void testRegistry() throws InterruptedException { + public void testRegistry() { + + workerRegistry.init(); + workerRegistry.registry(); + String workerPath = zookeeperRegistryCenter.getWorkerPath(); - Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim()); - String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (NetUtils.getHost() + ":" + workerConfig.getListenPort()); - TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node - String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath); - Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); + + int i = 0; + for (String workerGroup : workerConfig.getWorkerGroups()) { + String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getHost() + ":" + workerConfig.getListenPort()); + String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath); + if (0 == i) { + Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/")); + } else { + Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/default/")); + } + i++; + } + + workerRegistry.unRegistry(); + + workerConfig.getWorkerGroups().add(StringUtils.EMPTY); + workerRegistry.init(); + workerRegistry.registry(); + + workerRegistry.unRegistry(); + + // testEmptyWorkerGroupsRegistry + workerConfig.getWorkerGroups().remove(StringUtils.EMPTY); + workerConfig.getWorkerGroups().remove(TEST_WORKER_GROUP); + workerConfig.getWorkerGroups().remove(DEFAULT_WORKER_GROUP); + workerRegistry.init(); + workerRegistry.registry(); + + List testWorkerGroupPathZkChildren = zookeeperRegistryCenter.getChildrenKeys(workerPath + "/" + TEST_WORKER_GROUP); + List defaultWorkerGroupPathZkChildren = zookeeperRegistryCenter.getChildrenKeys(workerPath + "/" + DEFAULT_WORKER_GROUP); + + Assert.assertEquals(0, testWorkerGroupPathZkChildren.size()); + Assert.assertEquals(0, defaultWorkerGroupPathZkChildren.size()); } @Test - public void testUnRegistry() throws InterruptedException { + public void testUnRegistry() { + workerRegistry.init(); workerRegistry.registry(); - TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node + workerRegistry.unRegistry(); String workerPath = zookeeperRegistryCenter.getWorkerPath(); - String workerGroupPath = workerPath + "/" + workerConfig.getWorkerGroup().trim(); - List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath); - Assert.assertTrue(childrenKeys.isEmpty()); + + for (String workerGroup : workerConfig.getWorkerGroups()) { + String workerGroupPath = workerPath + "/" + workerGroup.trim(); + List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath); + Assert.assertTrue(childrenKeys.isEmpty()); + } + + // testEmptyWorkerGroupsUnRegistry + workerConfig.getWorkerGroups().remove(TEST_WORKER_GROUP); + workerConfig.getWorkerGroups().remove(DEFAULT_WORKER_GROUP); + workerRegistry.init(); + workerRegistry.registry(); + + workerRegistry.unRegistry(); } } diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml index 1b5b95363e..f0e8f408d7 100644 --- a/dolphinscheduler-service/pom.xml +++ b/dolphinscheduler-service/pom.xml @@ -20,7 +20,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java index 3cdc9ab972..c7a53ebdc0 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.service.zk; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; @@ -34,44 +35,62 @@ import java.util.concurrent.atomic.AtomicBoolean; public class ZKServer { private static final Logger logger = LoggerFactory.getLogger(ZKServer.class); - private static volatile PublicZooKeeperServerMain zkServer = null; - public static final int DEFAULT_ZK_TEST_PORT = 2181; - private static String dataDir = null; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + + private PublicZooKeeperServerMain zooKeeperServerMain = null; + + private int port; - private static final AtomicBoolean isStarted = new AtomicBoolean(false); + private String dataDir = null; + + private String prefix; public static void main(String[] args) { - if(!isStarted()){ - ZKServer.start(); - - /** - * register hooks, which are called before the process exits - */ - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - stop(); - } - })); - }else{ - logger.info("zk server aleady started"); + ZKServer zkServer; + if (args.length == 0) { + zkServer = new ZKServer(); + } else if (args.length == 1){ + zkServer = new ZKServer(Integer.valueOf(args[0]), ""); + } else { + zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]); + } + zkServer.registerHook(); + zkServer.start(); + } + + public ZKServer() { + this(DEFAULT_ZK_TEST_PORT, ""); + } + + public ZKServer(int port, String prefix) { + this.port = port; + if (prefix != null && prefix.contains("/")) { + throw new IllegalArgumentException("The prefix of path may not have '/'"); } + this.prefix = (prefix == null ? null : prefix.trim()); + } + + private void registerHook() { + /** + * register hooks, which are called before the process exits + */ + Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); } /** * start service */ - public static void start() { + public void start() { try { - startLocalZkServer(DEFAULT_ZK_TEST_PORT); + startLocalZkServer(port); } catch (Exception e) { - logger.error("Failed to start ZK: " + e); + logger.error("Failed to start ZK ", e); } } - public static boolean isStarted(){ + public boolean isStarted(){ return isStarted.get(); } @@ -94,8 +113,12 @@ public class ZKServer { * * @param port The port to listen on */ - public static void startLocalZkServer(final int port) { - String zkDataDir = System.getProperty("user.dir") +"/zookeeper_data"; + public void startLocalZkServer(final int port) { + String zkDataDir = System.getProperty("user.dir") + (StringUtils.isEmpty(prefix) ? StringUtils.EMPTY : ("/" + prefix)) + "/zookeeper_data"; + File file = new File(zkDataDir); + if (file.exists()) { + logger.warn("The path of zk server exists"); + } logger.info("zk server starting, data dir path:{}" , zkDataDir); startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60"); } @@ -108,31 +131,29 @@ public class ZKServer { * @param tickTime zk tick time * @param maxClientCnxns zk max client connections */ - private static synchronized void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) { - if (zkServer != null) { - throw new RuntimeException("Zookeeper server is already started!"); - } - zkServer = new PublicZooKeeperServerMain(); - logger.info("Zookeeper data path : {} ", dataDirPath); - dataDir = dataDirPath; - final String[] args = new String[]{Integer.toString(port), dataDirPath, Integer.toString(tickTime), maxClientCnxns}; + private void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) { + if (isStarted.compareAndSet(false, true)) { + zooKeeperServerMain = new PublicZooKeeperServerMain(); + logger.info("Zookeeper data path : {} ", dataDirPath); + dataDir = dataDirPath; + final String[] args = new String[]{Integer.toString(port), dataDirPath, Integer.toString(tickTime), maxClientCnxns}; - try { - logger.info("Zookeeper server started "); - isStarted.compareAndSet(false, true); - - zkServer.initializeAndRun(args); - } catch (QuorumPeerConfig.ConfigException e) { - logger.warn("Caught exception while starting ZK", e); - } catch (IOException e) { - logger.warn("Caught exception while starting ZK", e); + try { + logger.info("Zookeeper server started "); + isStarted.compareAndSet(false, true); + + zooKeeperServerMain.initializeAndRun(args); + } catch (QuorumPeerConfig.ConfigException | IOException e) { + logger.warn("Caught exception while starting ZK", e); + throw new RuntimeException(e); + } } } /** * Stops a local Zk instance, deleting its data directory */ - public static void stop() { + public void stop() { try { stopLocalZkServer(true); logger.info("zk server stopped"); @@ -147,19 +168,21 @@ public class ZKServer { * * @param deleteDataDir Whether or not to delete the data directory */ - private static synchronized void stopLocalZkServer(final boolean deleteDataDir) { - if (zkServer != null) { + private void stopLocalZkServer(final boolean deleteDataDir) { + if (isStarted.compareAndSet(true, false)) { try { - zkServer.shutdown(); - zkServer = null; + if (zooKeeperServerMain == null) { + return; + } + zooKeeperServerMain.shutdown(); + zooKeeperServerMain = null; if (deleteDataDir) { org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir)); } - isStarted.compareAndSet(true, false); } catch (Exception e) { logger.warn("Caught exception while stopping ZK server", e); throw new RuntimeException(e); } } } -} \ No newline at end of file +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java index 42b942b907..10be65e90a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java @@ -18,18 +18,44 @@ package org.apache.dolphinscheduler.service.zk; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -// ZKServer is a process, can't unit test -public class ZKServerTest { +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +public class ZKServerTest { + private static final Logger log = LoggerFactory.getLogger(ZKServerTest.class); @Test - public void isStarted() { - Assert.assertEquals(false, ZKServer.isStarted()); + public void testRunWithDefaultPort() { + AtomicReference zkServer = new AtomicReference<>(); + new Thread(() -> { + zkServer.set(new ZKServer()); + zkServer.get().start(); + }).start(); + try { + TimeUnit.SECONDS.sleep(5); + Assert.assertEquals(true, zkServer.get().isStarted()); + } catch (InterruptedException e) { + log.error("Thread interrupted", e); + } + zkServer.get().stop(); } @Test - public void stop() { - ZKServer.stop(); + public void testRunWithCustomPort() { + AtomicReference zkServer = new AtomicReference<>(); + new Thread(() -> { + zkServer.set(new ZKServer(2183, null)); + zkServer.get().start(); + }).start(); + try { + TimeUnit.SECONDS.sleep(5); + Assert.assertEquals(true, zkServer.get().isStarted()); + } catch (InterruptedException e) { + log.error("Thread interrupted", e); + } + zkServer.get().stop(); } } \ No newline at end of file diff --git a/dolphinscheduler-ui/pom.xml b/dolphinscheduler-ui/pom.xml index 8baeb38a77..13644bad91 100644 --- a/dolphinscheduler-ui/pom.xml +++ b/dolphinscheduler-ui/pom.xml @@ -20,7 +20,7 @@ dolphinscheduler org.apache.dolphinscheduler - 1.3.0-SNAPSHOT + 1.3.2-SNAPSHOT 4.0.0 diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/timeoutAlarm.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/timeoutAlarm.vue index b6710e5b33..0dcab3e901 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/timeoutAlarm.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/_source/timeoutAlarm.vue @@ -91,7 +91,8 @@ return false } // Verify timeout duration Non 0 positive integer - if (this.enable && !parseInt(this.interval) && !_.isInteger(this.interval)) { + const reg = /^[1-9]\d*$/ + if (this.enable && !reg.test(this.interval)) { this.$message.warning(`${this.$t('Timeout must be a positive integer')}`) return false } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue index 0d05205d28..1e15688c5d 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/createDataSource.vue @@ -24,16 +24,10 @@ @@ -200,7 +194,41 @@ showdDatabase: false, showConnectType: false, isShowPrincipal:true, - prePortMapper:{} + prePortMapper:{}, + datasourceTypeList: [ + { + value: 'MYSQL', + label: 'MYSQL' + }, + { + value: 'POSTGRESQL', + label: 'POSTGRESQL' + }, + { + value: 'HIVE', + label: 'HIVE/IMPALA' + }, + { + value: 'SPARK', + label: 'SPARK' + }, + { + value: 'CLICKHOUSE', + label: 'CLICKHOUSE' + }, + { + value: 'ORACLE', + label: 'ORACLE' + }, + { + value: 'SQLSERVER', + label: 'SQLSERVER' + }, + { + value: 'DB2', + label: 'DB2' + } + ] } }, props: { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue new file mode 100644 index 0000000000..1201cb55b4 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue @@ -0,0 +1,112 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + + + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue index ceaed89675..c8c0ed194c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue @@ -22,8 +22,8 @@

IP: {{item.host}} - {{$t('Process Pid')}}: {{item.id}} - {{$t('Zk registration directory')}}: {{item.zkDirectory}} + {{$t('Process Pid')}}: {{item.port}} + {{$t('Zk registration directory')}}: {{$t('Directory detail')}}
{{$t('Create Time')}}: {{item.createTime | formatDate}} @@ -74,6 +74,7 @@ import mNoData from '@/module/components/noData/noData' import themeData from '@/module/echarts/themeData.json' import mListConstruction from '@/module/components/listConstruction/listConstruction' + import zookeeperDirectoriesPopup from './_source/zookeeperDirectories' export default { name: 'servers-worker', @@ -86,7 +87,25 @@ }, props: {}, methods: { - ...mapActions('monitor', ['getWorkerData']) + ...mapActions('monitor', ['getWorkerData']), + _showZkDirectories (item) { + let zkDirectories = [] + item.zkDirectories.forEach(zkDirectory => { + zkDirectories.push({ + zkDirectory: zkDirectory + }) + }) + this.$drawer({ + direction: 'right', + render (h) { + return h(zookeeperDirectoriesPopup, { + props: { + zkDirectories: zkDirectories + } + }) + } + }) + } }, watch: {}, created () { @@ -105,7 +124,7 @@ this.isLoading = true }) }, - components: { mList, mListConstruction, mSpin, mNoData, mGauge } + components: { mList, mListConstruction, mSpin, mNoData, mGauge, zookeeperDirectoriesPopup } }