diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
index 68fd2cae66..d1de5630a8 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
@@ -26,6 +26,6 @@ import org.springframework.context.annotation.Configuration;
@Configuration
@EnableAutoConfiguration
-@MapperScan("org.apache.dolphinscheduler.dao.mapper")
+@MapperScan(basePackages = "org.apache.dolphinscheduler.dao.mapper", sqlSessionFactoryRef = "sqlSessionFactory")
public class DaoConfiguration {
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
index fd7f0925e2..dac8423d5e 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
@@ -50,6 +50,12 @@
org.slf4j
slf4j-api
+
+
+ com.baomidou
+ mybatis-plus
+
+
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
index adfa41324a..a770fe75c7 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
@@ -17,412 +17,167 @@
package org.apache.dolphinscheduler.plugin.registry.mysql;
+import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper;
+import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
-import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
-import java.util.Optional;
-
-import lombok.NonNull;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
-
-/**
- * Used to CRUD from mysql
- */
-public class MysqlOperator implements AutoCloseable {
+@Component
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
+public class MysqlOperator {
private static final Logger logger = LoggerFactory.getLogger(MysqlOperator.class);
- private final HikariDataSource dataSource;
+ @Autowired
+ private MysqlRegistryDataMapper mysqlRegistryDataMapper;
+ @Autowired
+ private MysqlRegistryLockMapper mysqlRegistryLockMapper;
+
private final long expireTimeWindow;
public MysqlOperator(MysqlRegistryProperties registryProperties) {
this.expireTimeWindow =
registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis();
-
- HikariConfig hikariConfig = registryProperties.getHikariConfig();
- hikariConfig.setPoolName("MysqlRegistryDataSourcePool");
-
- this.dataSource = new HikariDataSource(hikariConfig);
}
- public void healthCheck() throws SQLException {
- String sql = "select 1 from t_ds_mysql_registry_data";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
- ResultSet resultSet = preparedStatement.executeQuery();) {
- // if no exception, the healthCheck success
- }
+ public void healthCheck() {
+ mysqlRegistryLockMapper.countAll();
}
- public List queryAllMysqlRegistryData() throws SQLException {
- String sql =
- "select id, `key`, data, type, last_term, create_time, last_update_time from t_ds_mysql_registry_data";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
- ResultSet resultSet = preparedStatement.executeQuery()) {
- List result = new ArrayList<>(resultSet.getFetchSize());
- while (resultSet.next()) {
- MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
- .id(resultSet.getLong("id"))
- .key(resultSet.getString("key"))
- .data(resultSet.getString("data"))
- .type(resultSet.getInt("type"))
- .lastTerm(resultSet.getLong("last_term"))
- .createTime(resultSet.getTimestamp("create_time"))
- .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
- .build();
- result.add(mysqlRegistryData);
- }
- return result;
- }
+ public List queryAllMysqlRegistryData() {
+ return mysqlRegistryDataMapper.selectAll();
}
public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
- Optional mysqlRegistryDataOptional = selectByKey(key);
- if (mysqlRegistryDataOptional.isPresent()) {
- long id = mysqlRegistryDataOptional.get().getId();
- if (!updateValueById(id, value)) {
+ MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
+ if (mysqlRegistryData != null) {
+ long id = mysqlRegistryData.getId();
+ if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
return id;
}
- MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
+ mysqlRegistryData = MysqlRegistryData.builder()
.key(key)
.data(value)
.type(DataType.EPHEMERAL.getTypeValue())
.lastTerm(System.currentTimeMillis())
.build();
- return insertMysqlRegistryData(mysqlRegistryData);
- }
-
- private Optional selectByKey(@NonNull String key) throws SQLException {
- String sql =
- "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data where `key` = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, key);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (resultSet.next()) {
- return Optional.of(
- MysqlRegistryData.builder()
- .id(resultSet.getLong("id"))
- .key(resultSet.getString("key"))
- .data(resultSet.getString("data"))
- .type(resultSet.getInt("type"))
- .createTime(resultSet.getTimestamp("create_time"))
- .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
- .build());
- }
- return Optional.empty();
- }
- }
- }
-
- private boolean updateValueById(long id, String value) throws SQLException {
- String sql = "update t_ds_mysql_registry_data set data = ?, last_term = ? where id = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, value);
- preparedStatement.setLong(2, System.currentTimeMillis());
- preparedStatement.setLong(3, id);
- return preparedStatement.executeUpdate() > 0;
- }
- }
-
- private long insertMysqlRegistryData(@NonNull MysqlRegistryData mysqlRegistryData) throws SQLException {
- String sql =
- "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
- connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
- preparedStatement.setString(1, mysqlRegistryData.getKey());
- preparedStatement.setString(2, mysqlRegistryData.getData());
- preparedStatement.setInt(3, mysqlRegistryData.getType());
- preparedStatement.setLong(4, mysqlRegistryData.getLastTerm());
- int insertCount = preparedStatement.executeUpdate();
- ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
- if (insertCount < 1 || !generatedKeys.next()) {
- throw new SQLException("Insert ephemeral data error, data: " + mysqlRegistryData);
- }
- return generatedKeys.getLong(1);
- }
+ mysqlRegistryDataMapper.insert(mysqlRegistryData);
+ return mysqlRegistryData.getId();
}
public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
- String sql =
- "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)"
- +
- "ON DUPLICATE KEY UPDATE data=?, last_term=?";
- // put a persistent Data
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
- connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
- long term = System.currentTimeMillis();
- preparedStatement.setString(1, key);
- preparedStatement.setString(2, value);
- preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
- preparedStatement.setLong(4, term);
- preparedStatement.setString(5, value);
- preparedStatement.setLong(6, term);
- int insertCount = preparedStatement.executeUpdate();
- ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
- if (insertCount < 1 || !generatedKeys.next()) {
- throw new SQLException("Insert or update persistent data error");
+ MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
+ if (mysqlRegistryData != null) {
+ long id = mysqlRegistryData.getId();
+ if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
+ throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
- return generatedKeys.getLong(1);
- }
- }
-
- public void deleteEphemeralData(String key) throws SQLException {
- String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, key);
- preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
- preparedStatement.execute();
+ return id;
}
+ mysqlRegistryData = MysqlRegistryData.builder()
+ .key(key)
+ .data(value)
+ .type(DataType.PERSISTENT.getTypeValue())
+ .lastTerm(System.currentTimeMillis())
+ .build();
+ mysqlRegistryDataMapper.insert(mysqlRegistryData);
+ return mysqlRegistryData.getId();
}
- public void deleteEphemeralData(long ephemeralNodeId) throws SQLException {
- String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setLong(1, ephemeralNodeId);
- preparedStatement.execute();
- }
+ public void deleteDataByKey(String key) {
+ mysqlRegistryDataMapper.deleteByKey(key);
}
- public void deletePersistentData(String key) throws SQLException {
- String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, key);
- preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue());
- preparedStatement.execute();
- }
+ public void deleteDataById(long id) {
+ mysqlRegistryDataMapper.deleteById(id);
}
public void clearExpireLock() {
- String sql = "delete from t_ds_mysql_registry_lock where last_term < ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
- int i = preparedStatement.executeUpdate();
- if (i > 0) {
- logger.info("Clear expire lock, size: {}", i);
- }
- } catch (Exception ex) {
- logger.warn("Clear expire lock from mysql registry error", ex);
- }
+ mysqlRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow);
}
public void clearExpireEphemeralDate() {
- String sql = "delete from t_ds_mysql_registry_data where last_term < ? and type = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
- preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
- int i = preparedStatement.executeUpdate();
- if (i > 0) {
- logger.info("clear expire ephemeral data, size:{}", i);
- }
- } catch (Exception ex) {
- logger.warn("Clear expire ephemeral data from mysql registry error", ex);
- }
+ mysqlRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow,
+ DataType.EPHEMERAL.getTypeValue());
}
public MysqlRegistryData getData(String key) throws SQLException {
- String sql =
- "SELECT id, `key`, data, type, last_term, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, key);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (!resultSet.next()) {
- return null;
- }
- return MysqlRegistryData.builder()
- .id(resultSet.getLong("id"))
- .key(resultSet.getString("key"))
- .data(resultSet.getString("data"))
- .type(resultSet.getInt("type"))
- .lastTerm(resultSet.getLong("last_term"))
- .createTime(resultSet.getTimestamp("create_time"))
- .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
- .build();
- }
- }
+ return mysqlRegistryDataMapper.selectByKey(key);
}
public List getChildren(String key) throws SQLException {
- String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, key + "%");
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- List result = new ArrayList<>(resultSet.getFetchSize());
- while (resultSet.next()) {
- String fullPath = resultSet.getString("key");
- if (fullPath.length() > key.length()) {
- result.add(StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"));
- }
- }
- return result;
- }
- }
+ return mysqlRegistryDataMapper.fuzzyQueryByKey(key)
+ .stream()
+ .map(MysqlRegistryData::getKey)
+ .filter(fullPath -> fullPath.length() > key.length())
+ .map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"))
+ .collect(Collectors.toList());
}
public boolean existKey(String key) throws SQLException {
- String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, key);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (resultSet.next()) {
- return true;
- }
- }
- }
- return false;
+ MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
+ return mysqlRegistryData != null;
}
/**
* Try to acquire the target Lock, if cannot acquire, return null.
*/
+ @SuppressWarnings("checkstyle:IllegalCatch")
public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
- String sql =
- "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term) VALUES (?, ?, ?)";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
- connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
- preparedStatement.setString(1, key);
- preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER);
- preparedStatement.setLong(3, System.currentTimeMillis());
- preparedStatement.executeUpdate();
- try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
- if (resultSet.next()) {
- long newLockId = resultSet.getLong(1);
- return getLockById(newLockId);
- }
+ MysqlRegistryLock mysqlRegistryLock = MysqlRegistryLock.builder()
+ .key(key)
+ .lockOwner(MysqlRegistryConstant.LOCK_OWNER)
+ .lastTerm(System.currentTimeMillis())
+ .build();
+ try {
+ mysqlRegistryLockMapper.insert(mysqlRegistryLock);
+ return mysqlRegistryLock;
+ } catch (Exception e) {
+ if (e instanceof SQLIntegrityConstraintViolationException) {
+ return null;
}
- return null;
- } catch (SQLIntegrityConstraintViolationException e) {
- // duplicate exception
- return null;
+ throw e;
}
}
public MysqlRegistryLock getLockById(long lockId) throws SQLException {
- String sql =
- "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setLong(1, lockId);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (resultSet.next()) {
- return MysqlRegistryLock.builder()
- .id(resultSet.getLong("id"))
- .key(resultSet.getString("key"))
- .lockOwner(resultSet.getString("lock_owner"))
- .lastTerm(resultSet.getLong("last_term"))
- .lastUpdateTime(resultSet.getTimestamp("last_update_time"))
- .createTime(resultSet.getTimestamp("create_time"))
- .build();
- }
- }
- return null;
- }
+ return mysqlRegistryLockMapper.selectById(lockId);
}
- // release the lock
public boolean releaseLock(long lockId) throws SQLException {
- String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?";
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setLong(1, lockId);
- int i = preparedStatement.executeUpdate();
- return i > 0;
- }
+ return mysqlRegistryLockMapper.deleteById(lockId) > 0;
}
public boolean updateEphemeralDataTerm(Collection ephemeralDateIds) throws SQLException {
- StringBuilder sb = new StringBuilder("update t_ds_mysql_registry_data set `last_term` = ? where `id` IN (");
- Iterator iterator = ephemeralDateIds.iterator();
- for (int i = 0; i < ephemeralDateIds.size(); i++) {
- sb.append(iterator.next());
- if (i != ephemeralDateIds.size() - 1) {
- sb.append(",");
- }
- }
- sb.append(")");
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
- preparedStatement.setLong(1, System.currentTimeMillis());
- return preparedStatement.executeUpdate() > 0;
+ if (CollectionUtils.isEmpty(ephemeralDateIds)) {
+ return true;
}
+ return mysqlRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0;
}
- public boolean updateLockTerm(List lockIds) throws SQLException {
- StringBuilder sb =
- new StringBuilder("update t_ds_mysql_registry_lock set `last_term` = ? where `id` IN (");
- Iterator iterator = lockIds.iterator();
- for (int i = 0; i < lockIds.size(); i++) {
- sb.append(iterator.next());
- if (i != lockIds.size() - 1) {
- sb.append(",");
- }
- }
- sb.append(")");
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
- preparedStatement.setLong(1, System.currentTimeMillis());
- return preparedStatement.executeUpdate() > 0;
+ public boolean updateLockTerm(List lockIds) {
+ if (CollectionUtils.isEmpty(lockIds)) {
+ return true;
}
+ return mysqlRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0;
}
- @Override
- public void close() throws Exception {
- if (!dataSource.isClosed()) {
- try (HikariDataSource closedDatasource = this.dataSource) {
-
- }
- }
- }
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
index d5caae9f00..5a72e80f58 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
@@ -53,10 +53,11 @@ public class MysqlRegistry implements Registry {
private final EphemeralDateManager ephemeralDateManager;
private final SubscribeDataManager subscribeDataManager;
private final RegistryLockManager registryLockManager;
- private final MysqlOperator mysqlOperator;
+ private MysqlOperator mysqlOperator;
- public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties) {
- this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties);
+ public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties,
+ MysqlOperator mysqlOperator) {
+ this.mysqlOperator = mysqlOperator;
mysqlOperator.clearExpireLock();
mysqlOperator.clearExpireEphemeralDate();
this.mysqlRegistryProperties = mysqlRegistryProperties;
@@ -138,8 +139,7 @@ public class MysqlRegistry implements Registry {
@Override
public void delete(String key) {
try {
- mysqlOperator.deleteEphemeralData(key);
- mysqlOperator.deletePersistentData(key);
+ mysqlOperator.deleteDataByKey(key);
} catch (Exception e) {
throw new RegistryException(String.format("Delete key: %s error", key), e);
}
@@ -188,8 +188,7 @@ public class MysqlRegistry implements Registry {
try (
EphemeralDateManager closed1 = ephemeralDateManager;
SubscribeDataManager close2 = subscribeDataManager;
- RegistryLockManager close3 = registryLockManager;
- MysqlOperator closed4 = mysqlOperator) {
+ RegistryLockManager close3 = registryLockManager) {
} catch (Exception e) {
LOGGER.error("Close Mysql Registry error", e);
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java
new file mode 100644
index 0000000000..70a1baae53
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper;
+import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper;
+
+import org.apache.ibatis.session.SqlSessionFactory;
+
+import org.mybatis.spring.SqlSessionTemplate;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
+import com.zaxxer.hikari.HikariDataSource;
+
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
+public class MysqlRegistryConfiguration {
+
+ @Bean
+ public SqlSessionFactory mysqlRegistrySqlSessionFactory(MysqlRegistryProperties mysqlRegistryProperties) throws Exception {
+ MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
+ sqlSessionFactoryBean.setDataSource(new HikariDataSource(mysqlRegistryProperties.getHikariConfig()));
+ return sqlSessionFactoryBean.getObject();
+ }
+
+ @Bean
+ public SqlSessionTemplate mysqlRegistrySqlSessionTemplate(SqlSessionFactory mysqlRegistrySqlSessionFactory) {
+ mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryDataMapper.class);
+ mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryLockMapper.class);
+ return new SqlSessionTemplate(mysqlRegistrySqlSessionFactory);
+ }
+
+ @Bean
+ public MysqlRegistryDataMapper mysqlRegistryDataMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) {
+ return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryDataMapper.class);
+ }
+
+ @Bean
+ public MysqlRegistryLockMapper mysqlRegistryLockMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) {
+ return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryLockMapper.class);
+ }
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java
new file mode 100644
index 0000000000..ef0cd5c3cc
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.mapper;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
+
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.Collection;
+import java.util.List;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface MysqlRegistryDataMapper extends BaseMapper {
+
+ @Select("select * from t_ds_mysql_registry_data")
+ List selectAll();
+
+ @Select("select * from t_ds_mysql_registry_data where `key` = #{key}")
+ MysqlRegistryData selectByKey(@Param("key") String key);
+
+ @Select("select * from t_ds_mysql_registry_data where `key` like CONCAT (#{key}, '%')")
+ List fuzzyQueryByKey(@Param("key") String key);
+
+ @Update("update t_ds_mysql_registry_data set `data` = #{data}, `last_term` = #{term} where `id` = #{id}")
+ int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term);
+
+ @Delete("delete from t_ds_mysql_registry_data where `key` = #{key}")
+ void deleteByKey(@Param("key") String key);
+
+ @Delete("delete from t_ds_mysql_registry_data where `last_term` < #{term} and `type` = #{type}")
+ void clearExpireEphemeralDate(@Param("term") long term, @Param("type") int type);
+
+ @Update({""})
+ int updateTermByIds(@Param("ids") Collection ids, @Param("term") long term);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java
new file mode 100644
index 0000000000..5e22a7473a
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.mysql.mapper;
+
+import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
+
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.Collection;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface MysqlRegistryLockMapper extends BaseMapper {
+
+ @Select("select count(1) from t_ds_mysql_registry_lock")
+ int countAll();
+
+ @Delete("delete from t_ds_mysql_registry_lock where `last_term` < #{term}")
+ void clearExpireLock(@Param("term") long term);
+
+ @Update({""})
+ int updateTermByIds(@Param("ids") Collection ids, @Param("term") long term);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
index e9ff6c81de..a4f80db608 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
@@ -24,18 +24,31 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+
+@TableName(value = "t_ds_mysql_registry_data")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MysqlRegistryData {
- private long id;
+ @TableId(value = "id", type = IdType.AUTO)
+ private Long id;
+ @TableField(value = "`key`")
private String key;
+ @TableField(value = "`data`")
private String data;
+ @TableField(value = "`type`")
private int type;
+ @TableField(value = "`last_term`")
private long lastTerm;
+ @TableField(value = "`create_time`")
private Date createTime;
+ @TableField(value = "`last_time`")
private Date lastUpdateTime;
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
index a86e4afe2c..2739a61561 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
@@ -24,31 +24,43 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+
+@TableName(value = "t_ds_mysql_registry_lock")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MysqlRegistryLock {
- private long id;
+ @TableId(value = "id", type = IdType.AUTO)
+ private Long id;
/**
* The lock key.
*/
+ @TableField(value = "`key`")
private String key;
/**
* acquire lock host.
*/
+ @TableField(value = "`lock_owner`")
private String lockOwner;
/**
* The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired.
*/
+ @TableField(value = "`last_term`")
private Long lastTerm;
/**
* The lock last update time.
*/
+ @TableField(value = "`last_update_time`")
private Date lastUpdateTime;
/**
* The lock create time.
*/
+ @TableField(value = "`create_time`")
private Date createTime;
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
index d17806c6bd..dc4a55797f 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
@@ -89,7 +89,7 @@ public class EphemeralDateManager implements AutoCloseable {
connectionListeners.clear();
scheduledExecutorService.shutdownNow();
for (Long ephemeralDateId : ephemeralDateIds) {
- mysqlOperator.deleteEphemeralData(ephemeralDateId);
+ mysqlOperator.deleteDataById(ephemeralDateId);
}
}