diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 73125f4926..2aff56e090 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -173,6 +173,11 @@ public final class Constants { */ public static final String COMMA = ","; + /** + * slash / + */ + public static final String SLASH = "/"; + /** * COLON : */ @@ -994,4 +999,9 @@ public final class Constants { * dataSource sensitive param */ public static final String DATASOURCE_PASSWORD_REGEX = "(?<=(\"password\":\")).*?(?=(\"))"; + + /** + * default worker group + */ + public static final String DEFAULT_WORKER_GROUP = "default"; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java index 853be7562f..3ed71e5e93 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java @@ -160,6 +160,18 @@ public class TaskExecutionContext implements Serializable{ */ private int taskTimeout; + /** + * worker group + */ + private String workerGroup; + + public String getWorkerGroup() { + return workerGroup; + } + + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } public Integer getTaskInstanceId() { return taskInstanceId; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java index 14c7d9f167..5157dd288f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.context; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; @@ -33,23 +34,27 @@ public class ExecutionContext { /** * context */ - private final Object context; + private final TaskExecutionContext context; /** * executor type : worker or client */ private final ExecutorType executorType; - public ExecutionContext(Object context, ExecutorType executorType) { + public ExecutionContext(TaskExecutionContext context, ExecutorType executorType) { this.context = context; this.executorType = executorType; } + public String getWorkerGroup(){ + return context.getWorkerGroup(); + } + public ExecutorType getExecutorType() { return executorType; } - public Object getContext() { + public TaskExecutionContext getContext() { return context; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index bdfe71cf5f..f4b1dab6b1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -136,7 +136,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ ExecutorType executorType = context.getExecutorType(); switch (executorType){ case WORKER: - TaskExecutionContext taskExecutionContext = (TaskExecutionContext)context.getContext(); + TaskExecutionContext taskExecutionContext = context.getContext(); requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext)); break; case CLIENT: @@ -191,7 +191,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ ExecutorType executorType = context.getExecutorType(); switch (executorType){ case WORKER: - nodes = zookeeperNodeManager.getWorkerNodes(); + nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); break; case CLIENT: break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java index 3bb001e842..a57363213e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java @@ -75,7 +75,7 @@ public class RoundRobinHostManager implements HostManager { ExecutorType executorType = context.getExecutorType(); switch (executorType){ case WORKER: - nodes = zookeeperNodeManager.getWorkerNodes(); + nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); break; case CLIENT: break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java index 1d6808d51e..590a25f52c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.server.registry; +import org.apache.commons.collections.CollectionUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,9 +32,12 @@ import org.springframework.stereotype.Service; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; + /** * zookeeper node manager */ @@ -47,14 +52,14 @@ public class ZookeeperNodeManager implements InitializingBean { private final Lock masterLock = new ReentrantLock(); /** - * worker lock + * worker group lock */ - private final Lock workerLock = new ReentrantLock(); + private final Lock workerGroupLock = new ReentrantLock(); /** - * worker nodes + * worker group nodes */ - private final Set workerNodes = new HashSet<>(); + private final ConcurrentHashMap> workerGroupNodes = new ConcurrentHashMap<>(); /** * master nodes @@ -84,7 +89,7 @@ public class ZookeeperNodeManager implements InitializingBean { /** * init WorkerNodeListener listener */ - registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener()); + registryCenter.getZookeeperCachedOperator().addListener(new WorkerGroupNodeListener()); } /** @@ -98,39 +103,55 @@ public class ZookeeperNodeManager implements InitializingBean { syncMasterNodes(masterNodes); /** - * worker nodes from zookeeper + * worker group nodes from zookeeper */ - Set workersNodes = registryCenter.getWorkerNodesDirectly(); - syncWorkerNodes(workersNodes); + Set workerGroups = registryCenter.getWorkerGroupDirectly(); + for(String workerGroup : workerGroups){ + syncWorkerGroupNodes(workerGroup, registryCenter.getWorkerGroupNodesDirectly(workerGroup)); + } } /** - * worker node listener + * worker group node listener */ - class WorkerNodeListener extends AbstractListener { + class WorkerGroupNodeListener extends AbstractListener { @Override protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { if(registryCenter.isWorkerPath(path)){ try { if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { - logger.info("worker node : {} added.", path); + logger.info("worker group node : {} added.", path); + String group = parseGroup(path); + Set workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>()); Set previousNodes = new HashSet<>(workerNodes); - Set currentNodes = registryCenter.getWorkerNodesDirectly(); - syncWorkerNodes(currentNodes); + Set currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); + logger.info("currentNodes : {}", currentNodes); + syncWorkerGroupNodes(group, currentNodes); } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { - logger.info("worker node : {} down.", path); + logger.info("worker group node : {} down.", path); + String group = parseGroup(path); + Set workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>()); Set previousNodes = new HashSet<>(workerNodes); - Set currentNodes = registryCenter.getWorkerNodesDirectly(); - syncWorkerNodes(currentNodes); + Set currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); + syncWorkerGroupNodes(group, currentNodes); } } catch (IllegalArgumentException ignore) { logger.warn(ignore.getMessage()); } catch (Exception ex) { - logger.error("WorkerListener capture data change and get data failed", ex); + logger.error("WorkerGroupListener capture data change and get data failed", ex); } } } + + private String parseGroup(String path){ + String[] parts = path.split("\\/"); + if(parts.length != 6){ + throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); + } + String group = parts[4]; + return group; + } } @@ -189,29 +210,42 @@ public class ZookeeperNodeManager implements InitializingBean { } /** - * sync worker nodes - * @param nodes worker nodes + * sync worker group nodes + * @param workerGroup + * @param nodes */ - private void syncWorkerNodes(Set nodes){ - workerLock.lock(); + private void syncWorkerGroupNodes(String workerGroup, Set nodes){ + workerGroupLock.lock(); try { + workerGroup = workerGroup.toLowerCase(); + Set workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>()); workerNodes.clear(); workerNodes.addAll(nodes); + workerGroupNodes.put(workerGroup, workerNodes); } finally { - workerLock.unlock(); + workerGroupLock.unlock(); } } /** - * get worker nodes - * @return worker nodes + * get worker group nodes + * @param workerGroup + * @return */ - public Set getWorkerNodes(){ - workerLock.lock(); + public Set getWorkerGroupNodes(String workerGroup){ + workerGroupLock.lock(); try { - return Collections.unmodifiableSet(workerNodes); + if(StringUtils.isEmpty(workerGroup)){ + workerGroup = DEFAULT_WORKER_GROUP; + } + workerGroup = workerGroup.toLowerCase(); + Set nodes = workerGroupNodes.get(workerGroup); + if(CollectionUtils.isNotEmpty(nodes)){ + return Collections.unmodifiableSet(nodes); + } + return nodes; } finally { - workerLock.unlock(); + workerGroupLock.unlock(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index 7d7e2efb83..a6a3ea0822 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -127,6 +127,25 @@ public class ZookeeperRegistryCenter implements InitializingBean { return new HashSet<>(workers); } + /** + * get worker group directly + * @return + */ + public Set getWorkerGroupDirectly() { + List workers = getChildrenKeys(getWorkerPath()); + return new HashSet<>(workers); + } + + /** + * get worker group nodes + * @param workerGroup + * @return + */ + public Set getWorkerGroupNodesDirectly(String workerGroup) { + List workers = getChildrenKeys(getWorkerGroupPath(workerGroup)); + return new HashSet<>(workers); + } + /** * whether worker path * @param path path @@ -145,6 +164,15 @@ public class ZookeeperRegistryCenter implements InitializingBean { return path != null && path.contains(MASTER_PATH); } + /** + * get worker group path + * @param workerGroup + * @return + */ + public String getWorkerGroupPath(String workerGroup) { + return WORKER_PATH + "/" + workerGroup; + } + /** * get children nodes * @param key key diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 01f66aca6e..ec43dd875c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; @@ -37,13 +36,11 @@ import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.WebApplicationType; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import javax.annotation.PostConstruct; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -58,30 +55,17 @@ public class WorkerServer implements IStoppable { */ private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); - /** * zk worker client */ @Autowired private ZKWorkerClient zkWorkerClient = null; - - /** - * alert database access - */ - @Autowired - private AlertDao alertDao; - /** * task queue impl */ protected ITaskQueue taskQueue; - /** - * kill executor service - */ - private ExecutorService killExecutorService; - /** * fetch task executor service */ @@ -92,9 +76,6 @@ public class WorkerServer implements IStoppable { */ private CountDownLatch latch; - @Value("${server.is-combined-server:false}") - private Boolean isCombinedServer; - /** * worker config */ @@ -157,8 +138,6 @@ public class WorkerServer implements IStoppable { this.taskQueue = TaskQueueFactory.getTaskQueueInstance(); - this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor"); - this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); zkWorkerClient.setStoppable(this); @@ -169,17 +148,15 @@ public class WorkerServer implements IStoppable { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - stop("shutdownhook"); + stop("shutdownHook"); } })); //let the main thread await latch = new CountDownLatch(1); - if (!isCombinedServer) { - try { - latch.await(); - } catch (InterruptedException ignore) { - } + try { + latch.await(); + } catch (InterruptedException ignore) { } } @@ -210,17 +187,10 @@ public class WorkerServer implements IStoppable { try { ThreadPoolExecutors.getInstance().shutdown(); }catch (Exception e){ - logger.warn("threadpool service stopped exception:{}",e.getMessage()); + logger.warn("threadPool service stopped exception:{}",e.getMessage()); } - logger.info("threadpool service stopped"); - - try { - killExecutorService.shutdownNow(); - }catch (Exception e){ - logger.warn("worker kill executor service stopped exception:{}",e.getMessage()); - } - logger.info("worker kill executor service stopped"); + logger.info("threadPool service stopped"); try { fetchTaskExecutorService.shutdownNow(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java index a5615ea343..db78127dc1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.cache; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java index b559d58f6c..584c42bbba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.cache.impl; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; +import org.springframework.stereotype.Service; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; /** * TaskExecutionContextCache */ +@Service public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContextCacheManager { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 747b34faf9..3c7500aa8b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -37,7 +37,7 @@ public class WorkerConfig { @Value("${worker.reserved.memory}") private double workerReservedMemory; - @Value("${worker.group: DEFAULT}") + @Value("${worker.group: default}") private String workerGroup; public String getWorkerGroup() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 6876f05795..977643c25a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -33,8 +33,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; -import static org.apache.dolphinscheduler.remote.utils.Constants.SLASH; +import static org.apache.dolphinscheduler.common.Constants.COMMA; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.SLASH; /** @@ -44,8 +45,6 @@ public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); - private static final String DEFAULT_GROUP = "DEFAULT"; - /** * zookeeper registry center */ @@ -74,7 +73,7 @@ public class WorkerRegistry { /** * worker group */ - private final String workerGroup; + private String workerGroup; /** * construct @@ -82,7 +81,7 @@ public class WorkerRegistry { * @param port port */ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ - this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_GROUP); + this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP); } /** @@ -144,9 +143,11 @@ public class WorkerRegistry { StringBuilder builder = new StringBuilder(100); String workerPath = this.zookeeperRegistryCenter.getWorkerPath(); builder.append(workerPath).append(SLASH); - if(StringUtils.isNotEmpty(workerGroup) && !DEFAULT_GROUP.equalsIgnoreCase(workerGroup)){ - builder.append(workerGroup.trim()).append(SLASH); + if(StringUtils.isEmpty(workerGroup)){ + workerGroup = DEFAULT_WORKER_GROUP; } + //trim and lower case is need + builder.append(workerGroup.trim().toLowerCase()).append(SLASH); builder.append(address); return builder.toString(); }