Browse Source

IndexPack: Use streaming for large whole blobs

When indexing large blobs that are stored whole (non-delta form),
avoid allocating the entire blob in memory and instead stream it
through the SHA-1 checksum computation.  This reduces the size
of memory required by IndexPack when processing very big blobs,
such as a 500 MiB uncompressable binary.

If the large blob already exists in the local repository, its
contents needs to be compared byte-for-byte after the entire pack
has been indexed, to ensure there isn't an unexpected SHA-1 collision
which may result in later data corruption.  This compare is performed
as a streaming compare, again avoiding the large object allocation.

This change doesn't improve on memory utilization for large objects
stored as deltas.  The change also doesn't improve handling for
any large commits, trees or annotated tags.  There isn't much to
be done here for those objects, because they need to be passed down
to the ObjectChecker as a byte[].  Fortunately it isn't common for
these object types to be that large,

Bug: 312868
Change-Id: I862afd4cb78013ee033d4ec68c067b1774a05be8
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
Signed-off-by: Chris Aniszczyk <caniszczyk@gmail.com>
CC: Roberto Tyley <roberto.tyley@guardian.co.uk>
stable-0.10
Shawn O. Pearce 14 years ago committed by Chris Aniszczyk
parent
commit
c181e1ab8a
  1. 20
      org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/IndexPackTest.java
  2. 93
      org.eclipse.jgit/src/org/eclipse/jgit/transport/IndexPack.java

20
org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/IndexPackTest.java

@ -58,6 +58,7 @@ import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.NullProgressMonitor; import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.RepositoryTestCase; import org.eclipse.jgit.lib.RepositoryTestCase;
import org.eclipse.jgit.lib.TextProgressMonitor; import org.eclipse.jgit.lib.TextProgressMonitor;
import org.eclipse.jgit.revwalk.RevBlob; import org.eclipse.jgit.revwalk.RevBlob;
@ -151,6 +152,25 @@ public class IndexPackTest extends RepositoryTestCase {
ip.renameAndOpenPack(); ip.renameAndOpenPack();
} }
public void testPackWithDuplicateBlob() throws Exception {
final byte[] data = Constants.encode("0123456789abcdefg");
TestRepository<Repository> d = new TestRepository<Repository>(db);
assertTrue(db.hasObject(d.blob(data)));
TemporaryBuffer.Heap pack = new TemporaryBuffer.Heap(1024);
packHeader(pack, 1);
pack.write((Constants.OBJ_BLOB) << 4 | 0x80 | 1);
pack.write(1);
deflate(pack, data);
digest(pack);
final byte[] raw = pack.toByteArray();
IndexPack ip = IndexPack.create(db, new ByteArrayInputStream(raw));
ip.setStreamFileThreshold(1);
ip.index(NullProgressMonitor.INSTANCE);
ip.renameAndOpenPack();
}
private void packHeader(TemporaryBuffer.Heap tinyPack, int cnt) private void packHeader(TemporaryBuffer.Heap tinyPack, int cnt)
throws IOException { throws IOException {
final byte[] hdr = new byte[8]; final byte[] hdr = new byte[8];

93
org.eclipse.jgit/src/org/eclipse/jgit/transport/IndexPack.java

@ -75,6 +75,7 @@ import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdSubclassMap; import org.eclipse.jgit.lib.ObjectIdSubclassMap;
import org.eclipse.jgit.lib.ObjectLoader; import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectReader; import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.lib.ProgressMonitor; import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.storage.file.PackIndexWriter; import org.eclipse.jgit.storage.file.PackIndexWriter;
@ -144,6 +145,8 @@ public class IndexPack {
private final Repository repo; private final Repository repo;
private int streamFileThreshold;
/** /**
* Object database used for loading existing objects * Object database used for loading existing objects
*/ */
@ -151,6 +154,8 @@ public class IndexPack {
private InflaterStream inflater; private InflaterStream inflater;
private byte[] readBuffer;
private final MessageDigest objectDigest; private final MessageDigest objectDigest;
private final MutableObjectId tempObjectId; private final MutableObjectId tempObjectId;
@ -211,6 +216,9 @@ public class IndexPack {
private LongMap<UnresolvedDelta> baseByPos; private LongMap<UnresolvedDelta> baseByPos;
/** Blobs whose contents need to be double-checked after indexing. */
private List<PackedObjectInfo> deferredCheckBlobs;
private MessageDigest packDigest; private MessageDigest packDigest;
private RandomAccessFile packOut; private RandomAccessFile packOut;
@ -236,11 +244,13 @@ public class IndexPack {
public IndexPack(final Repository db, final InputStream src, public IndexPack(final Repository db, final InputStream src,
final File dstBase) throws IOException { final File dstBase) throws IOException {
repo = db; repo = db;
streamFileThreshold = 5 * (1 << 20); // A reasonable default for now.
objectDatabase = db.getObjectDatabase().newCachedDatabase(); objectDatabase = db.getObjectDatabase().newCachedDatabase();
in = src; in = src;
inflater = new InflaterStream(); inflater = new InflaterStream();
readCurs = objectDatabase.newReader(); readCurs = objectDatabase.newReader();
buf = new byte[BUFFER_SIZE]; buf = new byte[BUFFER_SIZE];
readBuffer = new byte[BUFFER_SIZE];
objectDigest = Constants.newMessageDigest(); objectDigest = Constants.newMessageDigest();
tempObjectId = new MutableObjectId(); tempObjectId = new MutableObjectId();
packDigest = Constants.newMessageDigest(); packDigest = Constants.newMessageDigest();
@ -258,6 +268,10 @@ public class IndexPack {
} }
} }
void setStreamFileThreshold(int sz) {
streamFileThreshold = sz;
}
/** /**
* Set the pack index file format version this instance will create. * Set the pack index file format version this instance will create.
* *
@ -396,6 +410,7 @@ public class IndexPack {
entries = new PackedObjectInfo[(int) objectCount]; entries = new PackedObjectInfo[(int) objectCount];
baseById = new ObjectIdSubclassMap<DeltaChain>(); baseById = new ObjectIdSubclassMap<DeltaChain>();
baseByPos = new LongMap<UnresolvedDelta>(); baseByPos = new LongMap<UnresolvedDelta>();
deferredCheckBlobs = new ArrayList<PackedObjectInfo>();
progress.beginTask(JGitText.get().receivingObjects, progress.beginTask(JGitText.get().receivingObjects,
(int) objectCount); (int) objectCount);
@ -407,6 +422,8 @@ public class IndexPack {
} }
readPackFooter(); readPackFooter();
endInput(); endInput();
if (!deferredCheckBlobs.isEmpty())
doDeferredCheckBlobs();
progress.endTask(); progress.endTask();
if (deltaCount > 0) { if (deltaCount > 0) {
if (packOut == null) if (packOut == null)
@ -837,17 +854,38 @@ public class IndexPack {
private void whole(final int type, final long pos, final long sz) private void whole(final int type, final long pos, final long sz)
throws IOException { throws IOException {
final byte[] data = inflateAndReturn(Source.INPUT, sz);
objectDigest.update(Constants.encodedTypeString(type)); objectDigest.update(Constants.encodedTypeString(type));
objectDigest.update((byte) ' '); objectDigest.update((byte) ' ');
objectDigest.update(Constants.encodeASCII(sz)); objectDigest.update(Constants.encodeASCII(sz));
objectDigest.update((byte) 0); objectDigest.update((byte) 0);
objectDigest.update(data);
tempObjectId.fromRaw(objectDigest.digest(), 0);
verifySafeObject(tempObjectId, type, data); boolean checkContentLater = false;
if (type == Constants.OBJ_BLOB && sz >= streamFileThreshold) {
InputStream inf = inflate(Source.INPUT, sz);
long cnt = 0;
while (cnt < sz) {
int r = inf.read(readBuffer);
if (r <= 0)
break;
objectDigest.update(readBuffer, 0, r);
cnt += r;
}
inf.close();
tempObjectId.fromRaw(objectDigest.digest(), 0);
checkContentLater = readCurs.has(tempObjectId);
} else {
final byte[] data = inflateAndReturn(Source.INPUT, sz);
objectDigest.update(data);
tempObjectId.fromRaw(objectDigest.digest(), 0);
verifySafeObject(tempObjectId, type, data);
}
final int crc32 = (int) crc.getValue(); final int crc32 = (int) crc.getValue();
addObjectAndTrack(new PackedObjectInfo(pos, crc32, tempObjectId)); PackedObjectInfo obj = new PackedObjectInfo(pos, crc32, tempObjectId);
addObjectAndTrack(obj);
if (checkContentLater)
deferredCheckBlobs.add(obj);
} }
private void verifySafeObject(final AnyObjectId id, final int type, private void verifySafeObject(final AnyObjectId id, final int type,
@ -863,7 +901,7 @@ public class IndexPack {
try { try {
final ObjectLoader ldr = readCurs.open(id, type); final ObjectLoader ldr = readCurs.open(id, type);
final byte[] existingData = ldr.getCachedBytes(Integer.MAX_VALUE); final byte[] existingData = ldr.getCachedBytes(data.length);
if (!Arrays.equals(data, existingData)) { if (!Arrays.equals(data, existingData)) {
throw new IOException(MessageFormat.format(JGitText.get().collisionOn, id.name())); throw new IOException(MessageFormat.format(JGitText.get().collisionOn, id.name()));
} }
@ -874,6 +912,49 @@ public class IndexPack {
} }
} }
private void doDeferredCheckBlobs() throws IOException {
final byte[] curBuffer = new byte[readBuffer.length];
for (PackedObjectInfo obj : deferredCheckBlobs) {
position(obj.getOffset());
int c = readFrom(Source.FILE);
final int type = (c >> 4) & 7;
long sz = c & 15;
int shift = 4;
while ((c & 0x80) != 0) {
c = readFrom(Source.FILE);
sz += (c & 0x7f) << shift;
shift += 7;
}
if (type != Constants.OBJ_BLOB)
throw new IOException(MessageFormat.format(
JGitText.get().unknownObjectType, type));
ObjectStream cur = readCurs.open(obj, type).openStream();
try {
if (cur.getSize() != sz)
throw new IOException(MessageFormat.format(
JGitText.get().collisionOn, obj.name()));
InputStream pck = inflate(Source.FILE, sz);
while (0 < sz) {
int n = (int) Math.min(readBuffer.length, sz);
IO.readFully(cur, curBuffer, 0, n);
IO.readFully(pck, readBuffer, 0, n);
for (int i = 0; i < n; i++) {
if (curBuffer[i] != readBuffer[i])
throw new IOException(MessageFormat.format(JGitText
.get().collisionOn, obj.name()));
}
sz -= n;
}
pck.close();
} finally {
cur.close();
}
}
}
// Current position of {@link #bOffset} within the entire file. // Current position of {@link #bOffset} within the entire file.
private long position() { private long position() {
return bBase + bOffset; return bBase + bOffset;

Loading…
Cancel
Save