← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/handle-more-gpg-exceptions into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/handle-more-gpg-exceptions into lp:launchpad.

Commit message:
Handle more GPG exceptions when verifying code of conduct signatures or authenticating incoming email.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/handle-more-gpg-exceptions/+merge/371763
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/handle-more-gpg-exceptions into lp:launchpad.
=== modified file 'lib/lp/registry/model/codeofconduct.py'
--- lib/lp/registry/model/codeofconduct.py	2018-05-06 08:52:34 +0000
+++ lib/lp/registry/model/codeofconduct.py	2019-08-23 19:42:27 +0000
@@ -43,6 +43,7 @@
     )
 from lp.services.gpg.interfaces import (
     GPGKeyExpired,
+    GPGKeyNotFoundError,
     GPGVerificationError,
     IGPGHandler,
     )
@@ -272,7 +273,7 @@
 
         try:
             sig = gpghandler.getVerifiedSignature(sane_signedcode)
-        except (GPGVerificationError, GPGKeyExpired) as e:
+        except (GPGVerificationError, GPGKeyExpired, GPGKeyNotFoundError) as e:
             return str(e)
 
         if not sig.fingerprint:

=== added file 'lib/lp/registry/tests/test_codeofconduct.py'
--- lib/lp/registry/tests/test_codeofconduct.py	1970-01-01 00:00:00 +0000
+++ lib/lp/registry/tests/test_codeofconduct.py	2019-08-23 19:42:27 +0000
@@ -0,0 +1,194 @@
+# Copyright 2019 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test codes of conduct."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+from textwrap import dedent
+
+from testtools.matchers import (
+    ContainsDict,
+    Equals,
+    MatchesRegex,
+    )
+from zope.component import getUtility
+
+from lp.registry.interfaces.codeofconduct import (
+    ICodeOfConductSet,
+    ISignedCodeOfConductSet,
+    )
+from lp.services.config import config
+from lp.services.gpg.handler import PymeSignature
+from lp.services.gpg.interfaces import (
+    GPGKeyExpired,
+    GPGKeyNotFoundError,
+    GPGVerificationError,
+    IGPGHandler,
+    )
+from lp.services.mail.sendmail import format_address
+from lp.testing import TestCaseWithFactory
+from lp.testing.fixture import ZopeUtilityFixture
+from lp.testing.layers import ZopelessDatabaseLayer
+
+
+class FakePymeKey:
+
+    def __init__(self, fingerprint):
+        self.fingerprint = fingerprint
+
+
+class FakeGPGHandlerBadSignature:
+
+    def getVerifiedSignature(self, content, signature=None):
+        raise GPGVerificationError("Bad signature.")
+
+
+class FakeGPGHandlerExpired:
+
+    def __init__(self, key):
+        self.key = key
+
+    def getVerifiedSignature(self, content, signature=None):
+        raise GPGKeyExpired(self.key)
+
+
+class FakeGPGHandlerNotFound:
+
+    def __init__(self, fingerprint):
+        self.fingerprint = fingerprint
+
+    def getVerifiedSignature(self, content, signature=None):
+        raise GPGKeyNotFoundError(self.fingerprint)
+
+
+class FakeGPGHandlerGood:
+
+    def __init__(self, signature):
+        self.signature = signature
+
+    def getVerifiedSignature(self, content, signature=None):
+        return self.signature
+
+
+class TestSignedCodeOfConductSet(TestCaseWithFactory):
+
+    layer = ZopelessDatabaseLayer
+
+    def test_verifyAndStore_bad_signature(self):
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerBadSignature(), IGPGHandler))
+        user = self.factory.makePerson()
+        self.assertEqual(
+            "Bad signature.",
+            getUtility(ISignedCodeOfConductSet).verifyAndStore(
+                user, "signed data"))
+
+    def test_verifyAndStore_expired(self):
+        key = FakePymeKey("0" * 40)
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerExpired(key), IGPGHandler))
+        user = self.factory.makePerson()
+        self.assertEqual(
+            "%s has expired" % key.fingerprint,
+            getUtility(ISignedCodeOfConductSet).verifyAndStore(
+                user, "signed data"))
+
+    def test_verifyAndStore_not_found(self):
+        fingerprint = "0" * 40
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerNotFound(fingerprint), IGPGHandler))
+        user = self.factory.makePerson()
+        self.assertEqual(
+            "No GPG key found with the given content: %s" % fingerprint,
+            getUtility(ISignedCodeOfConductSet).verifyAndStore(
+                user, "signed data"))
+
+    def test_verifyAndStore_unregistered(self):
+        fingerprint = "0" * 40
+        signature = PymeSignature(fingerprint, "plain data")
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerGood(signature), IGPGHandler))
+        user = self.factory.makePerson()
+        self.assertThat(
+            getUtility(ISignedCodeOfConductSet).verifyAndStore(
+                user, "signed data"),
+            MatchesRegex(r"^The key you used.*is not registered"))
+
+    def test_verifyAndStore_wrong_owner(self):
+        other_user = self.factory.makePerson()
+        gpgkey = self.factory.makeGPGKey(other_user)
+        signature = PymeSignature(gpgkey.fingerprint, "plain data")
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerGood(signature), IGPGHandler))
+        user = self.factory.makePerson()
+        self.assertThat(
+            getUtility(ISignedCodeOfConductSet).verifyAndStore(
+                user, "signed data"),
+            MatchesRegex(r"^You.*do not seem to be the owner"))
+
+    def test_verifyAndStore_deactivated(self):
+        user = self.factory.makePerson()
+        gpgkey = self.factory.makeGPGKey(user)
+        gpgkey.active = False
+        signature = PymeSignature(gpgkey.fingerprint, "plain data")
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerGood(signature), IGPGHandler))
+        self.assertThat(
+            getUtility(ISignedCodeOfConductSet).verifyAndStore(
+                user, "signed data"),
+            MatchesRegex(r"^The OpenPGP key used.*has been deactivated"))
+
+    def test_verifyAndStore_bad_plain_data(self):
+        user = self.factory.makePerson()
+        gpgkey = self.factory.makeGPGKey(user)
+        signature = PymeSignature(gpgkey.fingerprint, "plain data")
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerGood(signature), IGPGHandler))
+        self.assertThat(
+            getUtility(ISignedCodeOfConductSet).verifyAndStore(
+                user, "signed data"),
+            MatchesRegex(
+                r"^The signed text does not match the Code of Conduct"))
+
+    def test_verifyAndStore_good(self):
+        user = self.factory.makePerson()
+        gpgkey = self.factory.makeGPGKey(user)
+        signature = PymeSignature(
+            gpgkey.fingerprint,
+            getUtility(ICodeOfConductSet).current_code_of_conduct.content)
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerGood(signature), IGPGHandler))
+        self.assertIsNone(
+            getUtility(ISignedCodeOfConductSet).verifyAndStore(
+                user, "signed data"))
+        [notification] = self.assertEmailQueueLength(1)
+        self.assertThat(dict(notification), ContainsDict({
+            "From": Equals(format_address(
+                "Launchpad Code Of Conduct System",
+                config.canonical.noreply_from_address)),
+            "To": Equals(user.preferredemail.email),
+            "Subject": Equals(
+                "Your Code of Conduct signature has been acknowledged"),
+            }))
+        self.assertEqual(
+            dedent("""\
+
+                Hello
+
+                Your Code of Conduct Signature was modified.
+
+                User: '%(user)s'
+                Digitally Signed by %(fingerprint)s
+
+
+                Thanks,
+
+                The Launchpad Team
+                """) % {
+                    'user': user.display_name,
+                    'fingerprint': gpgkey.fingerprint,
+                    },
+            notification.get_payload(decode=True))

=== modified file 'lib/lp/services/gpg/doc/gpg-signatures.txt'
--- lib/lp/services/gpg/doc/gpg-signatures.txt	2019-03-07 15:05:17 +0000
+++ lib/lp/services/gpg/doc/gpg-signatures.txt	2019-08-23 19:42:27 +0000
@@ -42,7 +42,7 @@
     ... -----END PGP SIGNATURE-----
     ... """
 
-    >>> master_sig = gpghandler.verifySignature(content)
+    >>> master_sig = gpghandler.getVerifiedSignature(content)
     >>> print master_sig.fingerprint
     A419AE861E88BC9E04B9C26FBA2B9389DFD20543
 
@@ -62,7 +62,7 @@
     ... """
     >>>
 
-    >>> subkey_sig = gpghandler.verifySignature(content)
+    >>> subkey_sig = gpghandler.getVerifiedSignature(content)
     >>> print subkey_sig.fingerprint
     A419AE861E88BC9E04B9C26FBA2B9389DFD20543
 

=== modified file 'lib/lp/services/gpg/handler.py'
--- lib/lp/services/gpg/handler.py	2019-03-22 19:22:54 +0000
+++ lib/lp/services/gpg/handler.py	2019-08-23 19:42:27 +0000
@@ -130,15 +130,6 @@
             if os.path.exists(filename):
                 os.remove(filename)
 
-    def verifySignature(self, content, signature=None):
-        """See IGPGHandler."""
-        try:
-            return self.getVerifiedSignature(content, signature)
-        except (GPGVerificationError, GPGKeyExpired):
-            # Swallow GPG verification errors
-            pass
-        return None
-
     def getVerifiedSignatureResilient(self, content, signature=None):
         """See IGPGHandler."""
         errors = []

=== modified file 'lib/lp/services/gpg/interfaces.py'
--- lib/lp/services/gpg/interfaces.py	2019-03-22 19:22:54 +0000
+++ lib/lp/services/gpg/interfaces.py	2019-08-23 19:42:27 +0000
@@ -223,13 +223,6 @@
         If the firgerprint cannot be sanitized return None.
         """
 
-    def verifySignature(content, signature=None):
-        """See `getVerifiedSignature`.
-
-        Suppress all exceptions and simply return None if the could not
-        be verified.
-        """
-
     def getURLForKeyInServer(fingerprint, action=None, public=False):
         """Return the URL for that fingerprint on the configured keyserver.
 
@@ -263,6 +256,7 @@
 
         :raise GPGVerificationError: if the signature cannot be verified.
         :raise GPGKeyExpired: if the signature was made with an expired key.
+        :raise GPGKeyNotFoundError: if the key was not found on the keyserver.
         :return: a `PymeSignature` object.
         """
 

=== modified file 'lib/lp/services/mail/incoming.py'
--- lib/lp/services/mail/incoming.py	2019-07-25 15:00:18 +0000
+++ lib/lp/services/mail/incoming.py	2019-08-23 19:42:27 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Functions dealing with mails coming into Launchpad."""
@@ -26,6 +26,8 @@
 from lp.registry.interfaces.person import IPerson
 from lp.services.features import getFeatureFlag
 from lp.services.gpg.interfaces import (
+    GPGKeyExpired,
+    GPGKeyNotFoundError,
     GPGVerificationError,
     IGPGHandler,
     )
@@ -314,8 +316,8 @@
         sig = gpghandler.getVerifiedSignature(
             canonicalise_line_endings(mail.signedContent), signature)
         log.debug("got signature %r" % sig)
-    except GPGVerificationError as e:
-        # verifySignature failed to verify the signature.
+    except (GPGVerificationError, GPGKeyExpired, GPGKeyNotFoundError) as e:
+        # getVerifiedSignature failed to verify the signature.
         message = "Signature couldn't be verified: %s" % e
         log.debug(message)
         raise InvalidSignature(message)

=== modified file 'lib/lp/services/mail/tests/test_incoming.py'
--- lib/lp/services/mail/tests/test_incoming.py	2016-03-23 17:55:39 +0000
+++ lib/lp/services/mail/tests/test_incoming.py	2019-08-23 19:42:27 +0000
@@ -17,6 +17,11 @@
 from zope.security.management import setSecurityPolicy
 
 from lp.services.config import config
+from lp.services.gpg.interfaces import (
+    GPGKeyExpired,
+    GPGKeyNotFoundError,
+    IGPGHandler,
+    )
 from lp.services.log.logger import BufferLogger
 from lp.services.mail import helpers
 from lp.services.mail.handlers import mail_handlers
@@ -34,6 +39,7 @@
 from lp.testing import TestCaseWithFactory
 from lp.testing.dbuser import switch_dbuser
 from lp.testing.factory import GPGSigningContext
+from lp.testing.fixture import ZopeUtilityFixture
 from lp.testing.gpgkeys import import_secret_test_key
 from lp.testing.layers import LaunchpadZopelessLayer
 from lp.testing.mail_helpers import pop_notifications
@@ -52,6 +58,30 @@
         return True
 
 
+class FakePymeKey:
+
+    def __init__(self, fingerprint):
+        self.fingerprint = fingerprint
+
+
+class FakeGPGHandlerExpired:
+
+    def __init__(self, key):
+        self.key = key
+
+    def getVerifiedSignature(self, content, signature=None):
+        raise GPGKeyExpired(self.key)
+
+
+class FakeGPGHandlerNotFound:
+
+    def __init__(self, fingerprint):
+        self.fingerprint = fingerprint
+
+    def getVerifiedSignature(self, content, signature=None):
+        raise GPGKeyNotFoundError(self.fingerprint)
+
+
 class IncomingTestCase(TestCaseWithFactory):
 
     layer = LaunchpadZopelessLayer
@@ -86,6 +116,72 @@
             "(7, 58, u'No data')",
             body)
 
+    def test_expired_key(self):
+        """An expired key should not be handled as an OOPS.
+
+        It should produce a message explaining to the user what went wrong.
+        """
+        person = self.factory.makePerson()
+        key = FakePymeKey("0" * 40)
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerExpired(key), IGPGHandler))
+        transaction.commit()
+        email_address = person.preferredemail.email
+        invalid_body = (
+            '-----BEGIN PGP SIGNED MESSAGE-----\n'
+            'Hash: SHA1\n\n'
+            'Body\n'
+            '-----BEGIN PGP SIGNATURE-----\n'
+            'Not a signature.\n'
+            '-----END PGP SIGNATURE-----\n')
+        ctrl = MailController(
+            email_address, 'to@xxxxxxxxxxx', 'subject', invalid_body,
+            bulk=False)
+        ctrl.send()
+        handleMail()
+        self.assertEqual([], self.oopses)
+        [notification] = pop_notifications()
+        body = notification.get_payload()[0].get_payload(decode=True)
+        self.assertIn(
+            "An error occurred while processing a mail you sent to "
+            "Launchpad's email\ninterface.\n\n\n"
+            "Error message:\n\nSignature couldn't be verified: "
+            "%s\nhas expired" % key.fingerprint,
+            body)
+
+    def test_key_not_found(self):
+        """A key not found on the keyserver should not be handled as an OOPS.
+
+        It should produce a message explaining to the user what went wrong.
+        """
+        person = self.factory.makePerson()
+        fingerprint = "0" * 40
+        self.useFixture(ZopeUtilityFixture(
+            FakeGPGHandlerNotFound(fingerprint), IGPGHandler))
+        transaction.commit()
+        email_address = person.preferredemail.email
+        invalid_body = (
+            '-----BEGIN PGP SIGNED MESSAGE-----\n'
+            'Hash: SHA1\n\n'
+            'Body\n'
+            '-----BEGIN PGP SIGNATURE-----\n'
+            'Not a signature.\n'
+            '-----END PGP SIGNATURE-----\n')
+        ctrl = MailController(
+            email_address, 'to@xxxxxxxxxxx', 'subject', invalid_body,
+            bulk=False)
+        ctrl.send()
+        handleMail()
+        self.assertEqual([], self.oopses)
+        [notification] = pop_notifications()
+        body = notification.get_payload()[0].get_payload(decode=True)
+        self.assertIn(
+            "An error occurred while processing a mail you sent to "
+            "Launchpad's email\ninterface.\n\n\n"
+            "Error message:\n\nSignature couldn't be verified: "
+            "No GPG key found with the given content:\n%s" % fingerprint,
+            body)
+
     def test_mail_too_big(self):
         """Much-too-big mail should generate a bounce, not an OOPS.
 

=== modified file 'lib/lp/soyuz/doc/fakepackager.txt'
--- lib/lp/soyuz/doc/fakepackager.txt	2019-04-28 17:14:55 +0000
+++ lib/lp/soyuz/doc/fakepackager.txt	2019-08-23 19:42:27 +0000
@@ -172,7 +172,7 @@
     >>> from zope.component import getUtility
     >>> from lp.services.gpg.interfaces import IGPGHandler
     >>> gpghandler = getUtility(IGPGHandler)
-    >>> sig = gpghandler.verifySignature(content)
+    >>> sig = gpghandler.getVerifiedSignature(content)
 
     >>> sig.fingerprint == packager.gpg_key_fingerprint[2:]
     True

=== modified file 'lib/lp/testing/fixture.py'
--- lib/lp/testing/fixture.py	2019-05-22 14:57:45 +0000
+++ lib/lp/testing/fixture.py	2019-08-23 19:42:27 +0000
@@ -13,6 +13,7 @@
     'Urllib2Fixture',
     'ZopeAdapterFixture',
     'ZopeEventHandlerFixture',
+    'ZopeUtilityFixture',
     'ZopeViewReplacementFixture',
     ]
 


Follow ups