Browse Source

Add mysql registry plugin (#10406)

* Add mysql registry plugin
k8s/config
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
78c5fcc6ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/pom.xml
  2. 2
      dolphinscheduler-master/pom.xml
  3. 42
      dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml
  4. 36
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md
  5. 56
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
  6. 326
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
  7. 177
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
  8. 31
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java
  9. 47
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java
  10. 33
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java
  11. 40
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
  12. 54
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
  13. 162
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
  14. 137
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java
  15. 156
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java
  16. 47
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql
  17. 6
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
  18. 46
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
  19. 3
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
  20. 3
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
  21. 1
      dolphinscheduler-registry/pom.xml
  22. 2
      dolphinscheduler-worker/pom.xml
  23. 17
      pom.xml

2
dolphinscheduler-api/pom.xml

@ -62,7 +62,7 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>

2
dolphinscheduler-master/pom.xml

@ -49,7 +49,7 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>

42
dolphinscheduler-registry/dolphinscheduler-registry-all/pom.xml

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-registry-all</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-mysql</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

36
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/README.md

@ -0,0 +1,36 @@
# Introduction
This module is the mysql registry plugin module, this plugin will use mysql as the registry center.
# How to use
If you want to set the registry center as mysql, you need to do the below two steps:
1. Initialize the mysql table
You can directly execute the sql script `src/main/resources/mysql_registry_init.sql`.
2. Open the config
You need to set the registry properties in master/worker/api's appplication.yml
```yaml
registry:
type: mysql
term-refresh-interval: 2s
term-expire-times: 3
hikari-config:
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
username: root
password: root
maximum-pool-size: 5
connection-timeout: 9000
idle-timeout: 600000
```
After do this two steps, you can start your DolphinScheduler cluster, your cluster will use mysql as registry center to
store server metadata.
NOTE: You need to add `mysql-connector-java.jar` into DS classpath, since this plugin will not bundle this driver in distribution.
You can get the detail about <a href="https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/installation/pseudo-cluster.html">Initialize the Database</a>

56
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Apache Software Foundation (ASF) under one or more contributor
~ license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright
~ ownership. Apache Software Foundation (ASF) licenses this file to you under
~ the Apache License, Version 2.0 (the "License"); you may
~ not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-plugins</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-registry-mysql</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project>

326
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java

@ -0,0 +1,326 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
import org.apache.commons.lang3.StringUtils;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
/**
* Used to CRUD from mysql
*/
public class MysqlOperator implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MysqlOperator.class);
private final HikariDataSource dataSource;
private final long expireTimeWindow;
public MysqlOperator(MysqlRegistryProperties registryProperties) {
this.expireTimeWindow = registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis();
HikariConfig hikariConfig = registryProperties.getHikariConfig();
hikariConfig.setPoolName("MysqlRegistryDataSourcePool");
this.dataSource = new HikariDataSource(hikariConfig);
}
public void healthCheck() throws SQLException {
String sql = "select 1 from t_ds_mysql_registry_data";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();) {
// if no exception, the healthCheck success
}
}
public List<MysqlRegistryData> queryAllMysqlRegistryData() throws SQLException {
String sql = "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()) {
List<MysqlRegistryData> result = new ArrayList<>(resultSet.getFetchSize());
while (resultSet.next()) {
MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.data(resultSet.getString("data"))
.type(resultSet.getInt("type"))
.createTime(resultSet.getTimestamp("create_time"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.build();
result.add(mysqlRegistryData);
}
return result;
}
}
public long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" +
"ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp";
// put a ephemeralData
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, key);
preparedStatement.setString(2, value);
preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue());
preparedStatement.setString(4, value);
int insertCount = preparedStatement.executeUpdate();
ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
if (insertCount < 1 || !generatedKeys.next()) {
throw new SQLException("Insert or update ephemeral data error");
}
return generatedKeys.getLong(1);
}
}
public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" +
"ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp";
// put a persistent Data
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, key);
preparedStatement.setString(2, value);
preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
preparedStatement.setString(4, value);
int insertCount = preparedStatement.executeUpdate();
ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
if (insertCount < 1 || !generatedKeys.next()) {
throw new SQLException("Insert or update persistent data error");
}
return generatedKeys.getLong(1);
}
}
public void deleteEphemeralData(String key) throws SQLException {
String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
preparedStatement.execute();
}
}
public void deleteEphemeralData(long ephemeralNodeId) throws SQLException {
String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, ephemeralNodeId);
preparedStatement.execute();
}
}
public void deletePersistentData(String key) throws SQLException {
String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue());
preparedStatement.execute();
}
}
public void clearExpireLock() {
String sql = "delete from t_ds_mysql_registry_lock where last_term < ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setTimestamp(1,
new Timestamp(System.currentTimeMillis() - expireTimeWindow));
int i = preparedStatement.executeUpdate();
if (i > 0) {
logger.info("Clear expire lock, size: {}", i);
}
} catch (Exception ex) {
logger.warn("Clear expire lock from mysql registry error", ex);
}
}
public void clearExpireEphemeralDate() {
String sql = "delete from t_ds_mysql_registry_data where last_update_time < ? and type = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow));
preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
int i = preparedStatement.executeUpdate();
if (i > 0) {
logger.info("clear expire ephemeral data, size:{}", i);
}
} catch (Exception ex) {
logger.warn("Clear expire ephemeral data from mysql registry error", ex);
}
}
public MysqlRegistryData getData(String key) throws SQLException {
String sql = "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
return null;
}
return MysqlRegistryData.builder()
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.data(resultSet.getString("data"))
.type(resultSet.getInt("type"))
.createTime(resultSet.getTimestamp("create_time"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.build();
}
}
}
public List<String> getChildren(String key) throws SQLException {
String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key + "%");
try (ResultSet resultSet = preparedStatement.executeQuery()) {
List<String> result = new ArrayList<>(resultSet.getFetchSize());
while (resultSet.next()) {
String fullPath = resultSet.getString("key");
if (fullPath.length() > key.length()) {
result.add(StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"));
}
}
return result;
}
}
}
public boolean existKey(String key) throws SQLException {
String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return true;
}
}
}
return false;
}
/**
* Try to acquire the target Lock, if cannot acquire, return null.
*/
public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, key);
preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER);
preparedStatement.executeUpdate();
try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
if (resultSet.next()) {
long newLockId = resultSet.getLong(1);
return getLockById(newLockId);
}
}
return null;
} catch (SQLIntegrityConstraintViolationException e) {
// duplicate exception
return null;
}
}
public MysqlRegistryLock getLockById(long lockId) throws SQLException {
String sql = "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, lockId);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return MysqlRegistryLock.builder()
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.lockOwner(resultSet.getString("lock_owner"))
.lastTerm(resultSet.getTimestamp("last_term"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.createTime(resultSet.getTimestamp("create_time"))
.build();
}
}
return null;
}
}
// release the lock
public boolean releaseLock(long lockId) throws SQLException {
String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, lockId);
int i = preparedStatement.executeUpdate();
return i > 0;
}
}
public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException {
String sql = "update t_ds_mysql_registry_data set `last_update_time` = current_timestamp() where `id` IN (?)";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
Array idArray = connection.createArrayOf("bigint", ephemeralDateIds.toArray());
preparedStatement.setArray(1, idArray);
return preparedStatement.executeUpdate() > 0;
}
}
public boolean updateLockTerm(List<Long> lockIds) throws SQLException {
String sql = "update t_ds_mysql_registry_lock set `last_term` = current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
Array idArray = connection.createArrayOf("bigint", lockIds.toArray());
preparedStatement.setArray(1, idArray);
return preparedStatement.executeUpdate() > 0;
}
}
@Override
public void close() throws Exception {
if (!dataSource.isClosed()) {
try (HikariDataSource closedDatasource = this.dataSource) {
}
}
}
}

177
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.EphemeralDateManager;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.RegistryLockManager;
import org.apache.dolphinscheduler.plugin.registry.mysql.task.SubscribeDataManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* This is one of the implementation of {@link Registry}, with this implementation, you need to rely on mysql database to
* store the DolphinScheduler master/worker's metadata and do the server registry/unRegistry.
*/
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
public class MysqlRegistry implements Registry {
private static Logger LOGGER = LoggerFactory.getLogger(MysqlRegistry.class);
private final EphemeralDateManager ephemeralDateManager;
private final SubscribeDataManager subscribeDataManager;
private final RegistryLockManager registryLockManager;
private final MysqlOperator mysqlOperator;
public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties) {
this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties);
mysqlOperator.clearExpireLock();
mysqlOperator.clearExpireEphemeralDate();
this.ephemeralDateManager = new EphemeralDateManager(mysqlRegistryProperties, mysqlOperator);
this.subscribeDataManager = new SubscribeDataManager(mysqlRegistryProperties, mysqlOperator);
this.registryLockManager = new RegistryLockManager(mysqlRegistryProperties, mysqlOperator);
LOGGER.info("Initialize Mysql Registry...");
}
@PostConstruct
public void start() {
LOGGER.info("Starting Mysql Registry...");
// start a mysql connect check
ephemeralDateManager.start();
subscribeDataManager.start();
registryLockManager.start();
LOGGER.info("Started Mysql Registry...");
}
@Override
public boolean subscribe(String path, SubscribeListener listener) {
// new a schedule thread to query the path, if the path
subscribeDataManager.addListener(path, listener);
return true;
}
@Override
public void unsubscribe(String path) {
subscribeDataManager.removeListener(path);
}
@Override
public void addConnectionStateListener(ConnectionListener listener) {
// check the current connection
ephemeralDateManager.addConnectionListener(listener);
}
@Override
public String get(String key) {
// get the key value
return subscribeDataManager.getData(key);
}
@Override
public void put(String key, String value, boolean deleteOnDisconnect) {
try {
if (deleteOnDisconnect) {
// when put a ephemeralData will new a scheduler thread to update it
ephemeralDateManager.insertOrUpdateEphemeralData(key, value);
} else {
mysqlOperator.insertOrUpdatePersistentData(key, value);
}
} catch (Exception ex) {
throw new RegistryException(String.format("put key:%s, value:%s error", key, value), ex);
}
}
@Override
public void delete(String key) {
try {
mysqlOperator.deleteEphemeralData(key);
mysqlOperator.deletePersistentData(key);
} catch (Exception e) {
throw new RegistryException(String.format("Delete key: %s error", key), e);
}
}
@Override
public Collection<String> children(String key) {
try {
return mysqlOperator.getChildren(key);
} catch (SQLException e) {
throw new RegistryException(String.format("Get key: %s children error", key), e);
}
}
@Override
public boolean exists(String key) {
try {
return mysqlOperator.existKey(key);
} catch (Exception e) {
throw new RegistryException(String.format("Check key: %s exist error", key), e);
}
}
@Override
public boolean acquireLock(String key) {
try {
registryLockManager.acquireLock(key);
return true;
} catch (RegistryException e) {
throw e;
} catch (Exception e) {
throw new RegistryException(String.format("Acquire lock: %s error", key), e);
}
}
@Override
public boolean releaseLock(String key) {
registryLockManager.releaseLock(key);
return true;
}
@Override
public Duration getSessionTimeout() {
throw new UnsupportedOperationException("Not support session timeout at Mysql Registry");
}
@Override
public void close() {
LOGGER.info("Closing Mysql Registry...");
// remove the current Ephemeral node, if can connect to mysql
try (EphemeralDateManager closed1 = ephemeralDateManager;
SubscribeDataManager close2 = subscribeDataManager;
RegistryLockManager close3 = registryLockManager;
MysqlOperator closed4 = mysqlOperator) {
} catch (Exception e) {
LOGGER.error("Close Mysql Registry error", e);
}
LOGGER.info("Closed Mysql Registry...");
}
}

31
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConstant.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import lombok.experimental.UtilityClass;
@UtilityClass
public final class MysqlRegistryConstant {
public static final long LOCK_ACQUIRE_INTERVAL = 1_000;
public static final String LOCK_OWNER = NetUtils.getHost() + "_" + OSUtils.getProcessID();
}

47
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryProperties.java

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
import java.time.Duration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import com.zaxxer.hikari.HikariConfig;
import lombok.Data;
@Data
@Configuration
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
@ConfigurationProperties(prefix = "registry")
public class MysqlRegistryProperties {
/**
* Used to schedule refresh the ephemeral data/ lock.
*/
private Duration termRefreshInterval = Duration.ofSeconds(2);
/**
* Used to calculate the expire time,
* e.g. if you set 2, and latest two refresh error, then the ephemeral data/lock will be expire.
*/
private int termExpireTimes = 3;
private HikariConfig hikariConfig;
}

33
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/DataType.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.model;
public enum DataType {
EPHEMERAL(1),
PERSISTENT(2),
;
private final int typeValue;
DataType(int typeValue) {
this.typeValue = typeValue;
}
public int getTypeValue() {
return typeValue;
}
}

40
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.model;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MysqlRegistryData {
private long id;
private String key;
private String data;
private int type;
private Date createTime;
private Date lastUpdateTime;
}

54
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.model;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MysqlRegistryLock {
private long id;
/**
* The lock key.
*/
private String key;
/**
* acquire lock host.
*/
private String lockOwner;
/**
* The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired.
*/
private Date lastTerm;
/**
* The lock last update time.
*/
private Date lastUpdateTime;
/**
* The lock create time.
*/
private Date createTime;
}

162
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.task;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This thread is used to check the connect state to mysql.
*/
public class EphemeralDateManager implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(EphemeralDateManager.class);
private final MysqlOperator mysqlOperator;
private final MysqlRegistryProperties registryProperties;
private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
private final Set<Long> ephemeralDateIds = Collections.synchronizedSet(new HashSet<>());
private final ScheduledExecutorService scheduledExecutorService;
public EphemeralDateManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
this.registryProperties = registryProperties;
this.mysqlOperator = checkNotNull(mysqlOperator);
this.scheduledExecutorService = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat("EphemeralDateTermRefreshThread").setDaemon(true).build());
}
public void start() {
this.scheduledExecutorService.scheduleWithFixedDelay(
new EphemeralDateTermRefreshTask(mysqlOperator, connectionListeners, ephemeralDateIds),
registryProperties.getTermRefreshInterval().toMillis(),
registryProperties.getTermRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
}
public void addConnectionListener(ConnectionListener connectionListener) {
connectionListeners.add(connectionListener);
}
public long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
long ephemeralId = mysqlOperator.insertOrUpdateEphemeralData(key, value);
ephemeralDateIds.add(ephemeralId);
return ephemeralId;
}
@Override
public void close() throws SQLException {
ephemeralDateIds.clear();
connectionListeners.clear();
scheduledExecutorService.shutdownNow();
for (Long ephemeralDateId : ephemeralDateIds) {
mysqlOperator.deleteEphemeralData(ephemeralDateId);
}
}
// Use this task to refresh ephemeral term and check the connect state.
static class EphemeralDateTermRefreshTask implements Runnable {
private final List<ConnectionListener> connectionListeners;
private final Set<Long> ephemeralDateIds;
private final MysqlOperator mysqlOperator;
private ConnectionState connectionState;
private EphemeralDateTermRefreshTask(MysqlOperator mysqlOperator,
List<ConnectionListener> connectionListeners,
Set<Long> ephemeralDateIds) {
this.mysqlOperator = checkNotNull(mysqlOperator);
this.connectionListeners = checkNotNull(connectionListeners);
this.ephemeralDateIds = checkNotNull(ephemeralDateIds);
}
@Override
public void run() {
try {
ConnectionState currentConnectionState = getConnectionState();
if (currentConnectionState == connectionState) {
// no state change
return;
}
if (connectionState == ConnectionState.CONNECTED) {
if (currentConnectionState == ConnectionState.DISCONNECTED) {
connectionState = ConnectionState.DISCONNECTED;
triggerListener(ConnectionState.DISCONNECTED);
}
} else if (connectionState == ConnectionState.DISCONNECTED) {
if (currentConnectionState == ConnectionState.CONNECTED) {
connectionState = ConnectionState.CONNECTED;
triggerListener(ConnectionState.RECONNECTED);
}
} else if (connectionState == null) {
connectionState = currentConnectionState;
triggerListener(connectionState);
}
} catch (Exception e) {
LOGGER.error("Mysql Registry connect state check task execute failed", e);
connectionState = ConnectionState.DISCONNECTED;
triggerListener(ConnectionState.DISCONNECTED);
}
}
private ConnectionState getConnectionState() {
try {
if (ephemeralDateIds.isEmpty()) {
mysqlOperator.healthCheck();
} else {
updateEphemeralDateTerm();
}
mysqlOperator.clearExpireEphemeralDate();
return ConnectionState.CONNECTED;
} catch (Exception ex) {
return ConnectionState.DISCONNECTED;
}
}
private void updateEphemeralDateTerm() throws SQLException {
if (!mysqlOperator.updateEphemeralDataTerm(ephemeralDateIds)) {
LOGGER.warn("Update mysql registry ephemeral data: {} term error", ephemeralDateIds);
}
}
private void triggerListener(ConnectionState connectionState) {
for (ConnectionListener connectionListener : connectionListeners) {
connectionListener.onUpdate(connectionState);
}
}
}
}

137
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/RegistryLockManager.java

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.task;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryConstant;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
public class RegistryLockManager implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(RegistryLockManager.class);
private final MysqlOperator mysqlOperator;
private final MysqlRegistryProperties registryProperties;
private final Map<String, MysqlRegistryLock> lockHoldMap;
private final ScheduledExecutorService lockTermUpdateThreadPool;
public RegistryLockManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
this.registryProperties = registryProperties;
this.mysqlOperator = mysqlOperator;
this.lockHoldMap = new ConcurrentHashMap<>();
this.lockTermUpdateThreadPool = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("MysqlRegistryLockTermRefreshThread").setDaemon(true).build());
}
public void start() {
lockTermUpdateThreadPool.scheduleWithFixedDelay(
new LockTermRefreshTask(lockHoldMap, mysqlOperator),
registryProperties.getTermRefreshInterval().toMillis(),
registryProperties.getTermRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
}
/**
* Acquire the lock, if cannot get the lock will await.
*/
public void acquireLock(String lockKey) throws RegistryException {
// maybe we can use the computeIf absent
lockHoldMap.computeIfAbsent(lockKey, key -> {
MysqlRegistryLock mysqlRegistryLock;
try {
while ((mysqlRegistryLock = mysqlOperator.tryToAcquireLock(lockKey)) == null) {
logger.debug("Acquire the lock {} failed try again", key);
// acquire failed, wait and try again
ThreadUtils.sleep(MysqlRegistryConstant.LOCK_ACQUIRE_INTERVAL);
}
} catch (SQLException e) {
throw new RegistryException("Acquire the lock error", e);
}
return mysqlRegistryLock;
});
}
public void releaseLock(String lockKey) {
MysqlRegistryLock mysqlRegistryLock = lockHoldMap.get(lockKey);
if (mysqlRegistryLock != null) {
try {
// the lock is unExit
mysqlOperator.releaseLock(mysqlRegistryLock.getId());
lockHoldMap.remove(lockKey);
} catch (SQLException e) {
throw new RegistryException(String.format("Release lock: %s error", lockKey), e);
}
}
}
@Override
public void close() {
lockTermUpdateThreadPool.shutdownNow();
for (Map.Entry<String, MysqlRegistryLock> lockEntry : lockHoldMap.entrySet()) {
releaseLock(lockEntry.getKey());
}
}
/**
* This task is used to refresh the lock held by the current server.
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
static class LockTermRefreshTask implements Runnable {
private final Map<String, MysqlRegistryLock> lockHoldMap;
private final MysqlOperator mysqlOperator;
public void run() {
try {
if (lockHoldMap.isEmpty()) {
return;
}
List<Long> lockIds = lockHoldMap.values()
.stream()
.map(MysqlRegistryLock::getId)
.collect(Collectors.toList());
if (!mysqlOperator.updateLockTerm(lockIds)) {
logger.warn("Update the lock: {} term failed.", lockIds);
}
mysqlOperator.clearExpireLock();
} catch (Exception e) {
logger.error("Update lock term error", e);
}
}
}
}

156
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/SubscribeDataManager.java

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.task;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlOperator;
import org.apache.dolphinscheduler.plugin.registry.mysql.MysqlRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.RequiredArgsConstructor;
/**
* Used to refresh if the subscribe path has been changed.
*/
public class SubscribeDataManager implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeDataManager.class);
private final MysqlOperator mysqlOperator;
private final MysqlRegistryProperties registryProperties;
private final Map<String, List<SubscribeListener>> dataSubScribeMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService dataSubscribeCheckThreadPool;
private final Map<String, MysqlRegistryData> mysqlRegistryDataMap = new ConcurrentHashMap<>();
public SubscribeDataManager(MysqlRegistryProperties registryProperties, MysqlOperator mysqlOperator) {
this.registryProperties = registryProperties;
this.mysqlOperator = mysqlOperator;
this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder().setNameFormat("MysqlRegistrySubscribeDataCheckThread").setDaemon(true).build());
}
public void start() {
dataSubscribeCheckThreadPool.scheduleWithFixedDelay(
new RegistrySubscribeDataCheckTask(dataSubScribeMap, mysqlOperator, mysqlRegistryDataMap),
registryProperties.getTermRefreshInterval().toMillis(),
registryProperties.getTermRefreshInterval().toMillis(),
TimeUnit.MILLISECONDS);
}
public void addListener(String path, SubscribeListener subscribeListener) {
dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>()).add(subscribeListener);
}
public void removeListener(String path) {
dataSubScribeMap.remove(path);
}
public String getData(String path) {
MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMap.get(path);
if (mysqlRegistryData == null) {
return null;
}
return mysqlRegistryData.getData();
}
@Override
public void close() {
dataSubscribeCheckThreadPool.shutdownNow();
dataSubScribeMap.clear();
}
@RequiredArgsConstructor
static class RegistrySubscribeDataCheckTask implements Runnable {
private final Map<String, List<SubscribeListener>> dataSubScribeMap;
private final MysqlOperator mysqlOperator;
private final Map<String, MysqlRegistryData> mysqlRegistryDataMap;
@Override
public void run() {
// query the full data from database, and update the mysqlRegistryDataMap
try {
Map<String, MysqlRegistryData> currentMysqlDataMap = mysqlOperator.queryAllMysqlRegistryData()
.stream()
.collect(Collectors.toMap(MysqlRegistryData::getKey, Function.identity()));
// find the different
List<MysqlRegistryData> addedData = new ArrayList<>();
List<MysqlRegistryData> deletedData = new ArrayList<>();
List<MysqlRegistryData> updatedData = new ArrayList<>();
for (Map.Entry<String, MysqlRegistryData> entry : currentMysqlDataMap.entrySet()) {
MysqlRegistryData newData = entry.getValue();
MysqlRegistryData oldData = mysqlRegistryDataMap.get(entry.getKey());
if (oldData == null) {
addedData.add(newData);
} else {
if (!entry.getValue().getLastUpdateTime().equals(oldData.getLastUpdateTime())) {
updatedData.add(newData);
}
}
}
for (Map.Entry<String, MysqlRegistryData> entry : mysqlRegistryDataMap.entrySet()) {
if (!currentMysqlDataMap.containsKey(entry.getKey())) {
deletedData.add(entry.getValue());
}
}
mysqlRegistryDataMap.clear();
mysqlRegistryDataMap.putAll(currentMysqlDataMap);
// trigger listener
for (Map.Entry<String, List<SubscribeListener>> entry : dataSubScribeMap.entrySet()) {
String subscribeKey = entry.getKey();
List<SubscribeListener> subscribeListeners = entry.getValue();
triggerListener(addedData, subscribeKey, subscribeListeners, Event.Type.ADD);
triggerListener(deletedData, subscribeKey, subscribeListeners, Event.Type.REMOVE);
triggerListener(updatedData, subscribeKey, subscribeListeners, Event.Type.UPDATE);
}
} catch (Exception e) {
LOGGER.error("Query data from mysql registry error");
}
}
private void triggerListener(List<MysqlRegistryData> dataList,
String subscribeKey,
List<SubscribeListener> subscribeListeners,
Event.Type type) {
for (MysqlRegistryData data : dataList) {
if (data.getKey().startsWith(subscribeKey)) {
subscribeListeners.forEach(subscribeListener ->
subscribeListener.notify(new Event(data.getKey(), data.getKey(), data.getData(), type)));
}
}
}
}
}

47
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `t_ds_mysql_registry_data`;
CREATE TABLE `t_ds_mysql_registry_data`
(
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(200) NOT NULL COMMENT 'key, like zookeeper node path',
`data` varchar(200) NOT NULL COMMENT 'data, like zookeeper node value',
`type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node',
`last_update_time` timestamp NULL COMMENT 'last update time',
`create_time` timestamp NULL COMMENT 'create time',
PRIMARY KEY (`id`),
unique (`key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
DROP TABLE IF EXISTS `t_ds_mysql_registry_lock`;
CREATE TABLE `t_ds_mysql_registry_lock`
(
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(200) NOT NULL COMMENT 'lock path',
`lock_owner` varchar(100) NOT NULL COMMENT 'the lock owner, ip_processId',
`last_term` timestamp NOT NULL COMMENT 'last term time',
`last_update_time` timestamp NULL COMMENT 'last update time',
`create_time` timestamp NULL COMMENT 'lock create time',
PRIMARY KEY (`id`),
unique (`key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;

6
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java

@ -23,8 +23,6 @@ import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.RegistryProperties;
import org.apache.dolphinscheduler.registry.api.RegistryProperties.ZookeeperProperties;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.framework.CuratorFramework;
@ -59,14 +57,14 @@ import com.google.common.base.Strings;
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper")
public final class ZookeeperRegistry implements Registry {
private final ZookeeperProperties properties;
private final ZookeeperRegistryProperties.ZookeeperProperties properties;
private final CuratorFramework client;
private final Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
private static final ThreadLocal<Map<String, InterProcessMutex>> threadLocalLockMap = new ThreadLocal<>();
public ZookeeperRegistry(RegistryProperties registryProperties) {
public ZookeeperRegistry(ZookeeperRegistryProperties registryProperties) {
properties = registryProperties.getZookeeper();
final ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(

46
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryProperties.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java

@ -1,44 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api;
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import java.time.Duration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper")
@ConfigurationProperties(prefix = "registry")
public class RegistryProperties {
private Type type;
public class ZookeeperRegistryProperties {
private ZookeeperProperties zookeeper = new ZookeeperProperties();
public Type getType() {
return type;
}
public void setType(Type type) {
this.type = type;
}
public ZookeeperProperties getZookeeper() {
return zookeeper;
}
@ -47,10 +37,6 @@ public class RegistryProperties {
this.zookeeper = zookeeper;
}
public enum Type {
ZOOKEEPER
}
public static final class ZookeeperProperties {
private String namespace;
private String connectString;

3
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.RegistryProperties;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.test.TestingServer;
@ -48,7 +47,7 @@ public class ZookeeperRegistryTest {
public void before() throws Exception {
server = new TestingServer(true);
RegistryProperties p = new RegistryProperties();
ZookeeperRegistryProperties p = new ZookeeperRegistryProperties();
p.getZookeeper().setConnectString(server.getConnectString());
registry = new ZookeeperRegistry(p);
registry.start();

3
dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml

@ -22,8 +22,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-registry</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-registry-plugins</artifactId>
@ -32,5 +32,6 @@
<modules>
<module>dolphinscheduler-registry-zookeeper</module>
<module>dolphinscheduler-registry-mysql</module>
</modules>
</project>

1
dolphinscheduler-registry/pom.xml

@ -48,5 +48,6 @@
<modules>
<module>dolphinscheduler-registry-api</module>
<module>dolphinscheduler-registry-plugins</module>
<module>dolphinscheduler-registry-all</module>
</modules>
</project>

2
dolphinscheduler-worker/pom.xml

@ -59,7 +59,7 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<artifactId>dolphinscheduler-registry-all</artifactId>
</dependency>
<dependency>

17
pom.xml

@ -131,6 +131,7 @@
<hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
<aws.sdk.version>1.12.160</aws.sdk.version>
<joda-time.version>2.10.13</joda-time.version>
<lombok.version>1.18.20</lombok.version>
<docker.hub>apache</docker.hub>
<docker.repo>${project.name}</docker.repo>
<docker.tag>${project.version}</docker.tag>
@ -273,6 +274,11 @@
<artifactId>dolphinscheduler-registry-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-all</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-dao</artifactId>
@ -385,6 +391,11 @@
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-mysql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
@ -1222,6 +1233,12 @@
<version>${auto-service.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>

Loading…
Cancel
Save