
Steal work from delta threads to rebalance CPU load

If the configuration wants to run 4 threads, the delta search work
is initially split roughly evenly across the 4 threads. During
execution some threads finish early because the work was not split
fairly: the initial partitions are based on object count, not on the
cost to inflate each object or the size of its DeltaIndex.

When a thread finishes early it now tries to take 50% of the work
remaining on a sibling thread, and executes that before exiting.
This repeats as threads complete, until the sibling with the most
work left has only 1 object remaining, at which point there is
nothing left worth splitting.
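
As a rough illustration of the scheme, here is a simplified sketch
with hypothetical names; it is not the JGit code. The real
implementation is in the DeltaTask and DeltaWindow changes below,
which additionally avoid splitting a run of objects that share a
path hash. Each worker owns a [cur, end) range of the shared object
list, and an idle worker takes the upper half of the remaining range
of the busiest sibling:

  // Simplified sketch (hypothetical names). Each worker owns a
  // [cur, end) range of a shared list; an idle worker steals the
  // upper half of the busiest sibling's remaining range.
  final class WorkStealSketch {
    static final class Slice {
      final int begin, end;
      Slice(int b, int e) { begin = b; end = e; }
    }

    static final class Worker {
      private int cur, end;

      Worker(int begin, int end) { this.cur = begin; this.end = end; }

      synchronized int remaining() { return end - cur; }

      // Give away the upper half of what is still unprocessed;
      // returns null once only a single object is left.
      synchronized Slice steal() {
        int n = (end - cur) >>> 1;
        if (n == 0)
          return null;
        Slice s = new Slice(end - n, end);
        end -= n; // shrink this worker's own range
        return s;
      }

      // Claim one index at a time so the range can shrink under us.
      synchronized int next() {
        return cur < end ? cur++ : -1;
      }
    }

    // An idle worker steals from the sibling with the most work left
    // and then processes the stolen slice itself before exiting.
    static Slice stealFromBusiest(java.util.List<Worker> workers) {
      Worker busiest = null;
      int max = 0;
      for (Worker w : workers) {
        int r = w.remaining();
        if (max < r) {
          busiest = w;
          max = r;
        }
      }
      return busiest != null ? busiest.steal() : null;
    }
  }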

Repacking Blink, Chromium's new fork of WebKit (2.2M objects, 3.9G):

  [pack]
    reuseDeltas = false
    reuseObjects = false
    depth = 50
    threads = 8
    window = 250
    windowMemory = 800m

  before: ~105% CPU after 80% of the work was done
  after:  >780% CPU all the way to 100%

Change-Id: I65e45422edd96778aba4b6e5a0fd489ea48e8ca3
Branch: stable-3.0
Author: Shawn Pearce (12 years ago)
Commit: d0a5337625
Changed files (lines changed):

  98  org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java
  59  org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java
  56  org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaTask.java (98 lines changed)

@@ -43,6 +43,8 @@
 package org.eclipse.jgit.internal.storage.pack;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 
 import org.eclipse.jgit.lib.ObjectReader;
@@ -50,42 +52,92 @@ import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
 import org.eclipse.jgit.storage.pack.PackConfig;
 
 final class DeltaTask implements Callable<Object> {
-    private final PackConfig config;
-
-    private final ObjectReader templateReader;
-
-    private final DeltaCache dc;
-
-    private final ThreadSafeProgressMonitor pm;
-
-    private final int batchSize;
-
-    private final int start;
-
-    private final ObjectToPack[] list;
+    static final class Block {
+        final List<DeltaTask> tasks;
+        final PackConfig config;
+        final ObjectReader templateReader;
+        final DeltaCache dc;
+        final ThreadSafeProgressMonitor pm;
+        final ObjectToPack[] list;
+        final int beginIndex;
+        final int endIndex;
+
+        Block(int threads, PackConfig config, ObjectReader reader,
+                DeltaCache dc, ThreadSafeProgressMonitor pm,
+                ObjectToPack[] list, int begin, int end) {
+            this.tasks = new ArrayList<DeltaTask>(threads);
+            this.config = config;
+            this.templateReader = reader;
+            this.dc = dc;
+            this.pm = pm;
+            this.list = list;
+            this.beginIndex = begin;
+            this.endIndex = end;
+        }
+
+        synchronized Slice stealWork() {
+            for (int attempts = 0; attempts < 2; attempts++) {
+                DeltaTask maxTask = null;
+                int maxWork = 0;
+                for (DeltaTask task : tasks) {
+                    int r = task.remaining();
+                    if (maxWork < r) {
+                        maxTask = task;
+                        maxWork = r;
+                    }
+                }
+                if (maxTask == null)
+                    return null;
+                Slice s = maxTask.stealWork();
+                if (s != null)
+                    return s;
+            }
+            return null;
+        }
+    }
+
+    static final class Slice {
+        final int beginIndex;
+        final int endIndex;
+
+        Slice(int b, int e) {
+            beginIndex = b;
+            endIndex = e;
+        }
+    }
+
+    private final Block block;
+    private final Slice firstSlice;
+    private volatile DeltaWindow dw;
 
-    DeltaTask(PackConfig config, ObjectReader reader, DeltaCache dc,
-            ThreadSafeProgressMonitor pm, int batchSize, int start,
-            ObjectToPack[] list) {
-        this.config = config;
-        this.templateReader = reader;
-        this.dc = dc;
-        this.pm = pm;
-        this.batchSize = batchSize;
-        this.start = start;
-        this.list = list;
+    DeltaTask(Block b, int beginIndex, int endIndex) {
+        this.block = b;
+        this.firstSlice = new Slice(beginIndex, endIndex);
     }
 
     public Object call() throws Exception {
-        final ObjectReader or = templateReader.newReader();
+        ObjectReader or = block.templateReader.newReader();
        try {
-            DeltaWindow dw;
-            dw = new DeltaWindow(config, dc, or);
-            dw.search(pm, list, start, batchSize);
+            for (Slice s = firstSlice; s != null; s = block.stealWork()) {
+                dw = new DeltaWindow(block.config, block.dc, or, block.pm,
+                        block.list, s.beginIndex, s.endIndex);
+                dw.search();
+                dw = null;
+            }
         } finally {
             or.release();
-            pm.endWorker();
+            block.pm.endWorker();
         }
         return null;
     }
+
+    int remaining() {
+        DeltaWindow d = dw;
+        return d != null ? d.remaining() : 0;
+    }
+
+    Slice stealWork() {
+        DeltaWindow d = dw;
+        return d != null ? d.stealWork() : null;
+    }
 }

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/DeltaWindow.java (59 lines changed)

@@ -56,7 +56,7 @@ import org.eclipse.jgit.lib.ProgressMonitor;
 import org.eclipse.jgit.storage.pack.PackConfig;
 import org.eclipse.jgit.util.TemporaryBuffer;
 
-class DeltaWindow {
+final class DeltaWindow {
     private static final int NEXT_RES = 0;
     private static final int NEXT_SRC = 1;
@@ -67,6 +67,8 @@ class DeltaWindow {
     private final ObjectReader reader;
 
+    private final ProgressMonitor monitor;
+
     private final DeltaWindowEntry[] window;
 
     /** Maximum number of bytes to admit to the window at once. */
@@ -75,6 +77,12 @@
     /** Maximum depth we should create for any delta chain. */
     private final int maxDepth;
 
+    private final ObjectToPack[] toSearch;
+
+    private int cur;
+
+    private int end;
+
     /** Amount of memory we have loaded right now. */
     private long loaded;
@@ -102,10 +110,16 @@ class DeltaWindow {
     /** Used to compress cached deltas. */
     private Deflater deflater;
 
-    DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or) {
+    DeltaWindow(PackConfig pc, DeltaCache dc, ObjectReader or,
+            ProgressMonitor pm,
+            ObjectToPack[] in, int beginIndex, int endIndex) {
         config = pc;
         deltaCache = dc;
         reader = or;
+        monitor = pm;
+        toSearch = in;
+        cur = beginIndex;
+        end = endIndex;
 
         // C Git increases the window size supplied by the user by 1.
         // We don't know why it does this, but if the user asks for
@@ -126,21 +140,48 @@
         maxDepth = config.getMaxDeltaDepth();
     }
 
-    void search(ProgressMonitor monitor, ObjectToPack[] toSearch, int off,
-            int cnt) throws IOException {
+    synchronized int remaining() {
+        return end - cur;
+    }
+
+    synchronized DeltaTask.Slice stealWork() {
+        int e = end;
+        int n = (e - cur) >>> 1;
+        if (0 == n)
+            return null;
+        int t = e - n;
+        int h = toSearch[t].getPathHash();
+        while (cur < t) {
+            if (h == toSearch[t - 1].getPathHash())
+                t--;
+            else
+                break;
+        }
+        end = t;
+        return new DeltaTask.Slice(t, e);
+    }
+
+    void search() throws IOException {
         try {
-            for (int end = off + cnt; off < end; off++) {
+            for (;;) {
+                ObjectToPack next;
+                synchronized (this) {
+                    if (end <= cur)
+                        break;
+                    next = toSearch[cur++];
+                }
                 res = window[resSlot];
                 if (0 < maxMemory) {
                     clear(res);
                     int tail = next(resSlot);
-                    final long need = estimateSize(toSearch[off]);
+                    final long need = estimateSize(next);
                     while (maxMemory < loaded + need && tail != resSlot) {
                         clear(window[tail]);
                         tail = next(tail);
                     }
                 }
-                res.set(toSearch[off]);
+                res.set(next);
 
                 if (res.object.isEdge() || res.object.doNotAttemptDelta()) {
                     // We don't actually want to make a delta for
@@ -152,7 +193,7 @@
                     // Search for a delta for the current window slot.
                     //
                     monitor.update(1);
-                    search();
+                    searchInWindow();
                 }
             }
         } finally {
@@ -181,7 +222,7 @@
         ent.set(null);
     }
 
-    private void search() throws IOException {
+    private void searchInWindow() throws IOException {
        // TODO(spearce) If the object is used as a base for other
        // objects in this pack we should limit the depth we create
        // for ourselves to be the remainder of our longest dependent

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/pack/PackWriter.java (56 lines changed)

@@ -1305,68 +1305,57 @@ public class PackWriter {
             threads = Runtime.getRuntime().availableProcessors();
         if (threads <= 1 || cnt <= 2 * config.getDeltaSearchWindowSize()) {
-            DeltaCache dc = new DeltaCache(config);
-            DeltaWindow dw = new DeltaWindow(config, dc, reader);
-            dw.search(monitor, list, 0, cnt);
+            new DeltaWindow(config, new DeltaCache(config), reader, monitor,
+                    list, 0, cnt).search();
             return;
         }
 
         final DeltaCache dc = new ThreadSafeDeltaCache(config);
         final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
 
-        // Guess at the size of batch we want. Because we don't really
-        // have a way for a thread to steal work from another thread if
-        // it ends early, we over partition slightly so the work units
-        // are a bit smaller.
-        //
-        int estSize = cnt / (threads * 2);
-        if (estSize < 2 * config.getDeltaSearchWindowSize())
-            estSize = 2 * config.getDeltaSearchWindowSize();
+        int estSize = cnt / threads;
+        if (estSize < config.getDeltaSearchWindowSize())
+            estSize = config.getDeltaSearchWindowSize();
 
-        final List<DeltaTask> myTasks = new ArrayList<DeltaTask>(threads * 2);
+        DeltaTask.Block taskBlock = new DeltaTask.Block(threads, config,
+                reader, dc, pm,
+                list, 0, cnt);
         for (int i = 0; i < cnt;) {
             final int start = i;
-            final int batchSize;
+            int end;
             if (cnt - i < estSize) {
                 // If we don't have enough to fill the remaining block,
                 // schedule what is left over as a single block.
-                //
-                batchSize = cnt - i;
+                end = cnt;
             } else {
                 // Try to split the block at the end of a path.
-                //
-                int end = start + estSize;
+                end = start + estSize;
+                int h = list[end - 1].getPathHash();
                 while (end < cnt) {
-                    ObjectToPack a = list[end - 1];
-                    ObjectToPack b = list[end];
-                    if (a.getPathHash() == b.getPathHash())
+                    if (h == list[end].getPathHash())
                         end++;
                     else
                         break;
                 }
-                batchSize = end - start;
             }
-            i += batchSize;
-            myTasks.add(new DeltaTask(config, reader, dc, pm, batchSize, start, list));
+            i = end;
+            taskBlock.tasks.add(new DeltaTask(taskBlock, start, end));
         }
-        pm.startWorkers(myTasks.size());
+        pm.startWorkers(taskBlock.tasks.size());
 
         final Executor executor = config.getExecutor();
         final List<Throwable> errors = Collections
                 .synchronizedList(new ArrayList<Throwable>());
         if (executor instanceof ExecutorService) {
             // Caller supplied us a service, use it directly.
-            //
-            runTasks((ExecutorService) executor, pm, myTasks, errors);
+            runTasks((ExecutorService) executor, pm, taskBlock, errors);
         } else if (executor == null) {
             // Caller didn't give us a way to run the tasks, spawn up a
             // temporary thread pool and make sure it tears down cleanly.
-            //
             ExecutorService pool = Executors.newFixedThreadPool(threads);
             try {
-                runTasks(pool, pm, myTasks, errors);
+                runTasks(pool, pm, taskBlock, errors);
             } finally {
                 pool.shutdown();
                 for (;;) {
@@ -1383,8 +1372,7 @@ public class PackWriter {
             // The caller gave us an executor, but it might not do
             // asynchronous execution. Wrap everything and hope it
             // can schedule these for us.
-            //
-            for (final DeltaTask task : myTasks) {
+            for (final DeltaTask task : taskBlock.tasks) {
                 executor.execute(new Runnable() {
                     public void run() {
                         try {
@@ -1426,9 +1414,9 @@ public class PackWriter {
     private static void runTasks(ExecutorService pool,
             ThreadSafeProgressMonitor pm,
-            List<DeltaTask> tasks, List<Throwable> errors) throws IOException {
-        List<Future<?>> futures = new ArrayList<Future<?>>(tasks.size());
-        for (DeltaTask task : tasks)
+            DeltaTask.Block tb, List<Throwable> errors) throws IOException {
+        List<Future<?>> futures = new ArrayList<Future<?>>(tb.tasks.size());
+        for (DeltaTask task : tb.tasks)
             futures.add(pool.submit(task));
         try {
