Browse Source

Refactor SideBandOutputStream to be buffered

Instead of relying on our callers to wrap us up inside of a
BufferedOutputStream and using the proper block sizing, do the
buffering directly inside of SideBandOutputStream.  This ensures
we don't get large write-throughs from BufferedOutputStream that
might overflow the configured packet size.

The constructor of SideBandOutputStream is also beefed up to check
its arguments and ensure they are within acceptable ranges for the
current side-band protocol.

Change-Id: Ic14567327d03c9e972f9734b8228178bc448867d
Signed-off-by: Shawn O. Pearce <spearce@spearce.org>
stable-0.7
Shawn O. Pearce 15 years ago
parent
commit
0af5944cac
  1. 22
      org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java
  2. 127
      org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java
  3. 14
      org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java
  4. 98
      org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java
  5. 12
      org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java
  6. 13
      org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java

22
org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java

@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2009, Google Inc. * Copyright (C) 2009-2010, Google Inc.
* and other copyright owners as documented in the project's IP log. * and other copyright owners as documented in the project's IP log.
* *
* This program and the accompanying materials are made available * This program and the accompanying materials are made available
@ -119,8 +119,7 @@ public class PacketLineOutTest extends TestCase {
} }
public void testWritePacket3() throws IOException { public void testWritePacket3() throws IOException {
final int buflen = SideBandOutputStream.MAX_BUF final int buflen = SideBandOutputStream.MAX_BUF - 5;
- SideBandOutputStream.HDR_SIZE;
final byte[] buf = new byte[buflen]; final byte[] buf = new byte[buflen];
for (int i = 0; i < buf.length; i++) { for (int i = 0; i < buf.length; i++) {
buf[i] = (byte) i; buf[i] = (byte) i;
@ -137,23 +136,6 @@ public class PacketLineOutTest extends TestCase {
} }
} }
// writeChannelPacket
public void testWriteChannelPacket1() throws IOException {
out.writeChannelPacket(1, new byte[] { 'a' }, 0, 1);
assertBuffer("0006\001a");
}
public void testWriteChannelPacket2() throws IOException {
out.writeChannelPacket(2, new byte[] { 'b' }, 0, 1);
assertBuffer("0006\002b");
}
public void testWriteChannelPacket3() throws IOException {
out.writeChannelPacket(3, new byte[] { 'c' }, 0, 1);
assertBuffer("0006\003c");
}
// flush // flush
public void testFlush() throws IOException { public void testFlush() throws IOException {

127
org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java

@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2009, Google Inc. * Copyright (C) 2009-2010, Google Inc.
* and other copyright owners as documented in the project's IP log. * and other copyright owners as documented in the project's IP log.
* *
* This program and the accompanying materials are made available * This program and the accompanying materials are made available
@ -43,6 +43,13 @@
package org.eclipse.jgit.transport; package org.eclipse.jgit.transport;
import static org.eclipse.jgit.transport.SideBandOutputStream.CH_DATA;
import static org.eclipse.jgit.transport.SideBandOutputStream.CH_ERROR;
import static org.eclipse.jgit.transport.SideBandOutputStream.CH_PROGRESS;
import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE;
import static org.eclipse.jgit.transport.SideBandOutputStream.MAX_BUF;
import static org.eclipse.jgit.transport.SideBandOutputStream.SMALL_BUF;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -58,62 +65,90 @@ import org.eclipse.jgit.lib.Constants;
public class SideBandOutputStreamTest extends TestCase { public class SideBandOutputStreamTest extends TestCase {
private ByteArrayOutputStream rawOut; private ByteArrayOutputStream rawOut;
private PacketLineOut pckOut;
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
rawOut = new ByteArrayOutputStream(); rawOut = new ByteArrayOutputStream();
pckOut = new PacketLineOut(rawOut);
} }
public void testWrite_CH_DATA() throws IOException { public void testWrite_CH_DATA() throws IOException {
final SideBandOutputStream out; final SideBandOutputStream out;
out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut);
out.write(new byte[] { 'a', 'b', 'c' }); out.write(new byte[] { 'a', 'b', 'c' });
out.flush();
assertBuffer("0008\001abc"); assertBuffer("0008\001abc");
} }
public void testWrite_CH_PROGRESS() throws IOException { public void testWrite_CH_PROGRESS() throws IOException {
final SideBandOutputStream out; final SideBandOutputStream out;
out = new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, pckOut); out = new SideBandOutputStream(CH_PROGRESS, SMALL_BUF, rawOut);
out.write(new byte[] { 'a', 'b', 'c' }); out.write(new byte[] { 'a', 'b', 'c' });
out.flush();
assertBuffer("0008\002abc"); assertBuffer("0008\002abc");
} }
public void testWrite_CH_ERROR() throws IOException { public void testWrite_CH_ERROR() throws IOException {
final SideBandOutputStream out; final SideBandOutputStream out;
out = new SideBandOutputStream(SideBandOutputStream.CH_ERROR, pckOut); out = new SideBandOutputStream(CH_ERROR, SMALL_BUF, rawOut);
out.write(new byte[] { 'a', 'b', 'c' }); out.write(new byte[] { 'a', 'b', 'c' });
out.flush();
assertBuffer("0008\003abc"); assertBuffer("0008\003abc");
} }
public void testWrite_Small() throws IOException { public void testWrite_Small() throws IOException {
final SideBandOutputStream out; final SideBandOutputStream out;
out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut);
out.write('a');
out.write('b');
out.write('c');
out.flush();
assertBuffer("0008\001abc");
}
public void testWrite_SmallBlocks1() throws IOException {
final SideBandOutputStream out;
out = new SideBandOutputStream(CH_DATA, 6, rawOut);
out.write('a'); out.write('a');
out.write('b'); out.write('b');
out.write('c'); out.write('c');
out.flush();
assertBuffer("0006\001a0006\001b0006\001c"); assertBuffer("0006\001a0006\001b0006\001c");
} }
public void testWrite_SmallBlocks2() throws IOException {
final SideBandOutputStream out;
out = new SideBandOutputStream(CH_DATA, 6, rawOut);
out.write(new byte[] { 'a', 'b', 'c' });
out.flush();
assertBuffer("0006\001a0006\001b0006\001c");
}
public void testWrite_SmallBlocks3() throws IOException {
final SideBandOutputStream out;
out = new SideBandOutputStream(CH_DATA, 7, rawOut);
out.write('a');
out.write(new byte[] { 'b', 'c' });
out.flush();
assertBuffer("0007\001ab0006\001c");
}
public void testWrite_Large() throws IOException { public void testWrite_Large() throws IOException {
final int buflen = SideBandOutputStream.MAX_BUF final int buflen = MAX_BUF - HDR_SIZE;
- SideBandOutputStream.HDR_SIZE;
final byte[] buf = new byte[buflen]; final byte[] buf = new byte[buflen];
for (int i = 0; i < buf.length; i++) { for (int i = 0; i < buf.length; i++) {
buf[i] = (byte) i; buf[i] = (byte) i;
} }
final SideBandOutputStream out; final SideBandOutputStream out;
out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); out = new SideBandOutputStream(CH_DATA, MAX_BUF, rawOut);
out.write(buf); out.write(buf);
out.flush();
final byte[] act = rawOut.toByteArray(); final byte[] act = rawOut.toByteArray();
final String explen = Integer.toString(buf.length + 5, 16); final String explen = Integer.toString(buf.length + HDR_SIZE, 16);
assertEquals(5 + buf.length, act.length); assertEquals(HDR_SIZE + buf.length, act.length);
assertEquals(new String(act, 0, 4, "UTF-8"), explen); assertEquals(new String(act, 0, 4, "UTF-8"), explen);
assertEquals(1, act[4]); assertEquals(1, act[4]);
for (int i = 0, j = 5; i < buf.length; i++, j++) { for (int i = 0, j = HDR_SIZE; i < buf.length; i++, j++) {
assertEquals(buf[i], act[j]); assertEquals(buf[i], act[j]);
} }
} }
@ -132,17 +167,63 @@ public class SideBandOutputStreamTest extends TestCase {
} }
}; };
new SideBandOutputStream(SideBandOutputStream.CH_DATA, new SideBandOutputStream(CH_DATA, SMALL_BUF, mockout).flush();
new PacketLineOut(mockout)).flush();
assertEquals(0, flushCnt[0]);
new SideBandOutputStream(SideBandOutputStream.CH_ERROR,
new PacketLineOut(mockout)).flush();
assertEquals(1, flushCnt[0]); assertEquals(1, flushCnt[0]);
}
public void testConstructor_RejectsBadChannel() {
try {
new SideBandOutputStream(-1, MAX_BUF, rawOut);
fail("Accepted -1 channel number");
} catch (IllegalArgumentException e) {
assertEquals("channel -1 must be in range [0, 255]", e.getMessage());
}
new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, try {
new PacketLineOut(mockout)).flush(); new SideBandOutputStream(0, MAX_BUF, rawOut);
assertEquals(2, flushCnt[0]); fail("Accepted 0 channel number");
} catch (IllegalArgumentException e) {
assertEquals("channel 0 must be in range [0, 255]", e.getMessage());
}
try {
new SideBandOutputStream(256, MAX_BUF, rawOut);
fail("Accepted 256 channel number");
} catch (IllegalArgumentException e) {
assertEquals("channel 256 must be in range [0, 255]", e
.getMessage());
}
}
public void testConstructor_RejectsBadBufferSize() {
try {
new SideBandOutputStream(CH_DATA, -1, rawOut);
fail("Accepted -1 for buffer size");
} catch (IllegalArgumentException e) {
assertEquals("packet size -1 must be >= 5", e.getMessage());
}
try {
new SideBandOutputStream(CH_DATA, 0, rawOut);
fail("Accepted 0 for buffer size");
} catch (IllegalArgumentException e) {
assertEquals("packet size 0 must be >= 5", e.getMessage());
}
try {
new SideBandOutputStream(CH_DATA, 1, rawOut);
fail("Accepted 1 for buffer size");
} catch (IllegalArgumentException e) {
assertEquals("packet size 1 must be >= 5", e.getMessage());
}
try {
new SideBandOutputStream(CH_DATA, Integer.MAX_VALUE, rawOut);
fail("Accepted " + Integer.MAX_VALUE + " for buffer size");
} catch (IllegalArgumentException e) {
assertEquals("packet size " + Integer.MAX_VALUE
+ " must be <= 65520", e.getMessage());
}
} }
private void assertBuffer(final String exp) throws IOException { private void assertBuffer(final String exp) throws IOException {

14
org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java

@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2008-2009, Google Inc. * Copyright (C) 2008-2010, Google Inc.
* Copyright (C) 2008-2009, Robin Rosenberg <robin.rosenberg@dewire.com> * Copyright (C) 2008-2009, Robin Rosenberg <robin.rosenberg@dewire.com>
* Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org> * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
* and other copyright owners as documented in the project's IP log. * and other copyright owners as documented in the project's IP log.
@ -105,14 +105,6 @@ public class PacketLineOut {
out.write(packet); out.write(packet);
} }
void writeChannelPacket(final int channel, final byte[] buf, int off,
int len) throws IOException {
formatLength(len + 5);
lenbuffer[4] = (byte) channel;
out.write(lenbuffer, 0, 5);
out.write(buf, off, len);
}
/** /**
* Write a packet end marker, sometimes referred to as a flush command. * Write a packet end marker, sometimes referred to as a flush command.
* <p> * <p>
@ -149,6 +141,10 @@ public class PacketLineOut {
'7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
private void formatLength(int w) { private void formatLength(int w) {
formatLength(lenbuffer, w);
}
static void formatLength(byte[] lenbuffer, int w) {
int o = 3; int o = 3;
while (o >= 0 && w != 0) { while (o >= 0 && w != 0) {
lenbuffer[o--] = hexchar[w & 0xf]; lenbuffer[o--] = hexchar[w & 0xf];

98
org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java

@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2008, Google Inc. * Copyright (C) 2008-2010, Google Inc.
* and other copyright owners as documented in the project's IP log. * and other copyright owners as documented in the project's IP log.
* *
* This program and the accompanying materials are made available * This program and the accompanying materials are made available
@ -47,11 +47,10 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
/** /**
* Multiplexes data and progress messages * Multiplexes data and progress messages.
* <p> * <p>
* To correctly use this class you must wrap it in a BufferedOutputStream with a * This stream is buffered at packet sizes, so the caller doesn't need to wrap
* buffer size no larger than either {@link #SMALL_BUF} or {@link #MAX_BUF}, * it in yet another buffered stream.
* minus {@link #HDR_SIZE}.
*/ */
class SideBandOutputStream extends OutputStream { class SideBandOutputStream extends OutputStream {
static final int CH_DATA = SideBandInputStream.CH_DATA; static final int CH_DATA = SideBandInputStream.CH_DATA;
@ -66,34 +65,93 @@ class SideBandOutputStream extends OutputStream {
static final int HDR_SIZE = 5; static final int HDR_SIZE = 5;
private final int channel; private final OutputStream out;
private final PacketLineOut pckOut; private final byte[] buffer;
private byte[] singleByteBuffer; /**
* Number of bytes in {@link #buffer} that are valid data.
* <p>
* Initialized to {@link #HDR_SIZE} if there is no application data in the
* buffer, as the packet header always appears at the start of the buffer.
*/
private int cnt;
/**
* Create a new stream to write side band packets.
*
* @param chan
* channel number to prefix all packets with, so the remote side
* can demultiplex the stream and get back the original data.
* Must be in the range [0, 255].
* @param sz
* maximum size of a data packet within the stream. The remote
* side needs to agree to the packet size to prevent buffer
* overflows. Must be in the range [HDR_SIZE + 1, MAX_BUF).
* @param os
* stream that the packets are written onto. This stream should
* be attached to a SideBandInputStream on the remote side.
*/
SideBandOutputStream(final int chan, final int sz, final OutputStream os) {
if (chan <= 0 || chan > 255)
throw new IllegalArgumentException("channel " + chan
+ " must be in range [0, 255]");
if (sz <= HDR_SIZE)
throw new IllegalArgumentException("packet size " + sz
+ " must be >= " + HDR_SIZE);
else if (MAX_BUF < sz)
throw new IllegalArgumentException("packet size " + sz
+ " must be <= " + MAX_BUF);
SideBandOutputStream(final int chan, final PacketLineOut out) { out = os;
channel = chan; buffer = new byte[sz];
pckOut = out; buffer[4] = (byte) chan;
cnt = HDR_SIZE;
} }
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
if (channel != CH_DATA) if (HDR_SIZE < cnt)
pckOut.flush(); writeBuffer();
out.flush();
} }
@Override @Override
public void write(final byte[] b, final int off, final int len) public void write(final byte[] b, int off, int len) throws IOException {
throws IOException { while (0 < len) {
pckOut.writeChannelPacket(channel, b, off, len); int capacity = buffer.length - cnt;
if (cnt == HDR_SIZE && capacity < len) {
// Our block to write is bigger than the packet size,
// stream it out as-is to avoid unnecessary copies.
PacketLineOut.formatLength(buffer, buffer.length);
out.write(buffer, 0, HDR_SIZE);
out.write(b, off, capacity);
off += capacity;
len -= capacity;
} else {
if (capacity == 0)
writeBuffer();
int n = Math.min(len, capacity);
System.arraycopy(b, off, buffer, cnt, n);
cnt += n;
off += n;
len -= n;
}
}
} }
@Override @Override
public void write(final int b) throws IOException { public void write(final int b) throws IOException {
if (singleByteBuffer == null) if (cnt == buffer.length)
singleByteBuffer = new byte[1]; writeBuffer();
singleByteBuffer[0] = (byte) b; buffer[cnt++] = (byte) b;
write(singleByteBuffer); }
private void writeBuffer() throws IOException {
PacketLineOut.formatLength(buffer, cnt);
out.write(buffer, 0, cnt);
cnt = HDR_SIZE;
} }
} }

12
org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java

@ -1,5 +1,5 @@
/* /*
* Copyright (C) 2008, Google Inc. * Copyright (C) 2008-2010, Google Inc.
* and other copyright owners as documented in the project's IP log. * and other copyright owners as documented in the project's IP log.
* *
* This program and the accompanying materials are made available * This program and the accompanying materials are made available
@ -43,7 +43,7 @@
package org.eclipse.jgit.transport; package org.eclipse.jgit.transport;
import java.io.BufferedOutputStream; import java.io.OutputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.PrintWriter; import java.io.PrintWriter;
@ -66,12 +66,8 @@ class SideBandProgressMonitor implements ProgressMonitor {
private int totalWork; private int totalWork;
SideBandProgressMonitor(final PacketLineOut pckOut) { SideBandProgressMonitor(final OutputStream os) {
final int bufsz = SideBandOutputStream.SMALL_BUF out = new PrintWriter(new OutputStreamWriter(os, Constants.CHARSET));
- SideBandOutputStream.HDR_SIZE;
out = new PrintWriter(new OutputStreamWriter(new BufferedOutputStream(
new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS,
pckOut), bufsz), Constants.CHARSET));
} }
public void start(final int totalTasks) { public void start(final int totalTasks) {

13
org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java

@ -43,9 +43,6 @@
package org.eclipse.jgit.transport; package org.eclipse.jgit.transport;
import static org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck;
import java.io.BufferedOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -70,6 +67,7 @@ import org.eclipse.jgit.revwalk.RevFlagSet;
import org.eclipse.jgit.revwalk.RevObject; import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTag; import org.eclipse.jgit.revwalk.RevTag;
import org.eclipse.jgit.revwalk.RevWalk; import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck;
import org.eclipse.jgit.transport.RefAdvertiser.PacketLineOutRefAdvertiser; import org.eclipse.jgit.transport.RefAdvertiser.PacketLineOutRefAdvertiser;
import org.eclipse.jgit.util.io.InterruptTimer; import org.eclipse.jgit.util.io.InterruptTimer;
import org.eclipse.jgit.util.io.TimeoutInputStream; import org.eclipse.jgit.util.io.TimeoutInputStream;
@ -556,13 +554,12 @@ public class UploadPack {
int bufsz = SideBandOutputStream.SMALL_BUF; int bufsz = SideBandOutputStream.SMALL_BUF;
if (options.contains(OPTION_SIDE_BAND_64K)) if (options.contains(OPTION_SIDE_BAND_64K))
bufsz = SideBandOutputStream.MAX_BUF; bufsz = SideBandOutputStream.MAX_BUF;
bufsz -= SideBandOutputStream.HDR_SIZE;
packOut = new BufferedOutputStream(new SideBandOutputStream(
SideBandOutputStream.CH_DATA, pckOut), bufsz);
packOut = new SideBandOutputStream(SideBandOutputStream.CH_DATA,
bufsz, rawOut);
if (progress) if (progress)
pm = new SideBandProgressMonitor(pckOut); pm = new SideBandProgressMonitor(new SideBandOutputStream(
SideBandOutputStream.CH_PROGRESS, bufsz, rawOut));
} }
final PackWriter pw; final PackWriter pw;

Loading…
Cancel
Save