diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java new file mode 100644 index 0000000000..db34592bab --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.etcd; + +import org.apache.dolphinscheduler.registry.api.RegistryException; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import lombok.extern.slf4j.Slf4j; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.lease.LeaseKeepAliveResponse; +import io.grpc.stub.StreamObserver; + +@Slf4j +public class EtcdKeepAliveLeaseManager { + + private final Map keyLeaseCache = new ConcurrentHashMap<>(); + + private final Client client; + + EtcdKeepAliveLeaseManager(Client client) { + this.client = client; + } + + long getOrCreateKeepAliveLease(String key, long timeToLive) { + return keyLeaseCache.computeIfAbsent(key, $ -> { + try { + long leaseId = client.getLeaseClient().grant(timeToLive).get().getID(); + client.getLeaseClient().keepAlive(leaseId, new StreamObserver() { + + @Override + public void onNext(LeaseKeepAliveResponse value) { + } + + @Override + public void onError(Throwable t) { + log.error("Lease {} keep alive error, remove cache with key:{}", leaseId, key, t); + keyLeaseCache.remove(key); + } + + @Override + public void onCompleted() { + log.error("Lease {} keep alive complete, remove cache with key:{}", leaseId, key); + keyLeaseCache.remove(key); + } + }); + log.info("Lease {} keep alive create with key:{}", leaseId, key); + return leaseId; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RegistryException("Failed to create lease key: " + key, e); + } catch (ExecutionException e) { + throw new RegistryException("Failed to create lease key: " + key, e); + } + }); + } + + Optional getKeepAliveLease(String key) { + return Optional.ofNullable(keyLeaseCache.get(key)); + } +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java index 9348d9e027..6abc34696f 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java @@ -71,6 +71,9 @@ public class EtcdRegistry implements Registry { private static Logger LOGGER = LoggerFactory.getLogger(EtcdRegistry.class); private final Client client; private EtcdConnectionStateListener etcdConnectionStateListener; + + private EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager; + public static final String FOLDER_SEPARATOR = "/"; // save the lock info for thread // key:lockKey Value:leaseId @@ -101,6 +104,7 @@ public class EtcdRegistry implements Registry { client = clientBuilder.build(); LOGGER.info("Started Etcd Registry..."); etcdConnectionStateListener = new EtcdConnectionStateListener(client); + etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client); } /** @@ -186,9 +190,7 @@ public class EtcdRegistry implements Registry { try { if (deleteOnDisconnect) { // keep the key by lease, if disconnected, the lease will expire and the key will delete - long leaseId = client.getLeaseClient().grant(TIME_TO_LIVE_SECONDS).get().getID(); - client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> { - })); + long leaseId = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key, TIME_TO_LIVE_SECONDS); PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build(); client.getKVClient().put(byteSequence(key), byteSequence(value),putOption).get(); } else { diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java new file mode 100644 index 0000000000..70593e4bad --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.etcd; + +import java.io.IOException; +import java.util.Optional; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import io.etcd.jetcd.Client; +import io.etcd.jetcd.launcher.EtcdCluster; +import io.etcd.jetcd.test.EtcdClusterExtension; + +class EtcdKeepAliveLeaseManagerTest { + + static EtcdClusterExtension server; + + static Client client; + + static EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager; + @BeforeAll + public static void before() throws Exception { + server = EtcdClusterExtension.builder() + .withNodes(1) + .withImage("ibmcom/etcd:3.2.24") + .build(); + server.restart(); + + client = Client.builder().endpoints(server.clientEndpoints()).build(); + + etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client); + } + + @Test + void getOrCreateKeepAliveLeaseTest() throws Exception { + long first = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3); + long second = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3); + Assertions.assertEquals(first, second); + + client.getLeaseClient().revoke(first).get(); + + // wait for lease expire + Thread.sleep(3000); + Optional keepAliveLease = etcdKeepAliveLeaseManager.getKeepAliveLease("/test"); + Assertions.assertFalse(keepAliveLease.isPresent()); + } + + @AfterAll + public static void after() throws IOException { + try (EtcdCluster closeServer = server.cluster()) { + client.close(); + } + } +}