From e074d7a8bf2c559777ce92892d39df998d9d72cd Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 29 Mar 2023 17:33:12 +0800 Subject: [PATCH] Add jdbc registry module (#13812) * Add jdbc registry module * Update dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md Co-authored-by: kezhenxu94 --------- Co-authored-by: kezhenxu94 --- .../dolphinscheduler-registry-all/pom.xml | 2 +- .../README.md | 34 ++++++- .../pom.xml | 8 +- .../plugin/registry/jdbc/JdbcOperator.java} | 96 +++++++++---------- .../plugin/registry/jdbc/JdbcRegistry.java} | 66 ++++++------- .../jdbc/JdbcRegistryConfiguration.java} | 30 +++--- .../registry/jdbc/JdbcRegistryConstant.java} | 4 +- .../jdbc/JdbcRegistryProperties.java} | 6 +- .../jdbc/mapper/JdbcRegistryDataMapper.java} | 28 +++--- .../jdbc/mapper/JdbcRegistryLockMapper.java} | 14 +-- .../plugin/registry/jdbc}/model/DataType.java | 2 +- .../jdbc/model/JdbcRegistryData.java} | 18 ++-- .../jdbc/model/JdbcRegistryLock.java} | 16 ++-- .../jdbc}/task/EphemeralDateManager.java | 38 ++++---- .../jdbc}/task/RegistryLockManager.java | 50 +++++----- .../jdbc}/task/SubscribeDataManager.java | 64 ++++++------- .../main/resources/mysql_registry_init.sql | 8 +- .../resources/postgresql_registry_init.sql | 45 +++++++++ .../dolphinscheduler-registry-plugins/pom.xml | 2 +- 19 files changed, 305 insertions(+), 226 deletions(-) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql => dolphinscheduler-registry-jdbc}/README.md (54%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql => dolphinscheduler-registry-jdbc}/pom.xml (91%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java} (56%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java} (73%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConfiguration.java} (55%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java} (91%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryProperties.java} (93%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java} (62%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java} (76%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc}/model/DataType.java (94%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryData.java} (79%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryLock.java} (83%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc}/task/EphemeralDateManager.java (80%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc}/task/RegistryLockManager.java (68%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql => dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc}/task/SubscribeDataManager.java (67%) rename dolphinscheduler-registry/dolphinscheduler-registry-plugins/{dolphinscheduler-registry-mysql => dolphinscheduler-registry-jdbc}/src/main/resources/mysql_registry_init.sql (92%) create mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/postgresql_registry_init.sql diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml index bc55198b25..ee0a30a03a 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml +++ b/dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml @@ -34,7 +34,7 @@ org.apache.dolphinscheduler - dolphinscheduler-registry-mysql + dolphinscheduler-registry-jdbc ${project.version} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md similarity index 54% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md index 4779db3bdf..305714ad9b 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md @@ -4,7 +4,8 @@ This module is the mysql registry plugin module, this plugin will use mysql as t # How to use -If you want to set the registry center as mysql, you need to do the below two steps: +## Use Mysql as registry center +If you want to set mysql as the registry center, you need to do the below two steps: 1. Initialize the mysql table @@ -16,11 +17,10 @@ You need to set the registry properties in master/worker/api's appplication.yml ```yaml registry: - type: mysql + type: jdbc term-refresh-interval: 2s term-expire-times: 3 hikari-config: - driver-class-name: com.mysql.cj.jdbc.Driver jdbc-url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler username: root password: root @@ -34,3 +34,31 @@ store server metadata. NOTE: You need to add `mysql-connector-java.jar` into DS classpath, since this plugin will not bundle this driver in distribution. You can get the detail about Initialize the Database + +## Use Postgresql as registry center +If you want to set Postgresql as the registry center, you need to do the below two steps: + +1. Initialize the PostgreSQL table + +You can directly execute the sql script `src/main/resources/postgresql_registry_init.sql`. + +2. Open the config + +You need to set the registry properties in master/worker/api's appplication.yml + +```yaml +registry: + type: jdbc + term-refresh-interval: 2s + term-expire-times: 3 + hikari-config: + jdbc-url: jdbc:postgresql://localhost:5432/dolphinscheduler + username: root + password: root + maximum-pool-size: 5 + connection-timeout: 9000 + idle-timeout: 600000 +``` + +After do this two steps, you can start your DolphinScheduler cluster, your cluster will use postgresql as registry center to +store server metadata. diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml similarity index 91% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml index dac8423d5e..47b6449293 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml @@ -26,7 +26,7 @@ dev-SNAPSHOT - dolphinscheduler-registry-mysql + dolphinscheduler-registry-jdbc @@ -41,11 +41,17 @@ com.zaxxer HikariCP + mysql mysql-connector-java + + org.postgresql + postgresql + + org.slf4j slf4j-api diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java similarity index 56% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java index 2b8e7730e3..dfb297a6a6 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.mysql; +package org.apache.dolphinscheduler.plugin.registry.jdbc; -import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper; -import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper; -import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType; -import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData; -import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock; +import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataMapper; +import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryLockMapper; +import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DataType; +import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData; +import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryLock; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -37,115 +37,115 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @Component -@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql") -public class MysqlOperator { +@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc") +public class JdbcOperator { @Autowired - private MysqlRegistryDataMapper mysqlRegistryDataMapper; + private JdbcRegistryDataMapper jdbcRegistryDataMapper; @Autowired - private MysqlRegistryLockMapper mysqlRegistryLockMapper; + private JdbcRegistryLockMapper jdbcRegistryLockMapper; private final long expireTimeWindow; - public MysqlOperator(MysqlRegistryProperties registryProperties) { + public JdbcOperator(JdbcRegistryProperties registryProperties) { this.expireTimeWindow = registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis(); } public void healthCheck() { - mysqlRegistryLockMapper.countAll(); + jdbcRegistryLockMapper.countAll(); } - public List queryAllMysqlRegistryData() { - return mysqlRegistryDataMapper.selectAll(); + public List queryAllJdbcRegistryData() { + return jdbcRegistryDataMapper.selectAll(); } public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException { - MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key); - if (mysqlRegistryData != null) { - long id = mysqlRegistryData.getId(); - if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) { + JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key); + if (jdbcRegistryData != null) { + long id = jdbcRegistryData.getId(); + if (jdbcRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) { throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value)); } return id; } - mysqlRegistryData = MysqlRegistryData.builder() + jdbcRegistryData = JdbcRegistryData.builder() .key(key) .data(value) .type(DataType.EPHEMERAL.getTypeValue()) .lastTerm(System.currentTimeMillis()) .build(); - mysqlRegistryDataMapper.insert(mysqlRegistryData); - return mysqlRegistryData.getId(); + jdbcRegistryDataMapper.insert(jdbcRegistryData); + return jdbcRegistryData.getId(); } public long insertOrUpdatePersistentData(String key, String value) throws SQLException { - MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key); - if (mysqlRegistryData != null) { - long id = mysqlRegistryData.getId(); - if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) { + JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key); + if (jdbcRegistryData != null) { + long id = jdbcRegistryData.getId(); + if (jdbcRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) { throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value)); } return id; } - mysqlRegistryData = MysqlRegistryData.builder() + jdbcRegistryData = JdbcRegistryData.builder() .key(key) .data(value) .type(DataType.PERSISTENT.getTypeValue()) .lastTerm(System.currentTimeMillis()) .build(); - mysqlRegistryDataMapper.insert(mysqlRegistryData); - return mysqlRegistryData.getId(); + jdbcRegistryDataMapper.insert(jdbcRegistryData); + return jdbcRegistryData.getId(); } public void deleteDataByKey(String key) { - mysqlRegistryDataMapper.deleteByKey(key); + jdbcRegistryDataMapper.deleteByKey(key); } public void deleteDataById(long id) { - mysqlRegistryDataMapper.deleteById(id); + jdbcRegistryDataMapper.deleteById(id); } public void clearExpireLock() { - mysqlRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow); + jdbcRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow); } public void clearExpireEphemeralDate() { - mysqlRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow, + jdbcRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow, DataType.EPHEMERAL.getTypeValue()); } - public MysqlRegistryData getData(String key) throws SQLException { - return mysqlRegistryDataMapper.selectByKey(key); + public JdbcRegistryData getData(String key) throws SQLException { + return jdbcRegistryDataMapper.selectByKey(key); } public List getChildren(String key) throws SQLException { - return mysqlRegistryDataMapper.fuzzyQueryByKey(key) + return jdbcRegistryDataMapper.fuzzyQueryByKey(key) .stream() - .map(MysqlRegistryData::getKey) + .map(JdbcRegistryData::getKey) .filter(fullPath -> fullPath.length() > key.length()) .map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/")) .collect(Collectors.toList()); } public boolean existKey(String key) throws SQLException { - MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key); - return mysqlRegistryData != null; + JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key); + return jdbcRegistryData != null; } /** * Try to acquire the target Lock, if cannot acquire, return null. */ @SuppressWarnings("checkstyle:IllegalCatch") - public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException { - MysqlRegistryLock mysqlRegistryLock = MysqlRegistryLock.builder() + public JdbcRegistryLock tryToAcquireLock(String key) throws SQLException { + JdbcRegistryLock jdbcRegistryLock = JdbcRegistryLock.builder() .key(key) - .lockOwner(MysqlRegistryConstant.LOCK_OWNER) + .lockOwner(JdbcRegistryConstant.LOCK_OWNER) .lastTerm(System.currentTimeMillis()) .build(); try { - mysqlRegistryLockMapper.insert(mysqlRegistryLock); - return mysqlRegistryLock; + jdbcRegistryLockMapper.insert(jdbcRegistryLock); + return jdbcRegistryLock; } catch (Exception e) { if (e instanceof SQLIntegrityConstraintViolationException) { return null; @@ -154,26 +154,26 @@ public class MysqlOperator { } } - public MysqlRegistryLock getLockById(long lockId) throws SQLException { - return mysqlRegistryLockMapper.selectById(lockId); + public JdbcRegistryLock getLockById(long lockId) throws SQLException { + return jdbcRegistryLockMapper.selectById(lockId); } public boolean releaseLock(long lockId) throws SQLException { - return mysqlRegistryLockMapper.deleteById(lockId) > 0; + return jdbcRegistryLockMapper.deleteById(lockId) > 0; } public boolean updateEphemeralDataTerm(Collection ephemeralDateIds) throws SQLException { if (CollectionUtils.isEmpty(ephemeralDateIds)) { return true; } - return mysqlRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0; + return jdbcRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0; } public boolean updateLockTerm(List lockIds) { if (CollectionUtils.isEmpty(lockIds)) { return true; } - return mysqlRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0; + return jdbcRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0; } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java similarity index 73% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java index a1d57ddb09..8ad409c424 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.mysql; +package org.apache.dolphinscheduler.plugin.registry.jdbc; -import org.apache.dolphinscheduler.plugin.registry.mysql.task.EphemeralDateManager; -import org.apache.dolphinscheduler.plugin.registry.mysql.task.RegistryLockManager; -import org.apache.dolphinscheduler.plugin.registry.mysql.task.SubscribeDataManager; +import org.apache.dolphinscheduler.plugin.registry.jdbc.task.EphemeralDateManager; +import org.apache.dolphinscheduler.plugin.registry.jdbc.task.RegistryLockManager; +import org.apache.dolphinscheduler.plugin.registry.jdbc.task.SubscribeDataManager; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.registry.api.Registry; @@ -43,36 +43,36 @@ import org.springframework.stereotype.Component; * store the DolphinScheduler master/worker's metadata and do the server registry/unRegistry. */ @Component -@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql") +@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc") @Slf4j -public class MysqlRegistry implements Registry { +public class JdbcRegistry implements Registry { - private final MysqlRegistryProperties mysqlRegistryProperties; + private final JdbcRegistryProperties jdbcRegistryProperties; private final EphemeralDateManager ephemeralDateManager; private final SubscribeDataManager subscribeDataManager; private final RegistryLockManager registryLockManager; - private MysqlOperator mysqlOperator; - - public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties, - MysqlOperator mysqlOperator) { - this.mysqlOperator = mysqlOperator; - mysqlOperator.clearExpireLock(); - mysqlOperator.clearExpireEphemeralDate(); - this.mysqlRegistryProperties = mysqlRegistryProperties; - this.ephemeralDateManager = new EphemeralDateManager(mysqlRegistryProperties, mysqlOperator); - this.subscribeDataManager = new SubscribeDataManager(mysqlRegistryProperties, mysqlOperator); - this.registryLockManager = new RegistryLockManager(mysqlRegistryProperties, mysqlOperator); - log.info("Initialize Mysql Registry..."); + private JdbcOperator jdbcOperator; + + public JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties, + JdbcOperator jdbcOperator) { + this.jdbcOperator = jdbcOperator; + jdbcOperator.clearExpireLock(); + jdbcOperator.clearExpireEphemeralDate(); + this.jdbcRegistryProperties = jdbcRegistryProperties; + this.ephemeralDateManager = new EphemeralDateManager(jdbcRegistryProperties, jdbcOperator); + this.subscribeDataManager = new SubscribeDataManager(jdbcRegistryProperties, jdbcOperator); + this.registryLockManager = new RegistryLockManager(jdbcRegistryProperties, jdbcOperator); + log.info("Initialize Jdbc Registry..."); } @PostConstruct public void start() { - log.info("Starting Mysql Registry..."); - // start a mysql connect check + log.info("Starting Jdbc Registry..."); + // start a jdbc connect check ephemeralDateManager.start(); subscribeDataManager.start(); registryLockManager.start(); - log.info("Started Mysql Registry..."); + log.info("Started Jdbc Registry..."); } @Override @@ -82,16 +82,16 @@ public class MysqlRegistry implements Registry { while (true) { if (System.currentTimeMillis() > endTimeMills) { throw new RegistryException( - String.format("Cannot connect to mysql registry in %s s", timeout.getSeconds())); + String.format("Cannot connect to jdbc registry in %s s", timeout.getSeconds())); } if (ephemeralDateManager.getConnectionState() == ConnectionState.CONNECTED) { return; } try { - Thread.sleep(mysqlRegistryProperties.getTermRefreshInterval().toMillis()); + Thread.sleep(jdbcRegistryProperties.getTermRefreshInterval().toMillis()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RegistryException("Cannot connect to mysql registry due to interrupted exception", e); + throw new RegistryException("Cannot connect to jdbc registry due to interrupted exception", e); } } } @@ -127,7 +127,7 @@ public class MysqlRegistry implements Registry { // when put a ephemeralData will new a scheduler thread to update it ephemeralDateManager.insertOrUpdateEphemeralData(key, value); } else { - mysqlOperator.insertOrUpdatePersistentData(key, value); + jdbcOperator.insertOrUpdatePersistentData(key, value); } } catch (Exception ex) { throw new RegistryException(String.format("put key:%s, value:%s error", key, value), ex); @@ -137,7 +137,7 @@ public class MysqlRegistry implements Registry { @Override public void delete(String key) { try { - mysqlOperator.deleteDataByKey(key); + jdbcOperator.deleteDataByKey(key); } catch (Exception e) { throw new RegistryException(String.format("Delete key: %s error", key), e); } @@ -146,7 +146,7 @@ public class MysqlRegistry implements Registry { @Override public Collection children(String key) { try { - return mysqlOperator.getChildren(key); + return jdbcOperator.getChildren(key); } catch (SQLException e) { throw new RegistryException(String.format("Get key: %s children error", key), e); } @@ -155,7 +155,7 @@ public class MysqlRegistry implements Registry { @Override public boolean exists(String key) { try { - return mysqlOperator.existKey(key); + return jdbcOperator.existKey(key); } catch (Exception e) { throw new RegistryException(String.format("Check key: %s exist error", key), e); } @@ -181,15 +181,15 @@ public class MysqlRegistry implements Registry { @Override public void close() { - log.info("Closing Mysql Registry..."); - // remove the current Ephemeral node, if can connect to mysql + log.info("Closing Jdbc Registry..."); + // remove the current Ephemeral node, if can connect to jdbc try ( EphemeralDateManager closed1 = ephemeralDateManager; SubscribeDataManager close2 = subscribeDataManager; RegistryLockManager close3 = registryLockManager) { } catch (Exception e) { - log.error("Close Mysql Registry error", e); + log.error("Close Jdbc Registry error", e); } - log.info("Closed Mysql Registry..."); + log.info("Closed Jdbc Registry..."); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConfiguration.java similarity index 55% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConfiguration.java index 70a1baae53..ced09380f6 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConfiguration.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.mysql; +package org.apache.dolphinscheduler.plugin.registry.jdbc; -import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper; -import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper; +import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataMapper; +import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryLockMapper; import org.apache.ibatis.session.SqlSessionFactory; @@ -31,31 +31,31 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; import com.zaxxer.hikari.HikariDataSource; @Configuration -@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql") -public class MysqlRegistryConfiguration { +@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc") +public class JdbcRegistryConfiguration { @Bean - public SqlSessionFactory mysqlRegistrySqlSessionFactory(MysqlRegistryProperties mysqlRegistryProperties) throws Exception { + public SqlSessionFactory jdbcRegistrySqlSessionFactory(JdbcRegistryProperties jdbcRegistryProperties) throws Exception { MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); - sqlSessionFactoryBean.setDataSource(new HikariDataSource(mysqlRegistryProperties.getHikariConfig())); + sqlSessionFactoryBean.setDataSource(new HikariDataSource(jdbcRegistryProperties.getHikariConfig())); return sqlSessionFactoryBean.getObject(); } @Bean - public SqlSessionTemplate mysqlRegistrySqlSessionTemplate(SqlSessionFactory mysqlRegistrySqlSessionFactory) { - mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryDataMapper.class); - mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryLockMapper.class); - return new SqlSessionTemplate(mysqlRegistrySqlSessionFactory); + public SqlSessionTemplate jdbcRegistrySqlSessionTemplate(SqlSessionFactory jdbcRegistrySqlSessionFactory) { + jdbcRegistrySqlSessionFactory.getConfiguration().addMapper(JdbcRegistryDataMapper.class); + jdbcRegistrySqlSessionFactory.getConfiguration().addMapper(JdbcRegistryLockMapper.class); + return new SqlSessionTemplate(jdbcRegistrySqlSessionFactory); } @Bean - public MysqlRegistryDataMapper mysqlRegistryDataMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) { - return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryDataMapper.class); + public JdbcRegistryDataMapper jdbcRegistryDataMapper(SqlSessionTemplate jdbcRegistrySqlSessionTemplate) { + return jdbcRegistrySqlSessionTemplate.getMapper(JdbcRegistryDataMapper.class); } @Bean - public MysqlRegistryLockMapper mysqlRegistryLockMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) { - return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryLockMapper.class); + public JdbcRegistryLockMapper jdbcRegistryLockMapper(SqlSessionTemplate jdbcRegistrySqlSessionTemplate) { + return jdbcRegistrySqlSessionTemplate.getMapper(JdbcRegistryLockMapper.class); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java similarity index 91% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java index 1482560b7c..4a016f4d2e 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.mysql; +package org.apache.dolphinscheduler.plugin.registry.jdbc; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils; import lombok.experimental.UtilityClass; @UtilityClass -public final class MysqlRegistryConstant { +public final class JdbcRegistryConstant { public static final long LOCK_ACQUIRE_INTERVAL = 1_000; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryProperties.java similarity index 93% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryProperties.java index 9690418bfb..eb8d5c8807 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.mysql; +package org.apache.dolphinscheduler.plugin.registry.jdbc; import java.time.Duration; @@ -29,9 +29,9 @@ import com.zaxxer.hikari.HikariConfig; @Data @Configuration -@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql") +@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc") @ConfigurationProperties(prefix = "registry") -public class MysqlRegistryProperties { +public class JdbcRegistryProperties { /** * Used to schedule refresh the ephemeral data/ lock. diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java similarity index 62% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java index ef0cd5c3cc..83eb0aab43 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.mysql.mapper; +package org.apache.dolphinscheduler.plugin.registry.jdbc.mapper; -import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData; +import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Param; @@ -29,29 +29,29 @@ import java.util.List; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -public interface MysqlRegistryDataMapper extends BaseMapper { +public interface JdbcRegistryDataMapper extends BaseMapper { - @Select("select * from t_ds_mysql_registry_data") - List selectAll(); + @Select("select * from t_ds_jdbc_registry_data") + List selectAll(); - @Select("select * from t_ds_mysql_registry_data where `key` = #{key}") - MysqlRegistryData selectByKey(@Param("key") String key); + @Select("select * from t_ds_jdbc_registry_data where key = #{key}") + JdbcRegistryData selectByKey(@Param("key") String key); - @Select("select * from t_ds_mysql_registry_data where `key` like CONCAT (#{key}, '%')") - List fuzzyQueryByKey(@Param("key") String key); + @Select("select * from t_ds_jdbc_registry_data where key like CONCAT (#{key}, '%')") + List fuzzyQueryByKey(@Param("key") String key); - @Update("update t_ds_mysql_registry_data set `data` = #{data}, `last_term` = #{term} where `id` = #{id}") + @Update("update t_ds_jdbc_registry_data set data = #{data}, last_term = #{term} where id = #{id}") int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term); - @Delete("delete from t_ds_mysql_registry_data where `key` = #{key}") + @Delete("delete from t_ds_jdbc_registry_data where key = #{key}") void deleteByKey(@Param("key") String key); - @Delete("delete from t_ds_mysql_registry_data where `last_term` < #{term} and `type` = #{type}") + @Delete("delete from t_ds_jdbc_registry_data where last_term < #{term} and type = #{type}") void clearExpireEphemeralDate(@Param("term") long term, @Param("type") int type); @Update({"