← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] ~pappacena/launchpad:lp-signing-integration into launchpad:master

 


Diff comments:

> diff --git a/lib/lp/archivepublisher/enums.py b/lib/lp/archivepublisher/enums.py
> new file mode 100644
> index 0000000..e68befa
> --- /dev/null
> +++ b/lib/lp/archivepublisher/enums.py

It's true that this is extracting code from lp.archivepublisher.signing, so I can see why you put this in lp.archivepublisher too, but you've spread it out between lp.archivepublisher and lp.services.signing, which caused me to stop and think about it.  Maybe this should all be in lp.services.signing?  There's not much publisher-specific about it other than the supported key types, which we might well expand in future.

(See also my comments below about splitting up database tables.)

> @@ -0,0 +1,51 @@
> +# Copyright 2009 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +"""Enum for archive publisher
> +"""
> +
> +__metaclass__ = type
> +__all__ = [
> +    'SigningKeyType',
> +    ]
> +
> +from lazr.enum import (
> +    DBEnumeratedType,
> +    DBItem,
> +    )
> +
> +from lazr.enum import DBEnumeratedType
> +
> +
> +class SigningKeyType(DBEnumeratedType):
> +    """Available key types on lp-signing service
> +    """
> +    UEFI = DBItem(0, """
> +        UEFI key
> +        
> +        UEFI signing key
> +        """)
> +
> +    KMOD = DBItem(1, """
> +        KMOD key
> +        
> +        KMOD signing key
> +        """)
> +
> +    OPAL = DBItem(2, """
> +        OPAL key
> +        
> +        OPAL signing key
> +        """)
> +
> +    SIPL = DBItem(3, """
> +        SIPL key
> +        
> +        SIPL signing key
> +        """)
> +
> +    FIT  = DBItem(4, """
> +        FIT key
> +        
> +        FIT signing key
> +        """)
> \ No newline at end of file
> diff --git a/lib/lp/archivepublisher/interfaces/signingkey.py b/lib/lp/archivepublisher/interfaces/signingkey.py
> new file mode 100644
> index 0000000..149b523
> --- /dev/null
> +++ b/lib/lp/archivepublisher/interfaces/signingkey.py
> @@ -0,0 +1,51 @@
> +# Copyright 2020 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +"""Interfaces for signing keys stored at the signing service."""
> +
> +__metaclass__ = type
> +
> +__all__ = [
> +    'ISigningKey'
> +]
> +
> +from lp.archivepublisher.enums import SigningKeyType
> +from lp.registry.interfaces.distroseries import IDistroSeries
> +from lp.soyuz.interfaces.archive import IArchive
> +from zope.interface.interface import Interface
> +from zope.schema import (
> +    Int,
> +    Text,
> +    Datetime,
> +    Choice
> +    )
> +from lazr.restful.fields import Reference
> +from lp import _
> +
> +
> +class ISigningKey(Interface):
> +    """A key registered to sign uploaded files"""
> +
> +    id = Int(title=_('ID'), required=True, readonly=True)
> +
> +    archive = Reference(
> +        IArchive, title=_("Archive"), required=True,
> +        description=_("The archive that owns this key."))
> +
> +    distro_serie = Reference(
> +        IDistroSeries, title=_("Distro serie"), required=False,
> +        description=_("The distro serie that uses this archive."))

Although it's awkward, the English singular and plural are both "series".  We do occasionally use the invented plural "serieses" when we have no other sensible choice, but we don't use the invented singular "serie".

The series also doesn't use the archive.  Maybe "The minimum series that uses this key, if any."?

I'd also consider the overall layout here.  It occasionally makes sense to share the same key among multiple archives, e.g. a PPA whose purpose is to be a staging area for another archive and so is intentionally authorised to use the same key.  lp-signing deliberately doesn't know much about the policy for using a key, other than the client authorization system.  So I think this should be split up: we should have one table with key_type/fingerprint/public_key/date_created (which is a generic thing that can live in lp.services.signing), and another that expresses the links between archives and signing keys for each key type, possibly qualified by series (which would live in lp.soyuz or lp.archivepublisher).

(Inconveniently, (I)ArchiveSigningKey is already taken by something that should almost certainly eventually move into the signing service but won't be supported by it in the first pass.  Fortunately it doesn't have an associated database table of its own; it might make sense to rename it to (I)ArchiveGPGSigningKey as a preparatory step, and we can plan to eventually merge it into this system once lp-signing has GPG key support.)

> +
> +    key_type = Choice(
> +        title=_("The signing key type (UEFI, KMOD, etc)."),
> +        required=True, readonly=True, vocabulary=SigningKeyType)
> +
> +    fingerprint = Text(
> +        title=_("Fingerprint of the key"), required=True, readonly=True)
> +
> +    public_key = Text(
> +        title=_("Public key content, base64-encoded"), required=False,
> +        readonly=True)

lp-signing base64-encodes it in order to fit it into a JSON-based transport, since JSON doesn't accept bytes; but for database storage I think we should decode it.

> +
> +    date_created = Datetime(
> +        title=_('When this key was created'), required=True, readonly=True)
> diff --git a/lib/lp/archivepublisher/model/signingkeys.py b/lib/lp/archivepublisher/model/signingkeys.py
> new file mode 100644
> index 0000000..dfe08d2
> --- /dev/null
> +++ b/lib/lp/archivepublisher/model/signingkeys.py

"signingkey" rather than "signingkeys", to match the interface.  We usually use the singular for models.

> @@ -0,0 +1,101 @@
> +# Copyright 2020 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +"""Database classes to manage signing keys stored at the signing service."""
> +
> +
> +__metaclass__ = type
> +
> +import pytz
> +from storm.locals import (
> +    DateTime,
> +    Int,
> +    Reference,
> +    Unicode,
> +    )
> +from zope.interface.declarations import implementer
> +
> +from lp.archivepublisher.enums import SigningKeyType
> +from lp.archivepublisher.interfaces.signingkey import ISigningKey
> +from lp.registry.model.distroseries import DistroSeries
> +from lp.services.database.constants import (
> +    DEFAULT,
> +    UTC_NOW,
> +    )
> +from lp.services.database.enumcol import DBEnum
> +from lp.services.database.interfaces import IMasterStore
> +from lp.services.database.stormbase import StormBase
> +from lp.services.signing.proxy import SigningService
> +from lp.soyuz.model.archive import Archive
> +
> +
> +@implementer(ISigningKey)
> +class SigningKey(StormBase):
> +    """A key stored at lp-signing, used to sign uploaded files and packages"""
> +
> +    __storm_table__ = 'SigningKey'
> +
> +    id = Int(primary=True)
> +
> +    archive_id = Int(name="archive")
> +    archive = Reference(archive_id, Archive.id)
> +
> +    distro_series_id = Int(name="distro_series", allow_none=True)
> +    distro_series = Reference(distro_series_id, DistroSeries.id)
> +
> +    key_type = DBEnum(enum=SigningKeyType, allow_none=False)
> +
> +    description = Unicode(allow_none=True)
> +
> +    fingerprint = Unicode(allow_none=False)
> +
> +    public_key = Unicode(allow_none=True)
> +
> +    date_created = DateTime(
> +        allow_none=False, default=UTC_NOW, tzinfo=pytz.UTC)
> +
> +    def __init__(self, key_type, archive, fingerprint, public_key,
> +                 distro_series=None, description=None, date_created=DEFAULT):
> +        super(SigningKey, self).__init__()
> +        self.key_type = key_type
> +        self.archive = archive
> +        self.fingerprint = fingerprint
> +        self.public_key = public_key
> +        self.description = description
> +        self.distro_series = distro_series
> +        self.date_created = date_created
> +
> +    @classmethod
> +    def generate(cls, key_type, archive, distro_series=None,
> +                 description=None):
> +        """Generated a new key on signing service, and save it to db.
> +
> +        :param key_type: One of the SigningKeyType enum's value
> +        :param archive: The package Archive that should be associated with
> +                        this key
> +        :param distro_series: (optional) The DistroSeries object
> +        :param description: (optional) The description associated with this
> +                            key
> +        :returns: The generated SigningKey
> +        """
> +        signing_service = SigningService()
> +        generated_key = signing_service.generate(key_type.name, description)
> +        signing_key = SigningKey(
> +            key_type=key_type, archive=archive,
> +            fingerprint=generated_key['fingerprint'],
> +            public_key=generated_key['public-key'],
> +            distro_series=distro_series, description=description)
> +        store = IMasterStore(SigningKey)
> +        store.add(signing_key)
> +        return signing_key
> +
> +    def sign(self, mode, message, message_name=None):
> +        """Sign the given message using this key
> +
> +        :param mode: "ATTACHED" or "DETACHED"
> +        :param message: The message to be signed
> +        :param message_name: A name for the message beign signed"""
> +        signing_service = SigningService()
> +        signed = signing_service.sign(
> +            self.key_type.name, self.fingerprint, message_name, message, mode)
> +        return signed['signed-message']
> diff --git a/lib/lp/archivepublisher/tests/test_signingkeys.py b/lib/lp/archivepublisher/tests/test_signingkeys.py
> new file mode 100644
> index 0000000..12bb7f3
> --- /dev/null
> +++ b/lib/lp/archivepublisher/tests/test_signingkeys.py

Similarly, "signingkey" rather than "signingkeys".

> @@ -0,0 +1,85 @@
> +# Copyright 2010-2020 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__metaclass__ = type
> +
> +import mock
> +
> +from lp.archivepublisher.enums import SigningKeyType
> +from lp.archivepublisher.model.signingkeys import SigningKey
> +from lp.services.database.interfaces import IMasterStore
> +from lp.services.signing.tests.test_proxy import SigningServiceResponseFactory
> +from lp.testing import TestCaseWithFactory
> +from lp.testing.layers import DatabaseFunctionalLayer
> +
> +
> +class TestSigningServiceSigningKey(TestCaseWithFactory):

Just "TestSigningKey" would do, and would be more regular.

> +
> +    layer = DatabaseFunctionalLayer
> +
> +    def setUp(self, *args, **kwargs):
> +        super(TestSigningServiceSigningKey, self).setUp(*args, **kwargs)
> +        self.signing_service = SigningServiceResponseFactory()
> +
> +    def test_save_signing_key(self):
> +        archive = self.factory.makeArchive()
> +        s = SigningKey(
> +            SigningKeyType.UEFI, archive, u"a fingerprint", u"a public_key",
> +            description=u"This is my key!")
> +        store = IMasterStore(SigningKey)
> +        store.add(s)
> +        store.commit()
> +
> +        resultset = store.find(SigningKey)
> +        self.assertEqual(1, resultset.count())
> +        db_key = resultset.one()
> +        self.assertEqual(SigningKeyType.UEFI, db_key.key_type)
> +        self.assertEqual(archive, db_key.archive)
> +        self.assertEqual("a fingerprint", db_key.fingerprint)
> +        self.assertEqual("a public_key", db_key.public_key)
> +        self.assertEqual("This is my key!", db_key.description)
> +
> +    @mock.patch("lp.services.signing.proxy.requests")
> +    def test_generate_signing_key_saves_correctly(self, mock_requests):
> +        self.signing_service.patch(mock_requests)
> +
> +        archive = self.factory.makeArchive()
> +        distro_series = archive.distribution.series[0]
> +
> +        key = SigningKey.generate(
> +            SigningKeyType.UEFI, archive, distro_series, u"this is my key")
> +        self.assertIsInstance(key, SigningKey)
> +
> +        store = IMasterStore(SigningKey)
> +        store.invalidate()
> +
> +        rs = store.find(SigningKey)
> +        self.assertEqual(1, rs.count())
> +        db_key = rs.one()
> +
> +        self.assertEqual(SigningKeyType.UEFI, db_key.key_type)
> +        self.assertEqual(
> +            self.signing_service.generated_fingerprint, db_key.fingerprint)
> +        self.assertEqual(
> +            self.signing_service.generated_public_key, db_key.public_key)
> +        self.assertEqual(archive, db_key.archive)
> +        self.assertEqual(distro_series, db_key.distro_series)
> +        self.assertEqual("this is my key", db_key.description)
> +
> +    @mock.patch("lp.services.signing.proxy.requests")
> +    def test_sign_some_data(self, mock_requests):
> +        self.signing_service.patch(mock_requests)
> +
> +        archive = self.factory.makeArchive()
> +
> +        s = SigningKey(
> +            SigningKeyType.UEFI, archive, u"a fingerprint", u"a public_key",
> +            description=u"This is my key!")
> +        signed = s.sign("ATTACHED", "secure message", "message_name")
> +
> +        # Checks if the returned value is actually the returning value from
> +        # HTTP POST /sign call to lp-signing service
> +        api_resp = self.signing_service.get_latest_json_response(
> +            "POST", "/sign")
> +        self.assertIsNotNone(api_resp, "The API was never called")
> +        self.assertEqual(api_resp['signed-message'], signed)
> diff --git a/lib/lp/services/signing/proxy.py b/lib/lp/services/signing/proxy.py
> new file mode 100644
> index 0000000..583920e
> --- /dev/null
> +++ b/lib/lp/services/signing/proxy.py
> @@ -0,0 +1,152 @@
> +# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +"""Proxy calls to lp-signing service"""
> +
> +from __future__ import division
> +
> +__metaclass__ = type
> +
> +import base64
> +import json
> +
> +import requests
> +from lp.services.propertycache import cachedproperty
> +from nacl.encoding import Base64Encoder
> +from nacl.public import (
> +    Box,
> +    PrivateKey,
> +    PublicKey,
> +)
> +
> +
> +class SigningService:
> +    # XXX: Move it to configuration
> +    LP_SIGNING_ADDRESS = "http://signing.launchpad.test:8000";
> +
> +    # XXX: Temporary test keys. Should be moved to configuration files
> +    LOCAL_PRIVATE_KEY = "O73bJzd3hybyBxUKk0FaR6K9CbbmxBYkw6vCrIWZkSY="
> +    LOCAL_PUBLIC_KEY = "xEtwSS7kdGmo0ElcN2fR/mcHS0A42zhYbo/+5KV4xRs="
> +
> +    ATTACHED = "ATTACHED"
> +    DETACHED = "DETACHED"
> +    KEY_TYPES = {"UEFI", "KMOD", "OPAL", "SIPL", "FIT"}

You could do something with SigningKeyType.items or similar here, rather than needing to repeat it.

> +
> +    def __init__(self):
> +        pass
> +
> +    def get_url(self, path):
> +        """Shotcut to concatenate LP_SIGNING_ADDRESS with the desired
> +        endpoint path
> +
> +        :param path: The REST endpoint to be joined"""
> +        return self.LP_SIGNING_ADDRESS + path
> +
> +    def _get_json(self, path, method="GET", **kwargs):
> +        """Helper method to do an HTTP request and get back a json from  the
> +        signing service, raising exception if status code != 200
> +        """
> +        url = self.get_url(path)
> +        if method == "GET":
> +            response = requests.get(url)
> +        elif method == "POST":
> +            response = requests.post(url, **kwargs)

Use lp.services.timeout.urlfetch here so that we get timeout support, and timeline integration would be good as well so that we can see the request in OOPSes.  Have a look at lp.code.model.githosting to see how it does things.

> +        else:
> +            raise NotImplemented("Only GET and POST are allowed for now")
> +
> +        response.raise_for_status()
> +        return response.json()
> +
> +    @cachedproperty
> +    def service_public_key(self):
> +        """Returns the lp-signing service's public key
> +        """
> +        data = self._get_json("/service-key")
> +        return PublicKey(data["service-key"], encoder=Base64Encoder)
> +
> +    @property
> +    def private_key(self):
> +        return PrivateKey(self.LOCAL_PRIVATE_KEY, encoder=Base64Encoder)
> +
> +    def get_nonce(self):
> +        """Get nonce, to be used when sending messages
> +        """
> +        data = self._get_json("/nonce", "POST")
> +        return base64.b64decode(data["nonce"].encode("UTF-8"))
> +
> +    def _get_auth_headers(self, nonce):
> +        """Get headers to call authenticated endpoints
> +
> +        :param nonce: The nonce bytes to be used (not the base64 encoded one!)
> +        :return: Header dict, ready to be used by requests
> +        """
> +        return {
> +            "Content-Type": "application/x-boxed-json",
> +            "X-Client-Public-Key": self.LOCAL_PUBLIC_KEY,
> +            "X-Nonce": base64.b64encode(nonce)}
> +
> +    def _encrypt_payload(self, nonce, message):
> +        """Returns the encrypted version of message, base64 encoded and
> +        ready to be sent on a HTTP request to lp-signing service
> +
> +        :param nonce: The original (non-base64 encoded) nonce
> +        :param message: The str message to be encrypted
> +        """
> +        box = Box(self.private_key, self.service_public_key)
> +        encrypted_message = box.encrypt(message, nonce, encoder=Base64Encoder)
> +        return encrypted_message.ciphertext
> +
> +    def generate(self, key_type, description):
> +        """Generate a key to be used when signing
> +
> +        :param key_type: One of available key types at lp-signing: ["UEFI",
> +                         "KMOD", "OPAL", "SIPL", "FIT"]
> +        :param description: String description of the generated key
> +        :return: A dict with 'fingerprint' (str) and 'public-key' (a
> +                Base64-encoded NaCl public key)
> +        """
> +        if key_type not in SigningService.KEY_TYPES:
> +            raise ValueError("%s is not a valid key type" % key_type)
> +        nonce = self.get_nonce()
> +        data = json.dumps({
> +            "key-type": key_type,
> +            "description": description,
> +            }).encode("UTF-8")
> +        return self._get_json(
> +            "/generate", "POST", headers=self._get_auth_headers(nonce),
> +            data=self._encrypt_payload(nonce, data))
> +
> +    def sign(self, key_type, fingerprint, message_name, message, mode):
> +        """Sign the given message using the specified key_type and a
> +        pre-generated fingerprint (see `generate` method).
> +
> +        :param key_type: One of the available key types (see `generate`
> +                         method docs)
> +        :param fingerprint: The fingerprint of the signing key, generated by
> +                            the `generate` method
> +        :param message_name: A description of the message being signed
> +        :param message: The message to be signed
> +        :param mode: SignService.ATTACHED or SignService.DETACHED
> +        :return: A dict with 'public-key' and 'signed-message'
> +        """
> +        if mode not in {SigningService.ATTACHED, SigningService.DETACHED}:
> +            raise ValueError("%s is not a valid mode" % mode)
> +        if key_type not in SigningService.KEY_TYPES:
> +            raise ValueError("%s is not a valid key type" % key_type)
> +
> +        nonce = self.get_nonce()
> +        data = json.dumps({
> +            "key-type": key_type,
> +            "fingerprint": fingerprint,
> +            "message-name": message_name,
> +            "message": base64.b64encode(message).decode("UTF-8"),
> +            "mode": mode,
> +            }).encode("UTF-8")
> +        data = self._get_json(
> +            "/sign", "POST",
> +            headers=self._get_auth_headers(nonce),
> +            data=self._encrypt_payload(nonce, data))
> +
> +        return {
> +            'public-key': data['public-key'],
> +            'signed-message': base64.b64decode(data['signed-message'])}
> \ No newline at end of file
> diff --git a/lib/lp/services/signing/tests/test_proxy.py b/lib/lp/services/signing/tests/test_proxy.py
> new file mode 100644
> index 0000000..15cdbe9
> --- /dev/null
> +++ b/lib/lp/services/signing/tests/test_proxy.py
> @@ -0,0 +1,321 @@
> +# Copyright 2010-2020 Canonical Ltd.  This software is licensed under the
> +# GNU Affero General Public License version 3 (see the file LICENSE).
> +
> +__metaclass__ = type
> +
> +import base64
> +from collections import defaultdict
> +
> +import mock
> +from mock import ANY
> +from nacl.encoding import Base64Encoder
> +from nacl.public import PublicKey
> +import requests
> +
> +from lp.services.signing.proxy import SigningService
> +from lp.testing import TestCaseWithFactory
> +from lp.testing.layers import BaseLayer
> +
> +
> +class SigningServiceResponseFactory:
> +    """Factory for fake responses from lp-signing service.
> +
> +    This class is a helper to pretend that lp-signing service is running by
> +    mocking `requests` module, and returning fake responses from
> +    response.get(url) and response.post(url). See `patch` method.
> +    """

Could you use the "responses" module here instead, please?  That's generally what we use to serve as a requests mock.  Same in lp.archivepublisher.tests.test_signingkey(s).

> +    def __init__(self):
> +        self.base64_service_public_key = (
> +            u"x7vTtpmn0+DvKNdmtf047fn1JRQI5eMnOQRy3xJ1m10=")
> +        self.base64_nonce = "neSSa2MUZlQU3XiipU2TfiaqW5nrVUpR"
> +        self.generated_public_key = (
> +            u'LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURFVENDQWZtZ0F3SUJBZ0l'
> +            u'VZlgreHlFNUp4VVcyWVBYemVDMGtsQlZZQTBjd0RRWUpLb1pJaHZjTkFRRUwKQl'
> +            u'FBd0dERVdNQlFHQTFVRUF3d05WR1Z6ZENCclpYa2dWVVZHU1RBZUZ3MHlNREF4T'
> +            u'WpneE16UTVORGRhRncwegpNREF4TWpVeE16UTVORGRhTUJneEZqQVVCZ05WQkFN'
> +            u'TURWUmxjM1FnYTJWNUlGVkZSa2t3Z2dFaU1BMEdDU3FHClNJYjNEUUVCQVFVQUE'
> +            u'0SUJEd0F3Z2dFS0FvSUJBUURPc3Vvd3VGZllRK1g0TjVLZWtXMGxBbmMvemNrYk'
> +            u'5mUmEKK2hZSE56RmJDa2hMUzZYTWdOY1d5eW8rTk4rd05FN3JEcUgwc3gweUZzc'
> +            u'zJuVCtxWXM1WFdIYmRXMHBVNnpCTwp0bEh0MjhNQzYzWU03ZkpFZnVpM1RFRXph'
> +            u'R1VKOUp2dFhENG16Vkd1cGxZczVBckkvc3RvdDVHY0J6bHVuNHJnCkNHNUdtWXR'
> +            u'KVGw4YkpSWGFqckhMZFJJc2dIWCtXTXBKUCtnQVlnSE94M3Y5VHlXaDJMR0FnbU'
> +            u'MyYVFSbFE3WEoKS1ZSRzVaYTJVMlRaZ3dnYzc0SkFBNjVIQSt2Z2xtcW5SZExad'
> +            u'0RRTUluYjZ6djBsL0tDYkFRbldwL2hxMDB6VwoxRkVrR1k2Sm1Jb3BnR0lxTm9E'
> +            u'WVJOSEllWCtiVHE2eWthcUg5M0pjV2NOYlBZdEZGMUFGQWdNQkFBR2pVekJSCk1'
> +            u'CMEdBMVVkRGdRV0JCUXFpZXBwdkxOaWZIMjlKV3ByNk4zdmFpYzNDREFmQmdOVk'
> +            u'hTTUVHREFXZ0JRcWllcHAKdkxOaWZIMjlKV3ByNk4zdmFpYzNDREFQQmdOVkhST'
> +            u'UJBZjhFQlRBREFRSC9NQTBHQ1NxR1NJYjNEUUVCQ3dVQQpBNElCQVFCYXIzaGk5'
> +            u'TXFabjJ4eUh5Z3pBQzhGMFlmU2lHNm5aV09jSEJPT1QveVhMOVloTlFxL292TWZ'
> +            u'TUlJtCnl1SkE3eWlxcUhUU211ZDZ4QWxYZlhQYXpxaUxHSEdpMXl2dWVVOWtmOW'
> +            u'dHRGp2NW5Kek1YMXBLeUZLcVZjaysKZVdyVEF6S21xL2FTWFRlcFhUNVBoRU14Y'
> +            u'UdORHlJb3I3ck0rU2JZaWNFZGZoeEZiSTc1UWk3NERCdGlBdmZCbgpsK2JwMk1D'
> +            u'dDNydS81YiszYUNjRm5Pa0dXQ3I0RW1RZktzSTlYanViblJYNTFVMTVleFdsQW1'
> +            u'LaVNQRGd4aUcvCm5iSklYVHNUYXNqeVNuaEl0QkFQcWozdlgzRFhFZmlpVm5icH'
> +            u'dSbEZ4YjhQRlI5cC9hQTExdVp2VXhMYTI1TDQKSEVhdW5sRWdpRnM4Ynl5Z2hTa'
> +            u'GZJdU40aDZuMwotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg')
> +        self.generated_fingerprint = (
> +            u'338D218488DFD597D8FCB9C328C3E9D9ADA16CEE')
> +
> +        """self.latest_response is a Structure like:
> +         {
> +            "GET": {
> +                "/service-key": mock_response1,
> +            },
> +            "POST": {
> +                "/nonce": mock_response2
> +            }
> +        }
> +        """
> +        self.latest_responses = defaultdict(dict)
> +
> +    def get_latest_json_response(self, method, endpoint):
> +        """Returns the latest JSON response for the given HTTP method and
> +        endpoint.
> +        """
> +        method = method.upper()
> +        resp = self.get_latest_response(method, endpoint)
> +        if resp is None:
> +            return None
> +        return resp.json.return_value
> +
> +    def get_latest_response(self, method, endpoint):
> +        """Returns the latest response object for the given HTTP method and
> +        endpoint.
> +        """
> +        return self.latest_responses.get(method, {}).get(endpoint)
> +
> +    def _get_mock_response(self, status_code, json):
> +        mock_response = mock.Mock(requests.Response)
> +        mock_response.status_code = status_code
> +        mock_response.json.return_value = json
> +        return mock_response
> +
> +    def get_service_key(self, service_key):
> +        """Fake response for GET /service-key endpoint.
> +        """
> +        return self._get_mock_response(200, {"service-key": service_key})
> +
> +    def post_nonce(self, nonce):
> +        """Fake response for POST /nonce endpoint.
> +
> +        :param nonce: The base64-encoded nonce, as it would be returned by
> +                      the request.
> +        """
> +        return self._get_mock_response(200, {"nonce": nonce})
> +
> +    def post_generate(self, public_key, finger_print):
> +        """Fake response for POST /generate endpoint.
> +        """
> +        return self._get_mock_response(
> +            200, {'fingerprint': finger_print, 'public-key': public_key})
> +
> +    def post_sign(self, signed_message, public_key):
> +        b64_signed_msg = base64.b64encode(signed_message)
> +        return self._get_mock_response(
> +            200, {'signed-message': b64_signed_msg, 'public-key': public_key})
> +
> +    def patch_requests(self, requests_module, method, responses):
> +        """Patch the mock "requests module" to return the given responses
> +        when certain endpoints are called.
> +
> +        :param requests_module: The result of @mock.patch("requests"),
> +                                to be patched.
> +        :param method: HTTP method beign patched (GET or POST, for example)
> +        :param responses: A dict where keys are the endpoints and the values
> +                          are the mock requests.Response objects. E.g.:
> +                          {"/sign": mock.Mock(requests.Response), ...}
> +        """
> +        def side_effect(url, *args, **kwags):
> +            for endpoint, response in responses.items():
> +                if url.endswith(endpoint):
> +                    self.latest_responses[method.upper()][endpoint] = response
> +                    return response
> +            return mock.Mock()
> +        method = method.lower()
> +        requests_method = getattr(requests_module, method)
> +        requests_method.side_effect = side_effect
> +        return requests_module
> +
> +    def patch(self, requests_module):
> +        """Patches all requests with default test values.
> +
> +        This method gets a `requests` module mock, and sets the responses as
> +        if they were real calls to lp-signing. You can inspect the responses
> +        and called methods using some helpers provided by this class.
> +
> +        The mock responses are available using self.get_latest_json_response
> +        and self.get_latest_response. Both methods receives the HTTP
> +        method and the endpoint (e.g. ("GET", "/service-key").
> +
> +        You can easily checked if an API call was actually made by
> +        checking if the above mock_response.json was called or is None,
> +        for example.
> +
> +        Other helpful attributes are:
> +            self.base64_service_public_key
> +            self.base64_nonce
> +            self.generated_public_key
> +            self.generated_fingerprint
> +        which holds the respective values used in the fake responses
> +
> +        :param requests_module: The mock of `requests` module, patched with
> +                                mock.patch.
> +        """
> +        # HTTP GET responses
> +        get_service_key_response = self.get_service_key(
> +            self.base64_service_public_key)
> +        get_responses = {"/service-key": get_service_key_response}
> +        self.patch_requests(requests_module, "GET", get_responses)
> +
> +        post_nonce_response = self.post_nonce(self.base64_nonce)
> +        post_generate_response = self.post_generate(
> +            self.generated_public_key, self.generated_fingerprint)
> +
> +        post_sign_response = self.post_sign(
> +            'the-signed-msg', 'the-public-key')
> +
> +        # Replace POST /nonce, /generate and /sign responses by our mocks.
> +        self.patch_requests(
> +            requests_module, "POST", {
> +                "/nonce": post_nonce_response,
> +                "/generate": post_generate_response,
> +                "/sign": post_sign_response})
> +
> +
> +
> +@mock.patch("lp.services.signing.proxy.requests")
> +class SigningServiceProxyTest(TestCaseWithFactory):
> +    """Tests signing service without actually making calls to lp-signing.
> +
> +    Every REST call is mocked using self.response_factory, and most of this
> +    class's work is actually calling those endpoints. So, many things are
> +    mocked here, returning fake responses created at
> +    SigningServiceResponseFactory.
> +    """
> +    layer = BaseLayer
> +
> +    def setUp(self, *args, **kwargs):
> +        super(TestCaseWithFactory, self).setUp(*args, **kwargs)
> +        self.response_factory = SigningServiceResponseFactory()
> +
> +    def test_get_service_public_key(self, mock_requests):
> +        self.response_factory.patch(mock_requests)
> +
> +        signing = SigningService()
> +        key = signing.service_public_key
> +
> +        # Asserts that the public key is correct.
> +        self.assertIsInstance(key, PublicKey)
> +        self.assertEqual(
> +            key.encode(Base64Encoder),
> +            self.response_factory.base64_service_public_key)
> +
> +        # Asserts that the endpoint was called.
> +        mock_requests.get.assert_called_once_with(
> +            signing.LP_SIGNING_ADDRESS + "/service-key")
> +
> +    def test_get_nonce(self, mock_requests):
> +        # Server returns base64-encoded nonce, but the
> +        # SigningService.get_nonce method returns it already decoded.
> +        self.response_factory.patch(mock_requests)
> +
> +        signing = SigningService()
> +        nonce = signing.get_nonce()
> +
> +        self.assertEqual(
> +            base64.b64encode(nonce), self.response_factory.base64_nonce)
> +
> +    def test_generate_unknown_key_type_raises_exception(self, mock_requests):
> +        signing = SigningService()
> +        self.assertRaises(
> +            ValueError, signing.generate, "banana", "Wrong key type")
> +        self.assertEqual(0, mock_requests.get.call_count)
> +        self.assertEqual(0, mock_requests.post.call_count)
> +
> +    def test_generate_key(self, mock_requests):
> +        """Makes sure that the SigningService.generate method calls the
> +        correct endpoints
> +        """
> +        self.response_factory.patch(mock_requests)
> +        # Generate the key, and checks if we got back the correct dict.
> +        signing = SigningService()
> +        generated = signing.generate("UEFI", "my lp test key")
> +
> +        self.assertEqual(generated, {
> +            'public-key': self.response_factory.generated_public_key,
> +            'fingerprint': self.response_factory.generated_fingerprint})
> +
> +        # Asserts it tried to fetch service key, fetched the nonce and posted
> +        # to the /generate endpoint.
> +        mock_requests.get.assert_called_once_with(
> +            signing.LP_SIGNING_ADDRESS + "/service-key")
> +
> +        responses = self.response_factory
> +        self.assertIsNotNone(
> +            responses.get_latest_json_response("POST", "/nonce"))
> +        self.assertIsNotNone(
> +            responses.get_latest_json_response("POST", "/generate"))
> +
> +        mock_requests.post.assert_called_with(
> +            signing.LP_SIGNING_ADDRESS + "/generate",
> +            headers={
> +                "Content-Type": "application/x-boxed-json",
> +                "X-Client-Public-Key": signing.LOCAL_PUBLIC_KEY,
> +                "X-Nonce": self.response_factory.base64_nonce
> +            },
> +            data=ANY)  # XXX: check the encrypted data.
> +
> +    def test_sign_invalid_mode(self, mock_requests):
> +        signing = SigningService()
> +        self.assertRaises(
> +            ValueError, signing.sign,
> +            'UEFI', 'fingerprint', 'message_name', 'message', 'NO-MODE')
> +        self.assertEqual(0, mock_requests.get.call_count)
> +        self.assertEqual(0, mock_requests.post.call_count)
> +
> +    def test_sign_invalid_key_type(self, mock_requests):
> +        signing = SigningService()
> +        self.assertRaises(
> +            ValueError, signing.sign,
> +            'shrug', 'fingerprint', 'message_name', 'message', 'ATTACHED')
> +        self.assertEqual(0, mock_requests.get.call_count)
> +        self.assertEqual(0, mock_requests.post.call_count)
> +
> +    def test_sign(self, mock_requests):
> +        """Runs through SignService.sign() flow"""
> +        # Replace GET /service-key response by our mock.
> +        resp_factory = self.response_factory
> +        resp_factory.patch(mock_requests)
> +
> +        fingerprint = '338D218488DFD597D8FCB9C328C3E9D9ADA16CEE'
> +        key_type = 'KMOD'
> +        mode = 'DETACHED'
> +        message_name = 'my test msg'
> +        message = 'this is the message content'
> +
> +        signing = SigningService()
> +        data = signing.sign(
> +            key_type, fingerprint, message_name, message, mode)
> +
> +        # It should have returned the values from response.json(),
> +        # but decoding what is base64-encoded.
> +        resp_json = resp_factory.get_latest_json_response("POST", "/sign")
> +        self.assertEqual(2, len(data))
> +        self.assertEqual(data['public-key'], resp_json['public-key'])
> +        self.assertEqual(
> +            data['signed-message'],
> +            base64.b64decode(resp_json['signed-message']))
> +
> +        self.assertIsNotNone(
> +            resp_factory.get_latest_response("POST", "/nonce"))
> +        self.assertIsNotNone(
> +            resp_factory.get_latest_response("POST", "/sign"))
> +
> +        mock_requests.post.assert_called_with(
> +            signing.LP_SIGNING_ADDRESS + "/sign",
> +            headers={
> +                "Content-Type": "application/x-boxed-json",
> +                "X-Client-Public-Key": signing.LOCAL_PUBLIC_KEY,
> +                "X-Nonce": resp_factory.base64_nonce
> +            },
> +            data=ANY)  # XXX: check the encrypted data.
> \ No newline at end of file


-- 
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/378364
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:lp-signing-integration into launchpad:master.


References