Browse Source

[1.3.6-prepare][Fix-5176-5275][Server] Fix wrong excludeFilters and move zk/ZKMasterClient and registry/ServerNodeManager into master #5278 (#5276)

* [1.3.6-prepare][Improvement-5275][Server] Move zk/ZKMasterClient and registry/ServerNodeManager into master

* [1.3.6-prepare][Improvement-5275][Server] Fix code smells

* [Improvement-5275][Server] Add ZKMasterClientTest
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
4c12c2ad70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  3. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
  4. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  5. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  6. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
  7. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  8. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  9. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
  10. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  11. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
  12. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
  13. 30
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java
  14. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
  15. 78
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java
  16. 68
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
  17. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  18. 3
      pom.xml

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
@ -117,7 +117,8 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// self tolerant // self tolerant
this.zkMasterClient.start(this); this.zkMasterClient.start();
this.zkMasterClient.setStoppable(this);
// scheduler start // scheduler start
this.masterSchedulerService.start(); this.masterSchedulerService.start();

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java

@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.registry; package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.zk.AbstractListener; import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient; import org.apache.dolphinscheduler.service.zk.AbstractZKClient;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.imps.CuratorFrameworkState;

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.zk; package org.apache.dolphinscheduler.server.master.zk;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
@ -73,7 +73,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired @Autowired
private MasterRegistry masterRegistry; private MasterRegistry masterRegistry;
public void start(IStoppable stoppable) { public void start() {
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
@ -83,7 +83,6 @@ public class ZKMasterClient extends AbstractZKClient {
// master registry // master registry
masterRegistry.registry(); masterRegistry.registry();
masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable);
String registryPath = this.masterRegistry.getMasterPath(); String registryPath = this.masterRegistry.getMasterPath();
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
@ -106,6 +105,10 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
public void setStoppable(IStoppable stoppable) {
masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable);
}
@Override @Override
public void close() { public void close() {
masterRegistry.unRegistry(); masterRegistry.unRegistry();
@ -138,7 +141,7 @@ public class ZKMasterClient extends AbstractZKClient {
* @param failover is failover * @param failover is failover
*/ */
private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) { private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
logger.info("{} node deleted : {}", zkNodeType.toString(), path); logger.info("{} node deleted : {}", zkNodeType, path);
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
String failoverPath = getFailoverLockPath(zkNodeType); String failoverPath = getFailoverLockPath(zkNodeType);
@ -161,7 +164,7 @@ public class ZKMasterClient extends AbstractZKClient {
failoverServerWhenDown(serverHost, zkNodeType); failoverServerWhenDown(serverHost, zkNodeType);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("{} server failover failed.", zkNodeType.toString()); logger.error("{} server failover failed.", zkNodeType);
logger.error("failover exception ", e); logger.error("failover exception ", e);
} finally { } finally {
releaseMutex(mutex); releaseMutex(mutex);
@ -173,15 +176,15 @@ public class ZKMasterClient extends AbstractZKClient {
* *
* @param serverHost server host * @param serverHost server host
* @param zkNodeType zookeeper node type * @param zkNodeType zookeeper node type
* @throws Exception exception
*/ */
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) {
switch (zkNodeType) { switch (zkNodeType) {
case MASTER: case MASTER:
failoverMaster(serverHost); failoverMaster(serverHost);
break; break;
case WORKER: case WORKER:
failoverWorker(serverHost, true); failoverWorker(serverHost, true);
break;
default: default:
break; break;
} }
@ -194,7 +197,6 @@ public class ZKMasterClient extends AbstractZKClient {
* @return fail over lock path * @return fail over lock path
*/ */
private String getFailoverLockPath(ZKNodeType zkNodeType) { private String getFailoverLockPath(ZKNodeType zkNodeType) {
switch (zkNodeType) { switch (zkNodeType) {
case MASTER: case MASTER:
return getMasterFailoverLockPath(); return getMasterFailoverLockPath();
@ -250,7 +252,7 @@ public class ZKMasterClient extends AbstractZKClient {
* @param taskInstance task instance * @param taskInstance task instance
* @return true if task instance need fail over * @return true if task instance need fail over
*/ */
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) {
boolean taskNeedFailover = true; boolean taskNeedFailover = true;
@ -312,9 +314,8 @@ public class ZKMasterClient extends AbstractZKClient {
* *
* @param workerHost worker host * @param workerHost worker host
* @param needCheckWorkerAlive need check worker alive * @param needCheckWorkerAlive need check worker alive
* @throws Exception exception
*/ */
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) {
logger.info("start worker[{}] failover ...", workerHost); logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -53,9 +53,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@ComponentScan.Filter(type = FilterType.REGEX, pattern = { @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
"org.apache.dolphinscheduler.server.master.*", "org.apache.dolphinscheduler.server.master.*",
"org.apache.dolphinscheduler.server.monitor.*", "org.apache.dolphinscheduler.server.monitor.*",
"org.apache.dolphinscheduler.server.log.*", "org.apache.dolphinscheduler.server.log.*"
"org.apache.dolphinscheduler.server.zk.ZKMasterClient",
"org.apache.dolphinscheduler.server.registry.ServerNodeManager"
}) })
}) })
@EnableTransactionManagement @EnableTransactionManagement

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -38,11 +38,11 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
import org.apache.dolphinscheduler.server.registry.DependencyConfig; import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriority;

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java

@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig; import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -30,8 +30,8 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig; import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
import org.junit.Assert; import org.junit.Assert;

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig; import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;

30
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java → dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java

@ -15,13 +15,14 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.registry; package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
@ -42,7 +43,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
* server node manager test * server node manager test
*/ */
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class, @ContextConfiguration(classes = {DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class, ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class}) ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class})
public class ServerNodeManagerTest { public class ServerNodeManagerTest {
@ -56,9 +57,6 @@ public class ServerNodeManagerTest {
@Autowired @Autowired
private WorkerRegistry workerRegistry; private WorkerRegistry workerRegistry;
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Autowired @Autowired
private WorkerConfig workerConfig; private WorkerConfig workerConfig;
@ -66,43 +64,39 @@ public class ServerNodeManagerTest {
private MasterConfig masterConfig; private MasterConfig masterConfig;
@Test @Test
public void testGetMasterNodes(){ public void testGetMasterNodes() {
masterRegistry.registry(); masterRegistry.registry();
try { try {
//let the serverNodeManager catch the registry event //let the serverNodeManager catch the registry event
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
//ignore
} }
Set<String> masterNodes = serverNodeManager.getMasterNodes(); Set<String> masterNodes = serverNodeManager.getMasterNodes();
Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes)); Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes));
Assert.assertEquals(1, masterNodes.size()); Assert.assertEquals(1, masterNodes.size());
Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next()); Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next());
masterRegistry.unRegistry();
} }
@Test @Test
public void testGetWorkerGroupNodes(){ public void testGetWorkerGroupNodes() {
workerRegistry.registry(); workerRegistry.registry();
try { try {
//let the serverNodeManager catch the registry event //let the serverNodeManager catch the registry event
Thread.sleep(2000); Thread.sleep(3000);
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
//ignore
} }
Map<String, Set<String>> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); Map<String, Set<String>> workerGroupNodes = serverNodeManager.getWorkerGroupNodes();
Assert.assertEquals(1, workerGroupNodes.size()); Assert.assertEquals(1, workerGroupNodes.size());
Assert.assertEquals("default".trim(), workerGroupNodes.keySet().iterator().next()); Assert.assertEquals("default".trim(), workerGroupNodes.keySet().iterator().next());
}
@Test
public void testGetWorkerGroupNodesWithParam(){
workerRegistry.registry();
try {
//let the serverNodeManager catch the registry event
Thread.sleep(3000);
} catch (InterruptedException ignore) {
}
Set<String> workerNodes = serverNodeManager.getWorkerGroupNodes("default"); Set<String> workerNodes = serverNodeManager.getWorkerGroupNodes("default");
Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes)); Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes));
Assert.assertEquals(1, workerNodes.size()); Assert.assertEquals(1, workerNodes.size());
Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next()); Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next());
workerRegistry.unRegistry();
} }
} }

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer; import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig; import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

78
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClientTest.java

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.zk;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* zookeeper master client test
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class,
ZKMasterClient.class, RegisterOperator.class})
public class ZKMasterClientTest {
@Autowired
private ZKMasterClient zkMasterClient;
@Autowired
private ServerNodeManager serverNodeManager;
@Autowired
private MasterConfig masterConfig;
@Test
public void testZKMasterClient() {
zkMasterClient.start();
try {
//let the serverNodeManager catch the registry event
Thread.sleep(2000);
} catch (InterruptedException ignore) {
//ignore
}
Set<String> masterNodes = serverNodeManager.getMasterNodes();
Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes));
Assert.assertEquals(1, masterNodes.size());
Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next());
zkMasterClient.close();
}
}

68
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

@ -18,17 +18,32 @@
package org.apache.dolphinscheduler.server.registry; package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -55,103 +70,108 @@ public class DependencyConfig {
} }
@Bean @Bean
public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl(){ public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() {
return Mockito.mock(TaskInstanceCacheManagerImpl.class); return Mockito.mock(TaskInstanceCacheManagerImpl.class);
} }
@Bean @Bean
public ProcessService processService(){ public ProcessService processService() {
return Mockito.mock(ProcessService.class); return Mockito.mock(ProcessService.class);
} }
@Bean @Bean
public MasterConfig masterConfig(){ public MasterConfig masterConfig() {
return Mockito.mock(MasterConfig.class); return Mockito.mock(MasterConfig.class);
} }
@Bean
public WorkerConfig workerConfig() {
return Mockito.mock(WorkerConfig.class);
}
@Bean @Bean
public UserMapper userMapper(){ public UserMapper userMapper() {
return Mockito.mock(UserMapper.class); return Mockito.mock(UserMapper.class);
} }
@Bean @Bean
public ProcessDefinitionMapper processDefineMapper(){ public ProcessDefinitionMapper processDefineMapper() {
return Mockito.mock(ProcessDefinitionMapper.class); return Mockito.mock(ProcessDefinitionMapper.class);
} }
@Bean @Bean
public ProcessInstanceMapper processInstanceMapper(){ public ProcessInstanceMapper processInstanceMapper() {
return Mockito.mock(ProcessInstanceMapper.class); return Mockito.mock(ProcessInstanceMapper.class);
} }
@Bean @Bean
public DataSourceMapper dataSourceMapper(){ public DataSourceMapper dataSourceMapper() {
return Mockito.mock(DataSourceMapper.class); return Mockito.mock(DataSourceMapper.class);
} }
@Bean @Bean
public ProcessInstanceMapMapper processInstanceMapMapper(){ public ProcessInstanceMapMapper processInstanceMapMapper() {
return Mockito.mock(ProcessInstanceMapMapper.class); return Mockito.mock(ProcessInstanceMapMapper.class);
} }
@Bean @Bean
public TaskInstanceMapper taskInstanceMapper(){ public TaskInstanceMapper taskInstanceMapper() {
return Mockito.mock(TaskInstanceMapper.class); return Mockito.mock(TaskInstanceMapper.class);
} }
@Bean @Bean
public CommandMapper commandMapper(){ public CommandMapper commandMapper() {
return Mockito.mock(CommandMapper.class); return Mockito.mock(CommandMapper.class);
} }
@Bean @Bean
public ScheduleMapper scheduleMapper(){ public ScheduleMapper scheduleMapper() {
return Mockito.mock(ScheduleMapper.class); return Mockito.mock(ScheduleMapper.class);
} }
@Bean @Bean
public UdfFuncMapper udfFuncMapper(){ public UdfFuncMapper udfFuncMapper() {
return Mockito.mock(UdfFuncMapper.class); return Mockito.mock(UdfFuncMapper.class);
} }
@Bean @Bean
public ResourceMapper resourceMapper(){ public ResourceMapper resourceMapper() {
return Mockito.mock(ResourceMapper.class); return Mockito.mock(ResourceMapper.class);
} }
@Bean @Bean
public ErrorCommandMapper errorCommandMapper(){ public ErrorCommandMapper errorCommandMapper() {
return Mockito.mock(ErrorCommandMapper.class); return Mockito.mock(ErrorCommandMapper.class);
} }
@Bean @Bean
public TenantMapper tenantMapper(){ public TenantMapper tenantMapper() {
return Mockito.mock(TenantMapper.class); return Mockito.mock(TenantMapper.class);
} }
@Bean @Bean
public ProjectMapper projectMapper(){ public ProjectMapper projectMapper() {
return Mockito.mock(ProjectMapper.class); return Mockito.mock(ProjectMapper.class);
} }
@Bean @Bean
public TaskCallbackService taskCallbackService(){ public TaskCallbackService taskCallbackService() {
return Mockito.mock(TaskCallbackService.class); return Mockito.mock(TaskCallbackService.class);
} }
@Bean @Bean
public HostManager hostManager(){ public HostManager hostManager() {
return new RandomHostManager(); return new RandomHostManager();
} }
@Bean @Bean
public TaskResponseService taskResponseService(){ public TaskResponseService taskResponseService() {
return Mockito.mock(TaskResponseService.class); return Mockito.mock(TaskResponseService.class);
} }
@Bean @Bean
public TaskPriorityQueue taskPriorityQueue(){ public TaskPriorityQueue taskPriorityQueue() {
return new TaskPriorityQueueImpl(); return new TaskPriorityQueueImpl();
} }
} }

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;

3
pom.xml

@ -816,13 +816,14 @@
<include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include> <include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include> <include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
<include>**/server/master/register/MasterRegistryTest.java</include> <include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/registry/ServerNodeManagerTest.java</include>
<include>**/server/master/zk/ZKMasterClientTest.java</include>
<include>**/server/master/AlertManagerTest.java</include> <include>**/server/master/AlertManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include> <include>**/server/master/MasterCommandTest.java</include>
<!-- <include>**/server/master/DependentTaskTest.java</include> <!-- <include>**/server/master/DependentTaskTest.java</include>
<include>**/server/master/ConditionsTaskTest.java</include> <include>**/server/master/ConditionsTaskTest.java</include>
<include>**/server/master/MasterExecThreadTest.java</include> --> <include>**/server/master/MasterExecThreadTest.java</include> -->
<include>**/server/master/ParamsTest.java</include> <include>**/server/master/ParamsTest.java</include>
<include>**/server/register/ServerNodeManagerTest.java</include>
<include>**/server/register/ZookeeperRegistryCenterTest.java</include> <include>**/server/register/ZookeeperRegistryCenterTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include> <include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include> <include>**/server/utils/ExecutionContextTestUtils.java</include>

Loading…
Cancel
Save