launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20011
Re: [Merge] lp:~thomir/launchpad/devel-gpg-layer into lp:launchpad
Review: Needs Fixing code
Diff comments:
> === modified file 'lib/lp/services/gpg/configure.zcml'
> --- lib/lp/services/gpg/configure.zcml 2011-12-09 00:24:57 +0000
> +++ lib/lp/services/gpg/configure.zcml 2016-02-18 00:43:11 +0000
> @@ -13,12 +13,22 @@
> <allow interface="lp.services.gpg.interfaces.IGPGHandler" />
> </class>
>
> + <class class="lp.services.gpg.handler.GPGClient">
> + <allow interface="lp.services.gpg.interfaces.IGPGClient" />
> + </class>
> +
> <securedutility
> class="lp.services.gpg.handler.GPGHandler"
> provides="lp.services.gpg.interfaces.IGPGHandler">
> <allow interface="lp.services.gpg.interfaces.IGPGHandler" />
> </securedutility>
>
> + <securedutility
> + class="lp.services.gpg.handler.GPGClient"
> + provides="lp.services.gpg.interfaces.IGPGClient">
> + <allow interface="lp.services.gpg.interfaces.IGPGClient" />
> + </securedutility>
> +
Prefer to group ZCML by object rather than directive. So the two GPGClient bits together, and the two GPGHandler bits together.
> <class class="lp.services.gpg.handler.PymeSignature">
> <allow interface="lp.services.gpg.interfaces.IPymeSignature" />
> </class>
>
> === modified file 'lib/lp/services/gpg/handler.py'
> --- lib/lp/services/gpg/handler.py 2015-09-26 02:55:36 +0000
> +++ lib/lp/services/gpg/handler.py 2016-02-18 00:43:11 +0000
> @@ -619,3 +615,126 @@
> self.name = uid.name
> self.email = uid.email
> self.comment = uid.comment
> +
> +
> +def sanitize_fingerprint(fingerprint):
> + """Sanitize a GPG Fingerprint.
> +
> + This is the ultimate implementation of IGPGHandler.sanitizeFingerprint, and
> + is also used by the IGPGClient implementation.
> +
> + """
Extra blank line, plus s/Fingerprint/fingerprint/ in the first line.
> + # remove whitespaces, truncate to max of 40 (as per v4 keys) and
> + # convert to upper case
> + fingerprint = fingerprint.replace(' ', '')
> + fingerprint = fingerprint[:40].upper()
> +
> + if not valid_fingerprint(fingerprint):
> + return None
> +
> + return fingerprint
> +
> +
> +def sanitize_fingerprint_or_raise(fingerprint):
> + """Check the sanity of 'fingerprint'.
> +
> + If 'fingerprint' is a valid fingerprint, the sanitised version will be
> + returned (see sanitize_fingerprint).
> +
> + Otherwise, a ValueError will be raised.
> +
> + """
> + sane_fingerprint = sanitize_fingerprint(fingerprint)
> + if sane_fingerprint is None:
> + raise ValueError("Invalid fingerprint: %r." % fingerprint)
> + return sane_fingerprint
> +
> +
> +@implementer(IGPGClient)
> +class GPGClient:
> + """See IGPGClient."""
> +
> + def __init__(self):
> + self.write_hooks = set()
> +
> + def get_keys_for_owner(self, owner_id):
LP universally uses lowerCamelCase for method names, inherited from all of Zope which predates PEP 8.
> + """See IGPGClient."""
> + try:
> + conn = httplib.HTTPConnection(config.gpgservice.api_endpoint)
> + conn.request('GET', '/users/%s/keys' % self._encode_owner_id(owner_id))
> + resp = conn.getresponse()
> + if resp.status != httplib.OK:
> + self.raise_for_error(resp)
> + return json.load(resp)
We use requests rather than httplib in GitHostingClient and WebhookClient, since it's cleaner and has built-in JSON send/receive support. There are also patterns in those utilities for timeouts, which we need here.
> + except socket.error:
> + raise
This seems like a no-op.
> + finally:
> + conn.close()
> +
> + def add_key_for_owner(self, owner_id, fingerprint):
> + """See IGPGClient."""
> + fingerprint = sanitize_fingerprint_or_raise(fingerprint)
> + try:
> + conn = httplib.HTTPConnection(config.gpgservice.api_endpoint)
> + headers = {
> + 'Content-Type': 'application/json'
> + }
> + path = '/users/%s/keys' % self._encode_owner_id(owner_id)
> + body = json.dumps(dict(fingerprint=fingerprint))
> + conn.request('POST', path, body, headers)
> + resp = conn.getresponse()
> + if resp.status == httplib.CREATED:
> + self._notify_writes()
> + else:
> + self.raise_for_error(resp)
> + except socket.error:
> + raise
> + finally:
> + conn.close()
> +
> + def disable_key_for_owner(self, owner_id, fingerprint):
> + """See IGPGClient."""
> + fingerprint = sanitize_fingerprint_or_raise(fingerprint)
> + try:
> + conn = httplib.HTTPConnection(config.gpgservice.api_endpoint)
> + path = '/users/%s/keys/%s' % (self._encode_owner_id(owner_id), fingerprint)
> + conn.request('DELETE', path)
> + resp = conn.getresponse()
> + if resp.status == httplib.OK:
> + self._notify_writes()
> + else:
> + self.raise_for_error(resp)
> + except socket.error:
> + raise
> + finally:
> + conn.close()
> +
> + def register_write_hook(self, hook_callable):
> + """See IGPGClient."""
> + if not callable(hook_callable):
> + raise TypeError("'hook_callable' parameter must be a callable.")
> + self.write_hooks.add(hook_callable)
> +
> + def deregister_write_hook(self, hook_callable):
We conventionally prefer "unregister" over "deregister".
> + """See IGPGClient."""
> + if hook_callable not in self.write_hooks:
> + raise ValueError("%r not registered.")
> + self.write_hooks.remove(hook_callable)
> +
> + def _notify_writes(self):
> + for hook in self.write_hooks:
> + try:
> + hook()
> + except:
> + pass
Swallowing exceptions entirely is forbidden, but I'm not quite sure why this would even want to ignore them at all.
> +
> + def _encode_owner_id(self, owner_id):
> + return base64.b64encode(owner_id, altchars='-_')
> +
> + def raise_for_error(self, response):
> + """Raise GPGServiceException based on what's in 'response'."""
> + if response.getheader('Content-Type') == 'application/json':
> + message = json.load(response)['status']
> + else:
> + message = "Unhandled service error:\n" + response.read()
> + raise GPGServiceException(message)
This squashes the useful HTTP status code, and could also make logs Very Large.
>
> === modified file 'lib/lp/services/gpg/interfaces.py'
> --- lib/lp/services/gpg/interfaces.py 2016-02-10 00:51:55 +0000
> +++ lib/lp/services/gpg/interfaces.py 2016-02-18 00:43:11 +0000
> @@ -421,3 +423,57 @@
> name = Attribute("The name portion of this user ID")
> email = Attribute("The email portion of this user ID")
> comment = Attribute("The comment portion of this user ID")
> +
> +
> +class GPGServiceException(Exception):
> +
> + """Raised when we get an error from the gpgservice.
> +
> + More specific errors for commonly encountered errors may be added once we
> + actually integrate gpgservice with the rest of launchpad.
> +
> + """
Extra line again.
> +
> +
> +class IGPGClient(Interface):
> +
> + """A client for querying a gpgservice instance."""
> +
> + def get_keys_for_owner(owner_id):
> + """Get a list of keys for a given owner.
> +
> + :raises GPGServiceException: If we get an error from the gpgservice.
> + :raises socket.error" on socket-level errors (connection timeouts etc)
> + """
> +
> + def add_key_for_owner(owner_id, fingerprint):
> + """Add a GPG key.
> +
> + :raises ValueError: if the fingerprint isn't valid.
> + :raises GPGServiceException: If we get an error from the gpgservice.
> + :raises socket.error" on socket-level errors (connection timeouts etc)
> + """
> +
> + def disable_key_for_owner(owner_id, fingerprint):
> + """Disable a GPG key.
> +
> + :raises ValueError: if the fingerprint isn't valid.
> + :raises GPGServiceException: If we get an error from the gpgservice.
> + :raises socket.error" on socket-level errors (connection timeouts etc)
> + """
> +
> + def register_write_hook(hook_callable):
> + """Register a write hook.
> +
> + The hook_callable will be called with no arguments whenever an operation
> + is performed that modifies the GPG database.
> +
> + :raises TypeError: if hook_callable is not a callable.
> + :raises GPGServiceException: If we get an error from the gpgservice.
> + """
> +
> + def deregister_write_hook(hook_callable):
> + """Deregister a write hook that was registered with register_write_hook.
> +
> + :raises ValueError: if hook_callable was not registered.
> + """
>
> === modified file 'lib/lp/services/gpg/tests/test_gpghandler.py'
> --- lib/lp/services/gpg/tests/test_gpghandler.py 2015-09-26 02:55:36 +0000
> +++ lib/lp/services/gpg/tests/test_gpghandler.py 2016-02-18 00:43:11 +0000
> @@ -1,14 +1,30 @@
> # Copyright 2009-2015 Canonical Ltd. This software is licensed under the
> # GNU Affero General Public License version 3 (see the file LICENSE).
> +import random
Missing blank line before imports.
> +import string
>
> +from testtools.matchers import (
> + Contains,
> + Equals,
> + HasLength,
> + Not,
> + raises,
> + )
> from zope.component import getUtility
> from zope.security.proxy import removeSecurityProxy
>
> +from lp.services.config.fixture import (
> + ConfigFixture,
> + ConfigUseFixture,
> + )
> from lp.services.gpg.interfaces import (
> GPGKeyDoesNotExistOnServer,
> GPGKeyTemporarilyNotFoundError,
> + GPGServiceException,
> + IGPGClient,
> IGPGHandler,
> )
> +from lp.services.gpg.handler import GPGClient
handler < interfaces. utilities/format-new-and-modified-imports automates correct sorting.
> from lp.services.log.logger import BufferLogger
> from lp.services.timeout import (
> get_default_timeout_function,
> @@ -184,3 +206,131 @@
> self.assertRaises(
> GPGKeyDoesNotExistOnServer,
> removeSecurityProxy(self.gpg_handler)._getPubKey, fingerprint)
> +
> +
> +class GPGClientTests(TestCase):
> +
> + layer = GPGServiceLayer
> +
> + def test_can_get_utility(self):
> + client = getUtility(IGPGClient)
> + self.assertIsNot(None, client)
> +
> + def get_random_owner_id_string(self):
> + """Get a random string that's representative of the owner id scheme."""
> + candidates = string.ascii_lowercase + string.digits
> + openid_id = ''.join((random.choice(candidates) for i in range(6)))
> + return 'https://login.ubuntu.com/+id/' + openid_id
Production domains in tests make things hard to grep for and can get us into unconfigurable corners or tests and dev instances that accidentally hit prod.
Tests use http://testopenid.dev/ as their OpenID provider, but this test could use any domain at all.
> +
> + def test_get_key_for_user_with_sampledata(self):
> + client = getUtility(IGPGClient)
> + data = client.get_keys_for_owner('name16_oid')
> + self.assertThat(data, Contains('keys'))
> + self.assertThat(data['keys'], HasLength(1))
ContainsDict({'keys': HasLength(1)}) is slightly cleaner and makes test failures much more useful.
> +
> + def test_get_key_for_unknown_user(self):
> + client = getUtility(IGPGClient)
> + user = self.get_random_owner_id_string()
> + data = client.get_keys_for_owner(user)
> + self.assertThat(data, Contains('keys'))
> + self.assertThat(data['keys'], HasLength(0))
> +
> + def test_register_non_callable_raises_TypeError(self):
> + client = getUtility(IGPGClient)
> + self.assertThat(
> + lambda: client.register_write_hook("not a callable"),
> + raises(TypeError))
I find "with ExpectedException" to be cleaner for some things, but happy either way.
> +
> + def test_deregister_with_unregstered_hook_raises_ValueError(self):
unregstered
> + client = getUtility(IGPGClient)
> + self.assertThat(
> + lambda: client.deregister_write_hook("not registerred"),
> + raises(ValueError))
> +
> + def test_can_deregister_registerred_write_hook(self):
registerred
> + client = getUtility(IGPGClient)
> + hook = CallableTracker()
lp.testing.fakemethod.FakeMethod probably does what you need.
> + client.register_write_hook(hook)
> + client.deregister_write_hook(hook)
> +
> + self.assertThat(
> + lambda: client.deregister_write_hook(hook),
> + raises(ValueError))
> +
> + def test_can_add_new_fingerprint_for_user(self):
> + self.useFixture(KeyServerTac())
> + client = getUtility(IGPGClient)
> + fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
> + user = self.get_random_owner_id_string()
> + client.add_key_for_owner(user, fingerprint)
> + data = client.get_keys_for_owner(user)
> + keys = data['keys']
> + self.assertThat(keys, HasLength(1))
> + self.assertThat(keys[0]['fingerprint'], Equals(fingerprint))
> + self.assertThat(keys[0]['enabled'], Equals(True))
ContainsDict would be lovely here.
> +
> + def test_adding_fingerprint_notifies_writes(self):
> + self.useFixture(KeyServerTac())
> + client = getUtility(IGPGClient)
> + hook = CallableTracker()
> + client.register_write_hook(hook)
> + self.addCleanup(client.deregister_write_hook, hook)
> + fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
> + user = self.get_random_owner_id_string()
> + client.add_key_for_owner(user, fingerprint)
> +
> + self.assertTrue(hook.called)
> +
> + def test_adding_invalid_fingerprint_raises_ValueError(self):
> + client = getUtility(IGPGClient)
> + self.assertThat(
> + lambda: client.add_key_for_owner(self.get_random_owner_id_string(), ''),
> + raises(ValueError("Invalid fingerprint: ''.")))
> +
> + def test_adding_duplicate_fingerprint_raises_GPGServiceException(self):
> + self.useFixture(KeyServerTac())
> + client = getUtility(IGPGClient)
> + fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
> + user_one = self.get_random_owner_id_string()
> + user_two = self.get_random_owner_id_string()
> + client.add_key_for_owner(user_one, fingerprint)
> + self.assertThat(
> + lambda: client.add_key_for_owner(user_two, fingerprint),
> + raises(GPGServiceException("Error: Fingerprint already in database.")))
This needs to be a more specific exception, but that can always be fixed later.
> +
> + def test_disabling_active_key(self):
> + client = getUtility(IGPGClient)
> + fingerprint = 'ABCDEF0123456789ABCDDCBA0000111112345678'
We don't use sampledata in new tests if we can avoid it. It hides test setup and makes sampledata very hard to change or remove later. So please add a new fingerprint here, then disable it.
> + client.disable_key_for_owner('name16_oid', fingerprint)
> + data = client.get_keys_for_owner('name16_oid')
> + keys = data['keys']
> +
> + self.assertThat(keys, HasLength(1))
> + self.assertThat(keys[0]['enabled'], Equals(False))
> +
> + def test_disabling_key_notifies_writes(self):
> + client = getUtility(IGPGClient)
> + hook = CallableTracker()
> + client.register_write_hook(hook)
> + self.addCleanup(client.deregister_write_hook, hook)
> + fingerprint = 'ABCDEF0123456789ABCDDCBA0000111112345678'
> + client.disable_key_for_owner('name16_oid', fingerprint)
> +
> + self.assertTrue(hook.called)
> +
> + def test_disabling_invalid_fingerprint_raises_ValueError(self):
> + client = getUtility(IGPGClient)
> + self.assertThat(
> + lambda: client.disable_key_for_owner(self.get_random_owner_id_string(), ''),
> + raises(ValueError("Invalid fingerprint: ''."))
> + )
> +
> +
> +class CallableTracker:
> +
> + def __init__(self):
> + self.called = False
> + self.call_params = []
> +
> + def __call__(self):
> + self.called = True
>
> === modified file 'lib/lp/testing/layers.py'
> --- lib/lp/testing/layers.py 2015-12-16 11:48:10 +0000
> +++ lib/lp/testing/layers.py 2016-02-18 00:43:11 +0000
> @@ -1161,6 +1164,47 @@
> logout()
>
>
> +class GPGServiceLayer(ZopelessLayer):
GPGServerLayer can't inherit ZopelessLayer, as it's also needed in LaunchpadFunctionalLayer which uses FunctionalLayer rather than ZopelessLayer.
> +
> + service_fixture = None
> + gpgservice_needs_reset = False
> +
> + @classmethod
> + @profiled
> + def setUp(cls):
> + gpg_client = getUtility(IGPGClient)
> + gpg_client.register_write_hook(cls._on_gpgservice_write)
> + cls.service_fixture = GPGKeyServiceFixture(BaseLayer.config_fixture)
> + cls.service_fixture.setUp()
> +
> + @classmethod
> + @profiled
> + def tearDown(cls):
> + # ZopelessLayer logs us out, but then we can't deregister out write hook
> + login(ANONYMOUS)
> + gpg_client = getUtility(IGPGClient)
I'd personally use removeSecurityProxy here to avoid potential catastrophic testrunner explosions if the DB or authz systems are in a bad state during tearDown.
login/logout should work in normal cases, but it will be really messy when something goes wrong, and layer tearDown is a very widely used place to have mess.
> + gpg_client.deregister_write_hook(cls._on_gpgservice_write)
> + cls.service_fixture.cleanUp()
> + cls.service_fixture = None
> + logout()
> +
> + @classmethod
> + @profiled
> + def testSetUp(cls):
> + if cls.gpgservice_needs_reset:
> + cls.service_fixture.reset_service_database()
> + cls.gpgservice_needs_reset = False
Why testSetUp over testTearDown?
> +
> + @classmethod
> + @profiled
> + def testTearDown(cls):
> + pass
> +
> + @classmethod
> + def _on_gpgservice_write(cls):
> + cls.gpgservice_needs_reset = True
> +
> +
> class TwistedLayer(BaseLayer):
> """A layer for cleaning up the Twisted thread pool."""
>
--
https://code.launchpad.net/~thomir/launchpad/devel-gpg-layer/+merge/286440
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.
References