diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java new file mode 100644 index 000000000..11bb3efa7 --- /dev/null +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2010, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.storage.pack; + +import java.util.concurrent.Callable; + +import org.eclipse.jgit.lib.ObjectReader; +import org.eclipse.jgit.lib.ProgressMonitor; + +final class DeltaTask implements Callable { + private final PackWriter writer; + + private final ObjectReader templateReader; + + private final DeltaCache dc; + + private final ProgressMonitor pm; + + private final int batchSize; + + private final int start; + + private final ObjectToPack[] list; + + DeltaTask(PackWriter writer, ObjectReader reader, DeltaCache dc, + ProgressMonitor pm, int batchSize, int start, ObjectToPack[] list) { + this.writer = writer; + this.templateReader = reader; + this.dc = dc; + this.pm = pm; + this.batchSize = batchSize; + this.start = start; + this.list = list; + } + + public Object call() throws Exception { + final ObjectReader or = templateReader.newReader(); + try { + DeltaWindow dw; + dw = new DeltaWindow(writer, dc, or); + dw.search(pm, list, start, batchSize); + } finally { + or.release(); + } + return null; + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java index a9cd45b1d..b2a6697d1 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java @@ -58,8 +58,12 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; @@ -218,6 +222,8 @@ public class PackWriter { private int threads = 1; + private Executor executor; + private boolean thin; private boolean ignoreMissingUninteresting = true; @@ -603,6 +609,10 @@ public class PackWriter { * During delta compression, if there are enough objects to be considered * the writer will start up concurrent threads and allow them to compress * different sections of the repository concurrently. + *

+ * An application thread pool can be set by {@link #setExecutor(Executor)}. + * If not set a temporary pool will be created by the writer, and torn down + * automatically when compression is over. * * @param threads * number of threads to use. If <= 0 the number of available @@ -612,6 +622,22 @@ public class PackWriter { this.threads = threads; } + /** + * Set the executor to use when using threads. + *

+ * During delta compression if the executor is non-null jobs will be queued + * up on it to perform delta compression in parallel. Aside from setting the + * executor, the caller must set {@link #setThread(int)} to enable threaded + * delta search. + * + * @param executor + * executor to use for threads. Set to null to create a temporary + * executor just for this writer. + */ + public void setExecutor(Executor executor) { + this.executor = executor; + } + /** @return true if this writer is producing a thin pack. */ public boolean isThin() { return thin; @@ -980,11 +1006,8 @@ public class PackWriter { return; } - final List errors = Collections - .synchronizedList(new ArrayList()); final DeltaCache dc = new ThreadSafeDeltaCache(this); final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); - final ExecutorService pool = Executors.newFixedThreadPool(threads); // Guess at the size of batch we want. Because we don't really // have a way for a thread to steal work from another thread if @@ -995,6 +1018,7 @@ public class PackWriter { if (estSize < 2 * getDeltaSearchWindowSize()) estSize = 2 * getDeltaSearchWindowSize(); + final List myTasks = new ArrayList(threads * 2); for (int i = 0; i < cnt;) { final int start = i; final int batchSize; @@ -1019,39 +1043,66 @@ public class PackWriter { batchSize = end - start; } i += batchSize; + myTasks.add(new DeltaTask(this, reader, dc, pm, batchSize, start, list)); + } - pool.submit(new Runnable() { - public void run() { + final List errors = Collections + .synchronizedList(new ArrayList()); + if (executor instanceof ExecutorService) { + // Caller supplied us a service, use it directly. + // + runTasks((ExecutorService) executor, myTasks, errors); + + } else if (executor == null) { + // Caller didn't give us a way to run the tasks, spawn up a + // temporary thread pool and make sure it tears down cleanly. + // + ExecutorService pool = Executors.newFixedThreadPool(threads); + try { + runTasks(pool, myTasks, errors); + } finally { + pool.shutdown(); + for (;;) { try { - final ObjectReader or = reader.newReader(); + if (pool.awaitTermination(60, TimeUnit.SECONDS)) + break; + } catch (InterruptedException e) { + throw new IOException( + JGitText.get().packingCancelledDuringObjectsWriting); + } + } + } + } else { + // The caller gave us an executor, but it might not do + // asynchronous execution. Wrap everything and hope it + // can schedule these for us. + // + final CountDownLatch done = new CountDownLatch(myTasks.size()); + for (final DeltaTask task : myTasks) { + executor.execute(new Runnable() { + public void run() { try { - DeltaWindow dw; - dw = new DeltaWindow(PackWriter.this, dc, or); - dw.search(pm, list, start, batchSize); + task.call(); + } catch (Throwable failure) { + errors.add(failure); } finally { - or.release(); + done.countDown(); } - } catch (Throwable err) { - errors.add(err); } - } - }); - } - - // Tell the pool to stop. - // - pool.shutdown(); - for (;;) { + }); + } try { - if (pool.awaitTermination(60, TimeUnit.SECONDS)) - break; - } catch (InterruptedException e) { + done.await(); + } catch (InterruptedException ie) { + // We can't abort the other tasks as we have no handle. + // Cross our fingers and just break out anyway. + // throw new IOException( JGitText.get().packingCancelledDuringObjectsWriting); } } - // If any thread threw an error, try to report it back as + // If any task threw an error, try to report it back as // though we weren't using a threaded search algorithm. // if (!errors.isEmpty()) { @@ -1069,6 +1120,28 @@ public class PackWriter { } } + private void runTasks(ExecutorService pool, List tasks, + List errors) throws IOException { + List> futures = new ArrayList>(tasks.size()); + for (DeltaTask task : tasks) + futures.add(pool.submit(task)); + + try { + for (Future f : futures) { + try { + f.get(); + } catch (ExecutionException failed) { + errors.add(failed.getCause()); + } + } + } catch (InterruptedException ie) { + for (Future f : futures) + f.cancel(true); + throw new IOException( + JGitText.get().packingCancelledDuringObjectsWriting); + } + } + private void writeObjects(ProgressMonitor writeMonitor, PackOutputStream out) throws IOException { for (List list : objectsLists) {