From d0a533762586ce331a8f967042fa209c847c3c8d Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Mon, 8 Apr 2013 18:11:30 -0700 Subject: [PATCH 1/7] Steal work from delta threads to rebalance CPU load If the configuration wants to run 4 threads the delta search work is initially split somewhat evenly across the 4 threads. During execution some threads will finish early due to the work not being split fairly, as the initial partitions were based on object count and not cost to inflate or size of DeltaIndex. When a thread finishes early it now tries to take 50% of the work remaining on a sibling thread, and executes that before exiting. This repeats as each thread completes until a thread has only 1 object remaining. Repacking Blink, Chromium's new fork of WebKit (2.2M objects 3.9G): [pack] reuseDeltas = false reuseObjects = false depth = 50 threads = 8 window = 250 windowMemory = 800m before: ~105% CPU after 80% after: >780% CPU to 100% Change-Id: I65e45422edd96778aba4b6e5a0fd489ea48e8ca3 --- .../jgit/internal/storage/pack/DeltaTask.java | 98 ++++++++++++++----- .../internal/storage/pack/DeltaWindow.java | 59 +++++++++-- .../internal/storage/pack/PackWriter.java | 56 +++++------ 3 files changed, 147 insertions(+), 66 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java index cb4c5a6f9..218696eef 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java @@ -43,6 +43,8 @@ package org.eclipse.jgit.internal.storage.pack; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Callable; import org.eclipse.jgit.lib.ObjectReader; @@ -50,42 +52,92 @@ import org.eclipse.jgit.lib.ThreadSafeProgressMonitor; import org.eclipse.jgit.storage.pack.PackConfig; final class DeltaTask implements Callable { - private final PackConfig config; + static final class Block { + final List tasks; + final PackConfig config; + final ObjectReader templateReader; + final DeltaCache dc; + final ThreadSafeProgressMonitor pm; + final ObjectToPack[] list; + final int beginIndex; + final int endIndex; - private final ObjectReader templateReader; - - private final DeltaCache dc; + Block(int threads, PackConfig config, ObjectReader reader, + DeltaCache dc, ThreadSafeProgressMonitor pm, + ObjectToPack[] list, int begin, int end) { + this.tasks = new ArrayList(threads); + this.config = config; + this.templateReader = reader; + this.dc = dc; + this.pm = pm; + this.list = list; + this.beginIndex = begin; + this.endIndex = end; + } - private final ThreadSafeProgressMonitor pm; + synchronized Slice stealWork() { + for (int attempts = 0; attempts < 2; attempts++) { + DeltaTask maxTask = null; + int maxWork = 0; + for (DeltaTask task : tasks) { + int r = task.remaining(); + if (maxWork < r) { + maxTask = task; + maxWork = r; + } + } + if (maxTask == null) + return null; + Slice s = maxTask.stealWork(); + if (s != null) + return s; + } + return null; + } + } - private final int batchSize; + static final class Slice { + final int beginIndex; + final int endIndex; - private final int start; + Slice(int b, int e) { + beginIndex = b; + endIndex = e; + } + } - private final ObjectToPack[] list; + private final Block block; + private final Slice firstSlice; + private volatile DeltaWindow dw; - DeltaTask(PackConfig config, ObjectReader reader, DeltaCache dc, - ThreadSafeProgressMonitor pm, int batchSize, int start, - ObjectToPack[] list) { - this.config = config; - this.templateReader = reader; - this.dc = dc; - this.pm = pm; - this.batchSize = batchSize; - this.start = start; - this.list = list; + DeltaTask(Block b, int beginIndex, int endIndex) { + this.block = b; + this.firstSlice = new Slice(beginIndex, endIndex); } public Object call() throws Exception { - final ObjectReader or = templateReader.newReader(); + ObjectReader or = block.templateReader.newReader(); try { - DeltaWindow dw; - dw = new DeltaWindow(config, dc, or); - dw.search(pm, list, start, batchSize); + for (Slice s = firstSlice; s != null; s = block.stealWork()) { + dw = new DeltaWindow(block.config, block.dc, or, block.pm, + block.list, s.beginIndex, s.endIndex); + dw.search(); + dw = null; + } } finally { or.release(); - pm.endWorker(); + block.pm.endWorker(); } return null; } + + int remaining() { + DeltaWindow d = dw; + return d != null ? d.remaining() : 0; + } + + Slice stealWork() { + DeltaWindow d = dw; + return d != null ? d.stealWork() : null; + } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java index cca5fc064..a4ff9da9f 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java @@ -56,7 +56,7 @@ import org.eclipse.jgit.lib.ProgressMonitor; import org.eclipse.jgit.storage.pack.PackConfig; import org.eclipse.jgit.util.TemporaryBuffer; -class DeltaWindow { +final class DeltaWindow { private static final int NEXT_RES = 0; private static final int NEXT_SRC = 1; @@ -67,6 +67,8 @@ class DeltaWindow { private final ObjectReader reader; + private final ProgressMonitor monitor; + private final DeltaWindowEntry[] window; /** Maximum number of bytes to admit to the window at once. */ @@ -75,6 +77,12 @@ class DeltaWindow { /** Maximum depth we should create for any delta chain. */ private final int maxDepth; + private final ObjectToPack[] toSearch; + + private int cur; + + private int end; + /** Amount of memory we have loaded right now. */ private long loaded; @@ -102,10 +110,16 @@ class DeltaWindow { /** Used to compress cached deltas. */ private Deflater deflater; - DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or) { + DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or, + ProgressMonitor pm, + ObjectToPack[] in, int beginIndex, int endIndex) { config = pc; deltaCache = dc; reader = or; + monitor = pm; + toSearch = in; + cur = beginIndex; + end = endIndex; // C Git increases the window size supplied by the user by 1. // We don't know why it does this, but if the user asks for @@ -126,21 +140,48 @@ class DeltaWindow { maxDepth = config.getMaxDeltaDepth(); } - void search(ProgressMonitor monitor, ObjectToPack[] toSearch, int off, - int cnt) throws IOException { + synchronized int remaining() { + return end - cur; + } + + synchronized DeltaTask.Slice stealWork() { + int e = end; + int n = (e - cur) >>> 1; + if (0 == n) + return null; + + int t = e - n; + int h = toSearch[t].getPathHash(); + while (cur < t) { + if (h == toSearch[t - 1].getPathHash()) + t--; + else + break; + } + end = t; + return new DeltaTask.Slice(t, e); + } + + void search() throws IOException { try { - for (int end = off + cnt; off < end; off++) { + for (;;) { + ObjectToPack next; + synchronized (this) { + if (end <= cur) + break; + next = toSearch[cur++]; + } res = window[resSlot]; if (0 < maxMemory) { clear(res); int tail = next(resSlot); - final long need = estimateSize(toSearch[off]); + final long need = estimateSize(next); while (maxMemory < loaded + need && tail != resSlot) { clear(window[tail]); tail = next(tail); } } - res.set(toSearch[off]); + res.set(next); if (res.object.isEdge() || res.object.doNotAttemptDelta()) { // We don't actually want to make a delta for @@ -152,7 +193,7 @@ class DeltaWindow { // Search for a delta for the current window slot. // monitor.update(1); - search(); + searchInWindow(); } } } finally { @@ -181,7 +222,7 @@ class DeltaWindow { ent.set(null); } - private void search() throws IOException { + private void searchInWindow() throws IOException { // TODO(spearce) If the object is used as a base for other // objects in this pack we should limit the depth we create // for ourselves to be the remainder of our longest dependent diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java index 38210f362..6577cec83 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java @@ -1305,68 +1305,57 @@ public class PackWriter { threads = Runtime.getRuntime().availableProcessors(); if (threads <= 1 || cnt <= 2 * config.getDeltaSearchWindowSize()) { - DeltaCache dc = new DeltaCache(config); - DeltaWindow dw = new DeltaWindow(config, dc, reader); - dw.search(monitor, list, 0, cnt); + new DeltaWindow(config, new DeltaCache(config), reader, monitor, + list, 0, cnt).search(); return; } final DeltaCache dc = new ThreadSafeDeltaCache(config); final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); - // Guess at the size of batch we want. Because we don't really - // have a way for a thread to steal work from another thread if - // it ends early, we over partition slightly so the work units - // are a bit smaller. - // - int estSize = cnt / (threads * 2); - if (estSize < 2 * config.getDeltaSearchWindowSize()) - estSize = 2 * config.getDeltaSearchWindowSize(); + int estSize = cnt / threads; + if (estSize < config.getDeltaSearchWindowSize()) + estSize = config.getDeltaSearchWindowSize(); - final List myTasks = new ArrayList(threads * 2); + DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config, + reader, dc, pm, + list, 0, cnt); for (int i = 0; i < cnt;) { final int start = i; - final int batchSize; + int end; if (cnt - i < estSize) { // If we don't have enough to fill the remaining block, // schedule what is left over as a single block. - // - batchSize = cnt - i; + end = cnt; } else { // Try to split the block at the end of a path. - // - int end = start + estSize; + end = start + estSize; + int h = list[end - 1].getPathHash(); while (end < cnt) { - ObjectToPack a = list[end - 1]; - ObjectToPack b = list[end]; - if (a.getPathHash() == b.getPathHash()) + if (h == list[end].getPathHash()) end++; else break; } - batchSize = end - start; } - i += batchSize; - myTasks.add(new DeltaTask(config, reader, dc, pm, batchSize, start, list)); + i = end; + taskBlock.tasks.add(new DeltaTask(taskBlock, start, end)); } - pm.startWorkers(myTasks.size()); + pm.startWorkers(taskBlock.tasks.size()); final Executor executor = config.getExecutor(); final List errors = Collections .synchronizedList(new ArrayList()); if (executor instanceof ExecutorService) { // Caller supplied us a service, use it directly. - // - runTasks((ExecutorService) executor, pm, myTasks, errors); - + runTasks((ExecutorService) executor, pm, taskBlock, errors); } else if (executor == null) { // Caller didn't give us a way to run the tasks, spawn up a // temporary thread pool and make sure it tears down cleanly. - // ExecutorService pool = Executors.newFixedThreadPool(threads); try { - runTasks(pool, pm, myTasks, errors); + runTasks(pool, pm, taskBlock, errors); } finally { pool.shutdown(); for (;;) { @@ -1383,8 +1372,7 @@ public class PackWriter { // The caller gave us an executor, but it might not do // asynchronous execution. Wrap everything and hope it // can schedule these for us. - // - for (final DeltaTask task : myTasks) { + for (final DeltaTask task : taskBlock.tasks) { executor.execute(new Runnable() { public void run() { try { @@ -1426,9 +1414,9 @@ public class PackWriter { private static void runTasks(ExecutorService pool, ThreadSafeProgressMonitor pm, - List tasks, List errors) throws IOException { - List> futures = new ArrayList>(tasks.size()); - for (DeltaTask task : tasks) + DeltaTask.Block tb, List errors) throws IOException { + List> futures = new ArrayList>(tb.tasks.size()); + for (DeltaTask task : tb.tasks) futures.add(pool.submit(task)); try { From eb17495ca4ce95c63bacf81af16ab19ff042b65c Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Wed, 10 Apr 2013 09:30:23 -0700 Subject: [PATCH 2/7] Disable CRC32 computation when no PackIndex will be created If a server is streaming 3GiB worth of pack data to a client there is no reason to compute the CRC32 checksum on the objects. The CRC32 code computed by PackWriter is used only in the new index created by writeIndex(), which is never invoked for the native Git network protocols. Object reuse may still compute its own CRC32 to verify the data being copied from an existing pack has not been corrupted. This check is done by the ObjectReader that implements ObjectReuseAsIs and has no relationship to the CRC32 being skipped during output. Change-Id: I05626f2e0d6ce19119b57d8a27193922636d60a7 --- .../storage/pack/PackOutputStream.java | 15 ------- .../internal/storage/pack/PackWriter.java | 40 +++++++++++++++---- .../transport/BasePackPushConnection.java | 1 + .../eclipse/jgit/transport/BundleWriter.java | 1 + .../eclipse/jgit/transport/UploadPack.java | 1 + 5 files changed, 35 insertions(+), 23 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java index 0b73e6a69..ea6781495 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java @@ -47,7 +47,6 @@ package org.eclipse.jgit.internal.storage.pack; import java.io.IOException; import java.io.OutputStream; import java.security.MessageDigest; -import java.util.zip.CRC32; import org.eclipse.jgit.internal.JGitText; import org.eclipse.jgit.lib.Constants; @@ -64,8 +63,6 @@ public final class PackOutputStream extends OutputStream { private final PackWriter packWriter; - private final CRC32 crc = new CRC32(); - private final MessageDigest md = Constants.newMessageDigest(); private long count; @@ -102,7 +99,6 @@ public final class PackOutputStream extends OutputStream { public void write(final int b) throws IOException { count++; out.write(b); - crc.update(b); md.update((byte) b); } @@ -122,7 +118,6 @@ public final class PackOutputStream extends OutputStream { } out.write(b, off, n); - crc.update(b, off, n); md.update(b, off, n); off += n; @@ -235,16 +230,6 @@ public final class PackOutputStream extends OutputStream { return count; } - /** @return obtain the current CRC32 register. */ - int getCRC32() { - return (int) crc.getValue(); - } - - /** Reinitialize the CRC32 register for a new region. */ - void resetCRC32() { - crc.reset(); - } - /** @return obtain the current SHA-1 digest. */ byte[] getDigest() { return md.digest(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java index 6577cec83..2e6812c1c 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java @@ -75,6 +75,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.zip.CRC32; +import java.util.zip.CheckedOutputStream; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; @@ -275,12 +277,16 @@ public class PackWriter { private boolean canBuildBitmaps; + private boolean indexDisabled; + private int depth; private Collection unshallowObjects; private PackBitmapIndexBuilder writeBitmaps; + private CRC32 crc32; + /** * Create writer for specified repository. *

@@ -471,6 +477,19 @@ public class PackWriter { this.useBitmaps = useBitmaps; } + /** @return true if the index file cannot be created by this PackWriter. */ + public boolean isIndexDisabled() { + return indexDisabled || !cachedPacks.isEmpty(); + } + + /** + * @param noIndex + * true to disable creation of the index file. + */ + public void setIndexDisabled(boolean noIndex) { + this.indexDisabled = noIndex; + } + /** * @return true to ignore objects that are uninteresting and also not found * on local disk; false to throw a {@link MissingObjectException} @@ -855,7 +874,7 @@ public class PackWriter { * the index data could not be written to the supplied stream. */ public void writeIndex(final OutputStream indexStream) throws IOException { - if (!cachedPacks.isEmpty()) + if (isIndexDisabled()) throw new IOException(JGitText.get().cachedPacksPreventsIndexCreation); long writeStart = System.currentTimeMillis(); @@ -996,8 +1015,13 @@ public class PackWriter { if (config.isDeltaCompress()) searchForDeltas(compressMonitor); - final PackOutputStream out = new PackOutputStream(writeMonitor, - packStream, this); + crc32 = new CRC32(); + final PackOutputStream out = new PackOutputStream( + writeMonitor, + isIndexDisabled() + ? packStream + : new CheckedOutputStream(packStream, crc32), + this); long objCnt = getObjectCount(); stats.totalObjects = objCnt; @@ -1484,12 +1508,12 @@ public class PackWriter { if (otp.isWritten()) return; // Delta chain cycle caused this to write already. - out.resetCRC32(); + crc32.reset(); otp.setOffset(out.length()); try { reuseSupport.copyObjectAsIs(out, otp, reuseValidate); out.endObject(); - otp.setCRC(out.getCRC32()); + otp.setCRC((int) crc32.getValue()); typeStats.reusedObjects++; if (otp.isDeltaRepresentation()) { typeStats.reusedDeltas++; @@ -1523,7 +1547,7 @@ public class PackWriter { else writeWholeObjectDeflate(out, otp); out.endObject(); - otp.setCRC(out.getCRC32()); + otp.setCRC((int) crc32.getValue()); } private void writeBase(PackOutputStream out, ObjectToPack base) @@ -1537,7 +1561,7 @@ public class PackWriter { final Deflater deflater = deflater(); final ObjectLoader ldr = reader.open(otp, otp.getType()); - out.resetCRC32(); + crc32.reset(); otp.setOffset(out.length()); out.writeHeader(otp, ldr.getSize()); @@ -1551,7 +1575,7 @@ public class PackWriter { final ObjectToPack otp) throws IOException { writeBase(out, otp.getDeltaBase()); - out.resetCRC32(); + crc32.reset(); otp.setOffset(out.length()); DeltaCache.Ref ref = otp.popCachedDelta(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java index 60985e7c2..22b458c92 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java @@ -291,6 +291,7 @@ public abstract class BasePackPushConnection extends BasePackConnection implemen newObjects.add(r.getNewObjectId()); } + writer.setIndexDisabled(true); writer.setUseCachedPacks(true); writer.setUseBitmaps(true); writer.setThin(thinPack); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java index 54c8bf904..4f5cda7ab 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java @@ -200,6 +200,7 @@ public class BundleWriter { inc.addAll(include.values()); for (final RevCommit r : assume) exc.add(r.getId()); + packWriter.setIndexDisabled(true); packWriter.setDeltaBaseAsOffset(true); packWriter.setThin(exc.size() > 0); packWriter.setReuseValidatingObjects(false); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java index 1377a37cd..5347eb713 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java @@ -1116,6 +1116,7 @@ public class UploadPack { cfg = new PackConfig(db); final PackWriter pw = new PackWriter(cfg, walk.getObjectReader()); try { + pw.setIndexDisabled(true); pw.setUseCachedPacks(true); pw.setUseBitmaps(true); pw.setReuseDeltaCommits(true); From 2be6927d8e707458e7efdfa4b585a3dd627c7346 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Wed, 10 Apr 2013 09:33:56 -0700 Subject: [PATCH 3/7] Always allocate the PackOutputStream copyBuffer The getCopyBuffer() is almost always used during output. All known implementations of ObjectReuseAsIs rely on the buffer to be present, and the only sane way to get good performance from PackWriter is to reuse objects during packing. Avoid a branch and test when obtaining this buffer by making sure it is always populated. Change-Id: I200baa0bde5dcdd11bab7787291ad64535c9f7fb --- .../jgit/internal/storage/pack/PackOutputStream.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java index ea6781495..fcf054c9c 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java @@ -67,9 +67,9 @@ public final class PackOutputStream extends OutputStream { private long count; - private byte[] headerBuffer = new byte[32]; + private final byte[] headerBuffer = new byte[32]; - private byte[] copyBuffer; + private final byte[] copyBuffer = new byte[16 << 10]; private long checkCancelAt; @@ -216,8 +216,6 @@ public final class PackOutputStream extends OutputStream { /** @return a temporary buffer writers can use to copy data with. */ public byte[] getCopyBuffer() { - if (copyBuffer == null) - copyBuffer = new byte[16 * 1024]; return copyBuffer; } From 66192817cd52c6f7049be4491787ca40923de014 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Wed, 10 Apr 2013 09:37:19 -0700 Subject: [PATCH 4/7] Declare members of PackOutputStream final These methods cannot be sanely overridden anywhere. Most methods are package visible only, or are private. A few public methods do exist but there is no useful way to override them since creation of PackOutputStream is managed by PackWriter and cannot be delegated. Change-Id: I12cd3326b78d497c1f9751014d04d1460b46e0b0 --- .../storage/pack/PackOutputStream.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java index fcf054c9c..68f74641b 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java @@ -96,14 +96,14 @@ public final class PackOutputStream extends OutputStream { } @Override - public void write(final int b) throws IOException { + public final void write(final int b) throws IOException { count++; out.write(b); md.update((byte) b); } @Override - public void write(final byte[] b, int off, int len) + public final void write(final byte[] b, int off, int len) throws IOException { while (0 < len) { final int n = Math.min(len, BYTES_TO_WRITE_BEFORE_CANCEL_CHECK); @@ -130,7 +130,8 @@ public final class PackOutputStream extends OutputStream { out.flush(); } - void writeFileHeader(int version, long objectCount) throws IOException { + final void writeFileHeader(int version, long objectCount) + throws IOException { System.arraycopy(Constants.PACK_SIGNATURE, 0, headerBuffer, 0, 4); NB.encodeInt32(headerBuffer, 4, version); NB.encodeInt32(headerBuffer, 8, (int) objectCount); @@ -152,7 +153,7 @@ public final class PackOutputStream extends OutputStream { * examine the type of exception and possibly its message to * distinguish between these cases. */ - public void writeObject(ObjectToPack otp) throws IOException { + public final void writeObject(ObjectToPack otp) throws IOException { packWriter.writeObject(this, otp); } @@ -172,7 +173,7 @@ public final class PackOutputStream extends OutputStream { * @throws IOException * the underlying stream refused to accept the header. */ - public void writeHeader(ObjectToPack otp, long rawLength) + public final void writeHeader(ObjectToPack otp, long rawLength) throws IOException { if (otp.isDeltaRepresentation()) { if (packWriter.isDeltaBaseAsOffset()) { @@ -201,7 +202,7 @@ public final class PackOutputStream extends OutputStream { } } - private int encodeTypeSize(int type, long rawLength) { + private final int encodeTypeSize(int type, long rawLength) { long nextLength = rawLength >>> 4; headerBuffer[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F)); rawLength = nextLength; @@ -215,7 +216,7 @@ public final class PackOutputStream extends OutputStream { } /** @return a temporary buffer writers can use to copy data with. */ - public byte[] getCopyBuffer() { + public final byte[] getCopyBuffer() { return copyBuffer; } @@ -224,12 +225,12 @@ public final class PackOutputStream extends OutputStream { } /** @return total number of bytes written since stream start. */ - public long length() { + public final long length() { return count; } /** @return obtain the current SHA-1 digest. */ - byte[] getDigest() { + final byte[] getDigest() { return md.digest(); } } From d01fe3279530a2939bf917d26e393efb85079ca1 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Wed, 10 Apr 2013 09:41:55 -0700 Subject: [PATCH 5/7] Skip main thread test in ThreadSafeProgressMonitor update(int) is only invoked from a worker thread, in JGit's case this is DeltaTask. The Javadoc of TSPM suggests update should only ever be used by a worker thread. Skip the main thread check, saving some cycles on each run of the progress monitor. Change-Id: I6cb9382d71b4cb3f8e8981c7ac382da25304dfcb --- .../org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java | 2 ++ .../src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java | 5 +---- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java index b44970e35..2845f8acf 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java @@ -104,9 +104,11 @@ public class ThreadSafeProgressMonitorTest { assertEquals(42, mock.value); pm.update(1); + pm.pollForUpdates(); assertEquals(43, mock.value); pm.update(2); + pm.pollForUpdates(); assertEquals(45, mock.value); pm.endTask(); diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java index 9e8e256b0..ff85f9b8f 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java @@ -157,10 +157,7 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor { } public void update(int completed) { - int old = pendingUpdates.getAndAdd(completed); - if (isMainThread()) - doUpdates(); - else if (old == 0) + if (0 == pendingUpdates.getAndAdd(completed)) process.release(); } From 46ef61a7027449198abe2f18c3ee1d6c578cf949 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Wed, 10 Apr 2013 10:17:25 -0700 Subject: [PATCH 6/7] Tighten object header writing in PackOutuptStream Most objects are written as OFS_DELTA with the base in the pack, that is why this case comes first in writeHeader(). Rewrite the condition to always examine this first and cache the PackWriter's formatting flag for use of OFS_DELTA headers, in modern Git networks this is true more often then it it is false. Assume the cost of write() is high, especially due to entering the MessageDigest to update the pack footer SHA-1 computation. Combine the OFS_DELTA information as part of the header buffer so that the entire burst is a single write call, rather than two relatively small ones. Most OFS_DELTA headers are <= 6 bytes, so this rewrite tranforms 2 writes of 3 bytes each into 1 write of ~6 bytes. Try to simplify the objectHeader code to reduce branches and use more local registers. This shouldn't really be necessary if the compiler is well optimized, but it isn't very hard to clarify data usage to either javac or the JIT, which may make it easier for the JIT to produce better machine code for this method. Change-Id: I2b12788ad6866076fabbf7fa11f8cce44e963f35 --- .../storage/pack/PackOutputStream.java | 71 +++++++++++-------- 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java index 68f74641b..2533e906e 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java @@ -44,6 +44,10 @@ package org.eclipse.jgit.internal.storage.pack; +import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA; +import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA; +import static org.eclipse.jgit.lib.Constants.PACK_SIGNATURE; + import java.io.IOException; import java.io.OutputStream; import java.security.MessageDigest; @@ -73,6 +77,8 @@ public final class PackOutputStream extends OutputStream { private long checkCancelAt; + private boolean ofsDelta; + /** * Initialize a pack output stream. *

@@ -132,10 +138,11 @@ public final class PackOutputStream extends OutputStream { final void writeFileHeader(int version, long objectCount) throws IOException { - System.arraycopy(Constants.PACK_SIGNATURE, 0, headerBuffer, 0, 4); + System.arraycopy(PACK_SIGNATURE, 0, headerBuffer, 0, 4); NB.encodeInt32(headerBuffer, 4, version); NB.encodeInt32(headerBuffer, 8, (int) objectCount); write(headerBuffer, 0, 12); + ofsDelta = packWriter.isDeltaBaseAsOffset(); } /** @@ -175,43 +182,45 @@ public final class PackOutputStream extends OutputStream { */ public final void writeHeader(ObjectToPack otp, long rawLength) throws IOException { - if (otp.isDeltaRepresentation()) { - if (packWriter.isDeltaBaseAsOffset()) { - ObjectToPack baseInPack = otp.getDeltaBase(); - if (baseInPack != null && baseInPack.isWritten()) { - final long start = count; - int n = encodeTypeSize(Constants.OBJ_OFS_DELTA, rawLength); - write(headerBuffer, 0, n); - - long offsetDiff = start - baseInPack.getOffset(); - n = headerBuffer.length - 1; - headerBuffer[n] = (byte) (offsetDiff & 0x7F); - while ((offsetDiff >>= 7) > 0) - headerBuffer[--n] = (byte) (0x80 | (--offsetDiff & 0x7F)); - write(headerBuffer, n, headerBuffer.length - n); - return; - } - } - - int n = encodeTypeSize(Constants.OBJ_REF_DELTA, rawLength); + ObjectToPack b = otp.getDeltaBase(); + if (b != null && (b.isWritten() & ofsDelta)) { + int n = objectHeader(rawLength, OBJ_OFS_DELTA, headerBuffer); + n = ofsDelta(count - b.getOffset(), headerBuffer, n); + write(headerBuffer, 0, n); + } else if (otp.isDeltaRepresentation()) { + int n = objectHeader(rawLength, OBJ_REF_DELTA, headerBuffer); otp.getDeltaBaseId().copyRawTo(headerBuffer, n); - write(headerBuffer, 0, n + Constants.OBJECT_ID_LENGTH); + write(headerBuffer, 0, n + 20); } else { - int n = encodeTypeSize(otp.getType(), rawLength); + int n = objectHeader(rawLength, otp.getType(), headerBuffer); write(headerBuffer, 0, n); } } - private final int encodeTypeSize(int type, long rawLength) { - long nextLength = rawLength >>> 4; - headerBuffer[0] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (type << 4) | (rawLength & 0x0F)); - rawLength = nextLength; - int n = 1; - while (rawLength > 0) { - nextLength >>>= 7; - headerBuffer[n++] = (byte) ((nextLength > 0 ? 0x80 : 0x00) | (rawLength & 0x7F)); - rawLength = nextLength; + private static final int objectHeader(long len, int type, byte[] buf) { + byte b = (byte) ((type << 4) | (len & 0x0F)); + int n = 0; + for (len >>>= 4; len != 0; len >>>= 7) { + buf[n++] = (byte) (0x80 | b); + b = (byte) (len & 0x7F); } + buf[n++] = b; + return n; + } + + private static final int ofsDelta(long diff, byte[] buf, int p) { + p += ofsDeltaVarIntLength(diff); + int n = p; + buf[--n] = (byte) (diff & 0x7F); + while ((diff >>>= 7) != 0) + buf[--n] = (byte) (0x80 | (--diff & 0x7F)); + return p; + } + + private static final int ofsDeltaVarIntLength(long v) { + int n = 1; + for (; (v >>>= 7) != 0; n++) + --v; return n; } From 6c0bb4351df033b3e79fd9e3f846af7a25864fed Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Wed, 10 Apr 2013 13:05:58 -0700 Subject: [PATCH 7/7] Increase PackOutputStream copy buffer to 64 KiB Colby just pointed out to me the buffer was 16 KiB. This may be very small for common objects. Increase to 64 KiB. Change-Id: Ideecc4720655a57673252f7adb8eebdf2fda230d --- .../eclipse/jgit/internal/storage/pack/PackOutputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java index 2533e906e..be1e3d471 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackOutputStream.java @@ -73,7 +73,7 @@ public final class PackOutputStream extends OutputStream { private final byte[] headerBuffer = new byte[32]; - private final byte[] copyBuffer = new byte[16 << 10]; + private final byte[] copyBuffer = new byte[64 << 10]; private long checkCancelAt;