Browse Source

DHT: Change DhtReader caches to be dynamic by workload

Instead of fixing the prefetch queue and recent chunk queue as
different sizes, allow these to share the same limit but be scaled
based on the work being performed.

During walks about 20% of the space will be given to the prefetcher,
and the other 80% will be used by the recent chunks cache. This
should improve cases where there is bad locality between chunks.

During writing of a pack stream, 90-100% of the space should be
made available to the prefetcher, as the prefetch plan is usually
very accurate about the order chunks will be needed in.

Change-Id: I1ca7acb4518e66eb9d4138fb753df38e7254704d
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
stable-1.1
Shawn O. Pearce 14 years ago
parent
commit
d34ec12019
  1. 15
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java
  2. 18
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java
  3. 121
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReaderOptions.java
  4. 10
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java
  5. 4
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java
  6. 44
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/RecentChunks.java

15
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java

@ -125,9 +125,18 @@ public class DhtCachedPack extends CachedPack {
throws IOException {
if (keyList == null)
init();
Prefetcher p = new Prefetcher(ctx, 0);
p.push(Arrays.asList(keyList));
copyPack(out, p, validate);
// Clear the recent chunks because all of the reader's
// chunk limit should be made available for prefetch.
int cacheLimit = ctx.getOptions().getChunkLimit();
ctx.getRecentChunks().setMaxBytes(0);
try {
Prefetcher p = new Prefetcher(ctx, 0, cacheLimit);
p.push(Arrays.asList(keyList));
copyPack(out, p, validate);
} finally {
ctx.getRecentChunks().setMaxBytes(cacheLimit);
}
}
private void copyPack(PackOutputStream out, Prefetcher prefetcher,

18
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java

@ -156,6 +156,10 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
return recentInfo;
}
/** @return the per-reader cache of recently accessed pack chunks. */
RecentChunks getRecentChunks() {
	return recentChunks;
}
/** @return this reader's delta base cache. */
DeltaBaseCache getDeltaBaseCache() {
	return deltaBaseCache;
}
@ -242,7 +246,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
// configured as push might invoke our own methods that may
// try to call back into the active prefetcher.
//
Prefetcher p = new Prefetcher(this, OBJ_COMMIT);
Prefetcher p = prefetch(OBJ_COMMIT, readerOptions.getWalkCommitsPrefetchRatio());
p.push(this, roots);
prefetcher = p;
}
@ -256,7 +260,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
// configured as push might invoke our own methods that may
// try to call back into the active prefetcher.
//
Prefetcher p = new Prefetcher(this, OBJ_TREE);
Prefetcher p = prefetch(OBJ_TREE, readerOptions.getWalkTreesPrefetchRatio());
p.push(this, min.getTree(), max.getTree());
prefetcher = p;
}
@ -391,14 +395,22 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
new RepresentationSelector(packer, this, monitor).select(itr);
}
/**
 * Start a prefetch pass, splitting the reader's chunk budget between the
 * prefetcher and the recent-chunks cache.
 *
 * @param type
 *            object type hint passed through to the prefetcher.
 * @param ratio
 *            percentage (0..100) of the chunk limit to give the prefetcher;
 *            the remainder stays with the recent-chunks cache.
 * @return a prefetcher sized to its share of the chunk limit.
 */
private Prefetcher prefetch(final int type, final int ratio) {
	final int totalBytes = readerOptions.getChunkLimit();
	final int prefetchBytes = (int) (totalBytes * (ratio / 100.0));
	// Shrink the recent cache first so its evictions free space
	// before the prefetcher begins to fill its own share.
	recentChunks.setMaxBytes(totalBytes - prefetchBytes);
	return new Prefetcher(this, type, prefetchBytes);
}
/**
 * Finish a prefetch pass: drop the active prefetcher and return the
 * reader's full chunk budget to the recent-chunks cache.
 */
private void endPrefetch() {
	recentChunks.setMaxBytes(getOptions().getChunkLimit());
	prefetcher = null;
}
@SuppressWarnings("unchecked")
public void writeObjects(PackOutputStream out, List<ObjectToPack> objects)
throws IOException {
prefetcher = new Prefetcher(this, 0);
prefetcher = prefetch(0, readerOptions.getWriteObjectsPrefetchRatio());
try {
List itr = objects;
new ObjectWriter(this, prefetcher).plan(itr);

121
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReaderOptions.java

@ -57,7 +57,15 @@ public class DhtReaderOptions {
private boolean prefetchFollowEdgeHints;
private int prefetchLimit;
private int chunkLimit;
private int openQueuePrefetchRatio;
private int walkCommitsPrefetchRatio;
private int walkTreesPrefetchRatio;
private int writeObjectsPrefetchRatio;
private int objectIndexConcurrentBatches;
@ -69,15 +77,18 @@ public class DhtReaderOptions {
private int recentInfoCacheSize;
private int recentChunkCacheSize;
private boolean trackFirstChunkLoad;
/** Create a default reader configuration. */
public DhtReaderOptions() {
setTimeout(Timeout.seconds(5));
setPrefetchFollowEdgeHints(true);
setPrefetchLimit(5 * MiB);
setChunkLimit(5 * MiB);
setOpenQueuePrefetchRatio(20 /* percent */);
setWalkCommitsPrefetchRatio(20 /* percent */);
setWalkTreesPrefetchRatio(20 /* percent */);
setWriteObjectsPrefetchRatio(90 /* percent */);
setObjectIndexConcurrentBatches(2);
setObjectIndexBatchSize(512);
@ -86,7 +97,6 @@ public class DhtReaderOptions {
setDeltaBaseCacheLimit(10 * MiB);
setRecentInfoCacheSize(4096);
setRecentChunkCacheSize(4);
}
/** @return default timeout to wait on long operations before aborting. */
@ -125,19 +135,83 @@ public class DhtReaderOptions {
return this;
}
/** @return number of bytes to load during prefetching. */
public int getPrefetchLimit() {
return prefetchLimit;
/** @return number of bytes to hold within a DhtReader. */
public int getChunkLimit() {
return chunkLimit;
}
/**
* Set the number of bytes the prefetcher should hold onto.
* Set the number of bytes to hold within a DhtReader.
*
* @param maxBytes
* @return {@code this}
*/
public DhtReaderOptions setPrefetchLimit(int maxBytes) {
prefetchLimit = Math.max(1024, maxBytes);
public DhtReaderOptions setChunkLimit(int maxBytes) {
chunkLimit = Math.max(1024, maxBytes);
return this;
}
/**
 * @return percentage of {@link #getChunkLimit()} (0..100) reserved for the
 *         prefetcher while streaming objects through an open object queue.
 */
public int getOpenQueuePrefetchRatio() {
	return openQueuePrefetchRatio;
}
/**
 * Set the prefetch ratio used by the open object queue.
 *
 * @param ratio
 *            desired percentage; values outside 0..100 are clamped.
 * @return {@code this}
 */
public DhtReaderOptions setOpenQueuePrefetchRatio(int ratio) {
	openQueuePrefetchRatio = Math.min(100, Math.max(0, ratio));
	return this;
}
/**
 * @return percentage of {@link #getChunkLimit()} (0..100) reserved for the
 *         prefetcher while walking commits.
 */
public int getWalkCommitsPrefetchRatio() {
	return walkCommitsPrefetchRatio;
}
/**
 * Set the prefetch ratio used while walking commits.
 *
 * @param ratio
 *            desired percentage; values outside 0..100 are clamped.
 * @return {@code this}
 */
public DhtReaderOptions setWalkCommitsPrefetchRatio(int ratio) {
	walkCommitsPrefetchRatio = Math.min(100, Math.max(0, ratio));
	return this;
}
/**
 * @return percentage of {@link #getChunkLimit()} (0..100) reserved for the
 *         prefetcher while walking trees.
 */
public int getWalkTreesPrefetchRatio() {
	return walkTreesPrefetchRatio;
}
/**
 * Set the prefetch ratio used while walking trees.
 *
 * @param ratio
 *            desired percentage; values outside 0..100 are clamped.
 * @return {@code this}
 */
public DhtReaderOptions setWalkTreesPrefetchRatio(int ratio) {
	walkTreesPrefetchRatio = Math.min(100, Math.max(0, ratio));
	return this;
}
/**
 * @return percentage of {@link #getChunkLimit()} (0..100) reserved for the
 *         prefetcher while writing objects into a pack stream.
 */
public int getWriteObjectsPrefetchRatio() {
	return writeObjectsPrefetchRatio;
}
/**
 * Set the prefetch ratio used while writing objects to a pack stream.
 *
 * @param ratio
 *            desired percentage; values outside 0..100 are clamped.
 * @return {@code this}
 */
public DhtReaderOptions setWriteObjectsPrefetchRatio(int ratio) {
	writeObjectsPrefetchRatio = Math.min(100, Math.max(0, ratio));
	return this;
}
@ -226,24 +300,6 @@ public class DhtReaderOptions {
return this;
}
/** @return number of recent chunks to hold onto per-reader. */
public int getRecentChunkCacheSize() {
	return recentChunkCacheSize;
}
/**
 * Set the number of chunks each reader holds onto for recently used access.
 *
 * @param chunkCnt
 *            the number of chunks each reader retains of recently used
 *            chunks to smooth out access; values below 1 are raised to 1.
 * @return {@code this}
 */
public DhtReaderOptions setRecentChunkCacheSize(int chunkCnt) {
	recentChunkCacheSize = chunkCnt < 1 ? 1 : chunkCnt;
	return this;
}
/**
* @return true if {@link DhtReader.Statistics} includes the stack trace for
* the first time a chunk is loaded. Supports debugging DHT code.
@ -277,7 +333,11 @@ public class DhtReaderOptions {
public DhtReaderOptions fromConfig(Config rc) {
setTimeout(Timeout.getTimeout(rc, "core", "dht", "timeout", getTimeout()));
setPrefetchFollowEdgeHints(rc.getBoolean("core", "dht", "prefetchFollowEdgeHints", isPrefetchFollowEdgeHints()));
setPrefetchLimit(rc.getInt("core", "dht", "prefetchLimit", getPrefetchLimit()));
setChunkLimit(rc.getInt("core", "dht", "chunkLimit", getChunkLimit()));
setOpenQueuePrefetchRatio(rc.getInt("core", "dht", "openQueuePrefetchRatio", getOpenQueuePrefetchRatio()));
setWalkCommitsPrefetchRatio(rc.getInt("core", "dht", "walkCommitsPrefetchRatio", getWalkCommitsPrefetchRatio()));
setWalkTreesPrefetchRatio(rc.getInt("core", "dht", "walkTreesPrefetchRatio", getWalkTreesPrefetchRatio()));
setWriteObjectsPrefetchRatio(rc.getInt("core", "dht", "writeObjectsPrefetchRatio", getWriteObjectsPrefetchRatio()));
setObjectIndexConcurrentBatches(rc.getInt("core", "dht", "objectIndexConcurrentBatches", getObjectIndexConcurrentBatches()));
setObjectIndexBatchSize(rc.getInt("core", "dht", "objectIndexBatchSize", getObjectIndexBatchSize()));
@ -286,7 +346,6 @@ public class DhtReaderOptions {
setDeltaBaseCacheLimit(rc.getInt("core", "dht", "deltaBaseCacheLimit", getDeltaBaseCacheLimit()));
setRecentInfoCacheSize(rc.getInt("core", "dht", "recentInfoCacheSize", getRecentInfoCacheSize()));
setRecentChunkCacheSize(rc.getInt("core", "dht", "recentChunkCacheSize", getRecentChunkCacheSize()));
setTrackFirstChunkLoad(rc.getBoolean("core", "dht", "debugTrackFirstChunkLoad", isTrackFirstChunkLoad()));
return this;

10
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java

@ -159,6 +159,7 @@ final class OpenQueue<T extends ObjectId> extends QueueObjectLookup<T>
@Override
public void release() {
	// The queue is finished with its prefetcher; hand the reader's
	// full chunk budget back to the recent-chunks cache.
	final DhtReaderOptions options = reader.getOptions();
	reader.getRecentChunks().setMaxBytes(options.getChunkLimit());
	prefetcher = null;
	currChunk = null;
}
@ -173,8 +174,13 @@ final class OpenQueue<T extends ObjectId> extends QueueObjectLookup<T>
list = new ArrayList<ObjectWithInfo<T>>();
byChunk.put(chunkKey, list);
if (prefetcher == null)
prefetcher = new Prefetcher(reader, 0);
if (prefetcher == null) {
int limit = reader.getOptions().getChunkLimit();
int ratio = reader.getOptions().getOpenQueuePrefetchRatio();
int prefetchLimit = (int) (limit * (ratio / 100.0));
reader.getRecentChunks().setMaxBytes(limit - prefetchLimit);
prefetcher = new Prefetcher(reader, 0, prefetchLimit);
}
prefetcher.push(chunkKey);
}
list.add(c);

4
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java

@ -104,7 +104,7 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
private DhtException error;
Prefetcher(DhtReader reader, int objectType) {
Prefetcher(DhtReader reader, int objectType, int prefetchLimitInBytes) {
this.db = reader.getDatabase();
this.stats = reader.getStatistics();
this.objectType = objectType;
@ -113,7 +113,7 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
this.queue = new LinkedList<ChunkKey>();
this.followEdgeHints = reader.getOptions().isPrefetchFollowEdgeHints();
this.averageChunkSize = reader.getInserterOptions().getChunkSize();
this.highWaterMark = reader.getOptions().getPrefetchLimit();
this.highWaterMark = prefetchLimitInBytes;
int lwm = (highWaterMark / averageChunkSize) - 4;
if (lwm <= 0)

44
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/RecentChunks.java

@ -56,11 +56,11 @@ final class RecentChunks {
private final DhtReader.Statistics stats;
private final int maxSize;
private final HashMap<ChunkKey, Node> byKey;
private int curSize;
private int maxBytes;
private int curBytes;
private Node lruHead;
@ -69,8 +69,16 @@ final class RecentChunks {
RecentChunks(DhtReader reader) {
this.reader = reader;
this.stats = reader.getStatistics();
this.maxSize = reader.getOptions().getRecentChunkCacheSize();
this.byKey = new HashMap<ChunkKey, Node>();
this.maxBytes = reader.getOptions().getChunkLimit();
}
/**
 * Resize the cache's byte budget, evicting as needed to fit.
 *
 * @param newMax
 *            new maximum byte budget; negative values are treated as 0,
 *            and a budget of 0 empties the cache entirely.
 */
void setMaxBytes(int newMax) {
	maxBytes = Math.max(0, newMax);
	if (maxBytes > 0)
		prune();
	else
		clear();
}
PackChunk get(ChunkKey key) {
@ -91,16 +99,26 @@ final class RecentChunks {
return;
}
if (curSize < maxSize) {
n = new Node();
curSize++;
} else {
n = lruTail;
byKey.remove(n.chunk.getChunkKey());
}
curBytes += chunk.getTotalSize();
prune();
n = new Node();
n.chunk = chunk;
byKey.put(chunk.getChunkKey(), n);
hit(n);
first(n);
}
/** Evict chunks from the LRU tail until the cache fits within maxBytes. */
private void prune() {
	for (Node victim = lruTail; curBytes > maxBytes && victim != null; victim = lruTail) {
		final PackChunk chunk = victim.chunk;
		curBytes -= chunk.getTotalSize();
		byKey.remove(chunk.getChunkKey());
		remove(victim);
	}
}
ObjectLoader open(RepositoryKey repo, AnyObjectId objId, int typeHint)
@ -167,7 +185,7 @@ final class RecentChunks {
}
void clear() {
curSize = 0;
curBytes = 0;
lruHead = null;
lruTail = null;
byKey.clear();

Loading…
Cancel
Save