Browse Source

Fix mysql registry doesn't support array (#12255)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
2e4a9e6d53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 129
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
  2. 1
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java

129
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java

@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.sql.Array;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
@ -34,6 +33,7 @@ import java.sql.Timestamp;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -52,7 +52,8 @@ public class MysqlOperator implements AutoCloseable {
private final long expireTimeWindow; private final long expireTimeWindow;
public MysqlOperator(MysqlRegistryProperties registryProperties) { public MysqlOperator(MysqlRegistryProperties registryProperties) {
this.expireTimeWindow = registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis(); this.expireTimeWindow =
registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis();
HikariConfig hikariConfig = registryProperties.getHikariConfig(); HikariConfig hikariConfig = registryProperties.getHikariConfig();
hikariConfig.setPoolName("MysqlRegistryDataSourcePool"); hikariConfig.setPoolName("MysqlRegistryDataSourcePool");
@ -62,18 +63,20 @@ public class MysqlOperator implements AutoCloseable {
public void healthCheck() throws SQLException { public void healthCheck() throws SQLException {
String sql = "select 1 from t_ds_mysql_registry_data"; String sql = "select 1 from t_ds_mysql_registry_data";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql); Connection connection = dataSource.getConnection();
ResultSet resultSet = preparedStatement.executeQuery();) { PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();) {
// if no exception, the healthCheck success // if no exception, the healthCheck success
} }
} }
public List<MysqlRegistryData> queryAllMysqlRegistryData() throws SQLException { public List<MysqlRegistryData> queryAllMysqlRegistryData() throws SQLException {
String sql = "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data"; String sql = "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql); Connection connection = dataSource.getConnection();
ResultSet resultSet = preparedStatement.executeQuery()) { PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()) {
List<MysqlRegistryData> result = new ArrayList<>(resultSet.getFetchSize()); List<MysqlRegistryData> result = new ArrayList<>(resultSet.getFetchSize());
while (resultSet.next()) { while (resultSet.next()) {
MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder() MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
@ -91,11 +94,15 @@ public class MysqlOperator implements AutoCloseable {
} }
public long insertOrUpdateEphemeralData(String key, String value) throws SQLException { public long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" + String sql =
"ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp"; "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)"
+
"ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp";
// put a ephemeralData // put a ephemeralData
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, key); preparedStatement.setString(1, key);
preparedStatement.setString(2, value); preparedStatement.setString(2, value);
preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue()); preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue());
@ -110,11 +117,15 @@ public class MysqlOperator implements AutoCloseable {
} }
public long insertOrUpdatePersistentData(String key, String value) throws SQLException { public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" + String sql =
"ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp"; "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)"
+
"ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp";
// put a persistent Data // put a persistent Data
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, key); preparedStatement.setString(1, key);
preparedStatement.setString(2, value); preparedStatement.setString(2, value);
preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue()); preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
@ -130,8 +141,9 @@ public class MysqlOperator implements AutoCloseable {
public void deleteEphemeralData(String key) throws SQLException { public void deleteEphemeralData(String key) throws SQLException {
String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?"; String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key); preparedStatement.setString(1, key);
preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue()); preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
preparedStatement.execute(); preparedStatement.execute();
@ -140,8 +152,9 @@ public class MysqlOperator implements AutoCloseable {
public void deleteEphemeralData(long ephemeralNodeId) throws SQLException { public void deleteEphemeralData(long ephemeralNodeId) throws SQLException {
String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?"; String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, ephemeralNodeId); preparedStatement.setLong(1, ephemeralNodeId);
preparedStatement.execute(); preparedStatement.execute();
} }
@ -149,8 +162,9 @@ public class MysqlOperator implements AutoCloseable {
public void deletePersistentData(String key) throws SQLException { public void deletePersistentData(String key) throws SQLException {
String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?"; String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key); preparedStatement.setString(1, key);
preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue()); preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue());
preparedStatement.execute(); preparedStatement.execute();
@ -159,8 +173,9 @@ public class MysqlOperator implements AutoCloseable {
public void clearExpireLock() { public void clearExpireLock() {
String sql = "delete from t_ds_mysql_registry_lock where last_term < ?"; String sql = "delete from t_ds_mysql_registry_lock where last_term < ?";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setTimestamp(1, preparedStatement.setTimestamp(1,
new Timestamp(System.currentTimeMillis() - expireTimeWindow)); new Timestamp(System.currentTimeMillis() - expireTimeWindow));
int i = preparedStatement.executeUpdate(); int i = preparedStatement.executeUpdate();
@ -174,8 +189,9 @@ public class MysqlOperator implements AutoCloseable {
public void clearExpireEphemeralDate() { public void clearExpireEphemeralDate() {
String sql = "delete from t_ds_mysql_registry_data where last_update_time < ? and type = ?"; String sql = "delete from t_ds_mysql_registry_data where last_update_time < ? and type = ?";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow)); preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow));
preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue()); preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
int i = preparedStatement.executeUpdate(); int i = preparedStatement.executeUpdate();
@ -188,9 +204,11 @@ public class MysqlOperator implements AutoCloseable {
} }
public MysqlRegistryData getData(String key) throws SQLException { public MysqlRegistryData getData(String key) throws SQLException {
String sql = "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?"; String sql =
try (Connection connection = dataSource.getConnection(); "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key); preparedStatement.setString(1, key);
try (ResultSet resultSet = preparedStatement.executeQuery()) { try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) { if (!resultSet.next()) {
@ -210,8 +228,9 @@ public class MysqlOperator implements AutoCloseable {
public List<String> getChildren(String key) throws SQLException { public List<String> getChildren(String key) throws SQLException {
String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?"; String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key + "%"); preparedStatement.setString(1, key + "%");
try (ResultSet resultSet = preparedStatement.executeQuery()) { try (ResultSet resultSet = preparedStatement.executeQuery()) {
List<String> result = new ArrayList<>(resultSet.getFetchSize()); List<String> result = new ArrayList<>(resultSet.getFetchSize());
@ -228,8 +247,9 @@ public class MysqlOperator implements AutoCloseable {
public boolean existKey(String key) throws SQLException { public boolean existKey(String key) throws SQLException {
String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?"; String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key); preparedStatement.setString(1, key);
try (ResultSet resultSet = preparedStatement.executeQuery()) { try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) { if (resultSet.next()) {
@ -244,9 +264,12 @@ public class MysqlOperator implements AutoCloseable {
* Try to acquire the target Lock, if cannot acquire, return null. * Try to acquire the target Lock, if cannot acquire, return null.
*/ */
public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException { public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)"; String sql =
try (Connection connection = dataSource.getConnection(); "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)";
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, key); preparedStatement.setString(1, key);
preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER); preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER);
preparedStatement.executeUpdate(); preparedStatement.executeUpdate();
@ -264,9 +287,11 @@ public class MysqlOperator implements AutoCloseable {
} }
public MysqlRegistryLock getLockById(long lockId) throws SQLException { public MysqlRegistryLock getLockById(long lockId) throws SQLException {
String sql = "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?"; String sql =
try (Connection connection = dataSource.getConnection(); "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?";
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, lockId); preparedStatement.setLong(1, lockId);
try (ResultSet resultSet = preparedStatement.executeQuery()) { try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) { if (resultSet.next()) {
@ -287,8 +312,9 @@ public class MysqlOperator implements AutoCloseable {
// release the lock // release the lock
public boolean releaseLock(long lockId) throws SQLException { public boolean releaseLock(long lockId) throws SQLException {
String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?"; String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?";
try (Connection connection = dataSource.getConnection(); try (
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, lockId); preparedStatement.setLong(1, lockId);
int i = preparedStatement.executeUpdate(); int i = preparedStatement.executeUpdate();
return i > 0; return i > 0;
@ -297,20 +323,23 @@ public class MysqlOperator implements AutoCloseable {
public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException { public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException {
String sql = "update t_ds_mysql_registry_data set `last_update_time` = current_timestamp() where `id` IN (?)"; String sql = "update t_ds_mysql_registry_data set `last_update_time` = current_timestamp() where `id` IN (?)";
try (Connection connection = dataSource.getConnection(); String ids = ephemeralDateIds.stream().map(String::valueOf).collect(Collectors.joining(","));
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { try (
Array idArray = connection.createArrayOf("bigint", ephemeralDateIds.toArray()); Connection connection = dataSource.getConnection();
preparedStatement.setArray(1, idArray); PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, ids);
return preparedStatement.executeUpdate() > 0; return preparedStatement.executeUpdate() > 0;
} }
} }
public boolean updateLockTerm(List<Long> lockIds) throws SQLException { public boolean updateLockTerm(List<Long> lockIds) throws SQLException {
String sql = "update t_ds_mysql_registry_lock set `last_term` = current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)"; String sql =
try (Connection connection = dataSource.getConnection(); "update t_ds_mysql_registry_lock set `last_term` = current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)";
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { String ids = lockIds.stream().map(String::valueOf).collect(Collectors.joining(","));
Array idArray = connection.createArrayOf("bigint", lockIds.toArray()); try (
preparedStatement.setArray(1, idArray); Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, ids);
return preparedStatement.executeUpdate() > 0; return preparedStatement.executeUpdate() > 0;
} }
} }

1
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java

@ -148,6 +148,7 @@ public class EphemeralDateManager implements AutoCloseable {
mysqlOperator.clearExpireEphemeralDate(); mysqlOperator.clearExpireEphemeralDate();
return ConnectionState.CONNECTED; return ConnectionState.CONNECTED;
} catch (Exception ex) { } catch (Exception ex) {
LOGGER.error("Get connection state error, meet an unknown exception", ex);
return ConnectionState.DISCONNECTED; return ConnectionState.DISCONNECTED;
} }
} }

Loading…
Cancel
Save