launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24576
[Merge] ~pappacena/launchpad:unrevert-lp-signing-integration into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:unrevert-lp-signing-integration into launchpad:master.
Commit message:
[HOLD] Adding back the lp-signing integration, reverted due to the fact that we didn't deploy yet the database changes.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/381927
We should wait until https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/379218 is deployed to production before merging this MP.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:unrevert-lp-signing-integration into launchpad:master.
diff --git a/configs/development/launchpad-lazr.conf b/configs/development/launchpad-lazr.conf
index a0b8f0e..7b01979 100644
--- a/configs/development/launchpad-lazr.conf
+++ b/configs/development/launchpad-lazr.conf
@@ -182,6 +182,11 @@ tools_source: deb http://ppa.launchpad.net/snappy-dev/snapcraft-daily/ubuntu %(s
global_suggestions_enabled: True
generate_templates: True
+[signing]
+signing_endpoint = http://signing.launchpad.test:8000
+client_private_key = O73bJzd3hybyBxUKk0FaR6K9CbbmxBYkw6vCrIWZkSY=
+client_public_key = xEtwSS7kdGmo0ElcN2fR/mcHS0A42zhYbo/+5KV4xRs=
+
[profiling]
profiling_allowed: True
diff --git a/lib/lp/archivepublisher/archivegpgsigningkey.py b/lib/lp/archivepublisher/archivegpgsigningkey.py
index 2b14365..7439a32 100644
--- a/lib/lp/archivepublisher/archivegpgsigningkey.py
+++ b/lib/lp/archivepublisher/archivegpgsigningkey.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""ArchiveGPGSigningKey implementation."""
@@ -8,17 +8,12 @@ __metaclass__ = type
__all__ = [
'ArchiveGPGSigningKey',
'SignableArchive',
- 'SigningMode',
]
import os
import gpgme
-from lazr.enum import (
- EnumeratedType,
- Item,
- )
from twisted.internet.threads import deferToThread
from zope.component import getUtility
from zope.interface import implementer
@@ -43,13 +38,7 @@ from lp.services.config import config
from lp.services.gpg.interfaces import IGPGHandler
from lp.services.osutils import remove_if_exists
from lp.services.propertycache import get_property_cache
-
-
-class SigningMode(EnumeratedType):
- """Archive file signing mode."""
-
- DETACHED = Item("Detached signature")
- CLEAR = Item("Cleartext signature")
+from lp.services.signing.enums import SigningMode
@implementer(ISignableArchive)
@@ -100,6 +89,8 @@ class SignableArchive:
output_paths = []
for input_path, output_path, mode, suite in signatures:
+ if mode not in {SigningMode.DETACHED, SigningMode.CLEAR}:
+ raise ValueError('Invalid signature mode for GPG: %s' % mode)
if self.archive.signing_key is not None:
with open(input_path) as input_file:
input_content = input_file.read()
diff --git a/lib/lp/archivepublisher/signing.py b/lib/lp/archivepublisher/signing.py
index b685a61..e59ddc6 100644
--- a/lib/lp/archivepublisher/signing.py
+++ b/lib/lp/archivepublisher/signing.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""The processing of Signing tarballs.
@@ -18,6 +18,7 @@ __all__ = [
"UefiUpload",
]
+from functools import partial
import os
import shutil
import stat
@@ -27,13 +28,22 @@ import tempfile
import textwrap
import scandir
+from zope.component import getUtility
from lp.archivepublisher.config import getPubConfig
from lp.archivepublisher.customupload import CustomUpload
+from lp.registry.interfaces.distroseries import IDistroSeriesSet
+from lp.services.features import getFeatureFlag
from lp.services.osutils import remove_if_exists
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
from lp.soyuz.interfaces.queue import CustomUploadError
+PUBLISHER_USES_SIGNING_SERVICE = (
+ 'archivepublisher.signing_service.enabled')
+
+
class SigningUploadPackError(CustomUploadError):
def __init__(self, tarfile_path, exc):
message = "Problem building tarball '%s': %s" % (
@@ -41,6 +51,14 @@ class SigningUploadPackError(CustomUploadError):
CustomUploadError.__init__(self, message)
+class NoSigningKeyError(Exception):
+ pass
+
+
+class SigningServiceError(Exception):
+ pass
+
+
class SigningUpload(CustomUpload):
"""Signing custom upload.
@@ -65,6 +83,16 @@ class SigningUpload(CustomUpload):
Signing keys may be installed in the "signingroot" directory specified in
publisher configuration. In this directory, the private key is
"uefi.key" and the certificate is "uefi.crt".
+
+ This class is already prepared to use signing service. There are
+ basically two places interacting with it:
+ - findSigningHandlers(), that provides a handler to call signing
+ service to sign each file (together with a fallback handler,
+ that signs the file locally).
+
+ - copyPublishedPublicKeys(), that accepts both ways of saving public
+ keys: by copying from local file system (old way) or saving the
+ public key stored at signing service (new way).
"""
custom_type = "signing"
@@ -105,6 +133,13 @@ class SigningUpload(CustomUpload):
def setTargetDirectory(self, archive, tarfile_path, suite):
self.archive = archive
+
+ if suite:
+ self.distro_series, _ = getUtility(IDistroSeriesSet).fromSuite(
+ self.archive.distribution, suite)
+ else:
+ self.distro_series = None
+
pubconf = getPubConfig(archive)
if pubconf.signingroot is None:
if self.logger is not None:
@@ -122,7 +157,7 @@ class SigningUpload(CustomUpload):
self.fit_cert = None
self.autokey = False
else:
- signing_for = suite.split('-')[0]
+ signing_for = self.distro_series.name if self.distro_series else ''
self.uefi_key = self.getSeriesPath(
pubconf, "uefi.key", archive, signing_for)
self.uefi_cert = self.getSeriesPath(
@@ -165,26 +200,37 @@ class SigningUpload(CustomUpload):
self.archiveroot = pubconf.archiveroot
self.temproot = pubconf.temproot
- self.public_keys = set()
+ self.public_keys = {}
- def publishPublicKey(self, key):
- """Record this key as having been used in this upload."""
- self.public_keys.add(key)
+ def publishPublicKey(self, key, content=None):
+ """Record this key as having been used in this upload.
- def copyPublishedPublicKeys(self):
- """Copy out published keys into the custom upload."""
- keydir = os.path.join(self.tmpdir, self.version, "control")
- if not os.path.exists(keydir):
- os.makedirs(keydir)
- for key in self.public_keys:
- # Ensure we only emit files which are world readable.
+ :param key: Key file name
+ :param content: Key file content (if None, try to read it from local
+ filesystem)
+ """
+ if content is not None:
+ self.public_keys[key] = content
+ elif key not in self.public_keys:
+ # Ensure we only emit files which are world-readable.
if stat.S_IMODE(os.stat(key).st_mode) & stat.S_IROTH:
- shutil.copy(key, os.path.join(keydir, os.path.basename(key)))
+ with open(key, "rb") as f:
+ self.public_keys[key] = f.read()
else:
if self.logger is not None:
self.logger.warning(
"%s: public key not world readable" % key)
+ def copyPublishedPublicKeys(self):
+ """Copy out published keys into the custom upload."""
+ keydir = os.path.join(self.tmpdir, self.version, "control")
+ if not os.path.exists(keydir):
+ os.makedirs(keydir)
+ for filename, content in self.public_keys.items():
+ file_path = os.path.join(keydir, os.path.basename(filename))
+ with open(file_path, 'wb') as fd:
+ fd.write(content)
+
def setSigningOptions(self):
"""Find and extract raw-signing options from the tarball."""
self.signing_options = {}
@@ -219,22 +265,145 @@ class SigningUpload(CustomUpload):
def findSigningHandlers(self):
"""Find all the signable files in an extracted tarball."""
+ use_signing_service = bool(
+ getFeatureFlag(PUBLISHER_USES_SIGNING_SERVICE))
+
+ fallback_handlers = {
+ SigningKeyType.UEFI: self.signUefi,
+ SigningKeyType.KMOD: self.signKmod,
+ SigningKeyType.OPAL: self.signOpal,
+ SigningKeyType.SIPL: self.signSipl,
+ SigningKeyType.FIT: self.signFit,
+ }
+
for dirpath, dirnames, filenames in scandir.walk(self.tmpdir):
for filename in filenames:
+ file_path = os.path.join(dirpath, filename)
if filename.endswith(".efi"):
- yield (os.path.join(dirpath, filename), self.signUefi)
+ key_type = SigningKeyType.UEFI
elif filename.endswith(".ko"):
- yield (os.path.join(dirpath, filename), self.signKmod)
+ key_type = SigningKeyType.KMOD
elif filename.endswith(".opal"):
- yield (os.path.join(dirpath, filename), self.signOpal)
+ key_type = SigningKeyType.OPAL
elif filename.endswith(".sipl"):
- yield (os.path.join(dirpath, filename), self.signSipl)
+ key_type = SigningKeyType.SIPL
elif filename.endswith(".fit"):
- yield (os.path.join(dirpath, filename), self.signFit)
+ key_type = SigningKeyType.FIT
+ else:
+ continue
+
+ if use_signing_service:
+ key = getUtility(IArchiveSigningKeySet).getSigningKey(
+ key_type, self.archive, self.distro_series)
+ handler = partial(
+ self.signUsingSigningService, key_type, key)
+ fallback_handler = partial(
+ self.signUsingLocalKey, key_type,
+ fallback_handlers.get(key_type))
+ yield file_path, handler, fallback_handler
+ else:
+ yield file_path, fallback_handlers.get(key_type), None
+
+ def signUsingLocalKey(self, key_type, handler, filename):
+ """Sign the given filename using using handler if the local
+ key files exists. If the local key files does not exist, raises
+ IOError.
+
+ Note that this method should only be used as a fallback to signing
+ service, since it will not try to generate local keys.
+
+ :param key_type: One of the SigningKeyType items.
+ :param handler: One of the local signing handlers (self.signUefi,
+ self.signKmod, etc).
+ :param filename: The filename to be signed.
+ """
+
+ if not self.keyFilesExist(key_type):
+ raise IOError(
+ "Could not fallback to local signing keys: the key files "
+ "were not found.")
+ return handler(filename)
+
+ def keyFilesExist(self, key_type):
+ """Checks if all needed key files exists in the local filesystem
+ for the given key type.
+ """
+ fallback_keys = {
+ SigningKeyType.UEFI: [self.uefi_cert, self.uefi_key],
+ SigningKeyType.KMOD: [self.kmod_pem, self.kmod_x509],
+ SigningKeyType.OPAL: [self.opal_pem, self.opal_x509],
+ SigningKeyType.SIPL: [self.sipl_pem, self.sipl_x509],
+ SigningKeyType.FIT: [self.fit_cert, self.fit_key],
+ }
+ # If we are missing local key files, do not proceed.
+ key_files = [i for i in fallback_keys[key_type] if i]
+ return all(os.path.exists(key_file) for key_file in key_files)
+
+ def signUsingSigningService(self, key_type, signing_key, filename):
+ """Sign the given filename using a certain key hosted on signing
+ service, writes the signed content back to the filesystem and
+ publishes the public key to self.public_keys.
+
+ If the given key is None and self.autokey is set to True, this method
+ generates a key on signing service and associates it with the current
+ archive.
+
+ :param key_type: One of the SigningKeyType enum items
+ :param signing_key: The SigningKey to be used (or None,
+ to autogenerate a key if possible).
+ :param filename: The filename to be signed.
+ :return: Boolean. True if signed, or raises SigningServiceError
+ on failure.
+ """
+ if signing_key is None:
+ if not self.autokey:
+ raise NoSigningKeyError("No signing key for %s" % filename)
+ description = (
+ u"%s key for %s" % (key_type.name, self.archive.reference))
+ try:
+ signing_key = getUtility(IArchiveSigningKeySet).generate(
+ key_type, self.archive, description=description
+ ).signing_key
+ except Exception as e:
+ if self.logger:
+ self.logger.exception(
+ "Error generating signing key for %s: %s %s" %
+ (self.archive.reference, e.__class__.__name__, e))
+ raise SigningServiceError(
+ "Could not generate key %s: %s" % (key_type, e))
+
+ with open(filename, "rb") as fd:
+ content = fd.read()
+
+ try:
+ signed_content = signing_key.sign(
+ content, message_name=os.path.basename(filename))
+ except Exception as e:
+ if self.logger:
+ self.logger.exception(
+ "Error signing %s on signing service: %s %s" %
+ (filename, e.__class__.__name__, e))
+ raise SigningServiceError(
+ "Could not sign message with key %s: %s" % (signing_key, e))
+
+ if key_type in (SigningKeyType.UEFI, SigningKeyType.FIT):
+ file_suffix = ".signed"
+ public_key_suffix = ".crt"
+ else:
+ file_suffix = ".sig"
+ public_key_suffix = ".x509"
+
+ signed_filename = filename + file_suffix
+ public_key_filename = key_type.name.lower() + public_key_suffix
+
+ with open(signed_filename, 'wb') as fd:
+ fd.write(signed_content)
+
+ self.publishPublicKey(public_key_filename, signing_key.public_key)
+ return True
def getKeys(self, which, generate, *keynames):
"""Validate and return the uefi key and cert for encryption."""
-
if self.autokey:
for keyfile in keynames:
if keyfile and not os.path.exists(keyfile):
@@ -299,7 +468,7 @@ class SigningUpload(CustomUpload):
return
self.publishPublicKey(cert)
cmdl = ["sbsign", "--key", key, "--cert", cert, image]
- return self.callLog("UEFI signing", cmdl)
+ return self.callLog("UEFI signing", cmdl) == 0
openssl_config_base = textwrap.dedent("""\
[ req ]
@@ -387,7 +556,7 @@ class SigningUpload(CustomUpload):
return
self.publishPublicKey(cert)
cmdl = ["kmodsign", "-D", "sha512", pem, cert, image, image + ".sig"]
- return self.callLog("Kmod signing", cmdl)
+ return self.callLog("Kmod signing", cmdl) == 0
def generateOpalKeys(self):
"""Generate new Opal Signing Keys for this archive."""
@@ -403,7 +572,7 @@ class SigningUpload(CustomUpload):
return
self.publishPublicKey(cert)
cmdl = ["kmodsign", "-D", "sha512", pem, cert, image, image + ".sig"]
- return self.callLog("Opal signing", cmdl)
+ return self.callLog("Opal signing", cmdl) == 0
def generateSiplKeys(self):
"""Generate new Sipl Signing Keys for this archive."""
@@ -419,7 +588,7 @@ class SigningUpload(CustomUpload):
return
self.publishPublicKey(cert)
cmdl = ["kmodsign", "-D", "sha512", pem, cert, image, image + ".sig"]
- return self.callLog("SIPL signing", cmdl)
+ return self.callLog("SIPL signing", cmdl) == 0
def generateFitKeys(self):
"""Generate new FIT Keys for this archive."""
@@ -439,7 +608,7 @@ class SigningUpload(CustomUpload):
shutil.copy(image, image_signed)
cmdl = ["mkimage", "-F", "-k", os.path.dirname(key), "-r",
image_signed]
- return self.callLog("FIT signing", cmdl)
+ return self.callLog("FIT signing", cmdl) == 0
def convertToTarball(self):
"""Convert unpacked output to signing tarball."""
@@ -467,10 +636,18 @@ class SigningUpload(CustomUpload):
"""
super(SigningUpload, self).extract()
self.setSigningOptions()
- filehandlers = list(self.findSigningHandlers())
- for (filename, handler) in filehandlers:
- if (handler(filename) == 0 and
- 'signed-only' in self.signing_options):
+ for filename, handler, fallback_handler in self.findSigningHandlers():
+ try:
+ was_signed = handler(filename)
+ except (NoSigningKeyError, SigningServiceError) as e:
+ if fallback_handler is not None and self.logger:
+ self.logger.warning(
+ "Signing service will try to fallback to local key. "
+ "Reason: %s (%s)" % (e.__class__.__name__, e))
+ was_signed = False
+ if not was_signed and fallback_handler is not None:
+ was_signed = fallback_handler(filename)
+ if was_signed and 'signed-only' in self.signing_options:
os.unlink(filename)
# Copy out the public keys where they were used.
@@ -514,5 +691,4 @@ class UefiUpload(SigningUpload):
packages are converted to the new form and location.
"""
custom_type = "uefi"
-
dists_directory = "uefi"
diff --git a/lib/lp/archivepublisher/tests/test_publisher.py b/lib/lp/archivepublisher/tests/test_publisher.py
index d94fd34..1a2b664 100644
--- a/lib/lp/archivepublisher/tests/test_publisher.py
+++ b/lib/lp/archivepublisher/tests/test_publisher.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for publisher class."""
@@ -32,11 +32,13 @@ import time
from debian.deb822 import Release
from fixtures import MonkeyPatch
+
+
try:
import lzma
except ImportError:
from backports import lzma
-import mock
+from lp.services.compat import mock
import pytz
import scandir
from testscenarios import (
diff --git a/lib/lp/archivepublisher/tests/test_signing.py b/lib/lp/archivepublisher/tests/test_signing.py
index efe4ae6..8295aef 100644
--- a/lib/lp/archivepublisher/tests/test_signing.py
+++ b/lib/lp/archivepublisher/tests/test_signing.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Test UEFI custom uploads."""
@@ -9,10 +9,12 @@ __metaclass__ = type
import os
import re
+import shutil
import stat
import tarfile
from fixtures import MonkeyPatch
+from mock import call
import scandir
from testtools.matchers import (
Contains,
@@ -21,6 +23,7 @@ from testtools.matchers import (
Matcher,
MatchesAll,
MatchesDict,
+ MatchesStructure,
Mismatch,
Not,
StartsWith,
@@ -39,11 +42,16 @@ from lp.archivepublisher.interfaces.archivegpgsigningkey import (
)
from lp.archivepublisher.interfaces.publisherconfig import IPublisherConfigSet
from lp.archivepublisher.signing import (
+ PUBLISHER_USES_SIGNING_SERVICE,
SigningUpload,
UefiUpload,
)
from lp.archivepublisher.tests.test_run_parts import RunPartsMixin
+from lp.services.features.testing import FeatureFixture
from lp.services.osutils import write_file
+from lp.services.signing.enums import SigningMode
+from lp.services.signing.proxy import SigningKeyType
+from lp.services.signing.tests.helpers import SigningServiceClientFixture
from lp.services.tarfile_helpers import LaunchpadWriteTarFile
from lp.soyuz.enums import ArchivePurpose
from lp.testing import TestCaseWithFactory
@@ -184,7 +192,9 @@ class TestSigningHelpers(TestCaseWithFactory):
distribution=self.distro, purpose=ArchivePurpose.PRIMARY)
self.signing_dir = os.path.join(
self.temp_dir, self.distro.name + "-signing")
- self.suite = "distroseries"
+ self.distroseries = self.factory.makeDistroSeries(
+ distribution=self.distro)
+ self.suite = self.distroseries.name
pubconf = getPubConfig(self.archive)
if not os.path.exists(pubconf.temproot):
os.makedirs(pubconf.temproot)
@@ -267,7 +277,7 @@ class TestSigningHelpers(TestCaseWithFactory):
return os.path.join(pubconf.archiveroot, "dists", self.suite, "main")
-class TestSigning(RunPartsMixin, TestSigningHelpers):
+class TestLocalSigningUpload(RunPartsMixin, TestSigningHelpers):
def getSignedPath(self, loader_type, arch):
return os.path.join(self.getDistsPath(), "signed",
@@ -604,7 +614,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateUefiKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signUefi('t.efi')
self.assertEqual(1, fake_call.call_count)
# Assert command form.
@@ -624,7 +634,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateUefiKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signUefi('t.efi')
self.assertEqual(0, fake_call.call_count)
self.assertEqual(0, upload.generateUefiKeys.call_count)
@@ -638,7 +648,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.useFixture(MonkeyPatch("subprocess.call", fake_call))
upload = SigningUpload()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.generateUefiKeys()
self.assertEqual(1, fake_call.call_count)
# Assert the actual command matches.
@@ -662,7 +672,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateFitKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signFit('t.fit')
# Confirm the copy was performed.
self.assertEqual(1, fake_copy.call_count)
@@ -687,7 +697,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateFitKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signUefi('t.fit')
self.assertEqual(0, fake_call.call_count)
self.assertEqual(0, upload.generateFitKeys.call_count)
@@ -701,7 +711,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.useFixture(MonkeyPatch("subprocess.call", fake_call))
upload = SigningUpload()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.generateFitKeys()
self.assertEqual(1, fake_call.call_count)
# Assert the actual command matches.
@@ -720,7 +730,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.setUpPPA()
upload = SigningUpload()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
text = upload.generateOpensslConfig('Kmod', upload.openssl_config_kmod)
id_re = re.compile(r'^# KMOD OpenSSL config\n')
@@ -743,7 +753,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateKmodKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signKmod('t.ko')
self.assertEqual(1, fake_call.call_count)
# Assert command form.
@@ -764,7 +774,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateKmodKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signKmod('t.ko')
self.assertEqual(0, fake_call.call_count)
self.assertEqual(0, upload.generateKmodKeys.call_count)
@@ -778,7 +788,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.useFixture(MonkeyPatch("subprocess.call", fake_call))
upload = SigningUpload()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.generateKmodKeys()
self.assertEqual(2, fake_call.call_count)
# Assert the actual command matches.
@@ -806,7 +816,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.setUpPPA()
upload = SigningUpload()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
text = upload.generateOpensslConfig('Opal', upload.openssl_config_opal)
id_re = re.compile(r'^# OPAL OpenSSL config\n')
@@ -826,7 +836,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateOpalKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signOpal('t.opal')
self.assertEqual(1, fake_call.call_count)
# Assert command form.
@@ -847,7 +857,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateOpalKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signOpal('t.opal')
self.assertEqual(0, fake_call.call_count)
self.assertEqual(0, upload.generateOpalKeys.call_count)
@@ -861,7 +871,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.useFixture(MonkeyPatch("subprocess.call", fake_call))
upload = SigningUpload()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.generateOpalKeys()
self.assertEqual(2, fake_call.call_count)
# Assert the actual command matches.
@@ -889,7 +899,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.setUpPPA()
upload = SigningUpload()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
text = upload.generateOpensslConfig('SIPL', upload.openssl_config_sipl)
id_re = re.compile(r'^# SIPL OpenSSL config\n')
@@ -909,7 +919,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateSiplKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signSipl('t.sipl')
self.assertEqual(1, fake_call.call_count)
# Assert command form.
@@ -930,7 +940,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.generateSiplKeys = FakeMethod()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.signOpal('t.sipl')
self.assertEqual(0, fake_call.call_count)
self.assertEqual(0, upload.generateSiplKeys.call_count)
@@ -944,7 +954,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.useFixture(MonkeyPatch("subprocess.call", fake_call))
upload = SigningUpload()
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", self.suite)
upload.generateSiplKeys()
self.assertEqual(2, fake_call.call_count)
# Assert the actual command matches.
@@ -979,22 +989,20 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
This should fall through to the first series,
as the second does not have keys.
"""
- self.suite = "nokeys-distroseries"
first_series = self.factory.makeDistroSeries(
self.distro,
name="existingkeys"
)
- self.factory.makeDistroSeries(
- self.distro,
- name="nokeys"
- )
+ self.distroseries = self.factory.makeDistroSeries(
+ self.distro, name="nokeys")
+ self.suite = self.distroseries.name
# Each image in the tarball is signed.
self.setUpUefiKeys()
self.setUpUefiKeys(series=first_series)
self.openArchive("test", "1.0", "amd64")
self.tarfile.add_file("1.0/empty.efi", b"")
upload = self.process_emulate()
- expected_callers = [('UEFI signing', 1),]
+ expected_callers = [('UEFI signing', 1)]
self.assertContentEqual(expected_callers, upload.callLog.caller_list())
# Check the correct series name appears in the call arguments
self.assertIn(
@@ -1103,7 +1111,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signUefi(os.path.join(self.makeTemporaryDirectory(), 't.efi'))
self.assertEqual(0, upload.callLog.caller_count('UEFI keygen'))
self.assertFalse(os.path.exists(self.key))
@@ -1120,7 +1128,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signUefi(os.path.join(self.makeTemporaryDirectory(), 't.efi'))
self.assertEqual(1, upload.callLog.caller_count('UEFI keygen'))
self.assertTrue(os.path.exists(self.key))
@@ -1138,7 +1146,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signFit(os.path.join(self.makeTemporaryDirectory(), 'fit'))
self.assertEqual(0, upload.callLog.caller_count('FIT keygen'))
self.assertFalse(os.path.exists(self.fit_key))
@@ -1157,7 +1165,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signFit(os.path.join(self.makeTemporaryDirectory(), 't.fit'))
self.assertEqual(1, upload.callLog.caller_count('FIT keygen'))
self.assertTrue(os.path.exists(self.fit_key))
@@ -1175,7 +1183,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signKmod(os.path.join(self.makeTemporaryDirectory(), 't.ko'))
self.assertEqual(0, upload.callLog.caller_count('Kmod keygen key'))
self.assertEqual(0, upload.callLog.caller_count('Kmod keygen cert'))
@@ -1193,7 +1201,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signKmod(os.path.join(self.makeTemporaryDirectory(), 't.ko'))
self.assertEqual(1, upload.callLog.caller_count('Kmod keygen key'))
self.assertEqual(1, upload.callLog.caller_count('Kmod keygen cert'))
@@ -1212,7 +1220,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signOpal(os.path.join(self.makeTemporaryDirectory(), 't.opal'))
self.assertEqual(0, upload.callLog.caller_count('Opal keygen key'))
self.assertEqual(0, upload.callLog.caller_count('Opal keygen cert'))
@@ -1230,7 +1238,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signOpal(os.path.join(self.makeTemporaryDirectory(), 't.opal'))
self.assertEqual(1, upload.callLog.caller_count('Opal keygen key'))
self.assertEqual(1, upload.callLog.caller_count('Opal keygen cert'))
@@ -1249,7 +1257,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signOpal(os.path.join(self.makeTemporaryDirectory(), 't.sipl'))
self.assertEqual(0, upload.callLog.caller_count('SIPL keygen key'))
self.assertEqual(0, upload.callLog.caller_count('SIPL keygen cert'))
@@ -1267,7 +1275,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
upload = SigningUpload()
upload.callLog = FakeMethodCallLog(upload=upload)
upload.setTargetDirectory(
- self.archive, "test_1.0_amd64.tar.gz", "distroseries")
+ self.archive, "test_1.0_amd64.tar.gz", "")
upload.signSipl(os.path.join(self.makeTemporaryDirectory(), 't.sipl'))
self.assertEqual(1, upload.callLog.caller_count('SIPL keygen key'))
self.assertEqual(1, upload.callLog.caller_count('SIPL keygen cert'))
@@ -1290,7 +1298,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.tarfile.add_file("1.0/empty.sipl", b"")
self.process_emulate()
sha256file = os.path.join(self.getSignedPath("test", "amd64"),
- "1.0", "SHA256SUMS")
+ "1.0", "SHA256SUMS")
self.assertTrue(os.path.exists(sha256file))
@defer.inlineCallbacks
@@ -1309,7 +1317,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.tarfile.add_file("1.0/empty.sipl", b"")
self.process_emulate()
sha256file = os.path.join(self.getSignedPath("test", "amd64"),
- "1.0", "SHA256SUMS")
+ "1.0", "SHA256SUMS")
self.assertTrue(os.path.exists(sha256file))
self.assertThat(
sha256file + '.gpg',
@@ -1333,7 +1341,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.tarfile.add_file("1.0/empty.sipl", b"")
self.process_emulate()
sha256file = os.path.join(self.getSignedPath("test", "amd64"),
- "1.0", "SHA256SUMS")
+ "1.0", "SHA256SUMS")
self.assertTrue(os.path.exists(sha256file))
self.assertThat(
sha256file + '.gpg',
@@ -1344,10 +1352,10 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
"1.0", "signed.tar.gz")
with tarfile.open(tarfilename) as tarball:
self.assertThat(tarball.getnames(), MatchesAll(*[
- Not(Contains(name)) for name in [
- "1.0/SHA256SUMS", "1.0/SHA256SUMS.gpg",
- "1.0/signed.tar.gz",
- ]]))
+ Not(Contains(name)) for name in [
+ "1.0/SHA256SUMS", "1.0/SHA256SUMS.gpg",
+ "1.0/signed.tar.gz",
+ ]]))
def test_checksumming_tree_signed_with_external_run_parts(self):
# Checksum files can be signed using an external run-parts helper.
@@ -1368,7 +1376,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.tarfile.add_file("1.0/empty.sipl", "")
self.process_emulate()
sha256file = os.path.join(self.getSignedPath("test", "amd64"),
- "1.0", "SHA256SUMS")
+ "1.0", "SHA256SUMS")
self.assertTrue(os.path.exists(sha256file))
self.assertEqual(1, run_parts_fixture.new_value.call_count)
args, kwargs = run_parts_fixture.new_value.calls[-1]
@@ -1499,3 +1507,500 @@ class TestUefi(TestSigningHelpers):
self.getDistsPath(), "uefi")))
self.assertTrue(os.path.exists(os.path.join(
self.getSignedPath("test", "amd64"), "1.0", "empty.efi")))
+
+
+class TestSigningUploadWithSigningService(TestSigningHelpers):
+ """Tests for SigningUpload using lp-signing service
+ """
+ layer = ZopelessDatabaseLayer
+
+ def setUp(self):
+ super(TestSigningUploadWithSigningService, self).setUp()
+ self.useFixture(FeatureFixture({PUBLISHER_USES_SIGNING_SERVICE: True}))
+
+ self.signing_service_client = self.useFixture(
+ SigningServiceClientFixture(self.factory))
+ self.signing_keys = {
+ k: v.signing_key for k, v in self.setUpAllKeyTypes(
+ self.archive).items()}
+
+ def setUpAllKeyTypes(self, archive):
+ """Helper to create
+
+ :return: A dict like {key_type: signing_key} with all keys available.
+ """
+ keys_per_type = {}
+ for key_type in SigningKeyType.items:
+ signing_key = self.factory.makeSigningKey(key_type=key_type)
+ arch_key = self.factory.makeArchiveSigningKey(
+ archive=archive, signing_key=signing_key)
+ keys_per_type[key_type] = arch_key
+ return keys_per_type
+
+ def getArchiveSigningKey(self, key_type):
+ signing_key = self.factory.makeSigningKey(key_type=key_type)
+ arch_signing_key = self.factory.makeArchiveSigningKey(
+ archive=self.archive, signing_key=signing_key)
+ return arch_signing_key
+
+ @staticmethod
+ def getFileListContent(basedir, filenames):
+ contents = []
+ for filename in filenames:
+ with open(os.path.join(basedir, filename), 'rb') as fd:
+ contents.append(fd.read())
+ return contents
+
+ def getSignedPath(self, loader_type, arch):
+ return os.path.join(self.getDistsPath(), "signed",
+ "%s-%s" % (loader_type, arch))
+
+ def process_emulate(self):
+ """Shortcut to close tarfile and run SigningUpload.process.
+ """
+ self.tarfile.close()
+ self.buffer.close()
+
+ upload = SigningUpload()
+ upload.process(self.archive, self.path, self.suite)
+ return upload
+
+ def test_set_target_directory_with_distroseries(self):
+ archive = self.factory.makeArchive()
+ series_name = archive.distribution.series[1].name
+
+ upload = SigningUpload()
+ upload.setTargetDirectory(
+ archive, "test_1.0_amd64.tar.gz", series_name)
+
+ pubconfig = getPubConfig(archive)
+ self.assertThat(upload, MatchesStructure.byEquality(
+ distro_series=archive.distribution.series[1],
+ archive=archive,
+ autokey=pubconfig.signingautokey))
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ self.assertEqual(0, self.signing_service_client.sign.call_count)
+
+ def test_options_handling_single(self):
+ """If the configured key/cert are missing, processing succeeds but
+ nothing is signed.
+ """
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"first\n")
+
+ upload = self.process_emulate()
+
+ self.assertContentEqual(['first'], upload.signing_options.keys())
+
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ self.assertEqual(0, self.signing_service_client.sign.call_count)
+
+ def test_options_handling_multiple(self):
+ """If the configured key/cert are missing, processing succeeds but
+ nothing is signed.
+ """
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"first\nsecond\n")
+
+ upload = self.process_emulate()
+
+ self.assertContentEqual(['first', 'second'],
+ upload.signing_options.keys())
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ self.assertEqual(0, self.signing_service_client.sign.call_count)
+
+ def test_options_tarball(self):
+ """Specifying the "tarball" option should create an tarball in tmpdir.
+ """
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"tarball")
+ self.tarfile.add_file("1.0/empty.efi", b"a")
+ self.tarfile.add_file("1.0/empty.ko", b"b")
+ self.tarfile.add_file("1.0/empty.opal", b"c")
+ self.tarfile.add_file("1.0/empty.sipl", b"d")
+ self.tarfile.add_file("1.0/empty.fit", b"e")
+
+ self.process_emulate()
+
+ self.assertThat(self.getSignedPath("test", "amd64"), SignedMatches([
+ "1.0/SHA256SUMS",
+ "1.0/signed.tar.gz",
+ ]))
+ tarfilename = os.path.join(self.getSignedPath("test", "amd64"),
+ "1.0", "signed.tar.gz")
+ with tarfile.open(tarfilename) as tarball:
+ self.assertContentEqual([
+ '1.0', '1.0/control', '1.0/control/options',
+ '1.0/empty.efi', '1.0/empty.efi.signed',
+ '1.0/control/uefi.crt',
+ '1.0/empty.ko', '1.0/empty.ko.sig', '1.0/control/kmod.x509',
+ '1.0/empty.opal', '1.0/empty.opal.sig',
+ '1.0/control/opal.x509',
+ '1.0/empty.sipl', '1.0/empty.sipl.sig',
+ '1.0/control/sipl.x509',
+ '1.0/empty.fit', '1.0/empty.fit.signed',
+ '1.0/control/fit.crt',
+ ], tarball.getnames())
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ keys = self.signing_keys
+ self.assertItemsEqual([
+ call(
+ SigningKeyType.UEFI, keys[SigningKeyType.UEFI].fingerprint,
+ 'empty.efi', b'a', SigningMode.ATTACHED),
+ call(
+ SigningKeyType.KMOD, keys[SigningKeyType.KMOD].fingerprint,
+ 'empty.ko', b'b', SigningMode.DETACHED),
+ call(
+ SigningKeyType.OPAL, keys[SigningKeyType.OPAL].fingerprint,
+ 'empty.opal', b'c', SigningMode.DETACHED),
+ call(
+ SigningKeyType.SIPL, keys[SigningKeyType.SIPL].fingerprint,
+ 'empty.sipl', b'd', SigningMode.DETACHED),
+ call(
+ SigningKeyType.FIT, keys[SigningKeyType.FIT].fingerprint,
+ 'empty.fit', b'e', SigningMode.ATTACHED)],
+ self.signing_service_client.sign.call_args_list)
+
+ def test_options_signed_only(self):
+ """Specifying the "signed-only" option should trigger removal of
+ the source files leaving signatures only.
+ """
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"signed-only")
+ self.tarfile.add_file("1.0/empty.efi", b"a")
+ self.tarfile.add_file("1.0/empty.ko", b"b")
+ self.tarfile.add_file("1.0/empty.opal", b"c")
+ self.tarfile.add_file("1.0/empty.sipl", b"d")
+ self.tarfile.add_file("1.0/empty.fit", b"e")
+
+ self.process_emulate()
+
+ self.assertThat(self.getSignedPath("test", "amd64"), SignedMatches([
+ "1.0/SHA256SUMS", "1.0/control/options",
+ "1.0/empty.efi.signed", "1.0/control/uefi.crt",
+ "1.0/empty.ko.sig", "1.0/control/kmod.x509",
+ "1.0/empty.opal.sig", "1.0/control/opal.x509",
+ "1.0/empty.sipl.sig", "1.0/control/sipl.x509",
+ "1.0/empty.fit.signed", "1.0/control/fit.crt",
+ ]))
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ keys = self.signing_keys
+ self.assertItemsEqual([
+ call(
+ SigningKeyType.UEFI, keys[SigningKeyType.UEFI].fingerprint,
+ 'empty.efi', b'a', SigningMode.ATTACHED),
+ call(
+ SigningKeyType.KMOD, keys[SigningKeyType.KMOD].fingerprint,
+ 'empty.ko', b'b', SigningMode.DETACHED),
+ call(
+ SigningKeyType.OPAL, keys[SigningKeyType.OPAL].fingerprint,
+ 'empty.opal', b'c', SigningMode.DETACHED),
+ call(
+ SigningKeyType.SIPL, keys[SigningKeyType.SIPL].fingerprint,
+ 'empty.sipl', b'd', SigningMode.DETACHED),
+ call(
+ SigningKeyType.FIT, keys[SigningKeyType.FIT].fingerprint,
+ 'empty.fit', b'e', SigningMode.ATTACHED)],
+ self.signing_service_client.sign.call_args_list)
+
+ def test_options_tarball_signed_only(self):
+ """Specifying the "tarball" option should create an tarball in
+ the tmpdir. Adding signed-only should trigger removal of the
+ original files.
+ """
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"tarball\nsigned-only")
+ self.tarfile.add_file("1.0/empty.efi", b"a")
+ self.tarfile.add_file("1.0/empty.ko", b"b")
+ self.tarfile.add_file("1.0/empty.opal", b"c")
+ self.tarfile.add_file("1.0/empty.sipl", b"d")
+ self.tarfile.add_file("1.0/empty.fit", b"e")
+ self.process_emulate()
+ self.assertThat(self.getSignedPath("test", "amd64"), SignedMatches([
+ "1.0/SHA256SUMS",
+ "1.0/signed.tar.gz",
+ ]))
+ tarfilename = os.path.join(self.getSignedPath("test", "amd64"),
+ "1.0", "signed.tar.gz")
+ with tarfile.open(tarfilename) as tarball:
+ self.assertContentEqual([
+ '1.0', '1.0/control', '1.0/control/options',
+ '1.0/empty.efi.signed', '1.0/control/uefi.crt',
+ '1.0/empty.ko.sig', '1.0/control/kmod.x509',
+ '1.0/empty.opal.sig', '1.0/control/opal.x509',
+ '1.0/empty.sipl.sig', '1.0/control/sipl.x509',
+ '1.0/empty.fit.signed', '1.0/control/fit.crt',
+ ], tarball.getnames())
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ keys = self.signing_keys
+ self.assertItemsEqual([
+ call(
+ SigningKeyType.UEFI, keys[SigningKeyType.UEFI].fingerprint,
+ 'empty.efi', b'a', SigningMode.ATTACHED),
+ call(
+ SigningKeyType.KMOD, keys[SigningKeyType.KMOD].fingerprint,
+ 'empty.ko', b'b', SigningMode.DETACHED),
+ call(
+ SigningKeyType.OPAL, keys[SigningKeyType.OPAL].fingerprint,
+ 'empty.opal', b'c', SigningMode.DETACHED),
+ call(
+ SigningKeyType.SIPL, keys[SigningKeyType.SIPL].fingerprint,
+ 'empty.sipl', b'd', SigningMode.DETACHED),
+ call(
+ SigningKeyType.FIT, keys[SigningKeyType.FIT].fingerprint,
+ 'empty.fit', b'e', SigningMode.ATTACHED)],
+ self.signing_service_client.sign.call_args_list)
+
+ def test_archive_copy(self):
+ """If there is no key/cert configuration, processing succeeds but
+ nothing is signed.
+ """
+ self.archive = self.factory.makeArchive(
+ distribution=self.distro, purpose=ArchivePurpose.COPY)
+
+ pubconf = getPubConfig(self.archive)
+ if not os.path.exists(pubconf.temproot):
+ os.makedirs(pubconf.temproot)
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/empty.efi", b"a")
+ self.tarfile.add_file("1.0/empty.ko", b"b")
+ self.tarfile.add_file("1.0/empty.opal", b"c")
+ self.tarfile.add_file("1.0/empty.sipl", b"d")
+ self.tarfile.add_file("1.0/empty.fit", b"e")
+ self.tarfile.close()
+ self.buffer.close()
+
+ upload = SigningUpload()
+ upload.process(self.archive, self.path, self.suite)
+
+ signed_path = self.getSignedPath("test", "amd64")
+ self.assertThat(signed_path, SignedMatches(
+ ["1.0/SHA256SUMS", "1.0/empty.efi", "1.0/empty.ko",
+ "1.0/empty.opal", "1.0/empty.sipl", "1.0/empty.fit", ]))
+
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ self.assertEqual(0, self.signing_service_client.sign.call_count)
+
+ def test_sign_without_autokey_and_no_key_pre_set(self):
+ """This case should raise exception, since we don't have fallback
+ keys on the filesystem to cover for the missing signing service
+ keys.
+ """
+ self.distro = self.factory.makeDistribution()
+ self.distroseries = self.factory.makeDistroSeries(
+ distribution=self.distro)
+ self.suite = self.distroseries.name
+ self.archive = self.factory.makeArchive(
+ distribution=self.distro, purpose=ArchivePurpose.PRIMARY)
+
+ filenames = [
+ "1.0/empty.efi", "1.0/empty.ko", "1.0/empty.opal",
+ "1.0/empty.sipl", "1.0/empty.fit"]
+
+ # Write data on the archive
+ self.openArchive("test", "1.0", "amd64")
+ for filename in filenames:
+ self.tarfile.add_file(filename, b"somedata for %s" % filename)
+
+ self.assertRaises(IOError, self.process_emulate)
+
+ def test_sign_without_autokey_and_some_keys_pre_set(self):
+ """For no autokey archives, signing process should sign only for the
+ available keys, and skip signing the other files.
+ """
+ # Pre-generate KMOD and OPAL keys
+ self.getArchiveSigningKey(SigningKeyType.KMOD)
+ self.getArchiveSigningKey(SigningKeyType.OPAL)
+
+ filenames = ["1.0/empty.ko", "1.0/empty.opal"]
+
+ self.openArchive("test", "1.0", "amd64")
+ for filename in filenames:
+ self.tarfile.add_file(filename, b"some data for %s" % filename)
+
+ self.process_emulate()
+
+ signed_path = self.getSignedPath("test", "amd64")
+ self.assertThat(signed_path, SignedMatches(filenames + [
+ "1.0/SHA256SUMS", "1.0/empty.ko.sig", "1.0/empty.opal.sig",
+ "1.0/control/kmod.x509", "1.0/control/opal.x509"]))
+
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ keys = self.signing_keys
+ self.assertItemsEqual([
+ call(
+ SigningKeyType.KMOD, keys[SigningKeyType.KMOD].fingerprint,
+ 'empty.ko', b'some data for 1.0/empty.ko',
+ SigningMode.DETACHED),
+ call(
+ SigningKeyType.OPAL, keys[SigningKeyType.OPAL].fingerprint,
+ 'empty.opal', b'some data for 1.0/empty.opal',
+ SigningMode.DETACHED)],
+ self.signing_service_client.sign.call_args_list)
+
+ def test_sign_with_autokey_ppa(self):
+ # PPAs should auto-generate keys. Let's use one for this test.
+ self.setUpPPA()
+
+ filenames = [
+ "1.0/empty.efi", "1.0/empty.ko", "1.0/empty.opal",
+ "1.0/empty.sipl", "1.0/empty.fit"]
+
+ self.openArchive("test", "1.0", "amd64")
+ for filename in filenames:
+ self.tarfile.add_file(filename, b"data - %s" % filename)
+
+ self.tarfile.close()
+ self.buffer.close()
+
+ upload = SigningUpload()
+ upload.process(self.archive, self.path, self.suite)
+
+ self.assertTrue(upload.autokey)
+
+ expected_signed_filenames = [
+ "1.0/empty.efi.signed", "1.0/empty.ko.sig",
+ "1.0/empty.opal.sig", "1.0/empty.sipl.sig",
+ "1.0/empty.fit.signed"]
+
+ expected_public_keys_filenames = [
+ "1.0/control/uefi.crt", "1.0/control/kmod.x509",
+ "1.0/control/opal.x509", "1.0/control/sipl.x509",
+ "1.0/control/fit.crt"]
+
+ signed_path = self.getSignedPath("test", "amd64")
+ self.assertThat(signed_path, SignedMatches(
+ ["1.0/SHA256SUMS"] + filenames + expected_public_keys_filenames +
+ expected_signed_filenames))
+
+ self.assertEqual(5, self.signing_service_client.generate.call_count)
+ self.assertEqual(5, self.signing_service_client.sign.call_count)
+
+ fingerprints = {
+ key_type: data['fingerprint'] for key_type, data in
+ self.signing_service_client.generate_returns}
+ self.assertItemsEqual([
+ call(
+ SigningKeyType.UEFI, fingerprints[SigningKeyType.UEFI],
+ 'empty.efi', b'data - 1.0/empty.efi', SigningMode.ATTACHED),
+ call(
+ SigningKeyType.KMOD, fingerprints[SigningKeyType.KMOD],
+ 'empty.ko', b'data - 1.0/empty.ko', SigningMode.DETACHED),
+ call(
+ SigningKeyType.OPAL, fingerprints[SigningKeyType.OPAL],
+ 'empty.opal', b'data - 1.0/empty.opal', SigningMode.DETACHED),
+ call(
+ SigningKeyType.SIPL, fingerprints[SigningKeyType.SIPL],
+ 'empty.sipl', b'data - 1.0/empty.sipl', SigningMode.DETACHED),
+ call(
+ SigningKeyType.FIT, fingerprints[SigningKeyType.FIT],
+ 'empty.fit', b'data - 1.0/empty.fit', SigningMode.ATTACHED)],
+ self.signing_service_client.sign.call_args_list)
+
+ # Checks that all files got signed
+ contents = self.getFileListContent(
+ signed_path, expected_signed_filenames)
+ key_types = (
+ SigningKeyType.UEFI, SigningKeyType.KMOD, SigningKeyType.OPAL,
+ SigningKeyType.SIPL, SigningKeyType.FIT)
+ expected_signed_contents = [
+ b"signed with key_type=%s" % k.name for k in key_types]
+ self.assertItemsEqual(expected_signed_contents, contents)
+
+ # Checks that all public keys ended up in the 1.0/control/xxx files
+ public_keys = {
+ key_type: data['public-key'] for key_type, data in
+ self.signing_service_client.generate_returns}
+ contents = self.getFileListContent(
+ signed_path, expected_public_keys_filenames)
+ expected_public_keys = [
+ public_keys[k] for k in key_types]
+ self.assertEqual(expected_public_keys, contents)
+
+ def test_fallback_handler(self):
+ upload = SigningUpload()
+
+ # Creating a new archive since our setUp method fills the self.archive
+ # with signing keys, and we don't want that here.
+ self.distro = self.factory.makeDistribution()
+ self.distroseries = self.factory.makeDistroSeries(
+ distribution=self.distro)
+ self.suite = self.distroseries.name
+ self.archive = self.factory.makeArchive(
+ distribution=self.distro,
+ purpose=ArchivePurpose.PRIMARY)
+ pubconf = getPubConfig(self.archive)
+ if not os.path.exists(pubconf.temproot):
+ os.makedirs(pubconf.temproot)
+ self.addCleanup(lambda: shutil.rmtree(pubconf.temproot, True))
+ old_umask = os.umask(0o022)
+ self.addCleanup(os.umask, old_umask)
+ self.addCleanup(lambda: shutil.rmtree(pubconf.distroroot, True))
+
+ # Make KMOD signing fail with an exception.
+ def mock_sign(key_type, *args, **kwargs):
+ if key_type == SigningKeyType.KMOD:
+ raise ValueError("!!")
+ return self.signing_service_client._sign(key_type, *args, **kwargs)
+
+ self.signing_service_client.sign.side_effect = mock_sign
+
+ # Pre-set KMOD fails on ".sign" method (should fallback to local
+ # signing method).
+ self.getArchiveSigningKey(SigningKeyType.KMOD)
+ upload.signKmod = FakeMethod(result=0)
+
+ # We don't have a signing service key for UEFI. Should fallback too.
+ upload.signUefi = FakeMethod(result=0)
+
+ # OPAL key works just fine.
+ self.getArchiveSigningKey(SigningKeyType.OPAL)
+ upload.signOpal = FakeMethod(result=0)
+
+ filenames = ["1.0/empty.efi", "1.0/empty.ko", "1.0/empty.opal"]
+
+ self.openArchive("test", "1.0", "amd64")
+ for filename in filenames:
+ self.tarfile.add_file(filename, b"data - %s" % filename)
+
+ self.tarfile.close()
+ self.buffer.close()
+
+ # Small hack to keep the tmpdir used during upload.process
+ # Without this hack, upload.tmpdir is set back to None at the end of
+ # process() method execution, during cleanup phase.
+ original_cleanup = upload.cleanup
+
+ def intercept_cleanup():
+ upload.tmpdir_used = upload.tmpdir
+ original_cleanup()
+
+ upload.cleanup = intercept_cleanup
+
+ # Pretend that all key files exists, so the fallback calls are not
+ # blocked.
+ upload.keyFilesExist = lambda _: True
+
+ upload.process(self.archive, self.path, self.suite)
+
+ # Make sure it only used the existing keys and fallbacks. No new key
+ # should be generated.
+ self.assertFalse(upload.autokey)
+
+ self.assertEqual(0, self.signing_service_client.generate.call_count)
+ self.assertEqual(2, self.signing_service_client.sign.call_count)
+
+ # Check kmod signing
+ self.assertEqual(1, upload.signKmod.call_count)
+ self.assertEqual(
+ [(os.path.join(upload.tmpdir_used, "1.0/empty.ko"), )],
+ upload.signKmod.extract_args())
+
+ # Check OPAL signing
+ self.assertEqual(0, upload.signOpal.call_count)
+
+ # Check UEFI signing
+ self.assertEqual(1, upload.signUefi.call_count)
+ self.assertEqual(
+ [(os.path.join(upload.tmpdir_used, "1.0/empty.efi"),)],
+ upload.signUefi.extract_args())
diff --git a/lib/lp/services/compat.py b/lib/lp/services/compat.py
index 942e690..8f3c813 100644
--- a/lib/lp/services/compat.py
+++ b/lib/lp/services/compat.py
@@ -11,9 +11,16 @@ from __future__ import absolute_import, print_function, unicode_literals
__metaclass__ = type
__all__ = [
'SafeConfigParser',
+ 'mock',
]
try:
from configparser import ConfigParser as SafeConfigParser
except ImportError:
from ConfigParser import SafeConfigParser
+
+
+try:
+ import mock
+except ImportError:
+ from unittest import mock
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index c43a908..18b452b 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -358,7 +358,7 @@ update_preview_diff_ready_timeout: 15
# An HTTP service that will have status 200 OK when the service is available
# for more connections, and 503 Service Unavailable when it is in the process
# of shutting down and so should not receive any more connections.
-web_status_port = tcp:8022
+web_status_port: tcp:8022
# The URL of the internal Bazaar hosting API endpoint.
internal_bzr_api_endpoint: none
@@ -1147,7 +1147,7 @@ dbname: session_prod
[librarianlogparser]
-logs_root = /srv/launchpadlibrarian.net-logs
+logs_root: /srv/launchpadlibrarian.net-logs
[librarian]
@@ -1203,7 +1203,7 @@ download_url: http://librarian.launchpad.net/
# datatype: urlbase
restricted_download_url: http://restricted-librarian.launchpad.net/
-use_https = True
+use_https: True
# The URL of the XML-RPC endpoint that handles verifying macaroons. This
# should implement IAuthServer.
@@ -1568,11 +1568,17 @@ generate_templates: True
[rosetta_pofile_stats]
# In daily runs of pofile statistics update, check for
# POFiles that have been updated in the last how many days.
-days_considered_recent = 7
+days_considered_recent: 7
# Number of seconds each LoopTuner iteration should take.
-looptuner_iteration_duration = 4
-
+looptuner_iteration_duration: 4
+
+# lp-signing service connection info. See lp-signing's documentation on how to
+# get valid keys.
+[signing]
+signing_endpoint: none
+client_private_key: none
+client_public_key: none
# For the personal standing updater cron script.
[standingupdater]
diff --git a/lib/lp/services/configure.zcml b/lib/lp/services/configure.zcml
index b85bfa5..f23faea 100644
--- a/lib/lp/services/configure.zcml
+++ b/lib/lp/services/configure.zcml
@@ -1,4 +1,4 @@
-<!-- Copyright 2010-2019 Canonical Ltd. This software is licensed under the
+<!-- Copyright 2010-2020 Canonical Ltd. This software is licensed under the
GNU Affero General Public License version 3 (see the file LICENSE).
-->
@@ -26,6 +26,7 @@
<include package=".profile" />
<include package=".scripts" />
<include package=".session" />
+ <include package=".signing" />
<include package=".sitesearch" />
<include package=".statistics" />
<include package=".temporaryblobstorage" />
diff --git a/lib/lp/services/features/flags.py b/lib/lp/services/features/flags.py
index 9fdb4a0..4694cac 100644
--- a/lib/lp/services/features/flags.py
+++ b/lib/lp/services/features/flags.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2010-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
__all__ = [
@@ -227,6 +227,12 @@ flag_info = sorted([
'bing',
'Site search engine',
''),
+ ('archivepublisher.signing_service.enabled',
+ 'boolean',
+ 'If true, sign packages using signing service instead of local files.',
+ '',
+ '',
+ ''),
])
# The set of all flag names that are documented.
diff --git a/lib/lp/services/signing/__init__.py b/lib/lp/services/signing/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/__init__.py
diff --git a/lib/lp/services/signing/configure.zcml b/lib/lp/services/signing/configure.zcml
new file mode 100644
index 0000000..4b72d7d
--- /dev/null
+++ b/lib/lp/services/signing/configure.zcml
@@ -0,0 +1,27 @@
+<configure
+ xmlns="http://namespaces.zope.org/zope">
+
+ <class class="lp.services.signing.model.signingkey.ArchiveSigningKey">
+ <allow
+ interface="lp.services.signing.interfaces.signingkey.IArchiveSigningKey"/>
+ </class>
+
+ <class class="lp.services.signing.model.signingkey.SigningKey">
+ <allow
+ interface="lp.services.signing.interfaces.signingkey.ISigningKey"/>
+ </class>
+
+ <securedutility
+ class="lp.services.signing.model.signingkey.ArchiveSigningKeySet"
+ provides="lp.services.signing.interfaces.signingkey.IArchiveSigningKeySet">
+ <allow
+ interface="lp.services.signing.interfaces.signingkey.IArchiveSigningKeySet"/>
+ </securedutility>
+
+ <securedutility
+ class="lp.services.signing.proxy.SigningServiceClient"
+ provides="lp.services.signing.interfaces.signingserviceclient.ISigningServiceClient">
+ <allow
+ interface="lp.services.signing.interfaces.signingserviceclient.ISigningServiceClient" />
+ </securedutility>
+</configure>
diff --git a/lib/lp/services/signing/enums.py b/lib/lp/services/signing/enums.py
new file mode 100644
index 0000000..fdffe5a
--- /dev/null
+++ b/lib/lp/services/signing/enums.py
@@ -0,0 +1,65 @@
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Enums for signing keys management
+"""
+
+__metaclass__ = type
+
+__all__ = [
+ 'SigningKeyType',
+ 'SigningMode',
+ ]
+
+from lazr.enum import (
+ DBEnumeratedType,
+ DBItem,
+ EnumeratedType,
+ Item,
+ )
+
+
+class SigningKeyType(DBEnumeratedType):
+ """Available key types on lp-signing service.
+
+ These items should be kept in sync with
+ lp-signing:lp-signing/lp_signing/enums.py (specially the numbers) to
+ avoid confusion when reading values from different databases.
+ """
+ UEFI = DBItem(1, """
+ UEFI
+
+ A signing key for UEFI Secure Boot images.
+ """)
+
+ KMOD = DBItem(2, """
+ Kmod
+
+ A signing key for kernel modules.
+ """)
+
+ OPAL = DBItem(3, """
+ OPAL
+
+ A signing key for OPAL kernel images.
+ """)
+
+ SIPL = DBItem(4, """
+ SIPL
+
+ A signing key for Secure Initial Program Load kernel images.
+ """)
+
+ FIT = DBItem(5, """
+ FIT
+
+ A signing key for U-Boot Flat Image Tree images.
+ """)
+
+
+class SigningMode(EnumeratedType):
+ """Archive file signing mode."""
+
+ ATTACHED = Item("Attached signature")
+ DETACHED = Item("Detached signature")
+ CLEAR = Item("Cleartext signature")
diff --git a/lib/lp/services/signing/interfaces/__init__.py b/lib/lp/services/signing/interfaces/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/interfaces/__init__.py
diff --git a/lib/lp/services/signing/interfaces/signingkey.py b/lib/lp/services/signing/interfaces/signingkey.py
new file mode 100644
index 0000000..594a053
--- /dev/null
+++ b/lib/lp/services/signing/interfaces/signingkey.py
@@ -0,0 +1,126 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Interfaces for signing keys stored at the signing service."""
+
+__metaclass__ = type
+
+__all__ = [
+ 'IArchiveSigningKey',
+ 'IArchiveSigningKeySet',
+ 'ISigningKey',
+ 'ISigningKeySet',
+]
+
+from lazr.restful.fields import Reference
+from zope.interface.interface import Interface
+from zope.schema import (
+ Bytes,
+ Choice,
+ Datetime,
+ Int,
+ Text,
+ )
+
+from lp import _
+from lp.registry.interfaces.distroseries import IDistroSeries
+from lp.services.signing.enums import SigningKeyType
+from lp.soyuz.interfaces.archive import IArchive
+
+
+class ISigningKey(Interface):
+ """A key registered to sign uploaded files"""
+
+ id = Int(title=_('ID'), required=True, readonly=True)
+
+ key_type = Choice(
+ title=_("The signing key type (UEFI, KMOD, etc)."),
+ required=True, readonly=True, vocabulary=SigningKeyType)
+
+ fingerprint = Text(
+ title=_("Fingerprint of the key"), required=True, readonly=True)
+
+ public_key = Bytes(
+ title=_("Public key binary content"), required=False,
+ readonly=True)
+
+ date_created = Datetime(
+ title=_('When this key was created'), required=True, readonly=True)
+
+ def sign(message, message_name):
+ """Sign the given message using this key
+
+ :param message: The message to be signed.
+ :param message_name: A name for the message being signed.
+ """
+
+
+class ISigningKeySet(Interface):
+ """Interface to deal with the collection of signing keys
+ """
+
+ def generate(key_type, description=None):
+ """Generates a new signing key on lp-signing and stores it in LP's
+ database.
+
+ :param key_type: One of the SigningKeyType enum's value
+ :param description: (optional) The description associated with this
+ key
+ :returns: The SigningKey object associated with the newly created
+ key at lp-signing"""
+
+
+class IArchiveSigningKey(Interface):
+ """Which signing key should be used by a specific archive"""
+
+ id = Int(title=_('ID'), required=True, readonly=True)
+
+ archive = Reference(
+ IArchive, title=_("Archive"), required=True, readonly=True,
+ description=_("The archive that owns this key."))
+
+ earliest_distro_series = Reference(
+ IDistroSeries, title=_("Distro series"), required=False, readonly=True,
+ description=_("The minimum series that uses this key, if any."))
+
+ key_type = Choice(
+ title=_("The signing key type (UEFI, KMOD, etc)."),
+ required=True, readonly=True, vocabulary=SigningKeyType)
+
+ signing_key = Reference(
+ ISigningKey, title=_("Signing key"), required=True, readonly=True,
+ description=_("Which signing key should be used by this archive"))
+
+
+class IArchiveSigningKeySet(Interface):
+ """Management class to deal with ArchiveSigningKey objects
+ """
+
+ def create(archive, earliest_distro_series, signing_key):
+ """Creates a new ArchiveSigningKey for archive/distro_series.
+
+ :return: A tuple like (db_object:ArchiveSigningKey, created:boolean)
+ with the ArchiveSigningKey and True if it was created (
+ False if it was updated).
+ """
+
+ def getSigningKey(key_type, archive, distro_series):
+ """Get the most suitable key for a given archive / distro series
+ pair.
+
+ :return: The most suitable key
+ """
+
+ def generate(key_type, archive, earliest_distro_series=None,
+ description=None):
+ """Generate a new key on signing service, and save it to db.
+
+ :param key_type: One of the SigningKeyType enum's value
+ :param archive: The package Archive that should be associated with
+ this key
+ :param earliest_distro_series: (optional) The minimum distro series
+ that should use the generated key.
+ :param description: (optional) The description associated with this
+ key
+ :returns: The generated ArchiveSigningKey
+ """
diff --git a/lib/lp/services/signing/interfaces/signingserviceclient.py b/lib/lp/services/signing/interfaces/signingserviceclient.py
new file mode 100644
index 0000000..0d2a242
--- /dev/null
+++ b/lib/lp/services/signing/interfaces/signingserviceclient.py
@@ -0,0 +1,47 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Interfaces for signing keys stored at the signing service."""
+
+__metaclass__ = type
+
+__all__ = [
+ 'ISigningServiceClient',
+ ]
+
+from zope.interface import (
+ Attribute,
+ Interface,
+ )
+
+from lp import _
+
+
+class ISigningServiceClient(Interface):
+ service_public_key = Attribute(_("The public key of signing service."))
+ private_key = Attribute(_("This client's private key."))
+
+ def getNonce():
+ """Get nonce, to be used when sending messages.
+ """
+
+ def generate(key_type, description):
+ """Generate a key to be used when signing.
+
+ :param key_type: One of available key types at SigningKeyType
+ :param description: String description of the generated key
+ :return: A dict with 'fingerprint' (str) and 'public-key' (bytes)
+ """
+
+ def sign(key_type, fingerprint, message_name, message, mode):
+ """Sign the given message using the specified key_type and a
+ pre-generated fingerprint (see `generate` method).
+
+ :param key_type: One of the key types from SigningKeyType enum
+ :param fingerprint: The fingerprint of the signing key, generated by
+ the `generate` method
+ :param message_name: A description of the message being signed
+ :param message: The message to be signed
+ :param mode: SigningMode.ATTACHED or SigningMode.DETACHED
+ :return: A dict with 'public-key' and 'signed-message'
+ """
diff --git a/lib/lp/services/signing/model/__init__.py b/lib/lp/services/signing/model/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/model/__init__.py
diff --git a/lib/lp/services/signing/model/signingkey.py b/lib/lp/services/signing/model/signingkey.py
new file mode 100644
index 0000000..e77c200
--- /dev/null
+++ b/lib/lp/services/signing/model/signingkey.py
@@ -0,0 +1,196 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Database classes to manage signing keys stored at the signing service."""
+
+
+__metaclass__ = type
+
+__all__ = [
+ 'ArchiveSigningKey',
+ 'ArchiveSigningKeySet',
+ 'SigningKey',
+ ]
+
+from collections import defaultdict
+
+import pytz
+from storm.locals import (
+ Bytes,
+ DateTime,
+ Int,
+ Reference,
+ Unicode,
+ )
+from zope.component import getUtility
+from zope.interface import (
+ implementer,
+ provider,
+ )
+
+from lp.services.database.constants import (
+ DEFAULT,
+ UTC_NOW,
+ )
+from lp.services.database.enumcol import DBEnum
+from lp.services.database.interfaces import (
+ IMasterStore,
+ IStore,
+ )
+from lp.services.database.stormbase import StormBase
+from lp.services.signing.enums import (
+ SigningKeyType,
+ SigningMode,
+ )
+from lp.services.signing.interfaces.signingkey import (
+ IArchiveSigningKey,
+ IArchiveSigningKeySet,
+ ISigningKey,
+ ISigningKeySet,
+ )
+from lp.services.signing.interfaces.signingserviceclient import (
+ ISigningServiceClient,
+ )
+
+
+@implementer(ISigningKey)
+@provider(ISigningKeySet)
+class SigningKey(StormBase):
+ """A key stored at lp-signing, used to sign uploaded files and packages"""
+
+ __storm_table__ = 'SigningKey'
+
+ id = Int(primary=True)
+
+ key_type = DBEnum(enum=SigningKeyType, allow_none=False)
+
+ description = Unicode(allow_none=True)
+
+ fingerprint = Unicode(allow_none=False)
+
+ public_key = Bytes(allow_none=False)
+
+ date_created = DateTime(
+ allow_none=False, default=UTC_NOW, tzinfo=pytz.UTC)
+
+ def __init__(self, key_type, fingerprint, public_key,
+ description=None, date_created=DEFAULT):
+ """Builds the signing key
+
+ :param key_type: One of the SigningKeyType enum items
+ :param fingerprint: The key's fingerprint
+ :param public_key: The key's public key (raw; not base64-encoded)
+ """
+ super(SigningKey, self).__init__()
+ self.key_type = key_type
+ self.fingerprint = fingerprint
+ self.public_key = public_key
+ self.description = description
+ self.date_created = date_created
+
+ @classmethod
+ def generate(cls, key_type, description=None):
+ signing_service = getUtility(ISigningServiceClient)
+ generated_key = signing_service.generate(key_type, description)
+ signing_key = SigningKey(
+ key_type=key_type, fingerprint=generated_key['fingerprint'],
+ public_key=generated_key['public-key'],
+ description=description)
+ store = IMasterStore(SigningKey)
+ store.add(signing_key)
+ return signing_key
+
+ def sign(self, message, message_name):
+ if self.key_type in (SigningKeyType.UEFI, SigningKeyType.FIT):
+ mode = SigningMode.ATTACHED
+ else:
+ mode = SigningMode.DETACHED
+ signing_service = getUtility(ISigningServiceClient)
+ signed = signing_service.sign(
+ self.key_type, self.fingerprint, message_name, message, mode)
+ return signed['signed-message']
+
+
+@implementer(IArchiveSigningKey)
+class ArchiveSigningKey(StormBase):
+ """Which signing key should be used by a given archive / series.
+ """
+
+ __storm_table__ = 'ArchiveSigningKey'
+
+ id = Int(primary=True)
+
+ archive_id = Int(name="archive", allow_none=False)
+ archive = Reference(archive_id, 'Archive.id')
+
+ earliest_distro_series_id = Int(
+ name="earliest_distro_series", allow_none=True)
+ earliest_distro_series = Reference(
+ earliest_distro_series_id, 'DistroSeries.id')
+
+ key_type = DBEnum(enum=SigningKeyType, allow_none=False)
+
+ signing_key_id = Int(name="signing_key", allow_none=False)
+ signing_key = Reference(signing_key_id, SigningKey.id)
+
+ def __init__(self, archive=None, earliest_distro_series=None,
+ signing_key=None):
+ super(ArchiveSigningKey, self).__init__()
+ self.archive = archive
+ self.signing_key = signing_key
+ self.key_type = signing_key.key_type
+ self.earliest_distro_series = earliest_distro_series
+
+
+@implementer(IArchiveSigningKeySet)
+class ArchiveSigningKeySet:
+
+ @classmethod
+ def create(cls, archive, earliest_distro_series, signing_key):
+ store = IMasterStore(SigningKey)
+ obj = ArchiveSigningKey(archive, earliest_distro_series, signing_key)
+ store.add(obj)
+ return obj
+
+ @classmethod
+ def getSigningKey(cls, key_type, archive, distro_series):
+ store = IStore(ArchiveSigningKey)
+ # Gets all the keys of the given key_type available for the archive
+ rs = store.find(ArchiveSigningKey,
+ SigningKey.id == ArchiveSigningKey.signing_key_id,
+ SigningKey.key_type == key_type,
+ ArchiveSigningKey.key_type == key_type,
+ ArchiveSigningKey.archive == archive)
+
+ # prefetch related signing keys to avoid extra queries.
+ signing_keys = store.find(SigningKey, [
+ SigningKey.id.is_in([i.signing_key_id for i in rs])])
+ signing_keys_by_id = {i.id: i for i in signing_keys}
+
+ # Group keys per type, and per distro series
+ keys_per_series = defaultdict(dict)
+ for i in rs:
+ signing_key = signing_keys_by_id[i.signing_key_id]
+ keys_per_series[i.earliest_distro_series] = signing_key
+
+ # Let's search the most suitable per key type.
+ found_series = False
+ # Note that archive.distribution.series is, by default, sorted by
+ # "version", reversed.
+ for series in archive.distribution.series:
+ if series == distro_series:
+ found_series = True
+ if found_series and series in keys_per_series:
+ return keys_per_series[series]
+ # If no specific key for distro_series was found, returns
+ # the keys for the archive itself (or None if no key is
+ # available for the archive either).
+ return keys_per_series.get(None)
+
+ @classmethod
+ def generate(cls, key_type, archive, earliest_distro_series=None,
+ description=None):
+ signing_key = SigningKey.generate(key_type, description)
+ archive_signing = ArchiveSigningKeySet.create(
+ archive, earliest_distro_series, signing_key)
+ return archive_signing
diff --git a/lib/lp/services/signing/proxy.py b/lib/lp/services/signing/proxy.py
new file mode 100644
index 0000000..d8542e3
--- /dev/null
+++ b/lib/lp/services/signing/proxy.py
@@ -0,0 +1,182 @@
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Proxy calls to lp-signing service"""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+import base64
+import json
+
+from lazr.restful.utils import get_current_browser_request
+from nacl.encoding import Base64Encoder
+from nacl.public import (
+ Box,
+ PrivateKey,
+ PublicKey,
+ )
+from nacl.utils import random
+from six.moves.urllib.parse import urljoin
+from zope.interface import implementer
+
+from lp.services.config import config
+from lp.services.propertycache import (
+ cachedproperty,
+ get_property_cache,
+ )
+from lp.services.signing.enums import (
+ SigningKeyType,
+ SigningMode,
+ )
+from lp.services.signing.interfaces.signingserviceclient import (
+ ISigningServiceClient,
+ )
+from lp.services.timeline.requesttimeline import get_request_timeline
+from lp.services.timeout import urlfetch
+
+
+@implementer(ISigningServiceClient)
+class SigningServiceClient:
+ """Representation of lp-signing service REST interface
+
+ To benefit from caching, use this class as a singleton through
+ getUtility(ISigningServiceClient).
+ """
+
+ def _cleanCaches(self):
+ """Cleanup cached properties"""
+ del get_property_cache(self).service_public_key
+
+ def getUrl(self, path):
+ """Shortcut to concatenate lp-signing address with the desired
+ endpoint path.
+
+ :param path: The REST endpoint to be joined.
+ """
+ base_url = config.signing.signing_endpoint
+ return urljoin(base_url, path)
+
+ def _makeResponseNonce(self):
+ return random(Box.NONCE_SIZE)
+
+ def _decryptResponseJson(self, response, response_nonce):
+ box = Box(self.private_key, self.service_public_key)
+ return json.loads(box.decrypt(
+ response.content, response_nonce, encoder=Base64Encoder))
+
+ def _requestJson(self, path, method="GET", **kwargs):
+ """Helper method to do an HTTP request and get back a json from the
+ signing service, raising exception if status code != 2xx.
+
+ :param path: The endpoint path
+ :param method: The HTTP method to be used (GET, POST, etc)
+ :param needs_resp_nonce: Indicates if the endpoint requires us to
+ include a X-Response-Nonce, and returns back an encrypted
+ response JSON.
+ """
+ timeline = get_request_timeline(get_current_browser_request())
+ action = timeline.start(
+ "services-signing-proxy-%s" % method, "%s %s" %
+ (path, json.dumps(kwargs)))
+
+ headers = kwargs.get("headers", {})
+ response_nonce = None
+ if "X-Response-Nonce" in headers:
+ response_nonce = base64.b64decode(headers["X-Response-Nonce"])
+
+ try:
+ url = self.getUrl(path)
+ response = urlfetch(url, method=method.lower(), **kwargs)
+ response.raise_for_status()
+ if response_nonce is None:
+ return response.json()
+ else:
+ return self._decryptResponseJson(response, response_nonce)
+ finally:
+ action.finish()
+
+ @cachedproperty
+ def service_public_key(self):
+ """Returns the lp-signing service's public key.
+ """
+ data = self._requestJson("/service-key")
+ return PublicKey(data["service-key"], encoder=Base64Encoder)
+
+ @property
+ def private_key(self):
+ return PrivateKey(
+ config.signing.client_private_key, encoder=Base64Encoder)
+
+ def getNonce(self):
+ data = self._requestJson("/nonce", "POST")
+ return base64.b64decode(data["nonce"].encode("UTF-8"))
+
+ def _getAuthHeaders(self, nonce, response_nonce):
+ """Get headers to call authenticated endpoints.
+
+ :param nonce: The nonce bytes to be used (not the base64 encoded one!)
+ :param response_nonce: The X-Response-Nonce bytes to be used to
+ decrypt the boxed response.
+ :return: Header dict, ready to be used by requests
+ """
+ return {
+ "Content-Type": "application/x-boxed-json",
+ "X-Client-Public-Key": config.signing.client_public_key,
+ "X-Nonce": base64.b64encode(nonce),
+ "X-Response-Nonce": base64.b64encode(response_nonce),
+ }
+
+ def _encryptPayload(self, nonce, message):
+ """Returns the encrypted version of message, base64 encoded and
+ ready to be sent on a HTTP request to lp-signing service.
+
+ :param nonce: The original (non-base64 encoded) nonce
+ :param message: The str message to be encrypted
+ """
+ box = Box(self.private_key, self.service_public_key)
+ encrypted_message = box.encrypt(message, nonce, encoder=Base64Encoder)
+ return encrypted_message.ciphertext
+
+ def generate(self, key_type, description):
+ if key_type not in SigningKeyType.items:
+ raise ValueError("%s is not a valid key type" % key_type)
+
+ nonce = self.getNonce()
+ response_nonce = self._makeResponseNonce()
+ data = json.dumps({
+ "key-type": key_type.name,
+ "description": description,
+ }).encode("UTF-8")
+ ret = self._requestJson(
+ "/generate", "POST",
+ headers=self._getAuthHeaders(nonce, response_nonce),
+ data=self._encryptPayload(nonce, data))
+ return {
+ "fingerprint": ret["fingerprint"],
+ "public-key": base64.b64decode(ret["public-key"])}
+
+ def sign(self, key_type, fingerprint, message_name, message, mode):
+ if mode not in {SigningMode.ATTACHED, SigningMode.DETACHED}:
+ raise ValueError("%s is not a valid mode" % mode)
+ if key_type not in SigningKeyType.items:
+ raise ValueError("%s is not a valid key type" % key_type)
+
+ nonce = self.getNonce()
+ response_nonce = self._makeResponseNonce()
+ data = json.dumps({
+ "key-type": key_type.name,
+ "fingerprint": fingerprint,
+ "message-name": message_name,
+ "message": base64.b64encode(message).decode("UTF-8"),
+ "mode": mode.name,
+ }).encode("UTF-8")
+ data = self._requestJson(
+ "/sign", "POST",
+ headers=self._getAuthHeaders(nonce, response_nonce),
+ data=self._encryptPayload(nonce, data))
+
+ return {
+ 'public-key': base64.b64decode(data['public-key']),
+ 'signed-message': base64.b64decode(data['signed-message'])}
diff --git a/lib/lp/services/signing/tests/__init__.py b/lib/lp/services/signing/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/tests/__init__.py
diff --git a/lib/lp/services/signing/tests/helpers.py b/lib/lp/services/signing/tests/helpers.py
new file mode 100644
index 0000000..e01730f
--- /dev/null
+++ b/lib/lp/services/signing/tests/helpers.py
@@ -0,0 +1,67 @@
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Helper functions for code testing live here."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+ 'SigningServiceClientFixture',
+ ]
+
+import fixtures
+from nacl.public import PrivateKey
+from six import text_type
+
+from lp.services.compat import mock
+from lp.services.signing.interfaces.signingserviceclient import (
+ ISigningServiceClient,
+ )
+from lp.testing.fixture import ZopeUtilityFixture
+
+
+class SigningServiceClientFixture(fixtures.Fixture):
+ """Mock for SigningServiceClient class.
+
+ This method fakes the API calls on generate and sign methods,
+ and provides a nice way of getting the fake returned values on
+ self.generate_returns and self.sign_returns attributes.
+
+ Both generate_returns and sign_returns format is the following:
+ [(key_type, api_return_dict), (key_type, api_return_dict), ...]"""
+ def __init__(self, factory):
+ self.factory = factory
+
+ self.generate = mock.Mock()
+ self.generate.side_effect = self._generate
+
+ self.sign = mock.Mock()
+ self.sign.side_effect = self._sign
+
+ self.generate_returns = []
+ self.sign_returns = []
+
+ def _generate(self, key_type, description):
+ key = bytes(PrivateKey.generate().public_key)
+ data = {
+ "fingerprint": text_type(self.factory.getUniqueHexString(40)),
+ "public-key": key}
+ self.generate_returns.append((key_type, data))
+ return data
+
+ def _sign(self, key_type, fingerprint, message_name, message, mode):
+ key = bytes(PrivateKey.generate().public_key)
+ signed_msg = "signed with key_type={}".format(key_type.name)
+ data = {
+ 'public-key': key,
+ 'signed-message': signed_msg}
+ self.sign_returns.append((key_type, data))
+ return data
+
+ def _setUp(self):
+ self.useFixture(ZopeUtilityFixture(self, ISigningServiceClient))
+
+ def _cleanup(self):
+ self.generate_returns = []
+ self.sign_returns = []
diff --git a/lib/lp/services/signing/tests/test_proxy.py b/lib/lp/services/signing/tests/test_proxy.py
new file mode 100644
index 0000000..a972ea3
--- /dev/null
+++ b/lib/lp/services/signing/tests/test_proxy.py
@@ -0,0 +1,318 @@
+# Copyright 2010-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+import base64
+import json
+
+from fixtures import MockPatch
+from fixtures.testcase import TestWithFixtures
+from nacl.encoding import Base64Encoder
+from nacl.public import (
+ Box,
+ PrivateKey,
+ PublicKey,
+ )
+from nacl.utils import random
+import responses
+from testtools.matchers import (
+ ContainsDict,
+ Equals,
+ )
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.services.config import config
+from lp.services.signing.enums import (
+ SigningKeyType,
+ SigningMode,
+ )
+from lp.services.signing.interfaces.signingserviceclient import (
+ ISigningServiceClient,
+ )
+from lp.services.signing.proxy import SigningServiceClient
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import ZopelessLayer
+
+
+class SigningServiceResponseFactory:
+ """Factory for fake responses from lp-signing service.
+
+ This class is a helper to pretend that lp-signing service is running by
+ mocking `requests` module, and returning fake responses from
+ response.get(url) and response.post(url). See `patch` method.
+ """
+ def __init__(self):
+ self.service_private_key = PrivateKey.generate()
+ self.service_public_key = self.service_private_key.public_key
+ self.b64_service_public_key = self.service_public_key.encode(
+ encoder=Base64Encoder).decode("UTF-8")
+
+ self.client_private_key = PrivateKey(
+ config.signing.client_private_key, encoder=Base64Encoder)
+ self.client_public_key = self.client_private_key.public_key
+
+ self.nonce = random(Box.NONCE_SIZE)
+ self.b64_nonce = base64.b64encode(self.nonce).decode("UTF-8")
+
+ self.generated_public_key = PrivateKey.generate().public_key
+ self.b64_generated_public_key = base64.b64encode(
+ bytes(self.generated_public_key))
+ self.generated_fingerprint = (
+ u'338D218488DFD597D8FCB9C328C3E9D9ADA16CEE')
+ self.b64_signed_msg = base64.b64encode(b"the-signed-msg")
+
+ self.signed_msg_template = "%s::signed!"
+
+ @classmethod
+ def getUrl(cls, path):
+ """Shortcut to get full path of an endpoint at lp-signing.
+ """
+ return SigningServiceClient().getUrl(path)
+
+ def _encryptPayload(self, data, nonce):
+ """Translated the given data dict as a boxed json, encrypted as
+ lp-signing would do."""
+ box = Box(self.service_private_key, self.client_public_key)
+ return box.encrypt(
+ json.dumps(data), nonce, encoder=Base64Encoder).ciphertext
+
+ def getAPISignedContent(self, call_index=0):
+ """Returns the signed message returned by the API.
+
+ This is a shortcut to avoid inspecting and decrypting API calls,
+ since we know that the content of /sign calls are hardcoded by this
+ fixture.
+ """
+ return self.signed_msg_template % (call_index + 1)
+
+ def addResponses(self, test_case):
+ """Patches all requests with default test values.
+
+ This method uses `responses` module to mock `requests`. You should use
+ @responses.activate decorator in your test method before
+ calling this method.
+
+ See https://github.com/getsentry/responses for details on how to
+ inspect the HTTP calls made.
+
+ Other helpful attributes are:
+ - self.b64_service_public_key
+ - self.b64_nonce
+ - self.generated_public_key
+ - self.generated_fingerprint
+ which holds the respective values used in the default fake responses.
+
+ The /sign endpoint will return, as signed message, "$n::signed!",
+ where $n is the call number (base64-encoded, as lp-signing would
+ return). This could be useful on black-box tests, where several
+ calls to /sign would be done and the response should be checked.
+ """
+ # Patch SigningServiceClient._makeResponseNonce to return always the
+ # same nonce, to simplify the tests.
+ response_nonce = random(Box.NONCE_SIZE)
+ test_case.useFixture(MockPatch(
+ 'lp.services.signing.proxy.SigningServiceClient.'
+ '_makeResponseNonce',
+ return_value=response_nonce))
+
+ responses.add(
+ responses.GET, self.getUrl("/service-key"),
+ json={"service-key": self.b64_service_public_key.decode('utf8')},
+ status=200)
+
+ responses.add(
+ responses.POST, self.getUrl("/nonce"),
+ json={"nonce": self.b64_nonce.decode('utf8')}, status=201)
+
+ responses.add(
+ responses.POST, self.getUrl("/generate"),
+ body=self._encryptPayload({
+ 'fingerprint': self.generated_fingerprint,
+ 'public-key': self.b64_generated_public_key.decode('utf8')
+ }, nonce=response_nonce),
+ status=201)
+ call_counts = {'/sign': 0}
+
+ def sign_callback(request):
+ call_counts['/sign'] += 1
+ signed = base64.b64encode(
+ self.signed_msg_template % call_counts['/sign'])
+ data = {'signed-message': signed.decode('utf8'),
+ 'public-key': self.b64_generated_public_key.decode('utf8')}
+ return 201, {}, self._encryptPayload(data, response_nonce)
+
+ responses.add_callback(
+ responses.POST, self.getUrl("/sign"),
+ callback=sign_callback)
+
+
+class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
+ """Tests signing service without actually making calls to lp-signing.
+
+ Every REST call is mocked using self.response_factory, and most of this
+ class's work is actually calling those endpoints. So, many things are
+ mocked here, returning fake responses created at
+ SigningServiceResponseFactory.
+ """
+ layer = ZopelessLayer
+
+ def setUp(self, *args, **kwargs):
+ super(TestCaseWithFactory, self).setUp(*args, **kwargs)
+ self.response_factory = SigningServiceResponseFactory()
+
+ client = removeSecurityProxy(getUtility(ISigningServiceClient))
+ self.addCleanup(client._cleanCaches)
+
+ @responses.activate
+ def test_get_service_public_key(self):
+ self.response_factory.addResponses(self)
+
+ signing = getUtility(ISigningServiceClient)
+ key = removeSecurityProxy(signing.service_public_key)
+
+ # Asserts that the public key is correct.
+ self.assertIsInstance(key, PublicKey)
+ self.assertEqual(
+ key.encode(Base64Encoder),
+ self.response_factory.b64_service_public_key)
+
+ # Checks that the HTTP call was made
+ self.assertEqual(1, len(responses.calls))
+ call = responses.calls[0]
+ self.assertEqual("GET", call.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/service-key"), call.request.url)
+
+ @responses.activate
+ def test_get_nonce(self):
+ self.response_factory.addResponses(self)
+
+ signing = getUtility(ISigningServiceClient)
+ nonce = signing.getNonce()
+
+ self.assertEqual(
+ base64.b64encode(nonce), self.response_factory.b64_nonce)
+
+ # Checks that the HTTP call was made
+ self.assertEqual(1, len(responses.calls))
+ call = responses.calls[0]
+ self.assertEqual("POST", call.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/nonce"), call.request.url)
+
+ @responses.activate
+ def test_generate_unknown_key_type_raises_exception(self):
+ self.response_factory.addResponses(self)
+
+ signing = getUtility(ISigningServiceClient)
+ self.assertRaises(
+ ValueError, signing.generate, "banana", "Wrong key type")
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_generate_key(self):
+ """Makes sure that the SigningService.generate method calls the
+ correct endpoints
+ """
+ self.response_factory.addResponses(self)
+ # Generate the key, and checks if we got back the correct dict.
+ signing = getUtility(ISigningServiceClient)
+ generated = signing.generate(SigningKeyType.UEFI, "my lp test key")
+
+ self.assertEqual(generated, {
+ 'public-key': bytes(self.response_factory.generated_public_key),
+ 'fingerprint': self.response_factory.generated_fingerprint})
+
+ self.assertEqual(3, len(responses.calls))
+
+ # expected order of HTTP calls
+ http_nonce, http_service_key, http_generate = responses.calls
+
+ self.assertEqual("POST", http_nonce.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/nonce"), http_nonce.request.url)
+
+ self.assertEqual("GET", http_service_key.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/service-key"),
+ http_service_key.request.url)
+
+ self.assertEqual("POST", http_generate.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/generate"),
+ http_generate.request.url)
+ self.assertThat(http_generate.request.headers, ContainsDict({
+ "Content-Type": Equals("application/x-boxed-json"),
+ "X-Client-Public-Key": Equals(config.signing.client_public_key),
+ "X-Nonce": Equals(self.response_factory.b64_nonce)}))
+ self.assertIsNotNone(http_generate.request.body)
+
+ @responses.activate
+ def test_sign_invalid_mode(self):
+ signing = getUtility(ISigningServiceClient)
+ self.assertRaises(
+ ValueError, signing.sign,
+ SigningKeyType.UEFI, 'fingerprint', 'message_name', 'message',
+ 'NO-MODE')
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_sign_invalid_key_type(self):
+ signing = getUtility(ISigningServiceClient)
+ self.assertRaises(
+ ValueError, signing.sign,
+ 'shrug', 'fingerprint', 'message_name', 'message',
+ SigningMode.ATTACHED)
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_sign(self):
+ """Runs through SignService.sign() flow"""
+ # Replace GET /service-key response by our mock.
+ resp_factory = self.response_factory
+ resp_factory.addResponses(self)
+
+ fingerprint = self.factory.getUniqueHexString(40).upper()
+ key_type = SigningKeyType.KMOD
+ mode = SigningMode.DETACHED
+ message_name = 'my test msg'
+ message = 'this is the message content'
+
+ signing = getUtility(ISigningServiceClient)
+ data = signing.sign(
+ key_type, fingerprint, message_name, message, mode)
+
+ self.assertEqual(3, len(responses.calls))
+ # expected order of HTTP calls
+ http_nonce, http_service_key, http_sign = responses.calls
+
+ self.assertEqual("POST", http_nonce.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/nonce"), http_nonce.request.url)
+
+ self.assertEqual("GET", http_service_key.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/service-key"),
+ http_service_key.request.url)
+
+ self.assertEqual("POST", http_sign.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/sign"),
+ http_sign.request.url)
+ self.assertThat(http_sign.request.headers, ContainsDict({
+ "Content-Type": Equals("application/x-boxed-json"),
+ "X-Client-Public-Key": Equals(config.signing.client_public_key),
+ "X-Nonce": Equals(self.response_factory.b64_nonce)}))
+ self.assertIsNotNone(http_sign.request.body)
+
+ # It should have returned the correct JSON content, with signed
+ # message from the API and the public-key.
+ self.assertEqual(2, len(data))
+ self.assertEqual(
+ self.response_factory.getAPISignedContent(),
+ data['signed-message'])
+ self.assertEqual(
+ bytes(self.response_factory.generated_public_key),
+ data['public-key'])
diff --git a/lib/lp/services/signing/tests/test_signingkey.py b/lib/lp/services/signing/tests/test_signingkey.py
new file mode 100644
index 0000000..2e0a5df
--- /dev/null
+++ b/lib/lp/services/signing/tests/test_signingkey.py
@@ -0,0 +1,244 @@
+# Copyright 2010-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+import base64
+
+from fixtures.testcase import TestWithFixtures
+import responses
+from storm.store import Store
+from testtools.matchers import MatchesStructure
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.services.database.interfaces import IMasterStore
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.services.signing.interfaces.signingserviceclient import (
+ ISigningServiceClient,
+ )
+from lp.services.signing.model.signingkey import (
+ ArchiveSigningKey,
+ SigningKey,
+ )
+from lp.services.signing.tests.test_proxy import SigningServiceResponseFactory
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import ZopelessDatabaseLayer
+
+
+class TestSigningKey(TestCaseWithFactory, TestWithFixtures):
+
+ layer = ZopelessDatabaseLayer
+
+ def setUp(self, *args, **kwargs):
+ super(TestSigningKey, self).setUp(*args, **kwargs)
+ self.signing_service = SigningServiceResponseFactory()
+
+ client = removeSecurityProxy(getUtility(ISigningServiceClient))
+ self.addCleanup(client._cleanCaches)
+
+ @responses.activate
+ def test_generate_signing_key_saves_correctly(self):
+ self.signing_service.addResponses(self)
+
+ key = SigningKey.generate(SigningKeyType.UEFI, u"this is my key")
+ self.assertIsInstance(key, SigningKey)
+
+ store = IMasterStore(SigningKey)
+ store.invalidate()
+
+ rs = store.find(SigningKey)
+ self.assertEqual(1, rs.count())
+ db_key = rs.one()
+
+ self.assertEqual(SigningKeyType.UEFI, db_key.key_type)
+ self.assertEqual(
+ self.signing_service.generated_fingerprint, db_key.fingerprint)
+ self.assertEqual(
+ self.signing_service.b64_generated_public_key,
+ base64.b64encode(db_key.public_key))
+ self.assertEqual("this is my key", db_key.description)
+
+ @responses.activate
+ def test_sign_some_data(self):
+ self.signing_service.addResponses(self)
+
+ s = SigningKey(
+ SigningKeyType.UEFI, u"a fingerprint",
+ bytes(self.signing_service.generated_public_key),
+ description=u"This is my key!")
+ signed = s.sign("secure message", "message_name")
+
+ # Checks if the returned value is actually the returning value from
+ # HTTP POST /sign call to lp-signing service
+ self.assertEqual(3, len(responses.calls))
+ self.assertEqual(self.signing_service.getAPISignedContent(), signed)
+
+
+class TestArchiveSigningKey(TestCaseWithFactory):
+ layer = ZopelessDatabaseLayer
+
+ def setUp(self, *args, **kwargs):
+ super(TestArchiveSigningKey, self).setUp(*args, **kwargs)
+ self.signing_service = SigningServiceResponseFactory()
+
+ client = removeSecurityProxy(getUtility(ISigningServiceClient))
+ self.addCleanup(client._cleanCaches)
+
+ @responses.activate
+ def test_generate_saves_correctly(self):
+ self.signing_service.addResponses(self)
+
+ archive = self.factory.makeArchive()
+ distro_series = archive.distribution.series[0]
+
+ arch_key = getUtility(IArchiveSigningKeySet).generate(
+ SigningKeyType.UEFI, archive, earliest_distro_series=distro_series,
+ description=u"some description")
+
+ store = Store.of(arch_key)
+ store.invalidate()
+
+ rs = store.find(ArchiveSigningKey)
+ self.assertEqual(1, rs.count())
+
+ db_arch_key = rs.one()
+ self.assertThat(db_arch_key, MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI, archive=archive,
+ earliest_distro_series=distro_series))
+
+ self.assertThat(db_arch_key.signing_key, MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI, description=u"some description",
+ fingerprint=self.signing_service.generated_fingerprint,
+ public_key=bytes(self.signing_service.generated_public_key)))
+
+ def test_create(self):
+ archive = self.factory.makeArchive()
+ distro_series = archive.distribution.series[0]
+ signing_key = self.factory.makeSigningKey()
+
+ arch_signing_key_set = getUtility(IArchiveSigningKeySet)
+
+ arch_key = arch_signing_key_set.create(
+ archive, distro_series, signing_key)
+
+ store = Store.of(arch_key)
+ store.invalidate()
+ rs = store.find(ArchiveSigningKey)
+
+ self.assertEqual(1, rs.count())
+ db_arch_key = rs.one()
+ self.assertThat(db_arch_key, MatchesStructure.byEquality(
+ key_type=signing_key.key_type, archive=archive,
+ earliest_distro_series=distro_series,
+ signing_key=signing_key))
+
+ # Saving another type should create a new entry
+ signing_key_from_another_type = self.factory.makeSigningKey(
+ key_type=SigningKeyType.KMOD)
+ arch_signing_key_set.create(
+ archive, distro_series, signing_key_from_another_type)
+
+ self.assertEqual(2, store.find(ArchiveSigningKey).count())
+
+ def test_get_signing_keys_without_distro_series_configured(self):
+ UEFI = SigningKeyType.UEFI
+ KMOD = SigningKeyType.KMOD
+
+ archive = self.factory.makeArchive()
+ distro_series = archive.distribution.series[0]
+ uefi_key = self.factory.makeSigningKey(
+ key_type=SigningKeyType.UEFI)
+ kmod_key = self.factory.makeSigningKey(
+ key_type=SigningKeyType.KMOD)
+
+ # Fill the database with keys from other archives to make sure we
+ # are filtering it out
+ other_archive = self.factory.makeArchive()
+ arch_signing_key_set = getUtility(IArchiveSigningKeySet)
+ arch_signing_key_set.create(
+ other_archive, None, self.factory.makeSigningKey())
+
+ # Create a key for the archive (no specific series)
+ arch_uefi_key = arch_signing_key_set.create(
+ archive, None, uefi_key)
+ arch_kmod_key = arch_signing_key_set.create(
+ archive, None, kmod_key)
+
+ # Should find the keys if we ask for the archive key
+ self.assertEqual(
+ arch_uefi_key.signing_key,
+ arch_signing_key_set.getSigningKey(UEFI, archive, None))
+ self.assertEqual(
+ arch_kmod_key.signing_key,
+ arch_signing_key_set.getSigningKey(KMOD, archive, None))
+
+ # Should find the key if we ask for archive + distro_series key
+ self.assertEqual(
+ arch_uefi_key.signing_key,
+ arch_signing_key_set.getSigningKey(UEFI, archive, distro_series))
+ self.assertEqual(
+ arch_kmod_key.signing_key,
+ arch_signing_key_set.getSigningKey(KMOD, archive, distro_series))
+
+ def test_get_signing_keys_with_distro_series_configured(self):
+ UEFI = SigningKeyType.UEFI
+ KMOD = SigningKeyType.KMOD
+
+ archive = self.factory.makeArchive()
+ series = archive.distribution.series
+ uefi_key = self.factory.makeSigningKey(key_type=UEFI)
+ kmod_key = self.factory.makeSigningKey(key_type=KMOD)
+
+ # Fill the database with keys from other archives to make sure we
+ # are filtering it out
+ other_archive = self.factory.makeArchive()
+ arch_signing_key_set = getUtility(IArchiveSigningKeySet)
+ arch_signing_key_set.create(
+ other_archive, None, self.factory.makeSigningKey())
+
+ # Create a key for the archive (no specific series)
+ arch_uefi_key = arch_signing_key_set.create(
+ archive, None, uefi_key)
+
+ # for kmod, should give back this one if provided a
+ # newer distro series
+ arch_kmod_key = arch_signing_key_set.create(
+ archive, series[1], kmod_key)
+ old_arch_kmod_key = arch_signing_key_set.create(
+ archive, series[2], kmod_key)
+
+ # If no distroseries is specified, it should give back no KMOD key,
+ # since we don't have a default
+ self.assertEqual(
+ arch_uefi_key.signing_key,
+ arch_signing_key_set.getSigningKey(UEFI, archive, None))
+ self.assertEqual(
+ None,
+ arch_signing_key_set.getSigningKey(KMOD, archive, None))
+
+ # For the most recent series, use the KMOD key we've set for the
+ # previous one
+ self.assertEqual(
+ arch_uefi_key.signing_key,
+ arch_signing_key_set.getSigningKey(UEFI, archive, series[0]))
+ self.assertEqual(
+ arch_kmod_key.signing_key,
+ arch_signing_key_set.getSigningKey(KMOD, archive, series[0]))
+
+ # For the previous series, we have a KMOD key configured
+ self.assertEqual(
+ arch_uefi_key.signing_key,
+ arch_signing_key_set.getSigningKey(UEFI, archive, series[1]))
+ self.assertEqual(
+ arch_kmod_key.signing_key,
+ arch_signing_key_set.getSigningKey(KMOD, archive, series[1]))
+
+ # For the old series, we have an old KMOD key configured
+ self.assertEqual(
+ arch_uefi_key.signing_key,
+ arch_signing_key_set.getSigningKey(UEFI, archive, series[2]))
+ self.assertEqual(
+ old_arch_kmod_key.signing_key,
+ arch_signing_key_set.getSigningKey(KMOD, archive, series[2]))
diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py
index 8e0f1c1..a3b7736 100644
--- a/lib/lp/testing/factory.py
+++ b/lib/lp/testing/factory.py
@@ -283,6 +283,9 @@ from lp.services.propertycache import (
clear_property_cache,
get_property_cache,
)
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.services.signing.model.signingkey import SigningKey
from lp.services.temporaryblobstorage.interfaces import (
ITemporaryStorageManager,
)
@@ -4190,6 +4193,32 @@ class BareLaunchpadObjectFactory(ObjectFactory):
removeSecurityProxy(bpr).datecreated = date_created
return bpr
+ def makeSigningKey(self, key_type=None, fingerprint=None,
+ public_key=None, description=None):
+ """Makes a SigningKey (integration with lp-signing)
+ """
+ if key_type is None:
+ key_type = SigningKeyType.UEFI
+ if fingerprint is None:
+ fingerprint = self.getUniqueUnicode('fingerprint')
+ if public_key is None:
+ public_key = self.getUniqueHexString(64)
+ store = IMasterStore(SigningKey)
+ signing_key = SigningKey(
+ key_type=key_type, fingerprint=fingerprint, public_key=public_key,
+ description=description)
+ store.add(signing_key)
+ return signing_key
+
+ def makeArchiveSigningKey(self, archive=None, distro_series=None,
+ signing_key=None):
+ if archive is None:
+ archive = self.makeArchive()
+ if signing_key is None:
+ signing_key = self.makeSigningKey()
+ return getUtility(IArchiveSigningKeySet).create(
+ archive, distro_series, signing_key)
+
def makeSection(self, name=None):
"""Make a `Section`."""
if name is None: