Browse Source

Refactor worker (#2042)

* Refactor worker (#10)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* Refactor worker (#2018)

* Refactor worker (#7)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* Refactor worker (#8)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* add kill command

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* add TaskInstanceCacheManager receive Worker report result,modify master polling db transfrom to cache (#2021)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* refactor heartbeat logic

* update registry and add worker group

* add worker group

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
3bad56ca15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
  3. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
  4. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
  6. 90
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  7. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
  8. 36
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  9. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
  10. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
  11. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  12. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -173,6 +173,11 @@ public final class Constants {
*/ */
public static final String COMMA = ","; public static final String COMMA = ",";
/**
* slash /
*/
public static final String SLASH = "/";
/** /**
* COLON : * COLON :
*/ */
@ -994,4 +999,9 @@ public final class Constants {
* dataSource sensitive param * dataSource sensitive param
*/ */
public static final String DATASOURCE_PASSWORD_REGEX = "(?<=(\"password\":\")).*?(?=(\"))"; public static final String DATASOURCE_PASSWORD_REGEX = "(?<=(\"password\":\")).*?(?=(\"))";
/**
* default worker group
*/
public static final String DEFAULT_WORKER_GROUP = "default";
} }

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java

@ -160,6 +160,18 @@ public class TaskExecutionContext implements Serializable{
*/ */
private int taskTimeout; private int taskTimeout;
/**
* worker group
*/
private String workerGroup;
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public Integer getTaskInstanceId() { public Integer getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.context; package org.apache.dolphinscheduler.server.master.dispatch.context;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@ -33,23 +34,27 @@ public class ExecutionContext {
/** /**
* context * context
*/ */
private final Object context; private final TaskExecutionContext context;
/** /**
* executor type : worker or client * executor type : worker or client
*/ */
private final ExecutorType executorType; private final ExecutorType executorType;
public ExecutionContext(Object context, ExecutorType executorType) { public ExecutionContext(TaskExecutionContext context, ExecutorType executorType) {
this.context = context; this.context = context;
this.executorType = executorType; this.executorType = executorType;
} }
public String getWorkerGroup(){
return context.getWorkerGroup();
}
public ExecutorType getExecutorType() { public ExecutorType getExecutorType() {
return executorType; return executorType;
} }
public Object getContext() { public TaskExecutionContext getContext() {
return context; return context;
} }

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -136,7 +136,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
ExecutorType executorType = context.getExecutorType(); ExecutorType executorType = context.getExecutorType();
switch (executorType){ switch (executorType){
case WORKER: case WORKER:
TaskExecutionContext taskExecutionContext = (TaskExecutionContext)context.getContext(); TaskExecutionContext taskExecutionContext = context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext)); requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
break; break;
case CLIENT: case CLIENT:
@ -191,7 +191,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
ExecutorType executorType = context.getExecutorType(); ExecutorType executorType = context.getExecutorType();
switch (executorType){ switch (executorType){
case WORKER: case WORKER:
nodes = zookeeperNodeManager.getWorkerNodes(); nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break; break;
case CLIENT: case CLIENT:
break; break;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java

@ -75,7 +75,7 @@ public class RoundRobinHostManager implements HostManager {
ExecutorType executorType = context.getExecutorType(); ExecutorType executorType = context.getExecutorType();
switch (executorType){ switch (executorType){
case WORKER: case WORKER:
nodes = zookeeperNodeManager.getWorkerNodes(); nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break; break;
case CLIENT: case CLIENT:
break; break;

90
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -17,9 +17,11 @@
package org.apache.dolphinscheduler.server.registry; package org.apache.dolphinscheduler.server.registry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -30,9 +32,12 @@ import org.springframework.stereotype.Service;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
/** /**
* zookeeper node manager * zookeeper node manager
*/ */
@ -47,14 +52,14 @@ public class ZookeeperNodeManager implements InitializingBean {
private final Lock masterLock = new ReentrantLock(); private final Lock masterLock = new ReentrantLock();
/** /**
* worker lock * worker group lock
*/ */
private final Lock workerLock = new ReentrantLock(); private final Lock workerGroupLock = new ReentrantLock();
/** /**
* worker nodes * worker group nodes
*/ */
private final Set<String> workerNodes = new HashSet<>(); private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
/** /**
* master nodes * master nodes
@ -84,7 +89,7 @@ public class ZookeeperNodeManager implements InitializingBean {
/** /**
* init WorkerNodeListener listener * init WorkerNodeListener listener
*/ */
registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener()); registryCenter.getZookeeperCachedOperator().addListener(new WorkerGroupNodeListener());
} }
/** /**
@ -98,39 +103,55 @@ public class ZookeeperNodeManager implements InitializingBean {
syncMasterNodes(masterNodes); syncMasterNodes(masterNodes);
/** /**
* worker nodes from zookeeper * worker group nodes from zookeeper
*/ */
Set<String> workersNodes = registryCenter.getWorkerNodesDirectly(); Set<String> workerGroups = registryCenter.getWorkerGroupDirectly();
syncWorkerNodes(workersNodes); for(String workerGroup : workerGroups){
syncWorkerGroupNodes(workerGroup, registryCenter.getWorkerGroupNodesDirectly(workerGroup));
}
} }
/** /**
* worker node listener * worker group node listener
*/ */
class WorkerNodeListener extends AbstractListener { class WorkerGroupNodeListener extends AbstractListener {
@Override @Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(registryCenter.isWorkerPath(path)){ if(registryCenter.isWorkerPath(path)){
try { try {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
logger.info("worker node : {} added.", path); logger.info("worker group node : {} added.", path);
String group = parseGroup(path);
Set<String> workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>());
Set<String> previousNodes = new HashSet<>(workerNodes); Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerNodesDirectly(); Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group);
syncWorkerNodes(currentNodes); logger.info("currentNodes : {}", currentNodes);
syncWorkerGroupNodes(group, currentNodes);
} else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
logger.info("worker node : {} down.", path); logger.info("worker group node : {} down.", path);
String group = parseGroup(path);
Set<String> workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>());
Set<String> previousNodes = new HashSet<>(workerNodes); Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerNodesDirectly(); Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group);
syncWorkerNodes(currentNodes); syncWorkerGroupNodes(group, currentNodes);
} }
} catch (IllegalArgumentException ignore) { } catch (IllegalArgumentException ignore) {
logger.warn(ignore.getMessage()); logger.warn(ignore.getMessage());
} catch (Exception ex) { } catch (Exception ex) {
logger.error("WorkerListener capture data change and get data failed", ex); logger.error("WorkerGroupListener capture data change and get data failed", ex);
} }
} }
} }
private String parseGroup(String path){
String[] parts = path.split("\\/");
if(parts.length != 6){
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
String group = parts[4];
return group;
}
} }
@ -189,29 +210,42 @@ public class ZookeeperNodeManager implements InitializingBean {
} }
/** /**
* sync worker nodes * sync worker group nodes
* @param nodes worker nodes * @param workerGroup
* @param nodes
*/ */
private void syncWorkerNodes(Set<String> nodes){ private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes){
workerLock.lock(); workerGroupLock.lock();
try { try {
workerGroup = workerGroup.toLowerCase();
Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear(); workerNodes.clear();
workerNodes.addAll(nodes); workerNodes.addAll(nodes);
workerGroupNodes.put(workerGroup, workerNodes);
} finally { } finally {
workerLock.unlock(); workerGroupLock.unlock();
} }
} }
/** /**
* get worker nodes * get worker group nodes
* @return worker nodes * @param workerGroup
* @return
*/ */
public Set<String> getWorkerNodes(){ public Set<String> getWorkerGroupNodes(String workerGroup){
workerLock.lock(); workerGroupLock.lock();
try { try {
return Collections.unmodifiableSet(workerNodes); if(StringUtils.isEmpty(workerGroup)){
workerGroup = DEFAULT_WORKER_GROUP;
}
workerGroup = workerGroup.toLowerCase();
Set<String> nodes = workerGroupNodes.get(workerGroup);
if(CollectionUtils.isNotEmpty(nodes)){
return Collections.unmodifiableSet(nodes);
}
return nodes;
} finally { } finally {
workerLock.unlock(); workerGroupLock.unlock();
} }
} }

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -127,6 +127,25 @@ public class ZookeeperRegistryCenter implements InitializingBean {
return new HashSet<>(workers); return new HashSet<>(workers);
} }
/**
* get worker group directly
* @return
*/
public Set<String> getWorkerGroupDirectly() {
List<String> workers = getChildrenKeys(getWorkerPath());
return new HashSet<>(workers);
}
/**
* get worker group nodes
* @param workerGroup
* @return
*/
public Set<String> getWorkerGroupNodesDirectly(String workerGroup) {
List<String> workers = getChildrenKeys(getWorkerGroupPath(workerGroup));
return new HashSet<>(workers);
}
/** /**
* whether worker path * whether worker path
* @param path path * @param path path
@ -145,6 +164,15 @@ public class ZookeeperRegistryCenter implements InitializingBean {
return path != null && path.contains(MASTER_PATH); return path != null && path.contains(MASTER_PATH);
} }
/**
* get worker group path
* @param workerGroup
* @return
*/
public String getWorkerGroupPath(String workerGroup) {
return WORKER_PATH + "/" + workerGroup;
}
/** /**
* get children nodes * get children nodes
* @param key key * @param key key

36
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
@ -37,13 +36,11 @@ import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.WebApplicationType; import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -58,30 +55,17 @@ public class WorkerServer implements IStoppable {
*/ */
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/** /**
* zk worker client * zk worker client
*/ */
@Autowired @Autowired
private ZKWorkerClient zkWorkerClient = null; private ZKWorkerClient zkWorkerClient = null;
/**
* alert database access
*/
@Autowired
private AlertDao alertDao;
/** /**
* task queue impl * task queue impl
*/ */
protected ITaskQueue taskQueue; protected ITaskQueue taskQueue;
/**
* kill executor service
*/
private ExecutorService killExecutorService;
/** /**
* fetch task executor service * fetch task executor service
*/ */
@ -92,9 +76,6 @@ public class WorkerServer implements IStoppable {
*/ */
private CountDownLatch latch; private CountDownLatch latch;
@Value("${server.is-combined-server:false}")
private Boolean isCombinedServer;
/** /**
* worker config * worker config
*/ */
@ -157,8 +138,6 @@ public class WorkerServer implements IStoppable {
this.taskQueue = TaskQueueFactory.getTaskQueueInstance(); this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.killExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Kill-Thread-Executor");
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
zkWorkerClient.setStoppable(this); zkWorkerClient.setStoppable(this);
@ -169,19 +148,17 @@ public class WorkerServer implements IStoppable {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
stop("shutdownhook"); stop("shutdownHook");
} }
})); }));
//let the main thread await //let the main thread await
latch = new CountDownLatch(1); latch = new CountDownLatch(1);
if (!isCombinedServer) {
try { try {
latch.await(); latch.await();
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
} }
} }
}
@Override @Override
public synchronized void stop(String cause) { public synchronized void stop(String cause) {
@ -210,17 +187,10 @@ public class WorkerServer implements IStoppable {
try { try {
ThreadPoolExecutors.getInstance().shutdown(); ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){ }catch (Exception e){
logger.warn("threadpool service stopped exception:{}",e.getMessage()); logger.warn("threadPool service stopped exception:{}",e.getMessage());
} }
logger.info("threadpool service stopped"); logger.info("threadPool service stopped");
try {
killExecutorService.shutdownNow();
}catch (Exception e){
logger.warn("worker kill executor service stopped exception:{}",e.getMessage());
}
logger.info("worker kill executor service stopped");
try { try {
fetchTaskExecutorService.shutdownNow(); fetchTaskExecutorService.shutdownNow();

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java vendored

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.cache; package org.apache.dolphinscheduler.server.worker.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
/** /**

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java vendored

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.cache.impl;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.springframework.stereotype.Service;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
/** /**
* TaskExecutionContextCache * TaskExecutionContextCache
*/ */
@Service
public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContextCacheManager { public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContextCacheManager {

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -37,7 +37,7 @@ public class WorkerConfig {
@Value("${worker.reserved.memory}") @Value("${worker.reserved.memory}")
private double workerReservedMemory; private double workerReservedMemory;
@Value("${worker.group: DEFAULT}") @Value("${worker.group: default}")
private String workerGroup; private String workerGroup;
public String getWorkerGroup() { public String getWorkerGroup() {

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -33,8 +33,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.remote.utils.Constants.SLASH; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
/** /**
@ -44,8 +45,6 @@ public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
private static final String DEFAULT_GROUP = "DEFAULT";
/** /**
* zookeeper registry center * zookeeper registry center
*/ */
@ -74,7 +73,7 @@ public class WorkerRegistry {
/** /**
* worker group * worker group
*/ */
private final String workerGroup; private String workerGroup;
/** /**
* construct * construct
@ -82,7 +81,7 @@ public class WorkerRegistry {
* @param port port * @param port port
*/ */
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_GROUP); this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP);
} }
/** /**
@ -144,9 +143,11 @@ public class WorkerRegistry {
StringBuilder builder = new StringBuilder(100); StringBuilder builder = new StringBuilder(100);
String workerPath = this.zookeeperRegistryCenter.getWorkerPath(); String workerPath = this.zookeeperRegistryCenter.getWorkerPath();
builder.append(workerPath).append(SLASH); builder.append(workerPath).append(SLASH);
if(StringUtils.isNotEmpty(workerGroup) && !DEFAULT_GROUP.equalsIgnoreCase(workerGroup)){ if(StringUtils.isEmpty(workerGroup)){
builder.append(workerGroup.trim()).append(SLASH); workerGroup = DEFAULT_WORKER_GROUP;
} }
//trim and lower case is need
builder.append(workerGroup.trim().toLowerCase()).append(SLASH);
builder.append(address); builder.append(address);
return builder.toString(); return builder.toString();
} }

Loading…
Cancel
Save