@ -75,6 +75,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Future ;
import java.util.concurrent.Future ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeUnit ;
import java.util.zip.CRC32 ;
import java.util.zip.CheckedOutputStream ;
import java.util.zip.Deflater ;
import java.util.zip.Deflater ;
import java.util.zip.DeflaterOutputStream ;
import java.util.zip.DeflaterOutputStream ;
@ -275,12 +277,16 @@ public class PackWriter {
private boolean canBuildBitmaps ;
private boolean canBuildBitmaps ;
private boolean indexDisabled ;
private int depth ;
private int depth ;
private Collection < ? extends ObjectId > unshallowObjects ;
private Collection < ? extends ObjectId > unshallowObjects ;
private PackBitmapIndexBuilder writeBitmaps ;
private PackBitmapIndexBuilder writeBitmaps ;
private CRC32 crc32 ;
/ * *
/ * *
* Create writer for specified repository .
* Create writer for specified repository .
* < p >
* < p >
@ -471,6 +477,19 @@ public class PackWriter {
this . useBitmaps = useBitmaps ;
this . useBitmaps = useBitmaps ;
}
}
/** @return true if the index file cannot be created by this PackWriter. */
public boolean isIndexDisabled ( ) {
return indexDisabled | | ! cachedPacks . isEmpty ( ) ;
}
/ * *
* @param noIndex
* true to disable creation of the index file .
* /
public void setIndexDisabled ( boolean noIndex ) {
this . indexDisabled = noIndex ;
}
/ * *
/ * *
* @return true to ignore objects that are uninteresting and also not found
* @return true to ignore objects that are uninteresting and also not found
* on local disk ; false to throw a { @link MissingObjectException }
* on local disk ; false to throw a { @link MissingObjectException }
@ -855,7 +874,7 @@ public class PackWriter {
* the index data could not be written to the supplied stream .
* the index data could not be written to the supplied stream .
* /
* /
public void writeIndex ( final OutputStream indexStream ) throws IOException {
public void writeIndex ( final OutputStream indexStream ) throws IOException {
if ( ! cachedPacks . isEmpty ( ) )
if ( isIndexDisabled ( ) )
throw new IOException ( JGitText . get ( ) . cachedPacksPreventsIndexCreation ) ;
throw new IOException ( JGitText . get ( ) . cachedPacksPreventsIndexCreation ) ;
long writeStart = System . currentTimeMillis ( ) ;
long writeStart = System . currentTimeMillis ( ) ;
@ -996,8 +1015,13 @@ public class PackWriter {
if ( config . isDeltaCompress ( ) )
if ( config . isDeltaCompress ( ) )
searchForDeltas ( compressMonitor ) ;
searchForDeltas ( compressMonitor ) ;
final PackOutputStream out = new PackOutputStream ( writeMonitor ,
crc32 = new CRC32 ( ) ;
packStream , this ) ;
final PackOutputStream out = new PackOutputStream (
writeMonitor ,
isIndexDisabled ( )
? packStream
: new CheckedOutputStream ( packStream , crc32 ) ,
this ) ;
long objCnt = getObjectCount ( ) ;
long objCnt = getObjectCount ( ) ;
stats . totalObjects = objCnt ;
stats . totalObjects = objCnt ;
@ -1305,68 +1329,57 @@ public class PackWriter {
threads = Runtime . getRuntime ( ) . availableProcessors ( ) ;
threads = Runtime . getRuntime ( ) . availableProcessors ( ) ;
if ( threads < = 1 | | cnt < = 2 * config . getDeltaSearchWindowSize ( ) ) {
if ( threads < = 1 | | cnt < = 2 * config . getDeltaSearchWindowSize ( ) ) {
DeltaCache dc = new DeltaCache ( config ) ;
new DeltaWindow ( config , new DeltaCache ( config ) , reader , monitor ,
DeltaWindow dw = new DeltaWindow ( config , dc , reader ) ;
list , 0 , cnt ) . search ( ) ;
dw . search ( monitor , list , 0 , cnt ) ;
return ;
return ;
}
}
final DeltaCache dc = new ThreadSafeDeltaCache ( config ) ;
final DeltaCache dc = new ThreadSafeDeltaCache ( config ) ;
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor ( monitor ) ;
final ThreadSafeProgressMonitor pm = new ThreadSafeProgressMonitor ( monitor ) ;
// Guess at the size of batch we want. Because we don't really
int estSize = cnt / threads ;
// have a way for a thread to steal work from another thread if
if ( estSize < config . getDeltaSearchWindowSize ( ) )
// it ends early, we over partition slightly so the work units
estSize = config . getDeltaSearchWindowSize ( ) ;
// are a bit smaller.
//
int estSize = cnt / ( threads * 2 ) ;
if ( estSize < 2 * config . getDeltaSearchWindowSize ( ) )
estSize = 2 * config . getDeltaSearchWindowSize ( ) ;
final List < DeltaTask > myTasks = new ArrayList < DeltaTask > ( threads * 2 ) ;
DeltaTask . Block taskBlock = new DeltaTask . Block ( threads , config ,
reader , dc , pm ,
list , 0 , cnt ) ;
for ( int i = 0 ; i < cnt ; ) {
for ( int i = 0 ; i < cnt ; ) {
final int start = i ;
final int start = i ;
final int batchSiz e;
int end ;
if ( cnt - i < estSize ) {
if ( cnt - i < estSize ) {
// If we don't have enough to fill the remaining block,
// If we don't have enough to fill the remaining block,
// schedule what is left over as a single block.
// schedule what is left over as a single block.
//
end = cnt ;
batchSize = cnt - i ;
} else {
} else {
// Try to split the block at the end of a path.
// Try to split the block at the end of a path.
//
end = start + estSize ;
int end = start + estSize ;
int h = list [ end - 1 ] . getPathHash ( ) ;
while ( end < cnt ) {
while ( end < cnt ) {
ObjectToPack a = list [ end - 1 ] ;
if ( h = = list [ end ] . getPathHash ( ) )
ObjectToPack b = list [ end ] ;
if ( a . getPathHash ( ) = = b . getPathHash ( ) )
end + + ;
end + + ;
else
else
break ;
break ;
}
}
batchSize = end - start ;
}
}
i + = batchSiz e;
i = end ;
myT asks. add ( new DeltaTask ( config , reader , dc , pm , batchSize , start , list ) ) ;
taskBlock . t asks. add ( new DeltaTask ( taskBlock , start , end ) ) ;
}
}
pm . startWorkers ( myT asks. size ( ) ) ;
pm . startWorkers ( taskBlock . t asks. size ( ) ) ;
final Executor executor = config . getExecutor ( ) ;
final Executor executor = config . getExecutor ( ) ;
final List < Throwable > errors = Collections
final List < Throwable > errors = Collections
. synchronizedList ( new ArrayList < Throwable > ( ) ) ;
. synchronizedList ( new ArrayList < Throwable > ( ) ) ;
if ( executor instanceof ExecutorService ) {
if ( executor instanceof ExecutorService ) {
// Caller supplied us a service, use it directly.
// Caller supplied us a service, use it directly.
//
runTasks ( ( ExecutorService ) executor , pm , taskBlock , errors ) ;
runTasks ( ( ExecutorService ) executor , pm , myTasks , errors ) ;
} else if ( executor = = null ) {
} else if ( executor = = null ) {
// Caller didn't give us a way to run the tasks, spawn up a
// Caller didn't give us a way to run the tasks, spawn up a
// temporary thread pool and make sure it tears down cleanly.
// temporary thread pool and make sure it tears down cleanly.
//
ExecutorService pool = Executors . newFixedThreadPool ( threads ) ;
ExecutorService pool = Executors . newFixedThreadPool ( threads ) ;
try {
try {
runTasks ( pool , pm , myTasks , errors ) ;
runTasks ( pool , pm , taskBlock , errors ) ;
} finally {
} finally {
pool . shutdown ( ) ;
pool . shutdown ( ) ;
for ( ; ; ) {
for ( ; ; ) {
@ -1383,8 +1396,7 @@ public class PackWriter {
// The caller gave us an executor, but it might not do
// The caller gave us an executor, but it might not do
// asynchronous execution. Wrap everything and hope it
// asynchronous execution. Wrap everything and hope it
// can schedule these for us.
// can schedule these for us.
//
for ( final DeltaTask task : taskBlock . tasks ) {
for ( final DeltaTask task : myTasks ) {
executor . execute ( new Runnable ( ) {
executor . execute ( new Runnable ( ) {
public void run ( ) {
public void run ( ) {
try {
try {
@ -1426,9 +1438,9 @@ public class PackWriter {
private static void runTasks ( ExecutorService pool ,
private static void runTasks ( ExecutorService pool ,
ThreadSafeProgressMonitor pm ,
ThreadSafeProgressMonitor pm ,
List < DeltaTask > tasks , List < Throwable > errors ) throws IOException {
DeltaTask . Block tb , List < Throwable > errors ) throws IOException {
List < Future < ? > > futures = new ArrayList < Future < ? > > ( tasks . size ( ) ) ;
List < Future < ? > > futures = new ArrayList < Future < ? > > ( tb . t asks . size ( ) ) ;
for ( DeltaTask task : tasks )
for ( DeltaTask task : tb . t asks )
futures . add ( pool . submit ( task ) ) ;
futures . add ( pool . submit ( task ) ) ;
try {
try {
@ -1496,12 +1508,12 @@ public class PackWriter {
if ( otp . isWritten ( ) )
if ( otp . isWritten ( ) )
return ; // Delta chain cycle caused this to write already.
return ; // Delta chain cycle caused this to write already.
out . resetCRC32 ( ) ;
crc32 . reset ( ) ;
otp . setOffset ( out . length ( ) ) ;
otp . setOffset ( out . length ( ) ) ;
try {
try {
reuseSupport . copyObjectAsIs ( out , otp , reuseValidate ) ;
reuseSupport . copyObjectAsIs ( out , otp , reuseValidate ) ;
out . endObject ( ) ;
out . endObject ( ) ;
otp . setCRC ( out . getCRC32 ( ) ) ;
otp . setCRC ( ( int ) crc32 . getValue ( ) ) ;
typeStats . reusedObjects + + ;
typeStats . reusedObjects + + ;
if ( otp . isDeltaRepresentation ( ) ) {
if ( otp . isDeltaRepresentation ( ) ) {
typeStats . reusedDeltas + + ;
typeStats . reusedDeltas + + ;
@ -1535,7 +1547,7 @@ public class PackWriter {
else
else
writeWholeObjectDeflate ( out , otp ) ;
writeWholeObjectDeflate ( out , otp ) ;
out . endObject ( ) ;
out . endObject ( ) ;
otp . setCRC ( out . getCRC32 ( ) ) ;
otp . setCRC ( ( int ) crc32 . getValue ( ) ) ;
}
}
private void writeBase ( PackOutputStream out , ObjectToPack base )
private void writeBase ( PackOutputStream out , ObjectToPack base )
@ -1549,7 +1561,7 @@ public class PackWriter {
final Deflater deflater = deflater ( ) ;
final Deflater deflater = deflater ( ) ;
final ObjectLoader ldr = reader . open ( otp , otp . getType ( ) ) ;
final ObjectLoader ldr = reader . open ( otp , otp . getType ( ) ) ;
out . resetCRC32 ( ) ;
crc32 . reset ( ) ;
otp . setOffset ( out . length ( ) ) ;
otp . setOffset ( out . length ( ) ) ;
out . writeHeader ( otp , ldr . getSize ( ) ) ;
out . writeHeader ( otp , ldr . getSize ( ) ) ;
@ -1563,7 +1575,7 @@ public class PackWriter {
final ObjectToPack otp ) throws IOException {
final ObjectToPack otp ) throws IOException {
writeBase ( out , otp . getDeltaBase ( ) ) ;
writeBase ( out , otp . getDeltaBase ( ) ) ;
out . resetCRC32 ( ) ;
crc32 . reset ( ) ;
otp . setOffset ( out . length ( ) ) ;
otp . setOffset ( out . length ( ) ) ;
DeltaCache . Ref ref = otp . popCachedDelta ( ) ;
DeltaCache . Ref ref = otp . popCachedDelta ( ) ;