Browse Source

add worker group

pull/2/head
Technoboy- 5 years ago
parent
commit
952d6edae8
  1. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
  3. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
  4. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
  6. 90
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  7. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
  8. 36
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  9. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
  10. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
  11. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  12. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -173,6 +173,11 @@ public final class Constants {
*/
public static final String COMMA = ",";
/**
* slash /
*/
public static final String SLASH = "/";
/**
* COLON :
*/
@ -994,4 +999,9 @@ public final class Constants {
* dataSource sensitive param
*/
public static final String DATASOURCE_PASSWORD_REGEX = "(?<=(\"password\":\")).*?(?=(\"))";
/**
* default worker group
*/
public static final String DEFAULT_WORKER_GROUP = "default";
}

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java

@ -160,6 +160,18 @@ public class TaskExecutionContext implements Serializable{
*/
private int taskTimeout;
/**
* worker group
*/
private String workerGroup;
/**
 * Returns the worker group stored on this task execution context.
 *
 * @return the current value of the workerGroup field (null until a value has been assigned)
 */
public String getWorkerGroup() {
    return this.workerGroup;
}
/**
* Stores the worker group on this task execution context.
*
* @param workerGroup value assigned to the workerGroup field as-is; no validation or normalisation happens here
*/
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public Integer getTaskInstanceId() {
return taskInstanceId;

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.context;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@ -33,23 +34,27 @@ public class ExecutionContext {
/**
* context
*/
private final Object context;
private final TaskExecutionContext context;
/**
* executor type : worker or client
*/
private final ExecutorType executorType;
public ExecutionContext(Object context, ExecutorType executorType) {
public ExecutionContext(TaskExecutionContext context, ExecutorType executorType) {
this.context = context;
this.executorType = executorType;
}
/**
 * Exposes the worker group carried by the wrapped task execution context.
 *
 * @return whatever {@code context.getWorkerGroup()} yields
 */
public String getWorkerGroup() {
    return this.context.getWorkerGroup();
}
/**
 * Returns the executor type this context was constructed with.
 *
 * @return the executorType field
 */
public ExecutorType getExecutorType() {
    return this.executorType;
}
public Object getContext() {
public TaskExecutionContext getContext() {
return context;
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -136,7 +136,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
TaskExecutionContext taskExecutionContext = (TaskExecutionContext)context.getContext();
TaskExecutionContext taskExecutionContext = context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
break;
case CLIENT:
@ -191,7 +191,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
nodes = zookeeperNodeManager.getWorkerNodes();
nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break;
case CLIENT:
break;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java

@ -75,7 +75,7 @@ public class RoundRobinHostManager implements HostManager {
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
nodes = zookeeperNodeManager.getWorkerNodes();
nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break;
case CLIENT:
break;

90
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -17,9 +17,11 @@
package org.apache.dolphinscheduler.server.registry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,9 +32,12 @@ import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
/**
* zookeeper node manager
*/
@ -47,14 +52,14 @@ public class ZookeeperNodeManager implements InitializingBean {
private final Lock masterLock = new ReentrantLock();
/**
* worker lock
* worker group lock
*/
private final Lock workerLock = new ReentrantLock();
private final Lock workerGroupLock = new ReentrantLock();
/**
* worker nodes
* worker group nodes
*/
private final Set<String> workerNodes = new HashSet<>();
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
/**
* master nodes
@ -84,7 +89,7 @@ public class ZookeeperNodeManager implements InitializingBean {
/**
* init WorkerNodeListener listener
*/
registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener());
registryCenter.getZookeeperCachedOperator().addListener(new WorkerGroupNodeListener());
}
/**
@ -98,39 +103,55 @@ public class ZookeeperNodeManager implements InitializingBean {
syncMasterNodes(masterNodes);
/**
* worker nodes from zookeeper
* worker group nodes from zookeeper
*/
Set<String> workersNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(workersNodes);
Set<String> workerGroups = registryCenter.getWorkerGroupDirectly();
for(String workerGroup : workerGroups){
syncWorkerGroupNodes(workerGroup, registryCenter.getWorkerGroupNodesDirectly(workerGroup));
}
}
/**
* worker node listener
* worker group node listener
*/
class WorkerNodeListener extends AbstractListener {
class WorkerGroupNodeListener extends AbstractListener {
/**
* Handles cache events for paths accepted by registryCenter.isWorkerPath(path): on NODE_ADDED and
* NODE_REMOVED the worker group is derived from the path and that group's nodes are re-read and synced.
* Other event types are not handled in this body.
*
* NOTE(review): this span is taken from a diff view, so superseded and replacement statements appear
* back to back (getWorkerNodesDirectly/syncWorkerNodes next to
* getWorkerGroupNodesDirectly/syncWorkerGroupNodes) — confirm which lines are live before editing.
*
* @param client curator client passed in by the caller; not referenced in this body
* @param event cache event whose type selects the branch
* @param path node path the event refers to; also the input to parseGroup
*/
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(registryCenter.isWorkerPath(path)){
try {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
logger.info("worker node : {} added.", path);
logger.info("worker group node : {} added.", path);
// parseGroup throws IllegalArgumentException unless the path splits into exactly six parts
String group = parseGroup(path);
// NOTE(review): workerNodes and previousNodes are built here but never read again in this branch
Set<String> workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>());
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(currentNodes);
Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group);
logger.info("currentNodes : {}", currentNodes);
syncWorkerGroupNodes(group, currentNodes);
} else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
logger.info("worker node : {} down.", path);
logger.info("worker group node : {} down.", path);
String group = parseGroup(path);
// NOTE(review): same as the NODE_ADDED branch — these two locals are never read afterwards
Set<String> workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>());
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(currentNodes);
Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
}
} catch (IllegalArgumentException ignore) {
// an IllegalArgumentException (parseGroup throws one for an unexpected path layout) is only logged at warn level; nothing is synced
logger.warn(ignore.getMessage());
} catch (Exception ex) {
logger.error("WorkerListener capture data change and get data failed", ex);
logger.error("WorkerGroupListener capture data change and get data failed", ex);
}
}
}
/**
 * Extracts the worker group name from a node path.
 *
 * The path is split on '/', and the split must yield exactly six parts (a leading '/'
 * contributes an empty first part). The group is the part at index 4.
 *
 * @param path node path to inspect
 * @return the fifth '/'-separated part of the path
 * @throws IllegalArgumentException if the path does not split into exactly six parts
 */
private String parseGroup(String path) {
    final String[] segments = path.split("/");
    if (segments.length == 6) {
        return segments[4];
    }
    throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
}
@ -189,29 +210,42 @@ public class ZookeeperNodeManager implements InitializingBean {
}
/**
* sync worker nodes
* @param nodes worker nodes
* sync worker group nodes
* @param workerGroup worker group name; lower-cased before it is used as the key into workerGroupNodes
* @param nodes nodes that become the complete content of that group's cached set
*/
private void syncWorkerNodes(Set<String> nodes){
workerLock.lock();
private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes){
workerGroupLock.lock();
try {
workerGroup = workerGroup.toLowerCase();
// NOTE(review): when the group already has an entry, that same Set instance is cleared and refilled in place.
// getWorkerGroupNodes hands out an unmodifiable view of this set, so a caller still holding the view
// after the lock is released can observe the mutation — consider storing a fresh copy instead.
Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(nodes);
workerGroupNodes.put(workerGroup, workerNodes);
} finally {
workerLock.unlock();
workerGroupLock.unlock();
}
}
/**
* get worker nodes
* @return worker nodes
* get worker group nodes
* @param workerGroup worker group name; when StringUtils.isEmpty reports it empty, DEFAULT_WORKER_GROUP is used; the name is lower-cased before the lookup
* @return an unmodifiable view of the group's cached set when that set is non-empty; otherwise the raw lookup result (null for a group with no entry, or the empty set itself)
*/
public Set<String> getWorkerNodes(){
workerLock.lock();
public Set<String> getWorkerGroupNodes(String workerGroup){
workerGroupLock.lock();
try {
return Collections.unmodifiableSet(workerNodes);
if(StringUtils.isEmpty(workerGroup)){
workerGroup = DEFAULT_WORKER_GROUP;
}
// keys are stored lower-cased by syncWorkerGroupNodes, so the lookup key is lower-cased too
workerGroup = workerGroup.toLowerCase();
Set<String> nodes = workerGroupNodes.get(workerGroup);
if(CollectionUtils.isNotEmpty(nodes)){
return Collections.unmodifiableSet(nodes);
}
// NOTE(review): reached when nodes is null or empty, so callers can receive null here — confirm they guard against it
return nodes;
} finally {
workerLock.unlock();
workerGroupLock.unlock();
}
}

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -127,6 +127,25 @@ public class ZookeeperRegistryCenter implements InitializingBean {
return new HashSet<>(workers);
}
/**
 * get worker group directly
 *
 * @return a new HashSet holding the child keys read from the worker path
 */
public Set<String> getWorkerGroupDirectly() {
    return new HashSet<>(getChildrenKeys(getWorkerPath()));
}
/**
 * get worker group nodes
 *
 * @param workerGroup worker group whose path is resolved through getWorkerGroupPath
 * @return a new HashSet holding the child keys read from that group's path
 */
public Set<String> getWorkerGroupNodesDirectly(String workerGroup) {
    final String groupPath = getWorkerGroupPath(workerGroup);
    return new HashSet<>(getChildrenKeys(groupPath));
}
/**
* whether worker path
* @param path path
@ -145,6 +164,15 @@ public class ZookeeperRegistryCenter implements InitializingBean {
return path != null && path.contains(MASTER_PATH);
}
/**
 * get worker group path
 *
 * @param workerGroup worker group name, appended unchanged
 * @return WORKER_PATH, a "/" separator and the given worker group, joined in that order
 */
public String getWorkerGroupPath(String workerGroup) {
    final StringBuilder groupPath = new StringBuilder();
    groupPath.append(WORKER_PATH);
    groupPath.append("/");
    groupPath.append(workerGroup);
    return groupPath.toString();
}
/**
* get children nodes
* @param key key

36
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@ -37,13 +36,11 @@ import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -58,30 +55,17 @@ public class WorkerServer implements IStoppable {
*/
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
* zk worker client
*/
@Autowired
private ZKWorkerClient zkWorkerClient = null;
/**
* alert database access
*/
@Autowired
private AlertDao alertDao;
/**
* task queue impl
*/
protected ITaskQueue taskQueue;
/**
* kill executor service
*/
private ExecutorService killExecutorService;
/**
* fetch task executor service
*/
@ -92,9 +76,6 @@ public class WorkerServer implements IStoppable {
*/
private CountDownLatch latch;
@Value("${server.is-combined-server:false}")
private Boolean isCombinedServer;
/**
* worker config
*/
@ -157,8 +138,6 @@ public class WorkerServer implements IStoppable {
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
zkWorkerClient.setStoppable(this);
@ -169,19 +148,17 @@ public class WorkerServer implements IStoppable {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
stop("shutdownhook");
stop("shutdownHook");
}
}));
//let the main thread await
latch = new CountDownLatch(1);
if (!isCombinedServer) {
try {
latch.await();
} catch (InterruptedException ignore) {
}
}
}
@Override
public synchronized void stop(String cause) {
@ -210,17 +187,10 @@ public class WorkerServer implements IStoppable {
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("threadpool service stopped exception:{}",e.getMessage());
logger.warn("threadPool service stopped exception:{}",e.getMessage());
}
logger.info("threadpool service stopped");
try {
killExecutorService.shutdownNow();
}catch (Exception e){
logger.warn("worker kill executor service stopped exception:{}",e.getMessage());
}
logger.info("worker kill executor service stopped");
logger.info("threadPool service stopped");
try {
fetchTaskExecutorService.shutdownNow();

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java vendored

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
/**

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java vendored

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.cache.impl;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* TaskExecutionContextCache
*/
@Service
public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContextCacheManager {

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -37,7 +37,7 @@ public class WorkerConfig {
@Value("${worker.reserved.memory}")
private double workerReservedMemory;
@Value("${worker.group: DEFAULT}")
@Value("${worker.group: default}")
private String workerGroup;
public String getWorkerGroup() {

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -33,8 +33,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import static org.apache.dolphinscheduler.remote.utils.Constants.SLASH;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
/**
@ -44,8 +45,6 @@ public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
private static final String DEFAULT_GROUP = "DEFAULT";
/**
* zookeeper registry center
*/
@ -74,7 +73,7 @@ public class WorkerRegistry {
/**
* worker group
*/
private final String workerGroup;
private String workerGroup;
/**
* construct
@ -82,7 +81,7 @@ public class WorkerRegistry {
* @param port port
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_GROUP);
this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP);
}
/**
@ -144,9 +143,11 @@ public class WorkerRegistry {
StringBuilder builder = new StringBuilder(100);
String workerPath = this.zookeeperRegistryCenter.getWorkerPath();
builder.append(workerPath).append(SLASH);
if(StringUtils.isNotEmpty(workerGroup) && !DEFAULT_GROUP.equalsIgnoreCase(workerGroup)){
builder.append(workerGroup.trim()).append(SLASH);
if(StringUtils.isEmpty(workerGroup)){
workerGroup = DEFAULT_WORKER_GROUP;
}
// trimming and lower-casing are both needed
builder.append(workerGroup.trim().toLowerCase()).append(SLASH);
builder.append(address);
return builder.toString();
}

Loading…
Cancel
Save