diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml
index 6c9f41d7a3..a7e78a11f7 100644
--- a/.github/workflows/unit-test.yml
+++ b/.github/workflows/unit-test.yml
@@ -76,7 +76,7 @@ jobs:
restore-keys: ${{ runner.os }}-maven-
- name: Run Unit tests
- run: ./mvnw clean verify -B -Dmaven.test.skip=false -Dspotless.skip=true -DskipUT=false -DskipIT=false
+ run: ./mvnw clean verify -B -Dmaven.test.skip=false -Dspotless.skip=true -DskipUT=false
- name: Upload coverage report to codecov
run: CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash)
@@ -99,23 +99,11 @@ jobs:
-Dsonar.login=e4058004bc6be89decf558ac819aa1ecbee57682
-Dsonar.exclusions=,dolphinscheduler-ui/src/**/i18n/locale/*.js,dolphinscheduler-microbench/src/**/*
-Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
- -DskipUT=true -DskipIT=true
+ -DskipUT=true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
- - name: Collect logs
- continue-on-error: true
- run: |
- mkdir -p ${LOG_DIR}
- docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml logs dolphinscheduler-postgresql > ${LOG_DIR}/db.txt
-
- - name: Upload logs
- uses: actions/upload-artifact@v2
- continue-on-error: true
- with:
- name: unit-test-logs
- path: ${LOG_DIR}
result:
name: Unit Test
runs-on: ubuntu-latest
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-aliyunVoice/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-aliyunVoice/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-aliyunVoice/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-dingtalk/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-dingtalk/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-dingtalk/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-email/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-email/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-email/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-feishu/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-feishu/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-feishu/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-http/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-http/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-http/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-pagerduty/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-pagerduty/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-pagerduty/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-prometheus/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-prometheus/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-prometheus/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-slack/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-slack/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-slack/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-telegram/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-telegram/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-telegram/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-webexteams/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-webexteams/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-webexteams/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-wechat/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-wechat/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-wechat/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/resources/logback.xml b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-api/src/test/resources/application.yaml b/dolphinscheduler-api/src/test/resources/application.yaml
index 26536d631f..5eb7e1f8d7 100644
--- a/dolphinscheduler-api/src/test/resources/application.yaml
+++ b/dolphinscheduler-api/src/test/resources/application.yaml
@@ -44,6 +44,17 @@ mybatis-plus:
registry:
type: zookeeper
+ zookeeper:
+ namespace: dolphinscheduler
+ connect-string: localhost:2181
+ retry-policy:
+ base-sleep-time: 60ms
+ max-sleep: 300ms
+ max-retries: 5
+ session-timeout: 30s
+ connection-timeout: 9s
+ block-until-connected: 600ms
+ digest: ~
api:
audit-enable: true
diff --git a/dolphinscheduler-api/src/test/resources/logback-spring.xml b/dolphinscheduler-api/src/test/resources/logback.xml
similarity index 100%
rename from dolphinscheduler-api/src/test/resources/logback-spring.xml
rename to dolphinscheduler-api/src/test/resources/logback.xml
diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml
index 18e75de57b..10c4f0e4a1 100644
--- a/dolphinscheduler-bom/pom.xml
+++ b/dolphinscheduler-bom/pom.xml
@@ -37,6 +37,7 @@
1.2.20
2.12.0
0.5.11
+
0.7.1
1.41.0
1.11
@@ -943,6 +944,13 @@
+
+ org.testcontainers
+ testcontainers
+ ${testcontainer.version}
+ test
+
+
org.testcontainers
mysql
diff --git a/dolphinscheduler-common/src/test/resources/logback.xml b/dolphinscheduler-common/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-common/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-data-quality/src/test/resources/logback.xml b/dolphinscheduler-data-quality/src/test/resources/logback.xml
new file mode 100644
index 0000000000..9a182a18ef
--- /dev/null
+++ b/dolphinscheduler-data-quality/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
index 86b82a8fb6..f90ef1ea32 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
@@ -26,10 +26,20 @@ import java.util.Collection;
import lombok.NonNull;
/**
- * Registry
+ * The SPI interface for registry center, each registry plugin should implement this interface.
*/
public interface Registry extends Closeable {
+ /**
+ * Start the registry, once started, the registry will connect to the registry center.
+ */
+ void start();
+
+ /**
+ * Whether the registry is connected
+ *
+ * @return true if connected, false otherwise.
+ */
boolean isConnected();
/**
@@ -40,7 +50,13 @@ public interface Registry extends Closeable {
*/
void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException;
- boolean subscribe(String path, SubscribeListener listener);
+ /**
+ * Subscribe the path, when the path has expose {@link Event}, the listener will be triggered.
+ *
+ * @param path the path to subscribe
+ * @param listener the listener to be triggered
+ */
+ void subscribe(String path, SubscribeListener listener);
/**
* Remove the path from the subscribe list.
@@ -53,35 +69,34 @@ public interface Registry extends Closeable {
void addConnectionStateListener(ConnectionListener listener);
/**
- * @return the value
+ * Get the value of the key, if key not exist will throw {@link RegistryException}
*/
- String get(String key);
+ String get(String key) throws RegistryException;
/**
- * @param key
- * @param value
+ * Put the key-value pair into the registry
+ *
+ * @param key the key, cannot be null
+ * @param value the value, cannot be null
* @param deleteOnDisconnect if true, when the connection state is disconnected, the key will be deleted
*/
void put(String key, String value, boolean deleteOnDisconnect);
/**
- * This function will delete the keys whose prefix is {@param key}
- *
- * @param key the prefix of deleted key
- * @throws if the key not exists, there is a registryException
+ * Delete the key from the registry
*/
void delete(String key);
/**
- * @return {@code true} if key exists.
- * E.g: registry contains the following keys:[/test/test1/test2,]
- * if the key: /test
- * Return: test1
+ * Return the children of the key
*/
Collection children(String key);
/**
- * @return if key exists,return true
+ * Check if the key exists
+ *
+ * @param key the key to check
+ * @return true if the key exists
*/
boolean exists(String key);
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/pom.xml
index b084db1ccf..0f5c4d1494 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/pom.xml
@@ -31,6 +31,15 @@
org.apache.dolphinscheduler
dolphinscheduler-registry-api
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-registry-it
+ ${project.version}
+ test-jar
+ test
+
+
io.etcd
jetcd-core
@@ -49,18 +58,22 @@
+
+
+ io.netty
+ netty-all
+
+
io.etcd
jetcd-test
test
+
- io.netty
- netty-all
-
-
- org.slf4j
- slf4j-api
+ org.springframework.boot
+ spring-boot-starter-test
+ test
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
index 24a462c03f..80279775ff 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
@@ -71,6 +71,7 @@ import io.netty.handler.ssl.SslContext;
@Slf4j
public class EtcdRegistry implements Registry {
+ private final EtcdRegistryProperties etcdRegistryProperties;
private final Client client;
private EtcdConnectionStateListener etcdConnectionStateListener;
@@ -83,9 +84,8 @@ public class EtcdRegistry implements Registry {
private final Map watcherMap = new ConcurrentHashMap<>();
- private static final long TIME_TO_LIVE_SECONDS = 30L;
-
public EtcdRegistry(EtcdRegistryProperties registryProperties) throws SSLException {
+ this.etcdRegistryProperties = registryProperties;
ClientBuilder clientBuilder = Client.builder()
.endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(registryProperties.getEndpoints())))
.namespace(byteSequence(registryProperties.getNamespace()))
@@ -129,6 +129,11 @@ public class EtcdRegistry implements Registry {
}
+ @Override
+ public void start() {
+ // The start has been set in the constructor
+ }
+
@Override
public boolean isConnected() {
return client.getKVClient().get(byteSequence("/")).join() != null;
@@ -145,7 +150,7 @@ public class EtcdRegistry implements Registry {
* @return if subcribe Returns true if no exception was thrown
*/
@Override
- public boolean subscribe(String path, SubscribeListener listener) {
+ public void subscribe(String path, SubscribeListener listener) {
try {
ByteSequence watchKey = byteSequence(path);
WatchOption watchOption =
@@ -159,7 +164,6 @@ public class EtcdRegistry implements Registry {
} catch (Exception e) {
throw new RegistryException("Failed to subscribe listener for key: " + path, e);
}
- return true;
}
/**
@@ -193,7 +197,7 @@ public class EtcdRegistry implements Registry {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get data error", e);
- } catch (ExecutionException e) {
+ } catch (Exception e) {
throw new RegistryException("etcd get data error, key = " + key, e);
}
}
@@ -206,7 +210,8 @@ public class EtcdRegistry implements Registry {
try {
if (deleteOnDisconnect) {
// keep the key by lease, if disconnected, the lease will expire and the key will delete
- long leaseId = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key, TIME_TO_LIVE_SECONDS);
+ long leaseId = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key,
+ etcdRegistryProperties.getTtl().get(ChronoUnit.SECONDS));
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
client.getKVClient().put(byteSequence(key), byteSequence(value), putOption).get();
} else {
@@ -289,47 +294,59 @@ public class EtcdRegistry implements Registry {
*/
@Override
public boolean acquireLock(String key) {
+ Map leaseIdMap = threadLocalLockMap.get();
+ if (null == leaseIdMap) {
+ leaseIdMap = new HashMap<>();
+ threadLocalLockMap.set(leaseIdMap);
+ }
+ if (leaseIdMap.containsKey(key)) {
+ return true;
+ }
+
Lock lockClient = client.getLockClient();
Lease leaseClient = client.getLeaseClient();
// get the lock with a lease
try {
- long leaseId = leaseClient.grant(TIME_TO_LIVE_SECONDS).get().getID();
+ long leaseId = leaseClient.grant(etcdRegistryProperties.getTtl().get(ChronoUnit.SECONDS)).get().getID();
// keep the lease
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
}));
lockClient.lock(byteSequence(key), leaseId).get();
// save the leaseId for release Lock
- if (null == threadLocalLockMap.get()) {
- threadLocalLockMap.set(new HashMap<>());
- }
- threadLocalLockMap.get().put(key, leaseId);
+ leaseIdMap.put(key, leaseId);
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get lock error", e);
- } catch (ExecutionException e) {
+ } catch (Exception e) {
throw new RegistryException("etcd get lock error, lockKey: " + key, e);
}
}
@Override
public boolean acquireLock(String key, long timeout) {
+ Map leaseIdMap = threadLocalLockMap.get();
+ if (null == leaseIdMap) {
+ leaseIdMap = new HashMap<>();
+ threadLocalLockMap.set(leaseIdMap);
+ }
+ if (leaseIdMap.containsKey(key)) {
+ return true;
+ }
+
Lock lockClient = client.getLockClient();
Lease leaseClient = client.getLeaseClient();
// get the lock with a lease
try {
- long leaseId = leaseClient.grant(TIME_TO_LIVE_SECONDS).get().getID();
+ long leaseId = leaseClient.grant(etcdRegistryProperties.getTtl().get(ChronoUnit.SECONDS)).get().getID();
// keep the lease
- lockClient.lock(byteSequence(key), leaseId).get(timeout, TimeUnit.MICROSECONDS);
+ lockClient.lock(byteSequence(key), leaseId).get(timeout, TimeUnit.MILLISECONDS);
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
}));
// save the leaseId for release Lock
- if (null == threadLocalLockMap.get()) {
- threadLocalLockMap.set(new HashMap<>());
- }
- threadLocalLockMap.get().put(key, leaseId);
+ leaseIdMap.put(key, leaseId);
return true;
} catch (TimeoutException timeoutException) {
log.debug("Acquire lock: {} in {}/ms timeout", key, timeout);
@@ -337,7 +354,7 @@ public class EtcdRegistry implements Registry {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get lock error", e);
- } catch (ExecutionException e) {
+ } catch (Exception e) {
throw new RegistryException("etcd get lock error, lockKey: " + key, e);
}
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java
index babb6dea76..b748c2a0fe 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java
@@ -33,6 +33,8 @@ public class EtcdRegistryProperties {
private String namespace = "dolphinscheduler";
private Duration connectionTimeout = Duration.ofSeconds(9);
+ private Duration ttl = Duration.ofSeconds(30);
+
// auth
private String user;
private String password;
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
index eb716378d0..84acbae8f3 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
@@ -43,7 +43,7 @@ class EtcdKeepAliveLeaseManagerTest {
.withNodes(1)
.withImage("ibmcom/etcd:3.2.24")
.build();
- server.restart();
+ server.cluster().start();
client = Client.builder().endpoints(server.clientEndpoints()).build();
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTest.java
deleted file mode 100644
index b99bab98ad..0000000000
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.registry.etcd;
-
-import org.apache.dolphinscheduler.registry.api.Event;
-import org.apache.dolphinscheduler.registry.api.SubscribeListener;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.etcd.jetcd.test.EtcdClusterExtension;
-
-public class EtcdRegistryTest {
-
- private static final Logger logger = LoggerFactory.getLogger(EtcdRegistryTest.class);
-
- public static EtcdRegistry registry;
-
- @BeforeAll
- public static void before() throws Exception {
- EtcdClusterExtension server = EtcdClusterExtension.builder()
- .withNodes(1)
- .withImage("ibmcom/etcd:3.2.24")
- .build();
- EtcdRegistryProperties properties = new EtcdRegistryProperties();
- server.restart();
- properties.setEndpoints(String.valueOf(server.clientEndpoints().get(0)));
- registry = new EtcdRegistry(properties);
- registry.put("/sub", "sub", false);
- }
-
- @Test
- public void persistTest() {
- registry.put("/nodes/m1", "", false);
- registry.put("/nodes/m2", "", false);
- Assertions.assertEquals(Arrays.asList("m1", "m2"), registry.children("/nodes"));
- Assertions.assertTrue(registry.exists("/nodes/m1"));
- registry.delete("/nodes/m2");
- Assertions.assertFalse(registry.exists("/nodes/m2"));
- registry.delete("/nodes");
- Assertions.assertFalse(registry.exists("/nodes/m1"));
- }
-
- @Test
- public void lockTest() {
- CountDownLatch preCountDownLatch = new CountDownLatch(1);
- CountDownLatch allCountDownLatch = new CountDownLatch(2);
- List testData = new ArrayList<>();
- new Thread(() -> {
- registry.acquireLock("/lock");
- preCountDownLatch.countDown();
- logger.info(Thread.currentThread().getName()
- + " :I got the lock, but I don't want to work. I want to rest for a while");
- try {
- Thread.sleep(1000);
- logger.info(Thread.currentThread().getName() + " :I'm going to start working");
- testData.add("thread1");
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- logger.info(Thread.currentThread().getName() + " :I have finished my work, now I release the lock");
- registry.releaseLock("/lock");
- allCountDownLatch.countDown();
- }
- }).start();
- try {
- preCountDownLatch.await(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- new Thread(() -> {
- try {
- logger.info(Thread.currentThread().getName() + " :I am trying to acquire the lock");
- registry.acquireLock("/lock");
- logger.info(Thread.currentThread().getName() + " :I got the lock and I started working");
-
- testData.add("thread2");
- } finally {
- registry.releaseLock("/lock");
- allCountDownLatch.countDown();
- }
-
- }).start();
- try {
- allCountDownLatch.await(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- Assertions.assertEquals(testData, Arrays.asList("thread1", "thread2"));
- }
-
- @Test
- public void subscribeTest() {
- boolean status = registry.subscribe("/sub", new TestListener());
- // The following add and delete operations are used for debugging
- registry.put("/sub/m1", "tt", false);
- registry.put("/sub/m2", "tt", false);
- registry.delete("/sub/m2");
- registry.delete("/sub");
- Assertions.assertTrue(status);
-
- }
-
- static class TestListener implements SubscribeListener {
-
- @Override
- public void notify(Event event) {
- logger.info("I'm test listener");
- }
- }
-
- @AfterAll
- public static void after() throws IOException {
- registry.close();
- }
-}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTestCase.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTestCase.java
new file mode 100644
index 0000000000..1e751c1862
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTestCase.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import org.apache.dolphinscheduler.plugin.registry.RegistryTestCase;
+
+import java.net.URI;
+import java.util.stream.Collectors;
+
+import lombok.SneakyThrows;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.test.EtcdClusterExtension;
+
+@SpringBootTest(classes = EtcdRegistryProperties.class)
+@SpringBootApplication(scanBasePackageClasses = EtcdRegistryProperties.class)
+public class EtcdRegistryTestCase extends RegistryTestCase {
+
+ @Autowired
+ private EtcdRegistryProperties etcdRegistryProperties;
+
+ private static EtcdCluster etcdCluster;
+
+ @SneakyThrows
+ @BeforeAll
+ public static void setUpTestingServer() {
+ etcdCluster = EtcdClusterExtension.builder()
+ .withNodes(1)
+ .withImage("ibmcom/etcd:3.2.24")
+ .build()
+ .cluster();
+ etcdCluster.start();
+ System.setProperty("registry.endpoints",
+ etcdCluster.clientEndpoints().stream().map(URI::toString).collect(Collectors.joining(",")));
+ }
+
+ @SneakyThrows
+ @Override
+ public EtcdRegistry createRegistry() {
+ return new EtcdRegistry(etcdRegistryProperties);
+ }
+
+ @SneakyThrows
+ @AfterAll
+ public static void tearDownTestingServer() {
+ try (EtcdCluster cluster = etcdCluster) {
+ }
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/application.yaml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/application.yaml
new file mode 100644
index 0000000000..083d38511c
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/application.yaml
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+registry:
+ type: etcd
+ ttl: 2s
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/logback.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/logback.xml
new file mode 100644
index 0000000000..6f211959c5
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/logback.xml
@@ -0,0 +1,21 @@
+
+
+
+
+
+
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/pom.xml
new file mode 100644
index 0000000000..7f4b97d3ef
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/pom.xml
@@ -0,0 +1,60 @@
+
+
+
+ 4.0.0
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-registry-plugins
+ dev-SNAPSHOT
+
+
+ dolphinscheduler-registry-it
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-registry-api
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+ false
+
+
+
+
+ test-jar
+
+
+
+
+
+
+
+
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
new file mode 100644
index 0000000000..8fbd6bc5c0
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import lombok.SneakyThrows;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.truth.Truth;
+
+public abstract class RegistryTestCase {
+
+ protected R registry;
+
+ @BeforeEach
+ public void setupRegistry() {
+ registry = createRegistry();
+ }
+
+ @SneakyThrows
+ @AfterEach
+ public void tearDownRegistry() {
+ try (R registry = this.registry) {
+ }
+ }
+
+ @Test
+ public void testIsConnected() {
+ registry.start();
+ Truth.assertThat(registry.isConnected()).isTrue();
+ }
+
+ @Test
+ public void testConnectUntilTimeout() {
+ registry.start();
+ await().atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> registry.connectUntilTimeout(Duration.ofSeconds(3)));
+
+ }
+
+ @SneakyThrows
+ @Test
+ public void testSubscribe() {
+ registry.start();
+
+ final AtomicBoolean subscribeAdded = new AtomicBoolean(false);
+ final AtomicBoolean subscribeRemoved = new AtomicBoolean(false);
+ final AtomicBoolean subscribeUpdated = new AtomicBoolean(false);
+
+ SubscribeListener subscribeListener = event -> {
+ System.out.println("Receive event: " + event);
+ if (event.type() == Event.Type.ADD) {
+ subscribeAdded.compareAndSet(false, true);
+ }
+ if (event.type() == Event.Type.REMOVE) {
+ subscribeRemoved.compareAndSet(false, true);
+ }
+ if (event.type() == Event.Type.UPDATE) {
+ subscribeUpdated.compareAndSet(false, true);
+ }
+ };
+ String key = "/nodes/master" + System.nanoTime();
+ registry.subscribe(key, subscribeListener);
+ registry.put(key, String.valueOf(System.nanoTime()), true);
+ // Sleep 3 seconds here since in mysql jdbc registry
+ // If multiple event occurs in a refresh time, only the last event will be triggered
+ Thread.sleep(3000);
+ registry.put(key, String.valueOf(System.nanoTime()), true);
+ Thread.sleep(3000);
+ registry.delete(key);
+
+ await().atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> {
+ Assertions.assertTrue(subscribeAdded.get());
+ Assertions.assertTrue(subscribeUpdated.get());
+ Assertions.assertTrue(subscribeRemoved.get());
+ });
+ }
+
+ @SneakyThrows
+ @Test
+ public void testUnsubscribe() {
+ registry.start();
+
+ final AtomicBoolean subscribeAdded = new AtomicBoolean(false);
+ final AtomicBoolean subscribeRemoved = new AtomicBoolean(false);
+ final AtomicBoolean subscribeUpdated = new AtomicBoolean(false);
+
+ SubscribeListener subscribeListener = event -> {
+ if (event.type() == Event.Type.ADD) {
+ subscribeAdded.compareAndSet(false, true);
+ }
+ if (event.type() == Event.Type.REMOVE) {
+ subscribeRemoved.compareAndSet(false, true);
+ }
+ if (event.type() == Event.Type.UPDATE) {
+ subscribeUpdated.compareAndSet(false, true);
+ }
+ };
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "127.0.0.1:8080";
+ registry.subscribe(key, subscribeListener);
+ registry.unsubscribe(key);
+ registry.put(key, value, true);
+ registry.put(key, value, true);
+ registry.delete(key);
+
+ Thread.sleep(2000);
+ Assertions.assertFalse(subscribeAdded.get());
+ Assertions.assertFalse(subscribeRemoved.get());
+ Assertions.assertFalse(subscribeUpdated.get());
+
+ }
+
+ @SneakyThrows
+ @Test
+ public void testAddConnectionStateListener() {
+
+ AtomicReference connectionState = new AtomicReference<>();
+ registry.addConnectionStateListener(connectionState::set);
+
+ Truth.assertThat(connectionState.get()).isNull();
+ registry.start();
+
+ await().atMost(Duration.ofSeconds(2))
+ .until(() -> ConnectionState.CONNECTED == connectionState.get());
+
+ }
+
+ @Test
+ public void testGet() {
+ registry.start();
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "127.0.0.1:8080";
+ assertThrows(RegistryException.class, () -> registry.get(key));
+ registry.put(key, value, true);
+ Truth.assertThat(registry.get(key)).isEqualTo(value);
+ }
+
+ @Test
+ public void testPut() {
+ registry.start();
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "127.0.0.1:8080";
+ registry.put(key, value, true);
+ Truth.assertThat(registry.get(key)).isEqualTo(value);
+
+ // Update the value
+ registry.put(key, "123", true);
+ Truth.assertThat(registry.get(key)).isEqualTo("123");
+ }
+
+ @Test
+ public void testDelete() {
+ registry.start();
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "127.0.0.1:8080";
+ // Delete a non-existent key
+ registry.delete(key);
+
+ registry.put(key, value, true);
+ Truth.assertThat(registry.get(key)).isEqualTo(value);
+ registry.delete(key);
+ Truth.assertThat(registry.exists(key)).isFalse();
+
+ }
+
+ @Test
+ public void testChildren() {
+ registry.start();
+ String master1 = "/nodes/children/127.0.0.1:8080";
+ String master2 = "/nodes/children/127.0.0.2:8080";
+ String value = "123";
+ registry.put(master1, value, true);
+ registry.put(master2, value, true);
+ Truth.assertThat(registry.children("/nodes/children"))
+ .containsAtLeastElementsIn(Lists.newArrayList("127.0.0.1:8080", "127.0.0.2:8080"));
+ }
+
+ @Test
+ public void testExists() {
+ registry.start();
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "123";
+ Truth.assertThat(registry.exists(key)).isFalse();
+ registry.put(key, value, true);
+ Truth.assertThat(registry.exists(key)).isTrue();
+
+ }
+
+ @SneakyThrows
+ @Test
+ public void testAcquireLock() {
+ registry.start();
+ String lockKey = "/lock" + System.nanoTime();
+
+ // 1. Acquire the lock at the main thread
+ Truth.assertThat(registry.acquireLock(lockKey)).isTrue();
+ // Acquire the lock at the main thread again
+ // It should acquire success
+ Truth.assertThat(registry.acquireLock(lockKey)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire failed
+ CompletableFuture acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey));
+ assertThrows(TimeoutException.class, () -> acquireResult.get(3000, TimeUnit.MILLISECONDS));
+
+ }
+
+ @SneakyThrows
+ @Test
+ public void testAcquireLock_withTimeout() {
+ registry.start();
+ String lockKey = "/lock" + System.nanoTime();
+ // 1. Acquire the lock in the main thread
+ Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ // Acquire the lock in the main thread
+ // It should acquire success
+ Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire failed
+ CompletableFuture acquireResult =
+ CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
+ Truth.assertThat(acquireResult.get()).isFalse();
+
+ }
+
+ @SneakyThrows
+ @Test
+ public void testReleaseLock() {
+ registry.start();
+ String lockKey = "/lock" + System.nanoTime();
+ // 1. Acquire the lock in the main thread
+ Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire failed
+ CompletableFuture acquireResult =
+ CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
+ Truth.assertThat(acquireResult.get()).isFalse();
+
+ // 2. Release the lock in the main thread
+ Truth.assertThat(registry.releaseLock(lockKey)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire success
+ acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
+ Truth.assertThat(acquireResult.get()).isTrue();
+ }
+
+ public abstract R createRegistry();
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml
index d4285edfbd..aa592b9da4 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml
@@ -72,6 +72,31 @@
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-registry-it
+ ${project.version}
+ test-jar
+ test
+
+
+
+ org.testcontainers
+ mysql
+
+
+
+ org.testcontainers
+ postgresql
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/EphemeralDateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/EphemeralDateManager.java
similarity index 93%
rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/EphemeralDateManager.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/EphemeralDateManager.java
index 64915e8ca8..7c601b91a1 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/EphemeralDateManager.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/EphemeralDateManager.java
@@ -15,12 +15,10 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
+package org.apache.dolphinscheduler.plugin.registry.jdbc;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
@@ -42,7 +40,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* This thread is used to check the connect state to jdbc.
*/
@Slf4j
-public class EphemeralDateManager implements AutoCloseable {
+class EphemeralDateManager implements AutoCloseable {
private ConnectionState connectionState;
private final JdbcOperator jdbcOperator;
@@ -51,7 +49,7 @@ public class EphemeralDateManager implements AutoCloseable {
private final Set ephemeralDateIds = Collections.synchronizedSet(new HashSet<>());
private final ScheduledExecutorService scheduledExecutorService;
- public EphemeralDateManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
+ EphemeralDateManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.jdbcOperator = checkNotNull(jdbcOperator);
this.scheduledExecutorService = Executors.newScheduledThreadPool(
@@ -151,7 +149,7 @@ public class EphemeralDateManager implements AutoCloseable {
}
}
- private void updateEphemeralDateTerm() throws SQLException {
+ private void updateEphemeralDateTerm() {
if (!jdbcOperator.updateEphemeralDataTerm(ephemeralDateIds)) {
log.warn("Update jdbc registry ephemeral data: {} term error", ephemeralDateIds);
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java
index a56d609da7..95f58a4a20 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java
@@ -29,26 +29,25 @@ import org.apache.commons.lang3.StringUtils;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collection;
+import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
+import org.springframework.dao.DuplicateKeyException;
-@Component
-@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")
-public class JdbcOperator {
+public final class JdbcOperator {
- @Autowired
- private JdbcRegistryDataMapper jdbcRegistryDataMapper;
- @Autowired
- private JdbcRegistryLockMapper jdbcRegistryLockMapper;
+ private final JdbcRegistryDataMapper jdbcRegistryDataMapper;
+ private final JdbcRegistryLockMapper jdbcRegistryLockMapper;
private final long expireTimeWindow;
- public JdbcOperator(JdbcRegistryProperties registryProperties) {
+ JdbcOperator(JdbcRegistryProperties registryProperties,
+ JdbcRegistryDataMapper jdbcRegistryDataMapper,
+ JdbcRegistryLockMapper jdbcRegistryLockMapper) {
this.expireTimeWindow =
registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis();
+ this.jdbcRegistryDataMapper = jdbcRegistryDataMapper;
+ this.jdbcRegistryLockMapper = jdbcRegistryLockMapper;
}
public void healthCheck() {
@@ -62,17 +61,21 @@ public class JdbcOperator {
public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
if (jdbcRegistryData != null) {
- long id = jdbcRegistryData.getId();
- if (jdbcRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
+ jdbcRegistryData.setDataValue(value);
+ jdbcRegistryData.setLastUpdateTime(new Date());
+ jdbcRegistryData.setLastTerm(System.currentTimeMillis());
+ if (jdbcRegistryDataMapper.updateById(jdbcRegistryData) <= 0) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
- return id;
+ return jdbcRegistryData.getId();
}
jdbcRegistryData = JdbcRegistryData.builder()
.dataKey(key)
.dataValue(value)
.dataType(DataType.EPHEMERAL.getTypeValue())
.lastTerm(System.currentTimeMillis())
+ .lastUpdateTime(new Date())
+ .createTime(new Date())
.build();
jdbcRegistryDataMapper.insert(jdbcRegistryData);
return jdbcRegistryData.getId();
@@ -81,17 +84,21 @@ public class JdbcOperator {
public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
if (jdbcRegistryData != null) {
- long id = jdbcRegistryData.getId();
- if (jdbcRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
+ jdbcRegistryData.setDataValue(value);
+ jdbcRegistryData.setLastUpdateTime(new Date());
+ jdbcRegistryData.setLastTerm(System.currentTimeMillis());
+ if (jdbcRegistryDataMapper.updateById(jdbcRegistryData) <= 0) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
- return id;
+ return jdbcRegistryData.getId();
}
jdbcRegistryData = JdbcRegistryData.builder()
.dataKey(key)
.dataValue(value)
.dataType(DataType.PERSISTENT.getTypeValue())
.lastTerm(System.currentTimeMillis())
+ .lastUpdateTime(new Date())
+ .createTime(new Date())
.build();
jdbcRegistryDataMapper.insert(jdbcRegistryData);
return jdbcRegistryData.getId();
@@ -127,7 +134,7 @@ public class JdbcOperator {
.collect(Collectors.toList());
}
- public boolean existKey(String key) throws SQLException {
+ public boolean existKey(String key) {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
return jdbcRegistryData != null;
}
@@ -136,24 +143,25 @@ public class JdbcOperator {
* Try to acquire the target Lock, if cannot acquire, return null.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
- public JdbcRegistryLock tryToAcquireLock(String key) throws SQLException {
+ public JdbcRegistryLock tryToAcquireLock(String key) {
JdbcRegistryLock jdbcRegistryLock = JdbcRegistryLock.builder()
.lockKey(key)
- .lockOwner(JdbcRegistryConstant.LOCK_OWNER)
+ .lockOwner(LockUtils.getLockOwner())
.lastTerm(System.currentTimeMillis())
+ .lastUpdateTime(new Date())
.build();
try {
jdbcRegistryLockMapper.insert(jdbcRegistryLock);
return jdbcRegistryLock;
} catch (Exception e) {
- if (e instanceof SQLIntegrityConstraintViolationException) {
+ if (e instanceof SQLIntegrityConstraintViolationException || e instanceof DuplicateKeyException) {
return null;
}
throw e;
}
}
- public JdbcRegistryLock getLockById(long lockId) throws SQLException {
+ public JdbcRegistryLock getLockById(long lockId) {
return jdbcRegistryLockMapper.selectById(lockId);
}
@@ -161,7 +169,7 @@ public class JdbcOperator {
return jdbcRegistryLockMapper.deleteById(lockId) > 0;
}
- public boolean updateEphemeralDataTerm(Collection ephemeralDateIds) throws SQLException {
+ public boolean updateEphemeralDataTerm(Collection ephemeralDateIds) {
if (CollectionUtils.isEmpty(ephemeralDateIds)) {
return true;
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
index 12b29c34cc..2b7993c87b 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
@@ -17,9 +17,7 @@
package org.apache.dolphinscheduler.plugin.registry.jdbc;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.task.EphemeralDateManager;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.task.RegistryLockManager;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.task.SubscribeDataManager;
+import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.Registry;
@@ -30,31 +28,24 @@ import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
-import javax.annotation.PostConstruct;
-
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
-
/**
* This is one of the implementation of {@link Registry}, with this implementation, you need to rely on mysql database to
* store the DolphinScheduler master/worker's metadata and do the server registry/unRegistry.
*/
-@Component
-@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")
@Slf4j
-public class JdbcRegistry implements Registry {
+public final class JdbcRegistry implements Registry {
private final JdbcRegistryProperties jdbcRegistryProperties;
private final EphemeralDateManager ephemeralDateManager;
private final SubscribeDataManager subscribeDataManager;
private final RegistryLockManager registryLockManager;
- private JdbcOperator jdbcOperator;
+ private final JdbcOperator jdbcOperator;
- public JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties,
- JdbcOperator jdbcOperator) {
+ JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties,
+ JdbcOperator jdbcOperator) {
this.jdbcOperator = jdbcOperator;
jdbcOperator.clearExpireLock();
jdbcOperator.clearExpireEphemeralDate();
@@ -65,7 +56,7 @@ public class JdbcRegistry implements Registry {
log.info("Initialize Jdbc Registry...");
}
- @PostConstruct
+ @Override
public void start() {
log.info("Starting Jdbc Registry...");
// start a jdbc connect check
@@ -103,10 +94,9 @@ public class JdbcRegistry implements Registry {
}
@Override
- public boolean subscribe(String path, SubscribeListener listener) {
+ public void subscribe(String path, SubscribeListener listener) {
// new a schedule thread to query the path, if the path
subscribeDataManager.addListener(path, listener);
- return true;
}
@Override
@@ -122,8 +112,18 @@ public class JdbcRegistry implements Registry {
@Override
public String get(String key) {
- // get the key value
- return subscribeDataManager.getData(key);
+ try {
+ // get the key value
+ JdbcRegistryData data = jdbcOperator.getData(key);
+ if (data == null) {
+ throw new RegistryException("key: " + key + " not exist");
+ }
+ return data.getDataValue();
+ } catch (RegistryException registryException) {
+ throw registryException;
+ } catch (Exception e) {
+ throw new RegistryException(String.format("Get key: %s error", key), e);
+ }
}
@Override
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java
index f21ce0d67c..603a476322 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java
@@ -49,6 +49,18 @@ public class JdbcRegistryAutoConfiguration {
log.info("Load JdbcRegistryAutoConfiguration");
}
+ @Bean
+ public JdbcOperator jdbcOperator(JdbcRegistryProperties jdbcRegistryProperties,
+ JdbcRegistryDataMapper jdbcRegistryDataMapper,
+ JdbcRegistryLockMapper jdbcRegistryLockMapper) {
+ return new JdbcOperator(jdbcRegistryProperties, jdbcRegistryDataMapper, jdbcRegistryLockMapper);
+ }
+
+ @Bean
+ public JdbcRegistry jdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties, JdbcOperator jdbcOperator) {
+ return new JdbcRegistry(jdbcRegistryProperties, jdbcOperator);
+ }
+
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(JdbcRegistryProperties jdbcRegistryProperties) throws Exception {
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java
index 4a016f4d2e..84496afb80 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java
@@ -17,15 +17,11 @@
package org.apache.dolphinscheduler.plugin.registry.jdbc;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-
import lombok.experimental.UtilityClass;
@UtilityClass
-public final class JdbcRegistryConstant {
+final class JdbcRegistryConstant {
public static final long LOCK_ACQUIRE_INTERVAL = 1_000;
- public static final String LOCK_OWNER = NetUtils.getHost() + "_" + OSUtils.getProcessID();
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/LockUtils.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/LockUtils.java
new file mode 100644
index 0000000000..f70f0afa5b
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/LockUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.jdbc;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class LockUtils {
+
+ private static final String LOCK_OWNER_PREFIX = NetUtils.getHost() + "_" + OSUtils.getProcessID() + "_";
+
+ public static String getLockOwner() {
+ return LOCK_OWNER_PREFIX + Thread.currentThread().getName();
+ }
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/RegistryLockManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/RegistryLockManager.java
similarity index 73%
rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/RegistryLockManager.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/RegistryLockManager.java
index b624b9e788..6c519685ff 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/RegistryLockManager.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/RegistryLockManager.java
@@ -15,12 +15,9 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
+package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryConstant;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryLock;
import org.apache.dolphinscheduler.registry.api.RegistryException;
@@ -40,14 +37,15 @@ import lombok.extern.slf4j.Slf4j;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@Slf4j
-public class RegistryLockManager implements AutoCloseable {
+class RegistryLockManager implements AutoCloseable {
private final JdbcOperator jdbcOperator;
private final JdbcRegistryProperties registryProperties;
+ // lock owner -> lock
private final Map lockHoldMap;
private final ScheduledExecutorService lockTermUpdateThreadPool;
- public RegistryLockManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
+ RegistryLockManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.jdbcOperator = jdbcOperator;
this.lockHoldMap = new ConcurrentHashMap<>();
@@ -67,20 +65,24 @@ public class RegistryLockManager implements AutoCloseable {
* Acquire the lock, if cannot get the lock will await.
*/
public void acquireLock(String lockKey) throws RegistryException {
- // maybe we can use the computeIf absent
- lockHoldMap.computeIfAbsent(lockKey, key -> {
- JdbcRegistryLock jdbcRegistryLock;
- try {
- while ((jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey)) == null) {
- log.debug("Acquire the lock {} failed try again", key);
- // acquire failed, wait and try again
- ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
+ try {
+ while (true) {
+ JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey);
+ if (jdbcRegistryLock != null && LockUtils.getLockOwner().equals(jdbcRegistryLock.getLockOwner())) {
+ return;
}
- } catch (SQLException e) {
- throw new RegistryException("Acquire the lock error", e);
+ jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey);
+ if (jdbcRegistryLock != null) {
+ lockHoldMap.put(lockKey, jdbcRegistryLock);
+ return;
+ }
+ log.debug("Acquire the lock {} failed try again", lockKey);
+ // acquire failed, wait and try again
+ ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
}
- return jdbcRegistryLock;
- });
+ } catch (Exception ex) {
+ throw new RegistryException("Acquire the lock: " + lockKey + " error", ex);
+ }
}
/**
@@ -88,21 +90,22 @@ public class RegistryLockManager implements AutoCloseable {
*/
public boolean acquireLock(String lockKey, long timeout) throws RegistryException {
long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < timeout) {
- try {
- if (lockHoldMap.containsKey(lockKey)) {
+ try {
+ while (System.currentTimeMillis() - startTime < timeout) {
+ JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey);
+ if (jdbcRegistryLock != null && LockUtils.getLockOwner().equals(jdbcRegistryLock.getLockOwner())) {
return true;
}
- JdbcRegistryLock jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey);
+ jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey);
if (jdbcRegistryLock != null) {
lockHoldMap.put(lockKey, jdbcRegistryLock);
return true;
}
- } catch (SQLException e) {
- throw new RegistryException("Acquire the lock: " + lockKey + " error", e);
+ log.debug("Acquire the lock {} failed try again", lockKey);
+ ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
}
- log.debug("Acquire the lock {} failed try again", lockKey);
- ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
+ } catch (Exception e) {
+ throw new RegistryException("Acquire the lock: " + lockKey + " error", e);
}
return false;
}
@@ -115,6 +118,7 @@ public class RegistryLockManager implements AutoCloseable {
jdbcOperator.releaseLock(jdbcRegistryLock.getId());
lockHoldMap.remove(lockKey);
} catch (SQLException e) {
+ lockHoldMap.remove(lockKey);
throw new RegistryException(String.format("Release lock: %s error", lockKey), e);
}
}
@@ -149,7 +153,6 @@ public class RegistryLockManager implements AutoCloseable {
if (!jdbcOperator.updateLockTerm(lockIds)) {
log.warn("Update the lock: {} term failed.", lockIds);
}
- jdbcOperator.clearExpireLock();
} catch (Exception e) {
log.error("Update lock term error", e);
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/SubscribeDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/SubscribeDataManager.java
similarity index 91%
rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/SubscribeDataManager.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/SubscribeDataManager.java
index 4718b053f4..e86dc4b155 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/SubscribeDataManager.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/SubscribeDataManager.java
@@ -15,10 +15,8 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
+package org.apache.dolphinscheduler.plugin.registry.jdbc;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
-import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
@@ -42,7 +40,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* Used to refresh if the subscribe path has been changed.
*/
@Slf4j
-public class SubscribeDataManager implements AutoCloseable {
+class SubscribeDataManager implements AutoCloseable {
private final JdbcOperator jdbcOperator;
private final JdbcRegistryProperties registryProperties;
@@ -50,7 +48,7 @@ public class SubscribeDataManager implements AutoCloseable {
private final ScheduledExecutorService dataSubscribeCheckThreadPool;
private final Map jdbcRegistryDataMap = new ConcurrentHashMap<>();
- public SubscribeDataManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
+ SubscribeDataManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.jdbcOperator = jdbcOperator;
this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool(
@@ -75,12 +73,8 @@ public class SubscribeDataManager implements AutoCloseable {
dataSubScribeMap.remove(path);
}
- public String getData(String path) {
- JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMap.get(path);
- if (jdbcRegistryData == null) {
- return null;
- }
- return jdbcRegistryData.getDataValue();
+ public JdbcRegistryData getData(String path) {
+ return jdbcRegistryDataMap.get(path);
}
@Override
@@ -107,6 +101,7 @@ public class SubscribeDataManager implements AutoCloseable {
List addedData = new ArrayList<>();
List deletedData = new ArrayList<>();
List updatedData = new ArrayList<>();
+
for (Map.Entry entry : currentJdbcDataMap.entrySet()) {
JdbcRegistryData newData = entry.getValue();
JdbcRegistryData oldData = jdbcRegistryDataMap.get(entry.getKey());
@@ -118,6 +113,7 @@ public class SubscribeDataManager implements AutoCloseable {
}
}
}
+
for (Map.Entry entry : jdbcRegistryDataMap.entrySet()) {
if (!currentJdbcDataMap.containsKey(entry.getKey())) {
deletedData.add(entry.getValue());
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java
index 701f2e7310..e1d27bbf0b 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java
@@ -40,9 +40,6 @@ public interface JdbcRegistryDataMapper extends BaseMapper {
@Select("select * from t_ds_jdbc_registry_data where data_key like CONCAT (#{key}, '%')")
List fuzzyQueryByKey(@Param("key") String key);
- @Update("update t_ds_jdbc_registry_data set data_value = #{data}, last_term = #{term} where id = #{id}")
- int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term);
-
@Delete("delete from t_ds_jdbc_registry_data where data_key = #{key}")
void deleteByKey(@Param("key") String key);
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java
index 2d11c90a24..0f529a8786 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java
@@ -38,7 +38,7 @@ public interface JdbcRegistryLockMapper extends BaseMapper {
@Update({"