Browse Source

DHT: Remove per-process ChunkCache

Performance testing has indicated the per-process ChunkCache isn't
very effective for the DHT storage implementation.  If a server is
using the DHT storage backend, it is most likely part of a larger
cluster where requests are distributed in a round-robin fashion
between the member servers.

In such a scenario there is insufficient data locality between
requests to get a good hit ratio on the per-process ChunkCache.  A low
hit ratio means the cache is actually hurting performance by eating up
memory that could otherwise be used for transient request data, and
increasing pressure on the GC when it needs to find free space.

Remove all of the ChunkCache code.  Installations that want to cache
(to reduce database usage) should wrap their Database with a
CacheDatabase and use a network based CacheServer.

I left the ChunkCache in the original DHT storage commit because I
wanted to document in the history of the project that its probably
worth *not* having, but leave open a door for someone to revert this
change if they find otherwise at a later date.

Change-Id: I364d0725c46c5a19f7443642a40c89ba4d3fdd29
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
Signed-off-by: Chris Aniszczyk <caniszczyk@gmail.com>
stable-1.0
Shawn O. Pearce 14 years ago committed by Chris Aniszczyk
parent
commit
7cad0adc7d
  1. 436
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/ChunkCache.java
  2. 97
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/ChunkCacheConfig.java
  3. 1
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java
  4. 67
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java
  5. 3
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java
  6. 45
      org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java

436
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/ChunkCache.java

@ -1,436 +0,0 @@
/*
* Copyright (C) 2008-2011, Google Inc.
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.eclipse.jgit.storage.dht;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.storage.dht.DhtReader.ChunkAndOffset;
/**
* Caches recently used {@link PackChunk} in memory for faster read access.
* <p>
* During a miss, older entries are evicted from the cache so long as
* {@link #isFull()} returns true.
* <p>
* Its too expensive during object access to be 100% accurate with a least
* recently used (LRU) algorithm. Strictly ordering every read is a lot of
* overhead that typically doesn't yield a corresponding benefit to the
* application.
* <p>
* This cache implements a loose LRU policy by randomly picking a window
* comprised of roughly 10% of the cache, and evicting the oldest accessed entry
* within that window.
* <p>
* Entities created by the cache are held under SoftReferences, permitting the
* Java runtime's garbage collector to evict entries when heap memory gets low.
* Most JREs implement a loose least recently used algorithm for this eviction.
* <p>
* The internal hash table does not expand at runtime, instead it is fixed in
* size at cache creation time. The internal lock table used to gate load
* invocations is also fixed in size.
* <p>
* To maintain higher concurrency workloads, during eviction only one thread
* performs the eviction work, while other threads can continue to insert new
* objects in parallel. This means that the cache can be temporarily over limit,
* especially if the nominated eviction thread is being starved relative to the
* other threads.
*/
public class ChunkCache {
private static final Random rng = new Random();
private static volatile ChunkCache cache;
static {
cache = new ChunkCache(new ChunkCacheConfig());
}
/**
* Modify the configuration of the chunk cache.
* <p>
* The new configuration is applied immediately. If the new limits are
* smaller than what what is currently cached, older entries will be purged
* as soon as possible to allow the cache to meet the new limit.
*
* @param cfg
* the new chunk cache configuration.
* @throws IllegalArgumentException
* the cache configuration contains one or more invalid
* settings, usually too low of a limit.
*/
public static void reconfigure(ChunkCacheConfig cfg) {
ChunkCache nc = new ChunkCache(cfg);
cache = nc;
}
static ChunkCache get() {
return cache;
}
/** ReferenceQueue to cleanup released and garbage collected windows. */
private final ReferenceQueue<PackChunk> queue;
/** Number of entries in {@link #table}. */
private final int tableSize;
/** Access clock for loose LRU. */
private final AtomicLong clock;
/** Hash bucket directory; entries are chained below. */
private final AtomicReferenceArray<Entry> table;
/** Locks to prevent concurrent loads for same (ChunkKey,position). */
private final Lock[] locks;
/** Lock to elect the eviction thread after a load occurs. */
private final ReentrantLock evictLock;
/** Number of {@link #table} buckets to scan for an eviction window. */
private final int evictBatch;
private final long maxBytes;
private final AtomicLong openBytes;
private ChunkCache(ChunkCacheConfig cfg) {
tableSize = tableSize(cfg);
final int lockCount = lockCount(cfg);
if (tableSize < 0)
throw new IllegalArgumentException();
if (lockCount < 0)
throw new IllegalArgumentException();
queue = new ReferenceQueue<PackChunk>();
clock = new AtomicLong(1);
table = new AtomicReferenceArray<Entry>(tableSize);
locks = new Lock[lockCount];
for (int i = 0; i < locks.length; i++)
locks[i] = new Lock();
evictLock = new ReentrantLock();
int eb = (int) (tableSize * .1);
if (64 < eb)
eb = 64;
else if (eb < 4)
eb = 4;
if (tableSize < eb)
eb = tableSize;
evictBatch = eb;
maxBytes = cfg.getChunkCacheLimit();
openBytes = new AtomicLong();
}
long getOpenBytes() {
return openBytes.get();
}
private Ref createRef(ChunkKey key, PackChunk v) {
final Ref ref = new Ref(key, v, queue);
openBytes.addAndGet(ref.size);
return ref;
}
private void clear(Ref ref) {
openBytes.addAndGet(-ref.size);
}
private boolean isFull() {
return maxBytes < openBytes.get();
}
private static int tableSize(ChunkCacheConfig cfg) {
final int csz = 1 * ChunkCacheConfig.MiB;
final long limit = cfg.getChunkCacheLimit();
if (limit == 0)
return 0;
if (csz <= 0)
throw new IllegalArgumentException();
if (limit < csz)
throw new IllegalArgumentException();
return (int) Math.min(5 * (limit / csz) / 2, 2000000000);
}
private static int lockCount(ChunkCacheConfig cfg) {
if (cfg.getChunkCacheLimit() == 0)
return 0;
return 32;
}
PackChunk get(ChunkKey chunkKey) {
if (tableSize == 0)
return null;
return scan(table.get(slot(chunkKey)), chunkKey);
}
ChunkAndOffset find(RepositoryKey repo, AnyObjectId objId) {
// TODO(spearce) This method violates our no-collision rules.
// Its possible for a duplicate object to be uploaded into a new
// chunk, and have that get used if the new chunk is pulled into
// the process cache for a different object.
for (int slot = 0; slot < tableSize; slot++) {
for (Entry e = table.get(slot); e != null; e = e.next) {
PackChunk chunk = e.ref.get();
if (chunk != null) {
int pos = chunk.findOffset(repo, objId);
if (0 <= pos) {
hit(e.ref);
return new ChunkAndOffset(chunk, pos);
}
}
}
}
return null;
}
PackChunk put(PackChunk chunk) {
if (tableSize == 0)
return chunk;
final ChunkKey chunkKey = chunk.getChunkKey();
final int slot = slot(chunkKey);
final Entry e1 = table.get(slot);
PackChunk v = scan(e1, chunkKey);
if (v != null)
return v;
synchronized (lock(chunkKey)) {
Entry e2 = table.get(slot);
if (e2 != e1) {
v = scan(e2, chunkKey);
if (v != null)
return v;
}
v = chunk;
final Ref ref = createRef(chunkKey, v);
hit(ref);
for (;;) {
final Entry n = new Entry(clean(e2), ref);
if (table.compareAndSet(slot, e2, n))
break;
e2 = table.get(slot);
}
}
if (evictLock.tryLock()) {
try {
gc();
evict();
} finally {
evictLock.unlock();
}
}
return v;
}
private PackChunk scan(Entry n, ChunkKey chunk) {
for (; n != null; n = n.next) {
Ref r = n.ref;
if (r.chunk.equals(chunk)) {
PackChunk v = r.get();
if (v != null) {
hit(r);
return v;
}
n.kill();
break;
}
}
return null;
}
private void hit(final Ref r) {
// We don't need to be 100% accurate here. Its sufficient that at least
// one thread performs the increment. Any other concurrent access at
// exactly the same time can simply use the same clock value.
//
// Consequently we attempt the set, but we don't try to recover should
// it fail. This is why we don't use getAndIncrement() here.
//
final long c = clock.get();
clock.compareAndSet(c, c + 1);
r.lastAccess = c;
}
private void evict() {
while (isFull()) {
int ptr = rng.nextInt(tableSize);
Entry old = null;
int slot = 0;
for (int b = evictBatch - 1; b >= 0; b--, ptr++) {
if (tableSize <= ptr)
ptr = 0;
for (Entry e = table.get(ptr); e != null; e = e.next) {
if (e.dead)
continue;
if (old == null || e.ref.lastAccess < old.ref.lastAccess) {
old = e;
slot = ptr;
}
}
}
if (old != null) {
old.kill();
gc();
final Entry e1 = table.get(slot);
table.compareAndSet(slot, e1, clean(e1));
}
}
}
private void gc() {
Ref r;
while ((r = (Ref) queue.poll()) != null) {
// Sun's Java 5 and 6 implementation have a bug where a Reference
// can be enqueued and dequeued twice on the same reference queue
// due to a race condition within ReferenceQueue.enqueue(Reference).
//
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6837858
//
// We CANNOT permit a Reference to come through us twice, as it will
// skew the resource counters we maintain. Our canClear() check here
// provides a way to skip the redundant dequeues, if any.
//
if (r.canClear()) {
clear(r);
boolean found = false;
final int s = slot(r.chunk);
final Entry e1 = table.get(s);
for (Entry n = e1; n != null; n = n.next) {
if (n.ref == r) {
n.dead = true;
found = true;
break;
}
}
if (found)
table.compareAndSet(s, e1, clean(e1));
}
}
}
private int slot(ChunkKey chunk) {
return (chunk.hashCode() >>> 1) % tableSize;
}
private Lock lock(ChunkKey chunk) {
return locks[(chunk.hashCode() >>> 1) % locks.length];
}
private static Entry clean(Entry top) {
while (top != null && top.dead) {
top.ref.enqueue();
top = top.next;
}
if (top == null)
return null;
final Entry n = clean(top.next);
return n == top.next ? top : new Entry(n, top.ref);
}
private static class Entry {
/** Next entry in the hash table's chain list. */
final Entry next;
/** The referenced object. */
final Ref ref;
/**
* Marked true when ref.get() returns null and the ref is dead.
* <p>
* A true here indicates that the ref is no longer accessible, and that
* we therefore need to eventually purge this Entry object out of the
* bucket's chain.
*/
volatile boolean dead;
Entry(final Entry n, final Ref r) {
next = n;
ref = r;
}
final void kill() {
dead = true;
ref.enqueue();
}
}
/** A soft reference wrapped around a cached object. */
private static class Ref extends SoftReference<PackChunk> {
final ChunkKey chunk;
final int size;
long lastAccess;
private boolean cleared;
Ref(ChunkKey chunk, PackChunk v, ReferenceQueue<PackChunk> queue) {
super(v, queue);
this.chunk = chunk;
this.size = v.getTotalSize();
}
final synchronized boolean canClear() {
if (cleared)
return false;
cleared = true;
return true;
}
}
private static final class Lock {
// Used only for its implicit monitor.
}
}

97
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/ChunkCacheConfig.java

@ -1,97 +0,0 @@
/*
* Copyright (C) 2011, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.eclipse.jgit.storage.dht;
import org.eclipse.jgit.lib.Config;
/** Configuration parameters for {@link ChunkCache}. */
public class ChunkCacheConfig {
/** 1024 (number of bytes in one kibibyte/kilobyte) */
public static final int KiB = 1024;
/** 1024 {@link #KiB} (number of bytes in one mebibyte/megabyte) */
public static final int MiB = 1024 * KiB;
private long chunkCacheLimit;
/** Create a default configuration. */
public ChunkCacheConfig() {
setChunkCacheLimit(10 * MiB);
}
/**
* @return maximum number bytes of heap memory to dedicate to caching pack
* file data. If the limit is configured to 0, the chunk cache is
* disabled. <b>Default is 10 MB.</b>
*/
public long getChunkCacheLimit() {
return chunkCacheLimit;
}
/**
* @param newLimit
* maximum number bytes of heap memory to dedicate to caching
* pack file data.
* @return {@code this}
*/
public ChunkCacheConfig setChunkCacheLimit(final long newLimit) {
chunkCacheLimit = Math.max(0, newLimit);
return this;
}
/**
* Update properties by setting fields from the configuration.
* <p>
* If a property is not defined in the configuration, then it is left
* unmodified.
*
* @param rc
* configuration to read properties from.
* @return {@code this}
*/
public ChunkCacheConfig fromConfig(final Config rc) {
setChunkCacheLimit(rc.getLong("core", "dht", "chunkCacheLimit", getChunkCacheLimit()));
return this;
}
}

1
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtCachedPack.java

@ -98,7 +98,6 @@ public class DhtCachedPack extends CachedPack {
void copyAsIs(PackOutputStream out, boolean validate, DhtReader ctx)
throws IOException {
Prefetcher p = new Prefetcher(ctx, 0);
p.setCacheLoadedChunks(false);
p.push(info.chunks);
copyPack(out, ctx, p, validate);
}

67
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/DhtReader.java

@ -195,10 +195,6 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
if (repository.getRefDatabase().findChunk(objId) != null)
return true;
// TODO(spearce) This is expensive. Is it worthwhile?
if (ChunkCache.get().find(repo, objId) != null)
return true;
return !find(objId).isEmpty();
}
@ -210,7 +206,7 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
if (ldr != null)
return ldr;
ChunkAndOffset p = getChunk(objId, typeHint, true, false);
ChunkAndOffset p = getChunk(objId, typeHint, false);
ldr = PackChunk.read(p.chunk, p.offset, this, typeHint);
recentChunk(p.chunk);
return ldr;
@ -274,19 +270,12 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
recentChunks.put(chunk);
}
ChunkAndOffset getChunk(AnyObjectId objId, int typeHint, boolean recent)
throws DhtException, MissingObjectException {
return getChunk(objId, typeHint, true /* load */, recent);
ChunkAndOffset getChunkGently(AnyObjectId objId) {
return recentChunks.find(repo, objId);
}
ChunkAndOffset getChunkGently(AnyObjectId objId, int typeHint)
ChunkAndOffset getChunk(AnyObjectId objId, int typeHint, boolean checkRecent)
throws DhtException, MissingObjectException {
return getChunk(objId, typeHint, false /* no load */, true /* recent */);
}
private ChunkAndOffset getChunk(AnyObjectId objId, int typeHint,
boolean loadIfRequired, boolean checkRecent) throws DhtException,
MissingObjectException {
if (checkRecent) {
ChunkAndOffset r = recentChunks.find(repo, objId);
if (r != null)
@ -298,38 +287,21 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
key = ((RefData.IdWithChunk) objId).getChunkKey();
else
key = repository.getRefDatabase().findChunk(objId);
if (key != null) {
PackChunk chunk = ChunkCache.get().get(key);
if (chunk != null) {
PackChunk chunk = load(key);
if (chunk != null && chunk.hasIndex()) {
int pos = chunk.findOffset(repo, objId);
if (0 <= pos)
return new ChunkAndOffset(chunk, pos);
}
if (loadIfRequired) {
chunk = load(key);
if (chunk != null && chunk.hasIndex()) {
int pos = chunk.findOffset(repo, objId);
if (0 <= pos) {
chunk = ChunkCache.get().put(chunk);
return new ChunkAndOffset(chunk, pos);
}
}
}
// The hint above is stale. Fall through and do a
// more exhaustive lookup to find the object.
}
ChunkAndOffset r = ChunkCache.get().find(repo, objId);
if (r != null)
return r;
if (!loadIfRequired)
return null;
if (prefetcher != null) {
r = prefetcher.find(repo, objId);
ChunkAndOffset r = prefetcher.find(repo, objId);
if (r != null)
return r;
}
@ -352,8 +324,6 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
continue;
}
if (chunk.hasIndex())
chunk = ChunkCache.get().put(chunk);
return new ChunkAndOffset(chunk, link.getOffset());
}
@ -372,10 +342,6 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
if (r != null)
return r.chunk.getChunkKey();
r = ChunkCache.get().find(repo, objId);
if (r != null)
return r.chunk.getChunkKey();
for (ObjectInfo link : find(objId))
return link.getChunkKey();
@ -394,16 +360,9 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
if (chunk != null)
return chunk;
chunk = ChunkCache.get().get(key);
if (chunk != null)
return chunk;
chunk = load(key);
if (chunk != null) {
if (chunk.hasIndex())
return ChunkCache.get().put(chunk);
if (chunk != null)
return chunk;
}
throw new DhtMissingChunkException(key);
}
@ -440,7 +399,6 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
public void writeObjects(PackOutputStream out, List<ObjectToPack> objects)
throws IOException {
prefetcher = new Prefetcher(this, 0);
prefetcher.setCacheLoadedChunks(false);
try {
List itr = objects;
new ObjectWriter(this, prefetcher).plan(itr);
@ -678,13 +636,6 @@ public class DhtReader extends ObjectReader implements ObjectReuseAsIs {
*/
public int cntPrefetcher_Load;
/**
* Number of times the prefetcher obtained from {@link ChunkCache}.
* Incremented when the prefetcher recovered the chunk from the
* local JVM chunk cache and thus avoided reading the database.
*/
public int cntPrefetcher_ChunkCacheHit;
/**
* Number of times the prefetcher ordering was wrong. Incremented if
* a reader wants a chunk but the prefetcher didn't have it ready at

3
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/OpenQueue.java

@ -59,8 +59,7 @@ import org.eclipse.jgit.lib.ObjectReader;
/**
* Locates objects in large batches, then opens them clustered by chunk.
* <p>
* To simplify the implementation this method does not consult the local
* {@link ChunkCache} for objects. Instead it performs lookups for the
* To simplify the implementation this method performs lookups for the
* {@link ObjectInfo} in large batches, clusters those by ChunkKey, and loads
* the chunks with a {@link Prefetcher}.
* <p>

45
org.eclipse.jgit.storage.dht/src/org/eclipse/jgit/storage/dht/Prefetcher.java

@ -91,8 +91,6 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
private final int lowWaterMark;
private boolean cacheLoadedChunks;
private boolean first = true;
private boolean automaticallyPushHints = true;
@ -120,19 +118,13 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
if (lwm <= 0)
lwm = (highWaterMark / averageChunkSize) / 2;
lowWaterMark = lwm * averageChunkSize;
cacheLoadedChunks = true;
}
boolean isType(int type) {
return objectType == type;
}
synchronized void setCacheLoadedChunks(boolean cacheLoadedChunks) {
this.cacheLoadedChunks = cacheLoadedChunks;
}
void push(DhtReader ctx, Collection<RevCommit> roots) throws DhtException,
MissingObjectException {
void push(DhtReader ctx, Collection<RevCommit> roots) {
// Approximate walk by using hints from the most recent commit.
// Since the commits were recently parsed by the reader, we can
// ask the reader for their chunk locations and most likely get
@ -143,7 +135,7 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
for (RevCommit cmit : roots) {
if (time < cmit.getCommitTime()) {
ChunkAndOffset p = ctx.getChunkGently(cmit, cmit.getType());
ChunkAndOffset p = ctx.getChunkGently(cmit);
if (p != null && p.chunk.getMeta() != null) {
time = cmit.getCommitTime();
chunk = p.chunk;
@ -254,8 +246,7 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
}
}
synchronized ChunkAndOffset find(
@SuppressWarnings("hiding") RepositoryKey repo, AnyObjectId objId) {
synchronized ChunkAndOffset find(RepositoryKey repo, AnyObjectId objId) {
for (PackChunk c : ready.values()) {
int p = c.findOffset(repo, objId);
if (0 <= p)
@ -341,9 +332,6 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
private PackChunk useReadyChunk(ChunkKey key) {
PackChunk chunk = ready.remove(key);
if (cacheLoadedChunks)
chunk = ChunkCache.get().put(chunk);
status.put(chunk.getChunkKey(), Status.DONE);
bytesReady -= chunk.getTotalSize();
@ -366,26 +354,19 @@ class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
// set's iterator order to load in the order we want data.
//
LinkedHashSet<ChunkKey> toLoad = new LinkedHashSet<ChunkKey>();
ChunkCache cache = ChunkCache.get();
while (bytesReady + bytesLoading < highWaterMark && !queue.isEmpty()) {
ChunkKey key = queue.removeFirst();
PackChunk chunk = cache.get(key);
if (chunk != null) {
stats.access(key).cntPrefetcher_ChunkCacheHit++;
chunkIsReady(chunk);
} else {
stats.access(key).cntPrefetcher_Load++;
toLoad.add(key);
status.put(key, Status.LOADING);
bytesLoading += averageChunkSize;
// For the first chunk, start immediately to reduce the
// startup latency associated with additional chunks.
if (first)
break;
}
stats.access(key).cntPrefetcher_Load++;
toLoad.add(key);
status.put(key, Status.LOADING);
bytesLoading += averageChunkSize;
// For the first chunk, start immediately to reduce the
// startup latency associated with additional chunks.
if (first)
break;
}
if (!toLoad.isEmpty() && error == null)

Loading…
Cancel
Save