From b54bfbe3b7fdea1d24ec806b2170d7a2d758e52a Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sat, 20 Jul 2024 09:25:21 +0800 Subject: [PATCH] Fix JdbcRegistry will get duplicate children (#16348) --- .../plugin/registry/RegistryTestCase.java | 49 +++++++++---------- .../plugin/registry/jdbc/JdbcRegistry.java | 1 + 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java index 1edb0327f8..3d0c169e59 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.registry; +import static com.google.common.truth.Truth.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -35,14 +36,11 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.SneakyThrows; -import org.assertj.core.util.Lists; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import com.google.common.truth.Truth; - public abstract class RegistryTestCase { protected R registry; @@ -62,7 +60,7 @@ public abstract class RegistryTestCase { @Test public void testIsConnected() { registry.start(); - Truth.assertThat(registry.isConnected()).isTrue(); + assertThat(registry.isConnected()).isTrue(); } @Test @@ -119,7 +117,7 @@ public abstract class RegistryTestCase { AtomicReference connectionState = new AtomicReference<>(); registry.addConnectionStateListener(connectionState::set); - Truth.assertThat(connectionState.get()).isNull(); + assertThat(connectionState.get()).isNull(); registry.start(); await().atMost(Duration.ofSeconds(2)) @@ -134,7 +132,7 @@ public abstract class RegistryTestCase { String value = "127.0.0.1:8080"; assertThrows(RegistryException.class, () -> registry.get(key)); registry.put(key, value, true); - Truth.assertThat(registry.get(key)).isEqualTo(value); + assertThat(registry.get(key)).isEqualTo(value); } @Test @@ -143,11 +141,11 @@ public abstract class RegistryTestCase { String key = "/nodes/master" + System.nanoTime(); String value = "127.0.0.1:8080"; registry.put(key, value, true); - Truth.assertThat(registry.get(key)).isEqualTo(value); + assertThat(registry.get(key)).isEqualTo(value); // Update the value registry.put(key, "123", true); - Truth.assertThat(registry.get(key)).isEqualTo("123"); + assertThat(registry.get(key)).isEqualTo("123"); } @Test @@ -159,22 +157,23 @@ public abstract class RegistryTestCase { registry.delete(key); registry.put(key, value, true); - Truth.assertThat(registry.get(key)).isEqualTo(value); + assertThat(registry.get(key)).isEqualTo(value); registry.delete(key); - Truth.assertThat(registry.exists(key)).isFalse(); + assertThat(registry.exists(key)).isFalse(); } @Test public void testChildren() { registry.start(); - String master1 = "/nodes/children/127.0.0.1:8080"; - String master2 = "/nodes/children/127.0.0.2:8080"; + String master1 = "/nodes/children/childGroup1/127.0.0.1:8080"; + String master2 = "/nodes/children/childGroup1/127.0.0.2:8080"; String value = "123"; registry.put(master1, value, true); registry.put(master2, value, true); - Truth.assertThat(registry.children("/nodes/children")) - .containsAtLeastElementsIn(Lists.newArrayList("127.0.0.1:8080", "127.0.0.2:8080")); + assertThat(registry.children("/nodes/children")).containsExactly("childGroup1"); + assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080", + "127.0.0.2:8080"); } @Test @@ -182,9 +181,9 @@ public abstract class RegistryTestCase { registry.start(); String key = "/nodes/master" + System.nanoTime(); String value = "123"; - Truth.assertThat(registry.exists(key)).isFalse(); + assertThat(registry.exists(key)).isFalse(); registry.put(key, value, true); - Truth.assertThat(registry.exists(key)).isTrue(); + assertThat(registry.exists(key)).isTrue(); } @@ -195,10 +194,10 @@ public abstract class RegistryTestCase { String lockKey = "/lock" + System.nanoTime(); // 1. Acquire the lock at the main thread - Truth.assertThat(registry.acquireLock(lockKey)).isTrue(); + assertThat(registry.acquireLock(lockKey)).isTrue(); // Acquire the lock at the main thread again // It should acquire success - Truth.assertThat(registry.acquireLock(lockKey)).isTrue(); + assertThat(registry.acquireLock(lockKey)).isTrue(); // Acquire the lock at another thread // It should acquire failed @@ -213,17 +212,17 @@ public abstract class RegistryTestCase { registry.start(); String lockKey = "/lock" + System.nanoTime(); // 1. Acquire the lock in the main thread - Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); + assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); // Acquire the lock in the main thread // It should acquire success - Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); + assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); // Acquire the lock at another thread // It should acquire failed CompletableFuture acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000)); - Truth.assertThat(acquireResult.get()).isFalse(); + assertThat(acquireResult.get()).isFalse(); } @@ -233,21 +232,21 @@ public abstract class RegistryTestCase { registry.start(); String lockKey = "/lock" + System.nanoTime(); // 1. Acquire the lock in the main thread - Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); + assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); // Acquire the lock at another thread // It should acquire failed CompletableFuture acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000)); - Truth.assertThat(acquireResult.get()).isFalse(); + assertThat(acquireResult.get()).isFalse(); // 2. Release the lock in the main thread - Truth.assertThat(registry.releaseLock(lockKey)).isTrue(); + assertThat(registry.releaseLock(lockKey)).isTrue(); // Acquire the lock at another thread // It should acquire success acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000)); - Truth.assertThat(acquireResult.get()).isTrue(); + assertThat(acquireResult.get()).isTrue(); } public abstract R createRegistry(); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java index c8f205dafd..cc646e12e5 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java @@ -212,6 +212,7 @@ public final class JdbcRegistry implements Registry { .map(JdbcRegistryDataDTO::getDataKey) .filter(fullPath -> fullPath.length() > key.length()) .map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/")) + .distinct() .collect(Collectors.toList()); } catch (Exception e) { throw new RegistryException(String.format("Get key: %s children error", key), e);