Browse Source

cherry-pick Use bigint represent term in mysql registry to avoid time inaccurate #13082

3.1.3-release
Wenjun Ruan 2 years ago committed by zhuangchong
parent
commit
fa78b95a76
  1. 183
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
  2. 1
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
  3. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
  4. 25
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql

183
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java

@ -30,10 +30,13 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,10 +73,12 @@ public class MysqlOperator implements AutoCloseable {
}
public List<MysqlRegistryData> queryAllMysqlRegistryData() throws SQLException {
String sql = "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()) {
String sql =
"select id, `key`, data, type, last_term, create_time, last_update_time from t_ds_mysql_registry_data";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()) {
List<MysqlRegistryData> result = new ArrayList<>(resultSet.getFetchSize());
while (resultSet.next()) {
MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
@ -81,6 +86,7 @@ public class MysqlOperator implements AutoCloseable {
.key(resultSet.getString("key"))
.data(resultSet.getString("data"))
.type(resultSet.getInt("type"))
.lastTerm(resultSet.getLong("last_term"))
.createTime(resultSet.getTimestamp("create_time"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.build();
@ -90,35 +96,97 @@ public class MysqlOperator implements AutoCloseable {
}
}
public long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" +
"ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp";
// put a ephemeralData
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
Optional<MysqlRegistryData> mysqlRegistryDataOptional = selectByKey(key);
if (mysqlRegistryDataOptional.isPresent()) {
long id = mysqlRegistryDataOptional.get().getId();
if (!updateValueById(id, value)) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
return id;
}
MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
.key(key)
.data(value)
.type(DataType.EPHEMERAL.getTypeValue())
.lastTerm(System.currentTimeMillis())
.build();
return insertMysqlRegistryData(mysqlRegistryData);
}
private Optional<MysqlRegistryData> selectByKey(@NonNull String key) throws SQLException {
String sql =
"select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data where `key` = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
preparedStatement.setString(2, value);
preparedStatement.setInt(3, DataType.EPHEMERAL.getTypeValue());
preparedStatement.setString(4, value);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return Optional.of(
MysqlRegistryData.builder()
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.data(resultSet.getString("data"))
.type(resultSet.getInt("type"))
.createTime(resultSet.getTimestamp("create_time"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.build());
}
return Optional.empty();
}
}
}
private boolean updateValueById(long id, String value) throws SQLException {
String sql = "update t_ds_mysql_registry_data set data = ?, last_term = ? where id = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, value);
preparedStatement.setLong(2, System.currentTimeMillis());
preparedStatement.setLong(3, id);
return preparedStatement.executeUpdate() > 0;
}
}
private long insertMysqlRegistryData(@NonNull MysqlRegistryData mysqlRegistryData) throws SQLException {
String sql =
"INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, mysqlRegistryData.getKey());
preparedStatement.setString(2, mysqlRegistryData.getData());
preparedStatement.setInt(3, mysqlRegistryData.getType());
preparedStatement.setLong(4, mysqlRegistryData.getLastTerm());
int insertCount = preparedStatement.executeUpdate();
ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
if (insertCount < 1 || !generatedKeys.next()) {
throw new SQLException("Insert or update ephemeral data error");
throw new SQLException("Insert ephemeral data error, data: " + mysqlRegistryData);
}
return generatedKeys.getLong(1);
}
}
public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, create_time, last_update_time) VALUES (?, ?, ?, current_timestamp, current_timestamp)" +
"ON DUPLICATE KEY UPDATE data=?, last_update_time=current_timestamp";
String sql =
"INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)"
+
"ON DUPLICATE KEY UPDATE data=?, last_term=?";
// put a persistent Data
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
long term = System.currentTimeMillis();
preparedStatement.setString(1, key);
preparedStatement.setString(2, value);
preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
preparedStatement.setString(4, value);
preparedStatement.setLong(4, term);
preparedStatement.setString(5, value);
preparedStatement.setLong(6, term);
int insertCount = preparedStatement.executeUpdate();
ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
if (insertCount < 1 || !generatedKeys.next()) {
@ -159,10 +227,10 @@ public class MysqlOperator implements AutoCloseable {
public void clearExpireLock() {
String sql = "delete from t_ds_mysql_registry_lock where last_term < ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setTimestamp(1,
new Timestamp(System.currentTimeMillis() - expireTimeWindow));
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
int i = preparedStatement.executeUpdate();
if (i > 0) {
logger.info("Clear expire lock, size: {}", i);
@ -173,10 +241,11 @@ public class MysqlOperator implements AutoCloseable {
}
public void clearExpireEphemeralDate() {
String sql = "delete from t_ds_mysql_registry_data where last_update_time < ? and type = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow));
String sql = "delete from t_ds_mysql_registry_data where last_term < ? and type = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
int i = preparedStatement.executeUpdate();
if (i > 0) {
@ -188,9 +257,11 @@ public class MysqlOperator implements AutoCloseable {
}
public MysqlRegistryData getData(String key) throws SQLException {
String sql = "SELECT id, `key`, data, type, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
String sql =
"SELECT id, `key`, data, type, last_term, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
@ -201,6 +272,7 @@ public class MysqlOperator implements AutoCloseable {
.key(resultSet.getString("key"))
.data(resultSet.getString("data"))
.type(resultSet.getInt("type"))
.lastTerm(resultSet.getLong("last_term"))
.createTime(resultSet.getTimestamp("create_time"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.build();
@ -244,11 +316,15 @@ public class MysqlOperator implements AutoCloseable {
* Try to acquire the target Lock, if cannot acquire, return null.
*/
public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
String sql = "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term, last_update_time, create_time) VALUES (?, ?, current_timestamp, current_timestamp, current_timestamp)";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
String sql =
"INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term) VALUES (?, ?, ?)";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, key);
preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER);
preparedStatement.setLong(3, System.currentTimeMillis());
preparedStatement.executeUpdate();
try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
if (resultSet.next()) {
@ -274,7 +350,7 @@ public class MysqlOperator implements AutoCloseable {
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.lockOwner(resultSet.getString("lock_owner"))
.lastTerm(resultSet.getTimestamp("last_term"))
.lastTerm(resultSet.getLong("last_term"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.createTime(resultSet.getTimestamp("create_time"))
.build();
@ -296,21 +372,38 @@ public class MysqlOperator implements AutoCloseable {
}
public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException {
String sql = "update t_ds_mysql_registry_data set `last_update_time` = current_timestamp() where `id` IN (?)";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
Array idArray = connection.createArrayOf("bigint", ephemeralDateIds.toArray());
preparedStatement.setArray(1, idArray);
StringBuilder sb = new StringBuilder("update t_ds_mysql_registry_data set `last_term` = ? where `id` IN (");
Iterator<Long> iterator = ephemeralDateIds.iterator();
for (int i = 0; i < ephemeralDateIds.size(); i++) {
sb.append(iterator.next());
if (i != ephemeralDateIds.size() - 1) {
sb.append(",");
}
}
sb.append(")");
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
preparedStatement.setLong(1, System.currentTimeMillis());
return preparedStatement.executeUpdate() > 0;
}
}
public boolean updateLockTerm(List<Long> lockIds) throws SQLException {
String sql = "update t_ds_mysql_registry_lock set `last_term` = current_timestamp and `last_update_time` = current_timestamp where `id` IN (?)";
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
Array idArray = connection.createArrayOf("bigint", lockIds.toArray());
preparedStatement.setArray(1, idArray);
StringBuilder sb =
new StringBuilder("update t_ds_mysql_registry_lock set `last_term` = ? where `id` IN (");
Iterator<Long> iterator = lockIds.iterator();
for (int i = 0; i < lockIds.size(); i++) {
sb.append(iterator.next());
if (i != lockIds.size() - 1) {
sb.append(",");
}
}
sb.append(")");
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
preparedStatement.setLong(1, System.currentTimeMillis());
return preparedStatement.executeUpdate() > 0;
}
}

1
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java

@ -34,6 +34,7 @@ public class MysqlRegistryData {
private String key;
private String data;
private int type;
private long lastTerm;
private Date createTime;
private Date lastUpdateTime;

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java

@ -42,7 +42,7 @@ public class MysqlRegistryLock {
/**
* The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired.
*/
private Date lastTerm;
private Long lastTerm;
/**
* The lock last update time.
*/

25
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/resources/mysql_registry_init.sql

@ -20,12 +20,13 @@ SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `t_ds_mysql_registry_data`;
CREATE TABLE `t_ds_mysql_registry_data`
(
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(200) NOT NULL COMMENT 'key, like zookeeper node path',
`data` varchar(200) NOT NULL COMMENT 'data, like zookeeper node value',
`type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node',
`last_update_time` timestamp NULL COMMENT 'last update time',
`create_time` timestamp NULL COMMENT 'create time',
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(256) NOT NULL COMMENT 'key, like zookeeper node path',
`data` text NOT NULL COMMENT 'data, like zookeeper node value',
`type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node',
`last_term` bigint NOT NULL COMMENT 'last term time',
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
PRIMARY KEY (`id`),
unique (`key`)
) ENGINE = InnoDB
@ -35,12 +36,12 @@ CREATE TABLE `t_ds_mysql_registry_data`
DROP TABLE IF EXISTS `t_ds_mysql_registry_lock`;
CREATE TABLE `t_ds_mysql_registry_lock`
(
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(200) NOT NULL COMMENT 'lock path',
`lock_owner` varchar(100) NOT NULL COMMENT 'the lock owner, ip_processId',
`last_term` timestamp NOT NULL COMMENT 'last term time',
`last_update_time` timestamp NULL COMMENT 'last update time',
`create_time` timestamp NULL COMMENT 'lock create time',
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(256) NOT NULL COMMENT 'lock path',
`lock_owner` varchar(256) NOT NULL COMMENT 'the lock owner, ip_processId',
`last_term` bigint NOT NULL COMMENT 'last term time',
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
PRIMARY KEY (`id`),
unique (`key`)
) ENGINE = InnoDB

Loading…
Cancel
Save