@ -61,6 +61,9 @@ public class StreamCopyThread extends Thread {
private final AtomicInteger flushCount = new AtomicInteger ( 0 ) ;
private final AtomicInteger flushCount = new AtomicInteger ( 0 ) ;
/** Lock held by flush to avoid interrupting a write. */
private final Object writeLock ;
/ * *
/ * *
* Create a thread to copy data from an input stream to an output stream .
* Create a thread to copy data from an input stream to an output stream .
*
*
@ -75,6 +78,7 @@ public class StreamCopyThread extends Thread {
setName ( Thread . currentThread ( ) . getName ( ) + "-StreamCopy" ) ; //$NON-NLS-1$
setName ( Thread . currentThread ( ) . getName ( ) + "-StreamCopy" ) ; //$NON-NLS-1$
src = i ;
src = i ;
dst = o ;
dst = o ;
writeLock = new Object ( ) ;
}
}
/ * *
/ * *
@ -86,7 +90,9 @@ public class StreamCopyThread extends Thread {
* /
* /
public void flush ( ) {
public void flush ( ) {
flushCount . incrementAndGet ( ) ;
flushCount . incrementAndGet ( ) ;
interrupt ( ) ;
synchronized ( writeLock ) {
interrupt ( ) ;
}
}
}
/ * *
/ * *
@ -118,12 +124,12 @@ public class StreamCopyThread extends Thread {
for ( ; ; ) {
for ( ; ; ) {
try {
try {
if ( readInterrupted ) {
if ( readInterrupted ) {
try {
synchronized ( writeLock ) {
boolean interruptedAgain = Thread . interrupted ( ) ;
dst . flush ( ) ;
dst . flush ( ) ;
} catch ( InterruptedIOException e ) {
if ( interruptedAgain ) {
// There was a new flush() call during flush previous bytes
interrupt ( ) ;
// need continue read/write/flush for the new bytes
}
continue ;
}
}
readInterrupted = false ;
readInterrupted = false ;
if ( ! flushCount . compareAndSet ( flushCountBeforeRead , 0 ) ) {
if ( ! flushCount . compareAndSet ( flushCountBeforeRead , 0 ) ) {
@ -148,20 +154,26 @@ public class StreamCopyThread extends Thread {
if ( n < 0 )
if ( n < 0 )
break ;
break ;
boolean writeInterrupted = false ;
synchronized ( writeLock ) {
for ( ; ; ) {
if ( isInterrupted ( ) ) {
try {
dst . write ( buf , 0 , n ) ;
} catch ( InterruptedIOException wakey ) {
writeInterrupted = true ;
continue ;
continue ;
}
}
// set interrupt status, which will be checked
boolean writeInterrupted = false ;
// when we block in src.read
for ( ; ; ) {
if ( writeInterrupted | | flushCount . get ( ) > 0 )
try {
interrupt ( ) ;
dst . write ( buf , 0 , n ) ;
break ;
} catch ( InterruptedIOException wakey ) {
writeInterrupted = true ;
continue ;
}
// set interrupt status, which will be checked
// when we block in src.read
if ( writeInterrupted | | flushCount . get ( ) > 0 )
interrupt ( ) ;
break ;
}
}
}
} catch ( IOException e ) {
} catch ( IOException e ) {
break ;
break ;