Browse Source

Honor pack.threads and perform delta search in parallel

If we have multiple CPUs available, packing usually goes faster
when each CPU is assigned a slice of the available search space.
The number of threads to use is guessed from the runtime if it
wasn't set by the caller, or wasn't set in the configuration.

Change-Id: If554fd8973db77632a52a0f45377dd6ec13fc220
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
stable-0.9
Shawn O. Pearce 15 years ago
parent
commit
74e0835012
  1. 10
      org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectReader.java
  2. 111
      org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
  3. 5
      org.eclipse.jgit/src/org/eclipse/jgit/storage/file/WindowCursor.java
  4. 15
      org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaCache.java
  5. 3
      org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackConfig.java
  6. 133
      org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java
  7. 86
      org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java

10
org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectReader.java

@ -59,6 +59,16 @@ public abstract class ObjectReader {
/** Type hint indicating the caller doesn't know the type. */ /** Type hint indicating the caller doesn't know the type. */
protected static final int OBJ_ANY = -1; protected static final int OBJ_ANY = -1;
/**
* Construct a new reader from the same data.
* <p>
* Applications can use this method to build a new reader from the same data
* source, but for a different thread.
*
* @return a brand new reader, using the same data source.
*/
public abstract ObjectReader newReader();
/** /**
* Does the requested object exist in this database? * Does the requested object exist in this database?
* *

111
org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java

@ -0,0 +1,111 @@
/*
* Copyright (C) 2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.eclipse.jgit.lib;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Decorator that makes any {@link ProgressMonitor} usable from several
 * threads by funnelling every call through one lock.
 * <p>
 * Each method acquires the lock, forwards the call unchanged to the wrapped
 * monitor, and releases the lock again even if the wrapped monitor throws.
 */
public class ThreadSafeProgressMonitor implements ProgressMonitor {
	/** Monitor that receives every forwarded event. */
	private final ProgressMonitor pm;

	/** Held for the duration of each forwarded call. */
	private final ReentrantLock guard = new ReentrantLock();

	/**
	 * Wrap a ProgressMonitor to be thread safe.
	 *
	 * @param pm
	 *            the underlying monitor to receive events.
	 */
	public ThreadSafeProgressMonitor(final ProgressMonitor pm) {
		this.pm = pm;
	}

	/** Forwards {@code start} to the wrapped monitor while holding the lock. */
	public void start(final int totalTasks) {
		guard.lock();
		try {
			pm.start(totalTasks);
		} finally {
			guard.unlock();
		}
	}

	/** Forwards {@code beginTask} to the wrapped monitor while holding the lock. */
	public void beginTask(final String title, final int totalWork) {
		guard.lock();
		try {
			pm.beginTask(title, totalWork);
		} finally {
			guard.unlock();
		}
	}

	/** Forwards {@code update} to the wrapped monitor while holding the lock. */
	public void update(final int completed) {
		guard.lock();
		try {
			pm.update(completed);
		} finally {
			guard.unlock();
		}
	}

	/**
	 * Asks the wrapped monitor, while holding the lock, whether the
	 * operation was cancelled.
	 *
	 * @return whatever the wrapped monitor reports.
	 */
	public boolean isCancelled() {
		guard.lock();
		try {
			final boolean cancelled = pm.isCancelled();
			return cancelled;
		} finally {
			guard.unlock();
		}
	}

	/** Forwards {@code endTask} to the wrapped monitor while holding the lock. */
	public void endTask() {
		guard.lock();
		try {
			pm.endTask();
		} finally {
			guard.unlock();
		}
	}
}

5
org.eclipse.jgit/src/org/eclipse/jgit/storage/file/WindowCursor.java

@ -77,6 +77,11 @@ final class WindowCursor extends ObjectReader implements ObjectReuseAsIs {
this.db = db; this.db = db;
} }
/**
* Creates a separate cursor over the same database ({@code db}) that this
* cursor was constructed with.
*
* @return a new WindowCursor sharing this cursor's {@code db}.
*/
@Override
public ObjectReader newReader() {
return new WindowCursor(db);
}
public boolean has(AnyObjectId objectId) throws IOException { public boolean has(AnyObjectId objectId) throws IOException {
return db.has(objectId); return db.has(objectId);
} }

15
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaCache.java

@ -95,11 +95,7 @@ class DeltaCache {
// The caller may have had to allocate more space than is // The caller may have had to allocate more space than is
// required. If we are about to waste anything, shrink it. // required. If we are about to waste anything, shrink it.
// //
if (data.length != actLen) { data = resize(data, actLen);
byte[] nbuf = new byte[actLen];
System.arraycopy(data, 0, nbuf, 0, actLen);
data = nbuf;
}
// When we reserved space for this item we did it for the // When we reserved space for this item we did it for the
// inflated size of the delta, but we were just given the // inflated size of the delta, but we were just given the
@ -112,6 +108,15 @@ class DeltaCache {
return new Ref(data, queue); return new Ref(data, queue);
} }
/**
 * Return an array whose length is exactly {@code actLen}.
 * <p>
 * When the supplied array already has that length it is returned as-is;
 * otherwise the first {@code actLen} bytes are copied into a newly
 * allocated array.
 *
 * @param data
 *            buffer holding the useful bytes at its start.
 * @param actLen
 *            number of useful bytes in {@code data}.
 * @return {@code data} itself, or a copy of its first {@code actLen} bytes.
 */
byte[] resize(byte[] data, int actLen) {
	if (data.length == actLen)
		return data;

	final byte[] exact = new byte[actLen];
	System.arraycopy(data, 0, exact, 0, actLen);
	return exact;
}
private void checkForGarbageCollectedObjects() { private void checkForGarbageCollectedObjects() {
Ref r; Ref r;
while ((r = (Ref) queue.poll()) != null) while ((r = (Ref) queue.poll()) != null)

3
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackConfig.java

@ -71,6 +71,8 @@ class PackConfig {
final long bigFileThreshold; final long bigFileThreshold;
final int threads;
private PackConfig(Config rc) { private PackConfig(Config rc) {
deltaWindow = rc.getInt("pack", "window", PackWriter.DEFAULT_DELTA_SEARCH_WINDOW_SIZE); deltaWindow = rc.getInt("pack", "window", PackWriter.DEFAULT_DELTA_SEARCH_WINDOW_SIZE);
deltaWindowMemory = rc.getLong("pack", null, "windowmemory", 0); deltaWindowMemory = rc.getLong("pack", null, "windowmemory", 0);
@ -80,6 +82,7 @@ class PackConfig {
compression = compression(rc); compression = compression(rc);
indexVersion = rc.getInt("pack", "indexversion", 2); indexVersion = rc.getInt("pack", "indexversion", 2);
bigFileThreshold = rc.getLong("core", null, "bigfilethreshold", PackWriter.DEFAULT_BIG_FILE_THRESHOLD); bigFileThreshold = rc.getLong("core", null, "bigfilethreshold", PackWriter.DEFAULT_BIG_FILE_THRESHOLD);
threads = rc.getInt("pack", "threads", 0);
} }
private static int compression(Config rc) { private static int compression(Config rc) {

133
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java

@ -58,6 +58,9 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
@ -77,6 +80,7 @@ import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader; import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ProgressMonitor; import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
import org.eclipse.jgit.revwalk.ObjectWalk; import org.eclipse.jgit.revwalk.ObjectWalk;
import org.eclipse.jgit.revwalk.RevFlag; import org.eclipse.jgit.revwalk.RevFlag;
import org.eclipse.jgit.revwalk.RevObject; import org.eclipse.jgit.revwalk.RevObject;
@ -233,6 +237,8 @@ public class PackWriter {
private long bigFileThreshold = DEFAULT_BIG_FILE_THRESHOLD; private long bigFileThreshold = DEFAULT_BIG_FILE_THRESHOLD;
private int threads = 1;
private boolean thin; private boolean thin;
private boolean ignoreMissingUninteresting = true; private boolean ignoreMissingUninteresting = true;
@ -289,6 +295,7 @@ public class PackWriter {
compressionLevel = pc.compression; compressionLevel = pc.compression;
indexVersion = pc.indexVersion; indexVersion = pc.indexVersion;
bigFileThreshold = pc.bigFileThreshold; bigFileThreshold = pc.bigFileThreshold;
threads = pc.threads;
} }
private static Config configOf(final Repository repo) { private static Config configOf(final Repository repo) {
@ -452,6 +459,9 @@ public class PackWriter {
/** /**
* Get the number of objects to try when looking for a delta base. * Get the number of objects to try when looking for a delta base.
* <p>
* This limit is per thread; if 4 threads are used, the actual memory
* used will be 4 times this value.
* *
* @return the object count to be searched. * @return the object count to be searched.
*/ */
@ -477,6 +487,8 @@ public class PackWriter {
/** /**
* Get the size of the in-memory delta cache. * Get the size of the in-memory delta cache.
* <p>
* This limit is for the entire writer, even if multiple threads are used.
* *
* @return maximum number of bytes worth of delta data to cache in memory. * @return maximum number of bytes worth of delta data to cache in memory.
* If 0 the cache is infinite in size (up to the JVM heap limit * If 0 the cache is infinite in size (up to the JVM heap limit
@ -570,6 +582,26 @@ public class PackWriter {
compressionLevel = level; compressionLevel = level;
} }
/**
* Get the number of threads configured for delta compression.
* <p>
* A value of 0 means no explicit count has been chosen yet; the delta
* search replaces it with the number of available processors when it runs.
*
* @return number of threads used for delta compression.
*/
public int getThreads() {
return threads;
}
/**
* Set the number of threads to use for delta compression.
* <p>
* During delta compression, if there are enough objects to be considered
* the writer will start up concurrent threads and allow them to compress
* different sections of the repository concurrently.
* <p>
* The value is only stored here; it is interpreted when the delta search
* runs.
*
* @param threads
*            number of threads to use. If 0, the number of available
*            processors for this JVM is used. NOTE(review): the intended
*            handling of negative values should be confirmed against the
*            delta search code.
*/
// NOTE(review): the setter is named setThread (singular) while the matching
// getter is getThreads; confirm whether the asymmetry is intentional.
public void setThread(int threads) {
this.threads = threads;
}
/** @return true if this writer is producing a thin pack. */ /** @return true if this writer is producing a thin pack. */
public boolean isThin() { public boolean isThin() {
return thin; return thin;
@ -925,12 +957,107 @@ public class PackWriter {
return true; return true;
} }
private void searchForDeltas(ProgressMonitor monitor, private void searchForDeltas(final ProgressMonitor monitor,
ObjectToPack[] list, int cnt) throws MissingObjectException, final ObjectToPack[] list, final int cnt)
IncorrectObjectTypeException, LargeObjectException, IOException { throws MissingObjectException, IncorrectObjectTypeException,
LargeObjectException, IOException {
if (threads == 0)
threads = Runtime.getRuntime().availableProcessors();
if (threads <= 1 || cnt <= 2 * getDeltaSearchWindowSize()) {
DeltaCache dc = new DeltaCache(this); DeltaCache dc = new DeltaCache(this);
DeltaWindow dw = new DeltaWindow(this, dc, reader); DeltaWindow dw = new DeltaWindow(this, dc, reader);
dw.search(monitor, list, 0, cnt); dw.search(monitor, list, 0, cnt);
return;
}
final List<Throwable> errors = Collections
.synchronizedList(new ArrayList<Throwable>());
final DeltaCache dc = new ThreadSafeDeltaCache(this);
final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
final ExecutorService pool = Executors.newFixedThreadPool(threads);
// Guess at the size of batch we want. Because we don't really
// have a way for a thread to steal work from another thread if
// it ends early, we over partition slightly so the work units
// are a bit smaller.
//
int estSize = cnt / (threads * 2);
if (estSize < 2 * getDeltaSearchWindowSize())
estSize = 2 * getDeltaSearchWindowSize();
for (int i = 0; i < cnt;) {
final int start = i;
final int batchSize;
if (cnt - i < estSize) {
// If we don't have enough to fill the remaining block,
// schedule what is left over as a single block.
//
batchSize = cnt - i;
} else {
// Try to split the block at the end of a path.
//
int end = start + estSize;
while (end < cnt) {
ObjectToPack a = list[end - 1];
ObjectToPack b = list[end];
if (a.getPathHash() == b.getPathHash())
end++;
else
break;
}
batchSize = end - start;
}
i += batchSize;
pool.submit(new Runnable() {
public void run() {
try {
final ObjectReader or = reader.newReader();
try {
DeltaWindow dw;
dw = new DeltaWindow(PackWriter.this, dc, or);
dw.search(pm, list, start, batchSize);
} finally {
or.release();
}
} catch (Throwable err) {
errors.add(err);
}
}
});
}
// Tell the pool to stop.
//
pool.shutdown();
for (;;) {
try {
if (pool.awaitTermination(60, TimeUnit.SECONDS))
break;
} catch (InterruptedException e) {
throw new IOException(
JGitText.get().packingCancelledDuringObjectsWriting);
}
}
// If any thread threw an error, try to report it back as
// though we weren't using a threaded search algorithm.
//
if (!errors.isEmpty()) {
Throwable err = errors.get(0);
if (err instanceof Error)
throw (Error) err;
if (err instanceof RuntimeException)
throw (RuntimeException) err;
if (err instanceof IOException)
throw (IOException) err;
IOException fail = new IOException(err.getMessage());
fail.initCause(err);
throw fail;
}
} }
private void writeObjects(ProgressMonitor writeMonitor, PackOutputStream out) private void writeObjects(ProgressMonitor writeMonitor, PackOutputStream out)

86
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/ThreadSafeDeltaCache.java

@ -0,0 +1,86 @@
/*
* Copyright (C) 2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.eclipse.jgit.storage.pack;
import java.util.concurrent.locks.ReentrantLock;
/**
 * {@link DeltaCache} variant whose cache operations each run under a single
 * lock.
 * <p>
 * Every overridden operation simply delegates to the superclass while the
 * lock is held.
 */
class ThreadSafeDeltaCache extends DeltaCache {
	/** Held around every delegated cache operation. */
	private final ReentrantLock guard = new ReentrantLock();

	ThreadSafeDeltaCache(final PackWriter pw) {
		super(pw);
	}

	@Override
	boolean canCache(final int length, final ObjectToPack src,
			final ObjectToPack res) {
		guard.lock();
		try {
			final boolean accepted = super.canCache(length, src, res);
			return accepted;
		} finally {
			guard.unlock();
		}
	}

	@Override
	void credit(final int reservedSize) {
		guard.lock();
		try {
			super.credit(reservedSize);
		} finally {
			guard.unlock();
		}
	}

	@Override
	Ref cache(byte[] data, final int actLen, final int reservedSize) {
		// Trim the buffer before taking the lock so the array copy is
		// not performed while the lock is held.
		final byte[] exact = resize(data, actLen);

		guard.lock();
		try {
			final Ref cached = super.cache(exact, actLen, reservedSize);
			return cached;
		} finally {
			guard.unlock();
		}
	}
}
Loading…
Cancel
Save