Browse Source

Merge "Call ProgressMonitor.update() from main thread"

stable-0.10
Shawn O. Pearce 14 years ago committed by Code Review
parent
commit
79ca8a2d19
  1. 189
      org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java
  2. 112
      org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java
  3. 8
      org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java
  4. 18
      org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java

189
org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ThreadSafeProgressMonitorTest.java

@ -0,0 +1,189 @@
/*
* Copyright (C) 2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.eclipse.jgit.lib;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
public class ThreadSafeProgressMonitorTest extends TestCase {
public void testFailsMethodsOnBackgroundThread()
throws InterruptedException {
final MockProgressMonitor mock = new MockProgressMonitor();
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock);
runOnThread(new Runnable() {
public void run() {
try {
pm.start(1);
fail("start did not fail on background thread");
} catch (IllegalStateException notMainThread) {
// Expected result
}
try {
pm.beginTask("title", 1);
fail("beginTask did not fail on background thread");
} catch (IllegalStateException notMainThread) {
// Expected result
}
try {
pm.endTask();
fail("endTask did not fail on background thread");
} catch (IllegalStateException notMainThread) {
// Expected result
}
}
});
// Ensure we didn't alter the mock above when checking threads.
assertNull(mock.taskTitle);
assertEquals(0, mock.value);
}
public void testMethodsOkOnMainThread() {
final MockProgressMonitor mock = new MockProgressMonitor();
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock);
pm.start(1);
assertEquals(1, mock.value);
pm.beginTask("title", 42);
assertEquals("title", mock.taskTitle);
assertEquals(42, mock.value);
pm.update(1);
assertEquals(43, mock.value);
pm.update(2);
assertEquals(45, mock.value);
pm.endTask();
assertNull(mock.taskTitle);
assertEquals(0, mock.value);
}
public void testUpdateOnBackgroundThreads() throws InterruptedException {
final MockProgressMonitor mock = new MockProgressMonitor();
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(mock);
pm.startWorker();
final CountDownLatch doUpdate = new CountDownLatch(1);
final CountDownLatch didUpdate = new CountDownLatch(1);
final CountDownLatch doEndWorker = new CountDownLatch(1);
final Thread bg = new Thread() {
public void run() {
assertFalse(pm.isCancelled());
await(doUpdate);
pm.update(2);
didUpdate.countDown();
await(doEndWorker);
pm.update(1);
pm.endWorker();
}
};
bg.start();
pm.pollForUpdates();
assertEquals(0, mock.value);
doUpdate.countDown();
await(didUpdate);
pm.pollForUpdates();
assertEquals(2, mock.value);
doEndWorker.countDown();
pm.waitForCompletion();
assertEquals(3, mock.value);
}
private static void await(CountDownLatch cdl) {
try {
assertTrue("latch released", cdl.await(1000, TimeUnit.MILLISECONDS));
} catch (InterruptedException ie) {
fail("Did not expect to be interrupted");
}
}
private static void runOnThread(Runnable task) throws InterruptedException {
Thread t = new Thread(task);
t.start();
t.join(1000);
assertFalse("thread has stopped", t.isAlive());
}
private static class MockProgressMonitor implements ProgressMonitor {
String taskTitle;
int value;
public void update(int completed) {
value += completed;
}
public void start(int totalTasks) {
value = totalTasks;
}
public void beginTask(String title, int totalWork) {
taskTitle = title;
value = totalWork;
}
public void endTask() {
taskTitle = null;
value = 0;
}
public boolean isCancelled() {
return false;
}
}
}

112
org.eclipse.jgit/src/org/eclipse/jgit/lib/ThreadSafeProgressMonitor.java

@ -43,16 +43,35 @@
package org.eclipse.jgit.lib; package org.eclipse.jgit.lib;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
* Wrapper around the general {@link ProgressMonitor} to make it thread safe. * Wrapper around the general {@link ProgressMonitor} to make it thread safe.
*
* Updates to the underlying ProgressMonitor are made only from the thread that
* allocated this wrapper. Callers are responsible for ensuring the allocating
* thread uses {@link #pollForUpdates()} or {@link #waitForCompletion()} to
* update the underlying ProgressMonitor.
*
* Only {@link #update(int)}, {@link #isCancelled()}, and {@link #endWorker()}
* may be invoked from a worker thread. All other methods of the ProgressMonitor
* interface can only be called from the thread that allocates this wrapper.
*/ */
public class ThreadSafeProgressMonitor implements ProgressMonitor { public class ThreadSafeProgressMonitor implements ProgressMonitor {
private final ProgressMonitor pm; private final ProgressMonitor pm;
private final ReentrantLock lock; private final ReentrantLock lock;
private final Thread mainThread;
private final AtomicInteger workers;
private final AtomicInteger pendingUpdates;
private final Semaphore process;
/** /**
* Wrap a ProgressMonitor to be thread safe. * Wrap a ProgressMonitor to be thread safe.
* *
@ -62,33 +81,87 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor {
public ThreadSafeProgressMonitor(ProgressMonitor pm) { public ThreadSafeProgressMonitor(ProgressMonitor pm) {
this.pm = pm; this.pm = pm;
this.lock = new ReentrantLock(); this.lock = new ReentrantLock();
this.mainThread = Thread.currentThread();
this.workers = new AtomicInteger(0);
this.pendingUpdates = new AtomicInteger(0);
this.process = new Semaphore(0);
} }
public void start(int totalTasks) { public void start(int totalTasks) {
lock.lock(); if (!isMainThread())
try { throw new IllegalStateException();
pm.start(totalTasks); pm.start(totalTasks);
} finally {
lock.unlock();
}
} }
public void beginTask(String title, int totalWork) { public void beginTask(String title, int totalWork) {
lock.lock(); if (!isMainThread())
try { throw new IllegalStateException();
pm.beginTask(title, totalWork); pm.beginTask(title, totalWork);
} finally {
lock.unlock();
} }
/** Notify the monitor a worker is starting. */
public void startWorker() {
startWorkers(1);
} }
public void update(int completed) { /**
lock.lock(); * Notify the monitor of workers starting.
try { *
pm.update(completed); * @param count
} finally { * the number of worker threads that are starting.
lock.unlock(); */
public void startWorkers(int count) {
workers.addAndGet(count);
} }
/** Notify the monitor a worker is finished. */
public void endWorker() {
if (workers.decrementAndGet() == 0)
process.release();
}
/**
* Non-blocking poll for pending updates.
*
* This method can only be invoked by the same thread that allocated this
* ThreadSafeProgressMonior.
*/
public void pollForUpdates() {
assert isMainThread();
doUpdates();
}
/**
* Process pending updates and wait for workers to finish.
*
* This method can only be invoked by the same thread that allocated this
* ThreadSafeProgressMonior.
*
* @throws InterruptedException
* if the main thread is interrupted while waiting for
* completion of workers.
*/
public void waitForCompletion() throws InterruptedException {
assert isMainThread();
while (0 < workers.get()) {
doUpdates();
process.acquire();
}
doUpdates();
}
private void doUpdates() {
int cnt = pendingUpdates.getAndSet(0);
if (0 < cnt)
pm.update(cnt);
}
public void update(int completed) {
int old = pendingUpdates.getAndAdd(completed);
if (isMainThread())
doUpdates();
else if (old == 0)
process.release();
} }
public boolean isCancelled() { public boolean isCancelled() {
@ -101,11 +174,12 @@ public class ThreadSafeProgressMonitor implements ProgressMonitor {
} }
public void endTask() { public void endTask() {
lock.lock(); if (!isMainThread())
try { throw new IllegalStateException();
pm.endTask(); pm.endTask();
} finally {
lock.unlock();
} }
private boolean isMainThread() {
return Thread.currentThread() == mainThread;
} }
} }

8
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/DeltaTask.java

@ -46,7 +46,7 @@ package org.eclipse.jgit.storage.pack;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import org.eclipse.jgit.lib.ObjectReader; import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ProgressMonitor; import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
final class DeltaTask implements Callable<Object> { final class DeltaTask implements Callable<Object> {
private final PackConfig config; private final PackConfig config;
@ -55,7 +55,7 @@ final class DeltaTask implements Callable<Object> {
private final DeltaCache dc; private final DeltaCache dc;
private final ProgressMonitor pm; private final ThreadSafeProgressMonitor pm;
private final int batchSize; private final int batchSize;
@ -64,7 +64,8 @@ final class DeltaTask implements Callable<Object> {
private final ObjectToPack[] list; private final ObjectToPack[] list;
DeltaTask(PackConfig config, ObjectReader reader, DeltaCache dc, DeltaTask(PackConfig config, ObjectReader reader, DeltaCache dc,
ProgressMonitor pm, int batchSize, int start, ObjectToPack[] list) { ThreadSafeProgressMonitor pm, int batchSize, int start,
ObjectToPack[] list) {
this.config = config; this.config = config;
this.templateReader = reader; this.templateReader = reader;
this.dc = dc; this.dc = dc;
@ -82,6 +83,7 @@ final class DeltaTask implements Callable<Object> {
dw.search(pm, list, start, batchSize); dw.search(pm, list, start, batchSize);
} finally { } finally {
or.release(); or.release();
pm.endWorker();
} }
return null; return null;
} }

18
org.eclipse.jgit/src/org/eclipse/jgit/storage/pack/PackWriter.java

@ -59,7 +59,6 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -675,7 +674,7 @@ public class PackWriter {
} }
final DeltaCache dc = new ThreadSafeDeltaCache(config); final DeltaCache dc = new ThreadSafeDeltaCache(config);
final ProgressMonitor pm = new ThreadSafeProgressMonitor(monitor); final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor(monitor);
// Guess at the size of batch we want. Because we don't really // Guess at the size of batch we want. Because we don't really
// have a way for a thread to steal work from another thread if // have a way for a thread to steal work from another thread if
@ -713,6 +712,7 @@ public class PackWriter {
i += batchSize; i += batchSize;
myTasks.add(new DeltaTask(config, reader, dc, pm, batchSize, start, list)); myTasks.add(new DeltaTask(config, reader, dc, pm, batchSize, start, list));
} }
pm.startWorkers(myTasks.size());
final Executor executor = config.getExecutor(); final Executor executor = config.getExecutor();
final List<Throwable> errors = Collections final List<Throwable> errors = Collections
@ -720,7 +720,7 @@ public class PackWriter {
if (executor instanceof ExecutorService) { if (executor instanceof ExecutorService) {
// Caller supplied us a service, use it directly. // Caller supplied us a service, use it directly.
// //
runTasks((ExecutorService) executor, myTasks, errors); runTasks((ExecutorService) executor, pm, myTasks, errors);
} else if (executor == null) { } else if (executor == null) {
// Caller didn't give us a way to run the tasks, spawn up a // Caller didn't give us a way to run the tasks, spawn up a
@ -728,7 +728,7 @@ public class PackWriter {
// //
ExecutorService pool = Executors.newFixedThreadPool(threads); ExecutorService pool = Executors.newFixedThreadPool(threads);
try { try {
runTasks(pool, myTasks, errors); runTasks(pool, pm, myTasks, errors);
} finally { } finally {
pool.shutdown(); pool.shutdown();
for (;;) { for (;;) {
@ -746,7 +746,6 @@ public class PackWriter {
// asynchronous execution. Wrap everything and hope it // asynchronous execution. Wrap everything and hope it
// can schedule these for us. // can schedule these for us.
// //
final CountDownLatch done = new CountDownLatch(myTasks.size());
for (final DeltaTask task : myTasks) { for (final DeltaTask task : myTasks) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
public void run() { public void run() {
@ -754,14 +753,12 @@ public class PackWriter {
task.call(); task.call();
} catch (Throwable failure) { } catch (Throwable failure) {
errors.add(failure); errors.add(failure);
} finally {
done.countDown();
} }
} }
}); });
} }
try { try {
done.await(); pm.waitForCompletion();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// We can't abort the other tasks as we have no handle. // We can't abort the other tasks as we have no handle.
// Cross our fingers and just break out anyway. // Cross our fingers and just break out anyway.
@ -789,13 +786,14 @@ public class PackWriter {
} }
} }
private void runTasks(ExecutorService pool, List<DeltaTask> tasks, private void runTasks(ExecutorService pool, ThreadSafeProgressMonitor pm,
List<Throwable> errors) throws IOException { List<DeltaTask> tasks, List<Throwable> errors) throws IOException {
List<Future<?>> futures = new ArrayList<Future<?>>(tasks.size()); List<Future<?>> futures = new ArrayList<Future<?>>(tasks.size());
for (DeltaTask task : tasks) for (DeltaTask task : tasks)
futures.add(pool.submit(task)); futures.add(pool.submit(task));
try { try {
pm.waitForCompletion();
for (Future<?> f : futures) { for (Future<?> f : futures) {
try { try {
f.get(); f.get();

Loading…
Cancel
Save