
RefDirectory: Add in-process fair lock for atomic updates

In a server scenario such as Gerrit Code Review, there may be many
atomic BatchRefUpdates contending for locks on both the packed-refs file
and some subset of loose refs. We already retry lock acquisition to
improve this situation slightly, but we can do better by using an
in-process lock. This way, instead of retrying and potentially exceeding
their timeout, different threads sharing the same Repository instance
can wait on a fair lock without having to touch the disk lock. Since a
server is probably already using RepositoryCache anyway, there is a high
likelihood of reusing the Repository instance.
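
Below is a minimal sketch of the pattern this change introduces, under assumed, illustrative names (PackedRefsWriterSketch, tryLockOnDisk, unlockOnDisk are not JGit APIs): threads in the same process queue on a fair java.util.concurrent.locks.ReentrantLock before attempting the on-disk lock, so contention is resolved in memory instead of by repeatedly failing and retrying the file lock.

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantLock;

    class PackedRefsWriterSketch {
        // fair = true: waiting threads acquire the lock in FIFO order,
        // so no thread starves while others keep winning the race.
        private final ReentrantLock inProcessLock = new ReentrantLock(true);

        void update(Runnable writePackedRefs) throws IOException {
            inProcessLock.lock(); // wait in an in-memory queue, no disk I/O
            try {
                // The on-disk lock is still required: other processes, or
                // other RefDirectory instances for the same repo on disk,
                // are not covered by the in-process lock.
                if (!tryLockOnDisk()) {
                    throw new IOException("cannot lock packed-refs");
                }
                try {
                    writePackedRefs.run();
                } finally {
                    unlockOnDisk();
                }
            } finally {
                inProcessLock.unlock();
            }
        }

        // Hypothetical stand-ins for JGit's LockFile handling.
        private boolean tryLockOnDisk() { return true; }
        private void unlockOnDisk() { }
    }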

Change-Id: If5dd1dc58f0ce62f26131fd5965a0e21a80e8bd3
Branch: stable-4.9
Author: Dave Borowitz, 7 years ago
Commit: 45da0fc6f7
Changed files:
  1. org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/file/BatchRefUpdateTest.java (55 lines changed)
  2. org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/file/PackedBatchRefUpdate.java (7 lines changed)
  3. org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/file/RefDirectory.java (198 lines changed)

org.eclipse.jgit.test/tst/org/eclipse/jgit/internal/storage/file/BatchRefUpdateTest.java (55)

@@ -43,6 +43,8 @@
package org.eclipse.jgit.internal.storage.file;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.eclipse.jgit.internal.storage.file.BatchRefUpdateTest.Result.LOCK_FAILURE;
import static org.eclipse.jgit.internal.storage.file.BatchRefUpdateTest.Result.OK;
import static org.eclipse.jgit.internal.storage.file.BatchRefUpdateTest.Result.REJECTED_MISSING_OBJECT;

@@ -58,6 +60,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import java.io.File;
import java.io.IOException;

@@ -67,6 +70,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;

@@ -666,6 +670,57 @@ public class BatchRefUpdateTest extends LocalDiskRepositoryTestCase {
		}
	}

	@Test
	public void atomicUpdateRespectsInProcessLock() throws Exception {
		assumeTrue(atomic);

		writeLooseRef("refs/heads/master", A);

		List<ReceiveCommand> cmds = Arrays.asList(
				new ReceiveCommand(A, B, "refs/heads/master", UPDATE),
				new ReceiveCommand(zeroId(), B, "refs/heads/branch", CREATE));

		Thread t = new Thread(() -> {
			try {
				execute(newBatchUpdate(cmds).setAllowNonFastForwards(true));
			} catch (Exception e) {
				throw new RuntimeException(e);
			}
		});

		ReentrantLock l = refdir.inProcessPackedRefsLock;
		l.lock();
		try {
			t.start();
			long timeoutSecs = 10;
			long startNanos = System.nanoTime();

			// Hold onto the lock until we observe the worker thread has
			// attempted to acquire it.
			while (l.getQueueLength() == 0) {
				long elapsedNanos = System.nanoTime() - startNanos;
				assertTrue(
						"timed out waiting for work thread to attempt to acquire lock",
						NANOSECONDS.toSeconds(elapsedNanos) < timeoutSecs);
				Thread.sleep(3);
			}

			// Once we unlock, the worker thread should finish the update
			// promptly.
			l.unlock();
			t.join(SECONDS.toMillis(timeoutSecs));
			assertFalse(t.isAlive());
		} finally {
			if (l.isHeldByCurrentThread()) {
				l.unlock();
			}
		}

		assertResults(cmds, OK, OK);
		assertRefs(
				"refs/heads/master", B,
				"refs/heads/branch", B);
	}

	private void writeLooseRef(String name, AnyObjectId id) throws IOException {
		write(new File(diskRepo.getDirectory(), name), id.name() + "\n");
	}

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/file/PackedBatchRefUpdate.java (7)

@@ -177,6 +177,7 @@ class PackedBatchRefUpdate extends BatchRefUpdate {
		}

		Map<String, LockFile> locks = null;
		refdb.inProcessPackedRefsLock.lock();
		try {
			locks = lockLooseRefs(pending);
			if (locks == null) {

@@ -195,7 +196,11 @@ class PackedBatchRefUpdate extends BatchRefUpdate {
			// commitPackedRefs removes lock file (by renaming over real file).
			refdb.commitPackedRefs(packedRefsLock, newRefs, oldPackedList);
		} finally {
			try {
				unlockAll(locks);
			} finally {
				refdb.inProcessPackedRefsLock.unlock();
			}
		}

		refdb.fireRefsChanged();

org.eclipse.jgit/src/org/eclipse/jgit/internal/storage/file/RefDirectory.java (198)

@@ -75,6 +75,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.jgit.annotations.NonNull;
import org.eclipse.jgit.annotations.Nullable;

@@ -166,6 +167,22 @@ public class RefDirectory extends RefDatabase {
	/** Immutable sorted list of packed references. */
	final AtomicReference<PackedRefList> packedRefs = new AtomicReference<>();

	/**
	 * Lock for coordinating operations within a single process that may contend
	 * on the {@code packed-refs} file.
	 * <p>
	 * All operations that write {@code packed-refs} must still acquire a
	 * {@link LockFile} on {@link #packedRefsFile}, even after they have acquired
	 * this lock, since there may be multiple {@link RefDirectory} instances or
	 * other processes operating on the same repo on disk.
	 * <p>
	 * This lock exists so multiple threads in the same process can wait in a
	 * fair queue without trying, failing, and retrying to acquire the on-disk
	 * lock. If {@code RepositoryCache} is used, this lock instance will be used
	 * by all threads.
	 */
	final ReentrantLock inProcessPackedRefsLock = new ReentrantLock(true);

	/**
	 * Number of modifications made to this database.
	 * <p>

@@ -610,14 +627,19 @@ public class RefDirectory extends RefDatabase {
		// we don't miss an edit made externally.
		final PackedRefList packed = getPackedRefs();
		if (packed.contains(name)) {
			inProcessPackedRefsLock.lock();
			try {
				LockFile lck = lockPackedRefsOrThrow();
				try {
					PackedRefList cur = readPackedRefs();
					int idx = cur.find(name);
					if (0 <= idx)
						commitPackedRefs(lck, cur.remove(idx), packed);
				} finally {
					lck.unlock();
				}
			} finally {
				inProcessPackedRefsLock.unlock();
			}
		}

@@ -671,99 +693,103 @@
		FS fs = parent.getFS();

		// Lock the packed refs file and read the content
		inProcessPackedRefsLock.lock();
		try {
			LockFile lck = lockPackedRefsOrThrow();
			try {
				final PackedRefList packed = getPackedRefs();
				RefList<Ref> cur = readPackedRefs();

				// Iterate over all refs to be packed
				boolean dirty = false;
				for (String refName : refs) {
					Ref oldRef = readRef(refName, cur);
					if (oldRef == null) {
						continue; // A non-existent ref is already correctly packed.
					}
					if (oldRef.isSymbolic()) {
						continue; // can't pack symbolic refs
					}
					// Add/Update it to packed-refs
					Ref newRef = peeledPackedRef(oldRef);
					if (newRef == oldRef) {
						// No-op; peeledPackedRef returns the input ref only if it's already
						// packed, and readRef returns a packed ref only if there is no
						// loose ref.
						continue;
					}

					dirty = true;
					int idx = cur.find(refName);
					if (idx >= 0) {
						cur = cur.set(idx, newRef);
					} else {
						cur = cur.add(idx, newRef);
					}
				}
				if (!dirty) {
					// All requested refs were already packed accurately
					return packed;
				}

				// The new content for packed-refs is collected. Persist it.
				PackedRefList result = commitPackedRefs(lck, cur, packed);

				// Now delete the loose refs which are now packed
				for (String refName : refs) {
					// Lock the loose ref
					File refFile = fileFor(refName);
					if (!fs.exists(refFile)) {
						continue;
					}

					LockFile rLck = heldLocks.get(refName);
					boolean shouldUnlock;
					if (rLck == null) {
						rLck = new LockFile(refFile);
						if (!rLck.lock()) {
							continue;
						}
						shouldUnlock = true;
					} else {
						shouldUnlock = false;
					}

					try {
						LooseRef currentLooseRef = scanRef(null, refName);
						if (currentLooseRef == null || currentLooseRef.isSymbolic()) {
							continue;
						}
						Ref packedRef = cur.get(refName);
						ObjectId clr_oid = currentLooseRef.getObjectId();
						if (clr_oid != null
								&& clr_oid.equals(packedRef.getObjectId())) {
							RefList<LooseRef> curLoose, newLoose;
							do {
								curLoose = looseRefs.get();
								int idx = curLoose.find(refName);
								if (idx < 0) {
									break;
								}
								newLoose = curLoose.remove(idx);
							} while (!looseRefs.compareAndSet(curLoose, newLoose));
							int levels = levelsIn(refName) - 2;
							delete(refFile, levels, rLck);
						}
					} finally {
						if (shouldUnlock) {
							rLck.unlock();
						}
					}
				}
				// Don't fire refsChanged. The refs have not change, only their
				// storage.
				return result;
			} finally {
				lck.unlock();
			}
		} finally {
			inProcessPackedRefsLock.unlock();
		}
	}
