Browse Source

Reduce multi-level buffered streams in transport code

Some transports actually provide stream buffering on their own,
without needing to be wrapped up inside of a BufferedInputStream in
order to smooth out system calls to read or write.  A great example
of this is the JSch SSH client, or the Apache MINA SSHD server.
Both use custom buffering to packetize the streams into the encrypted
SSH channel, and wrapping them up inside of a BufferedInputStream
or BufferedOutputStream is relatively pointless.

Our SideBandOutputStream implementation also provides some fairly
large buffering, equal to one complete side-band packet on the main
data channel.  Wrapping that inside of a BufferedOutputStream just to
smooth out small writes from PackWriter causes extra data copies, and
provides no advantage.  We can save some memory and some CPU cycles
by letting PackWriter dump directly into the SideBandOutputStream's
internal buffer array.

Instead we push the buffering streams down to be as close to the
network socket (or operating system pipe) as possible.  This allows
us to smooth out the smaller reads/writes from pkt-line messages
during advertisement and negotation, but avoid copying altogether
when the stream switches to larger writes over a side band channel.

Change-Id: I2f6f16caee64783c77d3dd1b2a41b3cc0c64c159
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
stable-0.7
Shawn O. Pearce 15 years ago
parent
commit
2156aa894c
  1. 8
      org.eclipse.jgit.junit/src/org/eclipse/jgit/junit/TestRepository.java
  2. 10
      org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ConcurrentRepackTest.java
  3. 12
      org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java
  4. 24
      org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java
  5. 1
      org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java
  6. 12
      org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java
  7. 20
      org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java
  8. 18
      org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java
  9. 7
      org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java
  10. 3
      org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java

8
org.eclipse.jgit.junit/src/org/eclipse/jgit/junit/TestRepository.java

@ -43,9 +43,11 @@
package org.eclipse.jgit.junit; package org.eclipse.jgit.junit;
import java.io.BufferedOutputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -570,10 +572,10 @@ public class TestRepository {
pw.preparePack(all, Collections.<ObjectId> emptySet()); pw.preparePack(all, Collections.<ObjectId> emptySet());
final ObjectId name = pw.computeName(); final ObjectId name = pw.computeName();
FileOutputStream out; OutputStream out;
final File pack = nameFor(odb, name, ".pack"); final File pack = nameFor(odb, name, ".pack");
out = new FileOutputStream(pack); out = new BufferedOutputStream(new FileOutputStream(pack));
try { try {
pw.writePack(out); pw.writePack(out);
} finally { } finally {
@ -582,7 +584,7 @@ public class TestRepository {
pack.setReadOnly(); pack.setReadOnly();
final File idx = nameFor(odb, name, ".idx"); final File idx = nameFor(odb, name, ".idx");
out = new FileOutputStream(idx); out = new BufferedOutputStream(new FileOutputStream(idx));
try { try {
pw.writeIndex(out); pw.writeIndex(out);
} finally { } finally {

10
org.eclipse.jgit.test/tst/org/eclipse/jgit/lib/ConcurrentRepackTest.java

@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2009, Google Inc. * Copyright (C) 2009-2010, Google Inc.
* Copyright (C) 2009, Robin Rosenberg <robin.rosenberg@dewire.com> * Copyright (C) 2009, Robin Rosenberg <robin.rosenberg@dewire.com>
* and other copyright owners as documented in the project's IP log. * and other copyright owners as documented in the project's IP log.
* *
@ -44,9 +44,11 @@
package org.eclipse.jgit.lib; package org.eclipse.jgit.lib;
import java.io.BufferedOutputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays; import java.util.Arrays;
import org.eclipse.jgit.errors.IncorrectObjectTypeException; import org.eclipse.jgit.errors.IncorrectObjectTypeException;
@ -203,16 +205,16 @@ public class ConcurrentRepackTest extends RepositoryTestCase {
private static void write(final File[] files, final PackWriter pw) private static void write(final File[] files, final PackWriter pw)
throws IOException { throws IOException {
final long begin = files[0].getParentFile().lastModified(); final long begin = files[0].getParentFile().lastModified();
FileOutputStream out; OutputStream out;
out = new FileOutputStream(files[0]); out = new BufferedOutputStream(new FileOutputStream(files[0]));
try { try {
pw.writePack(out); pw.writePack(out);
} finally { } finally {
out.close(); out.close();
} }
out = new FileOutputStream(files[1]); out = new BufferedOutputStream(new FileOutputStream(files[1]));
try { try {
pw.writeIndex(out); pw.writeIndex(out);
} finally { } finally {

12
org.eclipse.jgit/src/org/eclipse/jgit/lib/PackWriter.java

@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2008-2009, Google Inc. * Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com> * Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* and other copyright owners as documented in the project's IP log. * and other copyright owners as documented in the project's IP log.
* *
@ -44,7 +44,6 @@
package org.eclipse.jgit.lib; package org.eclipse.jgit.lib;
import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.security.MessageDigest; import java.security.MessageDigest;
@ -97,7 +96,6 @@ import org.eclipse.jgit.util.NB;
* undefined behavior. * undefined behavior.
* </p> * </p>
*/ */
public class PackWriter { public class PackWriter {
/** /**
* Title of {@link ProgressMonitor} task used during counting objects to * Title of {@link ProgressMonitor} task used during counting objects to
@ -578,9 +576,8 @@ public class PackWriter {
* </p> * </p>
* *
* @param packStream * @param packStream
* output stream of pack data. If the stream is not buffered it * output stream of pack data. The stream should be buffered by
* will be buffered by the writer. Caller is responsible for * the caller. The caller is responsible for closing the stream.
* closing the stream.
* @throws IOException * @throws IOException
* an error occurred reading a local object's data to include in * an error occurred reading a local object's data to include in
* the pack, or writing compressed object data to the output * the pack, or writing compressed object data to the output
@ -590,8 +587,6 @@ public class PackWriter {
if (reuseDeltas || reuseObjects) if (reuseDeltas || reuseObjects)
searchForReuse(); searchForReuse();
if (!(packStream instanceof BufferedOutputStream))
packStream = new BufferedOutputStream(packStream);
out = new PackOutputStream(packStream); out = new PackOutputStream(packStream);
writeMonitor.beginTask(WRITING_OBJECTS_PROGRESS, getObjectsNumber()); writeMonitor.beginTask(WRITING_OBJECTS_PROGRESS, getObjectsNumber());
@ -599,7 +594,6 @@ public class PackWriter {
writeObjects(); writeObjects();
writeChecksum(); writeChecksum();
out.flush();
windowCursor.release(); windowCursor.release();
writeMonitor.endTask(); writeMonitor.endTask();
} }

24
org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackConnection.java

@ -1,5 +1,4 @@
/* /*
* Copyright (C) 2009, Constantine Plotnikov <constantine.plotnikov@gmail.com>
* Copyright (C) 2008-2010, Google Inc. * Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com> * Copyright (C) 2008, Marek Zawirski <marek.zawirski@gmail.com>
* Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com> * Copyright (C) 2008, Robin Rosenberg <robin.rosenberg@dewire.com>
@ -47,8 +46,6 @@
package org.eclipse.jgit.transport; package org.eclipse.jgit.transport;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -97,10 +94,10 @@ abstract class BasePackConnection extends BaseConnection {
/** Timer to manage {@link #timeoutIn} and {@link #timeoutOut}. */ /** Timer to manage {@link #timeoutIn} and {@link #timeoutOut}. */
private InterruptTimer myTimer; private InterruptTimer myTimer;
/** Buffered input stream reading from the remote. */ /** Input stream reading from the remote. */
protected InputStream in; protected InputStream in;
/** Buffered output stream sending to the remote. */ /** Output stream sending to the remote. */
protected OutputStream out; protected OutputStream out;
/** Packet line decoder around {@link #in}. */ /** Packet line decoder around {@link #in}. */
@ -127,6 +124,17 @@ abstract class BasePackConnection extends BaseConnection {
uri = transport.uri; uri = transport.uri;
} }
/**
* Configure this connection with the directional pipes.
*
* @param myIn
* input stream to receive data from the peer. Caller must ensure
* the input is buffered, otherwise read performance may suffer.
* @param myOut
* output stream to transmit data to the peer. Caller must ensure
* the output is buffered, otherwise write performance may
* suffer.
*/
protected final void init(InputStream myIn, OutputStream myOut) { protected final void init(InputStream myIn, OutputStream myOut) {
final int timeout = transport.getTimeout(); final int timeout = transport.getTimeout();
if (timeout > 0) { if (timeout > 0) {
@ -140,10 +148,8 @@ abstract class BasePackConnection extends BaseConnection {
myOut = timeoutOut; myOut = timeoutOut;
} }
in = myIn instanceof BufferedInputStream ? myIn in = myIn;
: new BufferedInputStream(myIn, IndexPack.BUFFER_SIZE); out = myOut;
out = myOut instanceof BufferedOutputStream ? myOut
: new BufferedOutputStream(myOut);
pckIn = new PacketLineIn(in); pckIn = new PacketLineIn(in);
pckOut = new PacketLineOut(out); pckOut = new PacketLineOut(out);

1
org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackPushConnection.java

@ -244,6 +244,7 @@ class BasePackPushConnection extends BasePackConnection implements
writer.preparePack(newObjects, remoteObjects); writer.preparePack(newObjects, remoteObjects);
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
writer.writePack(out); writer.writePack(out);
out.flush();
packTransferTime = System.currentTimeMillis() - start; packTransferTime = System.currentTimeMillis() - start;
} }

12
org.eclipse.jgit/src/org/eclipse/jgit/transport/BundleWriter.java

@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2008-2009, Google Inc. * Copyright (C) 2008-2010, Google Inc.
* and other copyright owners as documented in the project's IP log. * and other copyright owners as documented in the project's IP log.
* *
* This program and the accompanying materials are made available * This program and the accompanying materials are made available
@ -43,7 +43,6 @@
package org.eclipse.jgit.transport; package org.eclipse.jgit.transport;
import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
@ -155,18 +154,15 @@ public class BundleWriter {
* This method can only be called once per BundleWriter instance. * This method can only be called once per BundleWriter instance.
* *
* @param os * @param os
* the stream the bundle is written to. If the stream is not * the stream the bundle is written to. The stream should be
* buffered it will be buffered by the writer. Caller is * buffered by the caller. The caller is responsible for closing
* responsible for closing the stream. * the stream.
* @throws IOException * @throws IOException
* an error occurred reading a local object's data to include in * an error occurred reading a local object's data to include in
* the bundle, or writing compressed object data to the output * the bundle, or writing compressed object data to the output
* stream. * stream.
*/ */
public void writeBundle(OutputStream os) throws IOException { public void writeBundle(OutputStream os) throws IOException {
if (!(os instanceof BufferedOutputStream))
os = new BufferedOutputStream(os);
final HashSet<ObjectId> inc = new HashSet<ObjectId>(); final HashSet<ObjectId> inc = new HashSet<ObjectId>();
final HashSet<ObjectId> exc = new HashSet<ObjectId>(); final HashSet<ObjectId> exc = new HashSet<ObjectId>();
inc.addAll(include.values()); inc.addAll(include.values());

20
org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportGitAnon.java

@ -45,7 +45,11 @@
package org.eclipse.jgit.transport; package org.eclipse.jgit.transport;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -136,7 +140,13 @@ class TransportGitAnon extends TcpTransport implements PackTransport {
super(TransportGitAnon.this); super(TransportGitAnon.this);
sock = openConnection(); sock = openConnection();
try { try {
init(sock.getInputStream(), sock.getOutputStream()); InputStream sIn = sock.getInputStream();
OutputStream sOut = sock.getOutputStream();
sIn = new BufferedInputStream(sIn);
sOut = new BufferedOutputStream(sOut);
init(sIn, sOut);
service("git-upload-pack", pckOut); service("git-upload-pack", pckOut);
} catch (IOException err) { } catch (IOException err) {
close(); close();
@ -169,7 +179,13 @@ class TransportGitAnon extends TcpTransport implements PackTransport {
super(TransportGitAnon.this); super(TransportGitAnon.this);
sock = openConnection(); sock = openConnection();
try { try {
init(sock.getInputStream(), sock.getOutputStream()); InputStream sIn = sock.getInputStream();
OutputStream sOut = sock.getOutputStream();
sIn = new BufferedInputStream(sIn);
sOut = new BufferedOutputStream(sOut);
init(sIn, sOut);
service("git-receive-pack", pckOut); service("git-receive-pack", pckOut);
} catch (IOException err) { } catch (IOException err) {
close(); close();

18
org.eclipse.jgit/src/org/eclipse/jgit/transport/TransportLocal.java

@ -47,6 +47,8 @@
package org.eclipse.jgit.transport; package org.eclipse.jgit.transport;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -259,8 +261,12 @@ class TransportLocal extends Transport implements PackTransport {
errorReaderThread = new StreamCopyThread(upErr, msg.getRawStream()); errorReaderThread = new StreamCopyThread(upErr, msg.getRawStream());
errorReaderThread.start(); errorReaderThread.start();
final InputStream upIn = uploadPack.getInputStream(); InputStream upIn = uploadPack.getInputStream();
final OutputStream upOut = uploadPack.getOutputStream(); OutputStream upOut = uploadPack.getOutputStream();
upIn = new BufferedInputStream(upIn);
upOut = new BufferedOutputStream(upOut);
init(upIn, upOut); init(upIn, upOut);
readAdvertisedRefs(); readAdvertisedRefs();
} }
@ -385,8 +391,12 @@ class TransportLocal extends Transport implements PackTransport {
errorReaderThread = new StreamCopyThread(rpErr, msg.getRawStream()); errorReaderThread = new StreamCopyThread(rpErr, msg.getRawStream());
errorReaderThread.start(); errorReaderThread.start();
final InputStream rpIn = receivePack.getInputStream(); InputStream rpIn = receivePack.getInputStream();
final OutputStream rpOut = receivePack.getOutputStream(); OutputStream rpOut = receivePack.getOutputStream();
rpIn = new BufferedInputStream(rpIn);
rpOut = new BufferedOutputStream(rpOut);
init(rpIn, rpOut); init(rpIn, rpOut);
readAdvertisedRefs(); readAdvertisedRefs();
} }

7
org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java

@ -583,12 +583,9 @@ public class UploadPack {
} }
} }
pw.writePack(packOut); pw.writePack(packOut);
packOut.flush();
if (sideband) { if (sideband)
packOut.flush();
pckOut.end(); pckOut.end();
} else {
rawOut.flush();
}
} }
} }

3
org.eclipse.jgit/src/org/eclipse/jgit/transport/WalkPushConnection.java

@ -45,6 +45,7 @@ package org.eclipse.jgit.transport;
import static org.eclipse.jgit.transport.WalkRemoteObjectDatabase.ROOT_DIR; import static org.eclipse.jgit.transport.WalkRemoteObjectDatabase.ROOT_DIR;
import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
@ -251,6 +252,7 @@ class WalkPushConnection extends BaseConnection implements PushConnection {
final String wt = "Put " + base.substring(0, 12); final String wt = "Put " + base.substring(0, 12);
OutputStream os = dest.writeFile(pathPack, monitor, wt + "..pack"); OutputStream os = dest.writeFile(pathPack, monitor, wt + "..pack");
try { try {
os = new BufferedOutputStream(os);
pw.writePack(os); pw.writePack(os);
} finally { } finally {
os.close(); os.close();
@ -258,6 +260,7 @@ class WalkPushConnection extends BaseConnection implements PushConnection {
os = dest.writeFile(pathIdx, monitor, wt + "..idx"); os = dest.writeFile(pathIdx, monitor, wt + "..idx");
try { try {
os = new BufferedOutputStream(os);
pw.writeIndex(os); pw.writeIndex(os);
} finally { } finally {
os.close(); os.close();

Loading…
Cancel
Save