← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~thumper/launchpad/private-branch-lookup-bug-261609 into lp:launchpad/devel

 

Tim Penhey has proposed merging lp:~thumper/launchpad/private-branch-lookup-bug-261609 into lp:launchpad/devel with lp:~thumper/launchpad/get-linked-to-branch as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)


This is the actual work that allows urls like:
  bzr+ssh://bazaar.launchpad.net/+branch/project
to work for pushing to.  This will allow us to provide short lp:<project> urls for private trunk
branches without exposing the existence of the branch itself.

As a side-effect of this work, it became trivial to add the ability to push to a trunk branch
that doesn't exist, and create the link if the user has permission to do so.


lib/lp/code/interfaces/codehosting.py
 - define a symbolic name for the branch alias prefix

lib/lp/code/xmlrpc/codehosting.py
 - where the hard work is, two methods had to be changed here
 - createBranch now has a separate method to get the branch namespace, branch name, linking
   function and escaped path, these are then used to create the branch
 - translatePath had to be able to take the paths that start with the prefix and map that on
   to the actual branch in our physical filesystem

lib/lp/code/xmlrpc/tests/test_codehosting.py
 - where all our wonderful tests are to make sure things work properly

lib/lp/codehosting/inmemory.py
 - a big fake codehosting system
 - the codehosting tests above are run against the "real" db using the xmlrpc calls, and also
   against the inmemory codehosting implementation
 - other tests are run against the inmemory system, which has been confirmed to work in exactly
   the same way as the "real" one, but runs in a fraction of the time.
 - the changes here are to make the inmemory system match the implementation of the real one

lib/lp/codehosting/tests/test_acceptance.py
 - slow full system acceptance tests that do end to end tests

so... to run all the tests you should run:

test_acceptance
test_codehosting

-- 
https://code.launchpad.net/~thumper/launchpad/private-branch-lookup-bug-261609/+merge/32167
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~thumper/launchpad/private-branch-lookup-bug-261609 into lp:launchpad/devel.
=== modified file 'lib/lp/code/interfaces/codehosting.py'
--- lib/lp/code/interfaces/codehosting.py	2010-04-21 04:17:23 +0000
+++ lib/lp/code/interfaces/codehosting.py	2010-08-10 03:51:53 +0000
@@ -7,6 +7,7 @@
 
 __metaclass__ = type
 __all__ = [
+    'BRANCH_ALIAS_PREFIX',
     'BRANCH_TRANSPORT',
     'CONTROL_TRANSPORT',
     'ICodehostingAPI',
@@ -45,6 +46,9 @@
 # Indicates that a path points to a control directory.
 CONTROL_TRANSPORT = 'CONTROL_TRANSPORT'
 
+# The path prefix for getting at branches via their short name.
+BRANCH_ALIAS_PREFIX = '+branch'
+
 
 class ICodehostingApplication(ILaunchpadApplication):
     """Branch Puller application root."""

=== modified file 'lib/lp/code/xmlrpc/codehosting.py'
--- lib/lp/code/xmlrpc/codehosting.py	2010-08-03 03:56:32 +0000
+++ lib/lp/code/xmlrpc/codehosting.py	2010-08-10 03:51:53 +0000
@@ -33,14 +33,18 @@
 from lp.code.bzr import BranchFormat, ControlFormat, RepositoryFormat
 from lp.code.enums import BranchType
 from lp.code.errors import (
-    BranchCreationException, InvalidNamespace, UnknownBranchTypeError)
-from lp.code.interfaces.branchlookup import IBranchLookup
+    BranchCreationException, InvalidNamespace, NoLinkedBranch,
+    UnknownBranchTypeError)
+from lp.code.interfaces import branchpuller
+from lp.code.interfaces.branchlookup import (
+    IBranchLookup, ILinkedBranchTraverser)
 from lp.code.interfaces.branchnamespace import (
     lookup_branch_namespace, split_unique_name)
-from lp.code.interfaces import branchpuller
+from lp.code.interfaces.branchtarget import IBranchTarget
 from lp.code.interfaces.codehosting import (
-    BRANCH_TRANSPORT, CONTROL_TRANSPORT, ICodehostingAPI, LAUNCHPAD_ANONYMOUS,
-    LAUNCHPAD_SERVICES)
+    BRANCH_ALIAS_PREFIX, BRANCH_TRANSPORT, CONTROL_TRANSPORT, ICodehostingAPI,
+    LAUNCHPAD_ANONYMOUS, LAUNCHPAD_SERVICES)
+from lp.code.interfaces.linkedbranch import ICanHasLinkedBranch
 from lp.registry.interfaces.person import IPersonSet, NoSuchPerson
 from lp.registry.interfaces.product import NoSuchProduct
 from lp.services.scripts.interfaces.scriptactivity import IScriptActivitySet
@@ -90,7 +94,6 @@
         endInteraction()
 
 
-
 class CodehostingAPI(LaunchpadXMLRPCView):
     """See `ICodehostingAPI`."""
 
@@ -141,6 +144,35 @@
             date_completed=date_completed, hostname=hostname)
         return True
 
+    def _getBranchNamespaceExtras(self, path, requester):
+        """Get the branch namespace, branch name and callback for the path.
+
+        If the path defines a full branch path including the owner and branch
+        name, then the namespace that is returned is the namespace for the
+        owner and the branch target specified.
+
+        If the path uses an lp short name, then we only allow the requester to
+        create a branch if they have permission to link the newly created
+        branch to the short name target.  If there is an existing branch
+        already linked, then BranchExists is raised.  The branch name that is
+        used is determined by the namespace as the first unused name starting
+        with 'trunk'.
+        """
+        if path.startswith(BRANCH_ALIAS_PREFIX + '/'):
+            path = path[len(BRANCH_ALIAS_PREFIX) + 1:]
+            if not path.startswith('~'):
+                context = getUtility(ILinkedBranchTraverser).traverse(path)
+                target = IBranchTarget(context)
+                namespace = target.getNamespace(requester)
+                branch_name = namespace.findUnusedName('trunk')
+                def link_func(new_branch):
+                    link = ICanHasLinkedBranch(context)
+                    link.setBranch(new_branch, requester)
+                return namespace, branch_name, link_func, path
+        namespace_name, branch_name = split_unique_name(path)
+        namespace = lookup_branch_namespace(namespace_name)
+        return namespace, branch_name, None, path
+
     def createBranch(self, login_id, branch_path):
         """See `ICodehostingAPI`."""
         def create_branch(requester):
@@ -148,12 +180,11 @@
                 return faults.InvalidPath(branch_path)
             escaped_path = unescape(branch_path.strip('/'))
             try:
-                namespace_name, branch_name = split_unique_name(escaped_path)
+                namespace, branch_name, link_func, path = (
+                    self._getBranchNamespaceExtras(escaped_path, requester))
             except ValueError:
                 return faults.PermissionDenied(
                     "Cannot create branch at '%s'" % branch_path)
-            try:
-                namespace = lookup_branch_namespace(namespace_name)
             except InvalidNamespace:
                 return faults.PermissionDenied(
                     "Cannot create branch at '%s'" % branch_path)
@@ -175,8 +206,15 @@
                 return faults.PermissionDenied(msg)
             except BranchCreationException, e:
                 return faults.PermissionDenied(str(e))
-            else:
-                return branch.id
+
+            if link_func:
+                try:
+                    link_func(branch)
+                except Unauthorized:
+                    return faults.PermissionDenied(
+                        "Cannot create linked branch at '%s'." % path)
+
+            return branch.id
         return run_with_login(login_id, create_branch)
 
     def _canWriteToBranch(self, requester, branch):
@@ -266,7 +304,26 @@
             for first, second in iter_split(stripped_path, '/'):
                 first = unescape(first)
                 # Is it a branch?
-                branch = getUtility(IBranchLookup).getByUniqueName(first)
+                if first.startswith(BRANCH_ALIAS_PREFIX + '/'):
+                    try:
+                        # translatePath('/+branch/.bzr') *must* return not
+                        # found, otherwise bzr will look for it and we don't
+                        # have a global bzr dir.
+                        lp_path = first[len(BRANCH_ALIAS_PREFIX + '/'):]
+                        if lp_path.startswith('.bzr/'):
+                            raise faults.PathTranslationError(path)
+                        branch, trailing = getUtility(
+                            IBranchLookup).getByLPPath(lp_path)
+                    except (NameLookupFailed, InvalidNamespace, NoLinkedBranch):
+                        # The reason we're doing it is that getByLPPath thinks
+                        # that 'foo/.bzr' is a request for the '.bzr' series
+                        # of a product.
+                        continue
+                    if trailing is None:
+                        trailing = ''
+                    second = '/'.join([trailing, second]).strip('/')
+                else:
+                    branch = getUtility(IBranchLookup).getByUniqueName(first)
                 if branch is not None:
                     branch = self._serializeBranch(requester, branch, second)
                     if branch is None:
@@ -279,4 +336,3 @@
                     return product
             raise faults.PathTranslationError(path)
         return run_with_login(requester_id, translate_path)
-

=== modified file 'lib/lp/code/xmlrpc/tests/test_codehosting.py'
--- lib/lp/code/xmlrpc/tests/test_codehosting.py	2010-08-05 01:00:49 +0000
+++ lib/lp/code/xmlrpc/tests/test_codehosting.py	2010-08-10 03:51:53 +0000
@@ -6,6 +6,7 @@
 __metaclass__ = type
 
 import datetime
+import os
 import pytz
 import unittest
 
@@ -20,7 +21,8 @@
 from canonical.launchpad.ftests import ANONYMOUS, login, logout
 from lp.services.scripts.interfaces.scriptactivity import (
     IScriptActivitySet)
-from lp.code.interfaces.codehosting import BRANCH_TRANSPORT, CONTROL_TRANSPORT
+from lp.code.interfaces.codehosting import (
+    BRANCH_ALIAS_PREFIX, BRANCH_TRANSPORT, CONTROL_TRANSPORT)
 from canonical.launchpad.interfaces.launchpad import ILaunchBag
 from lp.testing import TestCaseWithFactory
 from lp.testing.factory import LaunchpadObjectFactory
@@ -34,6 +36,7 @@
 from lp.code.interfaces.branch import BRANCH_NAME_VALIDATION_ERROR_MESSAGE
 from lp.code.interfaces.branchlookup import IBranchLookup
 from lp.code.interfaces.branchtarget import IBranchTarget
+from lp.code.interfaces.linkedbranch import ICanHasLinkedBranch
 from lp.code.model.tests.test_branchpuller import AcquireBranchToPullTests
 from lp.code.xmlrpc.codehosting import (
     CodehostingAPI, LAUNCHPAD_ANONYMOUS, LAUNCHPAD_SERVICES, run_with_login)
@@ -410,6 +413,116 @@
         message = "No such source package: 'ningnangnong'."
         self.assertEqual(faults.NotFound(message), fault)
 
+    def test_createBranch_using_branch_alias(self):
+        # Branches can be created using the branch alias and the full unique
+        # name of the branch.
+        owner = self.factory.makePerson()
+        product = self.factory.makeProduct()
+        branch_name = self.factory.getUniqueString('branch-name')
+        unique_name = u'~%s/%s/%s' % (owner.name, product.name, branch_name)
+        path = u'/%s/%s' % (BRANCH_ALIAS_PREFIX, unique_name)
+        branch_id = self.codehosting_api.createBranch(owner.id, escape(path))
+        login(ANONYMOUS)
+        branch = self.branch_lookup.get(branch_id)
+        self.assertEqual(unique_name, branch.unique_name)
+
+    def test_createBranch_using_branch_alias_then_lookup(self):
+        # A branch newly created using createBranch is immediately traversable
+        # using translatePath.
+        owner = self.factory.makePerson()
+        product = self.factory.makeProduct()
+        branch_name = self.factory.getUniqueString('branch-name')
+        unique_name = u'~%s/%s/%s' % (owner.name, product.name, branch_name)
+        path = escape(u'/%s/%s' % (BRANCH_ALIAS_PREFIX, unique_name))
+        branch_id = self.codehosting_api.createBranch(owner.id, path)
+        login(ANONYMOUS)
+        translation = self.codehosting_api.translatePath(owner.id, path)
+        self.assertEqual(
+            (BRANCH_TRANSPORT, {'id': branch_id, 'writable': True}, ''),
+            translation)
+
+    def test_createBranch_using_branch_alias_product(self):
+        # If the person creating the branch has permission to link the new
+        # branch to the alias, then they are able to create a branch and link
+        # it.
+        owner = self.factory.makePerson()
+        product = self.factory.makeProduct(owner=owner)
+        path = u'/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name)
+        branch_id = self.codehosting_api.createBranch(owner.id, escape(path))
+        login(ANONYMOUS)
+        branch = self.branch_lookup.get(branch_id)
+        self.assertEqual(owner, branch.owner)
+        self.assertEqual('trunk', branch.name)
+        self.assertEqual(product, branch.product)
+        self.assertEqual(ICanHasLinkedBranch(product).branch, branch)
+
+    def test_createBranch_using_branch_alias_product_then_lookup(self):
+        # A branch newly created using createBranch using a product alias is
+        # immediately traversable using translatePath.
+        product = self.factory.makeProduct()
+        owner = product.owner
+        path = escape(u'/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name))
+        branch_id = self.codehosting_api.createBranch(owner.id, path)
+        login(ANONYMOUS)
+        translation = self.codehosting_api.translatePath(owner.id, path)
+        self.assertEqual(
+            (BRANCH_TRANSPORT, {'id': branch_id, 'writable': True}, ''),
+            translation)
+
+    def test_createBranch_using_branch_alias_product_not_auth(self):
+        # If the person creating the branch does not have permission to link
+        # the new branch to the alias, then can't create the branch.
+        owner = self.factory.makePerson()
+        product = self.factory.makeProduct('wibble')
+        path = u'/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name)
+        fault = self.codehosting_api.createBranch(owner.id, escape(path))
+        message = "Cannot create linked branch at 'wibble'."
+        self.assertEqual(faults.PermissionDenied(message), fault)
+
+    def test_createBranch_using_branch_alias_product_not_exist(self):
+        # If the product doesn't exist, we don't (yet) create one.
+        owner = self.factory.makePerson()
+        path = u'/%s/foible' % (BRANCH_ALIAS_PREFIX,)
+        fault = self.codehosting_api.createBranch(owner.id, escape(path))
+        message = "Project 'foible' does not exist."
+        self.assertEqual(faults.NotFound(message), fault)
+
+    def test_createBranch_using_branch_alias_productseries(self):
+        # If the person creating the branch has permission to link the new
+        # branch to the alias, then they are able to create a branch and link
+        # it.
+        owner = self.factory.makePerson()
+        product = self.factory.makeProduct(owner=owner)
+        series = self.factory.makeProductSeries(product=product)
+        path = u'/%s/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name, series.name)
+        branch_id = self.codehosting_api.createBranch(owner.id, escape(path))
+        login(ANONYMOUS)
+        branch = self.branch_lookup.get(branch_id)
+        self.assertEqual(owner, branch.owner)
+        self.assertEqual('trunk', branch.name)
+        self.assertEqual(product, branch.product)
+        self.assertEqual(ICanHasLinkedBranch(series).branch, branch)
+
+    def test_createBranch_using_branch_alias_productseries_not_auth(self):
+        # If the person creating the branch does not have permission to link
+        # the new branch to the alias, then can't create the branch.
+        owner = self.factory.makePerson()
+        product = self.factory.makeProduct(name='wibble')
+        self.factory.makeProductSeries(product=product, name='nip')
+        path = u'/%s/wibble/nip' % (BRANCH_ALIAS_PREFIX,)
+        fault = self.codehosting_api.createBranch(owner.id, escape(path))
+        message = "Cannot create linked branch at 'wibble/nip'."
+        self.assertEqual(faults.PermissionDenied(message), fault)
+
+    def test_createBranch_using_branch_alias_productseries_not_exist(self):
+        # If the product series doesn't exist, we don't (yet) create it.
+        owner = self.factory.makePerson()
+        self.factory.makeProduct(name='wibble')
+        path = u'/%s/wibble/nip' % (BRANCH_ALIAS_PREFIX,)
+        fault = self.codehosting_api.createBranch(owner.id, escape(path))
+        message = "No such product series: 'nip'."
+        self.assertEqual(faults.NotFound(message), fault)
+
     def test_requestMirror(self):
         # requestMirror should set the next_mirror_time field to be the
         # current time.
@@ -742,6 +855,103 @@
             (BRANCH_TRANSPORT, {'id': branch.id, 'writable': False}, ''),
             translation)
 
+    def test_translatePath_branch_alias_short_name(self):
+        # translatePath translates the short name of a branch if it's prefixed
+        # by +branch.
+        requester = self.factory.makePerson()
+        branch = self.factory.makeProductBranch()
+        removeSecurityProxy(branch.product.development_focus).branch = branch
+        short_name = ICanHasLinkedBranch(branch.product).bzr_path
+        path_in_branch = '.bzr/branch-format'
+        path = escape(u'/%s' % os.path.join(
+                BRANCH_ALIAS_PREFIX, short_name, path_in_branch))
+        translation = self.codehosting_api.translatePath(requester.id, path)
+        login(ANONYMOUS)
+        self.assertEqual(
+            (BRANCH_TRANSPORT, {'id': branch.id, 'writable': False},
+             path_in_branch), translation)
+
+    def test_translatePath_branch_alias_unique_name(self):
+        # translatePath translates +branch paths that are followed by the
+        # unique name as if they didn't have the prefix at all.
+        requester = self.factory.makePerson()
+        branch = self.factory.makeBranch()
+        path_in_branch = '.bzr/branch-format'
+        path = escape(u'/%s' % os.path.join(
+                BRANCH_ALIAS_PREFIX, branch.unique_name, path_in_branch))
+        translation = self.codehosting_api.translatePath(requester.id, path)
+        login(ANONYMOUS)
+        self.assertEqual(
+            (BRANCH_TRANSPORT, {'id': branch.id, 'writable': False},
+             path_in_branch), translation)
+
+    def test_translatePath_branch_alias_no_such_branch(self):
+        # translatePath returns a not found when there's no such branch, given
+        # a unique name after +branch.
+        requester = self.factory.makePerson()
+        product = self.factory.makeProduct()
+        path = '/%s/~%s/%s/doesntexist' % (
+            BRANCH_ALIAS_PREFIX, requester.name, product.name)
+        self.assertNotFound(requester, path)
+
+    def test_translatePath_branch_alias_no_such_person(self):
+        # translatePath returns a not found when there's no such person, given
+        # a unique name after +branch.
+        requester = self.factory.makePerson()
+        path = '/%s/~doesntexist/dontcare/noreally' % (BRANCH_ALIAS_PREFIX,)
+        self.assertNotFound(requester, path)
+
+    def test_translatePath_branch_alias_no_such_product(self):
+        # translatePath returns a not found when there's no such product,
+        # given a unique name after +branch.
+        requester = self.factory.makePerson()
+        path = '/%s/~%s/doesntexist/branchname' % (
+            BRANCH_ALIAS_PREFIX, requester.name)
+        self.assertNotFound(requester, path)
+
+    def test_translatePath_branch_alias_no_such_distro(self):
+        # translatePath returns a not found when there's no such distro, given
+        # a unique name after +branch.
+        requester = self.factory.makePerson()
+        path = '/%s/~%s/doesntexist/lucid/openssh/branchname' % (
+            BRANCH_ALIAS_PREFIX, requester.name)
+        self.assertNotFound(requester, path)
+
+    def test_translatePath_branch_alias_no_such_distroseries(self):
+        # translatePath returns a not found when there's no such distroseries,
+        # given a unique name after +branch.
+        requester = self.factory.makePerson()
+        distro = self.factory.makeDistribution()
+        path = '/%s/~%s/%s/doesntexist/openssh/branchname' % (
+            BRANCH_ALIAS_PREFIX, requester.name, distro.name)
+        self.assertNotFound(requester, path)
+
+    def test_translatePath_branch_alias_no_such_sourcepackagename(self):
+        # translatePath returns a not found when there's no such
+        # sourcepackagename, given a unique name after +branch.
+        requester = self.factory.makePerson()
+        distroseries = self.factory.makeDistroSeries()
+        distro = distroseries.distribution
+        path = '/%s/~%s/%s/%s/doesntexist/branchname' % (
+            BRANCH_ALIAS_PREFIX, requester.name, distro.name,
+            distroseries.name)
+        self.assertNotFound(requester, path)
+
+    def test_translatePath_branch_alias_product_with_no_branch(self):
+        # translatePath returns a not found when we look up a product that has
+        # no linked branch.
+        requester = self.factory.makePerson()
+        product = self.factory.makeProduct()
+        path = '/%s/%s' % (BRANCH_ALIAS_PREFIX, product.name)
+        self.assertNotFound(requester, path)
+
+    def test_translatePath_branch_alias_bzrdir(self):
+        # translatePath('/+branch/.bzr') *must* return not found, otherwise
+        # bzr will look for it and we don't have a global bzr dir.
+        requester = self.factory.makePerson()
+        self.assertNotFound(
+            requester, '/%s/.bzr/branch-format' % BRANCH_ALIAS_PREFIX)
+
     def assertTranslationIsControlDirectory(self, translation,
                                             default_stacked_on,
                                             trailing_path):

=== modified file 'lib/lp/codehosting/inmemory.py'
--- lib/lp/codehosting/inmemory.py	2010-04-26 00:23:45 +0000
+++ lib/lp/codehosting/inmemory.py	2010-08-10 03:51:53 +0000
@@ -30,11 +30,13 @@
 from lp.code.interfaces.branch import IBranch
 from lp.code.interfaces.branchtarget import IBranchTarget
 from lp.code.interfaces.codehosting import (
-    BRANCH_TRANSPORT, CONTROL_TRANSPORT, LAUNCHPAD_ANONYMOUS,
-    LAUNCHPAD_SERVICES)
+    BRANCH_ALIAS_PREFIX, BRANCH_TRANSPORT, CONTROL_TRANSPORT,
+    LAUNCHPAD_ANONYMOUS, LAUNCHPAD_SERVICES)
+from lp.code.interfaces.linkedbranch import ICanHasLinkedBranch
 from lp.code.xmlrpc.codehosting import datetime_from_tuple
 from lp.registry.interfaces.pocket import PackagePublishingPocket
 from lp.services.utils import iter_split
+from lp.services.xmlrpc import LaunchpadFault
 from lp.testing.factory import ObjectFactory
 
 
@@ -265,9 +267,17 @@
 class FakeProduct(FakeDatabaseObject):
     """Fake product."""
 
-    def __init__(self, name):
+    def __init__(self, name, owner):
         self.name = name
-        self.development_focus = FakeProductSeries()
+        self.owner = owner
+        self.bzr_path = name
+        self.development_focus = FakeProductSeries(self, 'trunk')
+        self.series = {
+            'trunk': self.development_focus,
+            }
+
+    def getSeries(self, name):
+        return self.series.get(name, None)
 
 
 @adapter(FakeProduct)
@@ -277,11 +287,36 @@
     return ProductBranchTarget(fake_product)
 
 
+@adapter(FakeProduct)
+@implementer(ICanHasLinkedBranch)
+def fake_product_to_can_has_linked_branch(fake_product):
+    """Adapt a `FakeProduct` to `ICanHasLinkedBranch`."""
+    return fake_product.development_focus
+
+
 class FakeProductSeries(FakeDatabaseObject):
     """Fake product series."""
 
     branch = None
 
+    def __init__(self, product, name):
+        self.product = product
+        self.name = name
+
+    @property
+    def bzr_path(self):
+        if self.product.development_focus is self:
+            return self.product.name
+        else:
+            return "%s/%s" % (self.product.name, self.name)
+
+
+@adapter(FakeProductSeries)
+@implementer(ICanHasLinkedBranch)
+def fake_productseries_to_can_has_linked_branch(fake_productseries):
+    """Adapt a `FakeProductSeries` to `ICanHasLinkedBranch`."""
+    return fake_productseries
+
 
 class FakeScriptActivity(FakeDatabaseObject):
     """Fake script activity."""
@@ -394,6 +429,8 @@
         self._distroseries_set._add(distroseries)
         return distroseries
 
+    makeDistroSeries = makeDistroRelease
+
     def makeSourcePackageName(self):
         sourcepackagename = FakeSourcePackageName(self.getUniqueString())
         self._sourcepackagename_set._add(sourcepackagename)
@@ -418,11 +455,22 @@
         self._person_set._add(person)
         return person
 
-    def makeProduct(self):
-        product = FakeProduct(self.getUniqueString())
+    def makeProduct(self, name=None, owner=None):
+        if name is None:
+            name = self.getUniqueString()
+        if owner is None:
+            owner = self.makePerson()
+        product = FakeProduct(name, owner)
         self._product_set._add(product)
         return product
 
+    def makeProductSeries(self, product, name=None):
+        if name is None:
+            name = self.getUniqueString()
+        series = FakeProductSeries(product, name)
+        product.series[name] = series
+        return series
+
     def enableDefaultStackingForProduct(self, product, branch=None):
         """Give 'product' a default stacked-on branch.
 
@@ -509,28 +557,60 @@
             FakeScriptActivity(name, hostname, date_started, date_completed))
         return True
 
-    def createBranch(self, requester_id, branch_path):
-        if not branch_path.startswith('/'):
-            return faults.InvalidPath(branch_path)
-        escaped_path = unescape(branch_path.strip('/'))
+    def _parseUniqueName(self, branch_path):
+        """Return a dict of the parsed information and the branch name."""
         try:
-            namespace_path, branch_name = escaped_path.rsplit('/', 1)
+            namespace_path, branch_name = branch_path.rsplit('/', 1)
         except ValueError:
-            return faults.PermissionDenied(
-                "Cannot create branch at '%s'" % branch_path)
+            raise faults.PermissionDenied(
+                "Cannot create branch at '/%s'" % branch_path)
         data = BranchNamespaceSet().parse(namespace_path)
+        return data, branch_name
+
+    def _createBranch(self, registrant, branch_path):
+        """The guts of the create branch method.
+
+        Raises exceptions on error conditions.
+        """
+        to_link = None
+        if branch_path.startswith(BRANCH_ALIAS_PREFIX + '/'):
+            branch_path = branch_path[len(BRANCH_ALIAS_PREFIX) + 1:]
+            if branch_path.startswith('~'):
+                data, branch_name = self._parseUniqueName(branch_path)
+            else:
+                tokens = branch_path.split('/')
+                data = {
+                    'person': registrant.name,
+                    'product': tokens[0],
+                    }
+                branch_name = 'trunk'
+                # check the series
+                product = self._product_set.getByName(data['product'])
+                if product is not None:
+                    if len(tokens) > 1:
+                        series = product.getSeries(tokens[1])
+                        if series is None:
+                            raise faults.NotFound(
+                                "No such product series: '%s'." % tokens[1])
+                        else:
+                            to_link = ICanHasLinkedBranch(series)
+                    else:
+                        to_link = ICanHasLinkedBranch(product)
+                # don't forget the link.
+        else:
+            data, branch_name = self._parseUniqueName(branch_path)
+
         owner = self._person_set.getByName(data['person'])
         if owner is None:
-            return faults.NotFound(
+            raise faults.NotFound(
                 "User/team '%s' does not exist." % (data['person'],))
-        registrant = self._person_set.get(requester_id)
         # The real code consults the branch creation policy of the product. We
         # don't need to do so here, since the tests above this layer never
         # encounter that behaviour. If they *do* change to rely on the branch
         # creation policy, the observed behaviour will be failure to raise
         # exceptions.
         if not registrant.inTeam(owner):
-            return faults.PermissionDenied(
+            raise faults.PermissionDenied(
                 ('%s cannot create branches owned by %s'
                  % (registrant.displayname, owner.displayname)))
         product = sourcepackage = None
@@ -539,35 +619,51 @@
         elif data['product'] is not None:
             product = self._product_set.getByName(data['product'])
             if product is None:
-                return faults.NotFound(
+                raise faults.NotFound(
                     "Project '%s' does not exist." % (data['product'],))
         elif data['distribution'] is not None:
             distro = self._distribution_set.getByName(data['distribution'])
             if distro is None:
-                return faults.NotFound(
+                raise faults.NotFound(
                     "No such distribution: '%s'." % (data['distribution'],))
             distroseries = self._distroseries_set.getByName(
                 data['distroseries'])
             if distroseries is None:
-                return faults.NotFound(
+                raise faults.NotFound(
                     "No such distribution series: '%s'."
                     % (data['distroseries'],))
             sourcepackagename = self._sourcepackagename_set.getByName(
                 data['sourcepackagename'])
             if sourcepackagename is None:
-                return faults.NotFound(
+                raise faults.NotFound(
                     "No such source package: '%s'."
                     % (data['sourcepackagename'],))
             sourcepackage = self._factory.makeSourcePackage(
                 distroseries, sourcepackagename)
         else:
-            return faults.PermissionDenied(
+            raise faults.PermissionDenied(
                 "Cannot create branch at '%s'" % branch_path)
+        branch = self._factory.makeBranch(
+            owner=owner, name=branch_name, product=product,
+            sourcepackage=sourcepackage, registrant=registrant,
+            branch_type=BranchType.HOSTED)
+        if to_link is not None:
+            if registrant.inTeam(to_link.product.owner):
+                to_link.branch = branch
+            else:
+                raise faults.PermissionDenied(
+                    "Cannot create linked branch at '%s'." % branch_path)
+        return branch.id
+
+    def createBranch(self, requester_id, branch_path):
+        if not branch_path.startswith('/'):
+            return faults.InvalidPath(branch_path)
+        escaped_path = unescape(branch_path.strip('/'))
+        registrant = self._person_set.get(requester_id)
         try:
-            return self._factory.makeBranch(
-                owner=owner, name=branch_name, product=product,
-                sourcepackage=sourcepackage, registrant=registrant,
-                branch_type=BranchType.HOSTED).id
+            return self._createBranch(registrant, escaped_path)
+        except LaunchpadFault, e:
+            return e
         except LaunchpadValidationError, e:
             msg = e.args[0]
             if isinstance(msg, unicode):
@@ -704,7 +800,15 @@
         for first, second in iter_split(stripped_path, '/'):
             first = unescape(first).encode('utf-8')
             # Is it a branch?
-            branch = self._branch_set._find(unique_name=first)
+            if first.startswith('+branch/'):
+                component_name = first[len('+branch/'):]
+                product = self._product_set.getByName(component_name)
+                if product:
+                    branch = product.development_focus.branch
+                else:
+                    branch = self._branch_set._find(unique_name=component_name)
+            else:
+                branch = self._branch_set._find(unique_name=first)
             if branch is not None:
                 branch = self._serializeBranch(requester_id, branch, second)
                 if isinstance(branch, Fault):
@@ -745,8 +849,10 @@
             self._sourcepackagename_set, self._factory,
             self._script_activity_set)
         sm = getSiteManager()
+        sm.registerAdapter(fake_product_to_can_has_linked_branch)
         sm.registerAdapter(fake_product_to_branch_target)
         sm.registerAdapter(fake_source_package_to_branch_target)
+        sm.registerAdapter(fake_productseries_to_can_has_linked_branch)
 
     def getCodehostingEndpoint(self):
         """See `LaunchpadDatabaseFrontend`.

=== modified file 'lib/lp/codehosting/tests/test_acceptance.py'
--- lib/lp/codehosting/tests/test_acceptance.py	2010-04-19 06:35:23 +0000
+++ lib/lp/codehosting/tests/test_acceptance.py	2010-08-10 03:51:53 +0000
@@ -487,6 +487,27 @@
             '~landscape-developers/landscape/some-branch')
         self.assertNotBranch(remote_url)
 
+    def test_push_to_new_full_branch_alias(self):
+        # We can also push branches to URLs like /+branch/~foo/bar/baz.
+        unique_name = '~testuser/firefox/new-branch'
+        remote_url = self.getTransportURL('+branch/%s' % unique_name)
+        self.push(self.local_branch_path, remote_url)
+        self.assertBranchesMatch(self.local_branch_path, remote_url)
+        self.assertBranchesMatch(
+            self.local_branch_path, self.getTransportURL(unique_name))
+
+    def test_push_to_new_short_branch_alias(self):
+        # We can also push branches to URLs like /+branch/firefox
+        # Hack 'firefox' so we have permission to do this.
+        LaunchpadZopelessTestSetup().txn.begin()
+        firefox = Product.selectOneBy(name='firefox')
+        testuser = Person.selectOneBy(name='testuser')
+        firefox.development_focus.owner = testuser
+        LaunchpadZopelessTestSetup().txn.commit()
+        remote_url = self.getTransportURL('+branch/firefox')
+        self.push(self.local_branch_path, remote_url)
+        self.assertBranchesMatch(self.local_branch_path, remote_url)
+
     def test_can_push_to_existing_hosted_branch(self):
         # If a hosted branch exists in the database, but not on the
         # filesystem, and is writable by the user, then the user is able to