Browse Source

Add jdbc registry module (#13812)

* Add jdbc registry module

* Update dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md

Co-authored-by: kezhenxu94 <kezhenxu94@apache.org>

---------

Co-authored-by: kezhenxu94 <kezhenxu94@apache.org>
dependabot/maven/dolphinscheduler-api-test/com.fasterxml.jackson.core-jackson-databind-2.13.4.2
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
e074d7a8bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml
  2. 34
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md
  3. 8
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml
  4. 96
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java
  5. 66
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
  6. 30
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConfiguration.java
  7. 4
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java
  8. 6
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryProperties.java
  9. 28
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java
  10. 14
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java
  11. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DataType.java
  12. 18
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryData.java
  13. 16
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryLock.java
  14. 38
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/EphemeralDateManager.java
  15. 50
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/RegistryLockManager.java
  16. 64
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/SubscribeDataManager.java
  17. 8
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/mysql_registry_init.sql
  18. 45
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/postgresql_registry_init.sql
  19. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml

2
dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml

@ -34,7 +34,7 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-mysql</artifactId>
<artifactId>dolphinscheduler-registry-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

34
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md

@ -4,7 +4,8 @@ This module is the mysql registry plugin module, this plugin will use mysql as t
# How to use
If you want to set the registry center as mysql, you need to do the below two steps:
## Use Mysql as registry center
If you want to set mysql as the registry center, you need to do the below two steps:
1. Initialize the mysql table
@ -16,11 +17,10 @@ You need to set the registry properties in master/worker/api's appplication.yml
```yaml
registry:
type: mysql
type: jdbc
term-refresh-interval: 2s
term-expire-times: 3
hikari-config:
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
username: root
password: root
@ -34,3 +34,31 @@ store server metadata.
NOTE: You need to add `mysql-connector-java.jar` into DS classpath, since this plugin will not bundle this driver in distribution.
You can get the detail about <a href="https://dolphinscheduler.apache.org/en-us/docs/3.1.2/guide/installation/pseudo-cluster">Initialize the Database</a>
## Use Postgresql as registry center
If you want to set Postgresql as the registry center, you need to do the below two steps:
1. Initialize the PostgreSQL table
You can directly execute the sql script `src/main/resources/postgresql_registry_init.sql`.
2. Open the config
You need to set the registry properties in master/worker/api's appplication.yml
```yaml
registry:
type: jdbc
term-refresh-interval: 2s
term-expire-times: 3
hikari-config:
jdbc-url: jdbc:postgresql://localhost:5432/dolphinscheduler
username: root
password: root
maximum-pool-size: 5
connection-timeout: 9000
idle-timeout: 600000
```
After do this two steps, you can start your DolphinScheduler cluster, your cluster will use postgresql as registry center to
store server metadata.

8
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml

@ -26,7 +26,7 @@
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-registry-mysql</artifactId>
<artifactId>dolphinscheduler-registry-jdbc</artifactId>
<dependencies>
<dependency>
@ -41,11 +41,17 @@
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

96
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java

@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataMapper;
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryLockMapper;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DataType;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryLock;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -37,115 +37,115 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
public class MysqlOperator {
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")
public class JdbcOperator {
@Autowired
private MysqlRegistryDataMapper mysqlRegistryDataMapper;
private JdbcRegistryDataMapper jdbcRegistryDataMapper;
@Autowired
private MysqlRegistryLockMapper mysqlRegistryLockMapper;
private JdbcRegistryLockMapper jdbcRegistryLockMapper;
private final long expireTimeWindow;
public MysqlOperator(MysqlRegistryProperties registryProperties) {
public JdbcOperator(JdbcRegistryProperties registryProperties) {
this.expireTimeWindow =
registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis();
}
public void healthCheck() {
mysqlRegistryLockMapper.countAll();
jdbcRegistryLockMapper.countAll();
}
public List<MysqlRegistryData> queryAllMysqlRegistryData() {
return mysqlRegistryDataMapper.selectAll();
public List<JdbcRegistryData> queryAllJdbcRegistryData() {
return jdbcRegistryDataMapper.selectAll();
}
public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
if (mysqlRegistryData != null) {
long id = mysqlRegistryData.getId();
if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
if (jdbcRegistryData != null) {
long id = jdbcRegistryData.getId();
if (jdbcRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
return id;
}
mysqlRegistryData = MysqlRegistryData.builder()
jdbcRegistryData = JdbcRegistryData.builder()
.key(key)
.data(value)
.type(DataType.EPHEMERAL.getTypeValue())
.lastTerm(System.currentTimeMillis())
.build();
mysqlRegistryDataMapper.insert(mysqlRegistryData);
return mysqlRegistryData.getId();
jdbcRegistryDataMapper.insert(jdbcRegistryData);
return jdbcRegistryData.getId();
}
public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
if (mysqlRegistryData != null) {
long id = mysqlRegistryData.getId();
if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
if (jdbcRegistryData != null) {
long id = jdbcRegistryData.getId();
if (jdbcRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
return id;
}
mysqlRegistryData = MysqlRegistryData.builder()
jdbcRegistryData = JdbcRegistryData.builder()
.key(key)
.data(value)
.type(DataType.PERSISTENT.getTypeValue())
.lastTerm(System.currentTimeMillis())
.build();
mysqlRegistryDataMapper.insert(mysqlRegistryData);
return mysqlRegistryData.getId();
jdbcRegistryDataMapper.insert(jdbcRegistryData);
return jdbcRegistryData.getId();
}
public void deleteDataByKey(String key) {
mysqlRegistryDataMapper.deleteByKey(key);
jdbcRegistryDataMapper.deleteByKey(key);
}
public void deleteDataById(long id) {
mysqlRegistryDataMapper.deleteById(id);
jdbcRegistryDataMapper.deleteById(id);
}
public void clearExpireLock() {
mysqlRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow);
jdbcRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow);
}
public void clearExpireEphemeralDate() {
mysqlRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow,
jdbcRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow,
DataType.EPHEMERAL.getTypeValue());
}
public MysqlRegistryData getData(String key) throws SQLException {
return mysqlRegistryDataMapper.selectByKey(key);
public JdbcRegistryData getData(String key) throws SQLException {
return jdbcRegistryDataMapper.selectByKey(key);
}
public List<String> getChildren(String key) throws SQLException {
return mysqlRegistryDataMapper.fuzzyQueryByKey(key)
return jdbcRegistryDataMapper.fuzzyQueryByKey(key)
.stream()
.map(MysqlRegistryData::getKey)
.map(JdbcRegistryData::getKey)
.filter(fullPath -> fullPath.length() > key.length())
.map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"))
.collect(Collectors.toList());
}
public boolean existKey(String key) throws SQLException {
MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
return mysqlRegistryData != null;
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
return jdbcRegistryData != null;
}
/**
* Try to acquire the target Lock, if cannot acquire, return null.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
MysqlRegistryLock mysqlRegistryLock = MysqlRegistryLock.builder()
public JdbcRegistryLock tryToAcquireLock(String key) throws SQLException {
JdbcRegistryLock jdbcRegistryLock = JdbcRegistryLock.builder()
.key(key)
.lockOwner(MysqlRegistryConstant.LOCK_OWNER)
.lockOwner(JdbcRegistryConstant.LOCK_OWNER)
.lastTerm(System.currentTimeMillis())
.build();
try {
mysqlRegistryLockMapper.insert(mysqlRegistryLock);
return mysqlRegistryLock;
jdbcRegistryLockMapper.insert(jdbcRegistryLock);
return jdbcRegistryLock;
} catch (Exception e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
return null;
@ -154,26 +154,26 @@ public class MysqlOperator {
}
}
public MysqlRegistryLock getLockById(long lockId) throws SQLException {
return mysqlRegistryLockMapper.selectById(lockId);
public JdbcRegistryLock getLockById(long lockId) throws SQLException {
return jdbcRegistryLockMapper.selectById(lockId);
}
public boolean releaseLock(long lockId) throws SQLException {
return mysqlRegistryLockMapper.deleteById(lockId) > 0;
return jdbcRegistryLockMapper.deleteById(lockId) > 0;
}
public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException {
if (CollectionUtils.isEmpty(ephemeralDateIds)) {
return true;
}
return mysqlRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0;
return jdbcRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0;
}
public boolean updateLockTerm(List<Long> lockIds) {
if (CollectionUtils.isEmpty(lockIds)) {
return true;
}
return mysqlRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0;
return jdbcRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0;
}
}

66
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java

@ -15,11 +15,11 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.EphemeralDateManager;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.RegistryLockManager;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.SubscribeDataManager;
import org.apache.dolphinscheduler.plugin.registry.jdbc.task.EphemeralDateManager;
import org.apache.dolphinscheduler.plugin.registry.jdbc.task.RegistryLockManager;
import org.apache.dolphinscheduler.plugin.registry.jdbc.task.SubscribeDataManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.Registry;
@ -43,36 +43,36 @@ import org.springframework.stereotype.Component;
* store the DolphinScheduler master/worker's metadata and do the server registry/unRegistry.
*/
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")
@Slf4j
public class MysqlRegistry implements Registry {
public class JdbcRegistry implements Registry {
private final MysqlRegistryProperties mysqlRegistryProperties;
private final JdbcRegistryProperties jdbcRegistryProperties;
private final EphemeralDateManager ephemeralDateManager;
private final SubscribeDataManager subscribeDataManager;
private final RegistryLockManager registryLockManager;
private MysqlOperator mysqlOperator;
public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties,
MysqlOperator mysqlOperator) {
this.mysqlOperator = mysqlOperator;
mysqlOperator.clearExpireLock();
mysqlOperator.clearExpireEphemeralDate();
this.mysqlRegistryProperties = mysqlRegistryProperties;
this.ephemeralDateManager = new EphemeralDateManager(mysqlRegistryProperties, mysqlOperator);
this.subscribeDataManager = new SubscribeDataManager(mysqlRegistryProperties, mysqlOperator);
this.registryLockManager = new RegistryLockManager(mysqlRegistryProperties, mysqlOperator);
log.info("Initialize Mysql Registry...");
private JdbcOperator jdbcOperator;
public JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties,
JdbcOperator jdbcOperator) {
this.jdbcOperator = jdbcOperator;
jdbcOperator.clearExpireLock();
jdbcOperator.clearExpireEphemeralDate();
this.jdbcRegistryProperties = jdbcRegistryProperties;
this.ephemeralDateManager = new EphemeralDateManager(jdbcRegistryProperties, jdbcOperator);
this.subscribeDataManager = new SubscribeDataManager(jdbcRegistryProperties, jdbcOperator);
this.registryLockManager = new RegistryLockManager(jdbcRegistryProperties, jdbcOperator);
log.info("Initialize Jdbc Registry...");
}
@PostConstruct
public void start() {
log.info("Starting Mysql Registry...");
// start a mysql connect check
log.info("Starting Jdbc Registry...");
// start a jdbc connect check
ephemeralDateManager.start();
subscribeDataManager.start();
registryLockManager.start();
log.info("Started Mysql Registry...");
log.info("Started Jdbc Registry...");
}
@Override
@ -82,16 +82,16 @@ public class MysqlRegistry implements Registry {
while (true) {
if (System.currentTimeMillis() > endTimeMills) {
throw new RegistryException(
String.format("Cannot connect to mysql registry in %s s", timeout.getSeconds()));
String.format("Cannot connect to jdbc registry in %s s", timeout.getSeconds()));
}
if (ephemeralDateManager.getConnectionState() == ConnectionState.CONNECTED) {
return;
}
try {
Thread.sleep(mysqlRegistryProperties.getTermRefreshInterval().toMillis());
Thread.sleep(jdbcRegistryProperties.getTermRefreshInterval().toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("Cannot connect to mysql registry due to interrupted exception", e);
throw new RegistryException("Cannot connect to jdbc registry due to interrupted exception", e);
}
}
}
@ -127,7 +127,7 @@ public class MysqlRegistry implements Registry {
// when put a ephemeralData will new a scheduler thread to update it
ephemeralDateManager.insertOrUpdateEphemeralData(key, value);
} else {
mysqlOperator.insertOrUpdatePersistentData(key, value);
jdbcOperator.insertOrUpdatePersistentData(key, value);
}
} catch (Exception ex) {
throw new RegistryException(String.format("put key:%s, value:%s error", key, value), ex);
@ -137,7 +137,7 @@ public class MysqlRegistry implements Registry {
@Override
public void delete(String key) {
try {
mysqlOperator.deleteDataByKey(key);
jdbcOperator.deleteDataByKey(key);
} catch (Exception e) {
throw new RegistryException(String.format("Delete key: %s error", key), e);
}
@ -146,7 +146,7 @@ public class MysqlRegistry implements Registry {
@Override
public Collection<String> children(String key) {
try {
return mysqlOperator.getChildren(key);
return jdbcOperator.getChildren(key);
} catch (SQLException e) {
throw new RegistryException(String.format("Get key: %s children error", key), e);
}
@ -155,7 +155,7 @@ public class MysqlRegistry implements Registry {
@Override
public boolean exists(String key) {
try {
return mysqlOperator.existKey(key);
return jdbcOperator.existKey(key);
} catch (Exception e) {
throw new RegistryException(String.format("Check key: %s exist error", key), e);
}
@ -181,15 +181,15 @@ public class MysqlRegistry implements Registry {
@Override
public void close() {
log.info("Closing Mysql Registry...");
// remove the current Ephemeral node, if can connect to mysql
log.info("Closing Jdbc Registry...");
// remove the current Ephemeral node, if can connect to jdbc
try (
EphemeralDateManager closed1 = ephemeralDateManager;
SubscribeDataManager close2 = subscribeDataManager;
RegistryLockManager close3 = registryLockManager) {
} catch (Exception e) {
log.error("Close Mysql Registry error", e);
log.error("Close Jdbc Registry error", e);
}
log.info("Closed Mysql Registry...");
log.info("Closed Jdbc Registry...");
}
}

30
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConfiguration.java

@ -15,10 +15,10 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper;
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryDataMapper;
import org.apache.dolphinscheduler.plugin.registry.jdbc.mapper.JdbcRegistryLockMapper;
import org.apache.ibatis.session.SqlSessionFactory;
@ -31,31 +31,31 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.zaxxer.hikari.HikariDataSource;
@Configuration
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
public class MysqlRegistryConfiguration {
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")
public class JdbcRegistryConfiguration {
@Bean
public SqlSessionFactory mysqlRegistrySqlSessionFactory(MysqlRegistryProperties mysqlRegistryProperties) throws Exception {
public SqlSessionFactory jdbcRegistrySqlSessionFactory(JdbcRegistryProperties jdbcRegistryProperties) throws Exception {
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(new HikariDataSource(mysqlRegistryProperties.getHikariConfig()));
sqlSessionFactoryBean.setDataSource(new HikariDataSource(jdbcRegistryProperties.getHikariConfig()));
return sqlSessionFactoryBean.getObject();
}
@Bean
public SqlSessionTemplate mysqlRegistrySqlSessionTemplate(SqlSessionFactory mysqlRegistrySqlSessionFactory) {
mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryDataMapper.class);
mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryLockMapper.class);
return new SqlSessionTemplate(mysqlRegistrySqlSessionFactory);
public SqlSessionTemplate jdbcRegistrySqlSessionTemplate(SqlSessionFactory jdbcRegistrySqlSessionFactory) {
jdbcRegistrySqlSessionFactory.getConfiguration().addMapper(JdbcRegistryDataMapper.class);
jdbcRegistrySqlSessionFactory.getConfiguration().addMapper(JdbcRegistryLockMapper.class);
return new SqlSessionTemplate(jdbcRegistrySqlSessionFactory);
}
@Bean
public MysqlRegistryDataMapper mysqlRegistryDataMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) {
return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryDataMapper.class);
public JdbcRegistryDataMapper jdbcRegistryDataMapper(SqlSessionTemplate jdbcRegistrySqlSessionTemplate) {
return jdbcRegistrySqlSessionTemplate.getMapper(JdbcRegistryDataMapper.class);
}
@Bean
public MysqlRegistryLockMapper mysqlRegistryLockMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) {
return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryLockMapper.class);
public JdbcRegistryLockMapper jdbcRegistryLockMapper(SqlSessionTemplate jdbcRegistrySqlSessionTemplate) {
return jdbcRegistrySqlSessionTemplate.getMapper(JdbcRegistryLockMapper.class);
}
}

4
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import lombok.experimental.UtilityClass;
@UtilityClass
public final class MysqlRegistryConstant {
public final class JdbcRegistryConstant {
public static final long LOCK_ACQUIRE_INTERVAL = 1_000;

6
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryProperties.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import java.time.Duration;
@ -29,9 +29,9 @@ import com.zaxxer.hikari.HikariConfig;
@Data
@Configuration
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")
@ConfigurationProperties(prefix = "registry")
public class MysqlRegistryProperties {
public class JdbcRegistryProperties {
/**
* Used to schedule refresh the ephemeral data/ lock.

28
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.mapper;
package org.apache.dolphinscheduler.plugin.registry.jdbc.mapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
@ -29,29 +29,29 @@ import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface MysqlRegistryDataMapper extends BaseMapper<MysqlRegistryData> {
public interface JdbcRegistryDataMapper extends BaseMapper<JdbcRegistryData> {
@Select("select * from t_ds_mysql_registry_data")
List<MysqlRegistryData> selectAll();
@Select("select * from t_ds_jdbc_registry_data")
List<JdbcRegistryData> selectAll();
@Select("select * from t_ds_mysql_registry_data where `key` = #{key}")
MysqlRegistryData selectByKey(@Param("key") String key);
@Select("select * from t_ds_jdbc_registry_data where key = #{key}")
JdbcRegistryData selectByKey(@Param("key") String key);
@Select("select * from t_ds_mysql_registry_data where `key` like CONCAT (#{key}, '%')")
List<MysqlRegistryData> fuzzyQueryByKey(@Param("key") String key);
@Select("select * from t_ds_jdbc_registry_data where key like CONCAT (#{key}, '%')")
List<JdbcRegistryData> fuzzyQueryByKey(@Param("key") String key);
@Update("update t_ds_mysql_registry_data set `data` = #{data}, `last_term` = #{term} where `id` = #{id}")
@Update("update t_ds_jdbc_registry_data set data = #{data}, last_term = #{term} where id = #{id}")
int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term);
@Delete("delete from t_ds_mysql_registry_data where `key` = #{key}")
@Delete("delete from t_ds_jdbc_registry_data where key = #{key}")
void deleteByKey(@Param("key") String key);
@Delete("delete from t_ds_mysql_registry_data where `last_term` < #{term} and `type` = #{type}")
@Delete("delete from t_ds_jdbc_registry_data where last_term < #{term} and type = #{type}")
void clearExpireEphemeralDate(@Param("term") long term, @Param("type") int type);
@Update({"<script>",
"update t_ds_mysql_registry_data",
"set `last_term` = #{term}",
"update t_ds_jdbc_registry_data",
"set last_term = #{term}",
"where id IN ",
"<foreach item='id' index='index' collection='ids' open='(' separator=',' close=')'>",
" #{id}",

14
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.mapper;
package org.apache.dolphinscheduler.plugin.registry.jdbc.mapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryLock;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
@ -28,17 +28,17 @@ import java.util.Collection;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface MysqlRegistryLockMapper extends BaseMapper<MysqlRegistryLock> {
public interface JdbcRegistryLockMapper extends BaseMapper<JdbcRegistryLock> {
@Select("select count(1) from t_ds_mysql_registry_lock")
@Select("select count(1) from t_ds_jdbc_registry_lock")
int countAll();
@Delete("delete from t_ds_mysql_registry_lock where `last_term` < #{term}")
@Delete("delete from t_ds_jdbc_registry_lock where last_term < #{term}")
void clearExpireLock(@Param("term") long term);
@Update({"<script>",
"update t_ds_mysql_registry_lock",
"set `last_term` = #{term}",
"update t_ds_jdbc_registry_lock",
"set last_term = #{term}",
"where id IN ",
"<foreach item='id' index='index' collection='ids' open='(' separator=',' close=')'>",
" #{id}",

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/DataType.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.model;
package org.apache.dolphinscheduler.plugin.registry.jdbc.model;
public enum DataType {

18
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryData.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.model;
package org.apache.dolphinscheduler.plugin.registry.jdbc.model;
import java.util.Date;
@ -29,26 +29,26 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@TableName(value = "t_ds_mysql_registry_data")
@TableName(value = "t_ds_jdbc_registry_data")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MysqlRegistryData {
public class JdbcRegistryData {
@TableId(value = "id", type = IdType.AUTO)
private Long id;
@TableField(value = "`key`")
@TableField(value = "key")
private String key;
@TableField(value = "`data`")
@TableField(value = "data")
private String data;
@TableField(value = "`type`")
@TableField(value = "type")
private int type;
@TableField(value = "`last_term`")
@TableField(value = "last_term")
private long lastTerm;
@TableField(value = "`create_time`")
@TableField(value = "create_time")
private Date createTime;
@TableField(value = "`last_time`")
@TableField(value = "last_time")
private Date lastUpdateTime;
}

16
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryLock.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.model;
package org.apache.dolphinscheduler.plugin.registry.jdbc.model;
import java.util.Date;
@ -29,38 +29,38 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@TableName(value = "t_ds_mysql_registry_lock")
@TableName(value = "t_ds_jdbc_registry_lock")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MysqlRegistryLock {
public class JdbcRegistryLock {
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* The lock key.
*/
@TableField(value = "`key`")
@TableField(value = "key")
private String key;
/**
* acquire lock host.
*/
@TableField(value = "`lock_owner`")
@TableField(value = "lock_owner")
private String lockOwner;
/**
* The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired.
*/
@TableField(value = "`last_term`")
@TableField(value = "last_term")
private Long lastTerm;
/**
* The lock last update time.
*/
@TableField(value = "`last_update_time`")
@TableField(value = "last_update_time")
private Date lastUpdateTime;
/**
* The lock create time.
*/
@TableField(value = "`create_time`")
@TableField(value = "create_time")
private Date createTime;
}

38
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/EphemeralDateManager.java

@ -15,12 +15,12 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.task;
package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
@ -39,21 +39,21 @@ import lombok.extern.slf4j.Slf4j;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This thread is used to check the connect state to mysql.
* This thread is used to check the connect state to jdbc.
*/
@Slf4j
public class EphemeralDateManager implements AutoCloseable {
private ConnectionState connectionState;
private final MysqlOperator mysqlOperator;
private final MysqlRegistryProperties registryProperties;
private final JdbcOperator jdbcOperator;
private final JdbcRegistryProperties registryProperties;
private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
private final Set<Long> ephemeralDateIds = Collections.synchronizedSet(new HashSet<>());
private final ScheduledExecutorService scheduledExecutorService;
public EphemeralDateManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
public EphemeralDateManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.mysqlOperator = checkNotNull(mysqlOperator);
this.jdbcOperator = checkNotNull(jdbcOperator);
this.scheduledExecutorService = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat("EphemeralDateTermRefreshThread").setDaemon(true).build());
@ -61,7 +61,7 @@ public class EphemeralDateManager implements AutoCloseable {
public void start() {
this.scheduledExecutorService.scheduleWithFixedDelay(
new EphemeralDateTermRefreshTask(mysqlOperator, connectionListeners, ephemeralDateIds),
new EphemeralDateTermRefreshTask(jdbcOperator, connectionListeners, ephemeralDateIds),
registryProperties.getTermRefreshInterval().toMillis(),
registryProperties.getTermRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
@ -72,7 +72,7 @@ public class EphemeralDateManager implements AutoCloseable {
}
public long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
long ephemeralId = mysqlOperator.insertOrUpdateEphemeralData(key, value);
long ephemeralId = jdbcOperator.insertOrUpdateEphemeralData(key, value);
ephemeralDateIds.add(ephemeralId);
return ephemeralId;
}
@ -87,7 +87,7 @@ public class EphemeralDateManager implements AutoCloseable {
connectionListeners.clear();
scheduledExecutorService.shutdownNow();
for (Long ephemeralDateId : ephemeralDateIds) {
mysqlOperator.deleteDataById(ephemeralDateId);
jdbcOperator.deleteDataById(ephemeralDateId);
}
}
@ -96,12 +96,12 @@ public class EphemeralDateManager implements AutoCloseable {
private final List<ConnectionListener> connectionListeners;
private final Set<Long> ephemeralDateIds;
private final MysqlOperator mysqlOperator;
private final JdbcOperator jdbcOperator;
private EphemeralDateTermRefreshTask(MysqlOperator mysqlOperator,
private EphemeralDateTermRefreshTask(JdbcOperator jdbcOperator,
List<ConnectionListener> connectionListeners,
Set<Long> ephemeralDateIds) {
this.mysqlOperator = checkNotNull(mysqlOperator);
this.jdbcOperator = checkNotNull(jdbcOperator);
this.connectionListeners = checkNotNull(connectionListeners);
this.ephemeralDateIds = checkNotNull(ephemeralDateIds);
}
@ -130,7 +130,7 @@ public class EphemeralDateManager implements AutoCloseable {
triggerListener(connectionState);
}
} catch (Exception e) {
log.error("Mysql Registry connect state check task execute failed", e);
log.error("Jdbc Registry connect state check task execute failed", e);
connectionState = ConnectionState.DISCONNECTED;
triggerListener(ConnectionState.DISCONNECTED);
}
@ -139,11 +139,11 @@ public class EphemeralDateManager implements AutoCloseable {
private ConnectionState getConnectionState() {
try {
if (ephemeralDateIds.isEmpty()) {
mysqlOperator.healthCheck();
jdbcOperator.healthCheck();
} else {
updateEphemeralDateTerm();
}
mysqlOperator.clearExpireEphemeralDate();
jdbcOperator.clearExpireEphemeralDate();
return ConnectionState.CONNECTED;
} catch (Exception ex) {
log.error("Get connection state error, meet an unknown exception", ex);
@ -152,8 +152,8 @@ public class EphemeralDateManager implements AutoCloseable {
}
private void updateEphemeralDateTerm() throws SQLException {
if (!mysqlOperator.updateEphemeralDataTerm(ephemeralDateIds)) {
log.warn("Update mysql registry ephemeral data: {} term error", ephemeralDateIds);
if (!jdbcOperator.updateEphemeralDataTerm(ephemeralDateIds)) {
log.warn("Update jdbc registry ephemeral data: {} term error", ephemeralDateIds);
}
}

50
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/RegistryLockManager.java

@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.task;
package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryConstant;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryConstant;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryLock;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import java.sql.SQLException;
@ -42,22 +42,22 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
@Slf4j
public class RegistryLockManager implements AutoCloseable {
private final MysqlOperator mysqlOperator;
private final MysqlRegistryProperties registryProperties;
private final Map<String, MysqlRegistryLock> lockHoldMap;
private final JdbcOperator jdbcOperator;
private final JdbcRegistryProperties registryProperties;
private final Map<String, JdbcRegistryLock> lockHoldMap;
private final ScheduledExecutorService lockTermUpdateThreadPool;
public RegistryLockManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
public RegistryLockManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.mysqlOperator = mysqlOperator;
this.jdbcOperator = jdbcOperator;
this.lockHoldMap = new ConcurrentHashMap<>();
this.lockTermUpdateThreadPool = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("MysqlRegistryLockTermRefreshThread").setDaemon(true).build());
new ThreadFactoryBuilder().setNameFormat("JdbcRegistryLockTermRefreshThread").setDaemon(true).build());
}
public void start() {
lockTermUpdateThreadPool.scheduleWithFixedDelay(
new LockTermRefreshTask(lockHoldMap, mysqlOperator),
new LockTermRefreshTask(lockHoldMap, jdbcOperator),
registryProperties.getTermRefreshInterval().toMillis(),
registryProperties.getTermRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
@ -69,26 +69,26 @@ public class RegistryLockManager implements AutoCloseable {
public void acquireLock(String lockKey) throws RegistryException {
// maybe we can use the computeIf absent
lockHoldMap.computeIfAbsent(lockKey, key -> {
MysqlRegistryLock mysqlRegistryLock;
JdbcRegistryLock jdbcRegistryLock;
try {
while ((mysqlRegistryLock = mysqlOperator.tryToAcquireLock(lockKey)) == null) {
while ((jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey)) == null) {
log.debug("Acquire the lock {} failed try again", key);
// acquire failed, wait and try again
ThreadUtils.sleep(MysqlRegistryConstant.LOCK_ACQUIRE_INTERVAL);
ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
}
} catch (SQLException e) {
throw new RegistryException("Acquire the lock error", e);
}
return mysqlRegistryLock;
return jdbcRegistryLock;
});
}
public void releaseLock(String lockKey) {
MysqlRegistryLock mysqlRegistryLock = lockHoldMap.get(lockKey);
if (mysqlRegistryLock != null) {
JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey);
if (jdbcRegistryLock != null) {
try {
// the lock is unExit
mysqlOperator.releaseLock(mysqlRegistryLock.getId());
jdbcOperator.releaseLock(jdbcRegistryLock.getId());
lockHoldMap.remove(lockKey);
} catch (SQLException e) {
throw new RegistryException(String.format("Release lock: %s error", lockKey), e);
@ -99,7 +99,7 @@ public class RegistryLockManager implements AutoCloseable {
@Override
public void close() {
lockTermUpdateThreadPool.shutdownNow();
for (Map.Entry<String, MysqlRegistryLock> lockEntry : lockHoldMap.entrySet()) {
for (Map.Entry<String, JdbcRegistryLock> lockEntry : lockHoldMap.entrySet()) {
releaseLock(lockEntry.getKey());
}
}
@ -110,8 +110,8 @@ public class RegistryLockManager implements AutoCloseable {
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
static class LockTermRefreshTask implements Runnable {
private final Map<String, MysqlRegistryLock> lockHoldMap;
private final MysqlOperator mysqlOperator;
private final Map<String, JdbcRegistryLock> lockHoldMap;
private final JdbcOperator jdbcOperator;
public void run() {
try {
@ -120,12 +120,12 @@ public class RegistryLockManager implements AutoCloseable {
}
List<Long> lockIds = lockHoldMap.values()
.stream()
.map(MysqlRegistryLock::getId)
.map(JdbcRegistryLock::getId)
.collect(Collectors.toList());
if (!mysqlOperator.updateLockTerm(lockIds)) {
if (!jdbcOperator.updateLockTerm(lockIds)) {
log.warn("Update the lock: {} term failed.", lockIds);
}
mysqlOperator.clearExpireLock();
jdbcOperator.clearExpireLock();
} catch (Exception e) {
log.error("Update lock term error", e);
}

64
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/SubscribeDataManager.java

@ -15,11 +15,11 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.task;
package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
@ -44,24 +44,24 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
@Slf4j
public class SubscribeDataManager implements AutoCloseable {
private final MysqlOperator mysqlOperator;
private final MysqlRegistryProperties registryProperties;
private final JdbcOperator jdbcOperator;
private final JdbcRegistryProperties registryProperties;
private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService dataSubscribeCheckThreadPool;
private final Map<String, MysqlRegistryData> mysqlRegistryDataMap = new ConcurrentHashMap<>();
private final Map<String, JdbcRegistryData> jdbcRegistryDataMap = new ConcurrentHashMap<>();
public SubscribeDataManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
public SubscribeDataManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.mysqlOperator = mysqlOperator;
this.jdbcOperator = jdbcOperator;
this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat("MysqlRegistrySubscribeDataCheckThread").setDaemon(true)
new ThreadFactoryBuilder().setNameFormat("JdbcRegistrySubscribeDataCheckThread").setDaemon(true)
.build());
}
public void start() {
dataSubscribeCheckThreadPool.scheduleWithFixedDelay(
new RegistrySubscribeDataCheckTask(dataSubScribeMap, mysqlOperator, mysqlRegistryDataMap),
new RegistrySubscribeDataCheckTask(dataSubScribeMap, jdbcOperator, jdbcRegistryDataMap),
registryProperties.getTermRefreshInterval().toMillis(),
registryProperties.getTermRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
@ -76,11 +76,11 @@ public class SubscribeDataManager implements AutoCloseable {
}
public String getData(String path) {
MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMap.get(path);
if (mysqlRegistryData == null) {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMap.get(path);
if (jdbcRegistryData == null) {
return null;
}
return mysqlRegistryData.getData();
return jdbcRegistryData.getData();
}
@Override
@ -93,23 +93,23 @@ public class SubscribeDataManager implements AutoCloseable {
static class RegistrySubscribeDataCheckTask implements Runnable {
private final Map<String, List<SubscribeListener>> dataSubScribeMap;
private final MysqlOperator mysqlOperator;
private final Map<String, MysqlRegistryData> mysqlRegistryDataMap;
private final JdbcOperator jdbcOperator;
private final Map<String, JdbcRegistryData> jdbcRegistryDataMap;
@Override
public void run() {
// query the full data from database, and update the mysqlRegistryDataMap
// query the full data from database, and update the jdbcRegistryDataMap
try {
Map<String, MysqlRegistryData> currentMysqlDataMap = mysqlOperator.queryAllMysqlRegistryData()
Map<String, JdbcRegistryData> currentJdbcDataMap = jdbcOperator.queryAllJdbcRegistryData()
.stream()
.collect(Collectors.toMap(MysqlRegistryData::getKey, Function.identity()));
.collect(Collectors.toMap(JdbcRegistryData::getKey, Function.identity()));
// find the different
List<MysqlRegistryData> addedData = new ArrayList<>();
List<MysqlRegistryData> deletedData = new ArrayList<>();
List<MysqlRegistryData> updatedData = new ArrayList<>();
for (Map.Entry<String, MysqlRegistryData> entry : currentMysqlDataMap.entrySet()) {
MysqlRegistryData newData = entry.getValue();
MysqlRegistryData oldData = mysqlRegistryDataMap.get(entry.getKey());
List<JdbcRegistryData> addedData = new ArrayList<>();
List<JdbcRegistryData> deletedData = new ArrayList<>();
List<JdbcRegistryData> updatedData = new ArrayList<>();
for (Map.Entry<String, JdbcRegistryData> entry : currentJdbcDataMap.entrySet()) {
JdbcRegistryData newData = entry.getValue();
JdbcRegistryData oldData = jdbcRegistryDataMap.get(entry.getKey());
if (oldData == null) {
addedData.add(newData);
} else {
@ -118,13 +118,13 @@ public class SubscribeDataManager implements AutoCloseable {
}
}
}
for (Map.Entry<String, MysqlRegistryData> entry : mysqlRegistryDataMap.entrySet()) {
if (!currentMysqlDataMap.containsKey(entry.getKey())) {
for (Map.Entry<String, JdbcRegistryData> entry : jdbcRegistryDataMap.entrySet()) {
if (!currentJdbcDataMap.containsKey(entry.getKey())) {
deletedData.add(entry.getValue());
}
}
mysqlRegistryDataMap.clear();
mysqlRegistryDataMap.putAll(currentMysqlDataMap);
jdbcRegistryDataMap.clear();
jdbcRegistryDataMap.putAll(currentJdbcDataMap);
// trigger listener
for (Map.Entry<String, List<SubscribeListener>> entry : dataSubScribeMap.entrySet()) {
String subscribeKey = entry.getKey();
@ -134,15 +134,15 @@ public class SubscribeDataManager implements AutoCloseable {
triggerListener(updatedData, subscribeKey, subscribeListeners, Event.Type.UPDATE);
}
} catch (Exception e) {
log.error("Query data from mysql registry error");
log.error("Query data from jdbc registry error");
}
}
private void triggerListener(List<MysqlRegistryData> dataList,
private void triggerListener(List<JdbcRegistryData> dataList,
String subscribeKey,
List<SubscribeListener> subscribeListeners,
Event.Type type) {
for (MysqlRegistryData data : dataList) {
for (JdbcRegistryData data : dataList) {
if (data.getKey().startsWith(subscribeKey)) {
subscribeListeners.forEach(subscribeListener -> subscribeListener
.notify(new Event(data.getKey(), data.getKey(), data.getData(), type)));

8
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/mysql_registry_init.sql

@ -17,8 +17,8 @@
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `t_ds_mysql_registry_data`;
CREATE TABLE `t_ds_mysql_registry_data`
DROP TABLE IF EXISTS `t_ds_jdbc_registry_data`;
CREATE TABLE `t_ds_jdbc_registry_data`
(
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(256) NOT NULL COMMENT 'key, like zookeeper node path',
@ -33,8 +33,8 @@ CREATE TABLE `t_ds_mysql_registry_data`
DEFAULT CHARSET = utf8;
DROP TABLE IF EXISTS `t_ds_mysql_registry_lock`;
CREATE TABLE `t_ds_mysql_registry_lock`
DROP TABLE IF EXISTS `t_ds_jdbc_registry_lock`;
CREATE TABLE `t_ds_jdbc_registry_lock`
(
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(256) NOT NULL COMMENT 'lock path',

45
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/postgresql_registry_init.sql

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
DROP TABLE IF EXISTS t_ds_jdbc_registry_data;
create table t_ds_jdbc_registry_data
(
id serial
constraint t_ds_jdbc_registry_data_pk primary key,
key varchar not null,
data text not null,
type int4 not null,
last_term bigint not null,
last_update_time timestamp default current_timestamp not null,
create_time timestamp default current_timestamp not null
);
create unique index t_ds_jdbc_registry_data_key_uindex on t_ds_jdbc_registry_data (key);
DROP TABLE IF EXISTS t_ds_jdbc_registry_lock;
create table t_ds_jdbc_registry_lock
(
id serial
constraint t_ds_jdbc_registry_lock_pk primary key,
key varchar not null,
lock_owner varchar not null,
last_term bigint not null,
last_update_time timestamp default current_timestamp not null,
create_time timestamp default current_timestamp not null
);
create unique index t_ds_jdbc_registry_lock_key_uindex on t_ds_jdbc_registry_lock (key);

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml

@ -30,7 +30,7 @@
<modules>
<module>dolphinscheduler-registry-zookeeper</module>
<module>dolphinscheduler-registry-mysql</module>
<module>dolphinscheduler-registry-jdbc</module>
<module>dolphinscheduler-registry-etcd</module>
</modules>
</project>

Loading…
Cancel
Save