Browse Source

Split delta search buckets by byte weight

Instead of assuming all objects cost the same amount of time to
delta compress, aggregate the byte size of objects in the list
and partition threads with roughly equal total bytes.

Before splitting the list select the N largest paths and assign
each one to its own thread. This allows threads to get through the
worst cases in parallel before attempting smaller paths that are
more likely to be splittable.

By running the largest path buckets first on each thread the likely
slowest part of compression is done early, while progress is still
reporting a low percentage. This gives users a better impression of
how fast the phase will run. On very complex inputs the slow part
is more likely to happen first, making a user realize its time to
go grab lunch, or even run it overnight.

If the worst sections are earlier, memory overruns may show up
earlier, giving the user a chance to correct the configuration and
try again before wasting large amounts of time. It also makes it
less likely the delta compression phase reaches 92% in 30 minutes
and then crawls for 10 hours through the remaining 8%.

Change-Id: I7621c4349b99e40098825c4966b8411079992e5f
stable-3.0
Shawn Pearce 12 years ago committed by Shawn Pearce
parent
commit
21e4aa2b9e
  1. 200
      org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
  2. 2
      org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
  3. 27
      org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java

200
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java

@ -43,7 +43,12 @@
package org.eclipse.jgit.internal.storage.pack;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
@ -53,7 +58,10 @@ import org.eclipse.jgit.storage.pack.PackConfig;
final class DeltaTask implements Callable<Object> {
static final class Block {
private static final int MIN_TOP_PATH = 50 << 20;
final List<DeltaTask> tasks;
final int threads;
final PackConfig config;
final ObjectReader templateReader;
final DeltaCache dc;
@ -62,10 +70,13 @@ final class DeltaTask implements Callable<Object> {
final int beginIndex;
final int endIndex;
private long totalWeight;
Block(int threads, PackConfig config, ObjectReader reader,
DeltaCache dc, ThreadSafeProgressMonitor pm,
ObjectToPack[] list, int begin, int end) {
this.tasks = new ArrayList<DeltaTask>(threads);
this.threads = threads;
this.config = config;
this.templateReader = reader;
this.dc = dc;
@ -75,7 +86,7 @@ final class DeltaTask implements Callable<Object> {
this.endIndex = end;
}
synchronized Slice stealWork() {
synchronized DeltaWindow stealWork(DeltaTask forThread) {
for (;;) {
DeltaTask maxTask = null;
Slice maxSlice = null;
@ -92,9 +103,122 @@ final class DeltaTask implements Callable<Object> {
if (maxTask == null)
return null;
if (maxTask.tryStealWork(maxSlice))
return maxSlice;
return forThread.initWindow(maxSlice);
}
}
void partitionTasks() {
ArrayList<WeightedPath> topPaths = computeTopPaths();
Iterator<WeightedPath> topPathItr = topPaths.iterator();
int nextTop = 0;
long weightPerThread = totalWeight / threads;
for (int i = beginIndex; i < endIndex;) {
DeltaTask task = new DeltaTask(this);
long w = 0;
// Assign the thread one top path.
if (topPathItr.hasNext()) {
WeightedPath p = topPathItr.next();
w += p.weight;
task.add(p.slice);
}
// Assign the task thread ~average weight.
int s = i;
for (; w < weightPerThread && i < endIndex;) {
if (nextTop < topPaths.size()
&& i == topPaths.get(nextTop).slice.beginIndex) {
if (s < i)
task.add(new Slice(s, i));
s = i = topPaths.get(nextTop++).slice.endIndex;
} else
w += list[i++].getWeight();
}
// Round up the slice to the end of a path.
if (s < i) {
int h = list[i - 1].getPathHash();
while (i < endIndex) {
if (h == list[i].getPathHash())
i++;
else
break;
}
task.add(new Slice(s, i));
}
if (!task.slices.isEmpty())
tasks.add(task);
}
while (topPathItr.hasNext()) {
WeightedPath p = topPathItr.next();
DeltaTask task = new DeltaTask(this);
task.add(p.slice);
tasks.add(task);
}
topPaths = null;
}
private ArrayList<WeightedPath> computeTopPaths() {
ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>(
threads);
int cp = beginIndex;
int ch = list[cp].getPathHash();
long cw = list[cp].getWeight();
totalWeight = list[cp].getWeight();
for (int i = cp + 1; i < endIndex; i++) {
ObjectToPack o = list[i];
if (ch != o.getPathHash()) {
if (MIN_TOP_PATH < cw) {
if (topPaths.size() < threads) {
Slice s = new Slice(cp, i);
topPaths.add(new WeightedPath(cw, s));
if (topPaths.size() == threads)
Collections.sort(topPaths);
} else if (topPaths.get(0).weight < cw) {
Slice s = new Slice(cp, i);
WeightedPath p = new WeightedPath(cw, s);
topPaths.set(0, p);
if (p.compareTo(topPaths.get(1)) > 0)
Collections.sort(topPaths);
}
}
cp = i;
ch = o.getPathHash();
cw = 0;
}
if (o.isEdge() || o.doNotAttemptDelta())
continue;
cw += o.getWeight();
totalWeight += o.getWeight();
}
// Sort by starting index to identify gaps later.
Collections.sort(topPaths, new Comparator<WeightedPath>() {
public int compare(WeightedPath a, WeightedPath b) {
return a.slice.beginIndex - b.slice.beginIndex;
}
});
return topPaths;
}
}
static final class WeightedPath implements Comparable<WeightedPath> {
final long weight;
final Slice slice;
WeightedPath(long weight, Slice s) {
this.weight = weight;
this.slice = s;
}
public int compareTo(WeightedPath o) {
int cmp = Long.signum(weight - o.weight);
if (cmp != 0)
return cmp;
return slice.beginIndex - o.slice.beginIndex;
}
}
static final class Slice {
@ -112,36 +236,82 @@ final class DeltaTask implements Callable<Object> {
}
private final Block block;
private final Slice firstSlice;
private volatile DeltaWindow dw;
private final LinkedList<Slice> slices;
DeltaTask(Block b, int beginIndex, int endIndex) {
private ObjectReader or;
private DeltaWindow dw;
DeltaTask(Block b) {
this.block = b;
this.firstSlice = new Slice(beginIndex, endIndex);
this.slices = new LinkedList<Slice>();
}
void add(Slice s) {
if (!slices.isEmpty()) {
Slice last = slices.getLast();
if (last.endIndex == s.beginIndex) {
slices.removeLast();
slices.add(new Slice(last.beginIndex, s.endIndex));
return;
}
}
slices.add(s);
}
public Object call() throws Exception {
ObjectReader or = block.templateReader.newReader();
or = block.templateReader.newReader();
try {
for (Slice s = firstSlice; s != null; s = block.stealWork()) {
dw = new DeltaWindow(block.config, block.dc, or, block.pm,
block.list, s.beginIndex, s.endIndex);
dw.search();
dw = null;
DeltaWindow w;
for (;;) {
synchronized (this) {
if (slices.isEmpty())
break;
w = initWindow(slices.removeFirst());
}
runWindow(w);
}
while ((w = block.stealWork(this)) != null)
runWindow(w);
} finally {
or.release();
block.pm.endWorker();
or.release();
or = null;
}
return null;
}
Slice remaining() {
DeltaWindow initWindow(Slice s) {
DeltaWindow w = new DeltaWindow(block.config, block.dc,
or, block.pm,
block.list, s.beginIndex, s.endIndex);
synchronized (this) {
dw = w;
}
return w;
}
private void runWindow(DeltaWindow w) throws IOException {
try {
w.search();
} finally {
synchronized (this) {
dw = null;
}
}
}
synchronized Slice remaining() {
if (!slices.isEmpty())
return slices.getLast();
DeltaWindow d = dw;
return d != null ? d.remaining() : null;
}
boolean tryStealWork(Slice s) {
synchronized boolean tryStealWork(Slice s) {
if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
slices.removeLast();
return true;
}
DeltaWindow d = dw;
return d != null ? d.tryStealWork(s) : false;
}

2
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java

@ -134,7 +134,7 @@ final class DeltaWindow {
}
synchronized boolean tryStealWork(DeltaTask.Slice s) {
if (s.beginIndex <= cur)
if (s.beginIndex <= cur || end <= s.beginIndex)
return false;
end = s.beginIndex;
return true;

27
org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java

@ -1337,35 +1337,10 @@ public class PackWriter {
final DeltaCache dc = new ThreadSafeDeltaCache(config);
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
int estSize = cnt / threads;
if (estSize < config.getDeltaSearchWindowSize())
estSize = config.getDeltaSearchWindowSize();
DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config,
reader, dc, pm,
list, 0, cnt);
for (int i = 0; i < cnt;) {
final int start = i;
int end;
if (cnt - i < estSize) {
// If we don't have enough to fill the remaining block,
// schedule what is left over as a single block.
end = cnt;
} else {
// Try to split the block at the end of a path.
end = start + estSize;
int h = list[end - 1].getPathHash();
while (end < cnt) {
if (h == list[end].getPathHash())
end++;
else
break;
}
}
i = end;
taskBlock.tasks.add(new DeltaTask(taskBlock, start, end));
}
taskBlock.partitionTasks();
pm.startWorkers(taskBlock.tasks.size());
final Executor executor = config.getExecutor();

Loading…
Cancel
Save