launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/all-url-support into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/all-url-support into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #891715 in Launchpad itself: "API doesn't recognize standard parent branch URLs"
  https://bugs.launchpad.net/launchpad/+bug/891715

For more details, see:
https://code.launchpad.net/~abentley/launchpad/all-url-support/+merge/113294

= Summary =
Fix bug #891715: API doesn't recognize standard parent branch URLs

== Proposed fix ==
Switch getByUrl, translatePath and _getBranchIdAndTrailingPath to use common lookup code.

== Pre-implementation notes ==
None

== LOC Rationale ==
Reduces the line count by 40.

== Implementation details ==
The root cause of this bug was a DRY violation: there were several ways to convert a hosting path into a branch.  Now there is only one: BranchLookup.performLookup().  The in-memory version also shares as much code as possible.

The behaviour of iter_split was tweaked so that:
1. slashes are not lost (because some code paths care about trailing slashes).
2. the specific splits to perform can be specified.
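
The two tweaks above can be illustrated with a minimal sketch.  This is not the code from lib/lp/services/utils.py; the name iter_split_sketch and its body are assumptions written only to show the intended behaviour (tails keep their slashes, and callers may pass the split points they want), in Python 3:

```python
def iter_split_sketch(string, splitter, splits=None):
    """Yield (head, tail) pairs; the tail keeps its leading splitter.

    Hypothetical illustration only, not the Launchpad implementation.
    """
    if string == '':
        return
    tokens = string.split(splitter)
    if splits is None:
        # Default: try every split point, longest head first.
        splits = reversed(range(1, len(tokens) + 1))
    for i in splits:
        head = splitter.join(tokens[:i])
        # Slice the original string so no slashes are lost from the tail.
        yield head, string[len(head):]


# A trailing slash survives in the tail.
print(list(iter_split_sketch('a/b/', '/')))
# Only the requested split points (5 and 3 segments) are tried.
print(list(iter_split_sketch('~foo/bar/baz/README', '/', [5, 3])))
```

Passing [5, 3] mirrors how the diff below asks for candidate unique names that are five or three segments long (package and product/junk branches respectively).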

parseBranchPath called iter_split but was found to be unused, so it was removed.

BranchLookup.getIdAndTrailingPath was removed because it was unused after refactoring.  Its tests were converted into BranchLookup.getByHostingPath tests, except for the privacy tests, which were moved to the tests for Rewriter._getBranchIdAndTrailingPath.

The body of translatePath was extracted to performLookup and get_first_path_result.
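
As a rough sketch of that split of responsibilities: one helper walks the candidate lookups in order and returns the first result that differs from a failure sentinel, while the per-candidate resolution is supplied by the caller.  The names first_path_result, FAKE_BRANCHES and fake_perform_lookup are hypothetical stand-ins (a dict replaces the database), written in Python 3 rather than the Python 2 of the diff:

```python
def first_path_result(lookups, perform_lookup, failure_result):
    """Return the first lookup result that is not failure_result."""
    for lookup in lookups:
        result = perform_lookup(lookup)
        if result != failure_result:
            return result
    return failure_result


# Hypothetical stand-in for the database: unique name -> branch object.
FAKE_BRANCHES = {'~foo/bar/baz': 'branch-42'}


def fake_perform_lookup(lookup):
    """Resolve one candidate dict; (None, '') signals failure."""
    if lookup['type'] == 'branch_name':
        branch = FAKE_BRANCHES.get(lookup['unique_name'])
        if branch is not None:
            return branch, lookup['trailing']
    return None, ''


# Candidates for '~foo/bar/baz/README', longest unique name first.
candidates = [
    {'type': 'branch_name', 'unique_name': '~foo/bar/baz/README',
     'trailing': ''},
    {'type': 'branch_name', 'unique_name': '~foo/bar/baz',
     'trailing': '/README'},
]
result = first_path_result(candidates, fake_perform_lookup, (None, ''))
print(result)
```

Because the lookup callable is a parameter, the database-backed and in-memory implementations can share the iteration logic and differ only in how a single candidate is resolved.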

inmemory was updated to use get_first_path_result.

Some lint was fixed.

== Tests ==
bin/test -v

== Demo and Q/A ==
Using launchpadlib, call getByUrl with a URL like "bzr+ssh://+branch/bzrtools".
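
A possible shape for that check, as a sketch only: it assumes launchpadlib is installed, that anonymous access is sufficient, and that the production service root is the right target.  The helper name find_branch_by_url and the consumer name are made up for illustration.

```python
def find_branch_by_url(url, service_root='production'):
    """Ask Launchpad for the branch at url (expected to be None if unknown)."""
    # Imported here so that defining the helper needs neither network
    # access nor an installed launchpadlib.
    from launchpadlib.launchpad import Launchpad

    # 'all-url-support-demo' is an arbitrary consumer name (an assumption).
    lp = Launchpad.login_anonymously('all-url-support-demo', service_root)
    return lp.branches.getByUrl(url=url)
```

Calling find_branch_by_url with the URL quoted above should return a branch entry once this change is deployed, where previously the alias form was not recognized.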


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/code/xmlrpc/codehosting.py
  lib/lp/code/interfaces/branchnamespace.py
  lib/lp/app/browser/launchpad.py
  lib/lp/codehosting/inmemory.py
  lib/lp/code/model/tests/test_branchlookup.py
  lib/lp/code/xmlrpc/branch.py
  lib/lp/codehosting/tests/test_rewrite.py
  lib/lp/code/model/branchlookup.py
  lib/lp/codehosting/rewrite.py
  scripts/branch-rewrite.py
  lib/lp/code/model/tests/test_branchnamespace.py
  lib/lp/code/model/branchnamespace.py
  lib/lp/code/interfaces/branchlookup.py
  lib/lp/services/utils.py
-- 
https://code.launchpad.net/~abentley/launchpad/all-url-support/+merge/113294
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/all-url-support into lp:launchpad.
=== modified file 'lib/lp/app/browser/launchpad.py'
--- lib/lp/app/browser/launchpad.py	2012-06-29 08:40:05 +0000
+++ lib/lp/app/browser/launchpad.py	2012-07-03 20:16:32 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Browser code for the launchpad application."""
@@ -618,10 +618,9 @@
         target_url = self.request.getHeader('referer')
         path = '/'.join(self.request.stepstogo)
         try:
-            branch_data = getUtility(IBranchLookup).getByLPPath(path)
-            branch, trailing = branch_data
+            branch, trailing = getUtility(IBranchLookup).getByLPPath(path)
             target_url = canonical_url(branch)
-            if trailing is not None:
+            if trailing != '':
                 target_url = urlappend(target_url, trailing)
         except (NoLinkedBranch) as e:
             # A valid ICanHasLinkedBranch target exists but there's no

=== modified file 'lib/lp/code/interfaces/branchlookup.py'
--- lib/lp/code/interfaces/branchlookup.py	2011-08-23 04:38:33 +0000
+++ lib/lp/code/interfaces/branchlookup.py	2012-07-03 20:16:32 +0000
@@ -7,12 +7,23 @@
 
 __metaclass__ = type
 __all__ = [
+    'get_first_path_result',
     'IBranchLookup',
     'ILinkedBranchTraversable',
     'ILinkedBranchTraverser',
     ]
 
+from itertools import (
+    ifilter,
+    imap
+    )
+
 from zope.interface import Interface
+from lp.code.interfaces.codehosting import (
+    BRANCH_ALIAS_PREFIX,
+    BRANCH_ID_ALIAS_PREFIX,
+    )
+from lp.services.utils import iter_split
 
 
 class ILinkedBranchTraversable(Interface):
@@ -60,6 +71,16 @@
         Return the default value if there is no such branch.
         """
 
+    def getByHostingPath(path):
+        """Get information about a given codehosting path.
+
+        If the path includes a branch, it is returned.  Otherwise, None.
+        The portion of the path following the branch's portion is returned as
+        'trailing'.
+
+        :return: A tuple of (branch, trailing).
+        """
+
     def getByUniqueName(unique_name):
         """Find a branch by its ~owner/product/name unique name.
 
@@ -77,15 +98,15 @@
             branch is identified.
         """
 
-    def uriToUniqueName(uri):
-        """Return the unique name for the URI, if the URI is on codehosting.
+    def uriToHostingPath(uri):
+        """Return the path for the URI, if the URI is on codehosting.
 
-        This does not ensure that the unique name is valid.  It recognizes the
+        This does not ensure that the path is valid.  It recognizes the
         codehosting URIs of remote branches and mirrors, but not their
         remote URIs.
 
         :param uri: An instance of lazr.uri.URI
-        :return: The unique name if possible, None if the URI is not a valid
+        :return: The path if possible, None if the URI is not a valid
             codehosting URI.
         """
 
@@ -98,6 +119,9 @@
         Return None if no match was found.
         """
 
+    def performLookup(lookup):
+        """Find a branch and trailing path according to params"""
+
     def getByUrls(urls):
         """Find branches by URL.
 
@@ -147,3 +171,47 @@
             make things like 'bzr cat lp:~foo/bar/baz/README' work. Trailing
             paths are not handled for shortcut paths.
         """
+
+
+def path_lookups(path):
+    if path.startswith(BRANCH_ID_ALIAS_PREFIX + '/'):
+        try:
+            parts = path.split('/', 2)
+            branch_id = int(parts[1])
+        except (ValueError, IndexError):
+            return
+        trailing = '/'.join([''] + parts[2:])
+        yield {'type': 'id', 'branch_id': branch_id, 'trailing': trailing}
+        return
+    alias_prefix_trailing = BRANCH_ALIAS_PREFIX + '/'
+    if path.startswith(alias_prefix_trailing):
+        yield {'type': 'alias', 'lp_path': path[len(alias_prefix_trailing):]}
+        return
+    for unique_name, trailing in iter_split(path, '/', [5, 3]):
+        yield {
+            'type': 'branch_name',
+            'unique_name': unique_name,
+            'trailing': trailing,
+        }
+    for control_name, trailing in iter_split(path, '/', [4, 2]):
+        yield {
+            'type': 'control_name',
+            'control_name': control_name,
+            'trailing': trailing,
+        }
+
+
+def get_first_path_result(path, perform_lookup, failure_result):
+    """Find the first codehosting path lookup result.
+
+    :param path: The codehosting path to use.
+    :param perform_lookup: The callable to use for looking up a value.
+    :param failure_result: The result that indicates lookup failure.
+    :return: The first successful lookup, or failure_result if there are
+        no successes.
+    """
+    sparse_results = imap(perform_lookup, path_lookups(path))
+    results = ifilter(lambda x: x != failure_result, sparse_results)
+    for result in results:
+        return result
+    return failure_result

=== modified file 'lib/lp/code/interfaces/branchnamespace.py'
--- lib/lp/code/interfaces/branchnamespace.py	2011-03-03 01:13:47 +0000
+++ lib/lp/code/interfaces/branchnamespace.py	2012-07-03 20:16:32 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 # pylint: disable-msg=E0213,E0211
@@ -54,7 +54,7 @@
 
     def getBranches(eager_load=False):
         """Return the branches in this namespace.
-        
+
         :param eager_load: If True eager load related data for the branches.
         """
 
@@ -241,26 +241,6 @@
         :return: A dict with keys matching each component in 'namespace_name'.
         """
 
-    def parseBranchPath(branch_path):
-        """Parse 'branch_path' into a namespace dict and a trailing path.
-
-        See `IBranchNamespaceSet.parse` for what we mean by 'namespace dict'.
-
-        Since some paths can be parsed as either package branch paths or
-        product branch paths, this method yields possible parses of the given
-        path. The order of the yielded parses is undefined and shouldn't be
-        relied on.
-
-        Note that at most one of the parses will actually be valid. This can
-        be determined by looking the objects up in the database, or by using
-        `IBranchNamespaceSet.interpret`.
-
-        :param branch_path: A path to or within a branch. This will often, but
-            not always, include a '.bzr' segment.
-        :return: An iterator that yields '(namespace_dict, branch_name,
-            trailing_path)' for all valid parses of 'branch_path'.
-        """
-
     def traverse(segments):
         """Look up the branch at the path given by 'segments'.
 

=== modified file 'lib/lp/code/model/branchlookup.py'
--- lib/lp/code/model/branchlookup.py	2012-06-06 06:56:17 +0000
+++ lib/lp/code/model/branchlookup.py	2012-07-03 20:16:32 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Database implementation of the branch lookup utility."""
@@ -8,6 +8,10 @@
 # then get the IBranchLookup utility.
 __all__ = []
 
+
+import re
+
+from bzrlib.urlutils import escape
 from lazr.enum import DBItem
 from lazr.uri import (
     InvalidURIError,
@@ -26,6 +30,7 @@
     )
 from zope.interface import implements
 
+from lp.app.errors import NameLookupFailed
 from lp.app.validators.name import valid_name
 from lp.code.errors import (
     CannotHaveLinkedBranch,
@@ -34,15 +39,14 @@
     NoSuchBranch,
     )
 from lp.code.interfaces.branchlookup import (
+    get_first_path_result,
     IBranchLookup,
     ILinkedBranchTraversable,
     ILinkedBranchTraverser,
     )
 from lp.code.interfaces.branchnamespace import IBranchNamespaceSet
-from lp.code.interfaces.codehosting import BRANCH_ID_ALIAS_PREFIX
 from lp.code.interfaces.linkedbranch import get_linked_to_branch
 from lp.code.model.branch import Branch
-from lp.registry.enums import PUBLIC_INFORMATION_TYPES
 from lp.registry.errors import (
     NoSuchDistroSeries,
     NoSuchSourcePackageName,
@@ -67,10 +71,8 @@
 from lp.registry.model.sourcepackagename import SourcePackageName
 from lp.services.config import config
 from lp.services.database.lpstorm import (
-    IMasterStore,
-    ISlaveStore,
+    IStore,
     )
-from lp.services.utils import iter_split
 from lp.services.webapp.authorization import check_permission
 from lp.services.webapp.interfaces import (
     DEFAULT_FLAVOR,
@@ -224,7 +226,7 @@
             return default
 
     @staticmethod
-    def uriToUniqueName(uri):
+    def uriToHostingPath(uri):
         """See `IBranchLookup`."""
         schemes = ('http', 'sftp', 'bzr+ssh')
         codehosting_host = URI(config.codehosting.supermirror_root).host
@@ -251,9 +253,11 @@
         except InvalidURIError:
             return None
 
-        unique_name = self.uriToUniqueName(uri)
-        if unique_name is not None:
-            return self.getByUniqueName(unique_name)
+        path = self.uriToHostingPath(uri)
+        if path is not None:
+            branch, trailing = self.getByHostingPath(path)
+            if branch is not None:
+                return branch
 
         if uri.scheme == 'lp':
             if not self._uriHostAllowed(uri):
@@ -269,6 +273,27 @@
 
         return Branch.selectOneBy(url=url)
 
+    def performLookup(self, lookup):
+        if lookup['type'] == 'id':
+            return (self.get(lookup['branch_id']), lookup['trailing'])
+        elif lookup['type'] == 'alias':
+            try:
+                return self.getByLPPath(lookup['lp_path'])
+            except (InvalidProductName, NoLinkedBranch,
+                    CannotHaveLinkedBranch, NameLookupFailed,
+                    InvalidNamespace):
+                return None, None
+        elif lookup['type'] == 'branch_name':
+            store = IStore(Branch)
+            result = store.find(Branch,
+                                Branch.unique_name == lookup['unique_name'])
+            return (result.one(), escape(lookup['trailing']))
+        else:
+            return None, ''
+
+    def getByHostingPath(self, path):
+        return get_first_path_result(path, self.performLookup, (None, ''))
+
     def getByUrls(self, urls):
         """See `IBranchLookup`."""
         return dict((url, self.getByUrl(url)) for url in set(urls))
@@ -294,53 +319,6 @@
             return None
         return self._getBranchInNamespace(namespace_data, branch_name)
 
-    def _getIdAndTrailingPathByIdAlias(self, store, path):
-        """Query by the integer id."""
-        parts = path.split('/', 2)
-        try:
-            branch_id = int(parts[1])
-        except (ValueError, IndexError):
-            return None, None
-        result = store.find(
-            (Branch.id),
-            Branch.id == branch_id,
-            Branch.information_type.is_in(PUBLIC_INFORMATION_TYPES)).one()
-        if result is None:
-            return None, None
-        else:
-            try:
-                return branch_id, '/' + parts[2]
-            except IndexError:
-                return branch_id, ''
-
-    def _getIdAndTrailingPathByUniqueName(self, store, path):
-        """Query based on the unique name."""
-        prefixes = []
-        for first, second in iter_split(path, '/'):
-            prefixes.append(first)
-        result = store.find(
-            (Branch.id, Branch.unique_name),
-            Branch.unique_name.is_in(prefixes),
-            Branch.information_type.is_in(PUBLIC_INFORMATION_TYPES)).one()
-        if result is None:
-            return None, None
-        else:
-            branch_id, unique_name = result
-            trailing = path[len(unique_name):]
-            return branch_id, trailing
-
-    def getIdAndTrailingPath(self, path, from_slave=False):
-        """See `IBranchLookup`. """
-        if from_slave:
-            store = ISlaveStore(Branch)
-        else:
-            store = IMasterStore(Branch)
-        path = path.lstrip('/')
-        if path.startswith(BRANCH_ID_ALIAS_PREFIX):
-            return self._getIdAndTrailingPathByIdAlias(store, path)
-        else:
-            return self._getIdAndTrailingPathByUniqueName(store, path)
-
     def _getBranchInNamespace(self, namespace_data, branch_name):
         if namespace_data['product'] == '+junk':
             return self._getPersonalBranch(
@@ -421,13 +399,13 @@
         else:
             # If the first element doesn't start with a tilde, then maybe
             # 'path' is a shorthand notation for a branch.
+            # Ignore anything following /.bzr
+            prefix = re.match('^(.*?)(/?.bzr(/.*)?)?$', path).group(1)
             object_with_branch_link = getUtility(
-                ILinkedBranchTraverser).traverse(path)
+                ILinkedBranchTraverser).traverse(prefix)
             branch, bzr_path = self._getLinkedBranchAndPath(
                 object_with_branch_link)
             suffix = path[len(bzr_path) + 1:]
-        if suffix == '':
-            suffix = None
         return branch, suffix
 
     def _getLinkedBranchAndPath(self, provided):

=== modified file 'lib/lp/code/model/branchnamespace.py'
--- lib/lp/code/model/branchnamespace.py	2012-06-14 09:09:59 +0000
+++ lib/lp/code/model/branchnamespace.py	2012-07-03 20:16:32 +0000
@@ -69,7 +69,6 @@
 from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
 from lp.registry.model.sourcepackage import SourcePackage
 from lp.services.database.constants import UTC_NOW
-from lp.services.utils import iter_split
 from lp.services.webapp.interfaces import (
     DEFAULT_FLAVOR,
     IStoreSelector,
@@ -512,24 +511,6 @@
         data['person'] = data['person'][1:]
         return data
 
-    def parseBranchPath(self, namespace_path):
-        """See `IBranchNamespaceSet`."""
-        found = False
-        for branch_path, trailing_path in iter_split(namespace_path, '/'):
-            try:
-                branch_path, branch = branch_path.rsplit('/', 1)
-            except ValueError:
-                continue
-            try:
-                parsed = self.parse(branch_path)
-            except InvalidNamespace:
-                continue
-            else:
-                found = True
-                yield parsed, branch, trailing_path
-        if not found:
-            raise InvalidNamespace(namespace_path)
-
     def lookup(self, namespace_name):
         """See `IBranchNamespaceSet`."""
         names = self.parse(namespace_name)

=== modified file 'lib/lp/code/model/tests/test_branchlookup.py'
--- lib/lp/code/model/tests/test_branchlookup.py	2012-05-31 03:54:13 +0000
+++ lib/lp/code/model/tests/test_branchlookup.py	2012-07-03 20:16:32 +0000
@@ -21,8 +21,10 @@
     )
 from lp.code.interfaces.branchnamespace import get_branch_namespace
 from lp.code.interfaces.codehosting import (
+    BRANCH_ALIAS_PREFIX,
     branch_id_alias,
     BRANCH_ID_ALIAS_PREFIX,
+    compose_public_url,
     )
 from lp.code.interfaces.linkedbranch import ICanHasLinkedBranch
 from lp.registry.enums import InformationType
@@ -76,106 +78,79 @@
         self.assertEqual(branch, found_branch)
 
 
-class TestGetIdAndTrailingPath(TestCaseWithFactory):
+class TestGetByHostingPath(TestCaseWithFactory):
     """Tests for `IBranchLookup.getIdAndTrailingPath`."""
 
     layer = DatabaseFunctionalLayer
 
     def setUp(self):
         TestCaseWithFactory.setUp(self)
-        self.branch_set = getUtility(IBranchLookup)
+        self.lookup = getUtility(IBranchLookup)
 
     def test_not_found(self):
         unused_name = self.factory.getUniqueString()
-        result = self.branch_set.getIdAndTrailingPath('/' + unused_name)
-        self.assertEqual((None, None), result)
+        result = self.lookup.getByHostingPath('' + unused_name)
+        self.assertEqual((None, ''), result)
 
     def test_junk(self):
         branch = self.factory.makePersonalBranch()
-        result = self.branch_set.getIdAndTrailingPath(
-            '/' + branch.unique_name)
-        self.assertEqual((branch.id, ''), result)
+        result = self.lookup.getByHostingPath(branch.unique_name)
+        self.assertEqual((branch, ''), result)
 
     def test_product(self):
         branch = self.factory.makeProductBranch()
-        result = self.branch_set.getIdAndTrailingPath(
-            '/' + branch.unique_name)
-        self.assertEqual((branch.id, ''), result)
+        result = self.lookup.getByHostingPath(branch.unique_name)
+        self.assertEqual((branch, ''), result)
 
     def test_source_package(self):
         branch = self.factory.makePackageBranch()
-        result = self.branch_set.getIdAndTrailingPath(
-            '/' + branch.unique_name)
-        self.assertEqual((branch.id, ''), result)
+        result = self.lookup.getByHostingPath(branch.unique_name)
+        self.assertEqual((branch, ''), result)
 
     def test_trailing_slash(self):
         branch = self.factory.makeAnyBranch()
-        result = self.branch_set.getIdAndTrailingPath(
-            '/' + branch.unique_name + '/')
-        self.assertEqual((branch.id, '/'), result)
+        result = self.lookup.getByHostingPath(branch.unique_name + '/')
+        self.assertEqual((branch, '/'), result)
 
     def test_trailing_path(self):
         branch = self.factory.makeAnyBranch()
         path = self.factory.getUniqueString()
-        result = self.branch_set.getIdAndTrailingPath(
-            '/' + branch.unique_name + '/' + path)
-        self.assertEqual((branch.id, '/' + path), result)
+        result = self.lookup.getByHostingPath(
+            branch.unique_name + '/' + path)
+        self.assertEqual((branch, '/' + path), result)
 
     def test_branch_id_alias(self):
         # The prefix by itself returns no branch, and no path.
         path = BRANCH_ID_ALIAS_PREFIX
-        result = self.branch_set.getIdAndTrailingPath('/' + path)
-        self.assertEqual((None, None), result)
+        result = self.lookup.getByHostingPath(path)
+        self.assertEqual((None, ''), result)
 
     def test_branch_id_alias_not_int(self):
         # The prefix followed by a non-integer returns no branch and no path.
         path = BRANCH_ID_ALIAS_PREFIX + '/foo'
-        result = self.branch_set.getIdAndTrailingPath('/' + path)
-        self.assertEqual((None, None), result)
-
-    def test_branch_id_alias_private(self):
-        # Private branches are not found at all (this is for anonymous access)
-        owner = self.factory.makePerson()
-        branch = self.factory.makeAnyBranch(
-            owner=owner, information_type=InformationType.USERDATA)
-        with person_logged_in(owner):
-            path = branch_id_alias(branch)
-        result = self.branch_set.getIdAndTrailingPath(path)
-        self.assertEqual((None, None), result)
-
-    def test_branch_id_alias_transitive_private(self):
-        # Transitively private branches are not found at all
-        # (this is for anonymous access)
-        owner = self.factory.makePerson()
-        private_branch = self.factory.makeAnyBranch(
-            owner=owner, information_type=InformationType.USERDATA)
-        branch = self.factory.makeAnyBranch(
-            stacked_on=private_branch, owner=owner)
-        with person_logged_in(owner):
-            path = branch_id_alias(branch)
-        result = self.branch_set.getIdAndTrailingPath(path)
-        self.assertEqual((None, None), result)
+        result = self.lookup.getByHostingPath(path)
+        self.assertEqual((None, ''), result)
 
     def test_branch_id_alias_public(self):
         # Public branches can be accessed.
         branch = self.factory.makeAnyBranch()
         path = branch_id_alias(branch)
-        result = self.branch_set.getIdAndTrailingPath(path)
-        self.assertEqual((branch.id, ''), result)
+        result = self.lookup.getByHostingPath(path.lstrip('/'))
+        self.assertEqual((branch, ''), result)
 
     def test_branch_id_alias_public_slash(self):
         # A trailing slash is returned as the extra path.
         branch = self.factory.makeAnyBranch()
         path = '%s/' % branch_id_alias(branch)
-        result = self.branch_set.getIdAndTrailingPath(path)
-        self.assertEqual((branch.id, '/'), result)
+        result = self.lookup.getByHostingPath(path.lstrip('/'))
+        self.assertEqual((branch, '/'), result)
 
     def test_branch_id_alias_public_with_path(self):
         # All the path after the number is returned as the trailing path.
         branch = self.factory.makeAnyBranch()
         path = '%s/foo' % branch_id_alias(branch)
-        result = self.branch_set.getIdAndTrailingPath(path)
-        self.assertEqual((branch.id, '/foo'), result)
+        result = self.lookup.getByHostingPath(path.lstrip('/'))
+        self.assertEqual((branch, '/foo'), result)
 
 
 class TestGetByPath(TestCaseWithFactory):
@@ -200,7 +175,7 @@
         branch = self.factory.makePersonalBranch()
         found_branch, suffix = self.getByPath(branch.unique_name)
         self.assertEqual(branch, found_branch)
-        self.assertEqual(None, suffix)
+        self.assertEqual('', suffix)
 
     def test_finds_suffixed_personal_branch(self):
         branch = self.factory.makePersonalBranch()
@@ -228,7 +203,7 @@
         branch = self.factory.makeProductBranch()
         found_branch, suffix = self.getByPath(branch.unique_name)
         self.assertEqual(branch, found_branch)
-        self.assertEqual(None, suffix)
+        self.assertEqual('', suffix)
 
     def test_finds_suffixed_product_branch(self):
         branch = self.factory.makeProductBranch()
@@ -258,7 +233,7 @@
         branch = self.factory.makePackageBranch()
         found_branch, suffix = self.getByPath(branch.unique_name)
         self.assertEqual(branch, found_branch)
-        self.assertEqual(None, suffix)
+        self.assertEqual('', suffix)
 
     def test_missing_package_branch(self):
         owner = self.factory.makePerson()
@@ -379,6 +354,22 @@
         branch2 = branch_set.getByUrl('lp://production/~aa/b/c')
         self.assertEqual(branch, branch2)
 
+    def test_getByUrl_with_alias(self):
+        """getByUrl works with +branch aliases."""
+        product = make_product_with_branch(self.factory)
+        branch = product.development_focus.branch
+        path = BRANCH_ALIAS_PREFIX + '/' + product.name
+        url = compose_public_url('bzr+ssh', path)
+        branch_lookup = getUtility(IBranchLookup)
+        self.assertEqual(branch, branch_lookup.getByUrl(url))
+
+    def test_getByUrl_with_id_alias(self):
+        """getByUrl works with +branch-id aliases."""
+        branch = self.factory.makeAnyBranch()
+        url = compose_public_url('bzr+ssh', branch_id_alias(branch))
+        branch_lookup = getUtility(IBranchLookup)
+        self.assertEqual(branch, branch_lookup.getByUrl(url))
+
     def test_getByUrls(self):
         # getByUrls returns a dictionary mapping branches to URLs.
         branch1 = self.factory.makeAnyBranch()
@@ -392,30 +383,30 @@
              branch2.bzr_identity: branch2,
              url3: None}, branches)
 
-    def test_uriToUniqueName(self):
-        """Ensure uriToUniqueName works.
+    def test_uriToHostingPath(self):
+        """Ensure uriToHostingPath works.
 
         Only codehosting-based using http, sftp or bzr+ssh URLs will
-        be handled. If any other URL gets passed the returned will be
-        None.
+        be handled. If any other URL gets passed (including lp), the return
+        value will be None.
         """
         branch_set = getUtility(IBranchLookup)
         uri = URI(config.codehosting.supermirror_root)
         uri.path = '/~foo/bar/baz'
         # Test valid schemes
         uri.scheme = 'http'
-        self.assertEqual('~foo/bar/baz', branch_set.uriToUniqueName(uri))
+        self.assertEqual('~foo/bar/baz', branch_set.uriToHostingPath(uri))
         uri.scheme = 'sftp'
-        self.assertEqual('~foo/bar/baz', branch_set.uriToUniqueName(uri))
+        self.assertEqual('~foo/bar/baz', branch_set.uriToHostingPath(uri))
         uri.scheme = 'bzr+ssh'
-        self.assertEqual('~foo/bar/baz', branch_set.uriToUniqueName(uri))
+        self.assertEqual('~foo/bar/baz', branch_set.uriToHostingPath(uri))
         # Test invalid scheme
         uri.scheme = 'ftp'
-        self.assertIs(None, branch_set.uriToUniqueName(uri))
+        self.assertIs(None, branch_set.uriToHostingPath(uri))
         # Test valid scheme, invalid domain
         uri.scheme = 'sftp'
         uri.host = 'example.com'
-        self.assertIs(None, branch_set.uriToUniqueName(uri))
+        self.assertIs(None, branch_set.uriToHostingPath(uri))
 
 
 class TestLinkedBranchTraverser(TestCaseWithFactory):
@@ -523,6 +514,12 @@
             NoSuchSourcePackageName, self.traverser.traverse, path)
 
 
+def make_product_with_branch(factory):
+    branch = factory.makeProductBranch()
+    removeSecurityProxy(branch.product).development_focus.branch = branch
+    return branch.product
+
+
 class TestGetByLPPath(TestCaseWithFactory):
     """Ensure URLs are correctly expanded."""
 
@@ -570,7 +567,7 @@
         # given the unique name of an existing product branch.
         branch = self.factory.makeProductBranch()
         self.assertEqual(
-            (branch, None),
+            (branch, ''),
             self.branch_lookup.getByLPPath(branch.unique_name))
 
     def test_resolve_product_branch_unique_name_with_trailing(self):
@@ -596,7 +593,7 @@
         # given the unique name of an existing junk branch.
         branch = self.factory.makePersonalBranch()
         self.assertEqual(
-            (branch, None),
+            (branch, ''),
             self.branch_lookup.getByLPPath(branch.unique_name))
 
     def test_resolve_personal_branch_unique_name_with_trailing(self):
@@ -619,7 +616,7 @@
             registrant,
             ICanHasLinkedBranch(distro_package).setBranch, branch, registrant)
         self.assertEqual(
-            (branch, None),
+            (branch, ''),
             self.branch_lookup.getByLPPath(
                 '%s/%s' % (
                     distro_package.distribution.name,
@@ -713,13 +710,32 @@
         # NoSuchProductSeries error, since we can't tell the difference
         # between a trailing path and an attempt to load a non-existent series
         # branch.
-        branch = self.factory.makeProductBranch()
-        product = removeSecurityProxy(branch.product)
-        product.development_focus.branch = branch
+        product = make_product_with_branch(self.factory)
         self.assertRaises(
             NoSuchProductSeries,
             self.branch_lookup.getByLPPath, '%s/other/bits' % product.name)
 
+    def test_product_with_bzr_suffix(self):
+        # A '.bzr' suffix is returned correctly.
+        product = make_product_with_branch(self.factory)
+        branch, suffix = self.branch_lookup.getByLPPath(
+            '%s/.bzr' % product.name)
+        self.assertEqual('.bzr', suffix)
+
+    def test_product_with_bzr_slash_suffix(self):
+        # A '.bzr/' suffix is returned correctly.
+        product = make_product_with_branch(self.factory)
+        branch, suffix = self.branch_lookup.getByLPPath(
+            '%s/.bzr/' % product.name)
+        self.assertEqual('.bzr/', suffix)
+
+    def test_product_with_bzr_extra_suffix(self):
+        # A '.bzr/extra' suffix is returned correctly.
+        product = make_product_with_branch(self.factory)
+        branch, suffix = self.branch_lookup.getByLPPath(
+            '%s/.bzr/extra' % product.name)
+        self.assertEqual('.bzr/extra', suffix)
+
     def test_too_long_product_series(self):
         # If the provided path points to an existing product series with a
         # linked branch but is followed by extra path segments, then we return

=== modified file 'lib/lp/code/model/tests/test_branchnamespace.py'
--- lib/lp/code/model/tests/test_branchnamespace.py	2012-06-08 06:01:50 +0000
+++ lib/lp/code/model/tests/test_branchnamespace.py	2012-07-03 20:16:32 +0000
@@ -667,48 +667,6 @@
                  distroseries='jaunty', sourcepackagename='foo'),
             self.namespace_set.parse('~foo/ubuntu/jaunty/foo'))
 
-    def test_parseBranchPath_junk_path(self):
-        # parseBranchPath takes a path within a branch and returns the dict of
-        # the namespace that it's in, as well as the part of the path that is
-        # within the namespace.
-        path = '~foo/+junk/bar/README'
-        parse_results = list(self.namespace_set.parseBranchPath(path))
-        expected_results = [
-            (dict(person='foo', product='+junk', distribution=None,
-                  distroseries=None, sourcepackagename=None),
-             'bar', 'README')]
-        self.assertEqual(expected_results, parse_results)
-
-    def test_parseBranchPath_product_path(self):
-        path = '~foo/bar/baz/README'
-        parse_results = list(self.namespace_set.parseBranchPath(path))
-        expected_results = [
-            (dict(person='foo', product='bar', distribution=None,
-                  distroseries=None, sourcepackagename=None),
-             'baz', 'README')]
-        self.assertEqual(expected_results, parse_results)
-
-    def test_parseBranchPath_package_path(self):
-        path = '~foo/bar/baz/qux/branch/README'
-        parse_results = list(self.namespace_set.parseBranchPath(path))
-        expected_results = [
-            (dict(person='foo', product=None, distribution='bar',
-                  distroseries='baz', sourcepackagename='qux'), 'branch',
-             'README'),
-            (dict(person='foo', product='bar', distribution=None,
-                  distroseries=None, sourcepackagename=None), 'baz',
-             'qux/branch/README')]
-        self.assertEqual(sorted(expected_results), sorted(parse_results))
-
-    def test_parseBranchPath_invalid_path(self):
-        path = 'foo/bar/baz/qux/branch/README'
-        self.assertRaises(
-            InvalidNamespace, list, self.namespace_set.parseBranchPath(path))
-
-    def test_parseBranchPath_empty(self):
-        self.assertRaises(
-            InvalidNamespace, list, self.namespace_set.parseBranchPath(''))
-
     def test_interpret_product_aliases(self):
         # Products can have aliases. IBranchNamespaceSet.interpret will find a
         # product given its alias.

=== modified file 'lib/lp/code/xmlrpc/branch.py'
--- lib/lp/code/xmlrpc/branch.py	2012-06-29 08:40:05 +0000
+++ lib/lp/code/xmlrpc/branch.py	2012-07-03 20:16:32 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 # Disable pylint 'should have "self" as first argument' warnings.
@@ -325,10 +325,10 @@
         # Reverse engineer the actual lp_path that is used, so we need to
         # remove any suffix that may be there from the strip_path.
         lp_path = strip_path
-        if suffix is not None:
+        if suffix != '':
             # E.g. 'project/trunk/filename.txt' the suffix is 'filename.txt'
             # we want lp_path to be 'project/trunk'.
-            lp_path = lp_path[:-(len(suffix)+1)]
+            lp_path = lp_path[:-(len(suffix) + 1)]
         return self._getUrlsForBranch(
             branch, lp_path, suffix, supported_schemes)
 

=== modified file 'lib/lp/code/xmlrpc/codehosting.py'
--- lib/lp/code/xmlrpc/codehosting.py	2012-06-29 08:40:05 +0000
+++ lib/lp/code/xmlrpc/codehosting.py	2012-07-03 20:16:32 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Implementations of the XML-RPC APIs for codehosting."""
@@ -32,14 +32,13 @@
 from lp.code.enums import BranchType
 from lp.code.errors import (
     BranchCreationException,
-    CannotHaveLinkedBranch,
     InvalidNamespace,
-    NoLinkedBranch,
     UnknownBranchTypeError,
     )
 from lp.code.interfaces import branchpuller
 from lp.code.interfaces.branch import get_db_branch_info
 from lp.code.interfaces.branchlookup import (
+    get_first_path_result,
     IBranchLookup,
     ILinkedBranchTraverser,
     )
@@ -51,7 +50,6 @@
 from lp.code.interfaces.codehosting import (
     BRANCH_ALIAS_PREFIX,
     branch_id_alias,
-    BRANCH_ID_ALIAS_PREFIX,
     BRANCH_TRANSPORT,
     CONTROL_TRANSPORT,
     ICodehostingAPI,
@@ -68,12 +66,10 @@
     NoSuchPerson,
     )
 from lp.registry.interfaces.product import (
-    InvalidProductName,
     NoSuchProduct,
     )
 from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
 from lp.services.scripts.interfaces.scriptactivity import IScriptActivitySet
-from lp.services.utils import iter_split
 from lp.services.webapp import LaunchpadXMLRPCView
 from lp.services.webapp.authorization import check_permission
 from lp.services.webapp.interaction import setupInteractionForPerson
@@ -313,12 +309,12 @@
             {'id': branch_id, 'writable': writable},
             trailing_path)
 
-    def _serializeControlDirectory(self, requester, product_path,
-                                   trailing_path):
+    def _serializeControlDirectory(self, requester, lookup):
         try:
-            namespace = lookup_branch_namespace(product_path)
+            namespace = lookup_branch_namespace(lookup['control_name'])
         except (InvalidNamespace, NotFoundError):
             return
+        trailing_path = lookup['trailing'].lstrip('/')
         if not ('.bzr' == trailing_path or trailing_path.startswith('.bzr/')):
             # '.bzr' is OK, '.bzr/foo' is OK, '.bzrfoo' is not.
             return
@@ -332,26 +328,21 @@
         return (
             CONTROL_TRANSPORT,
             {'default_stack_on': escape(path)},
-            trailing_path)
+            escape(trailing_path))
 
-    def _translateBranchIdAlias(self, requester, path):
-        # If the path isn't a branch id alias, nothing more to do.
-        stripped_path = unescape(path.strip('/'))
-        if not stripped_path.startswith(BRANCH_ID_ALIAS_PREFIX + '/'):
+    def performLookup(self, requester, path, lookup):
+        looker = getUtility(IBranchLookup)
+        if lookup['type'] == 'control_name':
+            return self._serializeControlDirectory(requester, lookup)
+        branch, trailing = looker.performLookup(lookup)
+        if branch is None:
             return None
-        try:
-            parts = stripped_path.split('/', 2)
-            branch_id = int(parts[1])
-        except (ValueError, IndexError):
-            raise faults.PathTranslationError(path)
-        branch = getUtility(IBranchLookup).get(branch_id)
-        if branch is None:
-            raise faults.PathTranslationError(path)
-        try:
-            trailing = parts[2]
-        except IndexError:
-            trailing = ''
-        return self._serializeBranch(requester, branch, trailing, True)
+        trailing = trailing.lstrip('/')
+        serialized = self._serializeBranch(requester, branch, trailing,
+                                           lookup['type'] == 'id')
+        if serialized is None:
+            raise faults.PathTranslationError(path)
+        return serialized
 
     def translatePath(self, requester_id, path):
         """See `ICodehostingAPI`."""
@@ -359,47 +350,10 @@
         def translate_path(requester):
             if not path.startswith('/'):
                 return faults.InvalidPath(path)
-            branch = self._translateBranchIdAlias(requester, path)
-            if branch is not None:
-                return branch
-            stripped_path = path.strip('/')
-            for first, second in iter_split(stripped_path, '/'):
-                first = unescape(first)
-                # Is it a branch?
-                if first.startswith(BRANCH_ALIAS_PREFIX + '/'):
-                    try:
-                        # translatePath('/+branch/.bzr') *must* return not
-                        # found, otherwise bzr will look for it and we don't
-                        # have a global bzr dir.
-                        lp_path = first[len(BRANCH_ALIAS_PREFIX + '/'):]
-                        if lp_path == '.bzr' or lp_path.startswith('.bzr/'):
-                            raise faults.PathTranslationError(path)
-                        branch, trailing = getUtility(
-                            IBranchLookup).getByLPPath(lp_path)
-                    except (InvalidProductName, NoLinkedBranch,
-                            CannotHaveLinkedBranch):
-                        # If we get one of these errors, then there is no
-                        # point walking back through the path parts.
-                        break
-                    except (NameLookupFailed, InvalidNamespace):
-                        # The reason we're doing it is that getByLPPath thinks
-                        # that 'foo/.bzr' is a request for the '.bzr' series
-                        # of a product.
-                        continue
-                    if trailing is None:
-                        trailing = ''
-                    second = '/'.join([trailing, second]).strip('/')
-                else:
-                    branch = getUtility(IBranchLookup).getByUniqueName(first)
-                if branch is not None:
-                    branch = self._serializeBranch(requester, branch, second)
-                    if branch is None:
-                        break
-                    return branch
-                # Is it a product control directory?
-                product = self._serializeControlDirectory(
-                    requester, first, second)
-                if product is not None:
-                    return product
-            raise faults.PathTranslationError(path)
+            stripped_path = unescape(path.strip('/'))
+            lookup = lambda l: self.performLookup(requester_id, path, l)
+            result = get_first_path_result(stripped_path, lookup, None)
+            if result is None:
+                raise faults.PathTranslationError(path)
+            return result
         return run_with_login(requester_id, translate_path)

=== modified file 'lib/lp/codehosting/inmemory.py'
--- lib/lp/codehosting/inmemory.py	2012-06-29 08:40:05 +0000
+++ lib/lp/codehosting/inmemory.py	2012-07-03 20:16:32 +0000
@@ -33,11 +33,11 @@
 from lp.code.enums import BranchType
 from lp.code.errors import UnknownBranchTypeError
 from lp.code.interfaces.branch import IBranch
+from lp.code.interfaces.branchlookup import get_first_path_result
 from lp.code.interfaces.branchtarget import IBranchTarget
 from lp.code.interfaces.codehosting import (
     BRANCH_ALIAS_PREFIX,
     branch_id_alias,
-    BRANCH_ID_ALIAS_PREFIX,
     BRANCH_TRANSPORT,
     CONTROL_TRANSPORT,
     LAUNCHPAD_ANONYMOUS,
@@ -58,7 +58,6 @@
 from lp.registry.errors import InvalidName
 from lp.registry.interfaces.pocket import PackagePublishingPocket
 from lp.services.database.constants import UTC_NOW
-from lp.services.utils import iter_split
 from lp.services.xmlrpc import LaunchpadFault
 from lp.testing.factory import ObjectFactory
 from lp.xmlrpc import faults
@@ -801,13 +800,13 @@
         return self._factory.makeSourcePackage(
             distroseries, sourcepackagename)
 
-    def _serializeControlDirectory(self, requester, product_path,
-                                   trailing_path):
+    def _serializeControlDirectory(self, requester, lookup):
+        trailing_path = lookup['trailing'].lstrip('/')
         if not ('.bzr' == trailing_path or trailing_path.startswith('.bzr/')):
             return
-        target = self._get_product_target(product_path)
+        target = self._get_product_target(lookup['control_name'])
         if target is None:
-            target = self._get_package_target(product_path)
+            target = self._get_package_target(lookup['control_name'])
         if target is None:
             return
         default_branch = IBranchTarget(target).default_stacked_on_branch
@@ -836,59 +835,51 @@
             {'id': branch.id, 'writable': writable},
             trailing_path)
 
-    def _translateBranchIdAlias(self, requester, path):
-        # If the path isn't a branch id alias, nothing more to do.
-        stripped_path = unescape(path.strip('/'))
-        if not stripped_path.startswith(BRANCH_ID_ALIAS_PREFIX + '/'):
+    def performLookup(self, requester_id, lookup, branch_name_only=False):
+        branch = None
+        if branch_name_only and lookup['type'] != 'branch_name':
+            return
+        if lookup['type'] == 'control_name':
+            return self._serializeControlDirectory(requester_id,
+                                                   lookup)
+        elif lookup['type'] == 'id':
+            branch = self._branch_set.get(lookup['branch_id'])
+            if branch is None:
+                return None
+            trailing = lookup['trailing']
+        elif lookup['type'] == 'alias':
+            result = get_first_path_result(lookup['lp_path'],
+                lambda l: self.performLookup(requester_id, l,
+                    branch_name_only=True), None)
+            if result is not None:
+                return result
+            product_name = lookup['lp_path'].split('/', 2)[0]
+            product = self._product_set.getByName(product_name)
+            if product is None:
+                return None
+            branch = product.development_focus.branch
+            trailing = lookup['lp_path'][len(product_name):]
+        elif lookup['type'] == 'branch_name':
+            branch = self._branch_set._find(
+                unique_name=lookup['unique_name'])
+            trailing = escape(lookup['trailing'])
+        else:
             return None
-        try:
-            parts = stripped_path.split('/', 2)
-            branch_id = int(parts[1])
-        except (ValueError, IndexError):
-            return faults.PathTranslationError(path)
-        branch = self._branch_set.get(branch_id)
-        if branch is None:
-            return faults.PathTranslationError(path)
-        try:
-            trailing = parts[2]
-        except IndexError:
-            trailing = ''
-        return self._serializeBranch(requester, branch, trailing, True)
+        if branch is not None:
+            serialized = self._serializeBranch(requester_id, branch,
+                trailing.lstrip('/'), lookup['type'] == 'id')
+            if serialized is not None:
+                return serialized
 
     def translatePath(self, requester_id, path):
         if not path.startswith('/'):
             return faults.InvalidPath(path)
-        branch = self._translateBranchIdAlias(requester_id, path)
-        if branch is not None:
-            return branch
-        stripped_path = path.strip('/')
-        for first, second in iter_split(stripped_path, '/'):
-            first = unescape(first).encode('utf-8')
-            # Is it a branch?
-            if first.startswith('+branch/'):
-                component_name = first[len('+branch/'):]
-                product = self._product_set.getByName(component_name)
-                if product:
-                    branch = product.development_focus.branch
-                else:
-                    branch = self._branch_set._find(
-                        unique_name=component_name)
-            else:
-                branch = self._branch_set._find(unique_name=first)
-            if branch is not None:
-                branch = self._serializeBranch(requester_id, branch, second)
-                if isinstance(branch, Fault):
-                    return branch
-                elif branch is None:
-                    break
-                return branch
-
-            # Is it a product?
-            product = self._serializeControlDirectory(
-                requester_id, first, second)
-            if product:
-                return product
-        return faults.PathTranslationError(path)
+        result = get_first_path_result(unescape(path.strip('/')),
+            lambda l: self.performLookup(requester_id, l), None)
+        if result is not None:
+            return result
+        else:
+            return faults.PathTranslationError(path)
 
 
 class InMemoryFrontend:

=== modified file 'lib/lp/codehosting/rewrite.py'
--- lib/lp/codehosting/rewrite.py	2012-01-01 02:58:52 +0000
+++ lib/lp/codehosting/rewrite.py	2012-07-03 20:16:32 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Implementation of the dynamic RewriteMap used to serve branches over HTTP.
@@ -8,6 +8,7 @@
 
 from bzrlib import urlutils
 from zope.component import getUtility
+from zope.security.interfaces import Unauthorized
 
 from lp.code.interfaces.branchlookup import IBranchLookup
 from lp.code.interfaces.codehosting import BRANCH_ID_ALIAS_PREFIX
@@ -55,15 +56,19 @@
                 branch_id, inserted_time = self._cache[first]
                 if (self._now() < inserted_time +
                     config.codehosting.branch_rewrite_cache_lifetime):
-                    return branch_id, '/' + second, "HIT"
-        branch_id, trailing = getUtility(IBranchLookup).getIdAndTrailingPath(
-            location, from_slave=True)
-        if branch_id is None:
-            return None, None, "MISS"
-        else:
-            unique_name = location[1:-len(trailing)]
-            self._cache[unique_name] = (branch_id, self._now())
-            return branch_id, trailing, "MISS"
+                    return branch_id, second, "HIT"
+        lookup = getUtility(IBranchLookup)
+        branch, trailing = lookup.getByHostingPath(location.lstrip('/'))
+        if branch is not None:
+            try:
+                branch_id = branch.id
+            except Unauthorized:
+                pass
+            else:
+                unique_name = location[1:len(location) - len(trailing)]
+                self._cache[unique_name] = (branch_id, self._now())
+                return branch_id, trailing, "MISS"
+        return None, None, "MISS"
 
     def rewriteLine(self, resource_location):
         """Rewrite 'resource_location' to a more concrete location.
@@ -108,7 +113,8 @@
                 branch_id, trailing, cached = self._getBranchIdAndTrailingPath(
                     resource_location)
                 if branch_id is None:
-                    if resource_location.startswith('/' + BRANCH_ID_ALIAS_PREFIX):
+                    if resource_location.startswith(
+                            '/' + BRANCH_ID_ALIAS_PREFIX):
                         r = 'NULL'
                     else:
                         r = self._codebrowse_url(resource_location)

=== modified file 'lib/lp/codehosting/tests/test_rewrite.py'
--- lib/lp/codehosting/tests/test_rewrite.py	2012-05-31 03:54:13 +0000
+++ lib/lp/codehosting/tests/test_rewrite.py	2012-07-03 20:16:32 +0000
@@ -22,6 +22,7 @@
 from lp.testing import (
     FakeTime,
     nonblocking_readline,
+    person_logged_in,
     TestCase,
     TestCaseWithFactory,
     )
@@ -226,6 +227,29 @@
             '/' + branch.unique_name + '/.bzr/README')
         self.assertEqual(id_path + ('HIT',), result)
 
+    def test_branch_id_alias_private(self):
+        # Private branches are not found at all (this is for anonymous access).
+        owner = self.factory.makePerson()
+        branch = self.factory.makeAnyBranch(
+            owner=owner, information_type=InformationType.USERDATA)
+        with person_logged_in(owner):
+            path = branch_id_alias(branch)
+        result = self.makeRewriter()._getBranchIdAndTrailingPath(path)
+        self.assertEqual((None, None, 'MISS'), result)
+
+    def test_branch_id_alias_transitive_private(self):
+        # Transitively private branches are not found at all
+        # (this is for anonymous access).
+        owner = self.factory.makePerson()
+        private_branch = self.factory.makeAnyBranch(
+            owner=owner, information_type=InformationType.USERDATA)
+        branch = self.factory.makeAnyBranch(
+            stacked_on=private_branch, owner=owner)
+        with person_logged_in(owner):
+            path = branch_id_alias(branch)
+        result = self.makeRewriter()._getBranchIdAndTrailingPath(path)
+        self.assertEqual((None, None, 'MISS'), result)
+
 
 class TestBranchRewriterScript(TestCaseWithFactory):
     """Acceptance test for the branch-rewrite.py script."""

=== modified file 'lib/lp/services/utils.py'
--- lib/lp/services/utils.py	2012-05-31 16:30:03 +0000
+++ lib/lp/services/utils.py	2012-07-03 20:16:32 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Generic Python utilities.
@@ -116,23 +116,28 @@
     return base(int(hash_obj.hexdigest(), 16), 62)
 
 
-def iter_split(string, splitter):
+def iter_split(string, splitter, splits=None):
     """Iterate over ways to split 'string' in two with 'splitter'.
 
     If 'string' is empty, then yield nothing. Otherwise, yield tuples like
-    ('a/b/c', ''), ('a/b', 'c'), ('a', 'b/c') for a string 'a/b/c' and a
+    ('a/b/c', ''), ('a/b', '/c'), ('a', '/b/c') for a string 'a/b/c' and a
     splitter '/'.
 
-    The tuples are yielded such that the first tuple has everything in the
+    The tuples are yielded such that the first result has everything in the
-    first tuple. With each iteration, the first element gets smaller and the
+    first element. With each iteration, the first element gets smaller and the
     second gets larger. It stops iterating just before it would have to yield
     ('', 'a/b/c').
+
+    'splits', if specified, is an iterable of token counts to split after.
     """
     if string == '':
         return
     tokens = string.split(splitter)
-    for i in reversed(range(1, len(tokens) + 1)):
-        yield splitter.join(tokens[:i]), splitter.join(tokens[i:])
+    if splits is None:
+        splits = reversed(range(1, len(tokens) + 1))
+    for i in splits:
+        first = splitter.join(tokens[:i])
+        yield first, string[len(first):]
 
 
 def iter_list_chunks(a_list, size):

=== modified file 'scripts/branch-rewrite.py'
--- scripts/branch-rewrite.py	2012-01-01 03:13:08 +0000
+++ scripts/branch-rewrite.py	2012-07-03 20:16:32 +0000
@@ -1,6 +1,6 @@
 #!/usr/bin/python -uS
 #
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 # pylint: disable-msg=W0403
@@ -13,14 +13,16 @@
 """
 
 import _pythonpath
+# quiet pyflakes
+_pythonpath
 
 import os
 import sys
 
-from lp.code.model.branch import Branch
+import transaction
+
 from lp.codehosting.rewrite import BranchRewriter
 from lp.services.config import config
-from lp.services.database.lpstorm import ISlaveStore
 from lp.services.log.loglevels import (
     INFO,
     WARNING,
@@ -56,6 +58,7 @@
         while True:
             try:
                 line = sys.stdin.readline()
+                transaction.abort()
                 # Mod-rewrite always gives us a newline terminated string.
                 if line:
                     print rewriter.rewriteLine(line.strip())
@@ -70,13 +73,10 @@
                 # The exception might have been a DisconnectionError or
                 # similar. Cleanup such as database reconnection will
                 # not happen until the transaction is rolled back.
-                # XXX StuartBishop 2011-08-31 bug=819282: We are
-                # explicitly rolling back the store here as a workaround
-                # instead of using transaction.abort()
                 try:
-                    ISlaveStore(Branch).rollback()
+                    transaction.abort()
                 except Exception:
-                    self.logger.exception('Exception occurred in rollback:')
+                    self.logger.exception('Exception occurred in abort:')
 
 
 if __name__ == '__main__':
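
For reviewers who want to try the tweaked iter_split behaviour outside the tree, here is a minimal standalone sketch. The function body is copied from the lib/lp/services/utils.py hunk above; the sample inputs and the variable names (all_splits, one_split, trailing_slash) are illustrative assumptions, not part of the branch.

```python
def iter_split(string, splitter, splits=None):
    """Standalone copy of the patched helper, for illustration only."""
    if string == '':
        return
    tokens = string.split(splitter)
    if splits is None:
        # Default: try every split point, longest first element first.
        splits = reversed(range(1, len(tokens) + 1))
    for i in splits:
        first = splitter.join(tokens[:i])
        # Slice the original string so the separator stays on the remainder.
        yield first, string[len(first):]


# Every split of a three-segment path; the second element keeps its slash.
all_splits = list(iter_split('a/b/c', '/'))

# Only the split after the first two tokens.
one_split = list(iter_split('a/b/c', '/', splits=[2]))

# A trailing slash is preserved rather than lost.
trailing_slash = list(iter_split('foo/.bzr/', '/'))

print(all_splits)
print(one_split)
print(trailing_slash)
```

Because the second element is now sliced from the original string, callers that want the old slash-free form need to call lstrip('/') on it, as performLookup and _serializeControlDirectory do in this branch.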


Follow ups