diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java new file mode 100644 index 000000000..6839f8d3c --- /dev/null +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2010, Google Inc. + * and other copyright owners as documented in the project's IP log. + * + * This program and the accompanying materials are made available + * under the terms of the Eclipse Distribution License v1.0 which + * accompanies this distribution, is reproduced below, and is + * available at http://www.eclipse.org/org/documents/edl-v10.php + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * - Neither the name of the Eclipse Foundation, Inc. nor the + * names of its contributors may be used to endorse or promote + * products derived from this software without specific prior + * written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package org.eclipse.jgit.lib; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +public class ThreadSafeProgressMonitorTest extends TestCase { + public void testFailsMethodsOnBackgroundThread() + throws InterruptedException { + final MockProgressMonitor mock = new MockProgressMonitor(); + final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock); + + runOnThread(new Runnable() { + public void run() { + try { + pm.start(1); + fail("start did not fail on background thread"); + } catch (IllegalStateException notMainThread) { + // Expected result + } + + try { + pm.beginTask("title", 1); + fail("beginTask did not fail on background thread"); + } catch (IllegalStateException notMainThread) { + // Expected result + } + + try { + pm.endTask(); + fail("endTask did not fail on background thread"); + } catch (IllegalStateException notMainThread) { + // Expected result + } + } + }); + + // Ensure we didn't alter the mock above when checking threads. + assertNull(mock.taskTitle); + assertEquals(0, mock.value); + } + + public void testMethodsOkOnMainThread() { + final MockProgressMonitor mock = new MockProgressMonitor(); + final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock); + + pm.start(1); + assertEquals(1, mock.value); + + pm.beginTask("title", 42); + assertEquals("title", mock.taskTitle); + assertEquals(42, mock.value); + + pm.update(1); + assertEquals(43, mock.value); + + pm.update(2); + assertEquals(45, mock.value); + + pm.endTask(); + assertNull(mock.taskTitle); + assertEquals(0, mock.value); + } + + public void testUpdateOnBackgroundThreads() throws InterruptedException { + final MockProgressMonitor mock = new MockProgressMonitor(); + final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock); + + pm.startWorker(); + + final CountDownLatch doUpdate = new CountDownLatch(1); + final CountDownLatch didUpdate = new CountDownLatch(1); + final CountDownLatch doEndWorker = new CountDownLatch(1); + + final Thread bg = new Thread() { + public void run() { + assertFalse(pm.isCancelled()); + + await(doUpdate); + pm.update(2); + didUpdate.countDown(); + + await(doEndWorker); + pm.update(1); + pm.endWorker(); + } + }; + bg.start(); + + pm.pollForUpdates(); + assertEquals(0, mock.value); + doUpdate.countDown(); + + await(didUpdate); + pm.pollForUpdates(); + assertEquals(2, mock.value); + + doEndWorker.countDown(); + pm.waitForCompletion(); + assertEquals(3, mock.value); + } + + private static void await(CountDownLatch cdl) { + try { + assertTrue("latch released", cdl.await(1000, TimeUnit.MILLISECONDS)); + } catch (InterruptedException ie) { + fail("Did not expect to be interrupted"); + } + } + + private static void runOnThread(Runnable task) throws InterruptedException { + Thread t = new Thread(task); + t.start(); + t.join(1000); + assertFalse("thread has stopped", t.isAlive()); + } + + private static class MockProgressMonitor implements ProgressMonitor { + String taskTitle; + + int value; + + public void update(int completed) { + value += completed; + } + + public void start(int totalTasks) { + value = totalTasks; + } + + public void beginTask(String title, int totalWork) { + taskTitle = title; + value = totalWork; + } + + public void endTask() { + taskTitle = null; + value = 0; + } + + public boolean isCancelled() { + return false; + } + } +} diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java index 9708bb2f9..9e8e256b0 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java @@ -43,16 +43,35 @@ package org.eclipse.jgit.lib; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; /** * Wrapper around the general {@link ProgressMonitor} to make it thread safe. + * + * Updates to the underlying ProgressMonitor are made only from the thread that + * allocated this wrapper. Callers are responsible for ensuring the allocating + * thread uses {@link #pollForUpdates()} or {@link #waitForCompletion()} to + * update the underlying ProgressMonitor. + * + * Only {@link #update(int)}, {@link #isCancelled()}, and {@link #endWorker()} + * may be invoked from a worker thread. All other methods of the ProgressMonitor + * interface can only be called from the thread that allocates this wrapper. */ public class ThreadSafeProgressMonitor implements ProgressMonitor { private final ProgressMonitor pm; private final ReentrantLock lock; + private final Thread mainThread; + + private final AtomicInteger workers; + + private final AtomicInteger pendingUpdates; + + private final Semaphore process; + /** * Wrap a ProgressMonitor to be thread safe. * @@ -62,33 +81,87 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor { public ThreadSafeProgressMonitor(ProgressMonitor pm) { this.pm = pm; this.lock = new ReentrantLock(); + this.mainThread = Thread.currentThread(); + this.workers = new AtomicInteger(0); + this.pendingUpdates = new AtomicInteger(0); + this.process = new Semaphore(0); } public void start(int totalTasks) { - lock.lock(); - try { - pm.start(totalTasks); - } finally { - lock.unlock(); - } + if (!isMainThread()) + throw new IllegalStateException(); + pm.start(totalTasks); } public void beginTask(String title, int totalWork) { - lock.lock(); - try { - pm.beginTask(title, totalWork); - } finally { - lock.unlock(); + if (!isMainThread()) + throw new IllegalStateException(); + pm.beginTask(title, totalWork); + } + + /** Notify the monitor a worker is starting. */ + public void startWorker() { + startWorkers(1); + } + + /** + * Notify the monitor of workers starting. + * + * @param count + * the number of worker threads that are starting. + */ + public void startWorkers(int count) { + workers.addAndGet(count); + } + + /** Notify the monitor a worker is finished. */ + public void endWorker() { + if (workers.decrementAndGet() == 0) + process.release(); + } + + /** + * Non-blocking poll for pending updates. + * + * This method can only be invoked by the same thread that allocated this + * ThreadSafeProgressMonior. + */ + public void pollForUpdates() { + assert isMainThread(); + doUpdates(); + } + + /** + * Process pending updates and wait for workers to finish. + * + * This method can only be invoked by the same thread that allocated this + * ThreadSafeProgressMonior. + * + * @throws InterruptedException + * if the main thread is interrupted while waiting for + * completion of workers. + */ + public void waitForCompletion() throws InterruptedException { + assert isMainThread(); + while (0 < workers.get()) { + doUpdates(); + process.acquire(); } + doUpdates(); + } + + private void doUpdates() { + int cnt = pendingUpdates.getAndSet(0); + if (0 < cnt) + pm.update(cnt); } public void update(int completed) { - lock.lock(); - try { - pm.update(completed); - } finally { - lock.unlock(); - } + int old = pendingUpdates.getAndAdd(completed); + if (isMainThread()) + doUpdates(); + else if (old == 0) + process.release(); } public boolean isCancelled() { @@ -101,11 +174,12 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor { } public void endTask() { - lock.lock(); - try { - pm.endTask(); - } finally { - lock.unlock(); - } + if (!isMainThread()) + throw new IllegalStateException(); + pm.endTask(); + } + + private boolean isMainThread() { + return Thread.currentThread() == mainThread; } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java index 5e551e9d4..aa0374618 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java @@ -46,7 +46,7 @@ package org.eclipse.jgit.storage.pack; import java.util.concurrent.Callable; import org.eclipse.jgit.lib.ObjectReader; -import org.eclipse.jgit.lib.ProgressMonitor; +import org.eclipse.jgit.lib.ThreadSafeProgressMonitor; final class DeltaTask implements Callable { private final PackConfig config; @@ -55,7 +55,7 @@ final class DeltaTask implements Callable { private final DeltaCache dc; - private final ProgressMonitor pm; + private final ThreadSafeProgressMonitor pm; private final int batchSize; @@ -64,7 +64,8 @@ final class DeltaTask implements Callable { private final ObjectToPack[] list; DeltaTask(PackConfig config, ObjectReader reader, DeltaCache dc, - ProgressMonitor pm, int batchSize, int start, ObjectToPack[] list) { + ThreadSafeProgressMonitor pm, int batchSize, int start, + ObjectToPack[] list) { this.config = config; this.templateReader = reader; this.dc = dc; @@ -82,6 +83,7 @@ final class DeltaTask implements Callable { dw.search(pm, list, start, batchSize); } finally { or.release(); + pm.endWorker(); } return null; } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java index 20c4bb0f9..5986aca4e 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java @@ -59,7 +59,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -675,7 +674,7 @@ public class PackWriter { } final DeltaCache dc = new ThreadSafeDeltaCache(config); - final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); + final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); // Guess at the size of batch we want. Because we don't really // have a way for a thread to steal work from another thread if @@ -713,6 +712,7 @@ public class PackWriter { i += batchSize; myTasks.add(new DeltaTask(config, reader, dc, pm, batchSize, start, list)); } + pm.startWorkers(myTasks.size()); final Executor executor = config.getExecutor(); final List errors = Collections @@ -720,7 +720,7 @@ public class PackWriter { if (executor instanceof ExecutorService) { // Caller supplied us a service, use it directly. // - runTasks((ExecutorService) executor, myTasks, errors); + runTasks((ExecutorService) executor, pm, myTasks, errors); } else if (executor == null) { // Caller didn't give us a way to run the tasks, spawn up a @@ -728,7 +728,7 @@ public class PackWriter { // ExecutorService pool = Executors.newFixedThreadPool(threads); try { - runTasks(pool, myTasks, errors); + runTasks(pool, pm, myTasks, errors); } finally { pool.shutdown(); for (;;) { @@ -746,7 +746,6 @@ public class PackWriter { // asynchronous execution. Wrap everything and hope it // can schedule these for us. // - final CountDownLatch done = new CountDownLatch(myTasks.size()); for (final DeltaTask task : myTasks) { executor.execute(new Runnable() { public void run() { @@ -754,14 +753,12 @@ public class PackWriter { task.call(); } catch (Throwable failure) { errors.add(failure); - } finally { - done.countDown(); } } }); } try { - done.await(); + pm.waitForCompletion(); } catch (InterruptedException ie) { // We can't abort the other tasks as we have no handle. // Cross our fingers and just break out anyway. @@ -789,13 +786,14 @@ public class PackWriter { } } - private void runTasks(ExecutorService pool, List tasks, - List errors) throws IOException { + private void runTasks(ExecutorService pool, ThreadSafeProgressMonitor pm, + List tasks, List errors) throws IOException { List> futures = new ArrayList>(tasks.size()); for (DeltaTask task : tasks) futures.add(pool.submit(task)); try { + pm.waitForCompletion(); for (Future f : futures) { try { f.get();