Browse Source

[Fix-16650] Memory leaks in KubernetesApplicationManager (#16652)

dev
Terry Tao 2 months ago committed by GitHub
parent
commit
5e179136d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
  2. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
  3. 11
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java

@ -319,6 +319,8 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
Thread.currentThread().interrupt();
result.setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
} finally {
ProcessUtils.removeK8sClientCache(taskRequest.getTaskAppId());
}
return result;
}

6
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java

@ -243,4 +243,10 @@ public final class ProcessUtils {
.getPodLogWatcher(
new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId, containerName));
}
public static void removeK8sClientCache(String taskAppId) {
KubernetesApplicationManager applicationManager =
(KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES);
applicationManager.removeCache(taskAppId);
}
}

11
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java

@ -84,4 +84,15 @@ public class ProcessUtilsTest {
Assertions.assertEquals(exceptPidsStr3, actualPidsStr3);
}
@Test
public void tetRemoveK8sClientCache() {
Assertions.assertDoesNotThrow(() -> {
ProcessUtils.removeK8sClientCache("a");
});
Assertions.assertThrows(Exception.class, () -> {
ProcessUtils.removeK8sClientCache(null);
});
}
}

Loading…
Cancel
Save