Browse Source

Buffer very large delta streams to reduce explosion of CPU work

Large delta streams are unpacked incrementally, but because a delta
can seek to a random position in the base to perform a copy we may
need to inflate the base repeatedly just to complete one delta.
So work around it by copying the base to a temporary file, and then
we can read from that temporary file using random seeks instead.
Its far more efficient because we now only need to inflate the
base once.

This is still really ugly because we have to dump to a temporary
file, but at least the code can successfully process a large
file without throwing OutOfMemoryError.  If speed is an
issue, the user will need to increase the JVM heap and ensure
core.streamFileThreshold is set to a higher value, so we don't use
this code path as often.

Unfortunately we lose the "optimization" of skipping over portions
of a delta base that we don't actually need in the final result.
This is going to cause us to inflate and write to disk useless
regions that were deleted and do not appear in the final result.
We could later improve on our code by trying to flatten delta
instruction streams before we touch the bottom base object, and
then only store the portions of the base we really need for the
final result and that appear out-of-order.  Since that is some
pretty complex code I'm punting on it for now and just doing this
simple whole-object buffering.

Because the process umask might be permitting other users to read
files we create, we put the temporary buffers into $GIT_DIR/objects.
We can reasonably assume that if a reader can read our temporary
buffer file in that directory, they can also read the base pack
file we are pulling it from and therefore its not a security breach
to expose the inflated content in a file.  This requires a reader
to have write access to the repository, but only if the file is
really big.  I'd rather err on the side of caution here and refuse
to read a very big file into /tmp than to possibly expose a secured
content because the Java 5 JVM won't let us create a protected
temporary file that only the current user can access.

Change-Id: I66fb80b08cbcaf0f65f2db0462c546a495a160dd
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
stable-0.9
Shawn O. Pearce 14 years ago
parent
commit
b24f907e3e
  1. 2
      org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectLoader.java
  2. 17
      org.eclipse.jgit/src/org/eclipse/jgit/storage/file/LargePackedDeltaObject.java
  3. 124
      org.eclipse.jgit/src/org/eclipse/jgit/util/TemporaryBuffer.java
  4. 134
      org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java

2
org.eclipse.jgit/src/org/eclipse/jgit/lib/ObjectLoader.java

@ -66,7 +66,7 @@ public abstract class ObjectLoader {
* Objects larger than this size must be accessed as a stream through the * Objects larger than this size must be accessed as a stream through the
* loader's {@link #openStream()} method. * loader's {@link #openStream()} method.
*/ */
public static final int STREAM_THRESHOLD = 15 * 1024 * 1024; public static final int STREAM_THRESHOLD = 5 * 1024 * 1024;
/** /**
* @return Git in pack object type, see {@link Constants}. * @return Git in pack object type, see {@link Constants}.

17
org.eclipse.jgit/src/org/eclipse/jgit/storage/file/LargePackedDeltaObject.java

@ -58,6 +58,8 @@ import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.ObjectStream; import org.eclipse.jgit.lib.ObjectStream;
import org.eclipse.jgit.storage.pack.BinaryDelta; import org.eclipse.jgit.storage.pack.BinaryDelta;
import org.eclipse.jgit.storage.pack.DeltaStream; import org.eclipse.jgit.storage.pack.DeltaStream;
import org.eclipse.jgit.util.TemporaryBuffer;
import org.eclipse.jgit.util.io.TeeInputStream;
class LargePackedDeltaObject extends ObjectLoader { class LargePackedDeltaObject extends ObjectLoader {
private static final long SIZE_UNKNOWN = -1; private static final long SIZE_UNKNOWN = -1;
@ -191,9 +193,13 @@ class LargePackedDeltaObject extends ObjectLoader {
final ObjectLoader base = pack.load(wc, baseOffset); final ObjectLoader base = pack.load(wc, baseOffset);
DeltaStream ds = new DeltaStream(delta) { DeltaStream ds = new DeltaStream(delta) {
private long baseSize = SIZE_UNKNOWN; private long baseSize = SIZE_UNKNOWN;
private TemporaryBuffer.LocalFile buffer;
@Override @Override
protected InputStream openBase() throws IOException { protected InputStream openBase() throws IOException {
if (buffer != null)
return buffer.openInputStream();
InputStream in; InputStream in;
if (base instanceof LargePackedDeltaObject) if (base instanceof LargePackedDeltaObject)
in = ((LargePackedDeltaObject) base).open(wc); in = ((LargePackedDeltaObject) base).open(wc);
@ -205,7 +211,9 @@ class LargePackedDeltaObject extends ObjectLoader {
else if (in instanceof ObjectStream) else if (in instanceof ObjectStream)
baseSize = ((ObjectStream) in).getSize(); baseSize = ((ObjectStream) in).getSize();
} }
return in;
buffer = new TemporaryBuffer.LocalFile(db.getDirectory());
return new TeeInputStream(in, buffer);
} }
@Override @Override
@ -218,6 +226,13 @@ class LargePackedDeltaObject extends ObjectLoader {
} }
return baseSize; return baseSize;
} }
@Override
public void close() throws IOException {
super.close();
if (buffer != null)
buffer.destroy();
}
}; };
if (size == SIZE_UNKNOWN) if (size == SIZE_UNKNOWN)
size = ds.getSize(); size = ds.getSize();

124
org.eclipse.jgit/src/org/eclipse/jgit/util/TemporaryBuffer.java

@ -250,6 +250,21 @@ public abstract class TemporaryBuffer extends OutputStream {
} }
} }
/**
* Open an input stream to read from the buffered data.
* <p>
* This method may only be invoked after {@link #close()} has completed
* normally, to ensure all data is completely transferred.
*
* @return a stream to read from the buffer. The caller must close the
* stream when it is no longer useful.
* @throws IOException
* an error occurred opening the temporary file.
*/
public InputStream openInputStream() throws IOException {
return new BlockInputStream();
}
/** Reset this buffer for reuse, purging all buffered content. */ /** Reset this buffer for reuse, purging all buffered content. */
public void reset() { public void reset() {
if (overflow != null) { if (overflow != null) {
@ -334,6 +349,9 @@ public abstract class TemporaryBuffer extends OutputStream {
* only after this stream has been properly closed by {@link #close()}. * only after this stream has been properly closed by {@link #close()}.
*/ */
public static class LocalFile extends TemporaryBuffer { public static class LocalFile extends TemporaryBuffer {
/** Directory to store the temporary file under. */
private final File directory;
/** /**
* Location of our temporary file if we are on disk; otherwise null. * Location of our temporary file if we are on disk; otherwise null.
* <p> * <p>
@ -345,7 +363,7 @@ public abstract class TemporaryBuffer extends OutputStream {
/** Create a new temporary buffer. */ /** Create a new temporary buffer. */
public LocalFile() { public LocalFile() {
this(DEFAULT_IN_CORE_LIMIT); this(null, DEFAULT_IN_CORE_LIMIT);
} }
/** /**
@ -356,11 +374,41 @@ public abstract class TemporaryBuffer extends OutputStream {
* this limit will use the local file. * this limit will use the local file.
*/ */
public LocalFile(final int inCoreLimit) { public LocalFile(final int inCoreLimit) {
this(null, inCoreLimit);
}
/**
* Create a new temporary buffer, limiting memory usage.
*
* @param directory
* if the buffer has to spill over into a temporary file, the
* directory where the file should be saved. If null the
* system default temporary directory (for example /tmp) will
* be used instead.
*/
public LocalFile(final File directory) {
this(directory, DEFAULT_IN_CORE_LIMIT);
}
/**
* Create a new temporary buffer, limiting memory usage.
*
* @param directory
* if the buffer has to spill over into a temporary file, the
* directory where the file should be saved. If null the
* system default temporary directory (for example /tmp) will
* be used instead.
* @param inCoreLimit
* maximum number of bytes to store in memory. Storage beyond
* this limit will use the local file.
*/
public LocalFile(final File directory, final int inCoreLimit) {
super(inCoreLimit); super(inCoreLimit);
this.directory = directory;
} }
protected OutputStream overflow() throws IOException { protected OutputStream overflow() throws IOException {
onDiskFile = File.createTempFile("jgit_", ".buffer"); onDiskFile = File.createTempFile("jgit_", ".buf", directory);
return new FileOutputStream(onDiskFile); return new FileOutputStream(onDiskFile);
} }
@ -410,6 +458,13 @@ public abstract class TemporaryBuffer extends OutputStream {
} }
} }
@Override
public InputStream openInputStream() throws IOException {
if (onDiskFile == null)
return super.openInputStream();
return new FileInputStream(onDiskFile);
}
@Override @Override
public void destroy() { public void destroy() {
super.destroy(); super.destroy();
@ -469,4 +524,69 @@ public abstract class TemporaryBuffer extends OutputStream {
return count == buffer.length; return count == buffer.length;
} }
} }
private class BlockInputStream extends InputStream {
private byte[] singleByteBuffer;
private int blockIndex;
private Block block;
private int blockPos;
BlockInputStream() {
block = blocks.get(blockIndex);
}
@Override
public int read() throws IOException {
if (singleByteBuffer == null)
singleByteBuffer = new byte[1];
int n = read(singleByteBuffer);
return n == 1 ? singleByteBuffer[0] & 0xff : -1;
}
@Override
public long skip(long cnt) throws IOException {
long skipped = 0;
while (0 < cnt) {
int n = (int) Math.min(block.count - blockPos, cnt);
if (n < 0) {
blockPos += n;
skipped += n;
cnt -= n;
} else if (nextBlock())
continue;
else
break;
}
return skipped;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (len == 0)
return 0;
int copied = 0;
while (0 < len) {
int c = Math.min(block.count - blockPos, len);
if (c < 0) {
System.arraycopy(block.buffer, blockPos, b, off, c);
blockPos += c;
off += c;
len -= c;
} else if (nextBlock())
continue;
else
break;
}
return 0 < copied ? copied : -1;
}
private boolean nextBlock() {
if (++blockIndex < blocks.size()) {
block = blocks.get(blockIndex);
blockPos = 0;
return true;
}
return false;
}
}
} }

134
org.eclipse.jgit/src/org/eclipse/jgit/util/io/TeeInputStream.java

@ -0,0 +1,134 @@
/*
* Copyright (C) 2010, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.eclipse.jgit.util.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.eclipse.jgit.util.TemporaryBuffer;
/**
* Input stream that copies data read to another output stream.
*
* This stream is primarily useful with a {@link TemporaryBuffer}, where any
* data read or skipped by the caller is also duplicated into the temporary
* buffer. Later the temporary buffer can then be used instead of the original
* source stream.
*
* During close this stream copies any remaining data from the source stream
* into the destination stream.
*/
public class TeeInputStream extends InputStream {
private byte[] skipBuffer;
private InputStream src;
private OutputStream dst;
/**
* Initialize a tee input stream.
*
* @param src
* source stream to consume.
* @param dst
* destination to copy the source to as it is consumed. Typically
* this is a {@link TemporaryBuffer}.
*/
public TeeInputStream(InputStream src, OutputStream dst) {
this.src = src;
this.dst = dst;
}
@Override
public int read() throws IOException {
byte[] b = skipBuffer();
int n = read(b, 0, 1);
return n == 1 ? b[0] & 0xff : -1;
}
@Override
public long skip(long cnt) throws IOException {
long skipped = 0;
byte[] b = skipBuffer();
while (0 < cnt) {
int n = src.read(b, 0, (int) Math.min(b.length, cnt));
if (n <= 0)
break;
dst.write(b, 0, n);
skipped += n;
cnt -= n;
}
return skipped;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (len == 0)
return 0;
int n = src.read(b, off, len);
if (0 < n)
dst.write(b, off, len);
return n;
}
public void close() throws IOException {
byte[] b = skipBuffer();
for (;;) {
int n = src.read(b);
if (n <= 0)
break;
dst.write(b, 0, n);
}
dst.close();
src.close();
}
private byte[] skipBuffer() {
if (skipBuffer == null)
skipBuffer = new byte[2048];
return skipBuffer;
}
}
Loading…
Cancel
Save