Browse Source

[Fix-14008][registry] Fix etcd memory leak due to leaseId (#14034)

3.2.1-prepare
eye-gu 1 year ago committed by GitHub
parent
commit
d87a0d831c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 79
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java
  2. 8
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
  3. 72
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java

79
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.etcd;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.grpc.stub.StreamObserver;
@Slf4j
public class EtcdKeepAliveLeaseManager {
private final Map<String, Long> keyLeaseCache = new ConcurrentHashMap<>();
private final Client client;
EtcdKeepAliveLeaseManager(Client client) {
this.client = client;
}
long getOrCreateKeepAliveLease(String key, long timeToLive) {
return keyLeaseCache.computeIfAbsent(key, $ -> {
try {
long leaseId = client.getLeaseClient().grant(timeToLive).get().getID();
client.getLeaseClient().keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
@Override
public void onNext(LeaseKeepAliveResponse value) {
}
@Override
public void onError(Throwable t) {
log.error("Lease {} keep alive error, remove cache with key:{}", leaseId, key, t);
keyLeaseCache.remove(key);
}
@Override
public void onCompleted() {
log.error("Lease {} keep alive complete, remove cache with key:{}", leaseId, key);
keyLeaseCache.remove(key);
}
});
log.info("Lease {} keep alive create with key:{}", leaseId, key);
return leaseId;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("Failed to create lease key: " + key, e);
} catch (ExecutionException e) {
throw new RegistryException("Failed to create lease key: " + key, e);
}
});
}
Optional<Long> getKeepAliveLease(String key) {
return Optional.ofNullable(keyLeaseCache.get(key));
}
}

8
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java

@ -76,6 +76,9 @@ public class EtcdRegistry implements Registry {
private final Client client;
private EtcdConnectionStateListener etcdConnectionStateListener;
private EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
public static final String FOLDER_SEPARATOR = "/";
// save the lock info for thread
// key:lockKey Value:leaseId
@ -120,6 +123,7 @@ public class EtcdRegistry implements Registry {
client = clientBuilder.build();
log.info("Started Etcd Registry...");
etcdConnectionStateListener = new EtcdConnectionStateListener(client);
etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
}
/**
@ -206,9 +210,7 @@ public class EtcdRegistry implements Registry {
try {
if (deleteOnDisconnect) {
// keep the key by lease, if disconnected, the lease will expire and the key will delete
long leaseId = client.getLeaseClient().grant(TIME_TO_LIVE_SECONDS).get().getID();
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
}));
long leaseId = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key, TIME_TO_LIVE_SECONDS);
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
client.getKVClient().put(byteSequence(key), byteSequence(value), putOption).get();
} else {

72
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.etcd;
import java.io.IOException;
import java.util.Optional;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.launcher.EtcdCluster;
import io.etcd.jetcd.test.EtcdClusterExtension;
class EtcdKeepAliveLeaseManagerTest {
static EtcdClusterExtension server;
static Client client;
static EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
@BeforeAll
public static void before() throws Exception {
server = EtcdClusterExtension.builder()
.withNodes(1)
.withImage("ibmcom/etcd:3.2.24")
.build();
server.restart();
client = Client.builder().endpoints(server.clientEndpoints()).build();
etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
}
@Test
void getOrCreateKeepAliveLeaseTest() throws Exception {
long first = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
long second = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
Assertions.assertEquals(first, second);
client.getLeaseClient().revoke(first).get();
// wait for lease expire
Thread.sleep(3000);
Optional<Long> keepAliveLease = etcdKeepAliveLeaseManager.getKeepAliveLease("/test");
Assertions.assertFalse(keepAliveLease.isPresent());
}
@AfterAll
public static void after() throws IOException {
try (EtcdCluster closeServer = server.cluster()) {
client.close();
}
}
}
Loading…
Cancel
Save