Browse Source
Using the ScheduledExecutorService is better than performing the pushes directly in the hook method, but it still has limitations: - Busy repositories can result in parallel pushes to the mirror (#23) - The ScheduledExecutorService being used is a shared resources, which can result in other callers being starved out A better approach is to use the BucketedExecutor. It offers several improvements that are tailor-made for the work being done here: - Buckets can be used to control what can be done in parallel - Tasks with the same key are grouped, and at most a single node in a Data Center cluster can run them - Tasks with different keys can be run in parallel - Work can be shared among cluster nodes, in Data Center installations - Tasks can be scheduled from one cluster node and run on another - Locking is inherent in the BucketedExecutor's design, which means the hook doesn't need to do any locking of its own - This means there aren't any blocked threads. This fixes issue #23 by ensuring at most a single push happens to any given mirror, and by allowing the overall concurrency, even in Data Center installations, to be controlled. (#34) While I was making changes, because the minimum supported version for the plugin is already Bitbucket Server 5.5, I switched the code over to the new PostRepositoryHook SPI. This _could_ be extended to allow the hook to respond to a wider array of events, so mirroring would be triggered after pull request merges, branch or tag creation, etc. However, for the sake of consistency, the current code still only triggers pushes after other pushes. (Related to #45) Other things: - Added the ability to configure a timeout (#39) - Added the ability to configure the number of retries, and the number of BucketedExecutor threads _per cluster node_ - Added the ability to pass -Dbitbucket.test.version=5.11.1 (or any other version) to test against a version of Bitbucket Server other than the one being compiled against - Marked the plugin Data Center-compatible - Simplified wiring for DefaultPasswordEncryptor and removed the init method from the PasswordEncryptor interfacepull/63/head
Bryan Turner
7 years ago
11 changed files with 531 additions and 453 deletions
@ -0,0 +1,147 @@ |
|||||||
|
package com.englishtown.bitbucket.hook; |
||||||
|
|
||||||
|
import com.atlassian.bitbucket.concurrent.BucketProcessor; |
||||||
|
import com.atlassian.bitbucket.i18n.I18nService; |
||||||
|
import com.atlassian.bitbucket.permission.Permission; |
||||||
|
import com.atlassian.bitbucket.repository.Repository; |
||||||
|
import com.atlassian.bitbucket.repository.RepositoryService; |
||||||
|
import com.atlassian.bitbucket.scm.Command; |
||||||
|
import com.atlassian.bitbucket.scm.ScmCommandBuilder; |
||||||
|
import com.atlassian.bitbucket.scm.ScmService; |
||||||
|
import com.atlassian.bitbucket.scm.git.command.GitCommandExitHandler; |
||||||
|
import com.atlassian.bitbucket.server.ApplicationPropertiesService; |
||||||
|
import com.atlassian.bitbucket.user.SecurityService; |
||||||
|
import com.google.common.base.Strings; |
||||||
|
import org.slf4j.Logger; |
||||||
|
import org.slf4j.LoggerFactory; |
||||||
|
|
||||||
|
import javax.annotation.Nonnull; |
||||||
|
import java.net.URI; |
||||||
|
import java.net.URISyntaxException; |
||||||
|
import java.time.Duration; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Locale; |
||||||
|
|
||||||
|
import static com.englishtown.bitbucket.hook.MirrorRepositoryHook.PROP_PREFIX; |
||||||
|
|
||||||
|
public class MirrorBucketProcessor implements BucketProcessor<MirrorRequest> { |
||||||
|
|
||||||
|
static final String PROP_TIMEOUT = PROP_PREFIX + "timeout"; |
||||||
|
|
||||||
|
private static final String DEFAULT_REFSPEC = "+refs/heads/*:refs/heads/*"; |
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MirrorBucketProcessor.class); |
||||||
|
|
||||||
|
private final I18nService i18nService; |
||||||
|
private final PasswordEncryptor passwordEncryptor; |
||||||
|
private final RepositoryService repositoryService; |
||||||
|
private final ScmService scmService; |
||||||
|
private final SecurityService securityService; |
||||||
|
private final Duration timeout; |
||||||
|
|
||||||
|
public MirrorBucketProcessor(I18nService i18nService, PasswordEncryptor passwordEncryptor, |
||||||
|
ApplicationPropertiesService propertiesService, RepositoryService repositoryService, |
||||||
|
ScmService scmService, SecurityService securityService) { |
||||||
|
this.i18nService = i18nService; |
||||||
|
this.passwordEncryptor = passwordEncryptor; |
||||||
|
this.repositoryService = repositoryService; |
||||||
|
this.scmService = scmService; |
||||||
|
this.securityService = securityService; |
||||||
|
|
||||||
|
timeout = Duration.ofSeconds(propertiesService.getPluginProperty(PROP_TIMEOUT, 120L)); |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void process(@Nonnull String key, @Nonnull List<MirrorRequest> requests) { |
||||||
|
if (requests.isEmpty()) { |
||||||
|
return; |
||||||
|
} |
||||||
|
// Every request is for the same mirror URL, and the same repository ID. In case the
|
||||||
|
// settings (e.g. username/password) have been changed since the first request was
|
||||||
|
// queued, we process the _last_ request in the list. Since mirroring pushes all of
|
||||||
|
// the configured refspecs, any single request should roll up changes from any number
|
||||||
|
// of requests
|
||||||
|
MirrorRequest request = requests.get(requests.size() - 1); |
||||||
|
|
||||||
|
securityService.withPermission(Permission.REPO_READ, "Mirror changes") |
||||||
|
.call(() -> { |
||||||
|
Repository repository = repositoryService.getById(request.getRepositoryId()); |
||||||
|
if (repository == null) { |
||||||
|
log.debug("{}: Repository has been deleted", request.getRepositoryId()); |
||||||
|
return null; |
||||||
|
} |
||||||
|
if (repositoryService.isEmpty(repository)) { |
||||||
|
log.debug("{}: The repository is empty", repository); |
||||||
|
return null; |
||||||
|
} |
||||||
|
runMirrorCommand(request.getSettings(), repository); |
||||||
|
|
||||||
|
return null; |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
private void runMirrorCommand(MirrorSettings settings, Repository repository) { |
||||||
|
log.warn("{}: Preparing to push changes to mirror", repository); |
||||||
|
|
||||||
|
String password = passwordEncryptor.decrypt(settings.password); |
||||||
|
String authenticatedUrl = getAuthenticatedUrl(settings.mirrorRepoUrl, settings.username, password); |
||||||
|
|
||||||
|
// Call push command with the prune flag and refspecs for heads and tags
|
||||||
|
// Do not use the mirror flag as pull-request refs are included
|
||||||
|
ScmCommandBuilder<?> builder = scmService.createBuilder(repository) |
||||||
|
.command("push") |
||||||
|
.argument("--prune") // this deletes locally deleted branches
|
||||||
|
.argument(authenticatedUrl) |
||||||
|
.argument("--force"); |
||||||
|
|
||||||
|
// Use an atomic transaction to have a consistent state
|
||||||
|
if (settings.atomic) { |
||||||
|
builder.argument("--atomic"); |
||||||
|
} |
||||||
|
|
||||||
|
// Add refspec args
|
||||||
|
String refspecs = Strings.isNullOrEmpty(settings.refspec) ? DEFAULT_REFSPEC : settings.refspec; |
||||||
|
for (String refspec : refspecs.split("\\s|\\n")) { |
||||||
|
if (!Strings.isNullOrEmpty(refspec)) { |
||||||
|
builder.argument(refspec); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Add tags refspec
|
||||||
|
if (settings.tags) { |
||||||
|
builder.argument("+refs/tags/*:refs/tags/*"); |
||||||
|
} |
||||||
|
// Add notes refspec
|
||||||
|
if (settings.notes) { |
||||||
|
builder.argument("+refs/notes/*:refs/notes/*"); |
||||||
|
} |
||||||
|
|
||||||
|
PasswordHandler passwordHandler = new PasswordHandler(settings.password, |
||||||
|
new GitCommandExitHandler(i18nService, repository)); |
||||||
|
|
||||||
|
Command<String> command = builder.errorHandler(passwordHandler) |
||||||
|
.exitHandler(passwordHandler) |
||||||
|
.build(passwordHandler); |
||||||
|
command.setTimeout(timeout); |
||||||
|
|
||||||
|
Object result = command.call(); |
||||||
|
log.warn("{}: Push completed with the following output:\n{}", repository, result); |
||||||
|
} |
||||||
|
|
||||||
|
String getAuthenticatedUrl(String mirrorRepoUrl, String username, String password) { |
||||||
|
// Only http(s) has username/password
|
||||||
|
if (!mirrorRepoUrl.toLowerCase(Locale.ROOT).startsWith("http")) { |
||||||
|
return mirrorRepoUrl; |
||||||
|
} |
||||||
|
|
||||||
|
URI uri = URI.create(mirrorRepoUrl); |
||||||
|
String userInfo = username + ":" + password; |
||||||
|
|
||||||
|
try { |
||||||
|
return new URI(uri.getScheme(), userInfo, uri.getHost(), uri.getPort(), |
||||||
|
uri.getPath(), uri.getQuery(), uri.getFragment()).toString(); |
||||||
|
} catch (URISyntaxException e) { |
||||||
|
throw new IllegalStateException("The configured mirror URL (" + mirrorRepoUrl + ") is invalid", e); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,33 @@ |
|||||||
|
package com.englishtown.bitbucket.hook; |
||||||
|
|
||||||
|
import com.atlassian.bitbucket.repository.Repository; |
||||||
|
|
||||||
|
import java.io.Serializable; |
||||||
|
|
||||||
|
class MirrorRequest implements Serializable { |
||||||
|
|
||||||
|
private final int repositoryId; |
||||||
|
private final MirrorSettings settings; |
||||||
|
|
||||||
|
MirrorRequest(Repository repository, MirrorSettings settings) { |
||||||
|
this(repository.getId(), settings); |
||||||
|
} |
||||||
|
|
||||||
|
MirrorRequest(int repositoryId, MirrorSettings settings) { |
||||||
|
this.repositoryId = repositoryId; |
||||||
|
this.settings = settings; |
||||||
|
} |
||||||
|
|
||||||
|
int getRepositoryId() { |
||||||
|
return repositoryId; |
||||||
|
} |
||||||
|
|
||||||
|
MirrorSettings getSettings() { |
||||||
|
return settings; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public String toString() { |
||||||
|
return repositoryId + ":" + settings.mirrorRepoUrl; |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,15 @@ |
|||||||
|
package com.englishtown.bitbucket.hook; |
||||||
|
|
||||||
|
import java.io.Serializable; |
||||||
|
|
||||||
|
class MirrorSettings implements Serializable { |
||||||
|
|
||||||
|
String mirrorRepoUrl; |
||||||
|
String username; |
||||||
|
String password; |
||||||
|
String suffix; |
||||||
|
String refspec; |
||||||
|
boolean tags; |
||||||
|
boolean notes; |
||||||
|
boolean atomic; |
||||||
|
} |
@ -0,0 +1,155 @@ |
|||||||
|
package com.englishtown.bitbucket.hook; |
||||||
|
|
||||||
|
import com.atlassian.bitbucket.i18n.I18nService; |
||||||
|
import com.atlassian.bitbucket.i18n.SimpleI18nService; |
||||||
|
import com.atlassian.bitbucket.repository.Repository; |
||||||
|
import com.atlassian.bitbucket.repository.RepositoryService; |
||||||
|
import com.atlassian.bitbucket.scm.ScmService; |
||||||
|
import com.atlassian.bitbucket.scm.git.command.GitCommand; |
||||||
|
import com.atlassian.bitbucket.scm.git.command.GitScmCommandBuilder; |
||||||
|
import com.atlassian.bitbucket.server.ApplicationPropertiesService; |
||||||
|
import com.atlassian.bitbucket.user.DummySecurityService; |
||||||
|
import com.atlassian.bitbucket.user.SecurityService; |
||||||
|
import org.junit.Before; |
||||||
|
import org.junit.Rule; |
||||||
|
import org.junit.Test; |
||||||
|
import org.mockito.Mock; |
||||||
|
import org.mockito.Spy; |
||||||
|
import org.mockito.junit.MockitoJUnit; |
||||||
|
import org.mockito.junit.MockitoRule; |
||||||
|
|
||||||
|
import java.time.Duration; |
||||||
|
import java.util.Collections; |
||||||
|
import java.util.List; |
||||||
|
|
||||||
|
import static com.atlassian.bitbucket.mockito.MockitoUtils.returnArg; |
||||||
|
import static com.atlassian.bitbucket.mockito.MockitoUtils.returnFirst; |
||||||
|
import static com.atlassian.bitbucket.mockito.MockitoUtils.returnsSelf; |
||||||
|
import static com.englishtown.bitbucket.hook.MirrorBucketProcessor.PROP_TIMEOUT; |
||||||
|
import static org.junit.Assert.assertEquals; |
||||||
|
import static org.mockito.Mockito.*; |
||||||
|
|
||||||
|
/** |
||||||
|
* Unit tests for {@link MirrorRepositoryHook} |
||||||
|
*/ |
||||||
|
public class MirrorBucketProcessorTest { |
||||||
|
|
||||||
|
private static final String URL_HTTP = "https://bitbucket-mirror.englishtown.com/scm/test/test.git"; |
||||||
|
private static final String URL_SSH = "ssh://git@bitbucket-mirror.englishtown.com/scm/test/test.git"; |
||||||
|
|
||||||
|
private static final MirrorSettings SETTINGS = new MirrorSettings() { |
||||||
|
{ |
||||||
|
mirrorRepoUrl = URL_SSH; |
||||||
|
password = "test-password"; |
||||||
|
refspec = "+refs/heads/master:refs/heads/master +refs/heads/develop:refs/heads/develop"; |
||||||
|
username = "test-user"; |
||||||
|
|
||||||
|
atomic = true; |
||||||
|
notes = true; |
||||||
|
tags = true; |
||||||
|
} |
||||||
|
}; |
||||||
|
private static final MirrorRequest REQUEST = new MirrorRequest(1, SETTINGS); |
||||||
|
private static final List<MirrorRequest> REQUESTS = Collections.singletonList(REQUEST); |
||||||
|
|
||||||
|
@Rule |
||||||
|
public MockitoRule mockitoRule = MockitoJUnit.rule().silent(); |
||||||
|
|
||||||
|
@Mock |
||||||
|
private GitScmCommandBuilder builder; |
||||||
|
@Mock |
||||||
|
private GitCommand<String> command; |
||||||
|
@Spy |
||||||
|
private I18nService i18nService = new SimpleI18nService(); |
||||||
|
@Mock |
||||||
|
private PasswordEncryptor passwordEncryptor; |
||||||
|
private MirrorBucketProcessor processor; |
||||||
|
@Mock |
||||||
|
private ApplicationPropertiesService propertiesService; |
||||||
|
@Mock |
||||||
|
private Repository repository; |
||||||
|
@Mock |
||||||
|
private ScmService scmService; |
||||||
|
@Mock |
||||||
|
private RepositoryService repositoryService; |
||||||
|
@Spy |
||||||
|
private SecurityService securityService = new DummySecurityService(); |
||||||
|
|
||||||
|
@Before |
||||||
|
public void setup() { |
||||||
|
when(builder.command(anyString())).thenAnswer(returnsSelf()); |
||||||
|
when(builder.argument(anyString())).thenAnswer(returnsSelf()); |
||||||
|
when(builder.errorHandler(any())).thenAnswer(returnsSelf()); |
||||||
|
when(builder.exitHandler(any())).thenAnswer(returnsSelf()); |
||||||
|
when(builder.<String>build(any())).thenReturn(command); |
||||||
|
|
||||||
|
when(passwordEncryptor.decrypt(anyString())).thenAnswer(returnFirst()); |
||||||
|
when(propertiesService.getPluginProperty(eq(PROP_TIMEOUT), anyLong())).thenAnswer(returnArg(1)); |
||||||
|
|
||||||
|
doReturn(builder).when(scmService).createBuilder(any()); |
||||||
|
|
||||||
|
processor = new MirrorBucketProcessor(i18nService, passwordEncryptor, |
||||||
|
propertiesService, repositoryService, scmService, securityService); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testProcess() { |
||||||
|
when(repositoryService.getById(eq(1))).thenReturn(repository); |
||||||
|
|
||||||
|
processor.process("ignored", REQUESTS); |
||||||
|
|
||||||
|
verify(builder).command(eq("push")); |
||||||
|
verify(builder).argument(eq("--prune")); |
||||||
|
verify(builder).argument(eq("--force")); |
||||||
|
verify(builder).argument(eq(URL_SSH)); |
||||||
|
verify(builder).argument(eq("--atomic")); |
||||||
|
verify(builder).argument(eq("+refs/heads/master:refs/heads/master")); |
||||||
|
verify(builder).argument(eq("+refs/heads/develop:refs/heads/develop")); |
||||||
|
verify(builder).argument(eq("+refs/tags/*:refs/tags/*")); |
||||||
|
verify(builder).argument(eq("+refs/notes/*:refs/notes/*")); |
||||||
|
verify(command).call(); |
||||||
|
verify(command).setTimeout(eq(Duration.ofSeconds(120L))); |
||||||
|
verify(passwordEncryptor).decrypt(eq(SETTINGS.password)); |
||||||
|
verify(scmService).createBuilder(same(repository)); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testProcessWithDeletedRepository() { |
||||||
|
processor.process("ignored", REQUESTS); |
||||||
|
|
||||||
|
verify(repositoryService).getById(eq(1)); |
||||||
|
verifyNoMoreInteractions(repositoryService); |
||||||
|
verifyZeroInteractions(scmService); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testProcessWithEmptyRepository() { |
||||||
|
when(repositoryService.getById(eq(1))).thenReturn(repository); |
||||||
|
when(repositoryService.isEmpty(same(repository))).thenReturn(true); |
||||||
|
|
||||||
|
processor.process("ignored", REQUESTS); |
||||||
|
|
||||||
|
verify(repositoryService).getById(eq(1)); |
||||||
|
verify(repositoryService).isEmpty(same(repository)); |
||||||
|
verifyZeroInteractions(passwordEncryptor, scmService); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testProcessWithoutRequests() { |
||||||
|
processor.process("ignored", Collections.emptyList()); |
||||||
|
|
||||||
|
verifyZeroInteractions(repositoryService, scmService); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testGetAuthenticatedUrlForHttp() { |
||||||
|
String url = processor.getAuthenticatedUrl(URL_HTTP, "user", "password"); |
||||||
|
assertEquals("https://user:password@bitbucket-mirror.englishtown.com/scm/test/test.git", url); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testGetAuthenticatedUrlForSsh() { |
||||||
|
String url = processor.getAuthenticatedUrl(URL_SSH, "user", "password"); |
||||||
|
assertEquals(URL_SSH, url); |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue