Browse Source

Fix registry table field (#14043)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
0a02522420
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java
  2. 10
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java
  3. 13
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryData.java
  4. 8
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryLock.java
  5. 8
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/SubscribeDataManager.java
  6. 12
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/mysql_registry_init.sql
  7. 12
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/postgresql_registry_init.sql

16
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java

@ -70,9 +70,9 @@ public class JdbcOperator {
return id; return id;
} }
jdbcRegistryData = JdbcRegistryData.builder() jdbcRegistryData = JdbcRegistryData.builder()
.key(key) .dataKey(key)
.data(value) .dataValue(value)
.type(DataType.EPHEMERAL.getTypeValue()) .dataType(DataType.EPHEMERAL.getTypeValue())
.lastTerm(System.currentTimeMillis()) .lastTerm(System.currentTimeMillis())
.build(); .build();
jdbcRegistryDataMapper.insert(jdbcRegistryData); jdbcRegistryDataMapper.insert(jdbcRegistryData);
@ -89,9 +89,9 @@ public class JdbcOperator {
return id; return id;
} }
jdbcRegistryData = JdbcRegistryData.builder() jdbcRegistryData = JdbcRegistryData.builder()
.key(key) .dataKey(key)
.data(value) .dataValue(value)
.type(DataType.PERSISTENT.getTypeValue()) .dataType(DataType.PERSISTENT.getTypeValue())
.lastTerm(System.currentTimeMillis()) .lastTerm(System.currentTimeMillis())
.build(); .build();
jdbcRegistryDataMapper.insert(jdbcRegistryData); jdbcRegistryDataMapper.insert(jdbcRegistryData);
@ -122,7 +122,7 @@ public class JdbcOperator {
public List<String> getChildren(String key) throws SQLException { public List<String> getChildren(String key) throws SQLException {
return jdbcRegistryDataMapper.fuzzyQueryByKey(key) return jdbcRegistryDataMapper.fuzzyQueryByKey(key)
.stream() .stream()
.map(JdbcRegistryData::getKey) .map(JdbcRegistryData::getDataKey)
.filter(fullPath -> fullPath.length() > key.length()) .filter(fullPath -> fullPath.length() > key.length())
.map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/")) .map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -139,7 +139,7 @@ public class JdbcOperator {
@SuppressWarnings("checkstyle:IllegalCatch") @SuppressWarnings("checkstyle:IllegalCatch")
public JdbcRegistryLock tryToAcquireLock(String key) throws SQLException { public JdbcRegistryLock tryToAcquireLock(String key) throws SQLException {
JdbcRegistryLock jdbcRegistryLock = JdbcRegistryLock.builder() JdbcRegistryLock jdbcRegistryLock = JdbcRegistryLock.builder()
.key(key) .lockKey(key)
.lockOwner(JdbcRegistryConstant.LOCK_OWNER) .lockOwner(JdbcRegistryConstant.LOCK_OWNER)
.lastTerm(System.currentTimeMillis()) .lastTerm(System.currentTimeMillis())
.build(); .build();

10
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java

@ -34,19 +34,19 @@ public interface JdbcRegistryDataMapper extends BaseMapper<JdbcRegistryData> {
@Select("select * from t_ds_jdbc_registry_data") @Select("select * from t_ds_jdbc_registry_data")
List<JdbcRegistryData> selectAll(); List<JdbcRegistryData> selectAll();
@Select("select * from t_ds_jdbc_registry_data where key = #{key}") @Select("select * from t_ds_jdbc_registry_data where data_key = #{key}")
JdbcRegistryData selectByKey(@Param("key") String key); JdbcRegistryData selectByKey(@Param("key") String key);
@Select("select * from t_ds_jdbc_registry_data where key like CONCAT (#{key}, '%')") @Select("select * from t_ds_jdbc_registry_data where data_key like CONCAT (#{key}, '%')")
List<JdbcRegistryData> fuzzyQueryByKey(@Param("key") String key); List<JdbcRegistryData> fuzzyQueryByKey(@Param("key") String key);
@Update("update t_ds_jdbc_registry_data set data = #{data}, last_term = #{term} where id = #{id}") @Update("update t_ds_jdbc_registry_data set data_value = #{data}, last_term = #{term} where id = #{id}")
int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term); int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term);
@Delete("delete from t_ds_jdbc_registry_data where key = #{key}") @Delete("delete from t_ds_jdbc_registry_data where data_key = #{key}")
void deleteByKey(@Param("key") String key); void deleteByKey(@Param("key") String key);
@Delete("delete from t_ds_jdbc_registry_data where last_term < #{term} and type = #{type}") @Delete("delete from t_ds_jdbc_registry_data where last_term < #{term} and data_type = #{type}")
void clearExpireEphemeralDate(@Param("term") long term, @Param("type") int type); void clearExpireEphemeralDate(@Param("term") long term, @Param("type") int type);
@Update({"<script>", @Update({"<script>",

13
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryData.java

@ -25,7 +25,6 @@ import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
@ -38,17 +37,11 @@ public class JdbcRegistryData {
@TableId(value = "id", type = IdType.AUTO) @TableId(value = "id", type = IdType.AUTO)
private Long id; private Long id;
@TableField(value = "key") private String dataKey;
private String key; private String dataValue;
@TableField(value = "data") private int dataType;
private String data;
@TableField(value = "type")
private int type;
@TableField(value = "last_term")
private long lastTerm; private long lastTerm;
@TableField(value = "create_time")
private Date createTime; private Date createTime;
@TableField(value = "last_time")
private Date lastUpdateTime; private Date lastUpdateTime;
} }

8
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/model/JdbcRegistryLock.java

@ -25,7 +25,6 @@ import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
@ -41,26 +40,21 @@ public class JdbcRegistryLock {
/** /**
* The lock key. * The lock key.
*/ */
@TableField(value = "key") private String lockKey;
private String key;
/** /**
* acquire lock host. * acquire lock host.
*/ */
@TableField(value = "lock_owner")
private String lockOwner; private String lockOwner;
/** /**
* The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired. * The last term, if the (currentTime - lastTerm) > termExpire time, the lock will be expired.
*/ */
@TableField(value = "last_term")
private Long lastTerm; private Long lastTerm;
/** /**
* The lock last update time. * The lock last update time.
*/ */
@TableField(value = "last_update_time")
private Date lastUpdateTime; private Date lastUpdateTime;
/** /**
* The lock create time. * The lock create time.
*/ */
@TableField(value = "create_time")
private Date createTime; private Date createTime;
} }

8
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/SubscribeDataManager.java

@ -80,7 +80,7 @@ public class SubscribeDataManager implements AutoCloseable {
if (jdbcRegistryData == null) { if (jdbcRegistryData == null) {
return null; return null;
} }
return jdbcRegistryData.getData(); return jdbcRegistryData.getDataValue();
} }
@Override @Override
@ -102,7 +102,7 @@ public class SubscribeDataManager implements AutoCloseable {
try { try {
Map<String, JdbcRegistryData> currentJdbcDataMap = jdbcOperator.queryAllJdbcRegistryData() Map<String, JdbcRegistryData> currentJdbcDataMap = jdbcOperator.queryAllJdbcRegistryData()
.stream() .stream()
.collect(Collectors.toMap(JdbcRegistryData::getKey, Function.identity())); .collect(Collectors.toMap(JdbcRegistryData::getDataKey, Function.identity()));
// find the different // find the different
List<JdbcRegistryData> addedData = new ArrayList<>(); List<JdbcRegistryData> addedData = new ArrayList<>();
List<JdbcRegistryData> deletedData = new ArrayList<>(); List<JdbcRegistryData> deletedData = new ArrayList<>();
@ -143,9 +143,9 @@ public class SubscribeDataManager implements AutoCloseable {
List<SubscribeListener> subscribeListeners, List<SubscribeListener> subscribeListeners,
Event.Type type) { Event.Type type) {
for (JdbcRegistryData data : dataList) { for (JdbcRegistryData data : dataList) {
if (data.getKey().startsWith(subscribeKey)) { if (data.getDataKey().startsWith(subscribeKey)) {
subscribeListeners.forEach(subscribeListener -> subscribeListener subscribeListeners.forEach(subscribeListener -> subscribeListener
.notify(new Event(data.getKey(), data.getKey(), data.getData(), type))); .notify(new Event(data.getDataKey(), data.getDataKey(), data.getDataValue(), type)));
} }
} }
} }

12
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/mysql_registry_init.sql

@ -21,14 +21,14 @@ DROP TABLE IF EXISTS `t_ds_jdbc_registry_data`;
CREATE TABLE `t_ds_jdbc_registry_data` CREATE TABLE `t_ds_jdbc_registry_data`
( (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key', `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(256) NOT NULL COMMENT 'key, like zookeeper node path', `data_key` varchar(256) NOT NULL COMMENT 'key, like zookeeper node path',
`data` text NOT NULL COMMENT 'data, like zookeeper node value', `data_value` text NOT NULL COMMENT 'data, like zookeeper node value',
`type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node', `data_type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node',
`last_term` bigint NOT NULL COMMENT 'last term time', `last_term` bigint NOT NULL COMMENT 'last term time',
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time', `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
unique (`key`) unique (`data_key`)
) ENGINE = InnoDB ) ENGINE = InnoDB
DEFAULT CHARSET = utf8; DEFAULT CHARSET = utf8;
@ -37,12 +37,12 @@ DROP TABLE IF EXISTS `t_ds_jdbc_registry_lock`;
CREATE TABLE `t_ds_jdbc_registry_lock` CREATE TABLE `t_ds_jdbc_registry_lock`
( (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key', `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`key` varchar(256) NOT NULL COMMENT 'lock path', `lock_key` varchar(256) NOT NULL COMMENT 'lock path',
`lock_owner` varchar(256) NOT NULL COMMENT 'the lock owner, ip_processId', `lock_owner` varchar(256) NOT NULL COMMENT 'the lock owner, ip_processId',
`last_term` bigint NOT NULL COMMENT 'last term time', `last_term` bigint NOT NULL COMMENT 'last term time',
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time', `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
unique (`key`) unique (`lock_key`)
) ENGINE = InnoDB ) ENGINE = InnoDB
DEFAULT CHARSET = utf8; DEFAULT CHARSET = utf8;

12
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/postgresql_registry_init.sql

@ -20,15 +20,15 @@ create table t_ds_jdbc_registry_data
( (
id serial id serial
constraint t_ds_jdbc_registry_data_pk primary key, constraint t_ds_jdbc_registry_data_pk primary key,
key varchar not null, data_key varchar not null,
data text not null, data_value text not null,
type int4 not null, data_type int4 not null,
last_term bigint not null, last_term bigint not null,
last_update_time timestamp default current_timestamp not null, last_update_time timestamp default current_timestamp not null,
create_time timestamp default current_timestamp not null create_time timestamp default current_timestamp not null
); );
create unique index t_ds_jdbc_registry_data_key_uindex on t_ds_jdbc_registry_data (key); create unique index t_ds_jdbc_registry_data_key_uindex on t_ds_jdbc_registry_data (data_key);
DROP TABLE IF EXISTS t_ds_jdbc_registry_lock; DROP TABLE IF EXISTS t_ds_jdbc_registry_lock;
@ -36,10 +36,10 @@ create table t_ds_jdbc_registry_lock
( (
id serial id serial
constraint t_ds_jdbc_registry_lock_pk primary key, constraint t_ds_jdbc_registry_lock_pk primary key,
key varchar not null, lock_key varchar not null,
lock_owner varchar not null, lock_owner varchar not null,
last_term bigint not null, last_term bigint not null,
last_update_time timestamp default current_timestamp not null, last_update_time timestamp default current_timestamp not null,
create_time timestamp default current_timestamp not null create_time timestamp default current_timestamp not null
); );
create unique index t_ds_jdbc_registry_lock_key_uindex on t_ds_jdbc_registry_lock (key); create unique index t_ds_jdbc_registry_lock_key_uindex on t_ds_jdbc_registry_lock (lock_key);

Loading…
Cancel
Save