From 4c12c2ad709dea7330317b507a2f18965583e828 Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Fri, 16 Apr 2021 00:26:48 +0800 Subject: [PATCH] [1.3.6-prepare][Fix-5176-5275][Server] Fix wrong excludeFilters and move zk/ZKMasterClient and registry/ServerNodeManager into master #5278 (#5276) * [1.3.6-prepare][Improvement-5275][Server] Move zk/ZKMasterClient and registry/ServerNodeManager into master * [1.3.6-prepare][Improvement-5275][Server] Fix code smells * [Improvement-5275][Server] Add ZKMasterClientTest --- .../server/master/MasterServer.java | 5 +- .../executor/NettyExecutorManager.java | 2 +- .../dispatch/host/CommonHostManager.java | 2 +- .../registry/ServerNodeManager.java | 3 +- .../master/runner/MasterSchedulerService.java | 2 +- .../{ => master}/zk/ZKMasterClient.java | 23 +++--- .../server/worker/WorkerServer.java | 4 +- .../TaskPriorityQueueConsumerTest.java | 4 +- .../dispatch/ExecutorDispatcherTest.java | 2 +- .../executor/NettyExecutorManagerTest.java | 2 +- .../host/RoundRobinHostManagerTest.java | 2 +- .../queue/TaskResponseServiceTest.java | 2 +- .../registry/ServerNodeManagerTest.java | 30 +++---- .../runner/MasterTaskExecThreadTest.java | 2 +- .../server/master/zk/ZKMasterClientTest.java | 78 +++++++++++++++++++ .../server/registry/DependencyConfig.java | 68 ++++++++++------ .../processor/TaskCallbackServiceTest.java | 2 +- pom.xml | 3 +- 18 files changed, 165 insertions(+), 71 deletions(-) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/{ => master}/registry/ServerNodeManager.java (98%) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/{ => master}/zk/ZKMasterClient.java (96%) rename dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/{ => master}/registry/ServerNodeManagerTest.java (84%) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 6c05554d8b..016c8c980e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; -import org.apache.dolphinscheduler.server.zk.ZKMasterClient; +import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; @@ -117,7 +117,8 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer.start(); // self tolerant - this.zkMasterClient.start(this); + this.zkMasterClient.start(); + this.zkMasterClient.setStoppable(this); // scheduler start this.masterSchedulerService.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index c2cb677445..d5b5cdb7df 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.commons.collections.CollectionUtils; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index fbc510ede5..62be46d352 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import java.util.ArrayList; import java.util.Collection; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java similarity index 98% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index a487c21218..d713c8366f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.registry; +package org.apache.dolphinscheduler.server.master.registry; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 2462f203f8..6774d95932 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.zk.ZKMasterClient; +import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.curator.framework.imps.CuratorFrameworkState; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java similarity index 96% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java index cbabbc4c8c..8daf8a153a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.zk; +package org.apache.dolphinscheduler.server.master.zk; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; @@ -73,7 +73,7 @@ public class ZKMasterClient extends AbstractZKClient { @Autowired private MasterRegistry masterRegistry; - public void start(IStoppable stoppable) { + public void start() { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master @@ -83,7 +83,6 @@ public class ZKMasterClient extends AbstractZKClient { // master registry masterRegistry.registry(); - masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable); String registryPath = this.masterRegistry.getMasterPath(); masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); @@ -106,6 +105,10 @@ public class ZKMasterClient extends AbstractZKClient { } } + public void setStoppable(IStoppable stoppable) { + masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable); + } + @Override public void close() { masterRegistry.unRegistry(); @@ -138,7 +141,7 @@ public class ZKMasterClient extends AbstractZKClient { * @param failover is failover */ private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { - logger.info("{} node deleted : {}", zkNodeType.toString(), path); + logger.info("{} node deleted : {}", zkNodeType, path); InterProcessMutex mutex = null; try { String failoverPath = getFailoverLockPath(zkNodeType); @@ -161,7 +164,7 @@ public class ZKMasterClient extends AbstractZKClient { failoverServerWhenDown(serverHost, zkNodeType); } } catch (Exception e) { - logger.error("{} server failover failed.", zkNodeType.toString()); + logger.error("{} server failover failed.", zkNodeType); logger.error("failover exception ", e); } finally { releaseMutex(mutex); @@ -173,15 +176,15 @@ public class ZKMasterClient extends AbstractZKClient { * * @param serverHost server host * @param zkNodeType zookeeper node type - * @throws Exception exception */ - private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { + private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) { switch (zkNodeType) { case MASTER: failoverMaster(serverHost); break; case WORKER: failoverWorker(serverHost, true); + break; default: break; } @@ -194,7 +197,6 @@ public class ZKMasterClient extends AbstractZKClient { * @return fail over lock path */ private String getFailoverLockPath(ZKNodeType zkNodeType) { - switch (zkNodeType) { case MASTER: return getMasterFailoverLockPath(); @@ -250,7 +252,7 @@ public class ZKMasterClient extends AbstractZKClient { * @param taskInstance task instance * @return true if task instance need fail over */ - private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { + private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) { boolean taskNeedFailover = true; @@ -312,9 +314,8 @@ public class ZKMasterClient extends AbstractZKClient { * * @param workerHost worker host * @param needCheckWorkerAlive need check worker alive - * @throws Exception exception */ - private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { + private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) { logger.info("start worker[{}] failover ...", workerHost); List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index bd8f022a14..2ecd375cc9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -53,9 +53,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; @ComponentScan.Filter(type = FilterType.REGEX, pattern = { "org.apache.dolphinscheduler.server.master.*", "org.apache.dolphinscheduler.server.monitor.*", - "org.apache.dolphinscheduler.server.log.*", - "org.apache.dolphinscheduler.server.zk.ZKMasterClient", - "org.apache.dolphinscheduler.server.registry.ServerNodeManager" + "org.apache.dolphinscheduler.server.log.*" }) }) @EnableTransactionManagement diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 6b0841b37e..9bcbf0eb6b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -38,11 +38,11 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; -import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java index b5d6f27448..796970f6f9 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java @@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java index b1fd4e8342..d1600b0ee8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -30,8 +30,8 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java index 8923547214..5f4a4ecc67 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.junit.Assert; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 094684835c..d47a115780 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java similarity index 84% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java rename to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java index 867a15c7e4..1b94174ea6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java @@ -15,13 +15,14 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.registry; +package org.apache.dolphinscheduler.server.master.registry; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; @@ -42,7 +43,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; * server node manager test */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class, +@ContextConfiguration(classes = {DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class, ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class}) public class ServerNodeManagerTest { @@ -56,9 +57,6 @@ public class ServerNodeManagerTest { @Autowired private WorkerRegistry workerRegistry; - @Autowired - private ZookeeperRegistryCenter zookeeperRegistryCenter; - @Autowired private WorkerConfig workerConfig; @@ -66,43 +64,39 @@ public class ServerNodeManagerTest { private MasterConfig masterConfig; @Test - public void testGetMasterNodes(){ + public void testGetMasterNodes() { masterRegistry.registry(); try { //let the serverNodeManager catch the registry event Thread.sleep(2000); } catch (InterruptedException ignore) { + //ignore } Set masterNodes = serverNodeManager.getMasterNodes(); Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes)); Assert.assertEquals(1, masterNodes.size()); Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next()); + masterRegistry.unRegistry(); } @Test - public void testGetWorkerGroupNodes(){ + public void testGetWorkerGroupNodes() { workerRegistry.registry(); try { //let the serverNodeManager catch the registry event - Thread.sleep(2000); + Thread.sleep(3000); } catch (InterruptedException ignore) { + //ignore } Map> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); Assert.assertEquals(1, workerGroupNodes.size()); Assert.assertEquals("default".trim(), workerGroupNodes.keySet().iterator().next()); - } - @Test - public void testGetWorkerGroupNodesWithParam(){ - workerRegistry.registry(); - try { - //let the serverNodeManager catch the registry event - Thread.sleep(3000); - } catch (InterruptedException ignore) { - } Set workerNodes = serverNodeManager.getWorkerGroupNodes("default"); Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes)); Assert.assertEquals(1, workerNodes.size()); Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next()); + workerRegistry.unRegistry(); } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index e2b8d813e0..68a646cd8f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java new file mode 100644 index 0000000000..3ff6daa606 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.zk; + +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * zookeeper master client test + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {DependencyConfig.class, SpringZKServer.class, MasterRegistry.class, + ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class, + ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class, + ZKMasterClient.class, RegisterOperator.class}) +public class ZKMasterClientTest { + + @Autowired + private ZKMasterClient zkMasterClient; + + @Autowired + private ServerNodeManager serverNodeManager; + + @Autowired + private MasterConfig masterConfig; + + @Test + public void testZKMasterClient() { + zkMasterClient.start(); + try { + //let the serverNodeManager catch the registry event + Thread.sleep(2000); + } catch (InterruptedException ignore) { + //ignore + } + Set masterNodes = serverNodeManager.getMasterNodes(); + Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes)); + Assert.assertEquals(1, masterNodes.size()); + Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next()); + zkMasterClient.close(); + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java index 30f1053d3a..91a7c0cd4a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java @@ -18,17 +18,32 @@ package org.apache.dolphinscheduler.server.registry; import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.dao.mapper.AlertMapper; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; +import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; + import org.mockito.Mockito; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -55,103 +70,108 @@ public class DependencyConfig { } @Bean - public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl(){ + public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() { return Mockito.mock(TaskInstanceCacheManagerImpl.class); } @Bean - public ProcessService processService(){ + public ProcessService processService() { return Mockito.mock(ProcessService.class); } @Bean - public MasterConfig masterConfig(){ + public MasterConfig masterConfig() { return Mockito.mock(MasterConfig.class); } + + @Bean + public WorkerConfig workerConfig() { + return Mockito.mock(WorkerConfig.class); + } + @Bean - public UserMapper userMapper(){ + public UserMapper userMapper() { return Mockito.mock(UserMapper.class); } @Bean - public ProcessDefinitionMapper processDefineMapper(){ + public ProcessDefinitionMapper processDefineMapper() { return Mockito.mock(ProcessDefinitionMapper.class); } @Bean - public ProcessInstanceMapper processInstanceMapper(){ + public ProcessInstanceMapper processInstanceMapper() { return Mockito.mock(ProcessInstanceMapper.class); } @Bean - public DataSourceMapper dataSourceMapper(){ + public DataSourceMapper dataSourceMapper() { return Mockito.mock(DataSourceMapper.class); } @Bean - public ProcessInstanceMapMapper processInstanceMapMapper(){ + public ProcessInstanceMapMapper processInstanceMapMapper() { return Mockito.mock(ProcessInstanceMapMapper.class); } @Bean - public TaskInstanceMapper taskInstanceMapper(){ + public TaskInstanceMapper taskInstanceMapper() { return Mockito.mock(TaskInstanceMapper.class); } @Bean - public CommandMapper commandMapper(){ + public CommandMapper commandMapper() { return Mockito.mock(CommandMapper.class); } @Bean - public ScheduleMapper scheduleMapper(){ + public ScheduleMapper scheduleMapper() { return Mockito.mock(ScheduleMapper.class); } @Bean - public UdfFuncMapper udfFuncMapper(){ + public UdfFuncMapper udfFuncMapper() { return Mockito.mock(UdfFuncMapper.class); } @Bean - public ResourceMapper resourceMapper(){ + public ResourceMapper resourceMapper() { return Mockito.mock(ResourceMapper.class); } - - @Bean - public ErrorCommandMapper errorCommandMapper(){ + public ErrorCommandMapper errorCommandMapper() { return Mockito.mock(ErrorCommandMapper.class); } @Bean - public TenantMapper tenantMapper(){ + public TenantMapper tenantMapper() { return Mockito.mock(TenantMapper.class); } @Bean - public ProjectMapper projectMapper(){ + public ProjectMapper projectMapper() { return Mockito.mock(ProjectMapper.class); } @Bean - public TaskCallbackService taskCallbackService(){ + public TaskCallbackService taskCallbackService() { return Mockito.mock(TaskCallbackService.class); } @Bean - public HostManager hostManager(){ + public HostManager hostManager() { return new RandomHostManager(); } @Bean - public TaskResponseService taskResponseService(){ + public TaskResponseService taskResponseService() { return Mockito.mock(TaskResponseService.class); } @Bean - public TaskPriorityQueue taskPriorityQueue(){ + public TaskPriorityQueue taskPriorityQueue() { return new TaskPriorityQueueImpl(); } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 7d1d24d36e..3050350dad 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; -import org.apache.dolphinscheduler.server.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; diff --git a/pom.xml b/pom.xml index 6b8fe3d020..2620836a30 100644 --- a/pom.xml +++ b/pom.xml @@ -816,13 +816,14 @@ **/server/master/dispatch/host/assign/RandomSelectorTest.java **/server/master/dispatch/host/assign/RoundRobinSelectorTest.java **/server/master/register/MasterRegistryTest.java + **/server/master/registry/ServerNodeManagerTest.java + **/server/master/zk/ZKMasterClientTest.java **/server/master/AlertManagerTest.java **/server/master/MasterCommandTest.java **/server/master/ParamsTest.java - **/server/register/ServerNodeManagerTest.java **/server/register/ZookeeperRegistryCenterTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java