From 4ce9c82cba6edfc0841b4e509cad6a43f0ec1303 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sat, 3 Dec 2022 11:51:01 +0800 Subject: [PATCH] Use bigint represent term in mysql registry to avoid time inaccurate (#13082) --- .../plugin/registry/mysql/MysqlOperator.java | 137 ++++++++++++++---- .../mysql/model/MysqlRegistryData.java | 1 + .../mysql/model/MysqlRegistryLock.java | 2 +- .../main/resources/mysql_registry_init.sql | 23 +-- 4 files changed, 119 insertions(+), 44 deletions(-) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java index 8d7c88f358..adfa41324a 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java @@ -29,11 +29,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLIntegrityConstraintViolationException; import java.sql.Statement; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; -import java.util.stream.Collectors; +import java.util.Optional; + +import lombok.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +74,8 @@ public class MysqlOperator implements AutoCloseable { } public List queryAllMysqlRegistryData() throws SQLException { - String sql = "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data"; + String sql = + "select id, `key`, data, type, last_term, create_time, last_update_time from t_ds_mysql_registry_data"; try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql); @@ -84,6 +87,7 @@ public class MysqlOperator implements AutoCloseable { .key(resultSet.getString("key")) .data(resultSet.getString("data")) .type(resultSet.getInt("type")) + .lastTerm(resultSet.getLong("last_term")) .createTime(resultSet.getTimestamp("create_time")) .lastUpdateTime(resultSet.getTimestamp("last_update_time")) .build(); @@ -93,24 +97,75 @@ public class MysqlOperator implements AutoCloseable { } } - public long insertOrUpdateEphemeralData(String key, String value) throws SQLException { + public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException { + Optional mysqlRegistryDataOptional = selectByKey(key); + if (mysqlRegistryDataOptional.isPresent()) { + long id = mysqlRegistryDataOptional.get().getId(); + if (!updateValueById(id, value)) { + throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value)); + } + return id; + } + MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder() + .key(key) + .data(value) + .type(DataType.EPHEMERAL.getTypeValue()) + .lastTerm(System.currentTimeMillis()) + .build(); + return insertMysqlRegistryData(mysqlRegistryData); + } + + private Optional selectByKey(@NonNull String key) throws SQLException { String sql = - "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" - + - "ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp"; - // put a ephemeralData + "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data where `key` = ?"; + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + preparedStatement.setString(1, key); + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (resultSet.next()) { + return Optional.of( + MysqlRegistryData.builder() + .id(resultSet.getLong("id")) + .key(resultSet.getString("key")) + .data(resultSet.getString("data")) + .type(resultSet.getInt("type")) + .createTime(resultSet.getTimestamp("create_time")) + .lastUpdateTime(resultSet.getTimestamp("last_update_time")) + .build()); + } + return Optional.empty(); + } + } + } + + private boolean updateValueById(long id, String value) throws SQLException { + String sql = "update t_ds_mysql_registry_data set data = ?, last_term = ? where id = ?"; + try ( + Connection connection = dataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + preparedStatement.setString(1, value); + preparedStatement.setLong(2, System.currentTimeMillis()); + preparedStatement.setLong(3, id); + return preparedStatement.executeUpdate() > 0; + } + } + + private long insertMysqlRegistryData(@NonNull MysqlRegistryData mysqlRegistryData) throws SQLException { + String sql = + "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)"; try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { - preparedStatement.setString(1, key); - preparedStatement.setString(2, value); - preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue()); - preparedStatement.setString(4, value); + preparedStatement.setString(1, mysqlRegistryData.getKey()); + preparedStatement.setString(2, mysqlRegistryData.getData()); + preparedStatement.setInt(3, mysqlRegistryData.getType()); + preparedStatement.setLong(4, mysqlRegistryData.getLastTerm()); int insertCount = preparedStatement.executeUpdate(); ResultSet generatedKeys = preparedStatement.getGeneratedKeys(); if (insertCount < 1 || !generatedKeys.next()) { - throw new SQLException("Insert or update ephemeral data error"); + throw new SQLException("Insert ephemeral data error, data: " + mysqlRegistryData); } return generatedKeys.getLong(1); } @@ -118,18 +173,21 @@ public class MysqlOperator implements AutoCloseable { public long insertOrUpdatePersistentData(String key, String value) throws SQLException { String sql = - "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" + "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)" + - "ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp"; + "ON DUPLICATE KEY UPDATE data=?, last_term=?"; // put a persistent Data try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { + long term = System.currentTimeMillis(); preparedStatement.setString(1, key); preparedStatement.setString(2, value); preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue()); - preparedStatement.setString(4, value); + preparedStatement.setLong(4, term); + preparedStatement.setString(5, value); + preparedStatement.setLong(6, term); int insertCount = preparedStatement.executeUpdate(); ResultSet generatedKeys = preparedStatement.getGeneratedKeys(); if (insertCount < 1 || !generatedKeys.next()) { @@ -176,8 +234,7 @@ public class MysqlOperator implements AutoCloseable { try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setTimestamp(1, - new Timestamp(System.currentTimeMillis() - expireTimeWindow)); + preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow); int i = preparedStatement.executeUpdate(); if (i > 0) { logger.info("Clear expire lock, size: {}", i); @@ -188,11 +245,11 @@ public class MysqlOperator implements AutoCloseable { } public void clearExpireEphemeralDate() { - String sql = "delete from t_ds_mysql_registry_data where last_update_time < ? and type = ?"; + String sql = "delete from t_ds_mysql_registry_data where last_term < ? and type = ?"; try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow)); + preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow); preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue()); int i = preparedStatement.executeUpdate(); if (i > 0) { @@ -205,7 +262,7 @@ public class MysqlOperator implements AutoCloseable { public MysqlRegistryData getData(String key) throws SQLException { String sql = - "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?"; + "SELECT id, `key`, data, type, last_term, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?"; try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql)) { @@ -219,6 +276,7 @@ public class MysqlOperator implements AutoCloseable { .key(resultSet.getString("key")) .data(resultSet.getString("data")) .type(resultSet.getInt("type")) + .lastTerm(resultSet.getLong("last_term")) .createTime(resultSet.getTimestamp("create_time")) .lastUpdateTime(resultSet.getTimestamp("last_update_time")) .build(); @@ -265,13 +323,14 @@ public class MysqlOperator implements AutoCloseable { */ public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException { String sql = - "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)"; + "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term) VALUES (?, ?, ?)"; try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { preparedStatement.setString(1, key); preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER); + preparedStatement.setLong(3, System.currentTimeMillis()); preparedStatement.executeUpdate(); try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) { if (resultSet.next()) { @@ -299,7 +358,7 @@ public class MysqlOperator implements AutoCloseable { .id(resultSet.getLong("id")) .key(resultSet.getString("key")) .lockOwner(resultSet.getString("lock_owner")) - .lastTerm(resultSet.getTimestamp("last_term")) + .lastTerm(resultSet.getLong("last_term")) .lastUpdateTime(resultSet.getTimestamp("last_update_time")) .createTime(resultSet.getTimestamp("create_time")) .build(); @@ -322,24 +381,38 @@ public class MysqlOperator implements AutoCloseable { } public boolean updateEphemeralDataTerm(Collection ephemeralDateIds) throws SQLException { - String sql = "update t_ds_mysql_registry_data set `last_update_time` = current_timestamp() where `id` IN (?)"; - String ids = ephemeralDateIds.stream().map(String::valueOf).collect(Collectors.joining(",")); + StringBuilder sb = new StringBuilder("update t_ds_mysql_registry_data set `last_term` = ? where `id` IN ("); + Iterator iterator = ephemeralDateIds.iterator(); + for (int i = 0; i < ephemeralDateIds.size(); i++) { + sb.append(iterator.next()); + if (i != ephemeralDateIds.size() - 1) { + sb.append(","); + } + } + sb.append(")"); try ( Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, ids); + PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) { + preparedStatement.setLong(1, System.currentTimeMillis()); return preparedStatement.executeUpdate() > 0; } } public boolean updateLockTerm(List lockIds) throws SQLException { - String sql = - "update t_ds_mysql_registry_lock set `last_term` = current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)"; - String ids = lockIds.stream().map(String::valueOf).collect(Collectors.joining(",")); + StringBuilder sb = + new StringBuilder("update t_ds_mysql_registry_lock set `last_term` = ? where `id` IN ("); + Iterator iterator = lockIds.iterator(); + for (int i = 0; i < lockIds.size(); i++) { + sb.append(iterator.next()); + if (i != lockIds.size() - 1) { + sb.append(","); + } + } + sb.append(")"); try ( Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, ids); + PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) { + preparedStatement.setLong(1, System.currentTimeMillis()); return preparedStatement.executeUpdate() > 0; } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java index e3045881c5..e9ff6c81de 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java @@ -34,6 +34,7 @@ public class MysqlRegistryData { private String key; private String data; private int type; + private long lastTerm; private Date createTime; private Date lastUpdateTime; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java index 79d718cd4f..a86e4afe2c 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java @@ -42,7 +42,7 @@ public class MysqlRegistryLock { /** * The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired. */ - private Date lastTerm; + private Long lastTerm; /** * The lock last update time. */ diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql index c9b22f478e..77b33c4cf7 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql @@ -20,12 +20,13 @@ SET FOREIGN_KEY_CHECKS = 0; DROP TABLE IF EXISTS `t_ds_mysql_registry_data`; CREATE TABLE `t_ds_mysql_registry_data` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key', - `key` varchar(200) NOT NULL COMMENT 'key, like zookeeper node path', + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key', + `key` varchar(256) NOT NULL COMMENT 'key, like zookeeper node path', `data` text NOT NULL COMMENT 'data, like zookeeper node value', - `type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node', - `last_update_time` timestamp NULL COMMENT 'last update time', - `create_time` timestamp NULL COMMENT 'create time', + `type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node', + `last_term` bigint NOT NULL COMMENT 'last term time', + `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', PRIMARY KEY (`id`), unique (`key`) ) ENGINE = InnoDB @@ -35,12 +36,12 @@ CREATE TABLE `t_ds_mysql_registry_data` DROP TABLE IF EXISTS `t_ds_mysql_registry_lock`; CREATE TABLE `t_ds_mysql_registry_lock` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key', - `key` varchar(200) NOT NULL COMMENT 'lock path', - `lock_owner` varchar(100) NOT NULL COMMENT 'the lock owner, ip_processId', - `last_term` timestamp NOT NULL COMMENT 'last term time', - `last_update_time` timestamp NULL COMMENT 'last update time', - `create_time` timestamp NULL COMMENT 'lock create time', + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key', + `key` varchar(256) NOT NULL COMMENT 'lock path', + `lock_owner` varchar(256) NOT NULL COMMENT 'the lock owner, ip_processId', + `last_term` bigint NOT NULL COMMENT 'last term time', + `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', PRIMARY KEY (`id`), unique (`key`) ) ENGINE = InnoDB