Browse Source

Refactor worker (#6)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
89a70f8b2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
  2. 51
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
  3. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
  4. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
  5. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
  6. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
  7. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
  8. 71
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  9. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
  10. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
  11. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
  12. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
  13. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
  14. 29
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
  15. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  16. 71
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  17. 61
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

36
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java

@ -24,10 +24,19 @@ import java.util.Objects;
*/ */
public class Host implements Serializable { public class Host implements Serializable {
/**
* address
*/
private String address; private String address;
/**
* ip
*/
private String ip; private String ip;
/**
* port
*/
private int port; private int port;
public Host() { public Host() {
@ -65,6 +74,11 @@ public class Host implements Serializable {
this.address = ip + ":" + port; this.address = ip + ":" + port;
} }
/**
* address convert host
* @param address address
* @return host
*/
public static Host of(String address){ public static Host of(String address){
String[] parts = address.split(":"); String[] parts = address.split(":");
if (parts.length != 2) { if (parts.length != 2) {
@ -74,17 +88,14 @@ public class Host implements Serializable {
return host; return host;
} }
@Override
public String toString() {
return "Host{" +
"address='" + address + '\'' +
'}';
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) {
if (o == null || getClass() != o.getClass()) return false; return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Host host = (Host) o; Host host = (Host) o;
return Objects.equals(getAddress(), host.getAddress()); return Objects.equals(getAddress(), host.getAddress());
} }
@ -93,4 +104,11 @@ public class Host implements Serializable {
public int hashCode() { public int hashCode() {
return Objects.hash(getAddress()); return Objects.hash(getAddress());
} }
@Override
public String toString() {
return "Host{" +
"address='" + address + '\'' +
'}';
}
} }

51
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java

@ -32,12 +32,21 @@ import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/**
* executor dispatcher
*/
@Service @Service
public class ExecutorDispatcher implements InitializingBean { public class ExecutorDispatcher implements InitializingBean {
/**
* netty executor manager
*/
@Autowired @Autowired
private NettyExecutorManager nettyExecutorManager; private NettyExecutorManager nettyExecutorManager;
/**
* round robin host manager
*/
@Autowired @Autowired
private RoundRobinHostManager hostManager; private RoundRobinHostManager hostManager;
@ -47,30 +56,54 @@ public class ExecutorDispatcher implements InitializingBean {
this.executorManagers = new ConcurrentHashMap<>(); this.executorManagers = new ConcurrentHashMap<>();
} }
public void dispatch(final ExecutionContext executeContext) throws ExecuteException { /**
ExecutorManager executorManager = this.executorManagers.get(executeContext.getExecutorType()); * task dispatch
* @param context context
* @throws ExecuteException
*/
public void dispatch(final ExecutionContext context) throws ExecuteException {
/**
* get executor manager
*/
ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType());
if(executorManager == null){ if(executorManager == null){
throw new ExecuteException("no ExecutorManager for type : " + executeContext.getExecutorType()); throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
} }
Host host = hostManager.select(executeContext);
/**
* host select
*/
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) { if (StringUtils.isEmpty(host.getAddress())) {
throw new ExecuteException(String.format("fail to execute : %s due to no worker ", executeContext.getContext())); throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
} }
executeContext.setHost(host); context.setHost(host);
executorManager.beforeExecute(executeContext); executorManager.beforeExecute(context);
try { try {
executorManager.execute(executeContext); /**
* task execute
*/
executorManager.execute(context);
} finally { } finally {
executorManager.afterExecute(executeContext); executorManager.afterExecute(context);
} }
} }
/**
* register init
* @throws Exception
*/
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
register(ExecutorType.WORKER, nettyExecutorManager); register(ExecutorType.WORKER, nettyExecutorManager);
register(ExecutorType.CLIENT, nettyExecutorManager); register(ExecutorType.CLIENT, nettyExecutorManager);
} }
/**
* register
* @param type executor type
* @param executorManager executorManager
*/
public void register(ExecutorType type, ExecutorManager executorManager){ public void register(ExecutorType type, ExecutorManager executorManager){
executorManagers.put(type, executorManager); executorManagers.put(type, executorManager);
} }

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java

@ -20,12 +20,24 @@ package org.apache.dolphinscheduler.server.master.dispatch.context;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
/**
* execution context
*/
public class ExecutionContext { public class ExecutionContext {
/**
* host
*/
private Host host; private Host host;
/**
* context
*/
private final Object context; private final Object context;
/**
* executor type : worker or client
*/
private final ExecutorType executorType; private final ExecutorType executorType;
public ExecutionContext(Object context, ExecutorType executorType) { public ExecutionContext(Object context, ExecutorType executorType) {

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java

@ -16,7 +16,9 @@
*/ */
package org.apache.dolphinscheduler.server.master.dispatch.enums; package org.apache.dolphinscheduler.server.master.dispatch.enums;
/**
* executor type
*/
public enum ExecutorType { public enum ExecutorType {
WORKER, WORKER,

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java

@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.server.master.dispatch.exceptions; package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
/**
* execute exception
*/
public class ExecuteException extends Exception{ public class ExecuteException extends Exception{
public ExecuteException() { public ExecuteException() {

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java

@ -20,17 +20,26 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
/**
* abstract executor manager
*/
public abstract class AbstractExecutorManager implements ExecutorManager{ public abstract class AbstractExecutorManager implements ExecutorManager{
/**
* before execute , add time monitor timeout
* @param context context
* @throws ExecuteException
*/
@Override @Override
public void beforeExecute(ExecutionContext executeContext) throws ExecuteException { public void beforeExecute(ExecutionContext context) throws ExecuteException {
//TODO add time monitor
} }
/**
* after execute , add dispatch monitor
* @param context context
* @throws ExecuteException
*/
@Override @Override
public void afterExecute(ExecutionContext executeContext) throws ExecuteException { public void afterExecute(ExecutionContext context) throws ExecuteException {
//TODO add dispatch monitor
} }
} }

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java

@ -20,12 +20,29 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
/**
* executor manager
*/
public interface ExecutorManager { public interface ExecutorManager {
/**
* before execute
* @param executeContext executeContext
* @throws ExecuteException
*/
void beforeExecute(ExecutionContext executeContext) throws ExecuteException; void beforeExecute(ExecutionContext executeContext) throws ExecuteException;
void execute(ExecutionContext executeContext) throws ExecuteException; /**
* execute task
* @param context context
* @throws ExecuteException
*/
void execute(ExecutionContext context) throws ExecuteException;
void afterExecute(ExecutionContext executeContext) throws ExecuteException; /**
* after execute
* @param context context
* @throws ExecuteException
*/
void afterExecute(ExecutionContext context) throws ExecuteException;
} }

71
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -42,15 +42,23 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
/**
* netty executor manager
*/
@Service @Service
public class NettyExecutorManager extends AbstractExecutorManager{ public class NettyExecutorManager extends AbstractExecutorManager{
private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class); private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
/**
* zookeeper node manager
*/
@Autowired @Autowired
private ZookeeperNodeManager zookeeperNodeManager; private ZookeeperNodeManager zookeeperNodeManager;
/**
* netty remote client
*/
private final NettyRemotingClient nettyRemotingClient; private final NettyRemotingClient nettyRemotingClient;
public NettyExecutorManager(){ public NettyExecutorManager(){
@ -60,29 +68,48 @@ public class NettyExecutorManager extends AbstractExecutorManager{
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
} }
/**
* execute logic
* @param context context
* @throws ExecuteException
*/
@Override @Override
public void execute(ExecutionContext executeContext) throws ExecuteException { public void execute(ExecutionContext context) throws ExecuteException {
Set<String> allNodes = getAllNodes(executeContext);
/**
* all nodes
*/
Set<String> allNodes = getAllNodes(context);
/**
* fail nodes
*/
Set<String> failNodeSet = new HashSet<>(); Set<String> failNodeSet = new HashSet<>();
//
Command command = buildCommand(executeContext); /**
Host host = executeContext.getHost(); * build command accord executeContext
*/
Command command = buildCommand(context);
/**
* execute task host
*/
Host host = context.getHost();
boolean success = false; boolean success = false;
//
while (!success) { while (!success) {
try { try {
doExecute(host, command); doExecute(host,command);
success = true; success = true;
executeContext.setHost(host); context.setHost(host);
} catch (ExecuteException ex) { } catch (ExecuteException ex) {
logger.error(String.format("execute context : %s error", executeContext.getContext()), ex); logger.error(String.format("execute context : %s error", context.getContext()), ex);
try { try {
failNodeSet.add(host.getAddress()); failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes); Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet); Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) { if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next()); host = Host.of(remained.iterator().next());
logger.error("retry execute context : {} host : {}", executeContext.getContext(), host); logger.error("retry execute context : {} host : {}", context.getContext(), host);
} else { } else {
throw new ExecuteException("fail after try all nodes"); throw new ExecuteException("fail after try all nodes");
} }
@ -93,6 +120,11 @@ public class NettyExecutorManager extends AbstractExecutorManager{
} }
} }
/**
* build command
* @param context context
* @return command
*/
private Command buildCommand(ExecutionContext context) { private Command buildCommand(ExecutionContext context) {
ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand(); ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
ExecutorType executorType = context.getExecutorType(); ExecutorType executorType = context.getExecutorType();
@ -110,7 +142,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{
return requestCommand.convert2Command(); return requestCommand.convert2Command();
} }
/**
* execute logic
* @param host host
* @param command command
* @throws ExecuteException
*/
private void doExecute(final Host host, final Command command) throws ExecuteException { private void doExecute(final Host host, final Command command) throws ExecuteException {
/**
* retry countdefault retry 3
*/
int retryCount = 3; int retryCount = 3;
boolean success = false; boolean success = false;
do { do {
@ -131,8 +172,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{
} }
} }
/**
* get all nodes
* @param context context
* @return nodes
*/
private Set<String> getAllNodes(ExecutionContext context){ private Set<String> getAllNodes(ExecutionContext context){
Set<String> nodes = Collections.EMPTY_SET; Set<String> nodes = Collections.EMPTY_SET;
/**
* executor type
*/
ExecutorType executorType = context.getExecutorType(); ExecutorType executorType = context.getExecutorType();
switch (executorType){ switch (executorType){
case WORKER: case WORKER:

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java

@ -21,8 +21,16 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
/**
* host manager
*/
public interface HostManager { public interface HostManager {
/**
* select host
* @param context context
* @return host
*/
Host select(ExecutionContext context); Host select(ExecutionContext context);
} }

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java

@ -34,24 +34,44 @@ import java.util.Collection;
import java.util.List; import java.util.List;
/**
* round robin host manager
*/
@Service @Service
public class RoundRobinHostManager implements HostManager { public class RoundRobinHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class); private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
/**
* zookeeperNodeManager
*/
@Autowired @Autowired
private ZookeeperNodeManager zookeeperNodeManager; private ZookeeperNodeManager zookeeperNodeManager;
/**
* selector
*/
private final Selector<Host> selector; private final Selector<Host> selector;
/**
* set round robin
*/
public RoundRobinHostManager(){ public RoundRobinHostManager(){
this.selector = new RoundRobinSelector<>(); this.selector = new RoundRobinSelector<>();
} }
/**
* select host
* @param context context
* @return host
*/
@Override @Override
public Host select(ExecutionContext context){ public Host select(ExecutionContext context){
Host host = new Host(); Host host = new Host();
Collection<String> nodes = null; Collection<String> nodes = null;
/**
* executor type
*/
ExecutorType executorType = context.getExecutorType(); ExecutorType executorType = context.getExecutorType();
switch (executorType){ switch (executorType){
case WORKER: case WORKER:
@ -69,6 +89,9 @@ public class RoundRobinHostManager implements HostManager {
List<Host> candidateHosts = new ArrayList<>(nodes.size()); List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node))); nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
/**
* select
*/
return selector.select(candidateHosts); return selector.select(candidateHosts);
} }

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java

@ -20,7 +20,10 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection; import java.util.Collection;
import java.util.Random; import java.util.Random;
/**
* random selector
* @param <T> T
*/
public class RandomSelector<T> implements Selector<T> { public class RandomSelector<T> implements Selector<T> {
private final Random random = new Random(); private final Random random = new Random();
@ -32,11 +35,17 @@ public class RandomSelector<T> implements Selector<T> {
throw new IllegalArgumentException("Empty source."); throw new IllegalArgumentException("Empty source.");
} }
/**
* if only one , return directly
*/
if (source.size() == 1) { if (source.size() == 1) {
return (T) source.toArray()[0]; return (T) source.toArray()[0];
} }
int size = source.size(); int size = source.size();
/**
* random select
*/
int randomIndex = random.nextInt(size); int randomIndex = random.nextInt(size);
return (T) source.toArray()[randomIndex]; return (T) source.toArray()[randomIndex];

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java

@ -21,6 +21,10 @@ import org.springframework.stereotype.Service;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/**
* round robin selector
* @param <T> T
*/
@Service @Service
public class RoundRobinSelector<T> implements Selector<T> { public class RoundRobinSelector<T> implements Selector<T> {
@ -32,11 +36,17 @@ public class RoundRobinSelector<T> implements Selector<T> {
throw new IllegalArgumentException("Empty source."); throw new IllegalArgumentException("Empty source.");
} }
/**
* if only one , return directly
*/
if (source.size() == 1) { if (source.size() == 1) {
return (T)source.toArray()[0]; return (T)source.toArray()[0];
} }
int size = source.size(); int size = source.size();
/**
* round robin
*/
return (T) source.toArray()[index.getAndIncrement() % size]; return (T) source.toArray()[index.getAndIncrement() % size];
} }
} }

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java

@ -20,7 +20,16 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection; import java.util.Collection;
/**
* selector
* @param <T> T
*/
public interface Selector<T> { public interface Selector<T> {
/**
* select
* @param source source
* @return T
*/
T select(Collection<T> source); T select(Collection<T> source);
} }

29
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java

@ -29,6 +29,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* task fulture
*/
public class TaskFuture { public class TaskFuture {
private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class); private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class);
@ -139,19 +142,6 @@ public class TaskFuture {
} }
@Override
public String toString() {
return "ResponseFuture{" +
"opaque=" + opaque +
", timeoutMillis=" + timeoutMillis +
", latch=" + latch +
", beginTimestamp=" + beginTimestamp +
", responseCommand=" + responseCommand +
", sendOk=" + sendOk +
", cause=" + cause +
'}';
}
/** /**
* scan future table * scan future table
*/ */
@ -168,4 +158,17 @@ public class TaskFuture {
} }
} }
} }
@Override
public String toString() {
return "TaskFuture{" +
"opaque=" + opaque +
", timeoutMillis=" + timeoutMillis +
", latch=" + latch +
", beginTimestamp=" + beginTimestamp +
", responseCommand=" + responseCommand +
", sendOk=" + sendOk +
", cause=" + cause +
'}';
}
} }

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -46,11 +46,19 @@ public class TaskAckProcessor implements NettyRequestProcessor {
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
/**
* task ack process
* @param channel channel channel
* @param command command ExecuteTaskAckCommand
*/
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class); ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
logger.info("taskAckCommand : {}",taskAckCommand); logger.info("taskAckCommand : {}",taskAckCommand);
/**
* change Task state
*/
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()), processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(), taskAckCommand.getStartTime(),
taskAckCommand.getHost(), taskAckCommand.getHost(),

71
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -33,37 +33,80 @@ import java.util.Set;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/**
* zookeeper node manager
*/
@Service @Service
public class ZookeeperNodeManager implements InitializingBean { public class ZookeeperNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class); private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
/**
* master lock
*/
private final Lock masterLock = new ReentrantLock(); private final Lock masterLock = new ReentrantLock();
/**
* worker lock
*/
private final Lock workerLock = new ReentrantLock(); private final Lock workerLock = new ReentrantLock();
/**
* worker nodes
*/
private final Set<String> workerNodes = new HashSet<>(); private final Set<String> workerNodes = new HashSet<>();
/**
* master nodes
*/
private final Set<String> masterNodes = new HashSet<>(); private final Set<String> masterNodes = new HashSet<>();
/**
* zookeeper registry center
*/
@Autowired @Autowired
private ZookeeperRegistryCenter registryCenter; private ZookeeperRegistryCenter registryCenter;
/**
* init listener
* @throws Exception
*/
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
/**
* load nodes from zookeeper
*/
load(); load();
/**
* init MasterNodeListener listener
*/
registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener()); registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener());
/**
* init WorkerNodeListener listener
*/
registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener()); registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener());
} }
/**
* load nodes from zookeeper
*/
private void load(){ private void load(){
Set<String> schedulerNodes = registryCenter.getMasterNodesDirectly(); /**
syncMasterNodes(schedulerNodes); * master nodes from zookeeper
*/
Set<String> masterNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(masterNodes);
/**
* worker nodes from zookeeper
*/
Set<String> workersNodes = registryCenter.getWorkerNodesDirectly(); Set<String> workersNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(workersNodes); syncWorkerNodes(workersNodes);
} }
/**
* worker node listener
*/
class WorkerNodeListener extends AbstractListener { class WorkerNodeListener extends AbstractListener {
@Override @Override
@ -91,6 +134,9 @@ public class ZookeeperNodeManager implements InitializingBean {
} }
/**
* master node listener
*/
class MasterNodeListener extends AbstractListener { class MasterNodeListener extends AbstractListener {
@Override @Override
@ -115,6 +161,10 @@ public class ZookeeperNodeManager implements InitializingBean {
} }
} }
/**
* get master nodes
* @return master nodes
*/
public Set<String> getMasterNodes() { public Set<String> getMasterNodes() {
masterLock.lock(); masterLock.lock();
try { try {
@ -124,6 +174,10 @@ public class ZookeeperNodeManager implements InitializingBean {
} }
} }
/**
* sync master nodes
* @param nodes master nodes
*/
private void syncMasterNodes(Set<String> nodes){ private void syncMasterNodes(Set<String> nodes){
masterLock.lock(); masterLock.lock();
try { try {
@ -134,6 +188,10 @@ public class ZookeeperNodeManager implements InitializingBean {
} }
} }
/**
* sync worker nodes
* @param nodes worker nodes
*/
private void syncWorkerNodes(Set<String> nodes){ private void syncWorkerNodes(Set<String> nodes){
workerLock.lock(); workerLock.lock();
try { try {
@ -144,6 +202,10 @@ public class ZookeeperNodeManager implements InitializingBean {
} }
} }
/**
* get worker nodes
* @return worker nodes
*/
public Set<String> getWorkerNodes(){ public Set<String> getWorkerNodes(){
workerLock.lock(); workerLock.lock();
try { try {
@ -153,6 +215,9 @@ public class ZookeeperNodeManager implements InitializingBean {
} }
} }
/**
* close
*/
public void close(){ public void close(){
registryCenter.close(); registryCenter.close();
} }

61
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -27,17 +27,32 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/**
* zookeeper register center
*/
@Service @Service
public class ZookeeperRegistryCenter implements InitializingBean { public class ZookeeperRegistryCenter implements InitializingBean {
private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* namespace
*/
public static final String NAMESPACE = "/dolphinscheduler"; public static final String NAMESPACE = "/dolphinscheduler";
/**
* nodes namespace
*/
public static final String NODES = NAMESPACE + "/nodes"; public static final String NODES = NAMESPACE + "/nodes";
/**
* master path
*/
public static final String MASTER_PATH = NODES + "/master"; public static final String MASTER_PATH = NODES + "/master";
/**
* worker path
*/
public static final String WORKER_PATH = NODES + "/worker"; public static final String WORKER_PATH = NODES + "/worker";
public static final String EMPTY = ""; public static final String EMPTY = "";
@ -50,19 +65,26 @@ public class ZookeeperRegistryCenter implements InitializingBean {
init(); init();
} }
/**
* init node persist
*/
public void init() { public void init() {
if (isStarted.compareAndSet(false, true)) { if (isStarted.compareAndSet(false, true)) {
//TODO
// zookeeperCachedOperator.start(NODES);
initNodes(); initNodes();
} }
} }
/**
* init nodes
*/
private void initNodes() { private void initNodes() {
zookeeperCachedOperator.persist(MASTER_PATH, EMPTY); zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
zookeeperCachedOperator.persist(WORKER_PATH, EMPTY); zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
} }
/**
* close
*/
public void close() { public void close() {
if (isStarted.compareAndSet(true, false)) { if (isStarted.compareAndSet(true, false)) {
if (zookeeperCachedOperator != null) { if (zookeeperCachedOperator != null) {
@ -71,36 +93,71 @@ public class ZookeeperRegistryCenter implements InitializingBean {
} }
} }
/**
* get master path
* @return master path
*/
public String getMasterPath() { public String getMasterPath() {
return MASTER_PATH; return MASTER_PATH;
} }
/**
* get worker path
* @return worker path
*/
public String getWorkerPath() { public String getWorkerPath() {
return WORKER_PATH; return WORKER_PATH;
} }
/**
* get master nodes directly
* @return master nodes
*/
public Set<String> getMasterNodesDirectly() { public Set<String> getMasterNodesDirectly() {
List<String> masters = getChildrenKeys(MASTER_PATH); List<String> masters = getChildrenKeys(MASTER_PATH);
return new HashSet<>(masters); return new HashSet<>(masters);
} }
/**
* get worker nodes directly
* @return master nodes
*/
public Set<String> getWorkerNodesDirectly() { public Set<String> getWorkerNodesDirectly() {
List<String> workers = getChildrenKeys(WORKER_PATH); List<String> workers = getChildrenKeys(WORKER_PATH);
return new HashSet<>(workers); return new HashSet<>(workers);
} }
/**
* whether worker path
* @param path path
* @return result
*/
public boolean isWorkerPath(String path) { public boolean isWorkerPath(String path) {
return path != null && path.contains(WORKER_PATH); return path != null && path.contains(WORKER_PATH);
} }
/**
* whether master path
* @param path path
* @return result
*/
public boolean isMasterPath(String path) { public boolean isMasterPath(String path) {
return path != null && path.contains(MASTER_PATH); return path != null && path.contains(MASTER_PATH);
} }
/**
* get children nodes
* @param key key
* @return children nodes
*/
public List<String> getChildrenKeys(final String key) { public List<String> getChildrenKeys(final String key) {
return zookeeperCachedOperator.getChildrenKeys(key); return zookeeperCachedOperator.getChildrenKeys(key);
} }
/**
* get zookeeperCachedOperator
* @return zookeeperCachedOperator
*/
public ZookeeperCachedOperator getZookeeperCachedOperator() { public ZookeeperCachedOperator getZookeeperCachedOperator() {
return zookeeperCachedOperator; return zookeeperCachedOperator;
} }

Loading…
Cancel
Save