From 0af5944cac5e09ee99f242d029c89f924e4dd294 Mon Sep 17 00:00:00 2001 From: "Shawn O. Pearce" Date: Mon, 8 Feb 2010 19:10:50 -0800 Subject: [PATCH] Refactor SideBandOutputStream to be buffered Instead of relying on our callers to wrap us up inside of a BufferedOutputStream and using the proper block sizing, do the buffering directly inside of SideBandOutputStream. This ensures we don't get large write-throughs from BufferedOutputStream that might overflow the configured packet size. The constructor of SideBandOutputStream is also beefed up to check its arguments and ensure they are within acceptable ranges for the current side-band protocol. Change-Id: Ic14567327d03c9e972f9734b8228178bc448867d Signed-off-by: Shawn O. Pearce --- .../jgit/transport/PacketLineOutTest.java | 22 +-- .../transport/SideBandOutputStreamTest.java | 127 ++++++++++++++---- .../eclipse/jgit/transport/PacketLineOut.java | 14 +- .../jgit/transport/SideBandOutputStream.java | 98 +++++++++++--- .../transport/SideBandProgressMonitor.java | 12 +- .../eclipse/jgit/transport/UploadPack.java | 13 +- 6 files changed, 198 insertions(+), 88 deletions(-) diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java index 6eb98ac12..c66d4d52a 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/PacketLineOutTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2009, Google Inc. + * Copyright (C) 2009-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -119,8 +119,7 @@ public class PacketLineOutTest extends TestCase { } public void testWritePacket3() throws IOException { - final int buflen = SideBandOutputStream.MAX_BUF - - SideBandOutputStream.HDR_SIZE; + final int buflen = SideBandOutputStream.MAX_BUF - 5; final byte[] buf = new byte[buflen]; for (int i = 0; i < buf.length; i++) { buf[i] = (byte) i; @@ -137,23 +136,6 @@ public class PacketLineOutTest extends TestCase { } } - // writeChannelPacket - - public void testWriteChannelPacket1() throws IOException { - out.writeChannelPacket(1, new byte[] { 'a' }, 0, 1); - assertBuffer("0006\001a"); - } - - public void testWriteChannelPacket2() throws IOException { - out.writeChannelPacket(2, new byte[] { 'b' }, 0, 1); - assertBuffer("0006\002b"); - } - - public void testWriteChannelPacket3() throws IOException { - out.writeChannelPacket(3, new byte[] { 'c' }, 0, 1); - assertBuffer("0006\003c"); - } - // flush public void testFlush() throws IOException { diff --git a/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java b/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java index 3c79f138c..61c894e41 100644 --- a/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java +++ b/org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/SideBandOutputStreamTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2009, Google Inc. + * Copyright (C) 2009-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -43,6 +43,13 @@ package org.eclipse.jgit.transport; +import static org.eclipse.jgit.transport.SideBandOutputStream.CH_DATA; +import static org.eclipse.jgit.transport.SideBandOutputStream.CH_ERROR; +import static org.eclipse.jgit.transport.SideBandOutputStream.CH_PROGRESS; +import static org.eclipse.jgit.transport.SideBandOutputStream.HDR_SIZE; +import static org.eclipse.jgit.transport.SideBandOutputStream.MAX_BUF; +import static org.eclipse.jgit.transport.SideBandOutputStream.SMALL_BUF; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -58,62 +65,90 @@ import org.eclipse.jgit.lib.Constants; public class SideBandOutputStreamTest extends TestCase { private ByteArrayOutputStream rawOut; - private PacketLineOut pckOut; - protected void setUp() throws Exception { super.setUp(); rawOut = new ByteArrayOutputStream(); - pckOut = new PacketLineOut(rawOut); } public void testWrite_CH_DATA() throws IOException { final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); + out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut); out.write(new byte[] { 'a', 'b', 'c' }); + out.flush(); assertBuffer("0008\001abc"); } public void testWrite_CH_PROGRESS() throws IOException { final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, pckOut); + out = new SideBandOutputStream(CH_PROGRESS, SMALL_BUF, rawOut); out.write(new byte[] { 'a', 'b', 'c' }); + out.flush(); assertBuffer("0008\002abc"); } public void testWrite_CH_ERROR() throws IOException { final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_ERROR, pckOut); + out = new SideBandOutputStream(CH_ERROR, SMALL_BUF, rawOut); out.write(new byte[] { 'a', 'b', 'c' }); + out.flush(); assertBuffer("0008\003abc"); } public void testWrite_Small() throws IOException { final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); + out = new SideBandOutputStream(CH_DATA, SMALL_BUF, rawOut); + out.write('a'); + out.write('b'); + out.write('c'); + out.flush(); + assertBuffer("0008\001abc"); + } + + public void testWrite_SmallBlocks1() throws IOException { + final SideBandOutputStream out; + out = new SideBandOutputStream(CH_DATA, 6, rawOut); out.write('a'); out.write('b'); out.write('c'); + out.flush(); assertBuffer("0006\001a0006\001b0006\001c"); } + public void testWrite_SmallBlocks2() throws IOException { + final SideBandOutputStream out; + out = new SideBandOutputStream(CH_DATA, 6, rawOut); + out.write(new byte[] { 'a', 'b', 'c' }); + out.flush(); + assertBuffer("0006\001a0006\001b0006\001c"); + } + + public void testWrite_SmallBlocks3() throws IOException { + final SideBandOutputStream out; + out = new SideBandOutputStream(CH_DATA, 7, rawOut); + out.write('a'); + out.write(new byte[] { 'b', 'c' }); + out.flush(); + assertBuffer("0007\001ab0006\001c"); + } + public void testWrite_Large() throws IOException { - final int buflen = SideBandOutputStream.MAX_BUF - - SideBandOutputStream.HDR_SIZE; + final int buflen = MAX_BUF - HDR_SIZE; final byte[] buf = new byte[buflen]; for (int i = 0; i < buf.length; i++) { buf[i] = (byte) i; } final SideBandOutputStream out; - out = new SideBandOutputStream(SideBandOutputStream.CH_DATA, pckOut); + out = new SideBandOutputStream(CH_DATA, MAX_BUF, rawOut); out.write(buf); + out.flush(); final byte[] act = rawOut.toByteArray(); - final String explen = Integer.toString(buf.length + 5, 16); - assertEquals(5 + buf.length, act.length); + final String explen = Integer.toString(buf.length + HDR_SIZE, 16); + assertEquals(HDR_SIZE + buf.length, act.length); assertEquals(new String(act, 0, 4, "UTF-8"), explen); assertEquals(1, act[4]); - for (int i = 0, j = 5; i < buf.length; i++, j++) { + for (int i = 0, j = HDR_SIZE; i < buf.length; i++, j++) { assertEquals(buf[i], act[j]); } } @@ -132,17 +167,63 @@ public class SideBandOutputStreamTest extends TestCase { } }; - new SideBandOutputStream(SideBandOutputStream.CH_DATA, - new PacketLineOut(mockout)).flush(); - assertEquals(0, flushCnt[0]); - - new SideBandOutputStream(SideBandOutputStream.CH_ERROR, - new PacketLineOut(mockout)).flush(); + new SideBandOutputStream(CH_DATA, SMALL_BUF, mockout).flush(); assertEquals(1, flushCnt[0]); + } + + public void testConstructor_RejectsBadChannel() { + try { + new SideBandOutputStream(-1, MAX_BUF, rawOut); + fail("Accepted -1 channel number"); + } catch (IllegalArgumentException e) { + assertEquals("channel -1 must be in range [0, 255]", e.getMessage()); + } - new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, - new PacketLineOut(mockout)).flush(); - assertEquals(2, flushCnt[0]); + try { + new SideBandOutputStream(0, MAX_BUF, rawOut); + fail("Accepted 0 channel number"); + } catch (IllegalArgumentException e) { + assertEquals("channel 0 must be in range [0, 255]", e.getMessage()); + } + + try { + new SideBandOutputStream(256, MAX_BUF, rawOut); + fail("Accepted 256 channel number"); + } catch (IllegalArgumentException e) { + assertEquals("channel 256 must be in range [0, 255]", e + .getMessage()); + } + } + + public void testConstructor_RejectsBadBufferSize() { + try { + new SideBandOutputStream(CH_DATA, -1, rawOut); + fail("Accepted -1 for buffer size"); + } catch (IllegalArgumentException e) { + assertEquals("packet size -1 must be >= 5", e.getMessage()); + } + + try { + new SideBandOutputStream(CH_DATA, 0, rawOut); + fail("Accepted 0 for buffer size"); + } catch (IllegalArgumentException e) { + assertEquals("packet size 0 must be >= 5", e.getMessage()); + } + + try { + new SideBandOutputStream(CH_DATA, 1, rawOut); + fail("Accepted 1 for buffer size"); + } catch (IllegalArgumentException e) { + assertEquals("packet size 1 must be >= 5", e.getMessage()); + } + + try { + new SideBandOutputStream(CH_DATA, Integer.MAX_VALUE, rawOut); + fail("Accepted " + Integer.MAX_VALUE + " for buffer size"); + } catch (IllegalArgumentException e) { + assertEquals("packet size " + Integer.MAX_VALUE + + " must be <= 65520", e.getMessage()); + } } private void assertBuffer(final String exp) throws IOException { diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java index 81dd4f6a1..51506b20a 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/PacketLineOut.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008-2009, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * Copyright (C) 2008-2009, Robin Rosenberg * Copyright (C) 2008, Shawn O. Pearce * and other copyright owners as documented in the project's IP log. @@ -105,14 +105,6 @@ public class PacketLineOut { out.write(packet); } - void writeChannelPacket(final int channel, final byte[] buf, int off, - int len) throws IOException { - formatLength(len + 5); - lenbuffer[4] = (byte) channel; - out.write(lenbuffer, 0, 5); - out.write(buf, off, len); - } - /** * Write a packet end marker, sometimes referred to as a flush command. *

@@ -149,6 +141,10 @@ public class PacketLineOut { '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; private void formatLength(int w) { + formatLength(lenbuffer, w); + } + + static void formatLength(byte[] lenbuffer, int w) { int o = 3; while (o >= 0 && w != 0) { lenbuffer[o--] = hexchar[w & 0xf]; diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java index 5e50fd89b..6e0a52627 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -47,11 +47,10 @@ import java.io.IOException; import java.io.OutputStream; /** - * Multiplexes data and progress messages + * Multiplexes data and progress messages. *

- * To correctly use this class you must wrap it in a BufferedOutputStream with a - * buffer size no larger than either {@link #SMALL_BUF} or {@link #MAX_BUF}, - * minus {@link #HDR_SIZE}. + * This stream is buffered at packet sizes, so the caller doesn't need to wrap + * it in yet another buffered stream. */ class SideBandOutputStream extends OutputStream { static final int CH_DATA = SideBandInputStream.CH_DATA; @@ -66,34 +65,93 @@ class SideBandOutputStream extends OutputStream { static final int HDR_SIZE = 5; - private final int channel; + private final OutputStream out; - private final PacketLineOut pckOut; + private final byte[] buffer; - private byte[] singleByteBuffer; + /** + * Number of bytes in {@link #buffer} that are valid data. + *

+ * Initialized to {@link #HDR_SIZE} if there is no application data in the + * buffer, as the packet header always appears at the start of the buffer. + */ + private int cnt; - SideBandOutputStream(final int chan, final PacketLineOut out) { - channel = chan; - pckOut = out; + /** + * Create a new stream to write side band packets. + * + * @param chan + * channel number to prefix all packets with, so the remote side + * can demultiplex the stream and get back the original data. + * Must be in the range [0, 255]. + * @param sz + * maximum size of a data packet within the stream. The remote + * side needs to agree to the packet size to prevent buffer + * overflows. Must be in the range [HDR_SIZE + 1, MAX_BUF). + * @param os + * stream that the packets are written onto. This stream should + * be attached to a SideBandInputStream on the remote side. + */ + SideBandOutputStream(final int chan, final int sz, final OutputStream os) { + if (chan <= 0 || chan > 255) + throw new IllegalArgumentException("channel " + chan + + " must be in range [0, 255]"); + if (sz <= HDR_SIZE) + throw new IllegalArgumentException("packet size " + sz + + " must be >= " + HDR_SIZE); + else if (MAX_BUF < sz) + throw new IllegalArgumentException("packet size " + sz + + " must be <= " + MAX_BUF); + + out = os; + buffer = new byte[sz]; + buffer[4] = (byte) chan; + cnt = HDR_SIZE; } @Override public void flush() throws IOException { - if (channel != CH_DATA) - pckOut.flush(); + if (HDR_SIZE < cnt) + writeBuffer(); + out.flush(); } @Override - public void write(final byte[] b, final int off, final int len) - throws IOException { - pckOut.writeChannelPacket(channel, b, off, len); + public void write(final byte[] b, int off, int len) throws IOException { + while (0 < len) { + int capacity = buffer.length - cnt; + if (cnt == HDR_SIZE && capacity < len) { + // Our block to write is bigger than the packet size, + // stream it out as-is to avoid unnecessary copies. + PacketLineOut.formatLength(buffer, buffer.length); + out.write(buffer, 0, HDR_SIZE); + out.write(b, off, capacity); + off += capacity; + len -= capacity; + + } else { + if (capacity == 0) + writeBuffer(); + + int n = Math.min(len, capacity); + System.arraycopy(b, off, buffer, cnt, n); + cnt += n; + off += n; + len -= n; + } + } } @Override public void write(final int b) throws IOException { - if (singleByteBuffer == null) - singleByteBuffer = new byte[1]; - singleByteBuffer[0] = (byte) b; - write(singleByteBuffer); + if (cnt == buffer.length) + writeBuffer(); + buffer[cnt++] = (byte) b; + } + + private void writeBuffer() throws IOException { + PacketLineOut.formatLength(buffer, cnt); + out.write(buffer, 0, cnt); + cnt = HDR_SIZE; } } diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java index 89d338c89..efce7b1da 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/SideBandProgressMonitor.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2008, Google Inc. + * Copyright (C) 2008-2010, Google Inc. * and other copyright owners as documented in the project's IP log. * * This program and the accompanying materials are made available @@ -43,7 +43,7 @@ package org.eclipse.jgit.transport; -import java.io.BufferedOutputStream; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -66,12 +66,8 @@ class SideBandProgressMonitor implements ProgressMonitor { private int totalWork; - SideBandProgressMonitor(final PacketLineOut pckOut) { - final int bufsz = SideBandOutputStream.SMALL_BUF - - SideBandOutputStream.HDR_SIZE; - out = new PrintWriter(new OutputStreamWriter(new BufferedOutputStream( - new SideBandOutputStream(SideBandOutputStream.CH_PROGRESS, - pckOut), bufsz), Constants.CHARSET)); + SideBandProgressMonitor(final OutputStream os) { + out = new PrintWriter(new OutputStreamWriter(os, Constants.CHARSET)); } public void start(final int totalTasks) { diff --git a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java index b76b22b77..39c4243ba 100644 --- a/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java +++ b/org.eclipse.jgit/src/org/eclipse/jgit/transport/UploadPack.java @@ -43,9 +43,6 @@ package org.eclipse.jgit.transport; -import static org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck; - -import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -70,6 +67,7 @@ import org.eclipse.jgit.revwalk.RevFlagSet; import org.eclipse.jgit.revwalk.RevObject; import org.eclipse.jgit.revwalk.RevTag; import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.transport.BasePackFetchConnection.MultiAck; import org.eclipse.jgit.transport.RefAdvertiser.PacketLineOutRefAdvertiser; import org.eclipse.jgit.util.io.InterruptTimer; import org.eclipse.jgit.util.io.TimeoutInputStream; @@ -556,13 +554,12 @@ public class UploadPack { int bufsz = SideBandOutputStream.SMALL_BUF; if (options.contains(OPTION_SIDE_BAND_64K)) bufsz = SideBandOutputStream.MAX_BUF; - bufsz -= SideBandOutputStream.HDR_SIZE; - - packOut = new BufferedOutputStream(new SideBandOutputStream( - SideBandOutputStream.CH_DATA, pckOut), bufsz); + packOut = new SideBandOutputStream(SideBandOutputStream.CH_DATA, + bufsz, rawOut); if (progress) - pm = new SideBandProgressMonitor(pckOut); + pm = new SideBandProgressMonitor(new SideBandOutputStream( + SideBandOutputStream.CH_PROGRESS, bufsz, rawOut)); } final PackWriter pw;