← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:sync-signingkeys-commit-test into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:sync-signingkeys-commit-test into launchpad:master.

Commit message:
Add test that sync-signingkeys commits changes

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/383971

This mainly involved writing yet another thing that pretends to be a signing service, because we needed something that could support use of the signing service by subprocesses.  It's possible that we should also refactor some of the existing tests to use this fake service to reduce duplication (compare the test keyserver, for instance), though I haven't attempted that here.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:sync-signingkeys-commit-test into launchpad:master.
diff --git a/lib/lp/archivepublisher/tests/test_sync_signingkeys.py b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
index a1d1796..cb38a3c 100644
--- a/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
+++ b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
@@ -6,36 +6,45 @@
 from __future__ import absolute_import, print_function, unicode_literals
 
 __metaclass__ = type
-
-
 __all__ = [
     'SyncSigningKeysScript',
     ]
 
 from datetime import datetime
 import os
+from textwrap import dedent
 
 from fixtures import (
     MockPatch,
     TempDir,
     )
 from pytz import utc
+from testtools.content import text_content
 from testtools.matchers import (
     Equals,
     MatchesDict,
     MatchesStructure,
     )
+import transaction
+from zope.component import getUtility
 
 from lp.archivepublisher.model.publisherconfig import PublisherConfig
 from lp.archivepublisher.scripts.sync_signingkeys import SyncSigningKeysScript
 from lp.services.compat import mock
+from lp.services.config.fixture import (
+    ConfigFixture,
+    ConfigUseFixture,
+    )
 from lp.services.database.interfaces import IStore
 from lp.services.log.logger import BufferLogger
 from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.services.signing.testing.fixture import SigningServiceFixture
 from lp.services.signing.tests.helpers import SigningServiceClientFixture
 from lp.soyuz.model.archive import Archive
 from lp.testing import TestCaseWithFactory
 from lp.testing.layers import ZopelessDatabaseLayer
+from lp.testing.script import run_script
 
 
 class TestSyncSigningKeysScript(TestCaseWithFactory):
@@ -44,8 +53,16 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
     def setUp(self):
         super(TestSyncSigningKeysScript, self).setUp()
         self.signing_root_dir = self.useFixture(TempDir()).path
-        self.pushConfig(
-            "personalpackagearchive", signing_keys_root=self.signing_root_dir)
+        # Add our local configuration to an on-disk configuration file so
+        # that it can be used by subprocesses.
+        config_name = self.factory.getUniqueString()
+        config_fixture = self.useFixture(
+            ConfigFixture(config_name, os.environ["LPCONFIG"]))
+        config_fixture.add_section(dedent("""
+            [personalpackagearchive]
+            signing_keys_root: {}
+            """).format(self.signing_root_dir))
+        self.useFixture(ConfigUseFixture(config_name))
 
     def makeScript(self, test_args):
         script = SyncSigningKeysScript("test-sync", test_args=test_args)
@@ -282,3 +299,30 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
             "Signing key for %s / %s / %s already exists" %
             (key_type, archive.reference, series.name),
             script.logger.content.as_text())
+
+    def runScript(self):
+        transaction.commit()
+        ret, out, err = run_script("scripts/sync-signingkeys.py")
+        self.addDetail("stdout", text_content(out))
+        self.addDetail("stderr", text_content(err))
+        self.assertEqual(0, ret)
+        transaction.commit()
+
+    def test_script(self):
+        self.useFixture(SigningServiceFixture())
+        series = self.factory.makeDistroSeries()
+        archive = self.factory.makeArchive(distribution=series.distribution)
+        key_dirs = self.makeArchiveSigningDir(archive)
+        archive_root = key_dirs[None]
+        with open(os.path.join(archive_root, "uefi.key"), "wb") as fd:
+            fd.write(b"Private key content")
+        with open(os.path.join(archive_root, "uefi.crt"), "wb") as fd:
+            fd.write(b"Public key content")
+
+        self.runScript()
+
+        archive_signing_key = getUtility(IArchiveSigningKeySet).getSigningKey(
+            SigningKeyType.UEFI, archive, series)
+        self.assertThat(archive_signing_key, MatchesStructure(
+            key_type=Equals(SigningKeyType.UEFI),
+            public_key=Equals(b"Public key content")))
diff --git a/lib/lp/services/signing/testing/__init__.py b/lib/lp/services/signing/testing/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/testing/__init__.py
diff --git a/lib/lp/services/signing/testing/fakesigning.py b/lib/lp/services/signing/testing/fakesigning.py
new file mode 100644
index 0000000..28877e9
--- /dev/null
+++ b/lib/lp/services/signing/testing/fakesigning.py
@@ -0,0 +1,188 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Twisted resources implementing a fake signing service."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+    'SigningServiceResource',
+    ]
+
+import base64
+import json
+import os
+
+from nacl.encoding import Base64Encoder
+from nacl.public import (
+    Box,
+    PrivateKey,
+    PublicKey,
+    )
+from nacl.utils import random
+from twisted.web import resource
+
+
+class ServiceKeyResource(resource.Resource, object):
+    """Resource implementing /service-key."""
+
+    isLeaf = True
+
+    def __init__(self, service_public_key):
+        super(ServiceKeyResource, self).__init__()
+        self.service_public_key = service_public_key
+
+    def render_GET(self, request):
+        request.setHeader(b"Content-Type", b"application/json")
+        return json.dumps({
+            "service-key": self.service_public_key.encode(
+                encoder=Base64Encoder).decode("UTF-8"),
+            })
+
+
+class NonceResource(resource.Resource, object):
+    """Resource implementing /nonce.
+
+    Note that this fake signing service does not check that nonces are only
+    used once.
+    """
+
+    isLeaf = True
+
+    def __init__(self):
+        super(NonceResource, self).__init__()
+        self.nonces = []
+
+    def render_POST(self, request):
+        nonce = base64.b64encode(random(Box.NONCE_SIZE)).decode("UTF-8")
+        self.nonces.append(nonce)
+        request.setHeader(b"Content-Type", b"application/json")
+        return json.dumps({"nonce": nonce})
+
+
+class BoxedAuthenticationResource(resource.Resource, object):
+    """Base for resources that use boxed authentication."""
+
+    def __init__(self, service_private_key, client_public_key):
+        super(BoxedAuthenticationResource, self).__init__()
+        self.box = Box(service_private_key, client_public_key)
+
+    def _decrypt(self, request):
+        """Authenticate and decrypt request data."""
+        nonce = base64.b64decode(request.getHeader(b"X-Nonce"))
+        return self.box.decrypt(
+            request.content.read(), nonce, encoder=Base64Encoder)
+
+    def _encrypt(self, request, data):
+        """Encrypt and authenticate response data."""
+        nonce = base64.b64decode(request.getHeader(b"X-Response-Nonce"))
+        request.setHeader(b"Content-Type", b"application/x-boxed-json")
+        return self.box.encrypt(data, nonce, encoder=Base64Encoder).ciphertext
+
+
+class GenerateResource(BoxedAuthenticationResource):
+    """Resource implementing /generate."""
+
+    isLeaf = True
+
+    def __init__(self, service_private_key, client_public_key, keys):
+        super(GenerateResource, self).__init__(
+            service_private_key, client_public_key)
+        self.keys = keys
+        self.requests = []
+
+    def render_POST(self, request):
+        payload = json.loads(self._decrypt(request).decode("UTF-8"))
+        self.requests.append(payload)
+        # We don't need to bother with generating a real key here.  Just
+        # make up some random data.
+        private_key = random()
+        public_key = random()
+        fingerprint = base64.b64encode(random()).decode("UTF-8")
+        self.keys[fingerprint] = (private_key, public_key)
+        response_payload = {
+            "fingerprint": fingerprint,
+            "public-key": base64.b64encode(public_key).decode("UTF-8"),
+            }
+        return self._encrypt(
+            request, json.dumps(response_payload).encode("UTF-8"))
+
+
+class SignResource(BoxedAuthenticationResource):
+    """Resource implementing /sign."""
+
+    isLeaf = True
+
+    def __init__(self, service_private_key, client_public_key, keys):
+        super(SignResource, self).__init__(
+            service_private_key, client_public_key)
+        self.keys = keys
+        self.requests = []
+
+    def render_POST(self, request):
+        payload = json.loads(self._decrypt(request).decode("UTF-8"))
+        self.requests.append(payload)
+        _, public_key = self.keys[payload["fingerprint"]]
+        # We don't need to bother with generating a real signature here.
+        # Just make up some random data.
+        signed_message = random()
+        response_payload = {
+            "public-key": base64.b64encode(public_key).decode("UTF-8"),
+            "signed-message": base64.b64encode(signed_message).decode("UTF-8"),
+            }
+        return self._encrypt(
+            request, json.dumps(response_payload).encode("UTF-8"))
+
+
+class InjectResource(BoxedAuthenticationResource):
+    """Resource implementing /inject."""
+
+    isLeaf = True
+
+    def __init__(self, service_private_key, client_public_key, keys):
+        super(InjectResource, self).__init__(
+            service_private_key, client_public_key)
+        self.keys = keys
+        self.requests = []
+
+    def render_POST(self, request):
+        payload = json.loads(self._decrypt(request).decode("UTF-8"))
+        self.requests.append(payload)
+        private_key = base64.b64decode(payload["private-key"].encode("UTF-8"))
+        public_key = base64.b64decode(payload["public-key"].encode("UTF-8"))
+        # We don't need to bother with generating a real fingerprint here.
+        # Just make up some random data.
+        fingerprint = base64.b64encode(random()).decode("UTF-8")
+        self.keys[fingerprint] = (private_key, public_key)
+        response_payload = {"fingerprint": fingerprint}
+        return self._encrypt(
+            request, json.dumps(response_payload).encode("UTF-8"))
+
+
+class SigningServiceResource(resource.Resource, object):
+    """Root resource for the fake signing service."""
+
+    def __init__(self):
+        resource.Resource.__init__(self)
+        self.service_private_key = PrivateKey.generate()
+        self.client_public_key = PublicKey(
+            os.environ["FAKE_SIGNING_CLIENT_PUBLIC_KEY"],
+            encoder=Base64Encoder)
+        self.keys = {}
+        self.putChild(
+            b"service-key",
+            ServiceKeyResource(self.service_private_key.public_key))
+        self.putChild(b"nonce", NonceResource())
+        self.putChild(
+            b"generate",
+            GenerateResource(
+                self.service_private_key, self.client_public_key, self.keys))
+        self.putChild(
+            b"sign",
+            SignResource(
+                self.service_private_key, self.client_public_key, self.keys))
+        self.putChild(
+            b"inject",
+            InjectResource(
+                self.service_private_key, self.client_public_key, self.keys))
diff --git a/lib/lp/services/signing/testing/fakesigning.tac b/lib/lp/services/signing/testing/fakesigning.tac
new file mode 100644
index 0000000..d828ba5
--- /dev/null
+++ b/lib/lp/services/signing/testing/fakesigning.tac
@@ -0,0 +1,33 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Twisted application configuration file for a fake signing service."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+import os
+
+from twisted.application import (
+    service,
+    strports,
+    )
+from twisted.python.compat import nativeString
+from twisted.web import server
+
+from lp.services.daemons import readyservice
+from lp.services.signing.testing.fakesigning import SigningServiceResource
+
+
+application = service.Application("fakesigning")
+svc = service.IServiceCollection(application)
+
+# Service that announces when the daemon is ready.
+readyservice.ReadyService().setServiceParent(svc)
+
+site = server.Site(SigningServiceResource())
+site.displayTracebacks = False
+
+port = nativeString("tcp:%s" % os.environ["FAKE_SIGNING_PORT"])
+strports.service(port, site).setServiceParent(svc)
diff --git a/lib/lp/services/signing/testing/fixture.py b/lib/lp/services/signing/testing/fixture.py
new file mode 100644
index 0000000..0afa9d4
--- /dev/null
+++ b/lib/lp/services/signing/testing/fixture.py
@@ -0,0 +1,90 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Fake signing service fixture."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+    "SigningServiceFixture",
+    ]
+
+import os.path
+import socket
+from textwrap import dedent
+
+from nacl.encoding import Base64Encoder
+from nacl.public import PrivateKey
+from testtools import (
+    content,
+    content_type,
+    )
+from txfixtures.tachandler import TacTestFixture
+
+from lp.services.config import config
+from lp.services.config.fixture import (
+    ConfigFixture,
+    ConfigUseFixture,
+    )
+from lp.testing.factory import ObjectFactory
+
+
+class SigningServiceFixture(TacTestFixture):
+
+    tacfile = os.path.join(os.path.dirname(__file__), "fakesigning.tac")
+    pidfile = None
+    logfile = None
+    client_private_key = None
+    daemon_port = None
+
+    def setUp(self, spew=False, umask=None):
+        # Pick a random free port.
+        if self.daemon_port is None:
+            sock = socket.socket()
+            sock.bind(('', 0))
+            self.daemon_port = sock.getsockname()[1]
+            sock.close()
+            self.logfile = os.path.join(
+                config.root, "logs", "fakesigning-%s.log" % self.daemon_port)
+            self.pidfile = os.path.join(
+                config.root, "logs", "fakesigning-%s.pid" % self.daemon_port)
+        assert self.daemon_port is not None
+
+        super(SigningServiceFixture, self).setUp(
+            spew=spew, umask=umask,
+            python_path=os.path.join(config.root, "bin", "py"),
+            twistd_script=os.path.join(config.root, "bin", "twistd"))
+
+        logfile = self.logfile
+        self.addCleanup(lambda: os.path.exists(logfile) and os.unlink(logfile))
+
+        content.attach_file(
+            self, logfile, "signing-log", content_type.UTF8_TEXT)
+
+        factory = ObjectFactory()
+        config_name = factory.getUniqueString()
+        config_fixture = self.useFixture(
+            ConfigFixture(config_name, os.environ["LPCONFIG"]))
+        config_fixture.add_section(dedent("""
+            [signing]
+            signing_endpoint: http://localhost:{daemon_port}/
+            client_private_key: {client_private_key}
+            client_public_key: {client_public_key}
+            """).format(
+                daemon_port=self.daemon_port,
+                client_private_key=self.client_private_key.encode(
+                    encoder=Base64Encoder).decode("ASCII"),
+                client_public_key=self.client_private_key.public_key.encode(
+                    encoder=Base64Encoder).decode("ASCII")))
+        self.useFixture(ConfigUseFixture(config_name))
+
+    def setUpRoot(self):
+        # We don't need a root directory, but this is a convenient place to
+        # generate a client key and set environment variables.
+        self.client_private_key = PrivateKey.generate()
+
+        os.environ["FAKE_SIGNING_PORT"] = str(self.daemon_port)
+        os.environ["FAKE_SIGNING_CLIENT_PUBLIC_KEY"] = (
+            self.client_private_key.public_key.encode(
+                encoder=Base64Encoder).decode("ASCII"))