diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java index 6eb98ac12..c66d4d52a 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2009, Google Inc. + * Copyright (C) 2009-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -119,8 +119,7 @@ public class PacketLineOutTest extends TestCase { } public void testWritePacket3() throws IOException { - final int buflen = SideBandOutputStream.MAX_BUF - - SideBandOutputStream.HDR_SIZE; + final int buflen = SideBandOutputStream.MAX_BUF - 5; final byte[] buf = new byte[buflen]; for (int i = 0; i < buf.length; i++) { buf[i] = (byte) i; @@ -137,23 +136,6 @@ public class PacketLineOutTest extends TestCase { } } - // writeChannelPacket - - public void testWriteChannelPacket1() throws IOException { - out.writeChannelPacket(1, new byte[] { 'a' }, 0, 1); - assertBuffer("0006\001a"); - } - - public void testWriteChannelPacket2() throws IOException { - out.writeChannelPacket(2, new byte[] { 'b' }, 0, 1); - assertBuffer("0006\002b"); - } - - public void testWriteChannelPacket3() throws IOException { - out.writeChannelPacket(3, new byte[] { 'c' }, 0, 1); - assertBuffer("0006\003c"); - } - // flush public void testFlush() throws IOException { diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java index 3c79f138c..61c894e41 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2009, Google Inc. + * Copyright (C) 2009-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -43,6 +43,13 @@ package org.eclipse.jgit.transport; +import static org.eclipse.jgit.transport.SideBandOutputStream.CH_DATA; +import static org.eclipse.jgit.transport.SideBandOutputStream.CH_ERROR; +import static org.eclipse.jgit.transport.SideBandOutputStream.CH_PROGRESS; +import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE; +import static org.eclipse.jgit.transport.SideBandOutputStream.MAX_BUF; +import static org.eclipse.jgit.transport.SideBandOutputStream.SMALL_BUF; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -58,62 +65,90 @@ import org.eclipse.jgit.lib.Constants; public class SideBandOutputStreamTest extends TestCase { private ByteArrayOutputStream rawOut; - private PacketLineOut pckOut; - protected void setUp() throws Exception { super.setUp(); rawOut = new ByteArrayOutputStream(); - pckOut = new PacketLineOut(rawOut); } public void testWrite_CH_DATA() throws IOException { final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); + out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut); out.write(new byte[] { 'a', 'b', 'c' }); + out.flush(); assertBuffer("0008\001abc"); } public void testWrite_CH_PROGRESS() throws IOException { final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, pckOut); + out = new SideBandOutputStream(CH_PROGRESS, SMALL_BUF, rawOut); out.write(new byte[] { 'a', 'b', 'c' }); + out.flush(); assertBuffer("0008\002abc"); } public void testWrite_CH_ERROR() throws IOException { final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_ERROR, pckOut); + out = new SideBandOutputStream(CH_ERROR, SMALL_BUF, rawOut); out.write(new byte[] { 'a', 'b', 'c' }); + out.flush(); assertBuffer("0008\003abc"); } public void testWrite_Small() throws IOException { final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); + out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut); + out.write('a'); + out.write('b'); + out.write('c'); + out.flush(); + assertBuffer("0008\001abc"); + } + + public void testWrite_SmallBlocks1() throws IOException { + final SideBandOutputStream out; + out = new SideBandOutputStream(CH_DATA, 6, rawOut); out.write('a'); out.write('b'); out.write('c'); + out.flush(); assertBuffer("0006\001a0006\001b0006\001c"); } + public void testWrite_SmallBlocks2() throws IOException { + final SideBandOutputStream out; + out = new SideBandOutputStream(CH_DATA, 6, rawOut); + out.write(new byte[] { 'a', 'b', 'c' }); + out.flush(); + assertBuffer("0006\001a0006\001b0006\001c"); + } + + public void testWrite_SmallBlocks3() throws IOException { + final SideBandOutputStream out; + out = new SideBandOutputStream(CH_DATA, 7, rawOut); + out.write('a'); + out.write(new byte[] { 'b', 'c' }); + out.flush(); + assertBuffer("0007\001ab0006\001c"); + } + public void testWrite_Large() throws IOException { - final int buflen = SideBandOutputStream.MAX_BUF - - SideBandOutputStream.HDR_SIZE; + final int buflen = MAX_BUF - HDR_SIZE; final byte[] buf = new byte[buflen]; for (int i = 0; i < buf.length; i++) { buf[i] = (byte) i; } final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); + out = new SideBandOutputStream(CH_DATA, MAX_BUF, rawOut); out.write(buf); + out.flush(); final byte[] act = rawOut.toByteArray(); - final String explen = Integer.toString(buf.length + 5, 16); - assertEquals(5 + buf.length, act.length); + final String explen = Integer.toString(buf.length + HDR_SIZE, 16); + assertEquals(HDR_SIZE + buf.length, act.length); assertEquals(new String(act, 0, 4, "UTF-8"), explen); assertEquals(1, act[4]); - for (int i = 0, j = 5; i < buf.length; i++, j++) { + for (int i = 0, j = HDR_SIZE; i < buf.length; i++, j++) { assertEquals(buf[i], act[j]); } } @@ -132,17 +167,63 @@ public class SideBandOutputStreamTest extends TestCase { } }; - new SideBandOutputStream(SideBandOutputStream.CH_DATA, - new PacketLineOut(mockout)).flush(); - assertEquals(0, flushCnt[0]); - - new SideBandOutputStream(SideBandOutputStream.CH_ERROR, - new PacketLineOut(mockout)).flush(); + new SideBandOutputStream(CH_DATA, SMALL_BUF, mockout).flush(); assertEquals(1, flushCnt[0]); + } + + public void testConstructor_RejectsBadChannel() { + try { + new SideBandOutputStream(-1, MAX_BUF, rawOut); + fail("Accepted -1 channel number"); + } catch (IllegalArgumentException e) { + assertEquals("channel -1 must be in range [0, 255]", e.getMessage()); + } - new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, - new PacketLineOut(mockout)).flush(); - assertEquals(2, flushCnt[0]); + try { + new SideBandOutputStream(0, MAX_BUF, rawOut); + fail("Accepted 0 channel number"); + } catch (IllegalArgumentException e) { + assertEquals("channel 0 must be in range [0, 255]", e.getMessage()); + } + + try { + new SideBandOutputStream(256, MAX_BUF, rawOut); + fail("Accepted 256 channel number"); + } catch (IllegalArgumentException e) { + assertEquals("channel 256 must be in range [0, 255]", e + .getMessage()); + } + } + + public void testConstructor_RejectsBadBufferSize() { + try { + new SideBandOutputStream(CH_DATA, -1, rawOut); + fail("Accepted -1 for buffer size"); + } catch (IllegalArgumentException e) { + assertEquals("packet size -1 must be >= 5", e.getMessage()); + } + + try { + new SideBandOutputStream(CH_DATA, 0, rawOut); + fail("Accepted 0 for buffer size"); + } catch (IllegalArgumentException e) { + assertEquals("packet size 0 must be >= 5", e.getMessage()); + } + + try { + new SideBandOutputStream(CH_DATA, 1, rawOut); + fail("Accepted 1 for buffer size"); + } catch (IllegalArgumentException e) { + assertEquals("packet size 1 must be >= 5", e.getMessage()); + } + + try { + new SideBandOutputStream(CH_DATA, Integer.MAX_VALUE, rawOut); + fail("Accepted " + Integer.MAX_VALUE + " for buffer size"); + } catch (IllegalArgumentException e) { + assertEquals("packet size " + Integer.MAX_VALUE + + " must be <= 65520", e.getMessage()); + } } private void assertBuffer(final String exp) throws IOException { diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java index 81dd4f6a1..51506b20a 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008-2009, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * Copyright (C) 2008-2009, Robin Rosenberg * Copyright (C) 2008, Shawn O. Pearce * and other copyright owners as documented in the project's IP log. @@ -105,14 +105,6 @@ public class PacketLineOut { out.write(packet); } - void writeChannelPacket(final int channel, final byte[] buf, int off, - int len) throws IOException { - formatLength(len + 5); - lenbuffer[4] = (byte) channel; - out.write(lenbuffer, 0, 5); - out.write(buf, off, len); - } - /** * Write a packet end marker, sometimes referred to as a flush command. *

@@ -149,6 +141,10 @@ public class PacketLineOut { '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; private void formatLength(int w) { + formatLength(lenbuffer, w); + } + + static void formatLength(byte[] lenbuffer, int w) { int o = 3; while (o >= 0 && w != 0) { lenbuffer[o--] = hexchar[w & 0xf]; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java index 5e50fd89b..6e0a52627 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -47,11 +47,10 @@ import java.io.IOException; import java.io.OutputStream; /** - * Multiplexes data and progress messages + * Multiplexes data and progress messages. *

- * To correctly use this class you must wrap it in a BufferedOutputStream with a - * buffer size no larger than either {@link #SMALL_BUF} or {@link #MAX_BUF}, - * minus {@link #HDR_SIZE}. + * This stream is buffered at packet sizes, so the caller doesn't need to wrap + * it in yet another buffered stream. */ class SideBandOutputStream extends OutputStream { static final int CH_DATA = SideBandInputStream.CH_DATA; @@ -66,34 +65,93 @@ class SideBandOutputStream extends OutputStream { static final int HDR_SIZE = 5; - private final int channel; + private final OutputStream out; - private final PacketLineOut pckOut; + private final byte[] buffer; - private byte[] singleByteBuffer; + /** + * Number of bytes in {@link #buffer} that are valid data. + *

+ * Initialized to {@link #HDR_SIZE} if there is no application data in the + * buffer, as the packet header always appears at the start of the buffer. + */ + private int cnt; - SideBandOutputStream(final int chan, final PacketLineOut out) { - channel = chan; - pckOut = out; + /** + * Create a new stream to write side band packets. + * + * @param chan + * channel number to prefix all packets with, so the remote side + * can demultiplex the stream and get back the original data. + * Must be in the range [0, 255]. + * @param sz + * maximum size of a data packet within the stream. The remote + * side needs to agree to the packet size to prevent buffer + * overflows. Must be in the range [HDR_SIZE + 1, MAX_BUF). + * @param os + * stream that the packets are written onto. This stream should + * be attached to a SideBandInputStream on the remote side. + */ + SideBandOutputStream(final int chan, final int sz, final OutputStream os) { + if (chan <= 0 || chan > 255) + throw new IllegalArgumentException("channel " + chan + + " must be in range [0, 255]"); + if (sz <= HDR_SIZE) + throw new IllegalArgumentException("packet size " + sz + + " must be >= " + HDR_SIZE); + else if (MAX_BUF < sz) + throw new IllegalArgumentException("packet size " + sz + + " must be <= " + MAX_BUF); + + out = os; + buffer = new byte[sz]; + buffer[4] = (byte) chan; + cnt = HDR_SIZE; } @Override public void flush() throws IOException { - if (channel != CH_DATA) - pckOut.flush(); + if (HDR_SIZE < cnt) + writeBuffer(); + out.flush(); } @Override - public void write(final byte[] b, final int off, final int len) - throws IOException { - pckOut.writeChannelPacket(channel, b, off, len); + public void write(final byte[] b, int off, int len) throws IOException { + while (0 < len) { + int capacity = buffer.length - cnt; + if (cnt == HDR_SIZE && capacity < len) { + // Our block to write is bigger than the packet size, + // stream it out as-is to avoid unnecessary copies. + PacketLineOut.formatLength(buffer, buffer.length); + out.write(buffer, 0, HDR_SIZE); + out.write(b, off, capacity); + off += capacity; + len -= capacity; + + } else { + if (capacity == 0) + writeBuffer(); + + int n = Math.min(len, capacity); + System.arraycopy(b, off, buffer, cnt, n); + cnt += n; + off += n; + len -= n; + } + } } @Override public void write(final int b) throws IOException { - if (singleByteBuffer == null) - singleByteBuffer = new byte[1]; - singleByteBuffer[0] = (byte) b; - write(singleByteBuffer); + if (cnt == buffer.length) + writeBuffer(); + buffer[cnt++] = (byte) b; + } + + private void writeBuffer() throws IOException { + PacketLineOut.formatLength(buffer, cnt); + out.write(buffer, 0, cnt); + cnt = HDR_SIZE; } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java index 89d338c89..efce7b1da 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -43,7 +43,7 @@ package org.eclipse.jgit.transport; -import java.io.BufferedOutputStream; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -66,12 +66,8 @@ class SideBandProgressMonitor implements ProgressMonitor { private int totalWork; - SideBandProgressMonitor(final PacketLineOut pckOut) { - final int bufsz = SideBandOutputStream.SMALL_BUF - - SideBandOutputStream.HDR_SIZE; - out = new PrintWriter(new OutputStreamWriter(new BufferedOutputStream( - new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, - pckOut), bufsz), Constants.CHARSET)); + SideBandProgressMonitor(final OutputStream os) { + out = new PrintWriter(new OutputStreamWriter(os, Constants.CHARSET)); } public void start(final int totalTasks) { diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java index b76b22b77..39c4243ba 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java @@ -43,9 +43,6 @@ package org.eclipse.jgit.transport; -import static org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck; - -import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -70,6 +67,7 @@ import org.eclipse.jgit.revwalk.RevFlagSet; import org.eclipse.jgit.revwalk.RevObject; import org.eclipse.jgit.revwalk.RevTag; import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck; import org.eclipse.jgit.transport.RefAdvertiser.PacketLineOutRefAdvertiser; import org.eclipse.jgit.util.io.InterruptTimer; import org.eclipse.jgit.util.io.TimeoutInputStream; @@ -556,13 +554,12 @@ public class UploadPack { int bufsz = SideBandOutputStream.SMALL_BUF; if (options.contains(OPTION_SIDE_BAND_64K)) bufsz = SideBandOutputStream.MAX_BUF; - bufsz -= SideBandOutputStream.HDR_SIZE; - - packOut = new BufferedOutputStream(new SideBandOutputStream( - SideBandOutputStream.CH_DATA, pckOut), bufsz); + packOut = new SideBandOutputStream(SideBandOutputStream.CH_DATA, + bufsz, rawOut); if (progress) - pm = new SideBandProgressMonitor(pckOut); + pm = new SideBandProgressMonitor(new SideBandOutputStream( + SideBandOutputStream.CH_PROGRESS, bufsz, rawOut)); } final PackWriter pw;