← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~jml/launchpad/login-helper-love into lp:launchpad/devel

 

Jonathan Lange has proposed merging lp:~jml/launchpad/login-helper-love into lp:launchpad/devel.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)


This moves a bunch of login helpers to the _login module, which groups them all together nicely.

It gets rid of a bunch of custom login helpers from test_codeimportjob and makes the module completely non-reliant on sample data.

In the course of doing this, we add two generic helpers to lp.services.utils, run_with (run a function inside a context manager) and decorate_with (turn a context-manager factory into a decorator), plus a loginAsAnyone helper on the factory.

All of these changes are tested.
-- 
https://code.launchpad.net/~jml/launchpad/login-helper-love/+merge/30185
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jml/launchpad/login-helper-love into lp:launchpad/devel.
=== modified file 'lib/lp/code/model/tests/test_codeimportjob.py'
--- lib/lp/code/model/tests/test_codeimportjob.py	2010-07-10 14:20:23 +0000
+++ lib/lp/code/model/tests/test_codeimportjob.py	2010-07-17 19:03:44 +0000
@@ -18,8 +18,6 @@
 
 import transaction
 
-from twisted.python.util import mergeFunctionMetadata
-
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
@@ -36,10 +34,9 @@
 from lp.code.interfaces.codeimportjob import (
     ICodeImportJobSet, ICodeImportJobWorkflow)
 from lp.code.interfaces.codeimportresult import ICodeImportResult
-from lp.registry.interfaces.person import IPersonSet
 from lp.testing import (
-    ANONYMOUS, login, login_celebrity, logout, TestCaseWithFactory)
-from lp.testing.sampledata import NO_PRIVILEGE_EMAIL, VCS_IMPORTS_MEMBER_EMAIL
+    ANONYMOUS, login, login_celebrity, logout, TestCaseWithFactory,
+    with_anonymous_login, with_celebrity_logged_in)
 from canonical.launchpad.testing.codeimporthelpers import (
     make_finished_import, make_running_import)
 from canonical.launchpad.testing.pages import get_feedback_messages
@@ -960,23 +957,7 @@
             CodeImportReviewStatus.FAILING, code_import.review_status)
 
 
-def logged_in_as(email):
-    """Return a decorator that wraps functions to runs logged in as `email`.
-    """
-    def decorator(function):
-        def decorated(*args, **kw):
-            login(email)
-            try:
-                return function(*args, **kw)
-            finally:
-                logout()
-        return mergeFunctionMetadata(function, decorated)
-    return decorator
-
-
-# This is a dependence on the sample data: David Allouche is a member of the
-# ~vcs-imports celebrity team.
-logged_in_for_code_imports = logged_in_as(VCS_IMPORTS_MEMBER_EMAIL)
+logged_in_for_code_imports = with_celebrity_logged_in('vcs_imports')
 
 
 class TestRequestJobUIRaces(TestCaseWithFactory):
@@ -998,12 +979,15 @@
         code_import_id = code_import.id
         return code_import_id, branch_url
 
-    @logged_in_as(NO_PRIVILEGE_EMAIL)
     def requestJobByUserWithDisplayName(self, code_import_id, displayname):
         """Record a request for the job by a user with the given name."""
-        getUtility(ICodeImportJobWorkflow).requestJob(
-            getUtility(ICodeImportSet).get(code_import_id).import_job,
-            self.factory.makePerson(displayname=displayname))
+        self.factory.loginAsAnyone()
+        try:
+            getUtility(ICodeImportJobWorkflow).requestJob(
+                getUtility(ICodeImportSet).get(code_import_id).import_job,
+                self.factory.makePerson(displayname=displayname))
+        finally:
+            logout()
 
     @logged_in_for_code_imports
     def deleteJob(self, code_import_id):
@@ -1013,7 +997,7 @@
         getUtility(ICodeImportSet).get(code_import_id).updateFromData(
             {'review_status': CodeImportReviewStatus.SUSPENDED}, user)
 
-    @logged_in_as(ANONYMOUS)
+    @with_anonymous_login
     def startJob(self, code_import_id):
         """Mark the job as started on an arbitrary machine."""
         getUtility(ICodeImportJobWorkflow).startJob(
@@ -1037,6 +1021,8 @@
         code_import_id, branch_url = self.getNewCodeImportIDAndBranchURL()
         user_browser = self.getUserBrowser(branch_url)
         self.deleteJob(code_import_id)
+        # user_browser fails when we are logged in.
+        logout()
         user_browser.getControl('Import Now').click()
         self.assertEqual(
             [u'This import is no longer being updated automatically.'],
@@ -1047,6 +1033,8 @@
         code_import_id, branch_url = self.getNewCodeImportIDAndBranchURL()
         user_browser = self.getUserBrowser(branch_url)
         self.startJob(code_import_id)
+        # user_browser fails when we are logged in.
+        logout()
         user_browser.getControl('Import Now').click()
         self.assertEqual(
             [u'The import is already running.'],

=== modified file 'lib/lp/services/tests/test_utils.py'
--- lib/lp/services/tests/test_utils.py	2010-04-12 05:52:41 +0000
+++ lib/lp/services/tests/test_utils.py	2010-07-17 19:03:44 +0000
@@ -5,10 +5,16 @@
 
 __metaclass__ = type
 
+from contextlib import contextmanager
 import itertools
 import unittest
 
-from lp.services.utils import CachingIterator, iter_split
+from lp.services.utils import (
+    CachingIterator,
+    decorate_with,
+    iter_split,
+    run_with,
+    )
 from lp.testing import TestCase
 
 
@@ -67,5 +73,126 @@
         self.assertEqual([1,2,3,4], list(i1))
 
 
+class TestRunWithContextManager(TestCase):
+    """Tests for `run_with`."""
+
+    def setUp(self):
+        super(TestRunWithContextManager, self).setUp()
+        self.trivialContextManager = self._trivialContextManager()
+
+    @contextmanager
+    def _trivialContextManager(self):
+        """A trivial context manager, used for testing."""
+        yield
+
+    def test_run_with_calls_context(self):
+        # run_with runs with the context that it is passed.
+        calls = []
+        @contextmanager
+        def appending_twice():
+            calls.append('before')
+            yield
+            calls.append('after')
+        run_with(appending_twice(), lambda: None)
+        self.assertEquals(['before', 'after'], calls)
+
+    def test_run_with_calls_function(self):
+        # run_with calls the function that it has been passed.
+        calls = []
+        run_with(self.trivialContextManager, calls.append, 'foo')
+        self.assertEquals(['foo'], calls)
+
+    def test_run_with_passes_through_kwargs(self):
+        # run_with passes through keyword arguments.
+        calls = []
+        def append(*args, **kwargs):
+            calls.append((args, kwargs))
+        run_with(self.trivialContextManager, append, 'foo', 'bar', qux=4)
+        self.assertEquals([(('foo', 'bar'), {'qux': 4})], calls)
+
+    def test_run_with_returns_result(self):
+        # run_with returns the result of the function it's given.
+        arbitrary_value = self.factory.getUniqueString()
+        result = run_with(self.trivialContextManager, lambda: arbitrary_value)
+        self.assertEquals(arbitrary_value, result)
+
+    def test_run_with_bubbles_exceptions(self):
+        # run_with bubbles up exceptions.
+        self.assertRaises(
+            ZeroDivisionError,
+            run_with, self.trivialContextManager, lambda: 1/0)
+
+
+class TestDecorateWith(TestCase):
+    """Tests for `decorate_with`."""
+
+    def setUp(self):
+        super(TestDecorateWith, self).setUp()
+
+    @contextmanager
+    def trivialContextManager(self):
+        """A trivial context manager, used for testing."""
+        yield
+
+    def test_decorate_with_calls_context(self):
+        # When run, a function decorated with decorate_with runs with the
+        # context given to decorate_with.
+        calls = []
+        @contextmanager
+        def appending_twice():
+            calls.append('before')
+            yield
+            calls.append('after')
+        @decorate_with(appending_twice)
+        def function():
+            pass
+        function()
+        self.assertEquals(['before', 'after'], calls)
+
+    def test_decorate_with_function(self):
+        # The original function is actually called when we call the result of
+        # decoration.
+        calls = []
+        @decorate_with(self.trivialContextManager)
+        def function():
+            calls.append('foo')
+        function()
+        self.assertEquals(['foo'], calls)
+
+    def test_decorate_with_call_twice(self):
+        # A function decorated with decorate_with can be called twice.
+        calls = []
+        @decorate_with(self.trivialContextManager)
+        def function():
+            calls.append('foo')
+        function()
+        function()
+        self.assertEquals(['foo', 'foo'], calls)
+
+    def test_decorate_with_arguments(self):
+        # decorate_with passes through arguments.
+        calls = []
+        @decorate_with(self.trivialContextManager)
+        def function(*args, **kwargs):
+            calls.append((args, kwargs))
+        function('foo', 'bar', qux=4)
+        self.assertEquals([(('foo', 'bar'), {'qux': 4})], calls)
+
+    def test_decorate_with_name_and_docstring(self):
+        # decorate_with preserves function names and docstrings.
+        @decorate_with(self.trivialContextManager)
+        def arbitrary_name():
+            """Arbitrary docstring."""
+        self.assertEqual('arbitrary_name', arbitrary_name.__name__)
+        self.assertEqual('Arbitrary docstring.', arbitrary_name.__doc__)
+
+    def test_decorate_with_returns(self):
+        # decorate_with returns the original function's return value.
+        decorator = decorate_with(self.trivialContextManager)
+        arbitrary_value = self.getUniqueString()
+        result = decorator(lambda: arbitrary_value)()
+        self.assertEqual(arbitrary_value, result)
+
+
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)

=== modified file 'lib/lp/services/utils.py'
--- lib/lp/services/utils.py	2010-04-12 10:47:13 +0000
+++ lib/lp/services/utils.py	2010-07-17 19:03:44 +0000
@@ -10,7 +10,9 @@
 __metaclass__ = type
 __all__ = [
     'CachingIterator',
+    'decorate_with',
     'iter_split',
+    'run_with',
     'synchronize',
     'text_delta',
     'value_string',
@@ -19,6 +21,8 @@
 import itertools
 
 from lazr.enum import BaseItem
+
+from twisted.python.util import mergeFunctionMetadata
 from zope.security.proxy import isinstance as zope_isinstance
 
 
@@ -141,3 +145,23 @@
                 break
             self.data.append(item)
             yield item
+
+
+def run_with(context, function, *args, **kwargs):
+    """Run 'function' with 'context'.
+
+    Runs the given function with arbitrary arguments and keyword arguments
+    with the given context. Returns the return value of 'function'.
+    """
+    with context:
+        return function(*args, **kwargs)
+
+
+def decorate_with(context_factory, *args, **kwargs):
+    """Create a decorator that runs decorated functions with 'context'."""
+    def decorator(function):
+        def decorated(*a, **kw):
+            with context_factory(*args, **kwargs):
+                return function(*a, **kw)
+        return mergeFunctionMetadata(function, decorated)
+    return decorator

=== modified file 'lib/lp/testing/__init__.py'
--- lib/lp/testing/__init__.py	2010-07-09 17:44:53 +0000
+++ lib/lp/testing/__init__.py	2010-07-17 19:03:44 +0000
@@ -13,6 +13,7 @@
     'build_yui_unittest_suite',
     'BrowserTestCase',
     'capture_events',
+    'celebrity_logged_in',
     'FakeTime',
     'get_lsb_information',
     'is_logged_in',
@@ -27,6 +28,7 @@
     'map_branch_contents',
     'normalize_whitespace',
     'oauth_access_token_for',
+    'person_logged_in',
     'record_statements',
     'run_with_login',
     'run_with_storm_debug',
@@ -39,6 +41,7 @@
     'validate_mock_class',
     'WindmillTestCase',
     'with_anonymous_login',
+    'with_celebrity_logged_in',
     'ws_object',
     'YUIUnitTestCase',
     'ZopeTestInSubProcess',
@@ -69,8 +72,6 @@
 import testtools
 import transaction
 
-from twisted.python.util import mergeFunctionMetadata
-
 from windmill.authoring import WindmillTestClient
 
 from zope.component import adapter, getUtility
@@ -85,15 +86,25 @@
 from canonical.config import config
 from canonical.launchpad.webapp.errorlog import ErrorReportEvent
 from canonical.launchpad.webapp.interaction import ANONYMOUS
-from canonical.launchpad.webapp.interfaces import ILaunchBag
 from canonical.launchpad.windmill.testing import constants
 from lp.codehosting.vfs import branch_id_to_path, get_rw_server
 from lp.registry.interfaces.packaging import IPackagingUtil
-# Import the login and logout functions here as it is a much better
+# Import the login helper functions here as it is a much better
 # place to import them from in tests.
 from lp.testing._login import (
-    is_logged_in, login, login_as, login_celebrity, login_person, login_team,
-    logout)
+    celebrity_logged_in,
+    is_logged_in,
+    login,
+    login_as,
+    login_celebrity,
+    login_person,
+    login_team,
+    logout,
+    person_logged_in,
+    run_with_login,
+    with_anonymous_login,
+    with_celebrity_logged_in,
+    )
 # canonical.launchpad.ftests expects test_tales to be imported from here.
 # XXX: JonathanLange 2010-01-01: Why?!
 from lp.testing._tales import test_tales
@@ -821,35 +832,6 @@
     return distinfo
 
 
-def with_anonymous_login(function):
-    """Decorate 'function' so that it runs in an anonymous login."""
-    def wrapped(*args, **kwargs):
-        login(ANONYMOUS)
-        try:
-            return function(*args, **kwargs)
-        finally:
-            logout()
-    return mergeFunctionMetadata(function, wrapped)
-
-
-@contextmanager
-def person_logged_in(person):
-    current_person = getUtility(ILaunchBag).user
-    logout()
-    login_person(person)
-    try:
-        yield
-    finally:
-        logout()
-        login_person(current_person)
-
-
-def run_with_login(person, function, *args, **kwargs):
-    """Run 'function' with 'person' logged in."""
-    with person_logged_in(person):
-        return function(*args, **kwargs)
-
-
 def time_counter(origin=None, delta=timedelta(seconds=5)):
     """A generator for yielding datetime values.
 

=== modified file 'lib/lp/testing/_login.py'
--- lib/lp/testing/_login.py	2010-07-13 09:56:16 +0000
+++ lib/lp/testing/_login.py	2010-07-17 19:03:44 +0000
@@ -6,6 +6,7 @@
 __metaclass__ = type
 
 __all__ = [
+    'celebrity_logged_in',
     'login',
     'login_as',
     'login_celebrity',
@@ -13,20 +14,26 @@
     'login_team',
     'logout',
     'is_logged_in',
+    'person_logged_in',
+    'run_with_login',
+    'with_anonymous_login',
+    'with_celebrity_logged_in',
     ]
 
+from contextlib import contextmanager
 import random
 
 from zope.component import getUtility
 from zope.security.management import endInteraction
-from zope.security.proxy import removeSecurityProxy
 
 from canonical.launchpad.interfaces.launchpad import ILaunchpadCelebrities
 from canonical.launchpad.webapp.interaction import (
     ANONYMOUS, setupInteractionByEmail, setupInteractionForPerson)
+from canonical.launchpad.webapp.interfaces import ILaunchBag
 from canonical.launchpad.webapp.servers import LaunchpadTestRequest
 from canonical.launchpad.webapp.vhosts import allvhosts
 
+from lp.services.utils import decorate_with
 
 
 _logged_in = False
@@ -141,3 +148,43 @@
     global _logged_in
     _logged_in = False
     endInteraction()
+
+
+def _with_login(login_method, identifier):
+    """Make a context manager that runs with a particular log in."""
+    current_person = getUtility(ILaunchBag).user
+    login_method(identifier)
+    try:
+        yield
+    finally:
+        login_person(current_person)
+
+
+@contextmanager
+def person_logged_in(person):
+    """Make a context manager for running logged in as 'person'.
+
+    :param person: A person, an account, a team or ANONYMOUS. If a team,
+        will log in as an arbitrary member of that team.
+    """
+    return _with_login(login_as, person)
+
+
+@contextmanager
+def celebrity_logged_in(celebrity_name):
+    """Make a context manager for running logged in as a celebrity."""
+    return _with_login(login_celebrity, celebrity_name)
+
+
+with_anonymous_login = decorate_with(person_logged_in, None)
+
+
+def with_celebrity_logged_in(celebrity_name):
+    """Decorate a function so that it's run with a celebrity logged in."""
+    return decorate_with(celebrity_logged_in, celebrity_name)
+
+
+def run_with_login(person, function, *args, **kwargs):
+    """Run 'function' with 'person' logged in."""
+    with person_logged_in(person):
+        return function(*args, **kwargs)

=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py	2010-07-16 18:15:44 +0000
+++ lib/lp/testing/factory.py	2010-07-17 19:03:44 +0000
@@ -121,7 +121,7 @@
     ISourcePackage, SourcePackageUrgency)
 from lp.registry.interfaces.sourcepackagename import (
     ISourcePackageNameSet)
-from lp.registry.interfaces.ssh import ISSHKeySet, SSHKeyType
+from lp.registry.interfaces.ssh import ISSHKeySet
 from lp.registry.interfaces.distributionmirror import (
     MirrorContent, MirrorSpeed)
 from lp.registry.interfaces.pocket import PackagePublishingPocket
@@ -148,9 +148,15 @@
 from lp.soyuz.model.processor import ProcessorFamily, ProcessorFamilySet
 from lp.soyuz.model.publishing import (
     BinaryPackagePublishingHistory, SourcePackagePublishingHistory)
-
-from lp.testing import run_with_login, time_counter, login, logout, temp_dir
-
+from lp.testing import (
+    ANONYMOUS,
+    login,
+    login_as,
+    logout,
+    run_with_login,
+    temp_dir,
+    time_counter,
+    )
 from lp.translations.interfaces.potemplate import IPOTemplateSet
 from lp.translations.interfaces.translationimportqueue import (
     RosettaImportStatus)
@@ -341,6 +347,17 @@
             logout()
         return result
 
+    def loginAsAnyone(self):
+        """Log in as an arbitrary person.
+
+        If you want to log in as a celebrity, including admins, see
+        `lp.testing.login_celebrity`.
+        """
+        login(ANONYMOUS)
+        person = self.makePerson()
+        login_as(person)
+        return person
+
     def makeCopyArchiveLocation(self, distribution=None, owner=None,
         name=None, enabled=True):
         """Create and return a new arbitrary location for copy packages."""

=== modified file 'lib/lp/testing/sampledata.py'
--- lib/lp/testing/sampledata.py	2010-07-10 14:20:23 +0000
+++ lib/lp/testing/sampledata.py	2010-07-17 19:03:44 +0000
@@ -10,9 +10,7 @@
 __metaclass__ = type
 __all__ = [
     'NO_PRIVILEGE_EMAIL',
-    'VCS_IMPORTS_MEMBER_EMAIL',
     ]
 
 
 NO_PRIVILEGE_EMAIL = 'no-priv@xxxxxxxxxxxxx'
-VCS_IMPORTS_MEMBER_EMAIL = 'david.allouche@xxxxxxxxxxxxx'

=== modified file 'lib/lp/testing/tests/test_factory.py'
--- lib/lp/testing/tests/test_factory.py	2010-07-10 09:04:39 +0000
+++ lib/lp/testing/tests/test_factory.py	2010-07-17 19:03:44 +0000
@@ -7,6 +7,9 @@
 
 import unittest
 
+from zope.component import getUtility
+
+from canonical.launchpad.webapp.interfaces import ILaunchBag
 from canonical.testing.layers import DatabaseFunctionalLayer
 from lp.code.enums import CodeImportReviewStatus
 from lp.testing import TestCaseWithFactory
@@ -29,6 +32,13 @@
         code_import = self.factory.makeCodeImport(review_status=status)
         self.assertEqual(status, code_import.review_status)
 
+    def test_loginAsAnyone(self):
+        # loginAsAnyone creates an arbitrary person and logs in as them.
+        person = self.factory.loginAsAnyone()
+        current_person = getUtility(ILaunchBag).user
+        self.assertIsNot(None, person)
+        self.assertEqual(person, current_person)
+
 
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)

=== modified file 'lib/lp/testing/tests/test_login.py'
--- lib/lp/testing/tests/test_login.py	2010-07-13 09:56:16 +0000
+++ lib/lp/testing/tests/test_login.py	2010-07-17 19:03:44 +0000
@@ -15,8 +15,19 @@
 from canonical.launchpad.webapp.interfaces import IOpenLaunchBag
 from canonical.testing.layers import DatabaseFunctionalLayer
 from lp.testing import (
-    ANONYMOUS, is_logged_in, login, login_as, login_celebrity, login_person,
-    login_team, logout)
+    ANONYMOUS,
+    celebrity_logged_in,
+    is_logged_in,
+    login,
+    login_as,
+    login_celebrity,
+    login_person,
+    login_team,
+    logout,
+    person_logged_in,
+    with_anonymous_login,
+    with_celebrity_logged_in,
+    )
 from lp.testing import TestCaseWithFactory
 
 
@@ -177,7 +188,7 @@
         login_celebrity('vcs_imports')
         vcs_imports = getUtility(ILaunchpadCelebrities).vcs_imports
         person = self.getLoggedInPerson()
-        self.assertTrue(person.inTeam, vcs_imports)
+        self.assertTrue(person.inTeam(vcs_imports))
 
     def test_login_nonexistent_celebrity(self):
         # login_celebrity raises ValueError when called with a non-existent
@@ -186,6 +197,77 @@
         e = self.assertRaises(ValueError, login_celebrity, 'nonexistent')
         self.assertEqual(str(e), "No such celebrity: 'nonexistent'")
 
+    def test_person_logged_in(self):
+        # The person_logged_in context manager runs with a person logged in.
+        person = self.factory.makePerson()
+        with person_logged_in(person):
+            self.assertLoggedIn(person)
+
+    def test_person_logged_in_restores_person(self):
+        # Once outside of the person_logged_in context, the originally
+        # logged-in person is re-logged in.
+        a = self.factory.makePerson()
+        login_as(a)
+        b = self.factory.makePerson()
+        with person_logged_in(b):
+            self.assertLoggedIn(b)
+        self.assertLoggedIn(a)
+
+    def test_person_logged_in_restores_person_even_when_raises(self):
+        # Once outside of the person_logged_in context, the originally
+        # logged-in person is re-logged in, even if the body raised.
+        a = self.factory.makePerson()
+        login_as(a)
+        b = self.factory.makePerson()
+        try:
+            with person_logged_in(b):
+                1/0
+        except ZeroDivisionError:
+            pass
+        self.assertLoggedIn(a)
+
+    def test_team_logged_in(self):
+        # person_logged_in also works when given teams.
+        team = self.factory.makeTeam()
+        with person_logged_in(team):
+            person = self.getLoggedInPerson()
+        self.assertTrue(person.inTeam(team))
+
+    def test_celebrity_logged_in(self):
+        # celebrity_logged_in runs in a context where a celebrity is logged
+        # in.
+        vcs_imports = getUtility(ILaunchpadCelebrities).vcs_imports
+        with celebrity_logged_in('vcs_imports'):
+            person = self.getLoggedInPerson()
+        self.assertTrue(person.inTeam(vcs_imports))
+
+    def test_celebrity_logged_in_restores_person(self):
+        # Once outside of the celebrity_logged_in context, the originally
+        # logged-in person is re-logged in.
+        person = self.factory.makePerson()
+        login_as(person)
+        with celebrity_logged_in('vcs_imports'):
+            pass
+        self.assertLoggedIn(person)
+
+    def test_with_celebrity_logged_in(self):
+        # with_celebrity_logged_in decorates a function so that it runs with
+        # the given celebrity logged in.
+        vcs_imports = getUtility(ILaunchpadCelebrities).vcs_imports
+        @with_celebrity_logged_in('vcs_imports')
+        def f():
+            return self.getLoggedInPerson()
+        person = f()
+        self.assertTrue(person.inTeam, vcs_imports)
+
+    def test_with_anonymous_log_in(self):
+        # with_anonymous_login logs in as the anonymous user.
+        @with_anonymous_login
+        def f():
+            return self.getLoggedInPerson()
+        person = f()
+        self.assertEqual(ANONYMOUS, person)
+
 
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)