Browse Source

Fix JdbcRegistry will get duplicate children (#16348)

dev
Wenjun Ruan 3 months ago committed by GitHub
parent
commit
b54bfbe3b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 49
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
  2. 1
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java

49
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.registry; package org.apache.dolphinscheduler.plugin.registry;
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@ -35,14 +36,11 @@ import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import com.google.common.truth.Truth;
public abstract class RegistryTestCase<R extends Registry> { public abstract class RegistryTestCase<R extends Registry> {
protected R registry; protected R registry;
@ -62,7 +60,7 @@ public abstract class RegistryTestCase<R extends Registry> {
@Test @Test
public void testIsConnected() { public void testIsConnected() {
registry.start(); registry.start();
Truth.assertThat(registry.isConnected()).isTrue(); assertThat(registry.isConnected()).isTrue();
} }
@Test @Test
@ -119,7 +117,7 @@ public abstract class RegistryTestCase<R extends Registry> {
AtomicReference<ConnectionState> connectionState = new AtomicReference<>(); AtomicReference<ConnectionState> connectionState = new AtomicReference<>();
registry.addConnectionStateListener(connectionState::set); registry.addConnectionStateListener(connectionState::set);
Truth.assertThat(connectionState.get()).isNull(); assertThat(connectionState.get()).isNull();
registry.start(); registry.start();
await().atMost(Duration.ofSeconds(2)) await().atMost(Duration.ofSeconds(2))
@ -134,7 +132,7 @@ public abstract class RegistryTestCase<R extends Registry> {
String value = "127.0.0.1:8080"; String value = "127.0.0.1:8080";
assertThrows(RegistryException.class, () -> registry.get(key)); assertThrows(RegistryException.class, () -> registry.get(key));
registry.put(key, value, true); registry.put(key, value, true);
Truth.assertThat(registry.get(key)).isEqualTo(value); assertThat(registry.get(key)).isEqualTo(value);
} }
@Test @Test
@ -143,11 +141,11 @@ public abstract class RegistryTestCase<R extends Registry> {
String key = "/nodes/master" + System.nanoTime(); String key = "/nodes/master" + System.nanoTime();
String value = "127.0.0.1:8080"; String value = "127.0.0.1:8080";
registry.put(key, value, true); registry.put(key, value, true);
Truth.assertThat(registry.get(key)).isEqualTo(value); assertThat(registry.get(key)).isEqualTo(value);
// Update the value // Update the value
registry.put(key, "123", true); registry.put(key, "123", true);
Truth.assertThat(registry.get(key)).isEqualTo("123"); assertThat(registry.get(key)).isEqualTo("123");
} }
@Test @Test
@ -159,22 +157,23 @@ public abstract class RegistryTestCase<R extends Registry> {
registry.delete(key); registry.delete(key);
registry.put(key, value, true); registry.put(key, value, true);
Truth.assertThat(registry.get(key)).isEqualTo(value); assertThat(registry.get(key)).isEqualTo(value);
registry.delete(key); registry.delete(key);
Truth.assertThat(registry.exists(key)).isFalse(); assertThat(registry.exists(key)).isFalse();
} }
@Test @Test
public void testChildren() { public void testChildren() {
registry.start(); registry.start();
String master1 = "/nodes/children/127.0.0.1:8080"; String master1 = "/nodes/children/childGroup1/127.0.0.1:8080";
String master2 = "/nodes/children/127.0.0.2:8080"; String master2 = "/nodes/children/childGroup1/127.0.0.2:8080";
String value = "123"; String value = "123";
registry.put(master1, value, true); registry.put(master1, value, true);
registry.put(master2, value, true); registry.put(master2, value, true);
Truth.assertThat(registry.children("/nodes/children")) assertThat(registry.children("/nodes/children")).containsExactly("childGroup1");
.containsAtLeastElementsIn(Lists.newArrayList("127.0.0.1:8080", "127.0.0.2:8080")); assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080",
"127.0.0.2:8080");
} }
@Test @Test
@ -182,9 +181,9 @@ public abstract class RegistryTestCase<R extends Registry> {
registry.start(); registry.start();
String key = "/nodes/master" + System.nanoTime(); String key = "/nodes/master" + System.nanoTime();
String value = "123"; String value = "123";
Truth.assertThat(registry.exists(key)).isFalse(); assertThat(registry.exists(key)).isFalse();
registry.put(key, value, true); registry.put(key, value, true);
Truth.assertThat(registry.exists(key)).isTrue(); assertThat(registry.exists(key)).isTrue();
} }
@ -195,10 +194,10 @@ public abstract class RegistryTestCase<R extends Registry> {
String lockKey = "/lock" + System.nanoTime(); String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock at the main thread // 1. Acquire the lock at the main thread
Truth.assertThat(registry.acquireLock(lockKey)).isTrue(); assertThat(registry.acquireLock(lockKey)).isTrue();
// Acquire the lock at the main thread again // Acquire the lock at the main thread again
// It should acquire success // It should acquire success
Truth.assertThat(registry.acquireLock(lockKey)).isTrue(); assertThat(registry.acquireLock(lockKey)).isTrue();
// Acquire the lock at another thread // Acquire the lock at another thread
// It should acquire failed // It should acquire failed
@ -213,17 +212,17 @@ public abstract class RegistryTestCase<R extends Registry> {
registry.start(); registry.start();
String lockKey = "/lock" + System.nanoTime(); String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock in the main thread // 1. Acquire the lock in the main thread
Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock in the main thread // Acquire the lock in the main thread
// It should acquire success // It should acquire success
Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock at another thread // Acquire the lock at another thread
// It should acquire failed // It should acquire failed
CompletableFuture<Boolean> acquireResult = CompletableFuture<Boolean> acquireResult =
CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000)); CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
Truth.assertThat(acquireResult.get()).isFalse(); assertThat(acquireResult.get()).isFalse();
} }
@ -233,21 +232,21 @@ public abstract class RegistryTestCase<R extends Registry> {
registry.start(); registry.start();
String lockKey = "/lock" + System.nanoTime(); String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock in the main thread // 1. Acquire the lock in the main thread
Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue(); assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock at another thread // Acquire the lock at another thread
// It should acquire failed // It should acquire failed
CompletableFuture<Boolean> acquireResult = CompletableFuture<Boolean> acquireResult =
CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000)); CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
Truth.assertThat(acquireResult.get()).isFalse(); assertThat(acquireResult.get()).isFalse();
// 2. Release the lock in the main thread // 2. Release the lock in the main thread
Truth.assertThat(registry.releaseLock(lockKey)).isTrue(); assertThat(registry.releaseLock(lockKey)).isTrue();
// Acquire the lock at another thread // Acquire the lock at another thread
// It should acquire success // It should acquire success
acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000)); acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
Truth.assertThat(acquireResult.get()).isTrue(); assertThat(acquireResult.get()).isTrue();
} }
public abstract R createRegistry(); public abstract R createRegistry();

1
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java

@ -212,6 +212,7 @@ public final class JdbcRegistry implements Registry {
.map(JdbcRegistryDataDTO::getDataKey) .map(JdbcRegistryDataDTO::getDataKey)
.filter(fullPath -> fullPath.length() > key.length()) .filter(fullPath -> fullPath.length() > key.length())
.map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/")) .map(fullPath -> StringUtils.substringBefore(fullPath.substring(key.length() + 1), "/"))
.distinct()
.collect(Collectors.toList()); .collect(Collectors.toList());
} catch (Exception e) { } catch (Exception e) {
throw new RegistryException(String.format("Get key: %s children error", key), e); throw new RegistryException(String.format("Get key: %s children error", key), e);

Loading…
Cancel
Save