launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #00507
[Merge] lp:~thumper/launchpad/private-branch-lookup-bug-261609 into lp:launchpad/devel
Tim Penhey has proposed merging lp:~thumper/launchpad/private-branch-lookup-bug-261609 into lp:launchpad/devel with lp:~thumper/launchpad/get-linked-to-branch as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
This is the actual work that allows urls like:
bzr+ssh://bazaar.launchpad.net/+branch/project
to work for pushing to. This will allow us to provide short lp:<project> urls for private trunk
branches without exposing the existence of the branch itself.
As a side-effect of this work, it became trivial to add the ability to push to a trunk branch
that doesn't exist, and create the link if the user has permission to do so.
lib/lp/code/interfaces/codehosting.py
- define a symbolic name for the branch alias prefix
lib/lp/code/xmlrpc/codehosting.py
- where the hard work is, two methods had to be changed here
- createBranch now has a separate method to get the branch namespace, branch name, linking
function and escaped path, these are then used to create the branch
- translatePath had to be able to take the paths that start with the prefix and map that on
to the actual branch in our physical filesystem
lib/lp/code/xmlrpc/tests/test_codehosting.py
- where all our wonderful tests are to make sure things work properly
lib/lp/codehosting/inmemory.py
- a big fake codehosting system
- the codehosting tests above are run against the "real" db using the xmlrpc calls, and also
against the inmemory codehosting implementation
- other tests are run against the inmemory system, which has been confirmed to work in exactly
the same way as the "real" one, but runs in a fraction of the time.
- the changes here are to make the inmemory system match the implementation of the real one
lib/lp/codehosting/tests/test_acceptance.py
- slow full system acceptance tests that do end to end tests
so... to run all the tests you should run:
test_acceptance
test_codehosting
--
https://code.launchpad.net/~thumper/launchpad/private-branch-lookup-bug-261609/+merge/32167
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~thumper/launchpad/private-branch-lookup-bug-261609 into lp:launchpad/devel.
=== modified file 'lib/lp/code/interfaces/codehosting.py'
--- lib/lp/code/interfaces/codehosting.py 2010-04-21 04:17:23 +0000
+++ lib/lp/code/interfaces/codehosting.py 2010-08-10 03:51:53 +0000
@@ -7,6 +7,7 @@
__metaclass__ = type
__all__ = [
+ 'BRANCH_ALIAS_PREFIX',
'BRANCH_TRANSPORT',
'CONTROL_TRANSPORT',
'ICodehostingAPI',
@@ -45,6 +46,9 @@
# Indicates that a path points to a control directory.
CONTROL_TRANSPORT = 'CONTROL_TRANSPORT'
+# The path prefix for getting at branches via their short name.
+BRANCH_ALIAS_PREFIX = '+branch'
+
class ICodehostingApplication(ILaunchpadApplication):
"""Branch Puller application root."""
=== modified file 'lib/lp/code/xmlrpc/codehosting.py'
--- lib/lp/code/xmlrpc/codehosting.py 2010-08-03 03:56:32 +0000
+++ lib/lp/code/xmlrpc/codehosting.py 2010-08-10 03:51:53 +0000
@@ -33,14 +33,18 @@
from lp.code.bzr import BranchFormat, ControlFormat, RepositoryFormat
from lp.code.enums import BranchType
from lp.code.errors import (
- BranchCreationException, InvalidNamespace, UnknownBranchTypeError)
-from lp.code.interfaces.branchlookup import IBranchLookup
+ BranchCreationException, InvalidNamespace, NoLinkedBranch,
+ UnknownBranchTypeError)
+from lp.code.interfaces import branchpuller
+from lp.code.interfaces.branchlookup import (
+ IBranchLookup, ILinkedBranchTraverser)
from lp.code.interfaces.branchnamespace import (
lookup_branch_namespace, split_unique_name)
-from lp.code.interfaces import branchpuller
+from lp.code.interfaces.branchtarget import IBranchTarget
from lp.code.interfaces.codehosting import (
- BRANCH_TRANSPORT, CONTROL_TRANSPORT, ICodehostingAPI, LAUNCHPAD_ANONYMOUS,
- LAUNCHPAD_SERVICES)
+ BRANCH_ALIAS_PREFIX, BRANCH_TRANSPORT, CONTROL_TRANSPORT, ICodehostingAPI,
+ LAUNCHPAD_ANONYMOUS, LAUNCHPAD_SERVICES)
+from lp.code.interfaces.linkedbranch import ICanHasLinkedBranch
from lp.registry.interfaces.person import IPersonSet, NoSuchPerson
from lp.registry.interfaces.product import NoSuchProduct
from lp.services.scripts.interfaces.scriptactivity import IScriptActivitySet
@@ -90,7 +94,6 @@
endInteraction()
-
class CodehostingAPI(LaunchpadXMLRPCView):
"""See `ICodehostingAPI`."""
@@ -141,6 +144,35 @@
date_completed=date_completed, hostname=hostname)
return True
+ def _getBranchNamespaceExtras(self, path, requester):
+ """Get the branch namespace, branch name and callback for the path.
+
+ If the path defines a full branch path including the owner and branch
+ name, then the namespace that is returned is the namespace for the
+ owner and the branch target specified.
+
+ If the path uses an lp short name, then we only allow the requester to
+ create a branch if they have permission to link the newly created
+ branch to the short name target. If there is an existing branch
+ already linked, then BranchExists is raised. The branch name that is
+ used is determined by the namespace as the first unused name starting
+ with 'trunk'.
+ """
+ if path.startswith(BRANCH_ALIAS_PREFIX + '/'):
+ path = path[len(BRANCH_ALIAS_PREFIX) + 1:]
+ if not path.startswith('~'):
+ context = getUtility(ILinkedBranchTraverser).traverse(path)
+ target = IBranchTarget(context)
+ namespace = target.getNamespace(requester)
+ branch_name = namespace.findUnusedName('trunk')
+ def link_func(new_branch):
+ link = ICanHasLinkedBranch(context)
+ link.setBranch(new_branch, requester)
+ return namespace, branch_name, link_func, path
+ namespace_name, branch_name = split_unique_name(path)
+ namespace = lookup_branch_namespace(namespace_name)
+ return namespace, branch_name, None, path
+
def createBranch(self, login_id, branch_path):
"""See `ICodehostingAPI`."""
def create_branch(requester):
@@ -148,12 +180,11 @@
return faults.InvalidPath(branch_path)
escaped_path = unescape(branch_path.strip('/'))
try:
- namespace_name, branch_name = split_unique_name(escaped_path)
+ namespace, branch_name, link_func, path = (
+ self._getBranchNamespaceExtras(escaped_path, requester))
except ValueError:
return faults.PermissionDenied(
"Cannot create branch at '%s'" % branch_path)
- try:
- namespace = lookup_branch_namespace(namespace_name)
except InvalidNamespace:
return faults.PermissionDenied(
"Cannot create branch at '%s'" % branch_path)
@@ -175,8 +206,15 @@
return faults.PermissionDenied(msg)
except BranchCreationException, e:
return faults.PermissionDenied(str(e))
- else:
- return branch.id
+
+ if link_func:
+ try:
+ link_func(branch)
+ except Unauthorized:
+ return faults.PermissionDenied(
+ "Cannot create linked branch at '%s'." % path)
+
+ return branch.id
return run_with_login(login_id, create_branch)
def _canWriteToBranch(self, requester, branch):
@@ -266,7 +304,26 @@
for first, second in iter_split(stripped_path, '/'):
first = unescape(first)
# Is it a branch?
- branch = getUtility(IBranchLookup).getByUniqueName(first)
+ if first.startswith(BRANCH_ALIAS_PREFIX + '/'):
+ try:
+ # translatePath('/+branch/.bzr') *must* return not
+ # found, otherwise bzr will look for it and we don't
+ # have a global bzr dir.
+ lp_path = first[len(BRANCH_ALIAS_PREFIX + '/'):]
+ if lp_path.startswith('.bzr/'):
+ raise faults.PathTranslationError(path)
+ branch, trailing = getUtility(
+ IBranchLookup).getByLPPath(lp_path)
+ except (NameLookupFailed, InvalidNamespace, NoLinkedBranch):
+ # The reason we're doing it is that getByLPPath thinks
+ # that 'foo/.bzr' is a request for the '.bzr' series
+ # of a product.
+ continue
+ if trailing is None:
+ trailing = ''
+ second = '/'.join([trailing, second]).strip('/')
+ else:
+ branch = getUtility(IBranchLookup).getByUniqueName(first)
if branch is not None:
branch = self._serializeBranch(requester, branch, second)
if branch is None:
@@ -279,4 +336,3 @@
return product
raise faults.PathTranslationError(path)
return run_with_login(requester_id, translate_path)
-
=== modified file 'lib/lp/code/xmlrpc/tests/test_codehosting.py'
--- lib/lp/code/xmlrpc/tests/test_codehosting.py 2010-08-05 01:00:49 +0000
+++ lib/lp/code/xmlrpc/tests/test_codehosting.py 2010-08-10 03:51:53 +0000
@@ -6,6 +6,7 @@
__metaclass__ = type
import datetime
+import os
import pytz
import unittest
@@ -20,7 +21,8 @@
from canonical.launchpad.ftests import ANONYMOUS, login, logout
from lp.services.scripts.interfaces.scriptactivity import (
IScriptActivitySet)
-from lp.code.interfaces.codehosting import BRANCH_TRANSPORT, CONTROL_TRANSPORT
+from lp.code.interfaces.codehosting import (
+ BRANCH_ALIAS_PREFIX, BRANCH_TRANSPORT, CONTROL_TRANSPORT)
from canonical.launchpad.interfaces.launchpad import ILaunchBag
from lp.testing import TestCaseWithFactory
from lp.testing.factory import LaunchpadObjectFactory
@@ -34,6 +36,7 @@
from lp.code.interfaces.branch import BRANCH_NAME_VALIDATION_ERROR_MESSAGE
from lp.code.interfaces.branchlookup import IBranchLookup
from lp.code.interfaces.branchtarget import IBranchTarget
+from lp.code.interfaces.linkedbranch import ICanHasLinkedBranch
from lp.code.model.tests.test_branchpuller import AcquireBranchToPullTests
from lp.code.xmlrpc.codehosting import (
CodehostingAPI, LAUNCHPAD_ANONYMOUS, LAUNCHPAD_SERVICES, run_with_login)
@@ -410,6 +413,116 @@
message = "No such source package: 'ningnangnong'."
self.assertEqual(faults.NotFound(message), fault)
+ def test_createBranch_using_branch_alias(self):
+ # Branches can be created using the branch alias and the full unique
+ # name of the branch.
+ owner = self.factory.makePerson()
+ product = self.factory.makeProduct()
+ branch_name = self.factory.getUniqueString('branch-name')
+ unique_name = u'~%s/%s/%s' % (owner.name, product.name, branch_name)
+ path = u'/%s/%s' % (BRANCH_ALIAS_PREFIX, unique_name)
+ branch_id = self.codehosting_api.createBranch(owner.id, escape(path))
+ login(ANONYMOUS)
+ branch = self.branch_lookup.get(branch_id)
+ self.assertEqual(unique_name, branch.unique_name)
+
+ def test_createBranch_using_branch_alias_then_lookup(self):
+ # A branch newly created using createBranch is immediately traversable
+ # using translatePath.
+ owner = self.factory.makePerson()
+ product = self.factory.makeProduct()
+ branch_name = self.factory.getUniqueString('branch-name')
+ unique_name = u'~%s/%s/%s' % (owner.name, product.name, branch_name)
+ path = escape(u'/%s/%s' % (BRANCH_ALIAS_PREFIX, unique_name))
+ branch_id = self.codehosting_api.createBranch(owner.id, path)
+ login(ANONYMOUS)
+ translation = self.codehosting_api.translatePath(owner.id, path)
+ self.assertEqual(
+ (BRANCH_TRANSPORT, {'id': branch_id, 'writable': True}, ''),
+ translation)
+
+ def test_createBranch_using_branch_alias_product(self):
+ # If the person creating the branch has permission to link the new
+ # branch to the alias, then they are able to create a branch and link
+ # it.
+ owner = self.factory.makePerson()
+ product = self.factory.makeProduct(owner=owner)
+ path = u'/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name)
+ branch_id = self.codehosting_api.createBranch(owner.id, escape(path))
+ login(ANONYMOUS)
+ branch = self.branch_lookup.get(branch_id)
+ self.assertEqual(owner, branch.owner)
+ self.assertEqual('trunk', branch.name)
+ self.assertEqual(product, branch.product)
+ self.assertEqual(ICanHasLinkedBranch(product).branch, branch)
+
+ def test_createBranch_using_branch_alias_product_then_lookup(self):
+ # A branch newly created using createBranch using a product alias is
+ # immediately traversable using translatePath.
+ product = self.factory.makeProduct()
+ owner = product.owner
+ path = escape(u'/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name))
+ branch_id = self.codehosting_api.createBranch(owner.id, path)
+ login(ANONYMOUS)
+ translation = self.codehosting_api.translatePath(owner.id, path)
+ self.assertEqual(
+ (BRANCH_TRANSPORT, {'id': branch_id, 'writable': True}, ''),
+ translation)
+
+ def test_createBranch_using_branch_alias_product_not_auth(self):
+ # If the person creating the branch does not have permission to link
+ # the new branch to the alias, then can't create the branch.
+ owner = self.factory.makePerson()
+ product = self.factory.makeProduct('wibble')
+ path = u'/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name)
+ fault = self.codehosting_api.createBranch(owner.id, escape(path))
+ message = "Cannot create linked branch at 'wibble'."
+ self.assertEqual(faults.PermissionDenied(message), fault)
+
+ def test_createBranch_using_branch_alias_product_not_exist(self):
+ # If the product doesn't exist, we don't (yet) create one.
+ owner = self.factory.makePerson()
+ path = u'/%s/foible' % (BRANCH_ALIAS_PREFIX,)
+ fault = self.codehosting_api.createBranch(owner.id, escape(path))
+ message = "Project 'foible' does not exist."
+ self.assertEqual(faults.NotFound(message), fault)
+
+ def test_createBranch_using_branch_alias_productseries(self):
+ # If the person creating the branch has permission to link the new
+ # branch to the alias, then they are able to create a branch and link
+ # it.
+ owner = self.factory.makePerson()
+ product = self.factory.makeProduct(owner=owner)
+ series = self.factory.makeProductSeries(product=product)
+ path = u'/%s/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name, series.name)
+ branch_id = self.codehosting_api.createBranch(owner.id, escape(path))
+ login(ANONYMOUS)
+ branch = self.branch_lookup.get(branch_id)
+ self.assertEqual(owner, branch.owner)
+ self.assertEqual('trunk', branch.name)
+ self.assertEqual(product, branch.product)
+ self.assertEqual(ICanHasLinkedBranch(series).branch, branch)
+
+ def test_createBranch_using_branch_alias_productseries_not_auth(self):
+ # If the person creating the branch does not have permission to link
+ # the new branch to the alias, then can't create the branch.
+ owner = self.factory.makePerson()
+ product = self.factory.makeProduct(name='wibble')
+ self.factory.makeProductSeries(product=product, name='nip')
+ path = u'/%s/wibble/nip' % (BRANCH_ALIAS_PREFIX,)
+ fault = self.codehosting_api.createBranch(owner.id, escape(path))
+ message = "Cannot create linked branch at 'wibble/nip'."
+ self.assertEqual(faults.PermissionDenied(message), fault)
+
+ def test_createBranch_using_branch_alias_productseries_not_exist(self):
+ # If the product series doesn't exist, we don't (yet) create it.
+ owner = self.factory.makePerson()
+ self.factory.makeProduct(name='wibble')
+ path = u'/%s/wibble/nip' % (BRANCH_ALIAS_PREFIX,)
+ fault = self.codehosting_api.createBranch(owner.id, escape(path))
+ message = "No such product series: 'nip'."
+ self.assertEqual(faults.NotFound(message), fault)
+
def test_requestMirror(self):
# requestMirror should set the next_mirror_time field to be the
# current time.
@@ -742,6 +855,103 @@
(BRANCH_TRANSPORT, {'id': branch.id, 'writable': False}, ''),
translation)
+ def test_translatePath_branch_alias_short_name(self):
+ # translatePath translates the short name of a branch if it's prefixed
+ # by +branch.
+ requester = self.factory.makePerson()
+ branch = self.factory.makeProductBranch()
+ removeSecurityProxy(branch.product.development_focus).branch = branch
+ short_name = ICanHasLinkedBranch(branch.product).bzr_path
+ path_in_branch = '.bzr/branch-format'
+ path = escape(u'/%s' % os.path.join(
+ BRANCH_ALIAS_PREFIX, short_name, path_in_branch))
+ translation = self.codehosting_api.translatePath(requester.id, path)
+ login(ANONYMOUS)
+ self.assertEqual(
+ (BRANCH_TRANSPORT, {'id': branch.id, 'writable': False},
+ path_in_branch), translation)
+
+ def test_translatePath_branch_alias_unique_name(self):
+ # translatePath translates +branch paths that are followed by the
+ # unique name as if they didn't have the prefix at all.
+ requester = self.factory.makePerson()
+ branch = self.factory.makeBranch()
+ path_in_branch = '.bzr/branch-format'
+ path = escape(u'/%s' % os.path.join(
+ BRANCH_ALIAS_PREFIX, branch.unique_name, path_in_branch))
+ translation = self.codehosting_api.translatePath(requester.id, path)
+ login(ANONYMOUS)
+ self.assertEqual(
+ (BRANCH_TRANSPORT, {'id': branch.id, 'writable': False},
+ path_in_branch), translation)
+
+ def test_translatePath_branch_alias_no_such_branch(self):
+ # translatePath returns a not found when there's no such branch, given
+ # a unique name after +branch.
+ requester = self.factory.makePerson()
+ product = self.factory.makeProduct()
+ path = '/%s/~%s/%s/doesntexist' % (
+ BRANCH_ALIAS_PREFIX, requester.name, product.name)
+ self.assertNotFound(requester, path)
+
+ def test_translatePath_branch_alias_no_such_person(self):
+ # translatePath returns a not found when there's no such person, given
+ # a unique name after +branch.
+ requester = self.factory.makePerson()
+ path = '/%s/~doesntexist/dontcare/noreally' % (BRANCH_ALIAS_PREFIX,)
+ self.assertNotFound(requester, path)
+
+ def test_translatePath_branch_alias_no_such_product(self):
+ # translatePath returns a not found when there's no such product,
+ # given a unique name after +branch.
+ requester = self.factory.makePerson()
+ path = '/%s/~%s/doesntexist/branchname' % (
+ BRANCH_ALIAS_PREFIX, requester.name)
+ self.assertNotFound(requester, path)
+
+ def test_translatePath_branch_alias_no_such_distro(self):
+ # translatePath returns a not found when there's no such distro, given
+ # a unique name after +branch.
+ requester = self.factory.makePerson()
+ path = '/%s/~%s/doesntexist/lucid/openssh/branchname' % (
+ BRANCH_ALIAS_PREFIX, requester.name)
+ self.assertNotFound(requester, path)
+
+ def test_translatePath_branch_alias_no_such_distroseries(self):
+ # translatePath returns a not found when there's no such distroseries,
+ # given a unique name after +branch.
+ requester = self.factory.makePerson()
+ distro = self.factory.makeDistribution()
+ path = '/%s/~%s/%s/doesntexist/openssh/branchname' % (
+ BRANCH_ALIAS_PREFIX, requester.name, distro.name)
+ self.assertNotFound(requester, path)
+
+ def test_translatePath_branch_alias_no_such_sourcepackagename(self):
+ # translatePath returns a not found when there's no such
+ # sourcepackagename, given a unique name after +branch.
+ requester = self.factory.makePerson()
+ distroseries = self.factory.makeDistroSeries()
+ distro = distroseries.distribution
+ path = '/%s/~%s/%s/%s/doesntexist/branchname' % (
+ BRANCH_ALIAS_PREFIX, requester.name, distro.name,
+ distroseries.name)
+ self.assertNotFound(requester, path)
+
+ def test_translatePath_branch_alias_product_with_no_branch(self):
+ # translatePath returns a not found when we look up a product that has
+ # no linked branch.
+ requester = self.factory.makePerson()
+ product = self.factory.makeProduct()
+ path = '/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name)
+ self.assertNotFound(requester, path)
+
+ def test_translatePath_branch_alias_bzrdir(self):
+ # translatePath('/+branch/.bzr') *must* return not found, otherwise
+ # bzr will look for it and we don't have a global bzr dir.
+ requester = self.factory.makePerson()
+ self.assertNotFound(
+ requester, '/%s/.bzr/branch-format' % BRANCH_ALIAS_PREFIX)
+
def assertTranslationIsControlDirectory(self, translation,
default_stacked_on,
trailing_path):
=== modified file 'lib/lp/codehosting/inmemory.py'
--- lib/lp/codehosting/inmemory.py 2010-04-26 00:23:45 +0000
+++ lib/lp/codehosting/inmemory.py 2010-08-10 03:51:53 +0000
@@ -30,11 +30,13 @@
from lp.code.interfaces.branch import IBranch
from lp.code.interfaces.branchtarget import IBranchTarget
from lp.code.interfaces.codehosting import (
- BRANCH_TRANSPORT, CONTROL_TRANSPORT, LAUNCHPAD_ANONYMOUS,
- LAUNCHPAD_SERVICES)
+ BRANCH_ALIAS_PREFIX, BRANCH_TRANSPORT, CONTROL_TRANSPORT,
+ LAUNCHPAD_ANONYMOUS, LAUNCHPAD_SERVICES)
+from lp.code.interfaces.linkedbranch import ICanHasLinkedBranch
from lp.code.xmlrpc.codehosting import datetime_from_tuple
from lp.registry.interfaces.pocket import PackagePublishingPocket
from lp.services.utils import iter_split
+from lp.services.xmlrpc import LaunchpadFault
from lp.testing.factory import ObjectFactory
@@ -265,9 +267,17 @@
class FakeProduct(FakeDatabaseObject):
"""Fake product."""
- def __init__(self, name):
+ def __init__(self, name, owner):
self.name = name
- self.development_focus = FakeProductSeries()
+ self.owner = owner
+ self.bzr_path = name
+ self.development_focus = FakeProductSeries(self, 'trunk')
+ self.series = {
+ 'trunk': self.development_focus,
+ }
+
+ def getSeries(self, name):
+ return self.series.get(name, None)
@adapter(FakeProduct)
@@ -277,11 +287,36 @@
return ProductBranchTarget(fake_product)
+@adapter(FakeProduct)
+@implementer(ICanHasLinkedBranch)
+def fake_product_to_can_has_linked_branch(fake_product):
+ """Adapt a `FakeProduct` to `ICanHasLinkedBranch`."""
+ return fake_product.development_focus
+
+
class FakeProductSeries(FakeDatabaseObject):
"""Fake product series."""
branch = None
+ def __init__(self, product, name):
+ self.product = product
+ self.name = name
+
+ @property
+ def bzr_path(self):
+ if self.product.development_focus is self:
+ return self.product.name
+ else:
+ return "%s/%s" % (self.product.name, self.name)
+
+
+@adapter(FakeProductSeries)
+@implementer(ICanHasLinkedBranch)
+def fake_productseries_to_can_has_linked_branch(fake_productseries):
+ """Adapt a `FakeProductSeries` to `ICanHasLinkedBranch`."""
+ return fake_productseries
+
class FakeScriptActivity(FakeDatabaseObject):
"""Fake script activity."""
@@ -394,6 +429,8 @@
self._distroseries_set._add(distroseries)
return distroseries
+ makeDistroSeries = makeDistroRelease
+
def makeSourcePackageName(self):
sourcepackagename = FakeSourcePackageName(self.getUniqueString())
self._sourcepackagename_set._add(sourcepackagename)
@@ -418,11 +455,22 @@
self._person_set._add(person)
return person
- def makeProduct(self):
- product = FakeProduct(self.getUniqueString())
+ def makeProduct(self, name=None, owner=None):
+ if name is None:
+ name = self.getUniqueString()
+ if owner is None:
+ owner = self.makePerson()
+ product = FakeProduct(name, owner)
self._product_set._add(product)
return product
+ def makeProductSeries(self, product, name=None):
+ if name is None:
+ name = self.getUniqueString()
+ series = FakeProductSeries(product, name)
+ product.series[name] = series
+ return series
+
def enableDefaultStackingForProduct(self, product, branch=None):
"""Give 'product' a default stacked-on branch.
@@ -509,28 +557,60 @@
FakeScriptActivity(name, hostname, date_started, date_completed))
return True
- def createBranch(self, requester_id, branch_path):
- if not branch_path.startswith('/'):
- return faults.InvalidPath(branch_path)
- escaped_path = unescape(branch_path.strip('/'))
+ def _parseUniqueName(self, branch_path):
+ """Return a dict of the parsed information and the branch name."""
try:
- namespace_path, branch_name = escaped_path.rsplit('/', 1)
+ namespace_path, branch_name = branch_path.rsplit('/', 1)
except ValueError:
- return faults.PermissionDenied(
- "Cannot create branch at '%s'" % branch_path)
+ raise faults.PermissionDenied(
+ "Cannot create branch at '/%s'" % branch_path)
data = BranchNamespaceSet().parse(namespace_path)
+ return data, branch_name
+
+ def _createBranch(self, registrant, branch_path):
+ """The guts of the create branch method.
+
+ Raises exceptions on error conditions.
+ """
+ to_link = None
+ if branch_path.startswith(BRANCH_ALIAS_PREFIX + '/'):
+ branch_path = branch_path[len(BRANCH_ALIAS_PREFIX) + 1:]
+ if branch_path.startswith('~'):
+ data, branch_name = self._parseUniqueName(branch_path)
+ else:
+ tokens = branch_path.split('/')
+ data = {
+ 'person': registrant.name,
+ 'product': tokens[0],
+ }
+ branch_name = 'trunk'
+ # check the series
+ product = self._product_set.getByName(data['product'])
+ if product is not None:
+ if len(tokens) > 1:
+ series = product.getSeries(tokens[1])
+ if series is None:
+ raise faults.NotFound(
+ "No such product series: '%s'." % tokens[1])
+ else:
+ to_link = ICanHasLinkedBranch(series)
+ else:
+ to_link = ICanHasLinkedBranch(product)
+ # don't forget the link.
+ else:
+ data, branch_name = self._parseUniqueName(branch_path)
+
owner = self._person_set.getByName(data['person'])
if owner is None:
- return faults.NotFound(
+ raise faults.NotFound(
"User/team '%s' does not exist." % (data['person'],))
- registrant = self._person_set.get(requester_id)
# The real code consults the branch creation policy of the product. We
# don't need to do so here, since the tests above this layer never
# encounter that behaviour. If they *do* change to rely on the branch
# creation policy, the observed behaviour will be failure to raise
# exceptions.
if not registrant.inTeam(owner):
- return faults.PermissionDenied(
+ raise faults.PermissionDenied(
('%s cannot create branches owned by %s'
% (registrant.displayname, owner.displayname)))
product = sourcepackage = None
@@ -539,35 +619,51 @@
elif data['product'] is not None:
product = self._product_set.getByName(data['product'])
if product is None:
- return faults.NotFound(
+ raise faults.NotFound(
"Project '%s' does not exist." % (data['product'],))
elif data['distribution'] is not None:
distro = self._distribution_set.getByName(data['distribution'])
if distro is None:
- return faults.NotFound(
+ raise faults.NotFound(
"No such distribution: '%s'." % (data['distribution'],))
distroseries = self._distroseries_set.getByName(
data['distroseries'])
if distroseries is None:
- return faults.NotFound(
+ raise faults.NotFound(
"No such distribution series: '%s'."
% (data['distroseries'],))
sourcepackagename = self._sourcepackagename_set.getByName(
data['sourcepackagename'])
if sourcepackagename is None:
- return faults.NotFound(
+ raise faults.NotFound(
"No such source package: '%s'."
% (data['sourcepackagename'],))
sourcepackage = self._factory.makeSourcePackage(
distroseries, sourcepackagename)
else:
- return faults.PermissionDenied(
+ raise faults.PermissionDenied(
"Cannot create branch at '%s'" % branch_path)
+ branch = self._factory.makeBranch(
+ owner=owner, name=branch_name, product=product,
+ sourcepackage=sourcepackage, registrant=registrant,
+ branch_type=BranchType.HOSTED)
+ if to_link is not None:
+ if registrant.inTeam(to_link.product.owner):
+ to_link.branch = branch
+ else:
+ raise faults.PermissionDenied(
+ "Cannot create linked branch at '%s'." % branch_path)
+ return branch.id
+
+ def createBranch(self, requester_id, branch_path):
+ if not branch_path.startswith('/'):
+ return faults.InvalidPath(branch_path)
+ escaped_path = unescape(branch_path.strip('/'))
+ registrant = self._person_set.get(requester_id)
try:
- return self._factory.makeBranch(
- owner=owner, name=branch_name, product=product,
- sourcepackage=sourcepackage, registrant=registrant,
- branch_type=BranchType.HOSTED).id
+ return self._createBranch(registrant, escaped_path)
+ except LaunchpadFault, e:
+ return e
except LaunchpadValidationError, e:
msg = e.args[0]
if isinstance(msg, unicode):
@@ -704,7 +800,15 @@
for first, second in iter_split(stripped_path, '/'):
first = unescape(first).encode('utf-8')
# Is it a branch?
- branch = self._branch_set._find(unique_name=first)
+ if first.startswith('+branch/'):
+ component_name = first[len('+branch/'):]
+ product = self._product_set.getByName(component_name)
+ if product:
+ branch = product.development_focus.branch
+ else:
+ branch = self._branch_set._find(unique_name=component_name)
+ else:
+ branch = self._branch_set._find(unique_name=first)
if branch is not None:
branch = self._serializeBranch(requester_id, branch, second)
if isinstance(branch, Fault):
@@ -745,8 +849,10 @@
self._sourcepackagename_set, self._factory,
self._script_activity_set)
sm = getSiteManager()
+ sm.registerAdapter(fake_product_to_can_has_linked_branch)
sm.registerAdapter(fake_product_to_branch_target)
sm.registerAdapter(fake_source_package_to_branch_target)
+ sm.registerAdapter(fake_productseries_to_can_has_linked_branch)
def getCodehostingEndpoint(self):
"""See `LaunchpadDatabaseFrontend`.
=== modified file 'lib/lp/codehosting/tests/test_acceptance.py'
--- lib/lp/codehosting/tests/test_acceptance.py 2010-04-19 06:35:23 +0000
+++ lib/lp/codehosting/tests/test_acceptance.py 2010-08-10 03:51:53 +0000
@@ -487,6 +487,27 @@
'~landscape-developers/landscape/some-branch')
self.assertNotBranch(remote_url)
+ def test_push_to_new_full_branch_alias(self):
+ # We can also push branches to URLs like /+branch/~foo/bar/baz.
+ unique_name = '~testuser/firefox/new-branch'
+ remote_url = self.getTransportURL('+branch/%s' % unique_name)
+ self.push(self.local_branch_path, remote_url)
+ self.assertBranchesMatch(self.local_branch_path, remote_url)
+ self.assertBranchesMatch(
+ self.local_branch_path, self.getTransportURL(unique_name))
+
+ def test_push_to_new_short_branch_alias(self):
+ # We can also push branches to URLs like /+branch/firefox
+ # Hack 'firefox' so we have permission to do this.
+ LaunchpadZopelessTestSetup().txn.begin()
+ firefox = Product.selectOneBy(name='firefox')
+ testuser = Person.selectOneBy(name='testuser')
+ firefox.development_focus.owner = testuser
+ LaunchpadZopelessTestSetup().txn.commit()
+ remote_url = self.getTransportURL('+branch/firefox')
+ self.push(self.local_branch_path, remote_url)
+ self.assertBranchesMatch(self.local_branch_path, remote_url)
+
def test_can_push_to_existing_hosted_branch(self):
# If a hosted branch exists in the database, but not on the
# filesystem, and is writable by the user, then the user is able to