Browse Source

DfsBlockCache: Consolidate where ReadableChannel is opened

Opening a readable channel can be expensive, and the number of channels
can be limited in DFS. Ensure that the caller of
BlockBasedFile.readOneBlock() is responsible for opening and closing
the file, and that the ReadableChannel is reused within the request. As
a side effect, this makes the code easier to read, with better use of
try-with-resources.

The downside is that this means a readable channel is always opened, even
when the entire pack is already available for copying from cache. This
should be an acceptable cost: it's rare enough not to overload the server
and from a client latency perspective, the latency cost is in the noise
relative to the data transfer cost involved in a clone. If this turns out
to be a problem in practice, we can reintroduce that optimization in a
followup change.

Change-Id: I340428ee4bacd2dce019d5616ef12339a0c85f0b
Signed-off-by: Minh Thai <mthai@google.com>
Signed-off-by: Jonathan Nieder <jrn@google.com>
stable-5.3
Minh Thai 6 years ago committed by Jonathan Nieder
parent
commit
4207b8d6ae
  1. 14
      org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/BlockBasedFile.java
  2. 20
      org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java
  3. 124
      org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsPackFile.java
  4. 4
      org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsReftable.java

14
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/BlockBasedFile.java

@ -48,7 +48,6 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.text.MessageFormat; import java.text.MessageFormat;
import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.errors.PackInvalidException; import org.eclipse.jgit.errors.PackInvalidException;
import org.eclipse.jgit.internal.storage.pack.PackExt; import org.eclipse.jgit.internal.storage.pack.PackExt;
@ -130,18 +129,18 @@ abstract class BlockBasedFile {
} }
DfsBlock getOrLoadBlock(long pos, DfsReader ctx) throws IOException { DfsBlock getOrLoadBlock(long pos, DfsReader ctx) throws IOException {
return cache.getOrLoad(this, pos, ctx, null); try (ReadableChannel rc = ctx.db.openFile(desc, ext)) {
return cache.getOrLoad(this, pos, ctx, () -> rc);
}
} }
DfsBlock readOneBlock(long pos, DfsReader ctx, DfsBlock readOneBlock(long pos, DfsReader ctx, ReadableChannel rc)
@Nullable ReadableChannel fileChannel) throws IOException { throws IOException {
if (invalid) if (invalid)
throw new PackInvalidException(getFileName()); throw new PackInvalidException(getFileName());
ctx.stats.readBlock++; ctx.stats.readBlock++;
long start = System.nanoTime(); long start = System.nanoTime();
ReadableChannel rc = fileChannel != null ? fileChannel
: ctx.db.openFile(desc, ext);
try { try {
int size = blockSize(rc); int size = blockSize(rc);
pos = (pos / size) * size; pos = (pos / size) * size;
@ -189,9 +188,6 @@ abstract class BlockBasedFile {
return new DfsBlock(key, pos, buf); return new DfsBlock(key, pos, buf);
} finally { } finally {
if (rc != fileChannel) {
rc.close();
}
ctx.stats.readBlockMicros += elapsedMicros(start); ctx.stats.readBlockMicros += elapsedMicros(start);
} }
} }

20
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsBlockCache.java

@ -52,7 +52,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.LongStream; import java.util.stream.LongStream;
import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.internal.JGitText; import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.pack.PackExt; import org.eclipse.jgit.internal.storage.pack.PackExt;
@ -375,13 +374,13 @@ public final class DfsBlockCache {
* @param ctx * @param ctx
* current thread's reader. * current thread's reader.
* @param fileChannel * @param fileChannel
* optional channel to read {@code pack}. * supplier for channel to read {@code pack}.
* @return the object reference. * @return the object reference.
* @throws IOException * @throws IOException
* the reference was not in the cache and could not be loaded. * the reference was not in the cache and could not be loaded.
*/ */
DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx, DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
@Nullable ReadableChannel fileChannel) throws IOException { ReadableChannelSupplier fileChannel) throws IOException {
final long requestedPosition = position; final long requestedPosition = position;
position = file.alignToBlock(position); position = file.alignToBlock(position);
@ -413,7 +412,8 @@ public final class DfsBlockCache {
getStat(statMiss, key).incrementAndGet(); getStat(statMiss, key).incrementAndGet();
boolean credit = true; boolean credit = true;
try { try {
v = file.readOneBlock(requestedPosition, ctx, fileChannel); v = file.readOneBlock(requestedPosition, ctx,
fileChannel.get());
credit = false; credit = false;
} finally { } finally {
if (credit) { if (credit) {
@ -749,4 +749,16 @@ public final class DfsBlockCache {
interface RefLoader<T> { interface RefLoader<T> {
Ref<T> load() throws IOException; Ref<T> load() throws IOException;
} }
/**
* Supplier for readable channel
*/
@FunctionalInterface
public interface ReadableChannelSupplier {
/**
* @return ReadableChannel
* @throws IOException
*/
ReadableChannel get() throws IOException;
}
} }

124
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsPackFile.java

@ -438,99 +438,77 @@ public final class DfsPackFile extends BlockBasedFile {
return dstbuf; return dstbuf;
} }
void copyPackAsIs(PackOutputStream out, DfsReader ctx) void copyPackAsIs(PackOutputStream out, DfsReader ctx) throws IOException {
throws IOException {
// If the length hasn't been determined yet, pin to set it. // If the length hasn't been determined yet, pin to set it.
if (length == -1) { if (length == -1) {
ctx.pin(this, 0); ctx.pin(this, 0);
ctx.unpin(); ctx.unpin();
} }
if (cache.shouldCopyThroughCache(length)) { try (ReadableChannel rc = ctx.db.openFile(desc, PACK)) {
copyPackThroughCache(out, ctx); int sz = ctx.getOptions().getStreamPackBufferSize();
} else { if (sz > 0) {
copyPackBypassCache(out, ctx); rc.setReadAheadBytes(sz);
}
if (cache.shouldCopyThroughCache(length)) {
copyPackThroughCache(out, ctx, rc);
} else {
copyPackBypassCache(out, rc);
}
} }
} }
private void copyPackThroughCache(PackOutputStream out, DfsReader ctx) private void copyPackThroughCache(PackOutputStream out, DfsReader ctx,
throws IOException { ReadableChannel rc) throws IOException {
@SuppressWarnings("resource") // Explicitly closed in finally block long position = 12;
ReadableChannel rc = null; long remaining = length - (12 + 20);
try { while (0 < remaining) {
long position = 12; DfsBlock b = cache.getOrLoad(this, position, ctx, () -> rc);
long remaining = length - (12 + 20); int ptr = (int) (position - b.start);
while (0 < remaining) { int n = (int) Math.min(b.size() - ptr, remaining);
DfsBlock b; b.write(out, position, n);
if (rc != null) { position += n;
b = cache.getOrLoad(this, position, ctx, rc); remaining -= n;
} else { }
b = cache.get(key, alignToBlock(position)); }
if (b == null) {
rc = ctx.db.openFile(desc, PACK);
int sz = ctx.getOptions().getStreamPackBufferSize();
if (sz > 0) {
rc.setReadAheadBytes(sz);
}
b = cache.getOrLoad(this, position, ctx, rc);
}
}
private long copyPackBypassCache(PackOutputStream out, ReadableChannel rc)
throws IOException {
ByteBuffer buf = newCopyBuffer(out, rc);
long position = 12;
long remaining = length - (12 + 20);
boolean packHeadSkipped = false;
while (0 < remaining) {
DfsBlock b = cache.get(key, alignToBlock(position));
if (b != null) {
int ptr = (int) (position - b.start); int ptr = (int) (position - b.start);
int n = (int) Math.min(b.size() - ptr, remaining); int n = (int) Math.min(b.size() - ptr, remaining);
b.write(out, position, n); b.write(out, position, n);
position += n; position += n;
remaining -= n; remaining -= n;
rc.position(position);
packHeadSkipped = true;
continue;
} }
} finally {
if (rc != null) {
rc.close();
}
}
}
private long copyPackBypassCache(PackOutputStream out, DfsReader ctx) buf.position(0);
throws IOException { int n = read(rc, buf);
try (ReadableChannel rc = ctx.db.openFile(desc, PACK)) { if (n <= 0) {
ByteBuffer buf = newCopyBuffer(out, rc); throw packfileIsTruncated();
if (ctx.getOptions().getStreamPackBufferSize() > 0) { } else if (n > remaining) {
rc.setReadAheadBytes(ctx.getOptions().getStreamPackBufferSize()); n = (int) remaining;
} }
long position = 12;
long remaining = length - (12 + 20);
boolean packHeadSkipped = false;
while (0 < remaining) {
DfsBlock b = cache.get(key, alignToBlock(position));
if (b != null) {
int ptr = (int) (position - b.start);
int n = (int) Math.min(b.size() - ptr, remaining);
b.write(out, position, n);
position += n;
remaining -= n;
rc.position(position);
packHeadSkipped = true;
continue;
}
buf.position(0); if (!packHeadSkipped) {
int n = read(rc, buf); // Need skip the 'PACK' header for the first read
if (n <= 0) { out.write(buf.array(), 12, n - 12);
throw packfileIsTruncated(); packHeadSkipped = true;
} else if (n > remaining) { } else {
n = (int) remaining; out.write(buf.array(), 0, n);
}
if (!packHeadSkipped) {
// Need skip the 'PACK' header for the first read
out.write(buf.array(), 12, n - 12);
packHeadSkipped = true;
} else {
out.write(buf.array(), 0, n);
}
position += n;
remaining -= n;
} }
return position; position += n;
remaining -= n;
} }
return position;
} }
private ByteBuffer newCopyBuffer(PackOutputStream out, ReadableChannel rc) { private ByteBuffer newCopyBuffer(PackOutputStream out, ReadableChannel rc) {

4
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/dfs/DfsReftable.java

@ -128,7 +128,9 @@ public class DfsReftable extends BlockBasedFile {
open().setReadAheadBytes(readAhead); open().setReadAheadBytes(readAhead);
} }
DfsBlock block = cache.getOrLoad(file, pos, ctx, ch); DfsBlock block = cache.getOrLoad(file, pos, ctx, () -> {
return open();
});
if (block.start == pos && block.size() >= cnt) { if (block.start == pos && block.size() >= cnt) {
return block.zeroCopyByteBuffer(cnt); return block.zeroCopyByteBuffer(cnt);
} }

Loading…
Cancel
Save