From 04785dc799200d052fa7367ca4a5ebbd9784f036 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 26 Dec 2022 11:09:25 +0800 Subject: [PATCH] Add mybatis in mysql registry module (#13275) --- .../dao/DaoConfiguration.java | 2 +- .../dolphinscheduler-registry-mysql/pom.xml | 6 + .../plugin/registry/mysql/MysqlOperator.java | 401 ++++-------------- .../plugin/registry/mysql/MysqlRegistry.java | 13 +- .../mysql/MysqlRegistryConfiguration.java | 61 +++ .../mysql/mapper/MysqlRegistryDataMapper.java | 61 +++ .../mysql/mapper/MysqlRegistryLockMapper.java | 48 +++ .../mysql/model/MysqlRegistryData.java | 15 +- .../mysql/model/MysqlRegistryLock.java | 14 +- .../mysql/task/EphemeralDateManager.java | 2 +- 10 files changed, 289 insertions(+), 334 deletions(-) create mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java create mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java create mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java index 68fd2cae66..d1de5630a8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java @@ -26,6 +26,6 @@ import org.springframework.context.annotation.Configuration; @Configuration @EnableAutoConfiguration -@MapperScan("org.apache.dolphinscheduler.dao.mapper") +@MapperScan(basePackages = "org.apache.dolphinscheduler.dao.mapper", sqlSessionFactoryRef = "sqlSessionFactory") public class DaoConfiguration { } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml index fd7f0925e2..dac8423d5e 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml @@ -50,6 +50,12 @@ org.slf4j slf4j-api + + + com.baomidou + mybatis-plus + + diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java index adfa41324a..a770fe75c7 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java @@ -17,412 +17,167 @@ package org.apache.dolphinscheduler.plugin.registry.mysql; +import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper; +import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper; import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType; import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData; import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLIntegrityConstraintViolationException; -import java.sql.Statement; -import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; -import java.util.Optional; - -import lombok.NonNull; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; - -/** - * Used to CRUD from mysql - */ -public class MysqlOperator implements AutoCloseable { +@Component +@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql") +public class MysqlOperator { private static final Logger logger = LoggerFactory.getLogger(MysqlOperator.class); - private final HikariDataSource dataSource; + @Autowired + private MysqlRegistryDataMapper mysqlRegistryDataMapper; + @Autowired + private MysqlRegistryLockMapper mysqlRegistryLockMapper; + private final long expireTimeWindow; public MysqlOperator(MysqlRegistryProperties registryProperties) { this.expireTimeWindow = registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis(); - - HikariConfig hikariConfig = registryProperties.getHikariConfig(); - hikariConfig.setPoolName("MysqlRegistryDataSourcePool"); - - this.dataSource = new HikariDataSource(hikariConfig); } - public void healthCheck() throws SQLException { - String sql = "select 1 from t_ds_mysql_registry_data"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql); - ResultSet resultSet = preparedStatement.executeQuery();) { - // if no exception, the healthCheck success - } + public void healthCheck() { + mysqlRegistryLockMapper.countAll(); } - public List queryAllMysqlRegistryData() throws SQLException { - String sql = - "select id, `key`, data, type, last_term, create_time, last_update_time from t_ds_mysql_registry_data"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql); - ResultSet resultSet = preparedStatement.executeQuery()) { - List result = new ArrayList<>(resultSet.getFetchSize()); - while (resultSet.next()) { - MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder() - .id(resultSet.getLong("id")) - .key(resultSet.getString("key")) - .data(resultSet.getString("data")) - .type(resultSet.getInt("type")) - .lastTerm(resultSet.getLong("last_term")) - .createTime(resultSet.getTimestamp("create_time")) - .lastUpdateTime(resultSet.getTimestamp("last_update_time")) - .build(); - result.add(mysqlRegistryData); - } - return result; - } + public List queryAllMysqlRegistryData() { + return mysqlRegistryDataMapper.selectAll(); } public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException { - Optional mysqlRegistryDataOptional = selectByKey(key); - if (mysqlRegistryDataOptional.isPresent()) { - long id = mysqlRegistryDataOptional.get().getId(); - if (!updateValueById(id, value)) { + MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key); + if (mysqlRegistryData != null) { + long id = mysqlRegistryData.getId(); + if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) { throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value)); } return id; } - MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder() + mysqlRegistryData = MysqlRegistryData.builder() .key(key) .data(value) .type(DataType.EPHEMERAL.getTypeValue()) .lastTerm(System.currentTimeMillis()) .build(); - return insertMysqlRegistryData(mysqlRegistryData); - } - - private Optional selectByKey(@NonNull String key) throws SQLException { - String sql = - "select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data where `key` = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, key); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (resultSet.next()) { - return Optional.of( - MysqlRegistryData.builder() - .id(resultSet.getLong("id")) - .key(resultSet.getString("key")) - .data(resultSet.getString("data")) - .type(resultSet.getInt("type")) - .createTime(resultSet.getTimestamp("create_time")) - .lastUpdateTime(resultSet.getTimestamp("last_update_time")) - .build()); - } - return Optional.empty(); - } - } - } - - private boolean updateValueById(long id, String value) throws SQLException { - String sql = "update t_ds_mysql_registry_data set data = ?, last_term = ? where id = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, value); - preparedStatement.setLong(2, System.currentTimeMillis()); - preparedStatement.setLong(3, id); - return preparedStatement.executeUpdate() > 0; - } - } - - private long insertMysqlRegistryData(@NonNull MysqlRegistryData mysqlRegistryData) throws SQLException { - String sql = - "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = - connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { - preparedStatement.setString(1, mysqlRegistryData.getKey()); - preparedStatement.setString(2, mysqlRegistryData.getData()); - preparedStatement.setInt(3, mysqlRegistryData.getType()); - preparedStatement.setLong(4, mysqlRegistryData.getLastTerm()); - int insertCount = preparedStatement.executeUpdate(); - ResultSet generatedKeys = preparedStatement.getGeneratedKeys(); - if (insertCount < 1 || !generatedKeys.next()) { - throw new SQLException("Insert ephemeral data error, data: " + mysqlRegistryData); - } - return generatedKeys.getLong(1); - } + mysqlRegistryDataMapper.insert(mysqlRegistryData); + return mysqlRegistryData.getId(); } public long insertOrUpdatePersistentData(String key, String value) throws SQLException { - String sql = - "INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)" - + - "ON DUPLICATE KEY UPDATE data=?, last_term=?"; - // put a persistent Data - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = - connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { - long term = System.currentTimeMillis(); - preparedStatement.setString(1, key); - preparedStatement.setString(2, value); - preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue()); - preparedStatement.setLong(4, term); - preparedStatement.setString(5, value); - preparedStatement.setLong(6, term); - int insertCount = preparedStatement.executeUpdate(); - ResultSet generatedKeys = preparedStatement.getGeneratedKeys(); - if (insertCount < 1 || !generatedKeys.next()) { - throw new SQLException("Insert or update persistent data error"); + MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key); + if (mysqlRegistryData != null) { + long id = mysqlRegistryData.getId(); + if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) { + throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value)); } - return generatedKeys.getLong(1); - } - } - - public void deleteEphemeralData(String key) throws SQLException { - String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, key); - preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue()); - preparedStatement.execute(); + return id; } + mysqlRegistryData = MysqlRegistryData.builder() + .key(key) + .data(value) + .type(DataType.PERSISTENT.getTypeValue()) + .lastTerm(System.currentTimeMillis()) + .build(); + mysqlRegistryDataMapper.insert(mysqlRegistryData); + return mysqlRegistryData.getId(); } - public void deleteEphemeralData(long ephemeralNodeId) throws SQLException { - String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setLong(1, ephemeralNodeId); - preparedStatement.execute(); - } + public void deleteDataByKey(String key) { + mysqlRegistryDataMapper.deleteByKey(key); } - public void deletePersistentData(String key) throws SQLException { - String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, key); - preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue()); - preparedStatement.execute(); - } + public void deleteDataById(long id) { + mysqlRegistryDataMapper.deleteById(id); } public void clearExpireLock() { - String sql = "delete from t_ds_mysql_registry_lock where last_term < ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow); - int i = preparedStatement.executeUpdate(); - if (i > 0) { - logger.info("Clear expire lock, size: {}", i); - } - } catch (Exception ex) { - logger.warn("Clear expire lock from mysql registry error", ex); - } + mysqlRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow); } public void clearExpireEphemeralDate() { - String sql = "delete from t_ds_mysql_registry_data where last_term < ? and type = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow); - preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue()); - int i = preparedStatement.executeUpdate(); - if (i > 0) { - logger.info("clear expire ephemeral data, size:{}", i); - } - } catch (Exception ex) { - logger.warn("Clear expire ephemeral data from mysql registry error", ex); - } + mysqlRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow, + DataType.EPHEMERAL.getTypeValue()); } public MysqlRegistryData getData(String key) throws SQLException { - String sql = - "SELECT id, `key`, data, type, last_term, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, key); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (!resultSet.next()) { - return null; - } - return MysqlRegistryData.builder() - .id(resultSet.getLong("id")) - .key(resultSet.getString("key")) - .data(resultSet.getString("data")) - .type(resultSet.getInt("type")) - .lastTerm(resultSet.getLong("last_term")) - .createTime(resultSet.getTimestamp("create_time")) - .lastUpdateTime(resultSet.getTimestamp("last_update_time")) - .build(); - } - } + return mysqlRegistryDataMapper.selectByKey(key); } public List getChildren(String key) throws SQLException { - String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, key + "%"); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - List result = new ArrayList<>(resultSet.getFetchSize()); - while (resultSet.next()) { - String fullPath = resultSet.getString("key"); - if (fullPath.length() > key.length()) { - result.add(StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/")); - } - } - return result; - } - } + return mysqlRegistryDataMapper.fuzzyQueryByKey(key) + .stream() + .map(MysqlRegistryData::getKey) + .filter(fullPath -> fullPath.length() > key.length()) + .map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/")) + .collect(Collectors.toList()); } public boolean existKey(String key) throws SQLException { - String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setString(1, key); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (resultSet.next()) { - return true; - } - } - } - return false; + MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key); + return mysqlRegistryData != null; } /** * Try to acquire the target Lock, if cannot acquire, return null. */ + @SuppressWarnings("checkstyle:IllegalCatch") public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException { - String sql = - "INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term) VALUES (?, ?, ?)"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = - connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { - preparedStatement.setString(1, key); - preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER); - preparedStatement.setLong(3, System.currentTimeMillis()); - preparedStatement.executeUpdate(); - try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) { - if (resultSet.next()) { - long newLockId = resultSet.getLong(1); - return getLockById(newLockId); - } + MysqlRegistryLock mysqlRegistryLock = MysqlRegistryLock.builder() + .key(key) + .lockOwner(MysqlRegistryConstant.LOCK_OWNER) + .lastTerm(System.currentTimeMillis()) + .build(); + try { + mysqlRegistryLockMapper.insert(mysqlRegistryLock); + return mysqlRegistryLock; + } catch (Exception e) { + if (e instanceof SQLIntegrityConstraintViolationException) { + return null; } - return null; - } catch (SQLIntegrityConstraintViolationException e) { - // duplicate exception - return null; + throw e; } } public MysqlRegistryLock getLockById(long lockId) throws SQLException { - String sql = - "SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setLong(1, lockId); - try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (resultSet.next()) { - return MysqlRegistryLock.builder() - .id(resultSet.getLong("id")) - .key(resultSet.getString("key")) - .lockOwner(resultSet.getString("lock_owner")) - .lastTerm(resultSet.getLong("last_term")) - .lastUpdateTime(resultSet.getTimestamp("last_update_time")) - .createTime(resultSet.getTimestamp("create_time")) - .build(); - } - } - return null; - } + return mysqlRegistryLockMapper.selectById(lockId); } - // release the lock public boolean releaseLock(long lockId) throws SQLException { - String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?"; - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { - preparedStatement.setLong(1, lockId); - int i = preparedStatement.executeUpdate(); - return i > 0; - } + return mysqlRegistryLockMapper.deleteById(lockId) > 0; } public boolean updateEphemeralDataTerm(Collection ephemeralDateIds) throws SQLException { - StringBuilder sb = new StringBuilder("update t_ds_mysql_registry_data set `last_term` = ? where `id` IN ("); - Iterator iterator = ephemeralDateIds.iterator(); - for (int i = 0; i < ephemeralDateIds.size(); i++) { - sb.append(iterator.next()); - if (i != ephemeralDateIds.size() - 1) { - sb.append(","); - } - } - sb.append(")"); - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) { - preparedStatement.setLong(1, System.currentTimeMillis()); - return preparedStatement.executeUpdate() > 0; + if (CollectionUtils.isEmpty(ephemeralDateIds)) { + return true; } + return mysqlRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0; } - public boolean updateLockTerm(List lockIds) throws SQLException { - StringBuilder sb = - new StringBuilder("update t_ds_mysql_registry_lock set `last_term` = ? where `id` IN ("); - Iterator iterator = lockIds.iterator(); - for (int i = 0; i < lockIds.size(); i++) { - sb.append(iterator.next()); - if (i != lockIds.size() - 1) { - sb.append(","); - } - } - sb.append(")"); - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) { - preparedStatement.setLong(1, System.currentTimeMillis()); - return preparedStatement.executeUpdate() > 0; + public boolean updateLockTerm(List lockIds) { + if (CollectionUtils.isEmpty(lockIds)) { + return true; } + return mysqlRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0; } - @Override - public void close() throws Exception { - if (!dataSource.isClosed()) { - try (HikariDataSource closedDatasource = this.dataSource) { - - } - } - } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java index d5caae9f00..5a72e80f58 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java @@ -53,10 +53,11 @@ public class MysqlRegistry implements Registry { private final EphemeralDateManager ephemeralDateManager; private final SubscribeDataManager subscribeDataManager; private final RegistryLockManager registryLockManager; - private final MysqlOperator mysqlOperator; + private MysqlOperator mysqlOperator; - public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties) { - this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties); + public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties, + MysqlOperator mysqlOperator) { + this.mysqlOperator = mysqlOperator; mysqlOperator.clearExpireLock(); mysqlOperator.clearExpireEphemeralDate(); this.mysqlRegistryProperties = mysqlRegistryProperties; @@ -138,8 +139,7 @@ public class MysqlRegistry implements Registry { @Override public void delete(String key) { try { - mysqlOperator.deleteEphemeralData(key); - mysqlOperator.deletePersistentData(key); + mysqlOperator.deleteDataByKey(key); } catch (Exception e) { throw new RegistryException(String.format("Delete key: %s error", key), e); } @@ -188,8 +188,7 @@ public class MysqlRegistry implements Registry { try ( EphemeralDateManager closed1 = ephemeralDateManager; SubscribeDataManager close2 = subscribeDataManager; - RegistryLockManager close3 = registryLockManager; - MysqlOperator closed4 = mysqlOperator) { + RegistryLockManager close3 = registryLockManager) { } catch (Exception e) { LOGGER.error("Close Mysql Registry error", e); } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java new file mode 100644 index 0000000000..70a1baae53 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.mysql; + +import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper; +import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper; + +import org.apache.ibatis.session.SqlSessionFactory; + +import org.mybatis.spring.SqlSessionTemplate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; +import com.zaxxer.hikari.HikariDataSource; + +@Configuration +@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql") +public class MysqlRegistryConfiguration { + + @Bean + public SqlSessionFactory mysqlRegistrySqlSessionFactory(MysqlRegistryProperties mysqlRegistryProperties) throws Exception { + MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); + sqlSessionFactoryBean.setDataSource(new HikariDataSource(mysqlRegistryProperties.getHikariConfig())); + return sqlSessionFactoryBean.getObject(); + } + + @Bean + public SqlSessionTemplate mysqlRegistrySqlSessionTemplate(SqlSessionFactory mysqlRegistrySqlSessionFactory) { + mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryDataMapper.class); + mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryLockMapper.class); + return new SqlSessionTemplate(mysqlRegistrySqlSessionFactory); + } + + @Bean + public MysqlRegistryDataMapper mysqlRegistryDataMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) { + return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryDataMapper.class); + } + + @Bean + public MysqlRegistryLockMapper mysqlRegistryLockMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) { + return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryLockMapper.class); + } + +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java new file mode 100644 index 0000000000..ef0cd5c3cc --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.mysql.mapper; + +import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData; + +import org.apache.ibatis.annotations.Delete; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +import java.util.Collection; +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface MysqlRegistryDataMapper extends BaseMapper { + + @Select("select * from t_ds_mysql_registry_data") + List selectAll(); + + @Select("select * from t_ds_mysql_registry_data where `key` = #{key}") + MysqlRegistryData selectByKey(@Param("key") String key); + + @Select("select * from t_ds_mysql_registry_data where `key` like CONCAT (#{key}, '%')") + List fuzzyQueryByKey(@Param("key") String key); + + @Update("update t_ds_mysql_registry_data set `data` = #{data}, `last_term` = #{term} where `id` = #{id}") + int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term); + + @Delete("delete from t_ds_mysql_registry_data where `key` = #{key}") + void deleteByKey(@Param("key") String key); + + @Delete("delete from t_ds_mysql_registry_data where `last_term` < #{term} and `type` = #{type}") + void clearExpireEphemeralDate(@Param("term") long term, @Param("type") int type); + + @Update({""}) + int updateTermByIds(@Param("ids") Collection ids, @Param("term") long term); +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java new file mode 100644 index 0000000000..5e22a7473a --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.mysql.mapper; + +import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock; + +import org.apache.ibatis.annotations.Delete; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +import java.util.Collection; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface MysqlRegistryLockMapper extends BaseMapper { + + @Select("select count(1) from t_ds_mysql_registry_lock") + int countAll(); + + @Delete("delete from t_ds_mysql_registry_lock where `last_term` < #{term}") + void clearExpireLock(@Param("term") long term); + + @Update({""}) + int updateTermByIds(@Param("ids") Collection ids, @Param("term") long term); +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java index e9ff6c81de..a4f80db608 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java @@ -24,18 +24,31 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +@TableName(value = "t_ds_mysql_registry_data") @Data @Builder @NoArgsConstructor @AllArgsConstructor public class MysqlRegistryData { - private long id; + @TableId(value = "id", type = IdType.AUTO) + private Long id; + @TableField(value = "`key`") private String key; + @TableField(value = "`data`") private String data; + @TableField(value = "`type`") private int type; + @TableField(value = "`last_term`") private long lastTerm; + @TableField(value = "`create_time`") private Date createTime; + @TableField(value = "`last_time`") private Date lastUpdateTime; } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java index a86e4afe2c..2739a61561 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java @@ -24,31 +24,43 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +@TableName(value = "t_ds_mysql_registry_lock") @Data @Builder @NoArgsConstructor @AllArgsConstructor public class MysqlRegistryLock { - private long id; + @TableId(value = "id", type = IdType.AUTO) + private Long id; /** * The lock key. */ + @TableField(value = "`key`") private String key; /** * acquire lock host. */ + @TableField(value = "`lock_owner`") private String lockOwner; /** * The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired. */ + @TableField(value = "`last_term`") private Long lastTerm; /** * The lock last update time. */ + @TableField(value = "`last_update_time`") private Date lastUpdateTime; /** * The lock create time. */ + @TableField(value = "`create_time`") private Date createTime; } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java index d17806c6bd..dc4a55797f 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java @@ -89,7 +89,7 @@ public class EphemeralDateManager implements AutoCloseable { connectionListeners.clear(); scheduledExecutorService.shutdownNow(); for (Long ephemeralDateId : ephemeralDateIds) { - mysqlOperator.deleteEphemeralData(ephemeralDateId); + mysqlOperator.deleteDataById(ephemeralDateId); } }