Browse Source

Make the FileLfsRepository thread safe

The FileLfsRepository.out member could have been accessed from multiple
threads which would corrupt the content.

Don't store the AtomicObjectOutputStream in the FileLfsRepository.out but
move it to the ObjectUploadListener which is instantiated per-request.

Add a parallel upload test.

Change-Id: I62298630e99c46b500d376843ffcde934436215b
Signed-off-by: Saša Živkov <sasa.zivkov@sap.com>
Signed-off-by: Matthias Sohn <matthias.sohn@sap.com>
stable-4.3
Saša Živkov 9 years ago
parent
commit
b72fc2b494
  1. 39
      org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java
  2. 17
      org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java
  3. 20
      org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java

39
org.eclipse.jgit.lfs.server.test/tst/org/eclipse/jgit/lfs/server/fs/UploadTest.java

@ -51,6 +51,13 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lfs.lib.AnyLongObjectId; import org.eclipse.jgit.lfs.lib.AnyLongObjectId;
import org.eclipse.jgit.lfs.lib.LongObjectId; import org.eclipse.jgit.lfs.lib.LongObjectId;
@ -98,4 +105,36 @@ public class UploadTest extends LfsServerTest {
assertEquals("expected object length " + Files.size(f), Files.size(f), assertEquals("expected object length " + Files.size(f), Files.size(f),
repository.getSize(id)); repository.getSize(id));
} }
@Test
public void testParallelUploads() throws Exception {
int count = 10;
List<Path> paths = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Path f = Paths.get(getTempDirectory().toString(),
"largeRandomFile_" + i);
createPseudoRandomContentFile(f, 1 * MiB);
paths.add(f);
}
final CyclicBarrier barrier = new CyclicBarrier(count);
ExecutorService e = Executors.newFixedThreadPool(count);
try {
for (final Path p : paths) {
e.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
barrier.await();
putContent(p);
return null;
}
});
}
} finally {
e.shutdown();
e.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
} }

17
org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/FileLfsRepository.java

@ -45,10 +45,8 @@ package org.eclipse.jgit.lfs.server.fs;
import static org.eclipse.jgit.util.HttpSupport.HDR_AUTHORIZATION; import static org.eclipse.jgit.util.HttpSupport.HDR_AUTHORIZATION;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
@ -70,7 +68,6 @@ public class FileLfsRepository implements LargeFileRepository {
private final String url; private final String url;
private final Path dir; private final Path dir;
private AtomicObjectOutputStream out;
/** /**
* @param url * @param url
@ -147,21 +144,11 @@ public class FileLfsRepository implements LargeFileRepository {
return FileChannel.open(getPath(id), StandardOpenOption.READ); return FileChannel.open(getPath(id), StandardOpenOption.READ);
} }
WritableByteChannel getWriteChannel(AnyLongObjectId id) AtomicObjectOutputStream getOutputStream(AnyLongObjectId id)
throws IOException { throws IOException {
Path path = getPath(id); Path path = getPath(id);
Files.createDirectories(path.getParent()); Files.createDirectories(path.getParent());
out = new AtomicObjectOutputStream(path, id); return new AtomicObjectOutputStream(path, id);
return Channels.newChannel(out);
}
/**
* Abort the output stream
*/
void abortWrite() {
if (out != null) {
out.abort();
}
} }
private static char[] toHexCharArray(int b) { private static char[] toHexCharArray(int b) {

20
org.eclipse.jgit.lfs.server/src/org/eclipse/jgit/lfs/server/fs/ObjectUploadListener.java

@ -74,13 +74,13 @@ class ObjectUploadListener implements ReadListener {
private final HttpServletResponse response; private final HttpServletResponse response;
private FileLfsRepository repository;
private final ServletInputStream in; private final ServletInputStream in;
private final ReadableByteChannel inChannel; private final ReadableByteChannel inChannel;
private WritableByteChannel out; private final AtomicObjectOutputStream out;
private WritableByteChannel channel;
private final ByteBuffer buffer = ByteBuffer.allocateDirect(8192); private final ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
@ -98,12 +98,12 @@ class ObjectUploadListener implements ReadListener {
AsyncContext context, HttpServletRequest request, AsyncContext context, HttpServletRequest request,
HttpServletResponse response, AnyLongObjectId id) HttpServletResponse response, AnyLongObjectId id)
throws FileNotFoundException, IOException { throws FileNotFoundException, IOException {
this.repository = repository;
this.context = context; this.context = context;
this.response = response; this.response = response;
this.in = request.getInputStream(); this.in = request.getInputStream();
this.inChannel = Channels.newChannel(in); this.inChannel = Channels.newChannel(in);
this.out = repository.getWriteChannel(id); this.out = repository.getOutputStream(id);
this.channel = Channels.newChannel(out);
response.setContentType(Constants.CONTENT_TYPE_GIT_LFS_JSON); response.setContentType(Constants.CONTENT_TYPE_GIT_LFS_JSON);
} }
@ -117,12 +117,12 @@ class ObjectUploadListener implements ReadListener {
while (in.isReady()) { while (in.isReady()) {
if (inChannel.read(buffer) > 0) { if (inChannel.read(buffer) > 0) {
buffer.flip(); buffer.flip();
out.write(buffer); channel.write(buffer);
buffer.compact(); buffer.compact();
} else { } else {
buffer.flip(); buffer.flip();
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
out.write(buffer); channel.write(buffer);
} }
close(); close();
return; return;
@ -141,7 +141,7 @@ class ObjectUploadListener implements ReadListener {
protected void close() throws IOException { protected void close() throws IOException {
try { try {
inChannel.close(); inChannel.close();
out.close(); channel.close();
// TODO check if status 200 is ok for PUT request, HTTP foresees 204 // TODO check if status 200 is ok for PUT request, HTTP foresees 204
// for successful PUT without response body // for successful PUT without response body
response.setStatus(HttpServletResponse.SC_OK); response.setStatus(HttpServletResponse.SC_OK);
@ -157,9 +157,9 @@ class ObjectUploadListener implements ReadListener {
@Override @Override
public void onError(Throwable e) { public void onError(Throwable e) {
try { try {
repository.abortWrite(); out.abort();
inChannel.close(); inChannel.close();
out.close(); channel.close();
int status; int status;
if (e instanceof CorruptLongObjectException) { if (e instanceof CorruptLongObjectException) {
status = HttpStatus.SC_BAD_REQUEST; status = HttpStatus.SC_BAD_REQUEST;

Loading…
Cancel
Save