Browse Source

Add a minimum negotiation feature for fetch

Android an Chrome have several repos with >300k refs. We sometimes see
negotiations of >100k rounds. This change provides a "minimal negotiation"
feature on the client side that limits how many "have" lines the client
sends. The client extracts the current SHA-1 values for the refs in its
wants set, and terminates negotiation early when all of those values have
been sent as haves. If a new branch is being fetched then that set will
be empty and the client will terminate after current default minimum
of two rounds.

This feature is gated behind a "fetch.useminimalnegotiation" configuration
flag, which defaults to false.

Change-Id: Ib12b095cac76a59da6e8f72773c4129e3b32ff2b
Signed-off-by: Terry Parker <tparker@google.com>
stable-4.11
Terry Parker 7 years ago
parent
commit
9530c10192
  1. 85
      org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/TestProtocolTest.java
  2. 83
      org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java
  3. 17
      org.eclipse.jgit/src/org/eclipse/jgit/transport/TestProtocol.java

85
org.eclipse.jgit.test/tst/org/eclipse/jgit/transport/TestProtocolTest.java

@ -60,6 +60,9 @@ import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.storage.pack.PackStatistics;
import org.eclipse.jgit.transport.BasePackFetchConnection.FetchConfig;
import org.eclipse.jgit.transport.resolver.ReceivePackFactory;
import org.eclipse.jgit.transport.resolver.ServiceNotAuthorizedException;
import org.eclipse.jgit.transport.resolver.UploadPackFactory;
@ -70,6 +73,11 @@ import org.junit.Test;
public class TestProtocolTest {
private static final RefSpec HEADS = new RefSpec("+refs/heads/*:refs/heads/*");
private static final RefSpec MASTER = new RefSpec(
"+refs/heads/master:refs/heads/master");
private static final int HAVES_PER_ROUND = 32;
private static class User {
private final String name;
@ -81,7 +89,14 @@ public class TestProtocolTest {
private static class DefaultUpload implements UploadPackFactory<User> {
@Override
public UploadPack create(User req, Repository db) {
return new UploadPack(db);
UploadPack up = new UploadPack(db);
up.setPostUploadHook(new PostUploadHook() {
@Override
public void onPostUpload(PackStatistics stats) {
havesCount = stats.getHaves();
}
});
return up;
}
}
@ -92,6 +107,8 @@ public class TestProtocolTest {
}
}
private static long havesCount;
private List<TransportProtocol> protos;
private TestRepository<InMemoryRepository> local;
private TestRepository<InMemoryRepository> remote;
@ -146,6 +163,68 @@ public class TestProtocolTest {
}
}
@Test
public void testFullNegotiation() throws Exception {
TestProtocol<User> proto = registerDefault();
URIish uri = proto.register(new User("user"), remote.getRepository());
// Enough local branches to cause 10 rounds of negotiation,
// and a unique remote master branch commit with a later timestamp.
for (int i = 0; i < 10 * HAVES_PER_ROUND; i++) {
local.branch("local-branch-" + i).commit().create();
}
remote.tick(11 * HAVES_PER_ROUND);
RevCommit master = remote.branch("master").commit()
.add("readme.txt", "unique commit").create();
try (Git git = new Git(local.getRepository())) {
assertNull(local.getRepository().exactRef("refs/heads/master"));
git.fetch().setRemote(uri.toString()).setRefSpecs(MASTER).call();
assertEquals(master, local.getRepository()
.exactRef("refs/heads/master").getObjectId());
assertEquals(10 * HAVES_PER_ROUND, havesCount);
}
}
@Test
public void testMinimalNegotiation() throws Exception {
TestProtocol<User> proto = registerDefault();
URIish uri = proto.register(new User("user"), remote.getRepository());
// Enough local branches to cause 10 rounds of negotiation,
// and a unique remote master branch commit with a later timestamp.
for (int i = 0; i < 10 * HAVES_PER_ROUND; i++) {
local.branch("local-branch-" + i).commit().create();
}
remote.tick(11 * HAVES_PER_ROUND);
RevCommit master = remote.branch("master").commit()
.add("readme.txt", "unique commit").create();
TestProtocol.setFetchConfig(new FetchConfig(true, true));
try (Git git = new Git(local.getRepository())) {
assertNull(local.getRepository().exactRef("refs/heads/master"));
git.fetch().setRemote(uri.toString()).setRefSpecs(MASTER).call();
assertEquals(master, local.getRepository()
.exactRef("refs/heads/master").getObjectId());
assertTrue(havesCount <= 2 * HAVES_PER_ROUND);
// Update the remote master and add local branches for 5 more rounds
// of negotiation, where the local branch commits have newer
// timestamps. Negotiation should send 5 rounds for those newer
// branches before getting to the round that sends its stale version
// of master.
master = remote.branch("master").commit().parent(master).create();
local.tick(2 * HAVES_PER_ROUND);
for (int i = 0; i < 5 * HAVES_PER_ROUND; i++) {
local.branch("local-" + i).commit().create();
}
git.fetch().setRemote(uri.toString()).setRefSpecs(MASTER).call();
assertEquals(master, local.getRepository()
.exactRef("refs/heads/master").getObjectId());
assertEquals(6 * HAVES_PER_ROUND, havesCount);
}
}
@Test
public void testUploadPackFactory() throws Exception {
ObjectId master = remote.branch("master").commit().create();
@ -171,7 +250,7 @@ public class TestProtocolTest {
try {
git.fetch()
.setRemote(user1Uri.toString())
.setRefSpecs(HEADS)
.setRefSpecs(MASTER)
.call();
} catch (InvalidRemoteException expected) {
// Expected.
@ -181,7 +260,7 @@ public class TestProtocolTest {
git.fetch()
.setRemote(user2Uri.toString())
.setRefSpecs(HEADS)
.setRefSpecs(MASTER)
.call();
assertEquals(1, rejected.get());
assertEquals(master,

83
org.eclipse.jgit/src/org/eclipse/jgit/transport/BasePackFetchConnection.java

@ -54,6 +54,7 @@ import java.text.MessageFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -230,6 +231,8 @@ public abstract class BasePackFetchConnection extends BasePackConnection
private boolean noProgress;
private Set<AnyObjectId> minimalNegotiationSet;
private String lockMessage;
private PackLock packLock;
@ -249,8 +252,11 @@ public abstract class BasePackFetchConnection extends BasePackConnection
super(packTransport);
if (local != null) {
final FetchConfig cfg = local.getConfig().get(FetchConfig::new);
final FetchConfig cfg = getFetchConfig();
allowOfsDelta = cfg.allowOfsDelta;
if (cfg.minimalNegotiation) {
minimalNegotiationSet = new HashSet<>();
}
} else {
allowOfsDelta = true;
}
@ -277,11 +283,20 @@ public abstract class BasePackFetchConnection extends BasePackConnection
}
}
private static class FetchConfig {
static class FetchConfig {
final boolean allowOfsDelta;
final boolean minimalNegotiation;
FetchConfig(final Config c) {
allowOfsDelta = c.getBoolean("repack", "usedeltabaseoffset", true); //$NON-NLS-1$ //$NON-NLS-2$
minimalNegotiation = c.getBoolean("fetch", "useminimalnegotiation", //$NON-NLS-1$ //$NON-NLS-2$
false);
}
FetchConfig(boolean allowOfsDelta, boolean minimalNegotiation) {
this.allowOfsDelta = allowOfsDelta;
this.minimalNegotiation = minimalNegotiation;
}
}
@ -391,6 +406,10 @@ public abstract class BasePackFetchConnection extends BasePackConnection
super.close();
}
FetchConfig getFetchConfig() {
return local.getConfig().get(FetchConfig::new);
}
private int maxTimeWanted(final Collection<Ref> wants) {
int maxTime = 0;
for (final Ref r : wants) {
@ -492,9 +511,19 @@ public abstract class BasePackFetchConnection extends BasePackConnection
}
line.append('\n');
p.writeString(line.toString());
if (minimalNegotiationSet != null) {
Ref current = local.exactRef(r.getName());
if (current != null) {
ObjectId o = current.getObjectId();
if (o != null && !o.equals(ObjectId.zeroId())) {
minimalNegotiationSet.add(o);
}
}
}
if (first)
}
if (first) {
return false;
}
p.end();
outNeedsEnd = false;
return true;
@ -549,18 +578,24 @@ public abstract class BasePackFetchConnection extends BasePackConnection
boolean receivedAck = false;
boolean receivedReady = false;
if (statelessRPC)
if (statelessRPC) {
state.writeTo(out, null);
}
negotiateBegin();
SEND_HAVES: for (;;) {
final RevCommit c = walk.next();
if (c == null)
if (c == null) {
break SEND_HAVES;
}
pckOut.writeString("have " + c.getId().name() + "\n"); //$NON-NLS-1$ //$NON-NLS-2$
ObjectId o = c.getId();
pckOut.writeString("have " + o.name() + "\n"); //$NON-NLS-1$ //$NON-NLS-2$
havesSent++;
havesSinceLastContinue++;
if (minimalNegotiationSet != null) {
minimalNegotiationSet.remove(o);
}
if ((31 & havesSent) != 0) {
// We group the have lines into blocks of 32, each marked
@ -570,8 +605,9 @@ public abstract class BasePackFetchConnection extends BasePackConnection
continue;
}
if (monitor.isCancelled())
if (monitor.isCancelled()) {
throw new CancelledException();
}
pckOut.end();
resultsPending++; // Each end will cause a result to come back.
@ -593,6 +629,13 @@ public abstract class BasePackFetchConnection extends BasePackConnection
// pack on the remote side. Keep doing that.
//
resultsPending--;
if (minimalNegotiationSet != null
&& minimalNegotiationSet.isEmpty()) {
// Minimal negotiation was requested and we sent out our
// current reference values for our wants, so terminate
// negotiation early.
break SEND_HAVES;
}
break READ_RESULT;
case ACK:
@ -603,8 +646,9 @@ public abstract class BasePackFetchConnection extends BasePackConnection
multiAck = MultiAck.OFF;
resultsPending = 0;
receivedAck = true;
if (statelessRPC)
if (statelessRPC) {
state.writeTo(out, null);
}
break SEND_HAVES;
case ACK_CONTINUE:
@ -619,19 +663,28 @@ public abstract class BasePackFetchConnection extends BasePackConnection
receivedAck = true;
receivedContinue = true;
havesSinceLastContinue = 0;
if (anr == AckNackResult.ACK_READY)
if (anr == AckNackResult.ACK_READY) {
receivedReady = true;
}
if (minimalNegotiationSet != null && minimalNegotiationSet.isEmpty()) {
// Minimal negotiation was requested and we sent out our current reference
// values for our wants, so terminate negotiation early.
break SEND_HAVES;
}
break;
}
if (monitor.isCancelled())
if (monitor.isCancelled()) {
throw new CancelledException();
}
}
if (noDone & receivedReady)
if (noDone & receivedReady) {
break SEND_HAVES;
if (statelessRPC)
}
if (statelessRPC) {
state.writeTo(out, null);
}
if (receivedContinue && havesSinceLastContinue > MAX_HAVES) {
// Our history must be really different from the remote's.
@ -645,8 +698,9 @@ public abstract class BasePackFetchConnection extends BasePackConnection
// Tell the remote side we have run out of things to talk about.
//
if (monitor.isCancelled())
if (monitor.isCancelled()) {
throw new CancelledException();
}
if (!receivedReady || !noDone) {
// When statelessRPC is true we should always leave SEND_HAVES
@ -691,10 +745,11 @@ public abstract class BasePackFetchConnection extends BasePackConnection
break;
}
if (monitor.isCancelled())
if (monitor.isCancelled()) {
throw new CancelledException();
}
}
}
private void negotiateBegin() throws IOException {
walk.resetRetain(REACHABLE, ADVERTISED);

17
org.eclipse.jgit/src/org/eclipse/jgit/transport/TestProtocol.java

@ -54,6 +54,7 @@ import org.eclipse.jgit.errors.NotSupportedException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.BasePackFetchConnection.FetchConfig;
import org.eclipse.jgit.transport.resolver.ReceivePackFactory;
import org.eclipse.jgit.transport.resolver.UploadPackFactory;
@ -78,6 +79,8 @@ import org.eclipse.jgit.transport.resolver.UploadPackFactory;
public class TestProtocol<C> extends TransportProtocol {
private static final String SCHEME = "test"; //$NON-NLS-1$
private static FetchConfig fetchConfig;
private class Handle {
final C req;
final Repository remote;
@ -147,6 +150,10 @@ public class TestProtocol<C> extends TransportProtocol {
return Collections.emptySet();
}
static void setFetchConfig(FetchConfig c) {
fetchConfig = c;
}
/**
* Register a repository connection over the internal test protocol.
*
@ -184,8 +191,14 @@ public class TestProtocol<C> extends TransportProtocol {
public FetchConnection openFetch() throws NotSupportedException,
TransportException {
handle.remote.incrementOpen();
return new InternalFetchConnection<>(
this, uploadPackFactory, handle.req, handle.remote);
return new InternalFetchConnection<C>(this, uploadPackFactory,
handle.req, handle.remote) {
@Override
FetchConfig getFetchConfig() {
return fetchConfig != null ? fetchConfig
: super.getFetchConfig();
}
};
}
@Override

Loading…
Cancel
Save