From 4207b8d6aee3a74f93de9d9540e265d6efaf43f2 Mon Sep 17 00:00:00 2001
From: Minh Thai
Date: Tue, 18 Dec 2018 15:06:59 -0800
Subject: [PATCH] DfsBlockCache: Consolidate where ReadableChannel is opened

Opening a readable channel can be expensive and the number of channels
can be limited in DFS. Ensure that the caller of
BlockBasedFile.readOneBlock() is responsible for opening and closing
the file, and that the ReadableChannel is reused in the request. As a
side effect, this makes the code easier to read, with better use of
try-with-resources.

The downside is that a readable channel is now always opened, even when
the entire pack is already available for copying from cache. This
should be an acceptable cost: it's rare enough not to overload the
server, and from the client's perspective the added latency is in the
noise relative to the data transfer cost involved in a clone. If this
turns out to be a problem in practice, we can reintroduce that
optimization in a followup change.

Change-Id: I340428ee4bacd2dce019d5616ef12339a0c85f0b
Signed-off-by: Minh Thai
Signed-off-by: Jonathan Nieder
---
 .../internal/storage/dfs/BlockBasedFile.java  |  14 +-
 .../internal/storage/dfs/DfsBlockCache.java   |  20 ++-
 .../internal/storage/dfs/DfsPackFile.java     | 124 +++++++-----------
 .../internal/storage/dfs/DfsReftable.java     |   4 +-
 4 files changed, 75 insertions(+), 87 deletions(-)

diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/BlockBasedFile.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/BlockBasedFile.java
index b9758bd64..600b1a44d 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/BlockBasedFile.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/BlockBasedFile.java
@@ -48,7 +48,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.text.MessageFormat;
 
-import org.eclipse.jgit.annotations.Nullable;
 import org.eclipse.jgit.errors.PackInvalidException;
 import org.eclipse.jgit.internal.storage.pack.PackExt;
 
@@ -130,18 +129,18 @@ abstract class BlockBasedFile {
 	}
 
 	DfsBlock getOrLoadBlock(long pos, DfsReader ctx) throws IOException {
-		return cache.getOrLoad(this, pos, ctx, null);
+		try (ReadableChannel rc = ctx.db.openFile(desc, ext)) {
+			return cache.getOrLoad(this, pos, ctx, () -> rc);
+		}
 	}
 
-	DfsBlock readOneBlock(long pos, DfsReader ctx,
-			@Nullable ReadableChannel fileChannel) throws IOException {
+	DfsBlock readOneBlock(long pos, DfsReader ctx, ReadableChannel rc)
+			throws IOException {
 		if (invalid)
 			throw new PackInvalidException(getFileName());
 
 		ctx.stats.readBlock++;
 		long start = System.nanoTime();
-		ReadableChannel rc = fileChannel != null ? fileChannel
-				: ctx.db.openFile(desc, ext);
 		try {
 			int size = blockSize(rc);
 			pos = (pos / size) * size;
@@ -189,9 +188,6 @@ abstract class BlockBasedFile {
 
 			return new DfsBlock(key, pos, buf);
 		} finally {
-			if (rc != fileChannel) {
-				rc.close();
-			}
 			ctx.stats.readBlockMicros += elapsedMicros(start);
 		}
 	}
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java
index a7c16de25..d9e9327d3 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java
@@ -52,7 +52,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.LongStream;
 
-import org.eclipse.jgit.annotations.Nullable;
 import org.eclipse.jgit.internal.JGitText;
 import org.eclipse.jgit.internal.storage.pack.PackExt;
 
@@ -375,13 +374,13 @@
 	 * @param ctx
 	 *            current thread's reader.
 	 * @param fileChannel
-	 *            optional channel to read {@code pack}.
+	 *            supplier for channel to read {@code pack}.
 	 * @return the object reference.
 	 * @throws IOException
 	 *             the reference was not in the cache and could not be loaded.
 	 */
 	DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
-			@Nullable ReadableChannel fileChannel) throws IOException {
+			ReadableChannelSupplier fileChannel) throws IOException {
 		final long requestedPosition = position;
 		position = file.alignToBlock(position);
 
@@ -413,7 +412,8 @@
 			getStat(statMiss, key).incrementAndGet();
 			boolean credit = true;
 			try {
-				v = file.readOneBlock(requestedPosition, ctx, fileChannel);
+				v = file.readOneBlock(requestedPosition, ctx,
+						fileChannel.get());
 				credit = false;
 			} finally {
 				if (credit) {
@@ -749,4 +749,16 @@
 	interface RefLoader {
 		Ref load() throws IOException;
 	}
+
+	/**
+	 * Supplier for readable channel
+	 */
+	@FunctionalInterface
+	public interface ReadableChannelSupplier {
+		/**
+		 * @return ReadableChannel
+		 * @throws IOException
+		 */
+		ReadableChannel get() throws IOException;
+	}
 }
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsPackFile.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsPackFile.java
index a7073f695..7d891b523 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsPackFile.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsPackFile.java
@@ -438,99 +438,77 @@
 		return dstbuf;
 	}
 
-	void copyPackAsIs(PackOutputStream out, DfsReader ctx)
-			throws IOException {
+	void copyPackAsIs(PackOutputStream out, DfsReader ctx) throws IOException {
 		// If the length hasn't been determined yet, pin to set it.
 		if (length == -1) {
 			ctx.pin(this, 0);
 			ctx.unpin();
 		}
-		if (cache.shouldCopyThroughCache(length)) {
-			copyPackThroughCache(out, ctx);
-		} else {
-			copyPackBypassCache(out, ctx);
+		try (ReadableChannel rc = ctx.db.openFile(desc, PACK)) {
+			int sz = ctx.getOptions().getStreamPackBufferSize();
+			if (sz > 0) {
+				rc.setReadAheadBytes(sz);
+			}
+			if (cache.shouldCopyThroughCache(length)) {
+				copyPackThroughCache(out, ctx, rc);
+			} else {
+				copyPackBypassCache(out, rc);
+			}
 		}
 	}
 
-	private void copyPackThroughCache(PackOutputStream out, DfsReader ctx)
-			throws IOException {
-		@SuppressWarnings("resource") // Explicitly closed in finally block
-		ReadableChannel rc = null;
-		try {
-			long position = 12;
-			long remaining = length - (12 + 20);
-			while (0 < remaining) {
-				DfsBlock b;
-				if (rc != null) {
-					b = cache.getOrLoad(this, position, ctx, rc);
-				} else {
-					b = cache.get(key, alignToBlock(position));
-					if (b == null) {
-						rc = ctx.db.openFile(desc, PACK);
-						int sz = ctx.getOptions().getStreamPackBufferSize();
-						if (sz > 0) {
-							rc.setReadAheadBytes(sz);
-						}
-						b = cache.getOrLoad(this, position, ctx, rc);
-					}
-				}
+	private void copyPackThroughCache(PackOutputStream out, DfsReader ctx,
+			ReadableChannel rc) throws IOException {
+		long position = 12;
+		long remaining = length - (12 + 20);
+		while (0 < remaining) {
+			DfsBlock b = cache.getOrLoad(this, position, ctx, () -> rc);
+			int ptr = (int) (position - b.start);
+			int n = (int) Math.min(b.size() - ptr, remaining);
+			b.write(out, position, n);
+			position += n;
+			remaining -= n;
+		}
+	}
 
+	private long copyPackBypassCache(PackOutputStream out, ReadableChannel rc)
+			throws IOException {
+		ByteBuffer buf = newCopyBuffer(out, rc);
+		long position = 12;
+		long remaining = length - (12 + 20);
+		boolean packHeadSkipped = false;
+		while (0 < remaining) {
+			DfsBlock b = cache.get(key, alignToBlock(position));
+			if (b != null) {
 				int ptr = (int) (position - b.start);
 				int n = (int) Math.min(b.size() - ptr, remaining);
 				b.write(out, position, n);
 				position += n;
 				remaining -= n;
+				rc.position(position);
+				packHeadSkipped = true;
+				continue;
 			}
-		} finally {
-			if (rc != null) {
-				rc.close();
-			}
-		}
-	}
 
-	private long copyPackBypassCache(PackOutputStream out, DfsReader ctx)
-			throws IOException {
-		try (ReadableChannel rc = ctx.db.openFile(desc, PACK)) {
-			ByteBuffer buf = newCopyBuffer(out, rc);
-			if (ctx.getOptions().getStreamPackBufferSize() > 0) {
-				rc.setReadAheadBytes(ctx.getOptions().getStreamPackBufferSize());
+			buf.position(0);
+			int n = read(rc, buf);
+			if (n <= 0) {
+				throw packfileIsTruncated();
+			} else if (n > remaining) {
+				n = (int) remaining;
 			}
-			long position = 12;
-			long remaining = length - (12 + 20);
-			boolean packHeadSkipped = false;
-			while (0 < remaining) {
-				DfsBlock b = cache.get(key, alignToBlock(position));
-				if (b != null) {
-					int ptr = (int) (position - b.start);
-					int n = (int) Math.min(b.size() - ptr, remaining);
-					b.write(out, position, n);
-					position += n;
-					remaining -= n;
-					rc.position(position);
-					packHeadSkipped = true;
-					continue;
-				}
 
-				buf.position(0);
-				int n = read(rc, buf);
-				if (n <= 0) {
-					throw packfileIsTruncated();
-				} else if (n > remaining) {
-					n = (int) remaining;
-				}
-
-				if (!packHeadSkipped) {
-					// Need skip the 'PACK' header for the first read
-					out.write(buf.array(), 12, n - 12);
-					packHeadSkipped = true;
-				} else {
-					out.write(buf.array(), 0, n);
-				}
-				position += n;
-				remaining -= n;
+			if (!packHeadSkipped) {
+				// Need skip the 'PACK' header for the first read
+				out.write(buf.array(), 12, n - 12);
+				packHeadSkipped = true;
+			} else {
+				out.write(buf.array(), 0, n);
 			}
-			return position;
+			position += n;
+			remaining -= n;
 		}
+		return position;
 	}
 
 	private ByteBuffer newCopyBuffer(PackOutputStream out, ReadableChannel rc) {
diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsReftable.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsReftable.java
index 7502471b0..a80797730 100644
--- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsReftable.java
+++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsReftable.java
@@ -128,7 +128,9 @@
 				open().setReadAheadBytes(readAhead);
 			}
 
-			DfsBlock block = cache.getOrLoad(file, pos, ctx, ch);
+			DfsBlock block = cache.getOrLoad(file, pos, ctx, () -> {
+				return open();
+			});
 			if (block.start == pos && block.size() >= cnt) {
 				return block.zeroCopyByteBuffer(cnt);
 			}
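
For readers skimming the patch, here is a minimal, self-contained sketch of the ownership pattern this change introduces: the caller opens the channel once in try-with-resources and hands the cache a supplier (() -> rc), so the load path borrows the already-open channel and never closes it itself. FakeChannel, ChannelSupplier, and the toy getOrLoad below are simplified stand-ins for JGit's ReadableChannel, DfsBlockCache.ReadableChannelSupplier, and DfsBlockCache.getOrLoad, not the real implementations.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; these are not JGit types.
public class ChannelSupplierSketch {
	// Stand-in for ReadableChannel: expensive to open, must be closed once.
	static class FakeChannel implements AutoCloseable {
		FakeChannel() {
			System.out.println("channel opened");
		}

		byte[] readBlock(long pos) {
			return new byte[] { (byte) pos };
		}

		@Override
		public void close() {
			System.out.println("channel closed");
		}
	}

	// Same shape as DfsBlockCache.ReadableChannelSupplier.
	@FunctionalInterface
	interface ChannelSupplier {
		FakeChannel get() throws IOException;
	}

	// Toy block cache: the supplier is consulted only on a miss, and the
	// channel it returns is used here but never closed here.
	static byte[] getOrLoad(Map<Long, byte[]> cache, long pos,
			ChannelSupplier ch) throws IOException {
		byte[] b = cache.get(pos);
		if (b == null) {
			b = ch.get().readBlock(pos);
			cache.put(pos, b);
		}
		return b;
	}

	public static void main(String[] args) throws IOException {
		Map<Long, byte[]> cache = new HashMap<>();
		// Two passes: the second is served entirely from cache, yet the
		// channel is still opened once per pass, which is the trade-off
		// the commit message accepts.
		for (int pass = 0; pass < 2; pass++) {
			try (FakeChannel rc = new FakeChannel()) {
				for (long pos = 0; pos < 3; pos++) {
					getOrLoad(cache, pos, () -> rc);
				}
			} // closed exactly once per pass, however many blocks were read
		}
	}
}

The supplier shape still lets call sites choose between handing over an already-open channel, as copyPackAsIs() does with () -> rc, and deferring the open until the cache actually misses, as DfsReftable does with () -> { return open(); }.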