From a86566dcf04df51a445051cf9d12ba9451020a3e Mon Sep 17 00:00:00 2001 From: Dmitry Neverov Date: Fri, 21 Aug 2015 17:44:49 +0200 Subject: [PATCH] Fix hanging fetch via SSH Signaling the need to flush() only via the interrupted status of a copying thread doesn't work realiably with jsch. The write() method of com.jcraft.jsch.Session catches the InterruptedException in several places. As a result StreamCopyThread can easily miss the interrupt if it was interrupted during the dst.write() or dst.flush() call. When it happens, StreamCopyThread will not send some data to the remote side and will not get the response back, because remote side will wait for more data from us. The flushCount field incremented during flush() method guarantees we don't miss flush() even if jsch catches InterruptedException in dst.write() or dst.flush() calls. Checking the flushCount after dst.write() is needed because dst.write() can clear interrupt status, in this case the next blocking src.read() won't throw an exception and we will not call flush(). Flush is performed only after src.read() was blocked and thrown an InterruptedIOException exception, this guarantees that we flush all the data available in src so far (src.read() doesn't block while more is available). FlushCount is reset to 0 only when there were no flush() calls since last blocked read, that means we flushed all data available in src. If there were flush() calls, the interrupt status is restored, so next blocked read will throw InterruptedException and we will flush() again. Change-Id: I692b226edaff502f06235ec05da9052b5fe6478a Signed-off-by: Dmitry Neverov --- .../jgit/util/io/StreamCopyThread.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java index 24b8b5333..8d39a22ac 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/util/io/StreamCopyThread.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicInteger; /** Thread to copy from an input stream to an output stream. */ public class StreamCopyThread extends Thread { @@ -58,6 +59,8 @@ public class StreamCopyThread extends Thread { private volatile boolean done; + private final AtomicInteger flushCount = new AtomicInteger(0); + /** * Create a thread to copy data from an input stream to an output stream. * @@ -82,6 +85,7 @@ public class StreamCopyThread extends Thread { * the request. */ public void flush() { + flushCount.incrementAndGet(); interrupt(); } @@ -109,22 +113,30 @@ public class StreamCopyThread extends Thread { public void run() { try { final byte[] buf = new byte[BUFFER_SIZE]; - int interruptCounter = 0; + int flushCountBeforeRead = 0; + boolean readInterrupted = false; for (;;) { try { - if (interruptCounter > 0) { + if (readInterrupted) { dst.flush(); - interruptCounter--; + readInterrupted = false; + if (!flushCount.compareAndSet(flushCountBeforeRead, 0)) { + // There was a flush() call since last blocked read. + // Set interrupt status, so next blocked read will throw + // an InterruptedIOException and we will flush again. + interrupt(); + } } if (done) break; + flushCountBeforeRead = flushCount.get(); final int n; try { n = src.read(buf); } catch (InterruptedIOException wakey) { - interruptCounter++; + readInterrupted = true; continue; } if (n < 0) @@ -141,7 +153,7 @@ public class StreamCopyThread extends Thread { // set interrupt status, which will be checked // when we block in src.read - if (writeInterrupted) + if (writeInterrupted || flushCount.get() > 0) interrupt(); break; }