From 6053f9b2dfceaf7ebfe2a3d9e5ba9ed40cd6f62b Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Mon, 24 Feb 2020 11:40:24 +0800 Subject: [PATCH] add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment --- .../dolphinscheduler/remote/utils/Host.java | 36 ++++++--- .../master/dispatch/ExecutorDispatcher.java | 51 ++++++++++--- .../dispatch/context/ExecutionContext.java | 12 +++ .../master/dispatch/enums/ExecutorType.java | 4 +- .../dispatch/exceptions/ExecuteException.java | 4 +- .../executor/AbstractExecutorManager.java | 21 ++++-- .../dispatch/executor/ExecutorManager.java | 23 +++++- .../executor/NettyExecutorManager.java | 75 ++++++++++++++++--- .../master/dispatch/host/HostManager.java | 8 ++ .../dispatch/host/RoundRobinHostManager.java | 23 ++++++ .../dispatch/host/assign/RandomSelector.java | 11 ++- .../host/assign/RoundRobinSelector.java | 10 +++ .../master/dispatch/host/assign/Selector.java | 9 +++ .../server/master/future/TaskFuture.java | 29 +++---- .../master/processor/TaskAckProcessor.java | 8 ++ .../server/registry/ZookeeperNodeManager.java | 71 +++++++++++++++++- .../registry/ZookeeperRegistryCenter.java | 61 ++++++++++++++- 17 files changed, 397 insertions(+), 59 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java index f53c611dee..fde683061a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java @@ -24,10 +24,19 @@ import java.util.Objects; */ public class Host implements Serializable { + /** + * address + */ private String address; + /** + * ip + */ private String ip; + /** + * port + */ private int port; public Host() { @@ -65,6 +74,11 @@ public class Host implements Serializable { this.address = ip + ":" + port; } + /** + * address convert host + * @param address address + * @return host + */ public static Host of(String address){ String[] parts = address.split(":"); if (parts.length != 2) { @@ -74,17 +88,14 @@ public class Host implements Serializable { return host; } - @Override - public String toString() { - return "Host{" + - "address='" + address + '\'' + - '}'; - } - @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } Host host = (Host) o; return Objects.equals(getAddress(), host.getAddress()); } @@ -93,4 +104,11 @@ public class Host implements Serializable { public int hashCode() { return Objects.hash(getAddress()); } + + @Override + public String toString() { + return "Host{" + + "address='" + address + '\'' + + '}'; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 2fd303af55..01fb840303 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -32,12 +32,21 @@ import org.springframework.stereotype.Service; import java.util.concurrent.ConcurrentHashMap; +/** + * executor dispatcher + */ @Service public class ExecutorDispatcher implements InitializingBean { + /** + * netty executor manager + */ @Autowired private NettyExecutorManager nettyExecutorManager; + /** + * round robin host manager + */ @Autowired private RoundRobinHostManager hostManager; @@ -47,30 +56,54 @@ public class ExecutorDispatcher implements InitializingBean { this.executorManagers = new ConcurrentHashMap<>(); } - public void dispatch(final ExecutionContext executeContext) throws ExecuteException { - ExecutorManager executorManager = this.executorManagers.get(executeContext.getExecutorType()); + /** + * task dispatch + * @param context context + * @throws ExecuteException + */ + public void dispatch(final ExecutionContext context) throws ExecuteException { + /** + * get executor manager + */ + ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); if(executorManager == null){ - throw new ExecuteException("no ExecutorManager for type : " + executeContext.getExecutorType()); + throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType()); } - Host host = hostManager.select(executeContext); + + /** + * host select + */ + Host host = hostManager.select(context); if (StringUtils.isEmpty(host.getAddress())) { - throw new ExecuteException(String.format("fail to execute : %s due to no worker ", executeContext.getContext())); + throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext())); } - executeContext.setHost(host); - executorManager.beforeExecute(executeContext); + context.setHost(host); + executorManager.beforeExecute(context); try { - executorManager.execute(executeContext); + /** + * task execute + */ + executorManager.execute(context); } finally { - executorManager.afterExecute(executeContext); + executorManager.afterExecute(context); } } + /** + * register init + * @throws Exception + */ @Override public void afterPropertiesSet() throws Exception { register(ExecutorType.WORKER, nettyExecutorManager); register(ExecutorType.CLIENT, nettyExecutorManager); } + /** + * register + * @param type executor type + * @param executorManager executorManager + */ public void register(ExecutorType type, ExecutorManager executorManager){ executorManagers.put(type, executorManager); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java index 4bccba0d7a..14c7d9f167 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java @@ -20,12 +20,24 @@ package org.apache.dolphinscheduler.server.master.dispatch.context; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; +/** + * execution context + */ public class ExecutionContext { + /** + * host + */ private Host host; + /** + * context + */ private final Object context; + /** + * executor type : worker or client + */ private final ExecutorType executorType; public ExecutionContext(Object context, ExecutorType executorType) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java index 70aaeaeda2..03be62e701 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java @@ -16,7 +16,9 @@ */ package org.apache.dolphinscheduler.server.master.dispatch.enums; - +/** + * executor type + */ public enum ExecutorType { WORKER, diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java index d8ca50a9f9..8a441b9de1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java @@ -17,7 +17,9 @@ package org.apache.dolphinscheduler.server.master.dispatch.exceptions; - +/** + * execute exception + */ public class ExecuteException extends Exception{ public ExecuteException() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java index 65ed15eb50..e1f0c3c976 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java @@ -20,17 +20,26 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; - +/** + * abstract executor manager + */ public abstract class AbstractExecutorManager implements ExecutorManager{ + /** + * before execute , add time monitor , timeout + * @param context context + * @throws ExecuteException + */ @Override - public void beforeExecute(ExecutionContext executeContext) throws ExecuteException { - //TODO add time monitor + public void beforeExecute(ExecutionContext context) throws ExecuteException { } + /** + * after execute , add dispatch monitor + * @param context context + * @throws ExecuteException + */ @Override - public void afterExecute(ExecutionContext executeContext) throws ExecuteException { - //TODO add dispatch monitor - + public void afterExecute(ExecutionContext context) throws ExecuteException { } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java index 98d391e7ea..1d78d2f08f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java @@ -20,12 +20,29 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; - +/** + * executor manager + */ public interface ExecutorManager { + /** + * before execute + * @param executeContext executeContext + * @throws ExecuteException + */ void beforeExecute(ExecutionContext executeContext) throws ExecuteException; - void execute(ExecutionContext executeContext) throws ExecuteException; + /** + * execute task + * @param context context + * @throws ExecuteException + */ + void execute(ExecutionContext context) throws ExecuteException; - void afterExecute(ExecutionContext executeContext) throws ExecuteException; + /** + * after execute + * @param context context + * @throws ExecuteException + */ + void afterExecute(ExecutionContext context) throws ExecuteException; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index e24bbe769a..e07bea4546 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -42,47 +42,78 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; - +/** + * netty executor manager + */ @Service public class NettyExecutorManager extends AbstractExecutorManager{ private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class); + /** + * zookeeper node manager + */ @Autowired private ZookeeperNodeManager zookeeperNodeManager; + /** + * netty remote client + */ private final NettyRemotingClient nettyRemotingClient; public NettyExecutorManager(){ final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + /** + * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor + * register EXECUTE_TASK_ACK command type TaskAckProcessor + */ this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); } + /** + * execute logic + * @param context context + * @throws ExecuteException + */ @Override - public void execute(ExecutionContext executeContext) throws ExecuteException { - Set allNodes = getAllNodes(executeContext); + public void execute(ExecutionContext context) throws ExecuteException { + + /** + * all nodes + */ + Set allNodes = getAllNodes(context); + + /** + * fail nodes + */ Set failNodeSet = new HashSet<>(); - // - Command command = buildCommand(executeContext); - Host host = executeContext.getHost(); + + /** + * build command accord executeContext + */ + Command command = buildCommand(context); + + /** + * execute task host + */ + Host host = context.getHost(); boolean success = false; - // while (!success) { try { - doExecute(host, command); + doExecute(host,command); success = true; - executeContext.setHost(host); + context.setHost(host); } catch (ExecuteException ex) { - logger.error(String.format("execute context : %s error", executeContext.getContext()), ex); + logger.error(String.format("execute context : %s error", context.getContext()), ex); try { failNodeSet.add(host.getAddress()); Set tmpAllIps = new HashSet<>(allNodes); Collection remained = CollectionUtils.subtract(tmpAllIps, failNodeSet); if (remained != null && remained.size() > 0) { host = Host.of(remained.iterator().next()); - logger.error("retry execute context : {} host : {}", executeContext.getContext(), host); + logger.error("retry execute context : {} host : {}", context.getContext(), host); } else { throw new ExecuteException("fail after try all nodes"); } @@ -93,6 +124,11 @@ public class NettyExecutorManager extends AbstractExecutorManager{ } } + /** + * build command + * @param context context + * @return command + */ private Command buildCommand(ExecutionContext context) { ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand(); ExecutorType executorType = context.getExecutorType(); @@ -110,7 +146,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{ return requestCommand.convert2Command(); } + /** + * execute logic + * @param host host + * @param command command + * @throws ExecuteException + */ private void doExecute(final Host host, final Command command) throws ExecuteException { + /** + * retry count,default retry 3 + */ int retryCount = 3; boolean success = false; do { @@ -131,8 +176,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{ } } + /** + * get all nodes + * @param context context + * @return nodes + */ private Set getAllNodes(ExecutionContext context){ Set nodes = Collections.EMPTY_SET; + /** + * executor type + */ ExecutorType executorType = context.getExecutorType(); switch (executorType){ case WORKER: diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java index 87082738da..ec65cabb0b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java @@ -21,8 +21,16 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; +/** + * host manager + */ public interface HostManager { + /** + * select host + * @param context context + * @return host + */ Host select(ExecutionContext context); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java index 1c222b84af..3bb001e842 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java @@ -34,24 +34,44 @@ import java.util.Collection; import java.util.List; +/** + * round robin host manager + */ @Service public class RoundRobinHostManager implements HostManager { private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class); + /** + * zookeeperNodeManager + */ @Autowired private ZookeeperNodeManager zookeeperNodeManager; + /** + * selector + */ private final Selector selector; + /** + * set round robin + */ public RoundRobinHostManager(){ this.selector = new RoundRobinSelector<>(); } + /** + * select host + * @param context context + * @return host + */ @Override public Host select(ExecutionContext context){ Host host = new Host(); Collection nodes = null; + /** + * executor type + */ ExecutorType executorType = context.getExecutorType(); switch (executorType){ case WORKER: @@ -69,6 +89,9 @@ public class RoundRobinHostManager implements HostManager { List candidateHosts = new ArrayList<>(nodes.size()); nodes.stream().forEach(node -> candidateHosts.add(Host.of(node))); + /** + * select + */ return selector.select(candidateHosts); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java index cf8c0e84d4..be52fcb1cf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java @@ -20,7 +20,10 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import java.util.Collection; import java.util.Random; - +/** + * random selector + * @param T + */ public class RandomSelector implements Selector { private final Random random = new Random(); @@ -32,11 +35,17 @@ public class RandomSelector implements Selector { throw new IllegalArgumentException("Empty source."); } + /** + * if only one , return directly + */ if (source.size() == 1) { return (T) source.toArray()[0]; } int size = source.size(); + /** + * random select + */ int randomIndex = random.nextInt(size); return (T) source.toArray()[randomIndex]; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java index 90319de122..1eb30c8d5a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java @@ -21,6 +21,10 @@ import org.springframework.stereotype.Service; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; +/** + * round robin selector + * @param T + */ @Service public class RoundRobinSelector implements Selector { @@ -32,11 +36,17 @@ public class RoundRobinSelector implements Selector { throw new IllegalArgumentException("Empty source."); } + /** + * if only one , return directly + */ if (source.size() == 1) { return (T)source.toArray()[0]; } int size = source.size(); + /** + * round robin + */ return (T) source.toArray()[index.getAndIncrement() % size]; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java index bd7c4ac5b9..08649819a0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java @@ -20,7 +20,16 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import java.util.Collection; +/** + * selector + * @param T + */ public interface Selector { + /** + * select + * @param source source + * @return T + */ T select(Collection source); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java index 32fb55facf..0c6d7402be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java @@ -29,6 +29,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +/** + * task fulture + */ public class TaskFuture { private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class); @@ -139,19 +142,6 @@ public class TaskFuture { } - @Override - public String toString() { - return "ResponseFuture{" + - "opaque=" + opaque + - ", timeoutMillis=" + timeoutMillis + - ", latch=" + latch + - ", beginTimestamp=" + beginTimestamp + - ", responseCommand=" + responseCommand + - ", sendOk=" + sendOk + - ", cause=" + cause + - '}'; - } - /** * scan future table */ @@ -168,4 +158,17 @@ public class TaskFuture { } } } + + @Override + public String toString() { + return "TaskFuture{" + + "opaque=" + opaque + + ", timeoutMillis=" + timeoutMillis + + ", latch=" + latch + + ", beginTimestamp=" + beginTimestamp + + ", responseCommand=" + responseCommand + + ", sendOk=" + sendOk + + ", cause=" + cause + + '}'; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index f5f2123b71..83da3b03ee 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -46,11 +46,19 @@ public class TaskAckProcessor implements NettyRequestProcessor { this.processService = SpringApplicationContext.getBean(ProcessService.class); } + /** + * task ack process + * @param channel channel channel + * @param command command ExecuteTaskAckCommand + */ @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class); logger.info("taskAckCommand : {}",taskAckCommand); + /** + * change Task state + */ processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()), taskAckCommand.getStartTime(), taskAckCommand.getHost(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java index c7a2d0bdfd..1d6808d51e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -33,37 +33,80 @@ import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - +/** + * zookeeper node manager + */ @Service public class ZookeeperNodeManager implements InitializingBean { private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class); + /** + * master lock + */ private final Lock masterLock = new ReentrantLock(); + /** + * worker lock + */ private final Lock workerLock = new ReentrantLock(); + /** + * worker nodes + */ private final Set workerNodes = new HashSet<>(); + /** + * master nodes + */ private final Set masterNodes = new HashSet<>(); + /** + * zookeeper registry center + */ @Autowired private ZookeeperRegistryCenter registryCenter; + /** + * init listener + * @throws Exception + */ @Override public void afterPropertiesSet() throws Exception { + /** + * load nodes from zookeeper + */ load(); + /** + * init MasterNodeListener listener + */ registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener()); + /** + * init WorkerNodeListener listener + */ registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener()); } + /** + * load nodes from zookeeper + */ private void load(){ - Set schedulerNodes = registryCenter.getMasterNodesDirectly(); - syncMasterNodes(schedulerNodes); + /** + * master nodes from zookeeper + */ + Set masterNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(masterNodes); + + /** + * worker nodes from zookeeper + */ Set workersNodes = registryCenter.getWorkerNodesDirectly(); syncWorkerNodes(workersNodes); } + /** + * worker node listener + */ class WorkerNodeListener extends AbstractListener { @Override @@ -91,6 +134,9 @@ public class ZookeeperNodeManager implements InitializingBean { } + /** + * master node listener + */ class MasterNodeListener extends AbstractListener { @Override @@ -115,6 +161,10 @@ public class ZookeeperNodeManager implements InitializingBean { } } + /** + * get master nodes + * @return master nodes + */ public Set getMasterNodes() { masterLock.lock(); try { @@ -124,6 +174,10 @@ public class ZookeeperNodeManager implements InitializingBean { } } + /** + * sync master nodes + * @param nodes master nodes + */ private void syncMasterNodes(Set nodes){ masterLock.lock(); try { @@ -134,6 +188,10 @@ public class ZookeeperNodeManager implements InitializingBean { } } + /** + * sync worker nodes + * @param nodes worker nodes + */ private void syncWorkerNodes(Set nodes){ workerLock.lock(); try { @@ -144,6 +202,10 @@ public class ZookeeperNodeManager implements InitializingBean { } } + /** + * get worker nodes + * @return worker nodes + */ public Set getWorkerNodes(){ workerLock.lock(); try { @@ -153,6 +215,9 @@ public class ZookeeperNodeManager implements InitializingBean { } } + /** + * close + */ public void close(){ registryCenter.close(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index 3364a94a23..7d7e2efb83 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -27,17 +27,32 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +/** + * zookeeper register center + */ @Service public class ZookeeperRegistryCenter implements InitializingBean { private final AtomicBoolean isStarted = new AtomicBoolean(false); + /** + * namespace + */ public static final String NAMESPACE = "/dolphinscheduler"; + /** + * nodes namespace + */ public static final String NODES = NAMESPACE + "/nodes"; + /** + * master path + */ public static final String MASTER_PATH = NODES + "/master"; + /** + * worker path + */ public static final String WORKER_PATH = NODES + "/worker"; public static final String EMPTY = ""; @@ -50,19 +65,26 @@ public class ZookeeperRegistryCenter implements InitializingBean { init(); } + /** + * init node persist + */ public void init() { if (isStarted.compareAndSet(false, true)) { - //TODO -// zookeeperCachedOperator.start(NODES); initNodes(); } } + /** + * init nodes + */ private void initNodes() { zookeeperCachedOperator.persist(MASTER_PATH, EMPTY); zookeeperCachedOperator.persist(WORKER_PATH, EMPTY); } + /** + * close + */ public void close() { if (isStarted.compareAndSet(true, false)) { if (zookeeperCachedOperator != null) { @@ -71,36 +93,71 @@ public class ZookeeperRegistryCenter implements InitializingBean { } } + /** + * get master path + * @return master path + */ public String getMasterPath() { return MASTER_PATH; } + /** + * get worker path + * @return worker path + */ public String getWorkerPath() { return WORKER_PATH; } + /** + * get master nodes directly + * @return master nodes + */ public Set getMasterNodesDirectly() { List masters = getChildrenKeys(MASTER_PATH); return new HashSet<>(masters); } + /** + * get worker nodes directly + * @return master nodes + */ public Set getWorkerNodesDirectly() { List workers = getChildrenKeys(WORKER_PATH); return new HashSet<>(workers); } + /** + * whether worker path + * @param path path + * @return result + */ public boolean isWorkerPath(String path) { return path != null && path.contains(WORKER_PATH); } + /** + * whether master path + * @param path path + * @return result + */ public boolean isMasterPath(String path) { return path != null && path.contains(MASTER_PATH); } + /** + * get children nodes + * @param key key + * @return children nodes + */ public List getChildrenKeys(final String key) { return zookeeperCachedOperator.getChildrenKeys(key); } + /** + * get zookeeperCachedOperator + * @return zookeeperCachedOperator + */ public ZookeeperCachedOperator getZookeeperCachedOperator() { return zookeeperCachedOperator; }