Browse Source

Add mybatis in mysql registry module (#13275)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
04785dc799
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java
  2. 6
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml
  3. 401
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java
  4. 13
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
  5. 61
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java
  6. 61
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java
  7. 48
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java
  8. 15
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java
  9. 14
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java
  10. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/DaoConfiguration.java

@ -26,6 +26,6 @@ import org.springframework.context.annotation.Configuration;
@Configuration @Configuration
@EnableAutoConfiguration @EnableAutoConfiguration
@MapperScan("org.apache.dolphinscheduler.dao.mapper") @MapperScan(basePackages = "org.apache.dolphinscheduler.dao.mapper", sqlSessionFactoryRef = "sqlSessionFactory")
public class DaoConfiguration { public class DaoConfiguration {
} }

6
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/pom.xml

@ -50,6 +50,12 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

401
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlOperator.java

@ -17,412 +17,167 @@
package org.apache.dolphinscheduler.plugin.registry.mysql; package org.apache.dolphinscheduler.plugin.registry.mysql;
import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType; import org.apache.dolphinscheduler.plugin.registry.mysql.model.DataType;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData; import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock; import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException; import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import com.zaxxer.hikari.HikariConfig; @Component
import com.zaxxer.hikari.HikariDataSource; @ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
public class MysqlOperator {
/**
* Used to CRUD from mysql
*/
public class MysqlOperator implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MysqlOperator.class); private static final Logger logger = LoggerFactory.getLogger(MysqlOperator.class);
private final HikariDataSource dataSource; @Autowired
private MysqlRegistryDataMapper mysqlRegistryDataMapper;
@Autowired
private MysqlRegistryLockMapper mysqlRegistryLockMapper;
private final long expireTimeWindow; private final long expireTimeWindow;
public MysqlOperator(MysqlRegistryProperties registryProperties) { public MysqlOperator(MysqlRegistryProperties registryProperties) {
this.expireTimeWindow = this.expireTimeWindow =
registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis(); registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis();
HikariConfig hikariConfig = registryProperties.getHikariConfig();
hikariConfig.setPoolName("MysqlRegistryDataSourcePool");
this.dataSource = new HikariDataSource(hikariConfig);
} }
public void healthCheck() throws SQLException { public void healthCheck() {
String sql = "select 1 from t_ds_mysql_registry_data"; mysqlRegistryLockMapper.countAll();
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();) {
// if no exception, the healthCheck success
}
} }
public List<MysqlRegistryData> queryAllMysqlRegistryData() throws SQLException { public List<MysqlRegistryData> queryAllMysqlRegistryData() {
String sql = return mysqlRegistryDataMapper.selectAll();
"select id, `key`, data, type, last_term, create_time, last_update_time from t_ds_mysql_registry_data";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()) {
List<MysqlRegistryData> result = new ArrayList<>(resultSet.getFetchSize());
while (resultSet.next()) {
MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder()
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.data(resultSet.getString("data"))
.type(resultSet.getInt("type"))
.lastTerm(resultSet.getLong("last_term"))
.createTime(resultSet.getTimestamp("create_time"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.build();
result.add(mysqlRegistryData);
}
return result;
}
} }
public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException { public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
Optional<MysqlRegistryData> mysqlRegistryDataOptional = selectByKey(key); MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
if (mysqlRegistryDataOptional.isPresent()) { if (mysqlRegistryData != null) {
long id = mysqlRegistryDataOptional.get().getId(); long id = mysqlRegistryData.getId();
if (!updateValueById(id, value)) { if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value)); throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
} }
return id; return id;
} }
MysqlRegistryData mysqlRegistryData = MysqlRegistryData.builder() mysqlRegistryData = MysqlRegistryData.builder()
.key(key) .key(key)
.data(value) .data(value)
.type(DataType.EPHEMERAL.getTypeValue()) .type(DataType.EPHEMERAL.getTypeValue())
.lastTerm(System.currentTimeMillis()) .lastTerm(System.currentTimeMillis())
.build(); .build();
return insertMysqlRegistryData(mysqlRegistryData); mysqlRegistryDataMapper.insert(mysqlRegistryData);
} return mysqlRegistryData.getId();
private Optional<MysqlRegistryData> selectByKey(@NonNull String key) throws SQLException {
String sql =
"select id, `key`, data, type, create_time, last_update_time from t_ds_mysql_registry_data where `key` = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return Optional.of(
MysqlRegistryData.builder()
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.data(resultSet.getString("data"))
.type(resultSet.getInt("type"))
.createTime(resultSet.getTimestamp("create_time"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.build());
}
return Optional.empty();
}
}
}
private boolean updateValueById(long id, String value) throws SQLException {
String sql = "update t_ds_mysql_registry_data set data = ?, last_term = ? where id = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, value);
preparedStatement.setLong(2, System.currentTimeMillis());
preparedStatement.setLong(3, id);
return preparedStatement.executeUpdate() > 0;
}
}
private long insertMysqlRegistryData(@NonNull MysqlRegistryData mysqlRegistryData) throws SQLException {
String sql =
"INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
preparedStatement.setString(1, mysqlRegistryData.getKey());
preparedStatement.setString(2, mysqlRegistryData.getData());
preparedStatement.setInt(3, mysqlRegistryData.getType());
preparedStatement.setLong(4, mysqlRegistryData.getLastTerm());
int insertCount = preparedStatement.executeUpdate();
ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
if (insertCount < 1 || !generatedKeys.next()) {
throw new SQLException("Insert ephemeral data error, data: " + mysqlRegistryData);
}
return generatedKeys.getLong(1);
}
} }
public long insertOrUpdatePersistentData(String key, String value) throws SQLException { public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
String sql = MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
"INSERT INTO t_ds_mysql_registry_data (`key`, data, type, last_term) VALUES (?, ?, ?, ?)" if (mysqlRegistryData != null) {
+ long id = mysqlRegistryData.getId();
"ON DUPLICATE KEY UPDATE data=?, last_term=?"; if (mysqlRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
// put a persistent Data throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
long term = System.currentTimeMillis();
preparedStatement.setString(1, key);
preparedStatement.setString(2, value);
preparedStatement.setInt(3, DataType.PERSISTENT.getTypeValue());
preparedStatement.setLong(4, term);
preparedStatement.setString(5, value);
preparedStatement.setLong(6, term);
int insertCount = preparedStatement.executeUpdate();
ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
if (insertCount < 1 || !generatedKeys.next()) {
throw new SQLException("Insert or update persistent data error");
}
return generatedKeys.getLong(1);
}
} }
return id;
public void deleteEphemeralData(String key) throws SQLException {
String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
preparedStatement.execute();
} }
mysqlRegistryData = MysqlRegistryData.builder()
.key(key)
.data(value)
.type(DataType.PERSISTENT.getTypeValue())
.lastTerm(System.currentTimeMillis())
.build();
mysqlRegistryDataMapper.insert(mysqlRegistryData);
return mysqlRegistryData.getId();
} }
public void deleteEphemeralData(long ephemeralNodeId) throws SQLException { public void deleteDataByKey(String key) {
String sql = "DELETE from t_ds_mysql_registry_data where `id` = ?"; mysqlRegistryDataMapper.deleteByKey(key);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, ephemeralNodeId);
preparedStatement.execute();
}
} }
public void deletePersistentData(String key) throws SQLException { public void deleteDataById(long id) {
String sql = "DELETE from t_ds_mysql_registry_data where `key` = ? and type = ?"; mysqlRegistryDataMapper.deleteById(id);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
preparedStatement.setInt(2, DataType.PERSISTENT.getTypeValue());
preparedStatement.execute();
}
} }
public void clearExpireLock() { public void clearExpireLock() {
String sql = "delete from t_ds_mysql_registry_lock where last_term < ?"; mysqlRegistryLockMapper.clearExpireLock(System.currentTimeMillis() - expireTimeWindow);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
int i = preparedStatement.executeUpdate();
if (i > 0) {
logger.info("Clear expire lock, size: {}", i);
}
} catch (Exception ex) {
logger.warn("Clear expire lock from mysql registry error", ex);
}
} }
public void clearExpireEphemeralDate() { public void clearExpireEphemeralDate() {
String sql = "delete from t_ds_mysql_registry_data where last_term < ? and type = ?"; mysqlRegistryDataMapper.clearExpireEphemeralDate(System.currentTimeMillis() - expireTimeWindow,
try ( DataType.EPHEMERAL.getTypeValue());
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, System.currentTimeMillis() - expireTimeWindow);
preparedStatement.setInt(2, DataType.EPHEMERAL.getTypeValue());
int i = preparedStatement.executeUpdate();
if (i > 0) {
logger.info("clear expire ephemeral data, size:{}", i);
}
} catch (Exception ex) {
logger.warn("Clear expire ephemeral data from mysql registry error", ex);
}
} }
public MysqlRegistryData getData(String key) throws SQLException { public MysqlRegistryData getData(String key) throws SQLException {
String sql = return mysqlRegistryDataMapper.selectByKey(key);
"SELECT id, `key`, data, type, last_term, create_time, last_update_time FROM t_ds_mysql_registry_data WHERE `key` = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
return null;
}
return MysqlRegistryData.builder()
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.data(resultSet.getString("data"))
.type(resultSet.getInt("type"))
.lastTerm(resultSet.getLong("last_term"))
.createTime(resultSet.getTimestamp("create_time"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.build();
}
}
} }
public List<String> getChildren(String key) throws SQLException { public List<String> getChildren(String key) throws SQLException {
String sql = "SELECT `key` from t_ds_mysql_registry_data where `key` like ?"; return mysqlRegistryDataMapper.fuzzyQueryByKey(key)
try ( .stream()
Connection connection = dataSource.getConnection(); .map(MysqlRegistryData::getKey)
PreparedStatement preparedStatement = connection.prepareStatement(sql)) { .filter(fullPath -> fullPath.length() > key.length())
preparedStatement.setString(1, key + "%"); .map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"))
try (ResultSet resultSet = preparedStatement.executeQuery()) { .collect(Collectors.toList());
List<String> result = new ArrayList<>(resultSet.getFetchSize());
while (resultSet.next()) {
String fullPath = resultSet.getString("key");
if (fullPath.length() > key.length()) {
result.add(StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"));
}
}
return result;
}
}
} }
public boolean existKey(String key) throws SQLException { public boolean existKey(String key) throws SQLException {
String sql = "SELECT 1 FROM t_ds_mysql_registry_data WHERE `key` = ?"; MysqlRegistryData mysqlRegistryData = mysqlRegistryDataMapper.selectByKey(key);
try ( return mysqlRegistryData != null;
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, key);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return true;
}
}
}
return false;
} }
/** /**
* Try to acquire the target Lock, if cannot acquire, return null. * Try to acquire the target Lock, if cannot acquire, return null.
*/ */
@SuppressWarnings("checkstyle:IllegalCatch")
public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException { public MysqlRegistryLock tryToAcquireLock(String key) throws SQLException {
String sql = MysqlRegistryLock mysqlRegistryLock = MysqlRegistryLock.builder()
"INSERT INTO t_ds_mysql_registry_lock (`key`, lock_owner, last_term) VALUES (?, ?, ?)"; .key(key)
try ( .lockOwner(MysqlRegistryConstant.LOCK_OWNER)
Connection connection = dataSource.getConnection(); .lastTerm(System.currentTimeMillis())
PreparedStatement preparedStatement = .build();
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { try {
preparedStatement.setString(1, key); mysqlRegistryLockMapper.insert(mysqlRegistryLock);
preparedStatement.setString(2, MysqlRegistryConstant.LOCK_OWNER); return mysqlRegistryLock;
preparedStatement.setLong(3, System.currentTimeMillis()); } catch (Exception e) {
preparedStatement.executeUpdate(); if (e instanceof SQLIntegrityConstraintViolationException) {
try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
if (resultSet.next()) {
long newLockId = resultSet.getLong(1);
return getLockById(newLockId);
}
}
return null;
} catch (SQLIntegrityConstraintViolationException e) {
// duplicate exception
return null; return null;
} }
throw e;
}
} }
public MysqlRegistryLock getLockById(long lockId) throws SQLException { public MysqlRegistryLock getLockById(long lockId) throws SQLException {
String sql = return mysqlRegistryLockMapper.selectById(lockId);
"SELECT `id`, `key`, lock_owner, last_term, last_update_time, create_time FROM t_ds_mysql_registry_lock WHERE id = ?";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, lockId);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return MysqlRegistryLock.builder()
.id(resultSet.getLong("id"))
.key(resultSet.getString("key"))
.lockOwner(resultSet.getString("lock_owner"))
.lastTerm(resultSet.getLong("last_term"))
.lastUpdateTime(resultSet.getTimestamp("last_update_time"))
.createTime(resultSet.getTimestamp("create_time"))
.build();
}
}
return null;
}
} }
// release the lock
public boolean releaseLock(long lockId) throws SQLException { public boolean releaseLock(long lockId) throws SQLException {
String sql = "DELETE FROM t_ds_mysql_registry_lock WHERE id = ?"; return mysqlRegistryLockMapper.deleteById(lockId) > 0;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, lockId);
int i = preparedStatement.executeUpdate();
return i > 0;
}
} }
public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException { public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException {
StringBuilder sb = new StringBuilder("update t_ds_mysql_registry_data set `last_term` = ? where `id` IN ("); if (CollectionUtils.isEmpty(ephemeralDateIds)) {
Iterator<Long> iterator = ephemeralDateIds.iterator(); return true;
for (int i = 0; i < ephemeralDateIds.size(); i++) {
sb.append(iterator.next());
if (i != ephemeralDateIds.size() - 1) {
sb.append(",");
}
}
sb.append(")");
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
preparedStatement.setLong(1, System.currentTimeMillis());
return preparedStatement.executeUpdate() > 0;
} }
return mysqlRegistryDataMapper.updateTermByIds(ephemeralDateIds, System.currentTimeMillis()) > 0;
} }
public boolean updateLockTerm(List<Long> lockIds) throws SQLException { public boolean updateLockTerm(List<Long> lockIds) {
StringBuilder sb = if (CollectionUtils.isEmpty(lockIds)) {
new StringBuilder("update t_ds_mysql_registry_lock set `last_term` = ? where `id` IN ("); return true;
Iterator<Long> iterator = lockIds.iterator();
for (int i = 0; i < lockIds.size(); i++) {
sb.append(iterator.next());
if (i != lockIds.size() - 1) {
sb.append(",");
}
}
sb.append(")");
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sb.toString())) {
preparedStatement.setLong(1, System.currentTimeMillis());
return preparedStatement.executeUpdate() > 0;
} }
return mysqlRegistryLockMapper.updateTermByIds(lockIds, System.currentTimeMillis()) > 0;
} }
@Override
public void close() throws Exception {
if (!dataSource.isClosed()) {
try (HikariDataSource closedDatasource = this.dataSource) {
}
}
}
} }

13
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java

@ -53,10 +53,11 @@ public class MysqlRegistry implements Registry {
private final EphemeralDateManager ephemeralDateManager; private final EphemeralDateManager ephemeralDateManager;
private final SubscribeDataManager subscribeDataManager; private final SubscribeDataManager subscribeDataManager;
private final RegistryLockManager registryLockManager; private final RegistryLockManager registryLockManager;
private final MysqlOperator mysqlOperator; private MysqlOperator mysqlOperator;
public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties) { public MysqlRegistry(MysqlRegistryProperties mysqlRegistryProperties,
this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties); MysqlOperator mysqlOperator) {
this.mysqlOperator = mysqlOperator;
mysqlOperator.clearExpireLock(); mysqlOperator.clearExpireLock();
mysqlOperator.clearExpireEphemeralDate(); mysqlOperator.clearExpireEphemeralDate();
this.mysqlRegistryProperties = mysqlRegistryProperties; this.mysqlRegistryProperties = mysqlRegistryProperties;
@ -138,8 +139,7 @@ public class MysqlRegistry implements Registry {
@Override @Override
public void delete(String key) { public void delete(String key) {
try { try {
mysqlOperator.deleteEphemeralData(key); mysqlOperator.deleteDataByKey(key);
mysqlOperator.deletePersistentData(key);
} catch (Exception e) { } catch (Exception e) {
throw new RegistryException(String.format("Delete key: %s error", key), e); throw new RegistryException(String.format("Delete key: %s error", key), e);
} }
@ -188,8 +188,7 @@ public class MysqlRegistry implements Registry {
try ( try (
EphemeralDateManager closed1 = ephemeralDateManager; EphemeralDateManager closed1 = ephemeralDateManager;
SubscribeDataManager close2 = subscribeDataManager; SubscribeDataManager close2 = subscribeDataManager;
RegistryLockManager close3 = registryLockManager; RegistryLockManager close3 = registryLockManager) {
MysqlOperator closed4 = mysqlOperator) {
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("Close Mysql Registry error", e); LOGGER.error("Close Mysql Registry error", e);
} }

61
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistryConfiguration.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql;
import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryDataMapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.mapper.MysqlRegistryLockMapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.zaxxer.hikari.HikariDataSource;
@Configuration
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "mysql")
public class MysqlRegistryConfiguration {
@Bean
public SqlSessionFactory mysqlRegistrySqlSessionFactory(MysqlRegistryProperties mysqlRegistryProperties) throws Exception {
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(new HikariDataSource(mysqlRegistryProperties.getHikariConfig()));
return sqlSessionFactoryBean.getObject();
}
@Bean
public SqlSessionTemplate mysqlRegistrySqlSessionTemplate(SqlSessionFactory mysqlRegistrySqlSessionFactory) {
mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryDataMapper.class);
mysqlRegistrySqlSessionFactory.getConfiguration().addMapper(MysqlRegistryLockMapper.class);
return new SqlSessionTemplate(mysqlRegistrySqlSessionFactory);
}
@Bean
public MysqlRegistryDataMapper mysqlRegistryDataMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) {
return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryDataMapper.class);
}
@Bean
public MysqlRegistryLockMapper mysqlRegistryLockMapper(SqlSessionTemplate mysqlRegistrySqlSessionTemplate) {
return mysqlRegistrySqlSessionTemplate.getMapper(MysqlRegistryLockMapper.class);
}
}

61
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryDataMapper.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.mapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryData;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import java.util.Collection;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface MysqlRegistryDataMapper extends BaseMapper<MysqlRegistryData> {
@Select("select * from t_ds_mysql_registry_data")
List<MysqlRegistryData> selectAll();
@Select("select * from t_ds_mysql_registry_data where `key` = #{key}")
MysqlRegistryData selectByKey(@Param("key") String key);
@Select("select * from t_ds_mysql_registry_data where `key` like CONCAT (#{key}, '%')")
List<MysqlRegistryData> fuzzyQueryByKey(@Param("key") String key);
@Update("update t_ds_mysql_registry_data set `data` = #{data}, `last_term` = #{term} where `id` = #{id}")
int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term);
@Delete("delete from t_ds_mysql_registry_data where `key` = #{key}")
void deleteByKey(@Param("key") String key);
@Delete("delete from t_ds_mysql_registry_data where `last_term` < #{term} and `type` = #{type}")
void clearExpireEphemeralDate(@Param("term") long term, @Param("type") int type);
@Update({"<script>",
"update t_ds_mysql_registry_data",
"set `last_term` = #{term}",
"where id IN ",
"<foreach item='id' index='index' collection='ids' open='(' separator=',' close=')'>",
" #{id}",
"</foreach>",
"</script>"})
int updateTermByIds(@Param("ids") Collection<Long> ids, @Param("term") long term);
}

48
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/mapper/MysqlRegistryLockMapper.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.mysql.mapper;
import org.apache.dolphinscheduler.plugin.registry.mysql.model.MysqlRegistryLock;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import java.util.Collection;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface MysqlRegistryLockMapper extends BaseMapper<MysqlRegistryLock> {
@Select("select count(1) from t_ds_mysql_registry_lock")
int countAll();
@Delete("delete from t_ds_mysql_registry_lock where `last_term` < #{term}")
void clearExpireLock(@Param("term") long term);
@Update({"<script>",
"update t_ds_mysql_registry_lock",
"set `last_term` = #{term}",
"where id IN ",
"<foreach item='id' index='index' collection='ids' open='(' separator=',' close=')'>",
" #{id}",
"</foreach>",
"</script>"})
int updateTermByIds(@Param("ids") Collection<Long> ids, @Param("term") long term);
}

15
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryData.java

@ -24,18 +24,31 @@ import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@TableName(value = "t_ds_mysql_registry_data")
@Data @Data
@Builder @Builder
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class MysqlRegistryData { public class MysqlRegistryData {
private long id; @TableId(value = "id", type = IdType.AUTO)
private Long id;
@TableField(value = "`key`")
private String key; private String key;
@TableField(value = "`data`")
private String data; private String data;
@TableField(value = "`type`")
private int type; private int type;
@TableField(value = "`last_term`")
private long lastTerm; private long lastTerm;
@TableField(value = "`create_time`")
private Date createTime; private Date createTime;
@TableField(value = "`last_time`")
private Date lastUpdateTime; private Date lastUpdateTime;
} }

14
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/model/MysqlRegistryLock.java

@ -24,31 +24,43 @@ import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@TableName(value = "t_ds_mysql_registry_lock")
@Data @Data
@Builder @Builder
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class MysqlRegistryLock { public class MysqlRegistryLock {
private long id; @TableId(value = "id", type = IdType.AUTO)
private Long id;
/** /**
* The lock key. * The lock key.
*/ */
@TableField(value = "`key`")
private String key; private String key;
/** /**
* acquire lock host. * acquire lock host.
*/ */
@TableField(value = "`lock_owner`")
private String lockOwner; private String lockOwner;
/** /**
* The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired. * The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired.
*/ */
@TableField(value = "`last_term`")
private Long lastTerm; private Long lastTerm;
/** /**
* The lock last update time. * The lock last update time.
*/ */
@TableField(value = "`last_update_time`")
private Date lastUpdateTime; private Date lastUpdateTime;
/** /**
* The lock create time. * The lock create time.
*/ */
@TableField(value = "`create_time`")
private Date createTime; private Date createTime;
} }

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java

@ -89,7 +89,7 @@ public class EphemeralDateManager implements AutoCloseable {
connectionListeners.clear(); connectionListeners.clear();
scheduledExecutorService.shutdownNow(); scheduledExecutorService.shutdownNow();
for (Long ephemeralDateId : ephemeralDateIds) { for (Long ephemeralDateId : ephemeralDateIds) {
mysqlOperator.deleteEphemeralData(ephemeralDateId); mysqlOperator.deleteDataById(ephemeralDateId);
} }
} }

Loading…
Cancel
Save