launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24309
[Merge] ~pappacena/launchpad:lp-signing-integration into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:lp-signing-integration into launchpad:master.
Commit message:
[HOLD! DON'T MERGE] Integrating LP to lp-signing service
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/378364
This MP will add a new implementation of the "signing" process for file uploads using the new lp-signing service.
For migration purpose, this MP should include a way to pre-load the existing keys into LP's database and lp-signing's database. Apart from that, the new implementation will be toggleable, that is, during the migration phase, it should be possible to disable the usage of lp-signing and fall back to use the current implementation.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:lp-signing-integration into launchpad:master.
diff --git a/lib/lp/archivepublisher/archivesigningkey.py b/lib/lp/archivepublisher/archivegpgsigningkey.py
similarity index 94%
rename from lib/lp/archivepublisher/archivesigningkey.py
rename to lib/lp/archivepublisher/archivegpgsigningkey.py
index cb8737b..2b14365 100644
--- a/lib/lp/archivepublisher/archivesigningkey.py
+++ b/lib/lp/archivepublisher/archivegpgsigningkey.py
@@ -1,12 +1,12 @@
# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-"""ArchiveSigningKey implementation."""
+"""ArchiveGPGSigningKey implementation."""
__metaclass__ = type
__all__ = [
- 'ArchiveSigningKey',
+ 'ArchiveGPGSigningKey',
'SignableArchive',
'SigningMode',
]
@@ -29,9 +29,9 @@ from zope.security.proxy import (
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.archivepublisher.config import getPubConfig
-from lp.archivepublisher.interfaces.archivesigningkey import (
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
CannotSignArchive,
- IArchiveSigningKey,
+ IArchiveGPGSigningKey,
ISignableArchive,
)
from lp.archivepublisher.run_parts import (
@@ -176,18 +176,18 @@ class SignableArchive:
[(path, "%s.gpg" % path, SigningMode.DETACHED, suite)], log=log)
-@implementer(IArchiveSigningKey)
-class ArchiveSigningKey(SignableArchive):
+@implementer(IArchiveGPGSigningKey)
+class ArchiveGPGSigningKey(SignableArchive):
"""`IArchive` adapter for manipulating its GPG key."""
def getPathForSecretKey(self, key):
- """See `IArchiveSigningKey`."""
+ """See `IArchiveGPGSigningKey`."""
return os.path.join(
config.personalpackagearchive.signing_keys_root,
"%s.gpg" % key.fingerprint)
def exportSecretKey(self, key):
- """See `IArchiveSigningKey`."""
+ """See `IArchiveGPGSigningKey`."""
assert key.secret, "Only secret keys should be exported."
export_path = self.getPathForSecretKey(key)
@@ -198,7 +198,7 @@ class ArchiveSigningKey(SignableArchive):
export_file.write(key.export())
def generateSigningKey(self):
- """See `IArchiveSigningKey`."""
+ """See `IArchiveGPGSigningKey`."""
assert self.archive.signing_key is None, (
"Cannot override signing_keys.")
@@ -208,7 +208,7 @@ class ArchiveSigningKey(SignableArchive):
default_ppa = self.archive.owner.archive
if self.archive != default_ppa:
if default_ppa.signing_key is None:
- IArchiveSigningKey(default_ppa).generateSigningKey()
+ IArchiveGPGSigningKey(default_ppa).generateSigningKey()
key = default_ppa.signing_key
self.archive.signing_key_owner = key.owner
self.archive.signing_key_fingerprint = key.fingerprint
@@ -221,7 +221,7 @@ class ArchiveSigningKey(SignableArchive):
self._setupSigningKey(secret_key)
def setSigningKey(self, key_path, async_keyserver=False):
- """See `IArchiveSigningKey`."""
+ """See `IArchiveGPGSigningKey`."""
assert self.archive.signing_key is None, (
"Cannot override signing_keys.")
assert os.path.exists(key_path), (
diff --git a/lib/lp/archivepublisher/configure.zcml b/lib/lp/archivepublisher/configure.zcml
index 99bdcb4..f52d4ff 100644
--- a/lib/lp/archivepublisher/configure.zcml
+++ b/lib/lp/archivepublisher/configure.zcml
@@ -10,15 +10,15 @@
xmlns:xmlrpc="http://namespaces.zope.org/xmlrpc"
i18n_domain="launchpad">
- <!-- ArchiveSigningKey -->
- <class class="lp.archivepublisher.archivesigningkey.ArchiveSigningKey">
- <allow interface="lp.archivepublisher.interfaces.archivesigningkey.IArchiveSigningKey"/>
+ <!-- ArchiveGPGSigningKey -->
+ <class class="lp.archivepublisher.archivegpgsigningkey.ArchiveGPGSigningKey">
+ <allow interface="lp.archivepublisher.interfaces.archivegpgsigningkey.IArchiveGPGSigningKey"/>
</class>
<adapter
for="lp.soyuz.interfaces.archive.IArchive"
- provides="lp.archivepublisher.interfaces.archivesigningkey.IArchiveSigningKey"
- factory="lp.archivepublisher.archivesigningkey.ArchiveSigningKey"
+ provides="lp.archivepublisher.interfaces.archivegpgsigningkey.IArchiveGPGSigningKey"
+ factory="lp.archivepublisher.archivegpgsigningkey.ArchiveGPGSigningKey"
/>
<!-- PublisherConfig -->
diff --git a/lib/lp/archivepublisher/customupload.py b/lib/lp/archivepublisher/customupload.py
index 7db20cb..0c6a2c9 100644
--- a/lib/lp/archivepublisher/customupload.py
+++ b/lib/lp/archivepublisher/customupload.py
@@ -27,7 +27,7 @@ from lp.archivepublisher.debversion import (
Version as make_version,
VersionError,
)
-from lp.archivepublisher.interfaces.archivesigningkey import ISignableArchive
+from lp.archivepublisher.interfaces.archivegpgsigningkey import ISignableArchive
from lp.services.librarian.utils import copy_and_close
from lp.soyuz.interfaces.queue import (
CustomUploadError,
diff --git a/lib/lp/archivepublisher/interfaces/archivesigningkey.py b/lib/lp/archivepublisher/interfaces/archivegpgsigningkey.py
similarity index 95%
rename from lib/lp/archivepublisher/interfaces/archivesigningkey.py
rename to lib/lp/archivepublisher/interfaces/archivegpgsigningkey.py
index 01e4a50..85d3bfb 100644
--- a/lib/lp/archivepublisher/interfaces/archivesigningkey.py
+++ b/lib/lp/archivepublisher/interfaces/archivegpgsigningkey.py
@@ -1,13 +1,13 @@
# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-"""ArchiveSigningKey interface."""
+"""ArchiveGPGSigningKey interface."""
__metaclass__ = type
__all__ = [
'CannotSignArchive',
- 'IArchiveSigningKey',
+ 'IArchiveGPGSigningKey',
'ISignableArchive',
]
@@ -67,8 +67,8 @@ class ISignableArchive(Interface):
"""
-class IArchiveSigningKey(ISignableArchive):
- """`ArchiveSigningKey` interface.
+class IArchiveGPGSigningKey(ISignableArchive):
+ """`ArchiveGPGSigningKey` interface.
`IArchive` adapter for operations using its 'signing_key'.
diff --git a/lib/lp/archivepublisher/publishing.py b/lib/lp/archivepublisher/publishing.py
index 523c51c..3365c68 100644
--- a/lib/lp/archivepublisher/publishing.py
+++ b/lib/lp/archivepublisher/publishing.py
@@ -63,7 +63,7 @@ from lp.archivepublisher.indices import (
build_source_stanza_fields,
build_translations_stanza_fields,
)
-from lp.archivepublisher.interfaces.archivesigningkey import ISignableArchive
+from lp.archivepublisher.interfaces.archivegpgsigningkey import ISignableArchive
from lp.archivepublisher.model.ftparchive import FTPArchiveHandler
from lp.archivepublisher.utils import (
get_ppa_reference,
diff --git a/lib/lp/archivepublisher/signing.py b/lib/lp/archivepublisher/signing.py
index b685a61..1fce277 100644
--- a/lib/lp/archivepublisher/signing.py
+++ b/lib/lp/archivepublisher/signing.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""The processing of Signing tarballs.
@@ -18,6 +18,7 @@ __all__ = [
"UefiUpload",
]
+from functools import partial
import os
import shutil
import stat
@@ -27,10 +28,16 @@ import tempfile
import textwrap
import scandir
+from zope.component._api import getUtility
+from zope.security.proxy import removeSecurityProxy
from lp.archivepublisher.config import getPubConfig
from lp.archivepublisher.customupload import CustomUpload
+from lp.registry.interfaces.distroseries import IDistroSeriesSet
+from lp.services.features import getFeatureFlag
from lp.services.osutils import remove_if_exists
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.model.signingkey import ArchiveSigningKey
from lp.soyuz.interfaces.queue import CustomUploadError
@@ -41,7 +48,132 @@ class SigningUploadPackError(CustomUploadError):
CustomUploadError.__init__(self, message)
-class SigningUpload(CustomUpload):
+def should_use_signing_service():
+ """Checks if we should be using lp-signing service or not.
+ """
+ default_value = False
+ value = getFeatureFlag('lp.services.signing.enabled')
+ if value is None:
+ return False
+ return value.lower().strip() not in ['false', '0', 'no', 'off']
+
+
+class BaseSigningUpload(CustomUpload):
+ """Common methods between LocalSigningUpload and SigningServiceUpload.
+ """
+ def __init__(self, *args, **kwargs):
+ super(BaseSigningUpload, self).__init__(*args, **kwargs)
+
+ @staticmethod
+ def parsePath(tarfile_path):
+ tarfile_base = os.path.basename(tarfile_path)
+ bits = tarfile_base.split("_")
+ if len(bits) != 3:
+ raise ValueError("%s is not TYPE_VERSION_ARCH" % tarfile_base)
+ return bits[0], bits[1], bits[2].split(".")[0]
+
+ def setComponents(self, tarfile_path):
+ self.package, self.version, self.arch = self.parsePath(
+ tarfile_path)
+
+ def setTargetDirectory(self, archive, tarfile_path, suite):
+ raise NotImplementedError(
+ "This method should be implemented in subclasses")
+
+ def publishPublicKey(self, key):
+ """Record this key as having been used in this upload."""
+ self.public_keys.add(key)
+
+ def copyPublishedPublicKeys(self):
+ raise NotImplementedError(
+ "This method should be implemented in subclasses")
+
+ def setSigningOptions(self):
+ """Find and extract raw-signing options from the tarball."""
+ self.signing_options = {}
+
+ # Look for an options file in the top level control directory.
+ options_file = os.path.join(self.tmpdir, self.version,
+ "control", "options")
+ if not os.path.exists(options_file):
+ return
+
+ with open(options_file) as options_fd:
+ for option in options_fd:
+ self.signing_options[option.strip()] = True
+
+ @classmethod
+ def getSeriesKey(cls, tarfile_path):
+ try:
+ package, _, arch = cls.parsePath(tarfile_path)
+ return package, arch
+ except ValueError:
+ return None
+
+ def findSigningHandlers(self):
+ raise NotImplementedError(
+ "This method should be implemented in subclasses")
+
+ def convertToTarball(self):
+ """Convert unpacked output to signing tarball."""
+ tarfilename = os.path.join(self.tmpdir, "signed.tar.gz")
+ versiondir = os.path.join(self.tmpdir, self.version)
+
+ try:
+ with tarfile.open(tarfilename, "w:gz") as tarball:
+ tarball.add(versiondir, arcname=self.version)
+ except tarfile.TarError as exc:
+ raise SigningUploadPackError(tarfilename, exc)
+
+ # Clean out the original tree and move the signing tarball in.
+ try:
+ shutil.rmtree(versiondir)
+ os.mkdir(versiondir)
+ os.rename(tarfilename, os.path.join(versiondir, "signed.tar.gz"))
+ except OSError as exc:
+ raise SigningUploadPackError(tarfilename, exc)
+
+ def extract(self):
+ """Copy the custom upload to a temporary directory, and sign it.
+
+ No actual extraction is required.
+ """
+ super(BaseSigningUpload, self).extract()
+ self.setSigningOptions()
+ for (filename, handler) in self.findSigningHandlers():
+ if (handler(filename) == 0 and
+ 'signed-only' in self.signing_options):
+ os.unlink(filename)
+
+ # Copy out the public keys where they were used.
+ self.copyPublishedPublicKeys()
+
+ # If tarball output is requested, tar up the results.
+ if 'tarball' in self.signing_options:
+ self.convertToTarball()
+
+ def installFiles(self, archive, suite):
+ """After installation hash and sign the installed result."""
+ # Avoid circular import.
+ from lp.archivepublisher.publishing import DirectoryHash
+
+ super(BaseSigningUpload, self).installFiles(archive, suite)
+
+ versiondir = os.path.join(self.targetdir, self.version)
+ with DirectoryHash(versiondir, self.temproot) as hasher:
+ hasher.add_dir(versiondir)
+ for checksum_path in hasher.checksum_paths:
+ if self.shouldSign(checksum_path):
+ self.sign(archive, suite, checksum_path)
+
+ def shouldInstall(self, filename):
+ return filename.startswith("%s/" % self.version)
+
+ def shouldSign(self, filename):
+ return filename.endswith("SUMS")
+
+
+class LocalSigningUpload(BaseSigningUpload):
"""Signing custom upload.
The filename must be of the form:
@@ -70,17 +202,8 @@ class SigningUpload(CustomUpload):
dists_directory = "signed"
- @staticmethod
- def parsePath(tarfile_path):
- tarfile_base = os.path.basename(tarfile_path)
- bits = tarfile_base.split("_")
- if len(bits) != 3:
- raise ValueError("%s is not TYPE_VERSION_ARCH" % tarfile_base)
- return bits[0], bits[1], bits[2].split(".")[0]
-
- def setComponents(self, tarfile_path):
- self.package, self.version, self.arch = self.parsePath(
- tarfile_path)
+ def __init__(self, *args, **kwargs):
+ super(LocalSigningUpload, self).__init__(*args, **kwargs)
def getSeriesPath(self, pubconf, key_name, archive, signing_for):
"""Find the key path for a given series.
@@ -167,10 +290,6 @@ class SigningUpload(CustomUpload):
self.public_keys = set()
- def publishPublicKey(self, key):
- """Record this key as having been used in this upload."""
- self.public_keys.add(key)
-
def copyPublishedPublicKeys(self):
"""Copy out published keys into the custom upload."""
keydir = os.path.join(self.tmpdir, self.version, "control")
@@ -183,29 +302,7 @@ class SigningUpload(CustomUpload):
else:
if self.logger is not None:
self.logger.warning(
- "%s: public key not world readable" % key)
-
- def setSigningOptions(self):
- """Find and extract raw-signing options from the tarball."""
- self.signing_options = {}
-
- # Look for an options file in the top level control directory.
- options_file = os.path.join(self.tmpdir, self.version,
- "control", "options")
- if not os.path.exists(options_file):
- return
-
- with open(options_file) as options_fd:
- for option in options_fd:
- self.signing_options[option.strip()] = True
-
- @classmethod
- def getSeriesKey(cls, tarfile_path):
- try:
- package, _, arch = cls.parsePath(tarfile_path)
- return package, arch
- except ValueError:
- return None
+ "%s: public key not world readable" % key)
def callLog(self, description, cmdl):
status = subprocess.call(cmdl)
@@ -441,67 +538,129 @@ class SigningUpload(CustomUpload):
image_signed]
return self.callLog("FIT signing", cmdl)
- def convertToTarball(self):
- """Convert unpacked output to signing tarball."""
- tarfilename = os.path.join(self.tmpdir, "signed.tar.gz")
- versiondir = os.path.join(self.tmpdir, self.version)
- try:
- with tarfile.open(tarfilename, "w:gz") as tarball:
- tarball.add(versiondir, arcname=self.version)
- except tarfile.TarError as exc:
- raise SigningUploadPackError(tarfilename, exc)
+class SigningServiceUpload(BaseSigningUpload):
+ """SigningUpload version that uses lp-signing service to sign things,
+ instead of running signing commands locally
+ """
- # Clean out the original tree and move the signing tarball in.
- try:
- shutil.rmtree(versiondir)
- os.mkdir(versiondir)
- os.rename(tarfilename, os.path.join(versiondir, "signed.tar.gz"))
- except OSError as exc:
- raise SigningUploadPackError(tarfilename, exc)
+ custom_type = "signing"
- def extract(self):
- """Copy the custom upload to a temporary directory, and sign it.
+ dists_directory = "signed"
- No actual extraction is required.
- """
- super(SigningUpload, self).extract()
- self.setSigningOptions()
- filehandlers = list(self.findSigningHandlers())
- for (filename, handler) in filehandlers:
- if (handler(filename) == 0 and
- 'signed-only' in self.signing_options):
- os.unlink(filename)
+ def __init__(self, *args, **kwargs):
+ super(SigningServiceUpload, self).__init__(*args, **kwargs)
+ # Attributes only used by lp-signing-enabled version
+ self.distro_series = None
+ self.pubconf = None
- # Copy out the public keys where they were used.
- self.copyPublishedPublicKeys()
+ def setTargetDirectory(self, archive, tarfile_path, suite):
+ self.pubconf = getPubConfig(archive)
+ self.archive = archive
+ distro_series_name = suite.split('-')[0]
+ distro_series_set = removeSecurityProxy(getUtility(IDistroSeriesSet))
+ self.distro_series = distro_series_set.queryByName(
+ self.archive.distribution, distro_series_name)
- # If tarball output is requested, tar up the results.
- if 'tarball' in self.signing_options:
- self.convertToTarball()
+ self.autokey = self.pubconf.signingautokey
+ self.setComponents(tarfile_path)
+ dists_signed = os.path.join(self.pubconf.archiveroot, "dists",
+ suite, "main", self.dists_directory)
+ self.targetdir = os.path.join(
+ dists_signed, "%s-%s" % (self.package, self.arch))
+ self.archiveroot = self.pubconf.archiveroot
+ self.temproot = self.pubconf.temproot
+ self.public_keys = set()
- def installFiles(self, archive, suite):
- """After installation hash and sign the installed result."""
- # Avoid circular import.
- from lp.archivepublisher.publishing import DirectoryHash
+ def findSigningHandlers(self):
+ # Avoid circular import issue
+ from lp.services.signing.model.signingkey import ArchiveSigningKey
- super(SigningUpload, self).installFiles(archive, suite)
+ keys = ArchiveSigningKey.get_signing_keys(
+ self.archive, self.distro_series)
- versiondir = os.path.join(self.targetdir, self.version)
- with DirectoryHash(versiondir, self.temproot) as hasher:
- hasher.add_dir(versiondir)
- for checksum_path in hasher.checksum_paths:
- if self.shouldSign(checksum_path):
- self.sign(archive, suite, checksum_path)
+ for dirpath, dirnames, filenames in scandir.walk(self.tmpdir):
+ for filename in filenames:
+ file_path = os.path.join(dirpath, filename)
+ if filename.endswith(".efi"):
+ key_type = SigningKeyType.UEFI
+ elif filename.endswith(".ko"):
+ key_type = SigningKeyType.KMOD
+ elif filename.endswith(".opal"):
+ key_type = SigningKeyType.OPAL
+ elif filename.endswith(".sipl"):
+ key_type = SigningKeyType.SIPL
+ elif filename.endswith(".fit"):
+ key_type = SigningKeyType.FIT
+ else:
+ continue
- def shouldInstall(self, filename):
- return filename.startswith("%s/" % self.version)
+ key = keys.get(key_type)
+ handler = partial(self.signUsingKey, key_type, key)
+ yield file_path, handler
+
+ def signUsingKey(self, key_type, key, filename):
+ if key is None:
+ if not self.autokey:
+ return
+ key = ArchiveSigningKey.generate(key_type, self.archive)
+
+ signing_key = key.signing_key
+ with open(filename) as fd:
+ content = fd.read()
+
+ signed_content = signing_key.sign(content)
+
+ if key_type in (SigningKeyType.UEFI, SigningKeyType.FIT):
+ file_sufix = ".signed"
+ public_key_sufix = ".crt"
+ else:
+ file_sufix = ".sig"
+ public_key_sufix = ".x509"
+
+ signed_filename = filename + file_sufix
+ public_key_filename = key_type.name.lower() + public_key_sufix
+
+ with open(signed_filename, 'w') as fd:
+ fd.write(signed_content)
+
+ self.publishPublicKey((public_key_filename, signing_key.public_key))
+ # For historical reason, this method returns zero if everything went
+ # well (to keep compatibility with LocalSigningUpload class).
+ return 0
+
+ def copyPublishedPublicKeys(self):
+ """Copy out published keys into the custom upload."""
+ keydir = os.path.join(self.tmpdir, self.version, "control")
+ if not os.path.exists(keydir):
+ os.makedirs(keydir)
+ for filename, content in self.public_keys:
+ file_path = os.path.join(keydir, os.path.basename(filename))
+ with open(file_path, 'w') as fd:
+ fd.write(content)
- def shouldSign(self, filename):
- return filename.endswith("SUMS")
+class SigningUpload:
+ """Temp toggle to return the correct implementation of SigningUpload.
-class UefiUpload(SigningUpload):
+ Once we decide to turn off the local management of keys (using
+ filesystems and local commands), we can deprecate this class and simply
+ rename SigningServiceUpload to SigningUpload.
+ """
+ def __new__(cls, *args, **kwargs):
+ if should_use_signing_service():
+ klass = SigningServiceUpload
+ else:
+ klass = LocalSigningUpload
+ obj = klass.__new__(klass)
+ # __init__ should be called here, since we are not returning an
+ # instance of SigningUpload directly (and python will not call
+ # __init__ automatically in this case)
+ obj.__init__(*args, **kwargs)
+ return obj
+
+
+class LocalUefiUpload(LocalSigningUpload):
"""Legacy UEFI Signing custom upload.
Provides backwards compatibility UEFI signing uploads. Existing
@@ -514,5 +673,25 @@ class UefiUpload(SigningUpload):
packages are converted to the new form and location.
"""
custom_type = "uefi"
+ dists_directory = "uefi"
+
+class ServiceUefiUpload(SigningServiceUpload):
+ custom_type = "uefi"
dists_directory = "uefi"
+
+
+class UefiUpload:
+ """Temp toggle to return the correct implementation of UefiUpload.
+
+ See @SigningUpload temporary toggle comments. This class is exactly the
+ same, but for the legacy UefiUpload.
+ """
+ def __new__(cls, *args, **kwargs):
+ if should_use_signing_service():
+ klass = ServiceUefiUpload
+ else:
+ klass = LocalUefiUpload
+ obj = klass.__new__(klass)
+ obj.__init__(*args, **kwargs)
+ return obj
\ No newline at end of file
diff --git a/lib/lp/archivepublisher/tests/archive-signing.txt b/lib/lp/archivepublisher/tests/archive-signing.txt
index 853398e..f2dfca9 100644
--- a/lib/lp/archivepublisher/tests/archive-signing.txt
+++ b/lib/lp/archivepublisher/tests/archive-signing.txt
@@ -127,10 +127,10 @@ location defined by the system configuration.
/var/tmp/ppa-signing-keys.test
In order to manipulate 'signing_keys' securily the target archive
-object has to be adapted to `IArchiveSigningKey`.
+object has to be adapted to `IArchiveGPGSigningKey`.
- >>> from lp.archivepublisher.interfaces.archivesigningkey import (
- ... IArchiveSigningKey)
+ >>> from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ ... IArchiveGPGSigningKey)
We will adapt Celso's PPA after modifing its distribution to allow
proper publish configuration based on the sampledata.
@@ -142,15 +142,15 @@ proper publish configuration based on the sampledata.
>>> cprov.archive.distribution = getUtility(
... IDistributionSet).getByName('ubuntutest')
- >>> archive_signing_key = IArchiveSigningKey(cprov.archive)
+ >>> archive_signing_key = IArchiveGPGSigningKey(cprov.archive)
-Once adapted `IArchiveSigningKey` is properly implemented.
+Once adapted `IArchiveGPGSigningKey` is properly implemented.
>>> from zope.interface.verify import verifyObject
- >>> verifyObject(IArchiveSigningKey, archive_signing_key)
+ >>> verifyObject(IArchiveGPGSigningKey, archive_signing_key)
True
-`IArchiveSigningKey` object contain the corresponding IArchive
+`IArchiveGPGSigningKey` object contain the corresponding IArchive
object.
>>> print archive_signing_key.archive.displayname
@@ -185,7 +185,7 @@ in the expected path.
... archive_signing_key.getPathForSecretKey(mock_key)).read()
Secret True
-At this point we can use the `IArchiveSigningKey` to generate and
+At this point we can use the `IArchiveGPGSigningKey` to generate and
assign a real signing_key, although this procedure depends heavily on
machine entropy and ends up being very slow in our test machine.
@@ -193,7 +193,7 @@ machine entropy and ends up being very slow in our test machine.
We will use a pre-existing key in our tree which is virtually
identical to the one that would be generated. The key will be 'set' by
-using a method `IArchiveSigningKey` skips the key generation but uses
+using a method `IArchiveGPGSigningKey` skips the key generation but uses
exactly the same procedure for setting the signing_key information.
>>> import os
@@ -300,7 +300,7 @@ Default PPAs are always created first and thus get their keys generated
before the named-ppa for the same owner. We submit the named-ppa to
the key generation procedure, as it would be normally in production.
- >>> named_ppa_signing_key = IArchiveSigningKey(named_ppa)
+ >>> named_ppa_signing_key = IArchiveGPGSigningKey(named_ppa)
>>> named_ppa_signing_key.generateSigningKey()
Instead of generating a new key, the signing key from the default ppa
@@ -348,7 +348,7 @@ in the default PPA context then propagated to the named-ppa. The key is
named after the user, even if the default PPA name is something different.
>>> cprov.display_name = "Not Celso Providelo"
- >>> named_ppa_signing_key = IArchiveSigningKey(named_ppa)
+ >>> named_ppa_signing_key = IArchiveGPGSigningKey(named_ppa)
>>> named_ppa_signing_key.generateSigningKey()
Generating: Launchpad PPA for Not Celso Providelo
@@ -366,7 +366,7 @@ Restore the original functionality of GPGHandler.
Signing PPA repository
----------------------
-`IArchiveSigningKey.signRepository` can be user to sign repositories
+`IArchiveGPGSigningKey.signRepository` can be user to sign repositories
for archive which already contains a 'signing_key'.
Celso's default PPA will uses the testing signing key.
diff --git a/lib/lp/archivepublisher/tests/test_archivesigningkey.py b/lib/lp/archivepublisher/tests/test_archivegpgsigningkey.py
similarity index 95%
rename from lib/lp/archivepublisher/tests/test_archivesigningkey.py
rename to lib/lp/archivepublisher/tests/test_archivegpgsigningkey.py
index a394823..47600b6 100644
--- a/lib/lp/archivepublisher/tests/test_archivesigningkey.py
+++ b/lib/lp/archivepublisher/tests/test_archivegpgsigningkey.py
@@ -1,7 +1,7 @@
# Copyright 2016-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-"""Test ArchiveSigningKey."""
+"""Test ArchiveGPGSigningKey."""
from __future__ import absolute_import, print_function, unicode_literals
@@ -16,8 +16,8 @@ from twisted.internet import defer
from zope.component import getUtility
from lp.archivepublisher.config import getPubConfig
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
ISignableArchive,
)
from lp.archivepublisher.interfaces.publisherconfig import IPublisherConfigSet
@@ -51,7 +51,7 @@ class TestSignableArchiveWithSigningKey(TestCaseWithFactory):
with InProcessKeyServerFixture() as keyserver:
yield keyserver.start()
key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
- yield IArchiveSigningKey(self.archive).setSigningKey(
+ yield IArchiveGPGSigningKey(self.archive).setSigningKey(
key_path, async_keyserver=True)
def test_signFile_absolute_within_archive(self):
diff --git a/lib/lp/archivepublisher/tests/test_customupload.py b/lib/lp/archivepublisher/tests/test_customupload.py
index a5f8a89..43a88db 100644
--- a/lib/lp/archivepublisher/tests/test_customupload.py
+++ b/lib/lp/archivepublisher/tests/test_customupload.py
@@ -33,8 +33,8 @@ from lp.archivepublisher.customupload import (
CustomUploadTarballBadSymLink,
CustomUploadTarballInvalidFileType,
)
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.archivepublisher.interfaces.publisherconfig import IPublisherConfigSet
from lp.archivepublisher.tests.test_run_parts import RunPartsMixin
@@ -260,7 +260,7 @@ class TestSigning(TestCaseWithFactory, RunPartsMixin):
self.assertIsNone(self.archive.signing_key)
self.useFixture(InProcessKeyServerFixture()).start()
key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
- yield IArchiveSigningKey(self.archive).setSigningKey(
+ yield IArchiveGPGSigningKey(self.archive).setSigningKey(
key_path, async_keyserver=True)
self.assertIsNotNone(self.archive.signing_key)
custom_processor = CustomUpload()
@@ -280,7 +280,8 @@ class TestSigning(TestCaseWithFactory, RunPartsMixin):
write_file(filename, b"contents")
self.assertIsNone(self.archive.signing_key)
run_parts_fixture = self.useFixture(MonkeyPatch(
- "lp.archivepublisher.archivesigningkey.run_parts", FakeMethod()))
+ "lp.archivepublisher.archivegpgsigningkey.run_parts",
+ FakeMethod()))
custom_processor = CustomUpload()
custom_processor.sign(self.archive, "suite", filename)
args, kwargs = run_parts_fixture.new_value.calls[0]
diff --git a/lib/lp/archivepublisher/tests/test_publishdistro.py b/lib/lp/archivepublisher/tests/test_publishdistro.py
index d81421e..3d6d699 100644
--- a/lib/lp/archivepublisher/tests/test_publishdistro.py
+++ b/lib/lp/archivepublisher/tests/test_publishdistro.py
@@ -24,8 +24,8 @@ from zope.security.proxy import removeSecurityProxy
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.archivepublisher.config import getPubConfig
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.archivepublisher.interfaces.publisherconfig import IPublisherConfigSet
from lp.archivepublisher.publishing import Publisher
@@ -256,7 +256,7 @@ class TestPublishDistro(TestNativePublishingBase):
self.setUpRequireSigningKeys()
yield self.useFixture(InProcessKeyServerFixture()).start()
key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
- yield IArchiveSigningKey(cprov.archive).setSigningKey(
+ yield IArchiveGPGSigningKey(cprov.archive).setSigningKey(
key_path, async_keyserver=True)
name16.archive.signing_key_owner = cprov.archive.signing_key_owner
name16.archive.signing_key_fingerprint = (
@@ -308,7 +308,7 @@ class TestPublishDistro(TestNativePublishingBase):
self.setUpRequireSigningKeys()
yield self.useFixture(InProcessKeyServerFixture()).start()
key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
- yield IArchiveSigningKey(private_ppa).setSigningKey(
+ yield IArchiveGPGSigningKey(private_ppa).setSigningKey(
key_path, async_keyserver=True)
# Try a plain PPA run, to ensure the private one is NOT published.
@@ -413,7 +413,7 @@ class TestPublishDistro(TestNativePublishingBase):
self.setUpRequireSigningKeys()
yield self.useFixture(InProcessKeyServerFixture()).start()
key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
- yield IArchiveSigningKey(archive).setSigningKey(
+ yield IArchiveGPGSigningKey(archive).setSigningKey(
key_path, async_keyserver=True)
self.layer.txn.commit()
diff --git a/lib/lp/archivepublisher/tests/test_publisher.py b/lib/lp/archivepublisher/tests/test_publisher.py
index ec60e1f..8fd5616 100644
--- a/lib/lp/archivepublisher/tests/test_publisher.py
+++ b/lib/lp/archivepublisher/tests/test_publisher.py
@@ -67,8 +67,8 @@ from zope.security.proxy import removeSecurityProxy
from lp.archivepublisher.config import getPubConfig
from lp.archivepublisher.diskpool import DiskPool
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.archivepublisher.publishing import (
BY_HASH_STAY_OF_EXECUTION,
@@ -3211,7 +3211,7 @@ class TestPublisherRepositorySignatures(
# Set a signing key for Celso's PPA.
key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
- yield IArchiveSigningKey(cprov.archive).setSigningKey(
+ yield IArchiveGPGSigningKey(cprov.archive).setSigningKey(
key_path, async_keyserver=True)
self.assertTrue(cprov.archive.signing_key is not None)
diff --git a/lib/lp/archivepublisher/tests/test_signing.py b/lib/lp/archivepublisher/tests/test_signing.py
index 3966362..cf7ef09 100644
--- a/lib/lp/archivepublisher/tests/test_signing.py
+++ b/lib/lp/archivepublisher/tests/test_signing.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Test UEFI custom uploads."""
@@ -13,6 +13,7 @@ import stat
import tarfile
from fixtures import MonkeyPatch
+import responses
import scandir
from testtools.matchers import (
Contains,
@@ -21,6 +22,7 @@ from testtools.matchers import (
Matcher,
MatchesAll,
MatchesDict,
+ MatchesStructure,
Mismatch,
Not,
StartsWith,
@@ -34,8 +36,8 @@ from lp.archivepublisher.customupload import (
CustomUploadAlreadyExists,
CustomUploadBadUmask,
)
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.archivepublisher.interfaces.publisherconfig import IPublisherConfigSet
from lp.archivepublisher.signing import (
@@ -43,7 +45,14 @@ from lp.archivepublisher.signing import (
UefiUpload,
)
from lp.archivepublisher.tests.test_run_parts import RunPartsMixin
+from lp.services.features.testing import FeatureFixture
from lp.services.osutils import write_file
+from lp.services.signing.model.signingkey import ArchiveSigningKey
+from lp.services.signing.proxy import (
+ SigningKeyType,
+ SigningService,
+ )
+from lp.services.signing.tests.test_proxy import SigningServiceResponseFactory
from lp.services.tarfile_helpers import LaunchpadWriteTarFile
from lp.soyuz.enums import ArchivePurpose
from lp.testing import TestCaseWithFactory
@@ -79,6 +88,7 @@ class SignedMatches(Matcher):
class FakeMethodCallLog(FakeMethod):
"""Fake execution general commands."""
+
def __init__(self, upload=None, *args, **kwargs):
super(FakeMethodCallLog, self).__init__(*args, **kwargs)
self.upload = upload
@@ -169,7 +179,6 @@ class FakeMethodCallLog(FakeMethod):
class TestSigningHelpers(TestCaseWithFactory):
-
layer = ZopelessDatabaseLayer
run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=10)
@@ -212,7 +221,7 @@ class TestSigningHelpers(TestCaseWithFactory):
with InProcessKeyServerFixture() as keyserver:
yield keyserver.start()
key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
- yield IArchiveSigningKey(self.archive).setSigningKey(
+ yield IArchiveGPGSigningKey(self.archive).setSigningKey(
key_path, async_keyserver=True)
def setUpUefiKeys(self, create=True, series=None):
@@ -267,7 +276,7 @@ class TestSigningHelpers(TestCaseWithFactory):
return os.path.join(pubconf.archiveroot, "dists", self.suite, "main")
-class TestSigning(RunPartsMixin, TestSigningHelpers):
+class TestLocalSigningUpload(RunPartsMixin, TestSigningHelpers):
def getSignedPath(self, loader_type, arch):
return os.path.join(self.getDistsPath(), "signed",
@@ -1290,7 +1299,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.tarfile.add_file("1.0/empty.sipl", b"")
self.process_emulate()
sha256file = os.path.join(self.getSignedPath("test", "amd64"),
- "1.0", "SHA256SUMS")
+ "1.0", "SHA256SUMS")
self.assertTrue(os.path.exists(sha256file))
@defer.inlineCallbacks
@@ -1309,7 +1318,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.tarfile.add_file("1.0/empty.sipl", b"")
self.process_emulate()
sha256file = os.path.join(self.getSignedPath("test", "amd64"),
- "1.0", "SHA256SUMS")
+ "1.0", "SHA256SUMS")
self.assertTrue(os.path.exists(sha256file))
self.assertThat(
sha256file + '.gpg',
@@ -1333,7 +1342,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.tarfile.add_file("1.0/empty.sipl", b"")
self.process_emulate()
sha256file = os.path.join(self.getSignedPath("test", "amd64"),
- "1.0", "SHA256SUMS")
+ "1.0", "SHA256SUMS")
self.assertTrue(os.path.exists(sha256file))
self.assertThat(
sha256file + '.gpg',
@@ -1344,10 +1353,10 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
"1.0", "signed.tar.gz")
with tarfile.open(tarfilename) as tarball:
self.assertThat(tarball.getnames(), MatchesAll(*[
- Not(Contains(name)) for name in [
- "1.0/SHA256SUMS", "1.0/SHA256SUMS.gpg",
- "1.0/signed.tar.gz",
- ]]))
+ Not(Contains(name)) for name in [
+ "1.0/SHA256SUMS", "1.0/SHA256SUMS.gpg",
+ "1.0/signed.tar.gz",
+ ]]))
def test_checksumming_tree_signed_with_external_run_parts(self):
# Checksum files can be signed using an external run-parts helper.
@@ -1356,7 +1365,8 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
# run_parts is called.
self.enableRunParts(distribution_name=self.distro.name)
run_parts_fixture = self.useFixture(MonkeyPatch(
- "lp.archivepublisher.archivesigningkey.run_parts", FakeMethod()))
+ "lp.archivepublisher.archivegpgsigningkey.run_parts",
+ FakeMethod()))
self.setUpUefiKeys()
self.setUpKmodKeys()
self.setUpOpalKeys()
@@ -1367,7 +1377,7 @@ class TestSigning(RunPartsMixin, TestSigningHelpers):
self.tarfile.add_file("1.0/empty.sipl", "")
self.process_emulate()
sha256file = os.path.join(self.getSignedPath("test", "amd64"),
- "1.0", "SHA256SUMS")
+ "1.0", "SHA256SUMS")
self.assertTrue(os.path.exists(sha256file))
self.assertEqual(1, run_parts_fixture.new_value.call_count)
args, kwargs = run_parts_fixture.new_value.calls[-1]
@@ -1498,3 +1508,328 @@ class TestUefi(TestSigningHelpers):
self.getDistsPath(), "uefi")))
self.assertTrue(os.path.exists(os.path.join(
self.getSignedPath("test", "amd64"), "1.0", "empty.efi")))
+
+
+class TestSigningUploadWithSigningService(TestSigningHelpers):
+ """Tests for SigningUpload using lp-signing service
+ """
+ layer = ZopelessDatabaseLayer
+
+ def setUp(self, *args, **kwargs):
+ super(TestSigningUploadWithSigningService, self).setUp(
+ *args, **kwargs)
+ self.useFixture(FeatureFixture({'lp.services.signing.enabled': True}))
+ self.signing_service = SigningServiceResponseFactory()
+
+ def tearDown(self):
+ super(TestSigningUploadWithSigningService, self).tearDown()
+ # Cleanup SigningService caches every round
+ SigningService._instance = None
+
+ @staticmethod
+ def get_filelist_content(basedir, filenames):
+ contents = []
+ for filename in filenames:
+ with open(os.path.join(basedir, filename)) as fd:
+ contents.append(fd.read())
+ return contents
+
+ def getSignedPath(self, loader_type, arch):
+ return os.path.join(self.getDistsPath(), "signed",
+ "%s-%s" % (loader_type, arch))
+
+ def process_emulate(self):
+ """Shotcut to the close tarfile and run SigningUpload.process
+ """
+ self.tarfile.close()
+ self.buffer.close()
+
+ upload = SigningUpload()
+ upload.process(self.archive, self.path, self.suite)
+ return upload
+
+ @responses.activate
+ def test_set_target_directory_with_distroseries(self):
+ archive = self.factory.makeArchive()
+ series_name = archive.distribution.series[1].name
+
+ upload = SigningUpload()
+ upload.setTargetDirectory(
+ archive, "test_1.0_amd64.tar.gz", series_name)
+
+ pubconfig = getPubConfig(archive)
+ self.assertThat(upload, MatchesStructure.byEquality(
+ distro_series=archive.distribution.series[1],
+ archive=archive,
+ autokey=pubconfig.signingautokey))
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_options_handling_single(self):
+ """If the configured key/cert are missing, processing succeeds but
+ nothing is signed.
+ """
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"first\n")
+
+ upload = self.process_emulate()
+
+ self.assertContentEqual(['first'], upload.signing_options.keys())
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_options_handling_multiple(self):
+ """If the configured key/cert are missing, processing succeeds but
+ nothing is signed.
+ """
+ self.signing_service.patch()
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"first\nsecond\n")
+
+ upload = self.process_emulate()
+
+ self.assertContentEqual(['first', 'second'],
+ upload.signing_options.keys())
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_options_tarball(self):
+ """Specifying the "tarball" option should create an tarball in tmpdir.
+ """
+ self.signing_service.patch()
+ # Use PPA to enable autokey and actually sign things.
+ self.setUpPPA()
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"tarball")
+ self.tarfile.add_file("1.0/empty.efi", b"")
+ self.tarfile.add_file("1.0/empty.ko", b"")
+ self.tarfile.add_file("1.0/empty.opal", b"")
+ self.tarfile.add_file("1.0/empty.sipl", b"")
+ self.tarfile.add_file("1.0/empty.fit", b"")
+
+ upload = self.process_emulate()
+
+ self.assertThat(self.getSignedPath("test", "amd64"), SignedMatches([
+ "1.0/SHA256SUMS",
+ "1.0/signed.tar.gz",
+ ]))
+ tarfilename = os.path.join(self.getSignedPath("test", "amd64"),
+ "1.0", "signed.tar.gz")
+ with tarfile.open(tarfilename) as tarball:
+ self.assertContentEqual([
+ '1.0', '1.0/control', '1.0/control/options',
+ '1.0/empty.efi', '1.0/empty.efi.signed',
+ '1.0/control/uefi.crt',
+ '1.0/empty.ko', '1.0/empty.ko.sig', '1.0/control/kmod.x509',
+ '1.0/empty.opal', '1.0/empty.opal.sig',
+ '1.0/control/opal.x509',
+ '1.0/empty.sipl', '1.0/empty.sipl.sig',
+ '1.0/control/sipl.x509',
+ '1.0/empty.fit', '1.0/empty.fit.signed',
+ '1.0/control/fit.crt',
+ ], tarball.getnames())
+
+ @responses.activate
+ def test_options_signed_only(self):
+ """Specifying the "signed-only" option should trigger removal of
+ the source files leaving signatures only.
+ """
+ self.signing_service.patch()
+ # Use PPA to enable autokey and actually sign things.
+ self.setUpPPA()
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"signed-only")
+ self.tarfile.add_file("1.0/empty.efi", b"")
+ self.tarfile.add_file("1.0/empty.ko", b"")
+ self.tarfile.add_file("1.0/empty.opal", b"")
+ self.tarfile.add_file("1.0/empty.sipl", b"")
+ self.tarfile.add_file("1.0/empty.fit", b"")
+
+ upload = self.process_emulate()
+
+ self.assertThat(self.getSignedPath("test", "amd64"), SignedMatches([
+ "1.0/SHA256SUMS", "1.0/control/options",
+ "1.0/empty.efi.signed", "1.0/control/uefi.crt",
+ "1.0/empty.ko.sig", "1.0/control/kmod.x509",
+ "1.0/empty.opal.sig", "1.0/control/opal.x509",
+ "1.0/empty.sipl.sig", "1.0/control/sipl.x509",
+ "1.0/empty.fit.signed", "1.0/control/fit.crt",
+ ]))
+
+ @responses.activate
+ def test_options_tarball_signed_only(self):
+ """Specifying the "tarball" option should create an tarball in
+ the tmpdir. Adding signed-only should trigger removal of the
+ original files.
+ """
+ self.signing_service.patch()
+ self.setUpPPA()
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/control/options", b"tarball\nsigned-only")
+ self.tarfile.add_file("1.0/empty.efi", b"")
+ self.tarfile.add_file("1.0/empty.ko", b"")
+ self.tarfile.add_file("1.0/empty.opal", b"")
+ self.tarfile.add_file("1.0/empty.sipl", b"")
+ self.tarfile.add_file("1.0/empty.fit", b"")
+ self.process_emulate()
+ self.assertThat(self.getSignedPath("test", "amd64"), SignedMatches([
+ "1.0/SHA256SUMS",
+ "1.0/signed.tar.gz",
+ ]))
+ tarfilename = os.path.join(self.getSignedPath("test", "amd64"),
+ "1.0", "signed.tar.gz")
+ with tarfile.open(tarfilename) as tarball:
+ self.assertContentEqual([
+ '1.0', '1.0/control', '1.0/control/options',
+ '1.0/empty.efi.signed', '1.0/control/uefi.crt',
+ '1.0/empty.ko.sig', '1.0/control/kmod.x509',
+ '1.0/empty.opal.sig', '1.0/control/opal.x509',
+ '1.0/empty.sipl.sig', '1.0/control/sipl.x509',
+ '1.0/empty.fit.signed', '1.0/control/fit.crt',
+ ], tarball.getnames())
+
+ @responses.activate
+ def test_archive_copy(self):
+ """If there is no key/cert configuration, processing succeeds but
+ nothing is signed.
+ """
+ self.archive = self.factory.makeArchive(
+ distribution=self.distro, purpose=ArchivePurpose.COPY)
+ pubconf = getPubConfig(self.archive)
+ if not os.path.exists(pubconf.temproot):
+ os.makedirs(pubconf.temproot)
+ self.openArchive("test", "1.0", "amd64")
+ self.tarfile.add_file("1.0/empty.efi", b"")
+ self.tarfile.add_file("1.0/empty.ko", b"")
+ self.tarfile.add_file("1.0/empty.opal", b"")
+ self.tarfile.add_file("1.0/empty.sipl", b"")
+ self.tarfile.add_file("1.0/empty.fit", b"")
+ self.tarfile.close()
+ self.buffer.close()
+
+ upload = SigningUpload()
+ upload.process(self.archive, self.path, self.suite)
+
+ signed_path = self.getSignedPath("test", "amd64")
+ self.assertThat(signed_path, SignedMatches(
+ ["1.0/SHA256SUMS", "1.0/empty.efi", "1.0/empty.ko",
+ "1.0/empty.opal", "1.0/empty.sipl", "1.0/empty.fit", ]))
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_sign_without_autokey_and_no_key_pre_set(self):
+ self.signing_service.patch()
+
+ filenames = [
+ "1.0/empty.efi", "1.0/empty.ko", "1.0/empty.opal",
+ "1.0/empty.sipl", "1.0/empty.fit"]
+
+ # Write data on the archive
+ self.openArchive("test", "1.0", "amd64")
+ for filename in filenames:
+ self.tarfile.add_file(filename, b"somedata for %s" % filename)
+
+ upload = self.process_emulate()
+
+ self.assertFalse(upload.autokey)
+ self.assertEqual(0, len(responses.calls))
+
+ signed_path = self.getSignedPath("test", "amd64")
+ self.assertThat(signed_path, SignedMatches(
+ ["1.0/SHA256SUMS"] + filenames))
+
+ @responses.activate
+ def test_sign_without_autokey_and_some_keys_pre_set(self):
+ """For no autokey archives, signing process should sign only for the
+ available keys, and skip signing the other files
+ """
+ self.signing_service.patch()
+
+ # Pre-generate KMOD and OPAL keys
+ ArchiveSigningKey.generate(SigningKeyType.KMOD, self.archive)
+ ArchiveSigningKey.generate(SigningKeyType.OPAL, self.archive)
+ # Resets HTTP calls history and SigningService caches
+ responses.calls.reset()
+ SigningService._instance = None
+
+ filenames = [
+ "1.0/empty.efi", "1.0/empty.ko", "1.0/empty.opal",
+ "1.0/empty.sipl", "1.0/empty.fit"]
+
+ self.openArchive("test", "1.0", "amd64")
+ for filename in filenames:
+ self.tarfile.add_file(filename, b"somedata for %s" % filename)
+
+ upload = self.process_emulate()
+
+ signed_path = self.getSignedPath("test", "amd64")
+ self.assertThat(signed_path, SignedMatches(filenames + [
+ "1.0/SHA256SUMS", "1.0/empty.ko.sig", "1.0/empty.opal.sig",
+ "1.0/control/kmod.x509", "1.0/control/opal.x509"]))
+
+ http_calls = ["%s %s" % (call.request.method, call.request.path_url)
+ for call in responses.calls]
+ self.assertEqual([
+ 'POST /nonce', 'GET /service-key', 'POST /sign',
+ 'POST /nonce', 'POST /sign'], http_calls)
+
+ @responses.activate
+ def test_sign_with_autokey_ppa(self):
+ self.signing_service.patch()
+
+ # PPAs should auto-generate keys. Let's use one for this test.
+ self.setUpPPA()
+
+ filenames = [
+ "1.0/empty.efi", "1.0/empty.ko", "1.0/empty.opal",
+ "1.0/empty.sipl", "1.0/empty.fit"]
+
+ self.openArchive("test", "1.0", "amd64")
+ for filename in filenames:
+ self.tarfile.add_file(filename, b"somedata for %s" % filename)
+
+ self.tarfile.close()
+ self.buffer.close()
+
+ upload = SigningUpload()
+ upload.process(self.archive, self.path, self.suite)
+
+ self.assertTrue(upload.autokey)
+
+ expected_signed_filenames = [
+ "1.0/empty.efi.signed", "1.0/empty.ko.sig",
+ "1.0/empty.opal.sig", "1.0/empty.sipl.sig",
+ "1.0/empty.fit.signed"]
+
+ expected_public_keys_filenames = [
+ "1.0/control/uefi.crt", "1.0/control/kmod.x509",
+ "1.0/control/opal.x509", "1.0/control/sipl.x509",
+ "1.0/control/fit.crt"]
+
+ signed_path = self.getSignedPath("test", "amd64")
+ self.assertThat(signed_path, SignedMatches(
+ ["1.0/SHA256SUMS"] + filenames + expected_public_keys_filenames +
+ expected_signed_filenames))
+
+ # 21 calls: 4 (/nonce, /generate, /nonce and /sign) * 5 files
+ # + 1 to get service-key
+ self.assertEqual(21, len(responses.calls))
+
+ # Checks that all files got signed
+ # (patched responses from /sign on lp-signing gets back
+ # "$callIndex::signed!", so we should check this ended up in the
+ # signed files).
+ contents = self.get_filelist_content(
+ signed_path, expected_signed_filenames)
+ expected_signed_contents = {
+ "%s::signed!" % i for i in range(1, len(filenames) + 1)}
+ self.assertItemsEqual(expected_signed_contents, contents)
+
+ # Checks that all public keys ended up in the 1.0/control/xxx files
+ contents = self.get_filelist_content(
+ signed_path, expected_public_keys_filenames)
+ # Patched lp-signing gives back always the same generated public key
+ expected_public_keys = [
+ self.signing_service.generated_public_key
+ for i in range(len(expected_public_keys_filenames))]
+ self.assertEqual(expected_public_keys, contents)
diff --git a/lib/lp/code/model/tests/test_recipebuilder.py b/lib/lp/code/model/tests/test_recipebuilder.py
index fd96be5..c07fce7 100644
--- a/lib/lp/code/model/tests/test_recipebuilder.py
+++ b/lib/lp/code/model/tests/test_recipebuilder.py
@@ -19,8 +19,8 @@ from twisted.trial.unittest import TestCase as TrialTestCase
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.buildmaster.enums import BuildStatus
from lp.buildmaster.interactor import BuilderInteractor
@@ -347,7 +347,7 @@ class TestAsyncRecipeBuilder(TestRecipeBuilderBase):
yield self.useFixture(InProcessKeyServerFixture()).start()
archive = self.factory.makeArchive()
key_path = os.path.join(gpgkeysdir, "ppa-sample@xxxxxxxxxxxxxxxxx")
- yield IArchiveSigningKey(archive).setSigningKey(
+ yield IArchiveGPGSigningKey(archive).setSigningKey(
key_path, async_keyserver=True)
job = self.makeJob(archive=archive, with_builder=True)
distroarchseries = job.build.distroseries.architectures[0]
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index d8c1204..1da2d92 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -1671,6 +1671,12 @@ days_considered_recent = 7
# Number of seconds each LoopTuner iteration should take.
looptuner_iteration_duration = 4
+# lp-signing service connection info. See lp-signing's documentation on how to
+# get valid keys. The keys provided below are just examples.
+[signing]
+lp_signing_address = http://signing.launchpad.test:8000
+local_private_key = O73bJzd3hybyBxUKk0FaR6K9CbbmxBYkw6vCrIWZkSY=
+local_public_key = xEtwSS7kdGmo0ElcN2fR/mcHS0A42zhYbo/+5KV4xRs=
# For the personal standing updater cron script.
[standingupdater]
diff --git a/lib/lp/services/signing/__init__.py b/lib/lp/services/signing/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/__init__.py
diff --git a/lib/lp/services/signing/enums.py b/lib/lp/services/signing/enums.py
new file mode 100644
index 0000000..90c7bc6
--- /dev/null
+++ b/lib/lp/services/signing/enums.py
@@ -0,0 +1,50 @@
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Enum for archive publisher
+"""
+
+__metaclass__ = type
+
+__all__ = [
+ 'SigningKeyType',
+ ]
+
+from lazr.enum import (
+ DBEnumeratedType,
+ DBItem,
+ )
+
+
+class SigningKeyType(DBEnumeratedType):
+ """Available key types on lp-signing service
+ """
+ UEFI = DBItem(0, """
+ UEFI key
+
+ UEFI signing key
+ """)
+
+ KMOD = DBItem(1, """
+ KMOD key
+
+ KMOD signing key
+ """)
+
+ OPAL = DBItem(2, """
+ OPAL key
+
+ OPAL signing key
+ """)
+
+ SIPL = DBItem(3, """
+ SIPL key
+
+ SIPL signing key
+ """)
+
+ FIT = DBItem(4, """
+ FIT key
+
+ FIT signing key
+ """)
\ No newline at end of file
diff --git a/lib/lp/services/signing/interfaces/__init__.py b/lib/lp/services/signing/interfaces/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/interfaces/__init__.py
diff --git a/lib/lp/services/signing/interfaces/signingkey.py b/lib/lp/services/signing/interfaces/signingkey.py
new file mode 100644
index 0000000..972ba07
--- /dev/null
+++ b/lib/lp/services/signing/interfaces/signingkey.py
@@ -0,0 +1,62 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Interfaces for signing keys stored at the signing service."""
+
+__metaclass__ = type
+
+__all__ = [
+ 'ISigningKey',
+ 'IArchiveSigningKey'
+]
+
+from lp.services.signing.enums import SigningKeyType
+from lp.registry.interfaces.distroseries import IDistroSeries
+from lp.soyuz.interfaces.archive import IArchive
+from zope.interface.interface import Interface
+from zope.schema import (
+ Int,
+ Text,
+ Datetime,
+ Choice
+ )
+from lazr.restful.fields import Reference
+from lp import _
+
+
+class ISigningKey(Interface):
+ """A key registered to sign uploaded files"""
+
+ id = Int(title=_('ID'), required=True, readonly=True)
+
+ key_type = Choice(
+ title=_("The signing key type (UEFI, KMOD, etc)."),
+ required=True, readonly=True, vocabulary=SigningKeyType)
+
+ fingerprint = Text(
+ title=_("Fingerprint of the key"), required=True, readonly=True)
+
+ public_key = Text(
+ title=_("Public key binary content"), required=False,
+ readonly=True)
+
+ date_created = Datetime(
+ title=_('When this key was created'), required=True, readonly=True)
+
+
+class IArchiveSigningKey(Interface):
+ """Which signing key should be used by a specific archive"""
+
+ id = Int(title=_('ID'), required=True, readonly=True)
+
+ archive = Reference(
+ IArchive, title=_("Archive"), required=True,
+ description=_("The archive that owns this key."))
+
+ distro_series = Reference(
+ IDistroSeries, title=_("Distro series"), required=False,
+ description=_("The minimum series that uses this key, if any."))
+
+ signing_key = Reference(
+ ISigningKey, title=_("Signing key"), required=True, readonly=True,
+ description=_("Which signing key should be used by this archive"))
diff --git a/lib/lp/services/signing/model/__init__.py b/lib/lp/services/signing/model/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/model/__init__.py
diff --git a/lib/lp/services/signing/model/signingkey.py b/lib/lp/services/signing/model/signingkey.py
new file mode 100644
index 0000000..a4b9646
--- /dev/null
+++ b/lib/lp/services/signing/model/signingkey.py
@@ -0,0 +1,226 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Database classes to manage signing keys stored at the signing service."""
+
+
+__metaclass__ = type
+
+__all__ = [
+ 'SigningKey',
+ 'ArchiveSigningKey',
+ ]
+
+import base64
+from collections import defaultdict
+
+import pytz
+from storm.locals import (
+ DateTime,
+ Int,
+ RawStr,
+ Reference,
+ Unicode,
+ )
+from zope.interface.declarations import implementer
+
+from lp.services.database.constants import (
+ DEFAULT,
+ UTC_NOW,
+ )
+from lp.services.database.enumcol import DBEnum
+from lp.services.database.interfaces import (
+ IMasterStore,
+ IStore,
+ )
+from lp.services.database.stormbase import StormBase
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import (
+ IArchiveSigningKey,
+ ISigningKey,
+ )
+from lp.services.signing.proxy import SigningService
+
+
+@implementer(ISigningKey)
+class SigningKey(StormBase):
+ """A key stored at lp-signing, used to sign uploaded files and packages"""
+
+ __storm_table__ = 'SigningKey'
+
+ id = Int(primary=True)
+
+ key_type = DBEnum(enum=SigningKeyType, allow_none=False)
+
+ description = Unicode(allow_none=True)
+
+ fingerprint = Unicode(allow_none=False)
+
+ public_key = RawStr(allow_none=True)
+
+ date_created = DateTime(
+ allow_none=False, default=UTC_NOW, tzinfo=pytz.UTC)
+
+ def __init__(self, key_type, fingerprint, public_key,
+ description=None, date_created=DEFAULT):
+ """Builds the signing key
+
+ :param key_type: One of the SigningKeyType enum items
+ :param fingerprint: The key's fingerprint
+ :param public_key: The key's public key (raw; not base64-encoded)
+ """
+ super(SigningKey, self).__init__()
+ self.key_type = key_type
+ self.fingerprint = fingerprint
+ self.public_key = public_key
+ self.description = description
+ self.date_created = date_created
+
+ @classmethod
+ def generate(cls, key_type, description=None):
+ """Generates a new signing key on lp-signing and stores it in LP's
+ database.
+
+ :param key_type: One of the SigningKeyType enum's value
+ :param description: (optional) The description associated with this
+ key
+ :returns: The SigningKey object associated with the newly created
+ key at lp-singing"""
+ signing_service = SigningService()
+ generated_key = signing_service.generate(key_type, description)
+ signing_key = SigningKey(
+ key_type=key_type, fingerprint=generated_key['fingerprint'],
+ public_key=base64.b64decode(generated_key['public-key']),
+ description=description)
+ store = IMasterStore(SigningKey)
+ store.add(signing_key)
+ return signing_key
+
+ def sign(self, message, message_name=None):
+ """Sign the given message using this key
+
+ :param message: The message to be signed.
+ :param message_name: A name for the message beign signed.
+ """
+ if self.key_type in (SigningKeyType.UEFI, SigningKeyType.FIT):
+ mode = "ATTACHED"
+ else:
+ mode = "DETACHED"
+ signing_service = SigningService()
+ signed = signing_service.sign(
+ self.key_type, self.fingerprint, message_name, message, mode)
+ return signed['signed-message']
+
+
+@implementer(IArchiveSigningKey)
+class ArchiveSigningKey(StormBase):
+ """Which signing key should be used by a given archive / series.
+ """
+
+ __storm_table__ = 'ArchiveSigningKey'
+
+ id = Int(primary=True)
+
+ archive_id = Int(name="archive")
+ archive = Reference(archive_id, 'Archive.id')
+
+ distro_series_id = Int(name="distro_series", allow_none=True)
+ distro_series = Reference(distro_series_id, 'DistroSeries.id')
+
+ signing_key_id = Int(name="signing_key", allow_none=False)
+ signing_key = Reference(signing_key_id, SigningKey.id)
+
+ def __init__(self, archive, distro_series, signing_key):
+ super(ArchiveSigningKey, self).__init__()
+ self.archive = archive
+ self.distro_series = distro_series
+ self.signing_key = signing_key
+
+ @classmethod
+ def create_or_update(cls, archive, distro_series, signing_key):
+ """Creates a new ArchiveSigningKey, or updates the existing one from
+ the same type to point to the new signing key.
+
+ :return: A tuple like (db_object:ArchiveSigningKey, created:boolean)
+ with the ArchiveSigningKey and True if it was created (
+ False if it was updated).
+ """
+ store = IMasterStore(SigningKey)
+ key_type = signing_key.key_type
+ obj = store.find(ArchiveSigningKey, [
+ ArchiveSigningKey.signing_key_id == SigningKey.id,
+ SigningKey.key_type == key_type,
+ ArchiveSigningKey.distro_series == distro_series,
+ ArchiveSigningKey.archive == archive
+ ]).one()
+ if obj is not None:
+ obj.signing_key = signing_key
+ created = False
+ else:
+ obj = ArchiveSigningKey(archive, distro_series, signing_key)
+ created = True
+ store.add(obj)
+ return obj, created
+
+ @classmethod
+ def get_signing_keys(cls, archive, distro_series):
+ """Get the most suitable keys for a given archive / distro series
+ pair.
+
+ :return: A dict of most suitable key per type, like {
+ SigningKeyType.UEFI: <ArchiveSigningKey object 1>,
+ SigningKeyType.KMOD: <ArchiveSigningKey object 1>, ... }
+ """
+ # Gets all the keys available for the given archive.
+ store = IStore(ArchiveSigningKey)
+ rs = store.find(ArchiveSigningKey, [
+ ArchiveSigningKey.archive == archive])
+
+ # prefetch related signing keys to avoid extra queries.
+ signing_keys = store.find(SigningKey, [
+ SigningKey.id.is_in([i.signing_key_id for i in rs])])
+ signing_keys_by_id = {i.id: i for i in signing_keys}
+
+ # Group keys per type, and per distro series
+ keys_data = defaultdict(dict)
+ for i in rs:
+ signing_key = signing_keys_by_id[i.signing_key_id]
+ keys_data[signing_key.key_type][i.distro_series] = i
+
+ ret_keys = {}
+
+ # Let's search the most suitable per key type.
+ for key_type, keys_per_series in keys_data.items():
+ found_series = False
+ found_key = False
+ for series in archive.distribution.series:
+ if series == distro_series:
+ found_series = True
+ if found_series and series in keys_per_series:
+ ret_keys[key_type] = keys_per_series[series]
+ found_key = True
+ break
+ # If not specific keys for distro_series was found, returns
+ # the keys for the archive itself (or None if no key is
+ # available for the archive either).
+ if not found_series or not found_key:
+ ret_keys[key_type] = keys_per_series.get(None)
+ return ret_keys
+
+ @classmethod
+ def generate(cls, key_type, archive, distro_series=None,
+ description=None):
+ """Generated a new key on signing service, and save it to db.
+
+ :param key_type: One of the SigningKeyType enum's value
+ :param archive: The package Archive that should be associated with
+ this key
+ :param distro_series: (optional) The DistroSeries object
+ :param description: (optional) The description associated with this
+ key
+ :returns: The generated ArchiveSigningKey
+ """
+ signing_key = SigningKey.generate(key_type, description)
+ archive_signing, created = ArchiveSigningKey.create_or_update(
+ archive, distro_series, signing_key)
+ return archive_signing
\ No newline at end of file
diff --git a/lib/lp/services/signing/proxy.py b/lib/lp/services/signing/proxy.py
new file mode 100644
index 0000000..552496e
--- /dev/null
+++ b/lib/lp/services/signing/proxy.py
@@ -0,0 +1,174 @@
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Proxy calls to lp-signing service"""
+
+from __future__ import division
+
+__metaclass__ = type
+
+import base64
+import json
+
+from lazr.restful.utils import get_current_browser_request
+from nacl.encoding import Base64Encoder
+from nacl.public import (
+ Box,
+ PrivateKey,
+ PublicKey,
+ )
+
+from lp.services.config import LaunchpadConfig
+from lp.services.propertycache import cachedproperty
+from lp.services.signing.enums import SigningKeyType
+from lp.services.timeline.requesttimeline import get_request_timeline
+from lp.services.timeout import urlfetch
+
+
+config = LaunchpadConfig()
+
+
+class SigningService:
+ """Representation of lp-signing service REST interface
+
+ This class is a singleton (see __new__ method and _instance attribute).
+ """
+ LP_SIGNING_ADDRESS = config.signing.lp_signing_address
+ LOCAL_PRIVATE_KEY = config.signing.local_private_key
+ LOCAL_PUBLIC_KEY = config.signing.local_public_key
+
+ ATTACHED = "ATTACHED"
+ DETACHED = "DETACHED"
+
+ _instance = None
+
+ def __new__(cls, *args, **kwargs):
+ """Builder for this class to return a singleton instance.
+
+ At first, there will be no way to have multiple different instances
+ of lp-signing running (at least not in a way that launchpad should
+ be aware of). So, keeping this class as a singleton creates the
+ benefit of keeping cached across several points of the system the
+ @cachedproperties we have here (service_public_key, for example,
+ costs an HTTP request every time it needs to fill the cache).
+ """
+ if not isinstance(cls._instance, cls):
+ cls._instance = object.__new__(cls, *args, **kwargs)
+ return cls._instance
+
+ def get_url(self, path):
+ """Shotcut to concatenate LP_SIGNING_ADDRESS with the desired
+ endpoint path.
+
+ :param path: The REST endpoint to be joined.
+ """
+ return self.LP_SIGNING_ADDRESS + path
+
+ def _get_json(self, path, method="GET", **kwargs):
+ """Helper method to do an HTTP request and get back a json from the
+ signing service, raising exception if status code != 2xx.
+ """
+ timeline = get_request_timeline(get_current_browser_request())
+ action = timeline.start(
+ "lp-services-signin-proxy-%s" % method, "%s %s %s" %
+ (path, method, json.dumps(kwargs)))
+
+ try:
+ url = self.get_url(path)
+ response = urlfetch(url, method=method.lower(), **kwargs)
+ response.raise_for_status()
+ return response.json()
+ finally:
+ action.finish()
+
+ @cachedproperty
+ def service_public_key(self):
+ """Returns the lp-signing service's public key.
+ """
+ data = self._get_json("/service-key")
+ return PublicKey(data["service-key"], encoder=Base64Encoder)
+
+ @property
+ def private_key(self):
+ return PrivateKey(self.LOCAL_PRIVATE_KEY, encoder=Base64Encoder)
+
+ def get_nonce(self):
+ """Get nonce, to be used when sending messages.
+ """
+ data = self._get_json("/nonce", "POST")
+ return base64.b64decode(data["nonce"].encode("UTF-8"))
+
+ def _get_auth_headers(self, nonce):
+ """Get headers to call authenticated endpoints.
+
+ :param nonce: The nonce bytes to be used (not the base64 encoded one!)
+ :return: Header dict, ready to be used by requests
+ """
+ return {
+ "Content-Type": "application/x-boxed-json",
+ "X-Client-Public-Key": self.LOCAL_PUBLIC_KEY,
+ "X-Nonce": base64.b64encode(nonce)}
+
+ def _encrypt_payload(self, nonce, message):
+ """Returns the encrypted version of message, base64 encoded and
+ ready to be sent on a HTTP request to lp-signing service.
+
+ :param nonce: The original (non-base64 encoded) nonce
+ :param message: The str message to be encrypted
+ """
+ box = Box(self.private_key, self.service_public_key)
+ encrypted_message = box.encrypt(message, nonce, encoder=Base64Encoder)
+ return encrypted_message.ciphertext
+
+ def generate(self, key_type, description):
+ """Generate a key to be used when signing.
+
+ :param key_type: One of available key types at SigningKeyType
+ :param description: String description of the generated key
+ :return: A dict with 'fingerprint' (str) and 'public-key' (a
+ Base64-encoded NaCl public key)
+ """
+ if key_type not in SigningKeyType.items:
+ raise ValueError("%s is not a valid key type" % key_type)
+ nonce = self.get_nonce()
+ data = json.dumps({
+ "key-type": key_type.name,
+ "description": description,
+ }).encode("UTF-8")
+ return self._get_json(
+ "/generate", "POST", headers=self._get_auth_headers(nonce),
+ data=self._encrypt_payload(nonce, data))
+
+ def sign(self, key_type, fingerprint, message_name, message, mode):
+ """Sign the given message using the specified key_type and a
+ pre-generated fingerprint (see `generate` method).
+
+ :param key_type: One of the key types from SigningKeyType enum
+ :param fingerprint: The fingerprint of the signing key, generated by
+ the `generate` method
+ :param message_name: A description of the message being signed
+ :param message: The message to be signed
+ :param mode: SignService.ATTACHED or SignService.DETACHED
+ :return: A dict with 'public-key' and 'signed-message'
+ """
+ if mode not in {SigningService.ATTACHED, SigningService.DETACHED}:
+ raise ValueError("%s is not a valid mode" % mode)
+ if key_type not in SigningKeyType.items:
+ raise ValueError("%s is not a valid key type" % key_type)
+
+ nonce = self.get_nonce()
+ data = json.dumps({
+ "key-type": key_type.name,
+ "fingerprint": fingerprint,
+ "message-name": message_name,
+ "message": base64.b64encode(message).decode("UTF-8"),
+ "mode": mode,
+ }).encode("UTF-8")
+ data = self._get_json(
+ "/sign", "POST",
+ headers=self._get_auth_headers(nonce),
+ data=self._encrypt_payload(nonce, data))
+
+ return {
+ 'public-key': data['public-key'],
+ 'signed-message': base64.b64decode(data['signed-message'])}
\ No newline at end of file
diff --git a/lib/lp/services/signing/tests/__init__.py b/lib/lp/services/signing/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/signing/tests/__init__.py
diff --git a/lib/lp/services/signing/tests/test_proxy.py b/lib/lp/services/signing/tests/test_proxy.py
new file mode 100644
index 0000000..28b5b90
--- /dev/null
+++ b/lib/lp/services/signing/tests/test_proxy.py
@@ -0,0 +1,324 @@
+# Copyright 2010-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+import base64
+import json
+from collections import defaultdict
+
+import mock
+from lp.services.signing.enums import SigningKeyType
+from mock import ANY
+from nacl.encoding import Base64Encoder
+from nacl.public import PublicKey
+import requests
+import responses
+
+from lp.services.signing.proxy import SigningService
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import BaseLayer
+
+
+class SigningServiceResponseFactory:
+ """Factory for fake responses from lp-signing service.
+
+ This class is a helper to pretend that lp-signing service is running by
+ mocking `requests` module, and returning fake responses from
+ response.get(url) and response.post(url). See `patch` method.
+ """
+ def __init__(self):
+ self.base64_service_public_key = (
+ u"x7vTtpmn0+DvKNdmtf047fn1JRQI5eMnOQRy3xJ1m10=")
+ self.base64_nonce = u"neSSa2MUZlQU3XiipU2TfiaqW5nrVUpR"
+ self.b64_generated_public_key = (
+ 'MIIFPDCCAySgAwIBAgIUIeKkWwl4R1dFsFrpNcfMxzursvcwDQYJKoZIhvcNAQ'
+ 'ENBQAwGDEWMBQGA1UEAwwNdGVzdCBrZXkgS21vZDAeFw0yMDAxMzExNzI1NTha'
+ 'Fw0zMDAxMjgxNzI1NThaMBgxFjAUBgNVBAMMDXRlc3Qga2V5IEttb2QwggIiMA'
+ '0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDn8EyLrKwC3KhPa5jG5kZOaxPe'
+ 'GlCjA3S/+A6CgV66a/5Vkx+yGbov39VTCekURTmhcCTz5NDGO5BZ+XECdgezoE'
+ '7D76krWiQYMtukhRqvsh4FwA+wq6aV/As0NGDf6MgSRQL7V0pTRpquP8kUrJvu'
+ 'nVbM+BvdZqaTKOe4HB8juETqTylzcIoLL47AFbWYxUHM8UgDJdd8lycyx2XMpL'
+ 'uRxX0VYJNW9h1VMI15cQMI6+iPyAO2sjRMCqyRQkBN5/UxqsADS2PSHK2+BOZF'
+ 'BnrXs35ZVNIKqY/2PMTuv14oPm4/PM43o4WqxKc8Lew2xEggTFJ6kjSw9NtN+q'
+ 'teVg+ZkTs7Xk4MErkuAojSJkg+ES6GuQjT1JF0aBvrXw2ZaBRYV6IZM7qxpCq/'
+ 'OPkxWokt3Zej0sg1ONYueNl2GCGr+nxUIouG4hdb23El2vk4bfX8RKHTKm2tX6'
+ 'SJtlG3UQY9ezloD/Cwzxvy1JIvTXopci16AYfk40Sx5UWEUG+8J7oa60b3F3tX'
+ 'h2nK62pHeZKiKDJVUEhu5DMYkuFXqs844tcqq2Lp4I9APRATIBpptdgaltRpZv'
+ 's0OLaZfV4HtilVsAZ2OQ1NA73HRi8Nr8ibJQ/Prkv0nwelg1cTv4G2iyOPWJKm'
+ 'p/ElspzMNlOY4amrDagLHbS4im1fy0NrLPBxgwIDAQABo34wfDAMBgNVHRMBAf'
+ '8EAjAAMAsGA1UdDwQEAwIHgDAdBgNVHQ4EFgQUQu55cTFXP8Xpc8KXXoGyjQ4a'
+ '8ZswHwYDVR0jBBgwFoAUQu55cTFXP8Xpc8KXXoGyjQ4a8ZswHwYDVR0lBBgwFg'
+ 'YIKwYBBQUHAwMGCisGAQQBkggQAQIwDQYJKoZIhvcNAQENBQADggIBABfzyFX8'
+ '2SVZkUP1wPkD6IF/cw6WNhHaCFbHaU7KOZc4IartIu+ftNTCPcMdIPmNBCOEdZ'
+ 'srn56UjyLId8x83AQ1Zci8bnKLXm5Jv0LVrrKvNfYPooFqZ2vwKmtdJxEYJtyH'
+ 'x4KOd9cSpzabdZ1l+o9n+mWAAuJWoRhWO1AAdQzXKyNuDgKTXXfgPIV3eQtS+U'
+ '/Ro55FqbJXD52I/T4RZQeW66mTvQsv0XiIjgk/5odfIngdQmGjwLXJvdH0Y/7+'
+ '+pYmigNYv0DgzsBO/hGRHO3fw/OOobJvLa9YuXVn0gRmOHkhiiH2f1wO/xg+ML'
+ 'HeC2Ng8vIEcB9AIZme1rbSonzln87sOPNp/tMV4iuOPXnffd9UWO/7bnxU7F1P'
+ '07iEafLp6Pru8iLixVrBs6o+B88lmkzT7wdA+jXL187X9wrLFdIz96b6+195x5'
+ '569msLewAzAMnldvtDN1JEmusHaQd+BgHlQNd6LUb+Uf4YxjyWE3hGIF1YWgma'
+ '/+oYo03b4VELW7E5z37cWd7q8N5rzcS5oTWx+XWfLikNO/N9nK+REtCcCQvMOU'
+ 'R0OBvL9F1A+vVmY0ffHYHAnoUAhIJ+QtctnyLiL+8WYtTh2v7EYglnsiW3id96'
+ 'k4jd7ojqpCOF9DNyNr1qELk1cb/rReipInCgGFOZodWWCsDiYkLuIu8e')
+ self.generated_public_key = base64.b64decode(
+ self.b64_generated_public_key)
+ self.generated_fingerprint = (
+ u'338D218488DFD597D8FCB9C328C3E9D9ADA16CEE')
+ self.b64_signed_msg = base64.b64encode("the-signed-msg")
+
+ @classmethod
+ def get_url(cls, path):
+ """Shortcut to get full path of an endpoint at lp-signing.
+ """
+ return SigningService().get_url(path)
+
+ def patch(self):
+ """Patches all requests with default test values.
+
+ This method uses `responses` module to mock `requests`. You should use
+ @responses.activate decorator in your test method before
+ calling this method.
+
+ See https://github.com/getsentry/responses for details on how to
+ inspect the HTTP calls made.
+
+ Other helpful attributes are:
+ - self.base64_service_public_key
+ - self.base64_nonce
+ - self.generated_public_key
+ - self.generated_fingerprint
+ which holds the respective values used in the default fake responses.
+
+ The /sign endpoint will return, as signed message, "$n::signed!",
+ where $n is the call number (base64-encoded, as lp-signing would
+ return). This could be useful on black-box tests, where several
+ calls to /sign would be done and the response should be checked.
+ """
+ responses.add(
+ responses.GET, self.get_url("/service-key"),
+ json={"service-key": self.base64_service_public_key}, status=200)
+
+ responses.add(
+ responses.POST, self.get_url("/nonce"),
+ json={"nonce": self.base64_nonce}, status=201)
+
+ responses.add(
+ responses.POST, self.get_url("/generate"),
+ json={'fingerprint': self.generated_fingerprint,
+ 'public-key': self.b64_generated_public_key},
+ status=201)
+
+ call_counts = {'/sign': 0}
+
+ def sign_callback(request):
+ call_counts['/sign'] += 1
+ signed = base64.b64encode("%s::signed!" % call_counts['/sign'])
+ data = {'signed-message': signed,
+ 'public-key': self.b64_generated_public_key}
+ return 201, {}, json.dumps(data)
+
+ responses.add_callback(
+ responses.POST, self.get_url("/sign"),
+ callback=sign_callback)
+
+
+class SigningServiceProxyTest(TestCaseWithFactory):
+ """Tests signing service without actually making calls to lp-signing.
+
+ Every REST call is mocked using self.response_factory, and most of this
+ class's work is actually calling those endpoints. So, many things are
+ mocked here, returning fake responses created at
+ SigningServiceResponseFactory.
+ """
+ layer = BaseLayer
+
+ def setUp(self, *args, **kwargs):
+ super(TestCaseWithFactory, self).setUp(*args, **kwargs)
+ self.response_factory = SigningServiceResponseFactory()
+
+ def tearDown(self):
+ super(SigningServiceProxyTest, self).tearDown()
+ # clean singleton instance of signing service.
+ SigningService._instance = None
+
+ def assertHeaderContains(self, request, headers):
+ """Checks if the request's header contains the headers dictionary
+ provided
+
+ :param request: The requests.Request object
+ :param headers: Dictionary of expected headers
+ """
+ missing_headers = []
+ # List of tuples like (header key, got, expected)
+ different_headers = []
+ for k, v in headers.items():
+ if k not in request.headers:
+ missing_headers.append(k)
+ continue
+ if v != request.headers[k]:
+ different_headers.append((k, request.headers[k], v))
+ continue
+ failure_msgs = []
+ if missing_headers:
+ text = ", ".join(missing_headers)
+ failure_msgs.append("Missing headers: %s" % text)
+ if different_headers:
+ text = "; ".join(
+ "Header '%s': [got: %s / expected: %s]" % (k, got, expected)
+ for k, got, expected in different_headers)
+ failure_msgs.append(text)
+ if failure_msgs:
+ text = "\n".join(failure_msgs)
+ self.fail(
+ "Request header does not contain expected items:\n%s" % text)
+
+ @responses.activate
+ def test_get_service_public_key(self):
+ self.response_factory.patch()
+
+ signing = SigningService()
+ key = signing.service_public_key
+
+ # Asserts that the public key is correct.
+ self.assertIsInstance(key, PublicKey)
+ self.assertEqual(
+ key.encode(Base64Encoder),
+ self.response_factory.base64_service_public_key)
+
+ # Checks that the HTTP call was made
+ self.assertEqual(1, len(responses.calls))
+ call = responses.calls[0]
+ self.assertEqual("GET", call.request.method)
+ self.assertEqual(
+ self.response_factory.get_url("/service-key"), call.request.url)
+
+ @responses.activate
+ def test_get_nonce(self):
+ self.response_factory.patch()
+
+ signing = SigningService()
+ nonce = signing.get_nonce()
+
+ self.assertEqual(
+ base64.b64encode(nonce), self.response_factory.base64_nonce)
+
+ # Checks that the HTTP call was made
+ self.assertEqual(1, len(responses.calls))
+ call = responses.calls[0]
+ self.assertEqual("POST", call.request.method)
+ self.assertEqual(
+ self.response_factory.get_url("/nonce"), call.request.url)
+
+ @responses.activate
+ def test_generate_unknown_key_type_raises_exception(self):
+ self.response_factory.patch()
+
+ signing = SigningService()
+ self.assertRaises(
+ ValueError, signing.generate, "banana", "Wrong key type")
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_generate_key(self):
+ """Makes sure that the SigningService.generate method calls the
+ correct endpoints
+ """
+ self.response_factory.patch()
+ # Generate the key, and checks if we got back the correct dict.
+ signing = SigningService()
+ generated = signing.generate(SigningKeyType.UEFI, "my lp test key")
+
+ self.assertEqual(generated, {
+ 'public-key': self.response_factory.b64_generated_public_key,
+ 'fingerprint': self.response_factory.generated_fingerprint})
+
+ self.assertEqual(3, len(responses.calls))
+
+ # expected order of HTTP calls
+ http_nonce, http_service_key, http_generate = responses.calls
+
+ self.assertEqual("POST", http_nonce.request.method)
+ self.assertEqual(
+ self.response_factory.get_url("/nonce"), http_nonce.request.url)
+
+ self.assertEqual("GET", http_service_key.request.method)
+ self.assertEqual(
+ self.response_factory.get_url("/service-key"),
+ http_service_key.request.url)
+
+ self.assertEqual("POST", http_generate.request.method)
+ self.assertEqual(
+ self.response_factory.get_url("/generate"),
+ http_generate.request.url)
+ self.assertHeaderContains(http_generate.request, {
+ "Content-Type": "application/x-boxed-json",
+ "X-Client-Public-Key": signing.LOCAL_PUBLIC_KEY,
+ "X-Nonce": self.response_factory.base64_nonce})
+ self.assertIsNotNone(http_generate.request.body)
+
+ @responses.activate
+ def test_sign_invalid_mode(self):
+ signing = SigningService()
+ self.assertRaises(
+ ValueError, signing.sign,
+ SigningKeyType.UEFI, 'fingerprint', 'message_name', 'message',
+ 'NO-MODE')
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_sign_invalid_key_type(self):
+ signing = SigningService()
+ self.assertRaises(
+ ValueError, signing.sign,
+ 'shrug', 'fingerprint', 'message_name', 'message', 'ATTACHED')
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_sign(self):
+ """Runs through SignService.sign() flow"""
+ # Replace GET /service-key response by our mock.
+ resp_factory = self.response_factory
+ resp_factory.patch()
+
+ fingerprint = '338D218488DFD597D8FCB9C328C3E9D9ADA16CEE'
+ key_type = SigningKeyType.KMOD
+ mode = 'DETACHED'
+ message_name = 'my test msg'
+ message = 'this is the message content'
+
+ signing = SigningService()
+ data = signing.sign(
+ key_type, fingerprint, message_name, message, mode)
+
+ self.assertEqual(3, len(responses.calls))
+ # expected order of HTTP calls
+ http_nonce, http_service_key, http_sign = responses.calls
+
+ self.assertEqual("POST", http_nonce.request.method)
+ self.assertEqual(
+ self.response_factory.get_url("/nonce"), http_nonce.request.url)
+
+ self.assertEqual("GET", http_service_key.request.method)
+ self.assertEqual(
+ self.response_factory.get_url("/service-key"),
+ http_service_key.request.url)
+
+ self.assertEqual("POST", http_sign.request.method)
+ self.assertEqual(
+ self.response_factory.get_url("/sign"),
+ http_sign.request.url)
+ self.assertHeaderContains(http_sign.request, {
+ "Content-Type": "application/x-boxed-json",
+ "X-Client-Public-Key": signing.LOCAL_PUBLIC_KEY,
+ "X-Nonce": self.response_factory.base64_nonce})
+ self.assertIsNotNone(http_sign.request.body)
+
+ # It should have returned the values from response.json(),
+ # but decoding what is base64-encoded.
+ self.assertEqual(2, len(data))
+ resp_json = http_sign.response.json()
+ self.assertEqual(data['public-key'], resp_json['public-key'])
+ self.assertEqual(
+ data['signed-message'],
+ base64.b64decode(resp_json['signed-message']))
\ No newline at end of file
diff --git a/lib/lp/services/signing/tests/test_signingkey.py b/lib/lp/services/signing/tests/test_signingkey.py
new file mode 100644
index 0000000..12b5758
--- /dev/null
+++ b/lib/lp/services/signing/tests/test_signingkey.py
@@ -0,0 +1,228 @@
+# Copyright 2010-2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+import base64
+
+import responses
+
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.model.signingkey import SigningKey, ArchiveSigningKey
+from lp.services.database.interfaces import IMasterStore
+from lp.services.signing.proxy import SigningService
+from lp.services.signing.tests.test_proxy import SigningServiceResponseFactory
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import DatabaseFunctionalLayer
+from storm.store import Store
+from testtools.matchers import MatchesStructure
+
+
+class TestSigningKey(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self, *args, **kwargs):
+ super(TestSigningKey, self).setUp(*args, **kwargs)
+ self.signing_service = SigningServiceResponseFactory()
+
+ def tearDown(self):
+ super(TestSigningKey, self).tearDown()
+ # clean singleton instance of signing service.
+ SigningService._instance = None
+
+ @responses.activate
+ def test_generate_signing_key_saves_correctly(self):
+ self.signing_service.patch()
+
+ key = SigningKey.generate(SigningKeyType.UEFI, u"this is my key")
+ self.assertIsInstance(key, SigningKey)
+
+ store = IMasterStore(SigningKey)
+ store.invalidate()
+
+ rs = store.find(SigningKey)
+ self.assertEqual(1, rs.count())
+ db_key = rs.one()
+
+ self.assertEqual(SigningKeyType.UEFI, db_key.key_type)
+ self.assertEqual(
+ self.signing_service.generated_fingerprint, db_key.fingerprint)
+ self.assertEqual(
+ self.signing_service.b64_generated_public_key,
+ base64.b64encode(db_key.public_key))
+ self.assertEqual("this is my key", db_key.description)
+
+ @responses.activate
+ def test_sign_some_data(self):
+ self.signing_service.patch()
+
+ s = SigningKey(
+ SigningKeyType.UEFI, u"a fingerprint",
+ self.signing_service.b64_generated_public_key,
+ description=u"This is my key!")
+ signed = s.sign("secure message", "message_name")
+
+ # Checks if the returned value is actually the returning value from
+ # HTTP POST /sign call to lp-signing service
+ self.assertEqual(3, len(responses.calls))
+ http_sign = responses.calls[-1]
+ api_resp = http_sign.response.json()
+ self.assertEqual(
+ base64.b64decode(api_resp['signed-message']), signed)
+
+
+class TestArchiveSigningKey(TestCaseWithFactory):
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self, *args, **kwargs):
+ super(TestArchiveSigningKey, self).setUp(*args, **kwargs)
+ self.signing_service = SigningServiceResponseFactory()
+
+ @responses.activate
+ def test_generate_saves_correctly(self):
+ self.signing_service.patch()
+
+ archive = self.factory.makeArchive()
+ distro_series = archive.distribution.series[0]
+
+ arch_key = ArchiveSigningKey.generate(
+ SigningKeyType.UEFI, archive, distro_series=distro_series,
+ description=u"some description")
+
+ store = Store.of(arch_key)
+ store.invalidate()
+
+ rs = store.find(ArchiveSigningKey)
+ self.assertEqual(1, rs.count())
+
+ db_arch_key = rs.one()
+ self.assertThat(db_arch_key, MatchesStructure.byEquality(
+ archive=archive, distro_series=distro_series))
+
+ self.assertThat(db_arch_key.signing_key, MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI, description=u"some description",
+ fingerprint=self.signing_service.generated_fingerprint,
+ public_key=self.signing_service.generated_public_key))
+
+ def test_create_or_update(self):
+ archive = self.factory.makeArchive()
+ distro_series = archive.distribution.series[0]
+ signing_key = self.factory.makeSigningKey()
+
+ arch_key, created = ArchiveSigningKey.create_or_update(
+ archive, distro_series, signing_key)
+
+ store = Store.of(arch_key)
+ store.invalidate()
+ rs = store.find(ArchiveSigningKey)
+
+ self.assertEqual(1, rs.count())
+ db_arch_key = rs.one()
+ self.assertTrue(created)
+ self.assertThat(db_arch_key, MatchesStructure.byEquality(
+ archive=archive, distro_series=distro_series,
+ signing_key=signing_key))
+
+ another_signing_key = self.factory.makeSigningKey()
+ updated_arch_key, created = ArchiveSigningKey.create_or_update(
+ archive, distro_series, another_signing_key)
+
+ store.invalidate()
+ rs = store.find(ArchiveSigningKey)
+ self.assertEqual(1, store.find(ArchiveSigningKey).count())
+ db_arch_key = rs.one()
+ self.assertFalse(created)
+ self.assertThat(db_arch_key, MatchesStructure.byEquality(
+ archive=archive, distro_series=distro_series,
+ signing_key=signing_key))
+
+ # Saving another type should create a new entry
+ signing_key_from_another_type = self.factory.makeSigningKey(
+ key_type=SigningKeyType.KMOD)
+ arch_key_another_type, created = ArchiveSigningKey.create_or_update(
+ archive, distro_series, signing_key_from_another_type)
+
+ self.assertTrue(created)
+ self.assertEqual(2, store.find(ArchiveSigningKey).count())
+
+ def test_get_signing_keys_without_distro_series_configured(self):
+ UEFI = SigningKeyType.UEFI
+ KMOD = SigningKeyType.KMOD
+
+ archive = self.factory.makeArchive()
+ distro_series = archive.distribution.series[0]
+ uefi_key = self.factory.makeSigningKey(
+ key_type=SigningKeyType.UEFI)
+ kmod_key = self.factory.makeSigningKey(
+ key_type=SigningKeyType.KMOD)
+
+ # Fill the database with keys from other archives to make sure we
+ # are filtering it out
+ other_archive = archive = self.factory.makeArchive()
+ ArchiveSigningKey.create_or_update(
+ other_archive, None, self.factory.makeSigningKey())
+
+ # Create a key for the archive (no specific series)
+ arch_uefi_key, created = ArchiveSigningKey.create_or_update(
+ archive, None, uefi_key)
+ arch_kmod_key, created = ArchiveSigningKey.create_or_update(
+ archive, None, kmod_key)
+
+ # Should find the keys if we ask for the archive key
+ self.assertEqual(
+ {UEFI: arch_uefi_key, KMOD: arch_kmod_key},
+ ArchiveSigningKey.get_signing_keys(archive, None))
+
+ # Should find the key if we ask for archive + distro_series key
+ self.assertEqual(
+ {UEFI: arch_uefi_key, KMOD: arch_kmod_key},
+ ArchiveSigningKey.get_signing_keys(archive, distro_series))
+
+ def test_get_signing_keys_with_distro_series_configured(self):
+ UEFI = SigningKeyType.UEFI
+ KMOD = SigningKeyType.KMOD
+
+ archive = self.factory.makeArchive()
+ series = archive.distribution.series
+ uefi_key = self.factory.makeSigningKey(key_type=UEFI)
+ kmod_key = self.factory.makeSigningKey(key_type=KMOD)
+
+ # Fill the database with keys from other archives to make sure we
+ # are filtering it out
+ other_archive = archive = self.factory.makeArchive()
+ ArchiveSigningKey.create_or_update(
+ other_archive, None, self.factory.makeSigningKey())
+
+ # Create a key for the archive (no specific series)
+ arch_uefi_key, created = ArchiveSigningKey.create_or_update(
+ archive, None, uefi_key)
+
+ # for kmod, should give back this one if provided a
+ # newer distro series
+ arch_kmod_key, created = ArchiveSigningKey.create_or_update(
+ archive, series[1], kmod_key)
+ old_arch_kmod_key, created = ArchiveSigningKey.create_or_update(
+ archive, series[2], kmod_key)
+
+ # If no distroseries is specified, it should give back no KMOD key,
+ # since we don't have a default
+ self.assertEqual(
+ {UEFI: arch_uefi_key, KMOD: None},
+ ArchiveSigningKey.get_signing_keys(archive, None))
+
+ # For the most recent series, use the KMOD key we've set for the
+ # previous one
+ self.assertEqual(
+ {UEFI: arch_uefi_key, KMOD: arch_kmod_key},
+ ArchiveSigningKey.get_signing_keys(archive, series[0]))
+
+ # For the previous series, we have a KMOD key configured
+ self.assertEqual(
+ {UEFI: arch_uefi_key, KMOD: arch_kmod_key},
+ ArchiveSigningKey.get_signing_keys(archive, series[1]))
+
+ # For the old series, we have an old KMOD key configured
+ self.assertEqual(
+ {UEFI: arch_uefi_key, KMOD: old_arch_kmod_key},
+ ArchiveSigningKey.get_signing_keys(archive, series[2]))
\ No newline at end of file
diff --git a/lib/lp/snappy/tests/test_snapbuildbehaviour.py b/lib/lp/snappy/tests/test_snapbuildbehaviour.py
index 3a506c5..b397bba 100644
--- a/lib/lp/snappy/tests/test_snapbuildbehaviour.py
+++ b/lib/lp/snappy/tests/test_snapbuildbehaviour.py
@@ -54,8 +54,8 @@ from zope.publisher.xmlrpc import TestRequest
from zope.security.proxy import removeSecurityProxy
from lp.app.enums import InformationType
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.buildmaster.enums import (
BuildBaseImageType,
@@ -649,7 +649,7 @@ class TestAsyncSnapBuildBehaviour(TestSnapBuildBehaviourBase):
yield self.useFixture(InProcessKeyServerFixture()).start()
archive = self.factory.makeArchive()
key_path = os.path.join(gpgkeysdir, "ppa-sample@xxxxxxxxxxxxxxxxx")
- yield IArchiveSigningKey(archive).setSigningKey(
+ yield IArchiveGPGSigningKey(archive).setSigningKey(
key_path, async_keyserver=True)
job = self.makeJob(archive=archive)
self.factory.makeBinaryPackagePublishingHistory(
diff --git a/lib/lp/soyuz/adapters/tests/test_archivedependencies.py b/lib/lp/soyuz/adapters/tests/test_archivedependencies.py
index dbb2343..7cd3cad 100644
--- a/lib/lp/soyuz/adapters/tests/test_archivedependencies.py
+++ b/lib/lp/soyuz/adapters/tests/test_archivedependencies.py
@@ -20,8 +20,8 @@ from twisted.internet.threads import deferToThread
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.registry.interfaces.distribution import IDistributionSet
from lp.registry.interfaces.pocket import PackagePublishingPocket
@@ -174,7 +174,7 @@ class TestSourcesList(TestCaseWithFactory):
archive = self.factory.makeArchive(distribution=self.ubuntu, **kwargs)
if signing_key_name is not None:
key_path = os.path.join(gpgkeysdir, "%s.sec" % signing_key_name)
- yield IArchiveSigningKey(archive).setSigningKey(
+ yield IArchiveGPGSigningKey(archive).setSigningKey(
key_path, async_keyserver=True)
if publish_binary:
self.publisher.getPubBinaries(
diff --git a/lib/lp/soyuz/scripts/ppakeygenerator.py b/lib/lp/soyuz/scripts/ppakeygenerator.py
index 00bebac..c1aa3f6 100644
--- a/lib/lp/soyuz/scripts/ppakeygenerator.py
+++ b/lib/lp/soyuz/scripts/ppakeygenerator.py
@@ -7,8 +7,8 @@ __all__ = [
from zope.component import getUtility
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.services.scripts.base import (
LaunchpadCronScript,
@@ -32,7 +32,7 @@ class PPAKeyGenerator(LaunchpadCronScript):
self.logger.info(
"Generating signing key for %s (%s)" %
(archive.reference, archive.displayname))
- archive_signing_key = IArchiveSigningKey(archive)
+ archive_signing_key = IArchiveGPGSigningKey(archive)
archive_signing_key.generateSigningKey()
self.logger.info("Key %s" % archive.signing_key.fingerprint)
diff --git a/lib/lp/soyuz/tests/test_archive.py b/lib/lp/soyuz/tests/test_archive.py
index 20b7b2f..8660e70 100644
--- a/lib/lp/soyuz/tests/test_archive.py
+++ b/lib/lp/soyuz/tests/test_archive.py
@@ -37,8 +37,8 @@ from zope.security.proxy import removeSecurityProxy
from lp.app.errors import NotFoundError
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.buildmaster.enums import (
BuildQueueStatus,
@@ -1827,7 +1827,7 @@ class TestArchiveDependencies(TestCaseWithFactory):
with InProcessKeyServerFixture() as keyserver:
yield keyserver.start()
key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
- yield IArchiveSigningKey(p3a).setSigningKey(
+ yield IArchiveGPGSigningKey(p3a).setSigningKey(
key_path, async_keyserver=True)
dependency = self.factory.makeArchive(
name='dependency', private=True, owner=p3a.owner)
diff --git a/lib/lp/soyuz/tests/test_binarypackagebuildbehaviour.py b/lib/lp/soyuz/tests/test_binarypackagebuildbehaviour.py
index 61e4847..cc08bc5 100644
--- a/lib/lp/soyuz/tests/test_binarypackagebuildbehaviour.py
+++ b/lib/lp/soyuz/tests/test_binarypackagebuildbehaviour.py
@@ -22,8 +22,8 @@ from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
from lp.archivepublisher.diskpool import poolify
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.buildmaster.enums import (
BuilderCleanStatus,
@@ -354,7 +354,7 @@ class TestBinaryBuildPackageBehaviour(TestCaseWithFactory):
archive = self.factory.makeArchive()
builder = self.factory.makeBuilder()
key_path = os.path.join(gpgkeysdir, "ppa-sample@xxxxxxxxxxxxxxxxx")
- yield IArchiveSigningKey(archive).setSigningKey(
+ yield IArchiveGPGSigningKey(archive).setSigningKey(
key_path, async_keyserver=True)
build = self.factory.makeBinaryPackageBuild(archive=archive)
self.factory.makeBinaryPackagePublishingHistory(
diff --git a/lib/lp/soyuz/tests/test_livefsbuildbehaviour.py b/lib/lp/soyuz/tests/test_livefsbuildbehaviour.py
index 29e697e..95e9694 100644
--- a/lib/lp/soyuz/tests/test_livefsbuildbehaviour.py
+++ b/lib/lp/soyuz/tests/test_livefsbuildbehaviour.py
@@ -20,8 +20,8 @@ from twisted.trial.unittest import TestCase as TrialTestCase
from zope.component import getUtility
from zope.security.proxy import Proxy
-from lp.archivepublisher.interfaces.archivesigningkey import (
- IArchiveSigningKey,
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
)
from lp.buildmaster.enums import (
BuildBaseImageType,
@@ -247,7 +247,7 @@ class TestAsyncLiveFSBuildBehaviour(TestLiveFSBuildBehaviourBase):
yield self.useFixture(InProcessKeyServerFixture()).start()
archive = self.factory.makeArchive()
key_path = os.path.join(gpgkeysdir, "ppa-sample@xxxxxxxxxxxxxxxxx")
- yield IArchiveSigningKey(archive).setSigningKey(
+ yield IArchiveGPGSigningKey(archive).setSigningKey(
key_path, async_keyserver=True)
job = self.makeJob(archive=archive, with_builder=True)
self.factory.makeBinaryPackagePublishingHistory(
diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py
index 4009403..7b59c08 100644
--- a/lib/lp/testing/factory.py
+++ b/lib/lp/testing/factory.py
@@ -51,6 +51,8 @@ from breezy.revision import Revision as BzrRevision
from cryptography.utils import int_to_bytes
from lazr.jobrunner.jobrunner import SuspendJobException
import pytz
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.model.signingkey import SigningKey
from pytz import UTC
import six
from twisted.conch.ssh.common import (
@@ -4177,6 +4179,25 @@ class BareLaunchpadObjectFactory(ObjectFactory):
removeSecurityProxy(bpr).datecreated = date_created
return bpr
+ def makeSigningKey(self, key_type=None, fingerprint=None,
+ public_key=None, description=None):
+ """Makes a SigningKey (integration with lp-signing)
+ """
+ if key_type is None:
+ key_type = SigningKeyType.UEFI
+ if fingerprint is None:
+ fingerprint = self.getUniqueUnicode('fingerprint')
+ if public_key is None:
+ public_key = self.getUniqueHexString(64)
+ store = IMasterStore(SigningKey)
+ signing_key = SigningKey(
+ key_type=key_type, fingerprint=fingerprint, public_key=public_key,
+ description=description)
+ store.add(signing_key)
+ return signing_key
+
+
+
def makeSection(self, name=None):
"""Make a `Section`."""
if name is None:
Follow ups