Browse Source

[Improvement][Api Module] refactor registry client, remove spring annotation (#5814)

* fix: refactor registry client, remove spring annotation

* fix UT

* fix UT

* fix checkstyle

* fix UT

* fix UT

* fix UT

* fix: Rename RegistryCenterUtils method name

Co-authored-by: wen-hemin <wenhemin@apache.com>
2.0.7-release
wen-hemin 3 years ago committed by GitHub
parent
commit
596821a5b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
  2. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  3. 38
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java
  4. 22
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java
  5. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/MonitorControllerTest.java
  6. 38
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
  7. 21
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  8. 21
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java
  9. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  10. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  11. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  12. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java
  13. 19
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  14. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java
  15. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  16. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  17. 26
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  18. 23
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java
  19. 14
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
  20. 34
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java

30
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java

@ -17,27 +17,22 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.utils.RegistryMonitor;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -49,12 +44,6 @@ import com.google.common.collect.Sets;
@Service
public class MonitorServiceImpl extends BaseServiceImpl implements MonitorService {
@Autowired
private RegistryMonitor registryMonitor;
@Autowired
private RegistryClient registryClient;
@Autowired
private MonitorDBDao monitorDBDao;
@ -105,7 +94,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
public Map<String,Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<ZookeeperRecord> zookeeperRecordList = registryMonitor.zookeeperInfoList();
List<ZookeeperRecord> zookeeperRecordList = RegistryCenterUtils.zookeeperInfoList();
result.put(Constants.DATA_LIST, zookeeperRecordList);
putMsg(result, Status.SUCCESS);
@ -160,10 +149,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
@Override
public List<Server> getServerListFromRegistry(boolean isMaster) {
checkNotNull(registryMonitor);
NodeType nodeType = isMaster ? NodeType.MASTER : NodeType.WORKER;
return registryClient.getServerList(nodeType);
return isMaster ? RegistryCenterUtils.getMasterServers() : RegistryCenterUtils.getWorkerServers();
}
}

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.RegistryMonitor;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
import java.util.Date;
@ -40,8 +39,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -61,16 +58,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
WorkerGroupMapper workerGroupMapper;
@Autowired
private RegistryMonitor registryMonitor;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Resource
RegistryClient registryClient;
/**
* create or update a worker group
*
@ -147,7 +137,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
}
// check zookeeper
String workerGroupPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SLASH + workerGroup.getName();
return registryClient.isExisted(workerGroupPath);
return RegistryCenterUtils.isNodeExisted(workerGroupPath);
}
/**
@ -157,7 +147,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return boolean
*/
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
Map<String, String> serverMaps = registryMonitor.getServerMaps(NodeType.WORKER, true);
Map<String, String> serverMaps = RegistryCenterUtils.getServerMaps(NodeType.WORKER, true);
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
}
@ -258,7 +248,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
List<String> workerGroupList = null;
try {
workerGroupList = registryClient.getChildrenKeys(workerPath);
workerGroupList = RegistryCenterUtils.getChildrenNodes(workerPath);
} catch (Exception e) {
logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging);
}
@ -276,7 +266,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
String workerGroupPath = workerPath + Constants.SLASH + workerGroup;
List<String> childrenNodes = null;
try {
childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
childrenNodes = RegistryCenterUtils.getChildrenNodes(workerGroupPath);
} catch (Exception e) {
logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
}
@ -287,7 +277,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
wg.setName(workerGroup);
if (isPaging) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
String registeredValue = registryClient.get(workerGroupPath + Constants.SLASH + childrenNodes.get(0));
String registeredValue = RegistryCenterUtils.getNodeData(workerGroupPath + Constants.SLASH + childrenNodes.get(0));
wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7]));
wg.setSystemDefault(true);
@ -334,7 +324,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Override
public Map<String, Object> getWorkerAddressList() {
Map<String, Object> result = new HashMap<>();
List<String> serverNodeList = registryMonitor.getServerNodeList(NodeType.WORKER, true);
List<String> serverNodeList = RegistryCenterUtils.getServerNodeList(NodeType.WORKER, true);
result.put(Constants.DATA_LIST, serverNodeList);
putMsg(result, Status.SUCCESS);
return result;

38
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryMonitor.java → dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java

@ -26,31 +26,19 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* monitor zookeeper info todo registry-spi
* fixme Some of the information obtained in the api belongs to the unique information of zk.
* I am not sure whether there is a good abstraction method. This is related to whether the specific plug-in is provided.
*/
@Component
public class RegistryMonitor {
public class RegistryCenterUtils {
@Autowired
RegistryClient registryClient;
@PostConstruct
public void initRegistry() {
registryClient.init();
}
private static RegistryClient registryClient = RegistryClient.getInstance();
/**
* @return zookeeper info list
*/
public List<ZookeeperRecord> zookeeperInfoList() {
public static List<ZookeeperRecord> zookeeperInfoList() {
return null;
}
@ -59,7 +47,7 @@ public class RegistryMonitor {
*
* @return master server information
*/
public List<Server> getMasterServers() {
public static List<Server> getMasterServers() {
return registryClient.getServerList(NodeType.MASTER);
}
@ -68,7 +56,7 @@ public class RegistryMonitor {
*
* @return worker server informations
*/
public List<Server> getWorkerServers() {
public static List<Server> getWorkerServers() {
return registryClient.getServerList(NodeType.WORKER);
}
@ -106,11 +94,23 @@ public class RegistryMonitor {
return list;
}
public Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
public static Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
return registryClient.getServerMaps(nodeType, hostOnly);
}
public List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
public static List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
return registryClient.getServerNodeList(nodeType, hostOnly);
}
public static boolean isNodeExisted(String key) {
return registryClient.isExisted(key);
}
public static List<String> getChildrenNodes(final String key) {
return registryClient.getChildrenKeys(key);
}
public static String getNodeData(String key) {
return registryClient.get(key);
}
}

22
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java

@ -17,22 +17,24 @@
package org.apache.dolphinscheduler.api.controller;
import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
@ -41,8 +43,11 @@ import org.springframework.web.context.WebApplicationContext;
/**
* abstract controller test
*/
@RunWith(SpringRunner.class)
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(SpringRunner.class)
@SpringBootTest(classes = ApiApplicationServer.class)
@PrepareForTest({ RegistryCenterUtils.class, RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class AbstractControllerTest {
public static final String SESSION_ID = "sessionId";
@ -59,12 +64,11 @@ public class AbstractControllerTest {
protected String sessionId;
@MockBean
RegistryClient registryClient;
@Before
public void setUp() {
doNothing().when(registryClient).init();
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
PowerMockito.mockStatic(RegistryCenterUtils.class);
mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
createSession();

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/MonitorControllerTest.java

@ -55,7 +55,6 @@ public class MonitorControllerTest extends AbstractControllerTest {
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testListWorker() throws Exception {
@ -72,7 +71,6 @@ public class MonitorControllerTest extends AbstractControllerTest {
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testQueryDatabaseState() throws Exception {
MvcResult mvcResult = mockMvc.perform(get("/monitor/database")
@ -88,7 +86,6 @@ public class MonitorControllerTest extends AbstractControllerTest {
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test
public void testQueryZookeeperState() throws Exception {
MvcResult mvcResult = mockMvc.perform(get("/monitor/zookeeper/list")

38
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java

@ -22,14 +22,25 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
@ -38,15 +49,26 @@ import org.springframework.util.MultiValueMap;
/**
* worker group controller test
*/
public class WorkerGroupControllerTest extends AbstractControllerTest{
public class WorkerGroupControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(WorkerGroupControllerTest.class);
@MockBean
private WorkerGroupMapper workerGroupMapper;
@MockBean
private ProcessInstanceMapper processInstanceMapper;
@Test
public void testSaveWorkerGroup() throws Exception {
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("192.168.0.1", "192.168.0.1");
serverMaps.put("192.168.0.2", "192.168.0.2");
PowerMockito.when(RegistryCenterUtils.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group");
paramsMap.add("ipList","192.16.12,192.168,10,12");
paramsMap.add("addrList","192.168.0.1,192.168.0.2");
MvcResult mvcResult = mockMvc.perform(post("/worker-group/save")
.header("sessionId", sessionId)
.params(paramsMap))
@ -91,9 +113,17 @@ public class WorkerGroupControllerTest extends AbstractControllerTest{
@Test
public void testDeleteById() throws Exception {
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setId(12);
workerGroup.setName("测试");
Mockito.when(workerGroupMapper.selectById(12)).thenReturn(workerGroup);
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus("测试", Constants.NOT_TERMINATED_STATES)).thenReturn(null);
Mockito.when(workerGroupMapper.deleteById(12)).thenReturn(1);
Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName("测试", "")).thenReturn(1);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id","12");
MvcResult mvcResult = mockMvc.perform(get("/worker-group/delete-by-id")
MvcResult mvcResult = mockMvc.perform(post("/worker-group/delete-by-id")
.header("sessionId", sessionId)
.params(paramsMap))
.andExpect(status().isOk())

21
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -26,23 +26,30 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* worker group service test
*/
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class WorkerGroupServiceTest {
@ -69,13 +76,13 @@ public class WorkerGroupServiceTest {
List<String> workerGroupStrList = new ArrayList<>();
workerGroupStrList.add("default");
workerGroupStrList.add("test");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath)).thenReturn(workerGroupStrList);
List<String> defaultAddressList = new ArrayList<>();
defaultAddressList.add("192.168.220.188:1234");
defaultAddressList.add("192.168.220.189:1234");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultAddressList);
Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath + "/default")).thenReturn(defaultAddressList);
Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
}
@ -119,6 +126,12 @@ public class WorkerGroupServiceTest {
PageInfo<WorkerGroup> pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
Assert.assertEquals(pageInfo.getLists().size(), 1);
}*/
@Before
public void before() {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
}
@Test
public void testQueryAllGroup() {
Map<String, Object> result = workerGroupService.queryAllGroup();

21
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryMonitorUtilsTest.java → dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java

@ -14,35 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils;
import org.apache.dolphinscheduler.common.model.Server;
import java.util.List;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
/**
* zookeeper monitor utils test
*/
@Ignore
public class RegistryMonitorUtilsTest {
public class RegistryCenterUtilsTest {
@Test
public void testGetMasterList(){
RegistryMonitor registryMonitor = new RegistryMonitor();
List<Server> masterServerList = registryMonitor.getMasterServers();
List<Server> workerServerList = registryMonitor.getWorkerServers();
List<Server> masterServerList = RegistryCenterUtils.getMasterServers();
List<Server> workerServerList = RegistryCenterUtils.getWorkerServers();
Assert.assertTrue(masterServerList.size() >= 0);
Assert.assertTrue(workerServerList.size() >= 0);
}
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -76,7 +76,7 @@ public class MasterRegistryClient {
*/
@Autowired
private ProcessService processService;
@Autowired
private RegistryClient registryClient;
/**
@ -327,8 +327,8 @@ public class MasterRegistryClient {
@PostConstruct
public void init() {
this.startTime = DateUtils.dateToString(new Date());
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
registryClient.init();
}
/**

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -100,8 +100,7 @@ public class ServerNodeManager implements InitializingBean {
/**
* zk client
*/
@Autowired
private RegistryClient registryClient;
private RegistryClient registryClient = RegistryClient.getInstance();
/**
* worker group mapper

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -66,7 +66,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
super(taskInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
this.registryClient = SpringApplicationContext.getBean(RegistryClient.class);
this.registryClient = RegistryClient.getInstance();
}
/**

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java

@ -23,9 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* zk monitor server impl
*/
@ -35,9 +35,7 @@ public class RegistryMonitorImpl extends AbstractMonitor {
/**
* zookeeper operator
*/
@Autowired
private RegistryClient registryClient;
private RegistryClient registryClient = RegistryClient.getInstance();
/**
* get active nodes map by path

19
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -19,15 +19,13 @@ package org.apache.dolphinscheduler.server.registry;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import java.util.Date;
import java.util.Set;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Date;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,9 +44,6 @@ public class HeartBeatTask implements Runnable {
private String serverType;
private RegistryClient registryClient;
// server stop or not
protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
double maxCpuloadAvg,
double reservedMemory,
@ -124,12 +119,4 @@ public class HeartBeatTask implements Runnable {
}
}
/**
* for stop server
*
* @param serverStoppable server stoppable interface
*/
public void setStoppable(IStoppable serverStoppable) {
this.stoppable = serverStoppable;
}
}

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java

@ -18,10 +18,8 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
@ -34,21 +32,17 @@ public class RemoveZKNode implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(RemoveZKNode.class);
/**
* zookeeper operator
*/
@Autowired
private RegistryClient registryClient;
private RegistryClient registryClient = RegistryClient.getInstance();
public static void main(String[] args) {
new SpringApplicationBuilder(RemoveZKNode.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
if (args.length != ARGS_LENGTH) {
logger.error("Usage: <node>");
return;
@ -56,6 +50,5 @@ public class RemoveZKNode implements CommandLineRunner {
registryClient.remove(args[0]);
registryClient.close();
}
}

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@ -28,13 +31,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.netty.channel.Channel;
@ -60,16 +58,15 @@ public class TaskCallbackService {
/**
* zookeeper registry center
*/
@Autowired
private RegistryClient registryClient;
/**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
public TaskCallbackService() {
this.registryClient = RegistryClient.getInstance();
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -67,8 +67,7 @@ public class WorkerRegistryClient {
*/
private ScheduledExecutorService heartBeatExecutor;
@Autowired
RegistryClient registryClient;
private RegistryClient registryClient;
/**
* worker start time
@ -81,8 +80,8 @@ public class WorkerRegistryClient {
public void initWorkRegistry() {
this.workerGroups = workerConfig.getWorkerGroups();
this.startTime = DateUtils.dateToString(new Date());
this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
registryClient.init();
}
/**
@ -162,8 +161,4 @@ public class WorkerRegistryClient {
registryClient.setStoppable(stoppable);
}
public void closeRegistry() {
unRegistry();
}
}

26
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -20,6 +20,10 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
@ -28,23 +32,24 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.test.util.ReflectionTestUtils;
/**
* MasterRegistryClientTest
*/
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class MasterRegistryClientTest {
@InjectMocks
@ -53,21 +58,25 @@ public class MasterRegistryClientTest {
@Mock
private MasterConfig masterConfig;
@Mock
private RegistryClient registryClient;
@Mock
private ScheduledExecutorService heartBeatExecutor;
@Mock
private ProcessService processService;
@Before
public void before() throws Exception {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
registryClient = PowerMockito.mock(RegistryClient.class);
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
given(registryClient.getMasterFailoverLockPath()).willReturn("/path");
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
doNothing().when(registryClient).handleDeadServer(Mockito.anyString(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setHost("127.0.0.1:8080");
@ -96,7 +105,6 @@ public class MasterRegistryClientTest {
@Test
public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);
//Cannot mock static methods

23
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManagerTest.java

@ -21,23 +21,24 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* server node manager test
*/
@RunWith(MockitoJUnitRunner.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
@PowerMockIgnore({"javax.management.*"})
public class ServerNodeManagerTest {
@InjectMocks
ServerNodeManager serverNodeManager;
@Mock
private RegistryClient registryClient;
private ServerNodeManager serverNodeManager;
@Mock
private WorkerGroupMapper workerGroupMapper;
@ -45,6 +46,12 @@ public class ServerNodeManagerTest {
@Mock
private AlertDao alertDao;
@Before
public void before() {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
serverNodeManager = PowerMockito.mock(ServerNodeManager.class);
}
@Test
public void test(){
//serverNodeManager.getWorkerGroupNodes()

14
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java

@ -44,18 +44,22 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* abstract registry client
* registry client singleton
*/
@Service
public class RegistryClient extends RegistryCenter {
private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class);
private void loadRegistry() {
init();
private static RegistryClient registryClient = new RegistryClient();
private RegistryClient() {
super.init();
}
public static RegistryClient getInstance() {
return registryClient;
}
/**

34
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java

@ -21,43 +21,37 @@ import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.spi.register.Registry;
import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.Sets;
@RunWith(MockitoJUnitRunner.Silent.class)
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class })
public class RegistryClientTest {
@InjectMocks
private RegistryClient registryClient;
@Mock
private Registry registry;
@Before
public void before() {
// registry=mock(Registry.class);
}
@Test
public void te() throws Exception {
doNothing().when(registry).persist(Mockito.anyString(), Mockito.anyString());
doNothing().when(registry).update(Mockito.anyString(), Mockito.anyString());
given(registry.releaseLock(Mockito.anyString())).willReturn(true);
given(registry.getChildren("/dead-servers")).willReturn(Arrays.asList("worker_127.0.0.1:8089"));
public void test() throws Exception {
Registry registry = PowerMockito.mock(Registry.class);
PowerMockito.doNothing().when(registry).persist(Mockito.anyString(), Mockito.anyString());
PowerMockito.doNothing().when(registry).update(Mockito.anyString(), Mockito.anyString());
PowerMockito.when(registry.releaseLock(Mockito.anyString())).thenReturn(true);
PowerMockito.when(registry.getChildren("/dead-servers")).thenReturn(Arrays.asList("worker_127.0.0.1:8089"));
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
registryClient = PowerMockito.mock(RegistryClient.class);
registryClient.persist("/key", "");
registryClient.update("/key", "");
registryClient.releaseLock("/key");

Loading…
Cancel
Save