diff --git a/dolphinscheduler-standalone-server/src/main/resources/registry.properties b/.github/CODEOWNERS similarity index 74% rename from dolphinscheduler-standalone-server/src/main/resources/registry.properties rename to .github/CODEOWNERS index 3c33799075..9def4bee11 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/registry.properties +++ b/.github/CODEOWNERS @@ -15,8 +15,6 @@ # limitations under the License. # -# This file is only to override the production configurations in standalone server. - -registry.plugin.dir=./dolphinscheduler-dist/target/dolphinscheduler-dist-2.0.0-SNAPSHOT/lib/plugin/registry/zookeeper -registry.plugin.name=zookeeper -registry.servers=127.0.0.1:2181 +dolphinscheduler/dolphinscheduler-e2e @kezhenxu94 +dolphinscheduler/dolphinscheduler-registry @kezhenxu94 +dolphinscheduler/dolphinscheduler-standalone-server @kezhenxu94 diff --git a/docker/build/conf/dolphinscheduler/registry.properties.tpl b/docker/build/conf/dolphinscheduler/registry.properties.tpl index 40836f5a12..9ee8add211 100644 --- a/docker/build/conf/dolphinscheduler/registry.properties.tpl +++ b/docker/build/conf/dolphinscheduler/registry.properties.tpl @@ -15,13 +15,5 @@ # limitations under the License. # -#registry.plugin.dir config the Registry Plugin dir. -registry.plugin.dir=${REGISTRY_PLUGIN_DIR} - registry.plugin.name=${REGISTRY_PLUGIN_NAME} registry.servers=${REGISTRY_SERVERS} - -#maven.local.repository=/usr/local/localRepository - -#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE -#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml diff --git a/docker/build/startup-init-conf.sh b/docker/build/startup-init-conf.sh index 35f70bbdf0..47178341c0 100755 --- a/docker/build/startup-init-conf.sh +++ b/docker/build/startup-init-conf.sh @@ -37,7 +37,6 @@ export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"} #============================================================================ # Registry #============================================================================ -export REGISTRY_PLUGIN_DIR=${REGISTRY_PLUGIN_DIR:-"lib/plugin/registry"} export REGISTRY_PLUGIN_NAME=${REGISTRY_PLUGIN_NAME:-"zookeeper"} export REGISTRY_SERVERS=${REGISTRY_SERVERS:-"127.0.0.1:2181"} diff --git a/docker/docker-swarm/config.env.sh b/docker/docker-swarm/config.env.sh index afc09b0f11..0c79a045a2 100755 --- a/docker/docker-swarm/config.env.sh +++ b/docker/docker-swarm/config.env.sh @@ -39,7 +39,6 @@ DATABASE_PARAMS=characterEncoding=utf8 #============================================================================ # Registry #============================================================================ -REGISTRY_PLUGIN_DIR=lib/plugin/registry REGISTRY_PLUGIN_NAME=zookeeper REGISTRY_SERVERS=dolphinscheduler-zookeeper:2181 diff --git a/docker/docker-swarm/docker-compose.yml b/docker/docker-swarm/docker-compose.yml index 3f63e79074..2f67ae8e37 100644 --- a/docker/docker-swarm/docker-compose.yml +++ b/docker/docker-swarm/docker-compose.yml @@ -140,4 +140,4 @@ volumes: dolphinscheduler-worker-data: dolphinscheduler-logs: dolphinscheduler-shared-local: - dolphinscheduler-resource-local: \ No newline at end of file + dolphinscheduler-resource-local: diff --git a/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl b/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl index 2b0786fb31..5ef83a72d9 100644 --- a/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl +++ b/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl @@ -166,12 +166,6 @@ Create a database environment variables. Create a registry environment variables. */}} {{- define "dolphinscheduler.registry.env_vars" -}} -- name: REGISTRY_PLUGIN_DIR - {{- if .Values.zookeeper.enabled }} - value: "lib/plugin/registry" - {{- else }} - value: {{ .Values.externalRegistry.registryPluginDir }} - {{- end }} - name: REGISTRY_PLUGIN_NAME {{- if .Values.zookeeper.enabled }} value: "zookeeper" @@ -239,4 +233,4 @@ Create a fsFileResourcePersistence volumeMount. - mountPath: {{ default "/dolphinscheduler" .Values.common.configmap.RESOURCE_UPLOAD_PATH | quote }} name: {{ include "dolphinscheduler.fullname" . }}-fs-file {{- end -}} -{{- end -}} \ No newline at end of file +{{- end -}} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java index cb3b0b2716..ccec49b39a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java @@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.MonitorService; -import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.WorkerServerModel; import org.apache.dolphinscheduler.dao.MonitorDBDao; import org.apache.dolphinscheduler.dao.entity.MonitorRecord; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; +import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.HashMap; import java.util.List; @@ -48,6 +48,9 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic @Autowired private MonitorDBDao monitorDBDao; + @Autowired + private RegistryClient registryClient; + /** * query database state * @@ -55,7 +58,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic * @return data base state */ @Override - public Map queryDatabaseState(User loginUser) { + public Map queryDatabaseState(User loginUser) { Map result = new HashMap<>(); List monitorRecordList = monitorDBDao.queryDatabaseState(); @@ -74,13 +77,13 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic * @return master information list */ @Override - public Map queryMaster(User loginUser) { + public Map queryMaster(User loginUser) { Map result = new HashMap<>(); List masterServers = getServerListFromRegistry(true); result.put(Constants.DATA_LIST, masterServers); - putMsg(result,Status.SUCCESS); + putMsg(result, Status.SUCCESS); return result; } @@ -92,12 +95,10 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic * @return zookeeper information list */ @Override - public Map queryZookeeperState(User loginUser) { + public Map queryZookeeperState(User loginUser) { Map result = new HashMap<>(); - List zookeeperRecordList = RegistryCenterUtils.zookeeperInfoList(); - - result.put(Constants.DATA_LIST, zookeeperRecordList); + result.put(Constants.DATA_LIST, null); putMsg(result, Status.SUCCESS); return result; @@ -111,46 +112,48 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic * @return worker information list */ @Override - public Map queryWorker(User loginUser) { + public Map queryWorker(User loginUser) { Map result = new HashMap<>(); List workerServers = getServerListFromRegistry(false) - .stream() - .map((Server server) -> { - WorkerServerModel model = new WorkerServerModel(); - model.setId(server.getId()); - model.setHost(server.getHost()); - model.setPort(server.getPort()); - model.setZkDirectories(Sets.newHashSet(server.getZkDirectory())); - model.setResInfo(server.getResInfo()); - model.setCreateTime(server.getCreateTime()); - model.setLastHeartbeatTime(server.getLastHeartbeatTime()); - return model; - }) - .collect(Collectors.toList()); + .stream() + .map((Server server) -> { + WorkerServerModel model = new WorkerServerModel(); + model.setId(server.getId()); + model.setHost(server.getHost()); + model.setPort(server.getPort()); + model.setZkDirectories(Sets.newHashSet(server.getZkDirectory())); + model.setResInfo(server.getResInfo()); + model.setCreateTime(server.getCreateTime()); + model.setLastHeartbeatTime(server.getLastHeartbeatTime()); + return model; + }) + .collect(Collectors.toList()); Map workerHostPortServerMapping = workerServers - .stream() - .collect(Collectors.toMap( - (WorkerServerModel worker) -> { - String[] s = worker.getZkDirectories().iterator().next().split("/"); - return s[s.length - 1]; - } - , Function.identity() - , (WorkerServerModel oldOne, WorkerServerModel newOne) -> { - oldOne.getZkDirectories().addAll(newOne.getZkDirectories()); - return oldOne; - })); + .stream() + .collect(Collectors.toMap( + (WorkerServerModel worker) -> { + String[] s = worker.getZkDirectories().iterator().next().split("/"); + return s[s.length - 1]; + } + , Function.identity() + , (WorkerServerModel oldOne, WorkerServerModel newOne) -> { + oldOne.getZkDirectories().addAll(newOne.getZkDirectories()); + return oldOne; + })); result.put(Constants.DATA_LIST, workerHostPortServerMapping.values()); - putMsg(result,Status.SUCCESS); + putMsg(result, Status.SUCCESS); return result; } @Override public List getServerListFromRegistry(boolean isMaster) { - return isMaster ? RegistryCenterUtils.getMasterServers() : RegistryCenterUtils.getWorkerServers(); + return isMaster + ? registryClient.getServerList(NodeType.MASTER) + : registryClient.getServerList(NodeType.WORKER); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 379af645df..06e6958bc1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; @@ -31,14 +30,17 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.lang.StringUtils; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -58,10 +60,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceImpl.class); @Autowired - WorkerGroupMapper workerGroupMapper; + private WorkerGroupMapper workerGroupMapper; @Autowired - ProcessInstanceMapper processInstanceMapper; + private ProcessInstanceMapper processInstanceMapper; + + @Autowired + private RegistryClient registryClient; /** * create or update a worker group @@ -139,7 +144,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro } // check zookeeper String workerGroupPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + workerGroup.getName(); - return RegistryCenterUtils.isNodeExisted(workerGroupPath); + return registryClient.exists(workerGroupPath); } /** @@ -149,7 +154,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro * @return boolean */ private String checkWorkerGroupAddrList(WorkerGroup workerGroup) { - Map serverMaps = RegistryCenterUtils.getServerMaps(NodeType.WORKER, true); + Map serverMaps = registryClient.getServerMaps(NodeType.WORKER, true); if (Strings.isNullOrEmpty(workerGroup.getAddrList())) { return null; } @@ -250,11 +255,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro List workerGroups = workerGroupMapper.queryAllWorkerGroup(); // worker groups from zookeeper String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - List workerGroupList = null; + Collection workerGroupList = null; try { - workerGroupList = RegistryCenterUtils.getChildrenNodes(workerPath); + workerGroupList = registryClient.getChildrenKeys(workerPath); } catch (Exception e) { - logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging); + logger.error("getWorkerGroups exception, workerPath: {}, isPaging: {}", workerPath, isPaging, e); } if (CollectionUtils.isEmpty(workerGroupList)) { @@ -268,9 +273,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro for (String workerGroup : workerGroupList) { String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup; - List childrenNodes = null; + Collection childrenNodes = null; try { - childrenNodes = RegistryCenterUtils.getChildrenNodes(workerGroupPath); + childrenNodes = registryClient.getChildrenKeys(workerGroupPath); } catch (Exception e) { logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath); } @@ -281,7 +286,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro wg.setName(workerGroup); if (isPaging) { wg.setAddrList(String.join(Constants.COMMA, childrenNodes)); - String registeredValue = RegistryCenterUtils.getNodeData(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.get(0)); + String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next()); wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6])); wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7])); wg.setSystemDefault(true); @@ -328,7 +333,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro @Override public Map getWorkerAddressList() { Map result = new HashMap<>(); - List serverNodeList = RegistryCenterUtils.getServerNodeList(NodeType.WORKER, true); + Set serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER, true); result.put(Constants.DATA_LIST, serverNodeList); putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java deleted file mode 100644 index 71e0456583..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.utils; - -import org.apache.dolphinscheduler.common.enums.NodeType; -import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; -import org.apache.dolphinscheduler.service.registry.RegistryClient; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * monitor zookeeper info todo registry-spi - * fixme Some of the information obtained in the api belongs to the unique information of zk. - * I am not sure whether there is a good abstraction method. This is related to whether the specific plug-in is provided. - */ -public class RegistryCenterUtils { - - private static RegistryClient registryClient = RegistryClient.getInstance(); - - /** - * @return zookeeper info list - */ - public static List zookeeperInfoList() { - return null; - } - - /** - * get master servers - * - * @return master server information - */ - public static List getMasterServers() { - return registryClient.getServerList(NodeType.MASTER); - } - - /** - * master construct is the same with worker, use the master instead - * - * @return worker server informations - */ - public static List getWorkerServers() { - return registryClient.getServerList(NodeType.WORKER); - } - - public static Map getServerMaps(NodeType nodeType, boolean hostOnly) { - return registryClient.getServerMaps(nodeType, hostOnly); - } - - public static List getServerNodeList(NodeType nodeType, boolean hostOnly) { - return registryClient.getServerNodeList(nodeType, hostOnly); - } - - public static boolean isNodeExisted(String key) { - return registryClient.isExisted(key); - } - - public static List getChildrenNodes(final String key) { - return registryClient.getChildrenKeys(key); - } - - public static String getNodeData(String key) { - return registryClient.get(key); - } -} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java index 41a1cc351e..dcc9da32dc 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java @@ -20,11 +20,9 @@ package org.apache.dolphinscheduler.api.controller; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.SessionService; -import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.lang.StringUtils; @@ -36,11 +34,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.runner.RunWith; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.modules.junit4.PowerMockRunnerDelegate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @@ -51,11 +44,8 @@ import org.springframework.web.context.WebApplicationContext; /** * abstract controller test */ -@RunWith(PowerMockRunner.class) -@PowerMockRunnerDelegate(SpringRunner.class) +@RunWith(SpringRunner.class) @SpringBootTest(classes = ApiApplicationServer.class) -@PrepareForTest({ RegistryCenterUtils.class, RegistryClient.class }) -@PowerMockIgnore({"javax.management.*"}) public class AbstractControllerTest { public static final String SESSION_ID = "sessionId"; @@ -74,9 +64,6 @@ public class AbstractControllerTest { @Before public void setUp() { - PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class)); - PowerMockito.mockStatic(RegistryCenterUtils.class); - mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build(); createSession(); @@ -98,7 +85,7 @@ public class AbstractControllerTest { String session = sessionService.createSession(loginUser, "127.0.0.1"); sessionId = session; - Assert.assertTrue(!StringUtils.isEmpty(session)); + Assert.assertFalse(StringUtils.isEmpty(session)); } public Map success() { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java index 62022d8b40..a0a1772d6a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java @@ -48,9 +48,6 @@ import org.springframework.test.web.servlet.MvcResult; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -/** - * scheduler controller test - */ public class SchedulerControllerTest extends AbstractControllerTest { private static Logger logger = LoggerFactory.getLogger(SchedulerControllerTest.class); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java index 486707cf9f..a61e7d38ec 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java @@ -50,9 +50,6 @@ import org.springframework.test.web.servlet.MvcResult; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -/** - * task instance controller test - */ public class TaskInstanceControllerTest extends AbstractControllerTest { @InjectMocks diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java index 14cd52fbe1..7f675a5b3e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java @@ -37,9 +37,6 @@ import org.springframework.test.web.servlet.MvcResult; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -/** - * tenant controller test - */ public class TenantControllerTest extends AbstractControllerTest { private static Logger logger = LoggerFactory.getLogger(TenantControllerTest.class); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java index f6e79bf770..873236f551 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java @@ -22,7 +22,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; @@ -30,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.HashMap; import java.util.Map; @@ -59,12 +59,15 @@ public class WorkerGroupControllerTest extends AbstractControllerTest { @MockBean private ProcessInstanceMapper processInstanceMapper; + @MockBean + private RegistryClient registryClient; + @Test public void testSaveWorkerGroup() throws Exception { Map serverMaps = new HashMap<>(); serverMaps.put("192.168.0.1", "192.168.0.1"); serverMaps.put("192.168.0.2", "192.168.0.2"); - PowerMockito.when(RegistryCenterUtils.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps); + PowerMockito.when(registryClient.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps); MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("name","cxc_work_group"); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 0c7683554a..1dfcc64c41 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl; import org.apache.dolphinscheduler.common.Constants; @@ -33,105 +34,32 @@ import java.util.List; import java.util.Map; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -/** - * worker group service test - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({ RegistryClient.class }) -@PowerMockIgnore({"javax.management.*"}) +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = ApiApplicationServer.class) public class WorkerGroupServiceTest { + @MockBean + private RegistryClient registryClient; - @InjectMocks + @Autowired private WorkerGroupServiceImpl workerGroupService; - @Mock + @MockBean private WorkerGroupMapper workerGroupMapper; - @Mock + @MockBean private ProcessInstanceMapper processInstanceMapper; - private String groupName = "groupName000001"; - /* @Before - public void init() { - ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); - zookeeperConfig.setDsRoot("/dolphinscheduler_qzw"); - Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig); - - String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; - - List workerGroupStrList = new ArrayList<>(); - workerGroupStrList.add("default"); - workerGroupStrList.add("test"); - Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath)).thenReturn(workerGroupStrList); - - List defaultAddressList = new ArrayList<>(); - defaultAddressList.add("192.168.220.188:1234"); - defaultAddressList.add("192.168.220.189:1234"); - - Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath + "/default")).thenReturn(defaultAddressList); - - Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238"); - } - -*//** - * create or update a worker group - *//* - @Test - public void testSaveWorkerGroup() { - // worker server maps - Map serverMaps = new HashMap<>(); - serverMaps.put("127.0.0.1:1234", "0.3,0.07,4.4,7.42,16.0,0.3,2021-03-19 20:17:58,2021-03-19 20:25:29,0,79214"); - Mockito.when(zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true)).thenReturn(serverMaps); - - User user = new User(); - // general user add - user.setUserType(UserType.GENERAL_USER); - Map result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234"); - Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(), result.get(Constants.MSG)); - - // success - user.setUserType(UserType.ADMIN_USER); - result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234"); - Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG)); - // group name exist - Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2)); - Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList()); - result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1:1234"); - Assert.assertEquals(Status.NAME_EXIST, result.get(Constants.STATUS)); - }*/ - - /** - * query worker group paging - */ - /* @Test - public void testQueryAllGroupPaging() { - User user = new User(); - // general user add - user.setUserType(UserType.ADMIN_USER); - Map result = workerGroupService.queryAllGroupPaging(user, 1, 10, null); - PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST); - Assert.assertEquals(pageInfo.getLists().size(), 1); - }*/ - - @Before - public void before() { - PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class)); - } - @Test public void testQueryAllGroup() { Map result = workerGroupService.queryAllGroup(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java deleted file mode 100644 index b8b49459ea..0000000000 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.utils; - -import org.apache.dolphinscheduler.common.model.Server; - -import java.util.List; - -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - -/** - * zookeeper monitor utils test - */ -@Ignore -public class RegistryCenterUtilsTest { - - @Test - public void testGetMasterList(){ - List masterServerList = RegistryCenterUtils.getMasterServers(); - List workerServerList = RegistryCenterUtils.getWorkerServers(); - - Assert.assertTrue(masterServerList.size() >= 0); - Assert.assertTrue(workerServerList.size() >= 0); - } - -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index fa9a490827..410070ed31 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -90,8 +90,6 @@ public final class Constants { public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters"; public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers"; public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters"; - public static final String REGISTRY_PLUGIN_BINDING = "registry.plugin.binding"; - public static final String REGISTRY_PLUGIN_DIR = "registry.plugin.dir"; public static final String REGISTRY_SERVERS = "registry.servers"; /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java index acc3c02378..cb247cf2f4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java @@ -16,15 +16,6 @@ */ package org.apache.dolphinscheduler.common.enums; -/** - * zk node type - */ public enum NodeType { - - /** - * 0 master node; - * 1 worker node; - * 2 dead_server node; - */ - MASTER, WORKER, DEAD_SERVER; + MASTER, WORKER, DEAD_SERVER } diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml index 9ecd9efbd0..d7119d009b 100644 --- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml +++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml @@ -75,11 +75,6 @@ - - - - - diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java deleted file mode 100644 index cda98ef0da..0000000000 --- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.registry.zookeeper; - -import org.apache.dolphinscheduler.spi.register.RegistryConnectListener; -import org.apache.dolphinscheduler.spi.register.RegistryConnectState; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ZookeeperConnectionStateListener implements ConnectionStateListener { - - private static final Logger logger = LoggerFactory.getLogger(ZookeeperConnectionStateListener.class); - - private RegistryConnectListener registryConnectListener; - - public ZookeeperConnectionStateListener(RegistryConnectListener registryConnectListener) { - this.registryConnectListener = registryConnectListener; - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - - if (newState == ConnectionState.LOST) { - logger.error("connection lost from zookeeper"); - registryConnectListener.notify(RegistryConnectState.LOST); - } else if (newState == ConnectionState.RECONNECTED) { - logger.info("reconnected to zookeeper"); - registryConnectListener.notify(RegistryConnectState.RECONNECTED); - } else if (newState == ConnectionState.SUSPENDED) { - logger.warn("zookeeper connection SUSPENDED"); - registryConnectListener.notify(RegistryConnectState.SUSPENDED); - } - - } - -} diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java deleted file mode 100644 index 85723ada09..0000000000 --- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.registry.zookeeper; - -import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin; -import org.apache.dolphinscheduler.spi.register.RegistryFactory; - -import com.google.common.collect.ImmutableList; - -/** - * zookeeper registry plugin - */ -public class ZookeeperRegistryPlugin implements DolphinSchedulerPlugin { - - @Override - public Iterable getRegisterFactorys() { - return ImmutableList.of(new ZookeeperRegistryFactory()); - } -} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml new file mode 100644 index 0000000000..103271036a --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml @@ -0,0 +1,32 @@ + + + + + + dolphinscheduler-registry + org.apache.dolphinscheduler + 2.0.0-SNAPSHOT + + 4.0.0 + + dolphinscheduler-registry-api + diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java new file mode 100644 index 0000000000..eaebc81482 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +@FunctionalInterface +public interface ConnectionListener { + void onUpdate(ConnectionState newState); +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java new file mode 100644 index 0000000000..fef3bcab79 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java @@ -0,0 +1,27 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +public enum ConnectionState { + CONNECTED, + RECONNECTED, + SUSPENDED, + DISCONNECTED +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java new file mode 100644 index 0000000000..be781db738 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java @@ -0,0 +1,48 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +@Getter +@Setter +@Builder +@ToString +@NoArgsConstructor +@AllArgsConstructor +@Accessors(fluent = true) +public class Event { + private String key; + private String path; + private String data; + private Type type; + + public enum Type { + ADD, + REMOVE, + UPDATE + } +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java new file mode 100644 index 0000000000..6057a7e2e4 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java @@ -0,0 +1,48 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +import java.io.Closeable; +import java.util.Collection; +import java.util.Map; + +public interface Registry extends Closeable { + void start(Map config); + + boolean subscribe(String path, SubscribeListener listener); + + void unsubscribe(String path); + + void addConnectionStateListener(ConnectionListener listener); + + String get(String key); + + void put(String key, String value, boolean deleteOnDisconnect); + + void delete(String key); + + Collection children(String key); + + boolean exists(String key); + + boolean acquireLock(String key); + + boolean releaseLock(String key); +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java new file mode 100644 index 0000000000..b88fe2540f --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java @@ -0,0 +1,31 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +public final class RegistryException extends RuntimeException { + + public RegistryException(String message, Throwable cause) { + super(message, cause); + } + + public RegistryException(String message) { + super(message); + } +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java new file mode 100644 index 0000000000..6903e95bbd --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +public interface RegistryFactory { + String name(); + + Registry create(); +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java new file mode 100644 index 0000000000..45d4596e28 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java @@ -0,0 +1,35 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +import static java.util.stream.Collectors.toMap; + +import java.util.Map; +import java.util.ServiceLoader; +import java.util.function.Function; +import java.util.stream.StreamSupport; + +public final class RegistryFactoryLoader { + public static Map load() { + final ServiceLoader factories = ServiceLoader.load(RegistryFactory.class); + return StreamSupport.stream(factories.spliterator(), false) + .collect(toMap(RegistryFactory::name, Function.identity())); + } +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java new file mode 100644 index 0000000000..2432eb1e25 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dolphinscheduler.registry.api; + +public interface SubscribeListener { + void notify(Event event); +} diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml similarity index 88% rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml index 5ad7ee934e..42690b9367 100644 --- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml @@ -19,18 +19,19 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - dolphinscheduler-registry-plugin + dolphinscheduler-registry-plugins org.apache.dolphinscheduler 2.0.0-SNAPSHOT 4.0.0 - dolphinscheduler-registry-zookeeper - - dolphinscheduler-plugin + + org.apache.dolphinscheduler + dolphinscheduler-registry-api + org.apache.zookeeper @@ -57,7 +58,6 @@ slf4j-api - org.apache.curator curator-test @@ -76,11 +76,5 @@ runtime test - - - - dolphinscheduler-registry-zookeeper-${project.version} - - - \ No newline at end of file + diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java similarity index 100% rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java new file mode 100644 index 0000000000..9526e0e060 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.zookeeper; + +import org.apache.dolphinscheduler.registry.api.ConnectionListener; +import org.apache.dolphinscheduler.registry.api.ConnectionState; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionStateListener; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public final class ZookeeperConnectionStateListener implements ConnectionStateListener { + private final ConnectionListener listener; + + @Override + public void stateChanged(CuratorFramework client, + org.apache.curator.framework.state.ConnectionState newState) { + switch (newState) { + case LOST: + log.warn("Registry disconnected"); + listener.onUpdate(ConnectionState.DISCONNECTED); + break; + case RECONNECTED: + log.info("Registry reconnected"); + listener.onUpdate(ConnectionState.RECONNECTED); + break; + case SUSPENDED: + log.warn("Registry suspended"); + listener.onUpdate(ConnectionState.SUSPENDED); + break; + default: + break; + } + } +} diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java similarity index 53% rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java index e84666a416..89cb28006d 100644 --- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java @@ -28,21 +28,19 @@ import static org.apache.dolphinscheduler.plugin.registry.zookeeper.ZookeeperCon import static java.util.concurrent.TimeUnit.MILLISECONDS; -import org.apache.dolphinscheduler.spi.register.DataChangeEvent; -import org.apache.dolphinscheduler.spi.register.ListenerManager; -import org.apache.dolphinscheduler.spi.register.Registry; -import org.apache.dolphinscheduler.spi.register.RegistryConnectListener; -import org.apache.dolphinscheduler.spi.register.RegistryException; -import org.apache.dolphinscheduler.spi.register.SubscribeListener; +import org.apache.dolphinscheduler.registry.api.ConnectionListener; +import org.apache.dolphinscheduler.registry.api.Event; +import org.apache.dolphinscheduler.registry.api.Registry; +import org.apache.dolphinscheduler.registry.api.RegistryException; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.api.transaction.TransactionOp; +import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; @@ -56,28 +54,18 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Strings; -public class ZookeeperRegistry implements Registry { +public final class ZookeeperRegistry implements Registry { private CuratorFramework client; - /** - * treeCache map - * k-subscribe key - * v-listener - */ - private Map treeCacheMap = new HashMap<>(); - - /** - * Distributed lock map - */ - private ThreadLocal> threadLocalLockMap = new ThreadLocal<>(); - - /** - * build retry policy - */ + private final Map treeCacheMap = new ConcurrentHashMap<>(); + + private static final ThreadLocal> threadLocalLockMap = new ThreadLocal<>(); + private static RetryPolicy buildRetryPolicy(Map registerData) { int baseSleepTimeMs = BASE_SLEEP_TIME.getParameterValue(registerData.get(BASE_SLEEP_TIME.getName())); int maxRetries = MAX_RETRIES.getParameterValue(registerData.get(MAX_RETRIES.getName())); @@ -85,35 +73,32 @@ public class ZookeeperRegistry implements Registry { return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, maxSleepMs); } - /** - * build digest - */ private static void buildDigest(CuratorFrameworkFactory.Builder builder, String digest) { builder.authorization(DIGEST.getName(), digest.getBytes(StandardCharsets.UTF_8)) - .aclProvider(new ACLProvider() { - @Override - public List getDefaultAcl() { - return ZooDefs.Ids.CREATOR_ALL_ACL; - } - - @Override - public List getAclForPath(final String path) { - return ZooDefs.Ids.CREATOR_ALL_ACL; - } - }); + .aclProvider(new ACLProvider() { + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(final String path) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); } @Override - public void init(Map registerData) { - - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() - .connectString(SERVERS.getParameterValue(registerData.get(SERVERS.getName()))) - .retryPolicy(buildRetryPolicy(registerData)) - .namespace(NAME_SPACE.getParameterValue(registerData.get(NAME_SPACE.getName()))) - .sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(registerData.get(SESSION_TIMEOUT_MS.getName()))) - .connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(registerData.get(CONNECTION_TIMEOUT_MS.getName()))); - - String digest = DIGEST.getParameterValue(registerData.get(DIGEST.getName())); + public void start(Map config) { + CuratorFrameworkFactory.Builder builder = + CuratorFrameworkFactory.builder() + .connectString(SERVERS.getParameterValue(config.get(SERVERS.getName()))) + .retryPolicy(buildRetryPolicy(config)) + .namespace(NAME_SPACE.getParameterValue(config.get(NAME_SPACE.getName()))) + .sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(config.get(SESSION_TIMEOUT_MS.getName()))) + .connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(config.get(CONNECTION_TIMEOUT_MS.getName()))); + + String digest = DIGEST.getParameterValue(config.get(DIGEST.getName())); if (!Strings.isNullOrEmpty(digest)) { buildDigest(builder, digest); } @@ -121,7 +106,7 @@ public class ZookeeperRegistry implements Registry { client.start(); try { - if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(registerData.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) { + if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(config.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) { client.close(); throw new RegistryException("zookeeper connect timeout"); } @@ -132,55 +117,26 @@ public class ZookeeperRegistry implements Registry { } @Override - public void addConnectionStateListener(RegistryConnectListener registryConnectListener) { - client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(registryConnectListener)); + public void addConnectionStateListener(ConnectionListener listener) { + client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener)); } @Override - public boolean subscribe(String path, SubscribeListener subscribeListener) { - if (null != treeCacheMap.get(path)) { - return false; - } - TreeCache treeCache = new TreeCache(client, path); - TreeCacheListener treeCacheListener = (client, event) -> { - TreeCacheEvent.Type type = event.getType(); - DataChangeEvent eventType = null; - String dataPath = null; - switch (type) { - case NODE_ADDED: - dataPath = event.getData().getPath(); - eventType = DataChangeEvent.ADD; - break; - case NODE_UPDATED: - eventType = DataChangeEvent.UPDATE; - dataPath = event.getData().getPath(); - break; - case NODE_REMOVED: - eventType = DataChangeEvent.REMOVE; - dataPath = event.getData().getPath(); - break; - default: - } - if (null != eventType && null != dataPath) { - ListenerManager.dataChange(path, dataPath, new String(event.getData().getData()), eventType); - } - }; - treeCache.getListenable().addListener(treeCacheListener); - treeCacheMap.put(path, treeCache); + public boolean subscribe(String path, SubscribeListener listener) { + final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path)); + treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path))); try { treeCache.start(); } catch (Exception e) { - throw new RegistryException("start zookeeper tree cache error", e); + treeCacheMap.remove(path); + throw new RegistryException("Failed to subscribe listener for key: " + path, e); } - ListenerManager.addListener(path, subscribeListener); return true; } @Override public void unsubscribe(String path) { - TreeCache treeCache = treeCacheMap.get(path); - treeCache.close(); - ListenerManager.removeListener(path); + CloseableUtils.closeQuietly(treeCacheMap.get(path)); } @Override @@ -193,12 +149,7 @@ public class ZookeeperRegistry implements Registry { } @Override - public void remove(String key) { - delete(key); - } - - @Override - public boolean isExisted(String key) { + public boolean exists(String key) { try { return null != client.checkExists().forPath(key); } catch (Exception e) { @@ -207,47 +158,22 @@ public class ZookeeperRegistry implements Registry { } @Override - public void persist(String key, String value) { - try { - if (isExisted(key)) { - update(key, value); - return; - } - client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + public void put(String key, String value, boolean deleteOnDisconnect) { + final CreateMode mode = deleteOnDisconnect ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT; - } catch (Exception e) { - throw new RegistryException("zookeeper persist error", e); - } - } - - @Override - public void persistEphemeral(String key, String value) { try { - if (isExisted(key)) { - update(key, value); - return; - } - client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + client.create() + .orSetData() + .creatingParentsIfNeeded() + .withMode(mode) + .forPath(key, value.getBytes(StandardCharsets.UTF_8)); } catch (Exception e) { - throw new RegistryException("zookeeper persist ephemeral error", e); + throw new RegistryException("Failed to put registry key: " + key, e); } } @Override - public void update(String key, String value) { - try { - if (!isExisted(key)) { - return; - } - TransactionOp transactionOp = client.transactionOp(); - client.transaction().forOperations(transactionOp.check().forPath(key), transactionOp.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8))); - } catch (Exception e) { - throw new RegistryException("zookeeper update error", e); - } - } - - @Override - public List getChildren(String key) { + public List children(String key) { try { List result = client.getChildren().forPath(key); result.sort(Comparator.reverseOrder()); @@ -258,23 +184,20 @@ public class ZookeeperRegistry implements Registry { } @Override - public boolean delete(String nodePath) { + public void delete(String nodePath) { try { client.delete() - .deletingChildrenIfNeeded() - .forPath(nodePath); - } catch (KeeperException.NoNodeException ignore) { - // the node is not exist, we can believe the node has been removed - + .deletingChildrenIfNeeded() + .forPath(nodePath); + } catch (KeeperException.NoNodeException ignored) { + // Is already deleted or does not exist } catch (Exception e) { - throw new RegistryException("zookeeper delete key error", e); + throw new RegistryException("Failed to delete registry key: " + nodePath, e); } - return true; } @Override public boolean acquireLock(String key) { - InterProcessMutex interProcessMutex = new InterProcessMutex(client, key); try { interProcessMutex.acquire(); @@ -291,7 +214,6 @@ public class ZookeeperRegistry implements Registry { throw new RegistryException("zookeeper release lock error", e); } } - } @Override @@ -311,22 +233,35 @@ public class ZookeeperRegistry implements Registry { return true; } - public CuratorFramework getClient() { - return client; - } - @Override public void close() { - treeCacheMap.forEach((key, value) -> value.close()); - waitForCacheClose(500); + treeCacheMap.values().forEach(CloseableUtils::closeQuietly); CloseableUtils.closeQuietly(client); } - private void waitForCacheClose(long millis) { - try { - Thread.sleep(millis); - } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); + static final class EventAdaptor extends Event { + public EventAdaptor(TreeCacheEvent event, String key) { + key(key); + + switch (event.getType()) { + case NODE_ADDED: + type(Type.ADD); + break; + case NODE_UPDATED: + type(Type.UPDATE); + break; + case NODE_REMOVED: + type(Type.REMOVE); + break; + default: + break; + } + + final ChildData data = event.getData(); + if (data != null) { + path(data.getPath()); + data(new String(data.getData())); + } } } } diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java similarity index 76% rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java index 1ecf3e05b1..949df21bee 100644 --- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java @@ -17,16 +17,16 @@ package org.apache.dolphinscheduler.plugin.registry.zookeeper; -import org.apache.dolphinscheduler.spi.register.Registry; -import org.apache.dolphinscheduler.spi.register.RegistryFactory; +import org.apache.dolphinscheduler.registry.api.Registry; +import org.apache.dolphinscheduler.registry.api.RegistryFactory; -/** - * Zookeeper registry factory - */ -public class ZookeeperRegistryFactory implements RegistryFactory { +import com.google.auto.service.AutoService; + +@AutoService(RegistryFactory.class) +public final class ZookeeperRegistryFactory implements RegistryFactory { @Override - public String getName() { + public String name() { return "zookeeper"; } diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java similarity index 87% rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java index a5dc33b12e..8442c02c70 100644 --- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.plugin.registry.zookeeper; -import org.apache.dolphinscheduler.spi.register.DataChangeEvent; -import org.apache.dolphinscheduler.spi.register.SubscribeListener; +import org.apache.dolphinscheduler.registry.api.Event; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; import org.apache.curator.test.TestingServer; @@ -50,18 +50,18 @@ public class ZookeeperRegistryTest { server = new TestingServer(true); Map registryConfig = new HashMap<>(); registryConfig.put(ZookeeperConfiguration.SERVERS.getName(), server.getConnectString()); - registry.init(registryConfig); - registry.persist("/sub", ""); + registry.start(registryConfig); + registry.put("/sub", "", false); } @Test public void persistTest() { - registry.persist("/nodes/m1", ""); - registry.persist("/nodes/m2", ""); - Assert.assertEquals(Arrays.asList("m2", "m1"), registry.getChildren("/nodes")); - Assert.assertTrue(registry.isExisted("/nodes/m1")); + registry.put("/nodes/m1", "", false); + registry.put("/nodes/m2", "", false); + Assert.assertEquals(Arrays.asList("m2", "m1"), registry.children("/nodes")); + Assert.assertTrue(registry.exists("/nodes/m1")); registry.delete("/nodes/m2"); - Assert.assertFalse(registry.isExisted("/nodes/m2")); + Assert.assertFalse(registry.exists("/nodes/m2")); } @Test @@ -112,10 +112,9 @@ public class ZookeeperRegistryTest { } - class TestListener implements SubscribeListener { - + static class TestListener implements SubscribeListener { @Override - public void notify(String path, String data, DataChangeEvent dataChangeEvent) { + public void notify(Event event) { logger.info("I'm test listener"); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml new file mode 100644 index 0000000000..4f3db56190 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml @@ -0,0 +1,36 @@ + + + + + + dolphinscheduler-registry + org.apache.dolphinscheduler + 2.0.0-SNAPSHOT + + dolphinscheduler-registry-plugins + 4.0.0 + pom + + + dolphinscheduler-registry-zookeeper + + diff --git a/dolphinscheduler-registry-plugin/pom.xml b/dolphinscheduler-registry/pom.xml similarity index 74% rename from dolphinscheduler-registry-plugin/pom.xml rename to dolphinscheduler-registry/pom.xml index 2f8bb0d2d1..8ce7c243f2 100644 --- a/dolphinscheduler-registry-plugin/pom.xml +++ b/dolphinscheduler-registry/pom.xml @@ -24,20 +24,25 @@ 2.0.0-SNAPSHOT 4.0.0 - org.apache.dolphinscheduler - dolphinscheduler-registry-plugin + dolphinscheduler-registry pom + + dolphinscheduler-registry-api + dolphinscheduler-registry-plugins + + - - org.apache.dolphinscheduler - dolphinscheduler-spi + com.google.auto.service + auto-service + true + + + org.projectlombok + lombok + 1.18.22 provided - - - dolphinscheduler-registry-zookeeper - - \ No newline at end of file + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 7ef9432daa..6c47f84910 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; @@ -42,11 +43,10 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.apache.dolphinscheduler.spi.register.RegistryConnectListener; -import org.apache.dolphinscheduler.spi.register.RegistryConnectState; import org.apache.commons.lang.StringUtils; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.Executors; @@ -79,6 +79,7 @@ public class MasterRegistryClient { @Autowired private ProcessService processService; + @Autowired private RegistryClient registryClient; /** @@ -104,12 +105,11 @@ public class MasterRegistryClient { public void init() { this.startupTime = System.currentTimeMillis(); - this.registryClient = RegistryClient.getInstance(); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } public void start() { - String nodeLock = registryClient.getMasterStartUpLockPath(); + String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters @@ -117,7 +117,7 @@ public class MasterRegistryClient { // master registry registry(); String registryPath = getMasterPath(); - registryClient.handleDeadServer(registryPath, NodeType.MASTER, Constants.DELETE_OP); + registryClient.handleDeadServer(Collections.singleton(registryPath), NodeType.MASTER, Constants.DELETE_OP); // init system node @@ -143,7 +143,8 @@ public class MasterRegistryClient { } public void closeRegistry() { - unRegistry(); + // TODO unsubscribe MasterRegistryDataListener + deregister(); } /** @@ -167,7 +168,7 @@ public class MasterRegistryClient { return; } // handle dead server - registryClient.handleDeadServer(path, nodeType, Constants.ADD_OP); + registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP); } //failover server if (failover) { @@ -209,9 +210,9 @@ public class MasterRegistryClient { private String getFailoverLockPath(NodeType nodeType) { switch (nodeType) { case MASTER: - return registryClient.getMasterFailoverLockPath(); + return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; case WORKER: - return registryClient.getWorkerFailoverLockPath(); + return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; default: return ""; } @@ -289,20 +290,20 @@ public class MasterRegistryClient { ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); if (workerHost == null - || !checkOwner - || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { + || !checkOwner + || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { // only failover the task owned myself if worker down. if (processInstance == null) { logger.error("failover error, the process {} of task {} do not exists.", - taskInstance.getProcessInstanceId(), taskInstance.getId()); + taskInstance.getProcessInstanceId(), taskInstance.getId()); continue; } taskInstance.setProcessInstance(processInstance); TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(processInstance) - .create(); + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); // only kill yarn job if exists , the local thread has exited ProcessUtils.killYarnJob(taskExecutionContext); @@ -348,14 +349,6 @@ public class MasterRegistryClient { logger.info("master failover end"); } - public void blockAcquireMutex() { - registryClient.getLock(registryClient.getMasterLockPath()); - } - - public void releaseLock() { - registryClient.releaseLock(registryClient.getMasterLockPath()); - } - /** * registry */ @@ -364,36 +357,24 @@ public class MasterRegistryClient { localNodePath = getMasterPath(); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, - masterConfig.getMasterMaxCpuloadAvg(), - masterConfig.getMasterReservedMemory(), - Sets.newHashSet(getMasterPath()), - Constants.MASTER_TYPE, - registryClient); + masterConfig.getMasterMaxCpuloadAvg(), + masterConfig.getMasterReservedMemory(), + Sets.newHashSet(getMasterPath()), + Constants.MASTER_TYPE, + registryClient); registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); - registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener()); + registryClient.addConnectionStateListener(newState -> { + if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) { + registryClient.persistEphemeral(localNodePath, ""); + } + }); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); } - class MasterRegistryConnectStateListener implements RegistryConnectListener { - - @Override - public void notify(RegistryConnectState newState) { - if (RegistryConnectState.RECONNECTED == newState) { - registryClient.persistEphemeral(localNodePath, ""); - } - if (RegistryConnectState.SUSPENDED == newState) { - registryClient.persistEphemeral(localNodePath, ""); - } - } - } - - /** - * remove registry info - */ - public void unRegistry() { + public void deregister() { try { String address = getLocalAddress(); String localNodePath = getMasterPath(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java index 4fd50a01bc..cb5b6bef86 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java @@ -22,42 +22,43 @@ import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHED import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.registry.api.Event; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.spi.register.DataChangeEvent; -import org.apache.dolphinscheduler.spi.register.SubscribeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Strings; + public class MasterRegistryDataListener implements SubscribeListener { private static final Logger logger = LoggerFactory.getLogger(MasterRegistryDataListener.class); - private MasterRegistryClient masterRegistryClient; + private final MasterRegistryClient masterRegistryClient; public MasterRegistryDataListener() { masterRegistryClient = SpringApplicationContext.getBean(MasterRegistryClient.class); } @Override - public void notify(String path, String data, DataChangeEvent event) { + public void notify(Event event) { + final String path = event.path(); + if (Strings.isNullOrEmpty(path)) { + return; + } //monitor master if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) { - handleMasterEvent(event, path); + handleMasterEvent(event); } else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) { //monitor worker - handleWorkerEvent(event, path); + handleWorkerEvent(event); } } - /** - * monitor master - * - * @param event event - * @param path path - */ - public void handleMasterEvent(DataChangeEvent event, String path) { - switch (event) { + public void handleMasterEvent(Event event) { + final String path = event.path(); + switch (event.type()) { case ADD: logger.info("master node added : {}", path); break; @@ -69,14 +70,9 @@ public class MasterRegistryDataListener implements SubscribeListener { } } - /** - * monitor worker - * - * @param event event - * @param path path - */ - public void handleWorkerEvent(DataChangeEvent event, String path) { - switch (event) { + public void handleWorkerEvent(Event event) { + final String path = event.path(); + switch (event.type()) { case ADD: logger.info("worker node added : {}", path); break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 1bceeb7aad..b7e904b4d3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -27,16 +27,18 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.registry.api.Event; +import org.apache.dolphinscheduler.registry.api.Event.Type; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue; import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.apache.dolphinscheduler.spi.register.DataChangeEvent; -import org.apache.dolphinscheduler.spi.register.SubscribeListener; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -101,10 +103,8 @@ public class ServerNodeManager implements InitializingBean { */ private ScheduledExecutorService executorService; - /** - * zk client - */ - private RegistryClient registryClient = RegistryClient.getInstance(); + @Autowired + private RegistryClient registryClient; /** * eg : /node/worker/group/127.0.0.1:xxx @@ -153,11 +153,11 @@ public class ServerNodeManager implements InitializingBean { */ executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS); - /** + /* * init MasterNodeListener listener */ registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener()); - /** + /* * init WorkerNodeListener listener */ registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener()); @@ -167,15 +167,15 @@ public class ServerNodeManager implements InitializingBean { * load nodes from zookeeper */ public void load() { - /** + /* * master nodes from zookeeper */ updateMasterNodes(); - /** + /* * worker group nodes from zookeeper */ - Set workerGroups = registryClient.getWorkerGroupDirectly(); + Collection workerGroups = registryClient.getWorkerGroupDirectly(); for (String workerGroup : workerGroups) { syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup)); } @@ -218,25 +218,28 @@ public class ServerNodeManager implements InitializingBean { class WorkerDataListener implements SubscribeListener { @Override - public void notify(String path, String data, DataChangeEvent dataChangeEvent) { + public void notify(Event event) { + final String path = event.path(); + final Type type = event.type(); + final String data = event.data(); if (registryClient.isWorkerPath(path)) { try { - if (dataChangeEvent == DataChangeEvent.ADD) { + if (type == Type.ADD) { logger.info("worker group node : {} added.", path); String group = parseGroup(path); - Set currentNodes = registryClient.getWorkerGroupNodesDirectly(group); + Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(group); logger.info("currentNodes : {}", currentNodes); syncWorkerGroupNodes(group, currentNodes); - } else if (dataChangeEvent == DataChangeEvent.REMOVE) { + } else if (type == Type.REMOVE) { logger.info("worker group node : {} down.", path); String group = parseGroup(path); - Set currentNodes = registryClient.getWorkerGroupNodesDirectly(group); + Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(group); syncWorkerGroupNodes(group, currentNodes); alertDao.sendServerStopedAlert(1, path, "WORKER"); - } else if (dataChangeEvent == DataChangeEvent.UPDATE) { + } else if (type == Type.UPDATE) { logger.debug("worker group node : {} update, data: {}", path, data); String group = parseGroup(path); - Set currentNodes = registryClient.getWorkerGroupNodesDirectly(group); + Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(group); syncWorkerGroupNodes(group, currentNodes); String node = parseNode(path); @@ -268,19 +271,18 @@ public class ServerNodeManager implements InitializingBean { } } - /** - * master node listener - */ class MasterDataListener implements SubscribeListener { @Override - public void notify(String path, String data, DataChangeEvent dataChangeEvent) { + public void notify(Event event) { + final String path = event.path(); + final Type type = event.type(); if (registryClient.isMasterPath(path)) { try { - if (dataChangeEvent.equals(DataChangeEvent.ADD)) { + if (type.equals(Type.ADD)) { logger.info("master node : {} added.", path); updateMasterNodes(); } - if (dataChangeEvent.equals(DataChangeEvent.REMOVE)) { + if (type.equals(Type.REMOVE)) { logger.info("master node : {} down.", path); updateMasterNodes(); alertDao.sendServerStopedAlert(1, path, "MASTER"); @@ -295,10 +297,10 @@ public class ServerNodeManager implements InitializingBean { private void updateMasterNodes() { SLOT_LIST.clear(); this.masterNodes.clear(); - String nodeLock = registryClient.getMasterLockPath(); + String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS; try { registryClient.getLock(nodeLock); - Set currentNodes = registryClient.getMasterNodesDirectly(); + Collection currentNodes = registryClient.getMasterNodesDirectly(); List masterNodes = registryClient.getServerList(NodeType.MASTER); syncMasterNodes(currentNodes, masterNodes); } catch (Exception e) { @@ -328,7 +330,7 @@ public class ServerNodeManager implements InitializingBean { * * @param nodes master nodes */ - private void syncMasterNodes(Set nodes, List masterNodes) { + private void syncMasterNodes(Collection nodes, List masterNodes) { masterLock.lock(); try { this.masterNodes.addAll(nodes); @@ -353,7 +355,7 @@ public class ServerNodeManager implements InitializingBean { * @param workerGroup worker group * @param nodes worker nodes */ - private void syncWorkerGroupNodes(String workerGroup, Set nodes) { + private void syncWorkerGroupNodes(String workerGroup, Collection nodes) { workerGroupLock.lock(); try { workerGroup = workerGroup.toLowerCase(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java index 34d6d9d270..9e3bb0c65b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java @@ -19,36 +19,26 @@ package org.apache.dolphinscheduler.server.monitor; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -/** - * zk monitor server impl - */ @Component public class RegistryMonitorImpl extends AbstractMonitor { - /** - * zookeeper operator - */ - private RegistryClient registryClient = RegistryClient.getInstance(); - - /** - * get active nodes map by path - * - * @param path path - * @return active nodes map - */ + @Autowired + private RegistryClient registryClient; + @Override protected Map getActiveNodesByPath(String path) { Map maps = new HashMap<>(); - List childrenList = registryClient.getChildrenKeys(path); + Collection childrenList = registryClient.getChildrenKeys(path); if (childrenList == null) { return maps; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index 61e8c40c76..67fd07a892 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -33,11 +33,11 @@ public class HeartBeatTask implements Runnable { private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); - private Set heartBeatPaths; - private RegistryClient registryClient; + private final Set heartBeatPaths; + private final RegistryClient registryClient; private WorkerManagerThread workerManagerThread; - private String serverType; - private HeartBeat heartBeat; + private final String serverType; + private final HeartBeat heartBeat; public HeartBeatTask(long startupTime, double maxCpuloadAvg, @@ -89,7 +89,7 @@ public class HeartBeatTask implements Runnable { } for (String heartBeatPath : heartBeatPaths) { - registryClient.update(heartBeatPath, heartBeat.encodeHeartBeat()); + registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat()); } } catch (Throwable ex) { logger.error("error write heartbeat info", ex); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index fa186d0d5f..96ec36b176 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -51,18 +51,12 @@ public class TaskCallbackService { */ private static final ConcurrentHashMap REMOTE_CHANNELS = new ConcurrentHashMap<>(); - /** - * zookeeper registry center - */ - private RegistryClient registryClient; - /** * netty remoting client */ private final NettyRemotingClient nettyRemotingClient; public TaskCallbackService() { - this.registryClient = RegistryClient.getInstance(); final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index b59e3ec616..54ec49c0a4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.lang.StringUtils; +import java.io.IOException; import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.Executors; @@ -73,6 +74,7 @@ public class WorkerRegistryClient { */ private ScheduledExecutorService heartBeatExecutor; + @Autowired private RegistryClient registryClient; /** @@ -86,7 +88,6 @@ public class WorkerRegistryClient { public void initWorkRegistry() { this.workerGroups = workerConfig.getWorkerGroups(); this.startupTime = System.currentTimeMillis(); - this.registryClient = RegistryClient.getInstance(); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -121,7 +122,7 @@ public class WorkerRegistryClient { /** * remove registry info */ - public void unRegistry() { + public void unRegistry() throws IOException { try { String address = getLocalAddress(); Set workerZkPaths = getWorkerZkPaths(); @@ -161,7 +162,7 @@ public class WorkerRegistryClient { return workerPaths; } - public void handleDeadServer(Set nodeSet, NodeType nodeType, String opType) throws Exception { + public void handleDeadServer(Set nodeSet, NodeType nodeType, String opType) { registryClient.handleDeadServer(nodeSet, nodeType, opType); } diff --git a/dolphinscheduler-server/src/main/resources/config/install_config.conf b/dolphinscheduler-server/src/main/resources/config/install_config.conf index 2e74b1e1a9..96174d9fc4 100755 --- a/dolphinscheduler-server/src/main/resources/config/install_config.conf +++ b/dolphinscheduler-server/src/main/resources/config/install_config.conf @@ -93,11 +93,6 @@ dbname="dolphinscheduler" # --------------------------------------------------------- # Registry Server # --------------------------------------------------------- -# Registry Server plugin dir. DolphinScheduler will find and load the registry plugin jar package from this dir. -# For now default registry server is zookeeper, so the default value is `lib/plugin/registry/zookeeper`. -# If you want to implement your own registry server, please see https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/registry_spi.html -registryPluginDir="lib/plugin/registry/zookeeper" - # Registry Server plugin name, should be a substring of `registryPluginDir`, DolphinScheduler use this for verifying configuration consistency registryPluginName="zookeeper" diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java index 80f75af0a8..619dba84bd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java @@ -56,7 +56,7 @@ public class ExecutorDispatcherTest { } @Test - public void testDispatch() throws ExecuteException { + public void testDispatch() throws Exception { int port = 30000; final NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(port); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index 073df65b84..97c45f138f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -20,10 +20,6 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; -import java.util.Arrays; -import java.util.Date; -import java.util.concurrent.ScheduledExecutorService; - import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; @@ -33,13 +29,17 @@ import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecC import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import java.util.Arrays; +import java.util.Date; +import java.util.concurrent.ScheduledExecutorService; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -59,6 +59,7 @@ public class MasterRegistryClientTest { @Mock private MasterConfig masterConfig; + @Mock private RegistryClient registryClient; @Mock @@ -72,13 +73,10 @@ public class MasterRegistryClientTest { @Before public void before() throws Exception { - PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class)); - registryClient = PowerMockito.mock(RegistryClient.class); given(registryClient.getLock(Mockito.anyString())).willReturn(true); - given(registryClient.getMasterFailoverLockPath()).willReturn("/path"); given(registryClient.releaseLock(Mockito.anyString())).willReturn(true); given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080"); - doNothing().when(registryClient).handleDeadServer(Mockito.anyString(), Mockito.any(NodeType.class), Mockito.anyString()); + doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString()); ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient); ProcessInstance processInstance = new ProcessInstance(); diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml index bc8b367364..753dc31548 100644 --- a/dolphinscheduler-service/pom.xml +++ b/dolphinscheduler-service/pom.xml @@ -42,6 +42,10 @@ org.apache.dolphinscheduler dolphinscheduler-spi + + org.apache.dolphinscheduler + dolphinscheduler-registry-zookeeper + org.quartz-scheduler diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java deleted file mode 100644 index ba74f88afb..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.registry; - -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - -import org.apache.dolphinscheduler.common.IStoppable; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader; -import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig; -import org.apache.dolphinscheduler.spi.register.Registry; -import org.apache.dolphinscheduler.spi.register.RegistryConnectListener; -import org.apache.dolphinscheduler.spi.register.RegistryException; -import org.apache.dolphinscheduler.spi.register.RegistryPluginManager; -import org.apache.dolphinscheduler.spi.register.SubscribeListener; - -import org.apache.commons.lang.StringUtils; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableList; - -/** - * All business parties use this class to access the registry - */ -public class RegistryCenter { - - private static final Logger logger = LoggerFactory.getLogger(RegistryCenter.class); - - private final AtomicBoolean isStarted = new AtomicBoolean(false); - - private Registry registry; - - private IStoppable stoppable; - - /** - * nodes namespace - */ - protected static String NODES; - - private RegistryPluginManager registryPluginManager; - - protected static final String EMPTY = ""; - - private static final String REGISTRY_PREFIX = "registry"; - - private static final String REGISTRY_PLUGIN_BINDING = "registry.plugin.binding"; - - private static final String REGISTRY_PLUGIN_DIR = "registry.plugin.dir"; - - private static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository"; - - private static final String REGISTRY_PLUGIN_NAME = "plugin.name"; - - /** - * default registry plugin dir - */ - private static final String REGISTRY_PLUGIN_PATH = "lib/plugin/registry"; - - private static final String REGISTRY_CONFIG_FILE_PATH = "/registry.properties"; - - /** - * init node persist - */ - public void init() { - if (isStarted.compareAndSet(false, true)) { - PropertyUtils.loadPropertyFile(REGISTRY_CONFIG_FILE_PATH); - Map registryConfig = PropertyUtils.getPropertiesByPrefix(REGISTRY_PREFIX); - - if (null == registryConfig || registryConfig.isEmpty()) { - throw new RegistryException("registry config param is null"); - } - if (null == registryPluginManager) { - installRegistryPlugin(registryConfig.get(REGISTRY_PLUGIN_NAME)); - registry = registryPluginManager.getRegistry(); - } - - registry.init(registryConfig); - initNodes(); - - } - } - - /** - * init nodes - */ - private void initNodes() { - persist(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY); - persist(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY); - persist(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY); - } - - /** - * install registry plugin - */ - private void installRegistryPlugin(String registryPluginName) { - DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig(); - registryPluginManagerConfig.setPlugins(PropertyUtils.getString(REGISTRY_PLUGIN_BINDING)); - if (StringUtils.isNotBlank(PropertyUtils.getString(REGISTRY_PLUGIN_DIR))) { - registryPluginManagerConfig.setInstalledPluginsDir(PropertyUtils.getString(REGISTRY_PLUGIN_DIR, REGISTRY_PLUGIN_PATH).trim()); - } - - if (StringUtils.isNotBlank(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY))) { - registryPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim()); - } - - registryPluginManager = new RegistryPluginManager(registryPluginName); - - DolphinPluginLoader registryPluginLoader = new DolphinPluginLoader(registryPluginManagerConfig, ImmutableList.of(registryPluginManager)); - try { - registryPluginLoader.loadPlugins(); - } catch (Exception e) { - throw new RuntimeException("Load registry Plugin Failed !", e); - } - } - - /** - * close - */ - public void close() { - if (isStarted.compareAndSet(true, false) && registry != null) { - registry.close(); - } - } - - public void persist(String key, String value) { - registry.persist(key, value); - } - - public void persistEphemeral(String key, String value) { - registry.persistEphemeral(key, value); - } - - public void remove(String key) { - registry.remove(key); - } - - public void update(String key, String value) { - registry.update(key, value); - } - - public String get(String key) { - return registry.get(key); - } - - public void subscribe(String path, SubscribeListener subscribeListener) { - registry.subscribe(path, subscribeListener); - } - - public void addConnectionStateListener(RegistryConnectListener registryConnectListener) { - registry.addConnectionStateListener(registryConnectListener); - } - - public boolean isExisted(String key) { - return registry.isExisted(key); - } - - public boolean getLock(String key) { - return registry.acquireLock(key); - } - - public boolean releaseLock(String key) { - return registry.releaseLock(key); - } - - /** - * @return get dead server node parent path - */ - public String getDeadZNodeParentPath() { - return REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS; - } - - public void setStoppable(IStoppable stoppable) { - this.stoppable = stoppable; - } - - public IStoppable getStoppable() { - return stoppable; - } - - /** - * whether master path - * - * @param path path - * @return result - */ - public boolean isMasterPath(String path) { - return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_MASTERS); - } - - /** - * get worker group path - * - * @param workerGroup workerGroup - * @return worker group path - */ - public String getWorkerGroupPath(String workerGroup) { - return REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup; - } - - /** - * whether worker path - * - * @param path path - * @return result - */ - public boolean isWorkerPath(String path) { - return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_WORKERS); - } - - /** - * get children nodes - * - * @param key key - * @return children nodes - */ - public List getChildrenKeys(final String key) { - return registry.getChildren(key); - } - -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java index 210ce21e8e..f384678681 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java @@ -22,60 +22,72 @@ import static org.apache.dolphinscheduler.common.Constants.COLON; import static org.apache.dolphinscheduler.common.Constants.DELETE_OP; import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE; +import static com.google.common.base.Preconditions.checkArgument; + import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.HeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; - -import org.apache.commons.lang.StringUtils; - +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.registry.api.ConnectionListener; +import org.apache.dolphinscheduler.registry.api.Registry; +import org.apache.dolphinscheduler.registry.api.RegistryException; +import org.apache.dolphinscheduler.registry.api.RegistryFactory; +import org.apache.dolphinscheduler.registry.api.RegistryFactoryLoader; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; + +import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; -/** - * registry client singleton - */ -public class RegistryClient extends RegistryCenter { +import com.google.common.base.Strings; +@Component +public class RegistryClient { private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class); - private static RegistryClient registryClient = new RegistryClient(); - - private RegistryClient() { - super.init(); - } - - public static RegistryClient getInstance() { - return registryClient; + private static final String EMPTY = ""; + private static final String REGISTRY_PREFIX = "registry"; + private static final String REGISTRY_PLUGIN_NAME = "plugin.name"; + private static final String REGISTRY_CONFIG_FILE_PATH = "/registry.properties"; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + private Registry registry; + private IStoppable stoppable; + + @PostConstruct + public void afterConstruct() { + start(); + initNodes(); } - /** - * get active master num - * - * @return active master number - */ public int getActiveMasterNum() { - List childrenList = new ArrayList<>(); + Collection childrenList = new ArrayList<>(); try { // read master node parent path from conf - if (isExisted(getNodeParentPath(NodeType.MASTER))) { - childrenList = getChildrenKeys(getNodeParentPath(NodeType.MASTER)); + if (exists(rootNodePath(NodeType.MASTER))) { + childrenList = getChildrenKeys(rootNodePath(NodeType.MASTER)); } } catch (Exception e) { logger.error("getActiveMasterNum error", e); @@ -83,15 +95,9 @@ public class RegistryClient extends RegistryCenter { return childrenList.size(); } - /** - * get server list. - * - * @param nodeType zookeeper node type - * @return server list - */ public List getServerList(NodeType nodeType) { - Map serverMaps = getServerMaps(nodeType); - String parentPath = getNodeParentPath(nodeType); + Map serverMaps = getServerMaps(nodeType, false); + String parentPath = rootNodePath(nodeType); List serverList = new ArrayList<>(); for (Map.Entry entry : serverMaps.entrySet()) { @@ -119,40 +125,11 @@ public class RegistryClient extends RegistryCenter { return serverList; } - /** - * get server nodes. - * - * @param nodeType registry node type - * @return result : list - */ - public List getServerNodes(NodeType nodeType) { - String path = getNodeParentPath(nodeType); - List serverList = getChildrenKeys(path); - if (nodeType == NodeType.WORKER) { - List workerList = new ArrayList<>(); - for (String group : serverList) { - List groupServers = getChildrenKeys(path + SINGLE_SLASH + group); - for (String groupServer : groupServers) { - workerList.add(group + SINGLE_SLASH + groupServer); - } - } - serverList = workerList; - } - return serverList; - } - - /** - * get server list map. - * - * @param nodeType zookeeper node type - * @param hostOnly host only - * @return result : {host : resource info} - */ public Map getServerMaps(NodeType nodeType, boolean hostOnly) { Map serverMap = new HashMap<>(); try { - String path = getNodeParentPath(nodeType); - List serverList = getServerNodes(nodeType); + String path = rootNodePath(nodeType); + Collection serverList = getServerNodes(nodeType); for (String server : serverList) { String host = server; if (nodeType == NodeType.WORKER && hostOnly) { @@ -167,298 +144,194 @@ public class RegistryClient extends RegistryCenter { return serverMap; } - /** - * get server list map. - * - * @param nodeType zookeeper node type - * @return result : {host : resource info} - */ - public Map getServerMaps(NodeType nodeType) { - return getServerMaps(nodeType, false); + public boolean checkNodeExists(String host, NodeType nodeType) { + return getServerMaps(nodeType, true).keySet() + .stream() + .anyMatch(it -> it.contains(host)); } - /** - * get server node set. - * - * @param nodeType zookeeper node type - * @param hostOnly host only - * @return result : set - */ - public Set getServerNodeSet(NodeType nodeType, boolean hostOnly) { - Set serverSet = new HashSet<>(); - try { - List serverList = getServerNodes(nodeType); - for (String server : serverList) { - String host = server; - if (nodeType == NodeType.WORKER && hostOnly) { - host = server.split(SINGLE_SLASH)[1]; - } - serverSet.add(host); + public void handleDeadServer(Collection nodes, NodeType nodeType, String opType) { + nodes.forEach(node -> { + final String host = getHostByEventDataPath(node); + final String type = nodeType == NodeType.MASTER ? MASTER_TYPE : WORKER_TYPE; + + if (opType.equals(DELETE_OP)) { + removeDeadServerByHost(host, type); + } else if (opType.equals(ADD_OP)) { + String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + type + UNDERLINE + host; + // Add dead server info to zk dead server path : /dead-servers/ + registry.put(deadServerPath, type + UNDERLINE + host, false); + logger.info("{} server dead , and {} added to zk dead server path success", nodeType, node); } - } catch (Exception e) { - logger.error("get server node set failed", e); - } - return serverSet; + }); } - /** - * get server node list. - * - * @param nodeType zookeeper node type - * @param hostOnly host only - * @return result : list - */ - public List getServerNodeList(NodeType nodeType, boolean hostOnly) { - Set serverSet = getServerNodeSet(nodeType, hostOnly); - List serverList = new ArrayList<>(serverSet); - Collections.sort(serverList); - return serverList; - } + public boolean checkIsDeadServer(String node, String serverType) { + // ip_sequence_no + String[] zNodesPath = node.split("/"); + String ipSeqNo = zNodesPath[zNodesPath.length - 1]; + String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo; - /** - * check the zookeeper node already exists - * - * @param host host - * @param nodeType zookeeper node type - * @return true if exists - */ - public boolean checkNodeExists(String host, NodeType nodeType) { - String path = getNodeParentPath(nodeType); - if (StringUtils.isEmpty(path)) { - logger.error("check zk node exists error, host:{}, zk node type:{}", - host, nodeType); - return false; - } - Map serverMaps = getServerMaps(nodeType, true); - for (String hostKey : serverMaps.keySet()) { - if (hostKey.contains(host)) { - return true; - } - } - return false; + return !exists(node) || exists(deadServerPath); } - /** - * @return get worker node parent path - */ - protected String getWorkerNodeParentPath() { - return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; + public Collection getMasterNodesDirectly() { + return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS); } - /** - * @return get master node parent path - */ - protected String getMasterNodeParentPath() { - return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; + public Collection getWorkerGroupDirectly() { + return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS); } - /** - * @return get dead server node parent path - */ - protected String getDeadNodeParentPath() { - return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS; + public Collection getWorkerGroupNodesDirectly(String workerGroup) { + return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup); } /** - * @return get master lock path + * get host ip:port, path format: parentPath/ip:port + * + * @param path path + * @return host ip:port, string format: parentPath/ip:port */ - public String getMasterLockPath() { - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS; + public String getHostByEventDataPath(String path) { + checkArgument(!Strings.isNullOrEmpty(path), "path cannot be null or empty"); + + final String[] pathArray = path.split(SINGLE_SLASH); + + checkArgument(pathArray.length >= 1, "cannot parse path: %s", path); + + return pathArray[pathArray.length - 1]; } - /** - * @param nodeType zookeeper node type - * @return get zookeeper node parent path - */ - public String getNodeParentPath(NodeType nodeType) { - String path = ""; - switch (nodeType) { - case MASTER: - return getMasterNodeParentPath(); - case WORKER: - return getWorkerNodeParentPath(); - case DEAD_SERVER: - return getDeadNodeParentPath(); - default: - break; + public void close() throws IOException { + if (isStarted.compareAndSet(true, false) && registry != null) { + registry.close(); } - return path; } - /** - * @return get master start up lock path - */ - public String getMasterStartUpLockPath() { - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; + public void persistEphemeral(String key, String value) { + registry.put(key, value, true); } - /** - * @return get master failover lock path - */ - public String getMasterFailoverLockPath() { - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; + public void remove(String key) { + registry.delete(key); } - /** - * @return get worker failover lock path - */ - public String getWorkerFailoverLockPath() { - return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; + public String get(String key) { + return registry.get(key); } - /** - * opType(add): if find dead server , then add to zk deadServerPath - * opType(delete): delete path from zk - * - * @param node node path - * @param nodeType master or worker - * @param opType delete or add - * @throws Exception errors - */ - public void handleDeadServer(String node, NodeType nodeType, String opType) throws Exception { - String host = getHostByEventDataPath(node); - String type = (nodeType == NodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE; - - //check server restart, if restart , dead server path in zk should be delete - if (opType.equals(DELETE_OP)) { - removeDeadServerByHost(host, type); - - } else if (opType.equals(ADD_OP)) { - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; - if (!isExisted(deadServerPath)) { - //add dead server info to zk dead server path : /dead-servers/ + public void subscribe(String path, SubscribeListener listener) { + registry.subscribe(path, listener); + } - persist(deadServerPath, (type + UNDERLINE + host)); + public void addConnectionStateListener(ConnectionListener listener) { + registry.addConnectionStateListener(listener); + } - logger.info("{} server dead , and {} added to zk dead server path success", - nodeType, node); - } - } + public boolean exists(String key) { + return registry.exists(key); + } + public boolean getLock(String key) { + return registry.acquireLock(key); } - /** - * check dead server or not , if dead, stop self - * - * @param node node path - * @param serverType master or worker prefix - * @return true if not exists - * @throws Exception errors - */ - public boolean checkIsDeadServer(String node, String serverType) throws Exception { - // ip_sequence_no - String[] zNodesPath = node.split("\\/"); - String ipSeqNo = zNodesPath[zNodesPath.length - 1]; - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo; + public boolean releaseLock(String key) { + return registry.releaseLock(key); + } - return !isExisted(node) || isExisted(deadServerPath); + public void setStoppable(IStoppable stoppable) { + this.stoppable = stoppable; } - /** - * get master nodes directly - * - * @return master nodes - */ - public Set getMasterNodesDirectly() { - List masters = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS); - return new HashSet<>(masters); + public IStoppable getStoppable() { + return stoppable; } - /** - * get worker nodes directly - * - * @return master nodes - */ - public Set getWorkerNodesDirectly() { - List workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS); - return new HashSet<>(workers); + public boolean isMasterPath(String path) { + return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS); } - /** - * get worker group directly - * - * @return worker group nodes - */ - public Set getWorkerGroupDirectly() { - List workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS); - return new HashSet<>(workers); + public boolean isWorkerPath(String path) { + return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS); } - /** - * get worker group nodes - */ - public Set getWorkerGroupNodesDirectly(String workerGroup) { - List workers = getChildrenKeys(getWorkerGroupPath(workerGroup)); - return new HashSet<>(workers); + public Collection getChildrenKeys(final String key) { + return registry.children(key); } - /** - * opType(add): if find dead server , then add to zk deadServerPath - * opType(delete): delete path from zk - * - * @param nodeSet node path set - * @param nodeType master or worker - * @param opType delete or add - * @throws Exception errors - */ - public void handleDeadServer(Set nodeSet, NodeType nodeType, String opType) throws Exception { + public Set getServerNodeSet(NodeType nodeType, boolean hostOnly) { + try { + return getServerNodes(nodeType).stream().map(server -> { + if (nodeType == NodeType.WORKER && hostOnly) { + return server.split(SINGLE_SLASH)[1]; + } + return server; + }).collect(Collectors.toSet()); + } catch (Exception e) { + throw new RegistryException("Failed to get server node: " + nodeType, e); + } + } - String type = (nodeType == NodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE; - for (String node : nodeSet) { - String host = getHostByEventDataPath(node); - //check server restart, if restart , dead server path in zk should be delete - if (opType.equals(DELETE_OP)) { - removeDeadServerByHost(host, type); + private void start() { + if (isStarted.compareAndSet(false, true)) { + PropertyUtils.loadPropertyFile(REGISTRY_CONFIG_FILE_PATH); + final Map registryConfig = PropertyUtils.getPropertiesByPrefix(REGISTRY_PREFIX); - } else if (opType.equals(ADD_OP)) { - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; - if (!isExisted(deadServerPath)) { - //add dead server info to zk dead server path : /dead-servers/ - persist(deadServerPath, (type + UNDERLINE + host)); - logger.info("{} server dead , and {} added to registry dead server path success", - nodeType, node); - } + if (null == registryConfig || registryConfig.isEmpty()) { + throw new RegistryException("registry config param is null"); } - + final String pluginName = registryConfig.get(REGISTRY_PLUGIN_NAME); + final Map factories = RegistryFactoryLoader.load(); + if (!factories.containsKey(pluginName)) { + throw new RegistryException("No such registry plugin: " + pluginName); + } + registry = factories.get(pluginName).create(); + registry.start(registryConfig); } + } + private void initNodes() { + registry.put(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false); + registry.put(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false); + registry.put(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY, false); } - /** - * get host ip:port, string format: parentPath/ip:port - * - * @param path path - * @return host ip:port, string format: parentPath/ip:port - */ - public String getHostByEventDataPath(String path) { - if (StringUtils.isEmpty(path)) { - logger.error("empty path!"); - return ""; - } - String[] pathArray = path.split(SINGLE_SLASH); - if (pathArray.length < 1) { - logger.error("parse ip error: {}", path); - return ""; + private String rootNodePath(NodeType type) { + switch (type) { + case MASTER: + return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; + case WORKER: + return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; + case DEAD_SERVER: + return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS; + default: + throw new IllegalStateException("Should not reach here"); } - return pathArray[pathArray.length - 1]; + } + private Collection getServerNodes(NodeType nodeType) { + final String path = rootNodePath(nodeType); + final Collection serverList = getChildrenKeys(path); + if (nodeType != NodeType.WORKER) { + return serverList; + } + return serverList.stream().flatMap(group -> + getChildrenKeys(path + SINGLE_SLASH + group) + .stream() + .map(it -> group + SINGLE_SLASH + it) + ).collect(Collectors.toList()); } - /** - * remove dead server by host - * - * @param host host - * @param serverType serverType - */ - public void removeDeadServerByHost(String host, String serverType) { - List deadServers = getChildrenKeys(getDeadZNodeParentPath()); + private void removeDeadServerByHost(String host, String serverType) { + Collection deadServers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS); for (String serverPath : deadServers) { if (serverPath.startsWith(serverType + UNDERLINE + host)) { - String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; + String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverPath; remove(server); logger.info("{} server {} deleted from zk dead server path success", serverType, host); } } } - } diff --git a/dolphinscheduler-service/src/main/resources/registry.properties b/dolphinscheduler-service/src/main/resources/registry.properties index b00dfc0533..f534a65ed8 100644 --- a/dolphinscheduler-service/src/main/resources/registry.properties +++ b/dolphinscheduler-service/src/main/resources/registry.properties @@ -15,18 +15,5 @@ # limitations under the License. # -#registry.plugin.dir config the Registry Plugin dir. -registry.plugin.dir=lib/plugin/registry - registry.plugin.name=zookeeper registry.servers=127.0.0.1:2181 - -#maven.local.repository=/usr/local/localRepository - -#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE -#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml - -#registry timeout -#registry.session.timeout.ms=30000 -#registry.connection.timeout.ms=7500 -#registry.block.until.connected.wait=600 \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java deleted file mode 100644 index 2dc219334a..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.registry; - -import static org.apache.dolphinscheduler.common.Constants.ADD_OP; -import static org.apache.dolphinscheduler.common.Constants.DELETE_OP; - -import static org.mockito.BDDMockito.given; - -import org.apache.dolphinscheduler.common.enums.NodeType; -import org.apache.dolphinscheduler.spi.register.Registry; - -import java.util.Arrays; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import com.google.common.collect.Sets; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ RegistryClient.class }) -public class RegistryClientTest { - - private RegistryClient registryClient; - - @Test - public void test() throws Exception { - Registry registry = PowerMockito.mock(Registry.class); - PowerMockito.doNothing().when(registry).persist(Mockito.anyString(), Mockito.anyString()); - PowerMockito.doNothing().when(registry).update(Mockito.anyString(), Mockito.anyString()); - PowerMockito.when(registry.releaseLock(Mockito.anyString())).thenReturn(true); - PowerMockito.when(registry.getChildren("/dead-servers")).thenReturn(Arrays.asList("worker_127.0.0.1:8089")); - - PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class)); - registryClient = PowerMockito.mock(RegistryClient.class); - registryClient.persist("/key", ""); - registryClient.update("/key", ""); - registryClient.releaseLock("/key"); - registryClient.getChildrenKeys("/key"); - registryClient.handleDeadServer(Sets.newHashSet("ma/127.0.0.1:8089"), NodeType.WORKER, DELETE_OP); - registryClient.handleDeadServer(Sets.newHashSet("ma/127.0.0.1:8089"), NodeType.WORKER, ADD_OP); - //registryClient.removeDeadServerByHost("127.0.0.1:8089","master"); - registryClient.handleDeadServer("ma/127.0.0.1:8089", NodeType.WORKER, DELETE_OP); - registryClient.handleDeadServer("ma/127.0.0.1:8089", NodeType.WORKER, ADD_OP); - registryClient.checkIsDeadServer("master/127.0.0.1","master"); - given(registry.getChildren("/nodes/worker")).willReturn(Arrays.asList("worker_127.0.0.1:8089")); - given(registry.getChildren("/nodes/worker/worker_127.0.0.1:8089")).willReturn(Arrays.asList("default")); - - registryClient.checkNodeExists("127.0.0.1",NodeType.WORKER); - - registryClient.getServerList(NodeType.MASTER); - - } - -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java deleted file mode 100644 index a35252c230..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.registry; - -import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader; -import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig; -import org.apache.dolphinscheduler.spi.register.RegistryPluginManager; - -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.collect.ImmutableList; - -public class RegistryPluginTest { - - @Test - public void testLoadPlugin() throws Exception { - DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig(); - String path = DolphinPluginLoader.class.getClassLoader().getResource("").getPath(); - - String registryPluginZkPath = path + "../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml"; - registryPluginManagerConfig.setPlugins(registryPluginZkPath); - RegistryPluginManager registryPluginManager = new RegistryPluginManager("zookeeper"); - - DolphinPluginLoader registryPluginLoader = new DolphinPluginLoader(registryPluginManagerConfig, ImmutableList.of(registryPluginManager)); - registryPluginLoader.loadPlugins(); - Assert.assertNotNull(registryPluginManager.getRegistry()); - - } -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java index 1c5f1c515e..a073c53c8a 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.spi; import static java.util.Collections.emptyList; import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory; -import org.apache.dolphinscheduler.spi.register.RegistryFactory; import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; /** @@ -42,14 +41,6 @@ public interface DolphinSchedulerPlugin { return emptyList(); } - /** - * get registry plugin factory - * @return registry factory - */ - default Iterable getRegisterFactorys() { - return emptyList(); - } - /** * get task plugin factory * @return registry factory diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java deleted file mode 100644 index 6675ef60f1..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -public interface ConnectStateListener { - - void notify(RegistryConnectState state); -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java deleted file mode 100644 index a6aa32d8f4..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -/** - * Monitor the type of data changes - */ -public enum DataChangeEvent { - - ADD("ADD", 1), - REMOVE("REMOVE", 2), - UPDATE("UPDATE",3); - - private String type; - - private int value; - - DataChangeEvent(String type, int value) { - this.type = type; - this.value = value; - } -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java deleted file mode 100644 index 94b13e6656..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -import java.util.HashMap; - -/** - * The registry node monitors subscriptions - */ -public class ListenerManager { - - /** - * All message subscriptions must be subscribed uniformly at startup. - * A node path only supports one listener - */ - private static HashMap listeners = new HashMap<>(); - - /** - * Check whether the key has been monitored - */ - public static boolean checkHasListeners(String path) { - return null != listeners.get(path); - } - - /** - * add listener(A node can only be monitored by one listener) - */ - public static void addListener(String path, SubscribeListener listener) { - listeners.put(path, listener); - } - - /** - * remove listener - */ - public static void removeListener(String path) { - listeners.remove(path); - } - - /** - * - *After the data changes, it is distributed to the corresponding listener for processing - */ - public static void dataChange(String key,String path, String data, DataChangeEvent dataChangeEvent) { - SubscribeListener notifyListener = listeners.get(key); - if (null == notifyListener) { - return; - } - notifyListener.notify(path, data, dataChangeEvent); - } - -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java deleted file mode 100644 index 11fe25a891..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java +++ /dev/null @@ -1,102 +0,0 @@ -package org.apache.dolphinscheduler.spi.register;/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.util.List; -import java.util.Map; - -/** - * The final display of all registry component data must follow a tree structure. - * Therefore, some registry may need to do a layer of internal conversion, such as Etcd - */ -public interface Registry { - - /** - * initialize registry center. - */ - void init(Map registerData); - - /** - * close registry - */ - void close(); - - /** - * subscribe registry data change, a path can only be monitored by one listener - */ - boolean subscribe(String path, SubscribeListener subscribeListener); - - /** - * unsubscribe - */ - void unsubscribe(String path); - - /** - * Registry status monitoring, globally unique. Only one is allowed to subscribe. - */ - void addConnectionStateListener(RegistryConnectListener registryConnectListener); - - /** - * get key - */ - String get(String key); - - /** - * delete - */ - void remove(String key); - - /** - * persist data - */ - void persist(String key, String value); - - /** - *persist ephemeral data - */ - void persistEphemeral(String key, String value); - - /** - * update data - */ - void update(String key, String value); - - /** - * get children keys - */ - List getChildren(String path); - - /** - * Judge node is exist or not. - */ - boolean isExisted(String key); - - /** - * delete kay - */ - boolean delete(String key); - - /** - * Obtain a distributed lock - * todo It is best to add expiration time, and automatically release the lock after expiration - */ - boolean acquireLock(String key); - - /** - * release key - */ - boolean releaseLock(String key); -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java deleted file mode 100644 index 83385f8998..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -public interface RegistryConnectListener { - - void notify(RegistryConnectState newState); -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java deleted file mode 100644 index e085e6d091..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -/** - * All registry connection status must be converted to this - */ -public enum RegistryConnectState { - CONNECTED("connected", 1), - RECONNECTED("reconnected", 2), - SUSPENDED("suspended", 3), - LOST("lost", 4); - - private String description; - - private int state; - - RegistryConnectState(String description, int state) { - this.description = description; - this.state = state; - } -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java deleted file mode 100644 index 884f005910..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -/** - * registry exception - */ -public class RegistryException extends RuntimeException { - - public RegistryException(String message, Throwable cause) { - super(message, cause); - } - - public RegistryException(String message) { - super(message); - } -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java deleted file mode 100644 index 244c0f437a..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -/** - * Registry the component factory, all registry must implement this interface - */ -public interface RegistryFactory { - - /** - * get registry component name - */ - String getName(); - - /** - * get registry - */ - Registry create(); -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java deleted file mode 100644 index 211795f5b9..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin; -import org.apache.dolphinscheduler.spi.classloader.ThreadContextClassLoader; -import org.apache.dolphinscheduler.spi.plugin.AbstractDolphinPluginManager; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The plug-in address of the registry needs to be configured. - * Multi-registries are not supported. - * When the plug-in directory contains multiple plug-ins, only the configured plug-in will be used. - * todo It’s not good to put it here, consider creating a separate API module for each plugin - */ -public class RegistryPluginManager extends AbstractDolphinPluginManager { - - private static final Logger logger = LoggerFactory.getLogger(RegistryPluginManager.class); - - private RegistryFactory registryFactory; - - public static Registry registry; - - private String registerPluginName; - - public RegistryPluginManager(String registerPluginName) { - this.registerPluginName = registerPluginName; - } - - @Override - public void installPlugin(DolphinSchedulerPlugin dolphinSchedulerPlugin) { - for (RegistryFactory registryFactory : dolphinSchedulerPlugin.getRegisterFactorys()) { - logger.info("Registering Registry Plugin '{}'", registryFactory.getName()); - if (registerPluginName.equals(registryFactory.getName())) { - this.registryFactory = registryFactory; - loadRegistry(); - return; - } - } - if (null == registry) { - throw new RegistryException(String.format("not found %s registry plugin ", registerPluginName)); - } - } - - /** - * load registry - */ - private void loadRegistry() { - try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(registryFactory.getClass().getClassLoader())) { - registry = registryFactory.create(); - } - } - - /** - * get registry - * @return registry - */ - public Registry getRegistry() { - if (null == registry) { - throw new RegistryException("not install registry"); - } - return registry; - } - -} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java deleted file mode 100644 index 3db7f2e1a2..0000000000 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.spi.register; - -/** - * Registration center subscription. All listeners must implement this interface - */ -public interface SubscribeListener { - - /** - * Processing logic when the subscription node changes - */ - void notify(String path, String data, DataChangeEvent dataChangeEvent); - -} diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java index 4a417704c8..27d9a55711 100644 --- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java +++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java @@ -86,15 +86,6 @@ public class StandaloneServer { private static void startRegistry() throws Exception { final TestingServer server = new TestingServer(true); System.setProperty("registry.servers", server.getConnectString()); - - final Path registryPath = Paths.get( - StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(), - "../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml" - ).toAbsolutePath(); - if (Files.exists(registryPath)) { - System.setProperty("registry.plugin.binding", registryPath.toString()); - System.setProperty("registry.plugin.dir", ""); - } } private static void startDatabase() throws IOException, SQLException { diff --git a/pom.xml b/pom.xml index 0c42a7a244..a475181ad4 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ 1.9.16 1.5.1 0.10.9 + 1.0.1 @@ -231,7 +232,12 @@ org.apache.dolphinscheduler - dolphinscheduler-registry-plugin + dolphinscheduler-registry + ${project.version} + + + org.apache.dolphinscheduler + dolphinscheduler-registry-zookeeper ${project.version} @@ -270,6 +276,12 @@ ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-registry-api + ${project.version} + + org.apache.curator curator-framework @@ -690,6 +702,13 @@ py4j ${py4j.version} + + + com.google.auto.service + auto-service + ${auto-service.version} + true + @@ -1234,7 +1253,7 @@ dolphinscheduler-spi dolphinscheduler-alert-plugin - dolphinscheduler-registry-plugin + dolphinscheduler-registry dolphinscheduler-task-plugin dolphinscheduler-ui dolphinscheduler-server diff --git a/script/env/dolphinscheduler_env.sh b/script/env/dolphinscheduler_env.sh index e62f54e396..1798a5d311 100755 --- a/script/env/dolphinscheduler_env.sh +++ b/script/env/dolphinscheduler_env.sh @@ -20,7 +20,7 @@ export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop export SPARK_HOME1=/opt/soft/spark1 export SPARK_HOME2=/opt/soft/spark2 export PYTHON_HOME=/opt/soft/python -export JAVA_HOME=/opt/soft/java +export JAVA_HOME=${JAVA_HOME:-/opt/soft/java} export HIVE_HOME=/opt/soft/hive export FLINK_HOME=/opt/soft/flink export DATAX_HOME=/opt/soft/datax