← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~apw/launchpad/signing-add-kernel-module-signing into lp:launchpad

 

Andy Whitcroft has proposed merging lp:~apw/launchpad/signing-add-kernel-module-signing into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~apw/launchpad/signing-add-kernel-module-signing/+merge/294948

Add Kernel Module (KMod) signing support to the raw-signing custom uploader and add support for signing options (tarball output and signature only modes).

This update adds support for signing kernel module (.ko) files.  A new kernel signing key is used (generated for PPAs) to create detached signatures for those modules.  Appendable signatures are generated and placed in module.ko.sig.

This update also adds support for raw-signing signing options.  These are specified in the raw-signing tarball via the raw-signing.options file.  Each line of this file specifies an option.  There are two options supported:

1) tarball -- this indicates that rather than exposing all of the signing artifacts in dists directly, they should be converted into a signed.tar.gz in the version directory, and

2) sigonly -- this indicates that we want only the signing artifacts, not the original files.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~apw/launchpad/signing-add-kernel-module-signing into lp:launchpad.
=== modified file 'lib/lp/archivepublisher/signing.py'
--- lib/lp/archivepublisher/signing.py	2016-05-11 15:31:20 +0000
+++ lib/lp/archivepublisher/signing.py	2016-05-17 18:09:54 +0000
@@ -18,11 +18,24 @@
 
 import os
 import subprocess
+import inspect
+import tarfile
+import shutil
 
-from lp.archivepublisher.customupload import CustomUpload
+from lp.archivepublisher.customupload import (
+    CustomUpload,
+    CustomUploadError,
+    )
 from lp.services.osutils import remove_if_exists
 
 
+class SigningUploadPackError(CustomUploadError):
+    def __init__(self, tarfile_path, exc):
+        message = "Problem building tarball '%s': %s" % (
+            tarfile_path, str(exc))
+        CustomUploadError.__init__(self, message)
+
+
 class SigningUpload(CustomUpload):
     """Signing custom upload.
 
@@ -67,12 +80,16 @@
             if self.logger is not None:
                 self.logger.warning(
                     "No signing root configured for this archive")
-            self.key = None
-            self.cert = None
+            self.uefi_key = None
+            self.uefi_cert = None
+            self.kmod_pem = None
+            self.kmod_x509 = None
             self.autokey = False
         else:
-            self.key = os.path.join(pubconf.signingroot, "uefi.key")
-            self.cert = os.path.join(pubconf.signingroot, "uefi.crt")
+            self.uefi_key = os.path.join(pubconf.signingroot, "uefi.key")
+            self.uefi_cert = os.path.join(pubconf.signingroot, "uefi.crt")
+            self.kmod_pem = os.path.join(pubconf.signingroot, "kmod.pem")
+            self.kmod_x509 = os.path.join(pubconf.signingroot, "kmod.x509")
             self.autokey = pubconf.signingautokey
 
         self.setComponents(tarfile_path)
@@ -99,6 +116,19 @@
             dists_signed, "%s-%s" % (self.package, self.arch))
         self.archiveroot = pubconf.archiveroot
 
+    def setSigningOptions(self):
+        """ find and extract raw-signing.options from the tarball."""
+        self.signing_options = {}
+
+        options_file = os.path.join(self.tmpdir, self.version,
+            "raw-signing.options")
+        if not os.path.exists(options_file):
+            return
+
+        with open(options_file) as options_fd:
+            for option in options_fd:
+                self.signing_options[option.strip()] = True
+
     @classmethod
     def getSeriesKey(cls, tarfile_path):
         try:
@@ -107,31 +137,58 @@
         except ValueError:
             return None
 
-    def findEfiFilenames(self):
-        """Find all the *.efi files in an extracted tarball."""
+    def getArchiveOwner(self):
+        # XXX: pull out the PPA owner and name to seed key CN
+        archive_name = os.path.dirname(self.archiveroot)
+        owner_name = os.path.basename(os.path.dirname(archive_name))
+        archive_name = os.path.basename(archive_name)
+
+        return owner_name + ' ' + archive_name
+
+    def findSigningHandlers(self):
+        """Find all the signable files in an extracted tarball."""
         for dirpath, dirnames, filenames in os.walk(self.tmpdir):
             for filename in filenames:
                 if filename.endswith(".efi"):
-                    yield os.path.join(dirpath, filename)
+                    yield (os.path.join(dirpath, filename), self.signUefi)
+                elif filename.endswith(".ko"):
+                    yield (os.path.join(dirpath, filename), self.signKmod)
+
+    def getKeys(self, which, generate, *keynames):
+        """Validate and return the uefi key and cert for encryption."""
+
+        if self.autokey:
+            for keyfile in keynames:
+                if keyfile and not os.path.exists(keyfile):
+                    generate()
+                    break
+
+        valid = True
+        for keyfile in keynames:
+            if keyfile and not os.access(keyfile, os.R_OK):
+                if self.logger is not None:
+                    self.logger.warning(
+                        "%s key %s not readable" % (which, keyfile))
+                valid = False
+
+        if not valid:
+            return [None for k in keynames]
+        return keynames
 
     def generateUefiKeys(self):
         """Generate new UEFI Keys for this archive."""
-        directory = os.path.dirname(self.key)
+        directory = os.path.dirname(self.uefi_key)
         if not os.path.exists(directory):
             os.makedirs(directory)
 
-        # XXX: pull out the PPA owner and name to seed key CN
-        archive_name = os.path.dirname(self.archiveroot)
-        owner_name = os.path.basename(os.path.dirname(archive_name))
-        archive_name = os.path.basename(archive_name)
-        common_name = '/CN=PPA ' + owner_name + ' ' + archive_name + '/'
+        common_name = '/CN=PPA ' + self.getArchiveOwner() + '/'
 
         old_mask = os.umask(0o077)
         try:
             new_key_cmd = [
                 'openssl', 'req', '-new', '-x509', '-newkey', 'rsa:2048',
-                '-subj', common_name, '-keyout', self.key, '-out', self.cert,
-                '-days', '3650', '-nodes', '-sha256',
+                '-subj', common_name, '-keyout', self.uefi_key,
+                '-out', self.uefi_cert, '-days', '3650', '-nodes', '-sha256',
                 ]
             if subprocess.call(new_key_cmd) != 0:
                 # Just log this rather than failing, since custom upload errors
@@ -143,45 +200,121 @@
         finally:
             os.umask(old_mask)
 
-        if os.path.exists(self.cert):
-            os.chmod(self.cert, 0o644)
-
-    def getUefiKeys(self):
-        """Validate and return the uefi key and cert for encryption."""
-
-        if self.key and self.cert:
-            # If neither of the key files exists then attempt to
-            # generate them.
-            if (self.autokey and not os.path.exists(self.key)
-                and not os.path.exists(self.cert)):
-                self.generateUefiKeys()
-
-            # If we have keys, but cannot read them they are dead to us.
-            if not os.access(self.key, os.R_OK):
-                if self.logger is not None:
-                    self.logger.warning(
-                        "UEFI private key %s not readable" % self.key)
-                self.key = None
-            if not os.access(self.cert, os.R_OK):
-                if self.logger is not None:
-                    self.logger.warning(
-                        "UEFI certificate %s not readable" % self.cert)
-                self.cert = None
-
-        return (self.key, self.cert)
+        if os.path.exists(self.uefi_cert):
+            os.chmod(self.uefi_cert, 0o644)
 
     def signUefi(self, image):
         """Attempt to sign an image."""
-        (key, cert) = self.getUefiKeys()
+        remove_if_exists("%s.signed" % image)
+        (key, cert) = self.getKeys('UEFI', self.generateUefiKeys,
+            self.uefi_key, self.uefi_cert)
         if not key or not cert:
             return
         cmdl = ["sbsign", "--key", key, "--cert", cert, image]
-        if subprocess.call(cmdl) != 0:
+        status = subprocess.call(cmdl)
+        if status != 0:
             # Just log this rather than failing, since custom upload errors
             # tend to make the publisher rather upset.
             if self.logger is not None:
                 self.logger.warning("UEFI Signing Failed '%s'" %
                     " ".join(cmdl))
+        return status
+
+    def generateKmodKeys(self):
+        """Generate new Kernel Signing Keys for this archive."""
+        directory = os.path.dirname(self.kmod_pem)
+        if not os.path.exists(directory):
+            os.makedirs(directory)
+
+        genkey_text = inspect.cleandoc("""\
+            [ req ]
+            default_bits = 4096
+            distinguished_name = req_distinguished_name
+            prompt = no
+            string_mask = utf8only
+            x509_extensions = myexts
+
+            [ req_distinguished_name ]
+            CN = /CN=PPA """ + self.getArchiveOwner() + """ kmod/
+
+            [ myexts ]
+            basicConstraints=critical,CA:FALSE
+            keyUsage=digitalSignature
+            subjectKeyIdentifier=hash
+            authorityKeyIdentifier=keyid
+            """)
+
+        genkey_filename = os.path.join(directory, "kmod.genkey")
+        with open(genkey_filename, "w") as genkey_fd:
+            print >> genkey_fd, genkey_text
+
+        old_mask = os.umask(0o077)
+        try:
+            new_key_cmd = [
+                'openssl', 'req', '-new', '-nodes', '-utf8', '-sha512',
+                '-days', '3650', '-batch', '-x509', '-config', genkey_filename,
+                '-outform', 'PEM', '-out', self.kmod_pem,
+                '-keyout', self.kmod_pem
+                ]
+            if subprocess.call(new_key_cmd) != 0:
+                # Just log this rather than failing, since custom upload errors
+                # tend to make the publisher rather upset.
+                if self.logger is not None:
+                    self.logger.warning(
+                        "Failed to generate Kmod signing key for %s" %
+                        common_name)
+            else:
+                new_x509_cmd = [
+                    'openssl', 'x509', '-in', self.kmod_pem,
+                    '-outform', 'DER', '-out', self.kmod_x509
+                    ]
+                if subprocess.call(new_x509_cmd) != 0:
+                    # Just log this rather than failing (as above).
+                    if self.logger is not None:
+                        self.logger.warning(
+                            "Failed to generate Kmod x509 certificate for %s" %
+                            common_name)
+                    os.unlink(self.kmod_pem)
+        finally:
+            os.umask(old_mask)
+
+    def signKmod(self, image):
+        """Attempt to sign a kernel module."""
+        remove_if_exists("%s.sig" % image)
+        (pem, cert) = self.getKeys('Kernel Module', self.generateKmodKeys,
+            self.kmod_pem, self.kmod_x509)
+        if not pem or not cert:
+            return
+        cmdl = ["kmodsign", "-D", "sha512", pem, cert, image, image + ".sig"]
+        status = subprocess.call(cmdl)
+        if status != 0:
+            # Just log this rather than failing, since custom upload errors
+            # tend to make the publisher rather upset.
+            if self.logger is not None:
+                self.logger.warning("Kmod Signing Failed '%s'" %
+                    " ".join(cmdl))
+        return status
+
+    def convertToTarball(self):
+        tarfilename = os.path.join(self.tmpdir, "signed.tar.gz")
+        versiondir = os.path.join(self.tmpdir, self.version)
+
+        try:
+            try:
+                tarball = tarfile.open(tarfilename, "w:gz")
+                tarball.add(versiondir, arcname=self.version)
+            finally:
+                tarball.close()
+        except tarfile.TarError as exc:
+            raise SigningUploadPackError(tarfilename, exc)
+
+        # Clean out the original tree and move the signing tarball in.
+        try:
+            shutil.rmtree(versiondir)
+            os.mkdir(versiondir)
+            os.rename(tarfilename, os.path.join(versiondir, "signed.tar.gz"))
+        except os.OSError as exc:
+            raise SigningUploadPackError(tarfilename, exc)
 
     def extract(self):
         """Copy the custom upload to a temporary directory, and sign it.
@@ -189,10 +322,16 @@
         No actual extraction is required.
         """
         super(SigningUpload, self).extract()
-        efi_filenames = list(self.findEfiFilenames())
-        for efi_filename in efi_filenames:
-            remove_if_exists("%s.signed" % efi_filename)
-            self.signUefi(efi_filename)
+        self.setSigningOptions()
+        filehandlers = list(self.findSigningHandlers())
+        for (filename, handler) in filehandlers:
+            if (handler(filename) == 0 and
+                'sigonly' in self.signing_options):
+                os.unlink(filename)
+
+        # If tarball output is requested, tar up the results.
+        if 'tarball' in self.signing_options:
+            self.convertToTarball()
 
     def shouldInstall(self, filename):
         return filename.startswith("%s/" % self.version)

=== modified file 'lib/lp/archivepublisher/tests/test_signing.py'
--- lib/lp/archivepublisher/tests/test_signing.py	2016-05-11 12:59:17 +0000
+++ lib/lp/archivepublisher/tests/test_signing.py	2016-05-17 18:09:54 +0000
@@ -6,6 +6,7 @@
 __metaclass__ = type
 
 import os
+import tarfile
 
 from fixtures import MonkeyPatch
 
@@ -20,6 +21,60 @@
 from lp.testing.fakemethod import FakeMethod
 
 
+class FakeMethodCallers(FakeMethod):
+    """Fake execution general commands."""
+    def __init__(self, upload=None, *args, **kwargs):
+        super(FakeMethodCallers, self).__init__(*args, **kwargs)
+        self.upload = upload
+        self.callers = {}
+
+    def __call__(self, *args, **kwargs):
+        super(FakeMethodCallers, self).__call__(*args, **kwargs)
+
+        import inspect
+        frame = inspect.currentframe()
+        try:
+            frame = frame.f_back
+            caller = frame.f_code.co_name
+        finally:
+            del frame  # As per no-leak stack inspection in Python reference.
+
+        self.callers[caller] = self.callers.get(caller, 0) + 1
+
+        if hasattr(self, caller):
+            return getattr(self, caller)(*args, **kwargs)
+
+        return 0
+
+    def caller_call_count(self, caller):
+        return self.callers.get(caller, 0)
+
+
+class FakeMethodGenerateKeys(FakeMethodCallers):
+    """ Fake UEFI/Kmod key Generators."""
+    def generateUefiKeys(self, *args, **kwargs):
+            write_file(self.upload.uefi_key, "")
+            write_file(self.upload.uefi_cert, "")
+            return 0
+
+    def generateKmodKeys(self, *args, **kwargs):
+            write_file(self.upload.kmod_pem, "")
+            write_file(self.upload.kmod_x509, "")
+            return 0
+
+    def signUefi(self, *args, **kwargs):
+            filename = args[0][-1]
+            if filename.endswith(".efi"):
+                write_file(filename + ".signed", "")
+            return 0
+
+    def signKmod(self, *args, **kwargs):
+            filename = args[0][-1]
+            if filename.endswith(".ko.sig"):
+                write_file(filename, "")
+            return 0
+
+
 class FakeMethodGenUefiKeys(FakeMethod):
     """Fake execution of generation of Uefi keys pairs."""
     def __init__(self, upload=None, *args, **kwargs):
@@ -29,11 +84,24 @@
     def __call__(self, *args, **kwargs):
         super(FakeMethodGenUefiKeys, self).__call__(*args, **kwargs)
 
-        write_file(self.upload.key, "")
-        write_file(self.upload.cert, "")
-
-
-class FakeConfig:
+        write_file(self.upload.uefi_key, "")
+        write_file(self.upload.uefi_cert, "")
+
+
+class FakeMethodGenkmodKeys(FakeMethod):
+    """Fake execution of generation of Uefi keys pairs."""
+    def __init__(self, upload=None, *args, **kwargs):
+        super(FakeMethodGenUefiKeys, self).__init__(*args, **kwargs)
+        self.upload = upload
+
+    def __call__(self, *args, **kwargs):
+        super(FakeMethodGenUefiKeys, self).__call__(*args, **kwargs)
+
+        write_file(self.upload.kmod_pem, "")
+        write_file(self.upload.kmod_x509, "")
+
+
+class FakeConfigPrimary:
     """A fake publisher configuration for the main archive."""
     def __init__(self, distroroot, signingroot):
         self.distroroot = distroroot
@@ -42,6 +110,15 @@
         self.signingautokey = False
 
 
+class FakeConfigCopy:
+    """A fake publisher configuration for a copy archive."""
+    def __init__(self, distroroot):
+        self.distroroot = distroroot
+        self.signingroot = None
+        self.archiveroot = os.path.join(self.distroroot, 'ubuntu')
+        self.signingautokey = False
+
+
 class FakeConfigPPA:
     """A fake publisher configuration for a PPA."""
     def __init__(self, distroroot, signingroot, owner, ppa):
@@ -57,7 +134,7 @@
         super(TestSigning, self).setUp()
         self.temp_dir = self.makeTemporaryDirectory()
         self.signing_dir = self.makeTemporaryDirectory()
-        self.pubconf = FakeConfig(self.temp_dir, self.signing_dir)
+        self.pubconf = FakeConfigPrimary(self.temp_dir, self.signing_dir)
         self.suite = "distroseries"
         # CustomUpload.installFiles requires a umask of 0o022.
         old_umask = os.umask(0o022)
@@ -68,28 +145,51 @@
             'ubuntu-archive', 'testing')
         self.testcase_cn = '/CN=PPA ubuntu-archive testing/'
 
-    def setUpKeyAndCert(self, create=True):
+    def setUpUefiKeys(self, create=True):
         self.key = os.path.join(self.signing_dir, "uefi.key")
         self.cert = os.path.join(self.signing_dir, "uefi.crt")
         if create:
             write_file(self.key, "")
             write_file(self.cert, "")
 
+    def setUpKmodKeys(self, create=True):
+        self.kmod_pem = os.path.join(self.signing_dir, "kmod.pem")
+        self.kmod_x509 = os.path.join(self.signing_dir, "kmod.x509")
+        self.kmod_genkey = os.path.join(self.signing_dir, "kmod.genkey")
+        if create:
+            write_file(self.kmod_pem, "")
+            write_file(self.kmod_x509, "")
+
     def openArchive(self, loader_type, version, arch):
         self.path = os.path.join(
             self.temp_dir, "%s_%s_%s.tar.gz" % (loader_type, version, arch))
         self.buffer = open(self.path, "wb")
         self.archive = LaunchpadWriteTarFile(self.buffer)
 
+    def assertCallCount(self, count, call):
+        self.assertEqual(count, self.fake_call.caller_call_count(call))
+
+    def process_emulate(self):
+        self.archive.close()
+        self.buffer.close()
+        upload = SigningUpload()
+        # Under no circumstances is it safe to execute actual commands.
+        self.fake_call = FakeMethodGenerateKeys(upload=upload)
+        self.useFixture(MonkeyPatch("subprocess.call", self.fake_call))
+        upload.process(self.pubconf, self.path, self.suite)
+
+        return upload
+
     def process(self):
         self.archive.close()
         self.buffer.close()
-        fake_call = FakeMethod()
         upload = SigningUpload()
         upload.signUefi = FakeMethod()
+        upload.signKmod = FakeMethod()
+        # Under no circumstances is it safe to execute actual commands.
+        fake_call = FakeMethod(result=0)
         self.useFixture(MonkeyPatch("subprocess.call", fake_call))
         upload.process(self.pubconf, self.path, self.suite)
-        # Under no circumstances is it safe to execute actual commands.
         self.assertEqual(0, fake_call.call_count)
 
         return upload
@@ -106,37 +206,164 @@
         return os.path.join(self.getDistsPath(), "uefi",
             "%s-%s" % (loader_type, arch))
 
-    def test_unconfigured(self):
+    def test_archive_copy(self):
         # If there is no key/cert configuration, processing succeeds but
-        # nothing is signed.  Signing is attempted.
-        self.pubconf = FakeConfig(self.temp_dir, None)
-        self.openArchive("test", "1.0", "amd64")
-        self.archive.add_file("1.0/empty.efi", "")
-        upload = self.process()
-        self.assertEqual(1, upload.signUefi.call_count)
-
-    def test_missing_key_and_cert(self):
-        # If the configured key/cert are missing, processing succeeds but
-        # nothing is signed.  Signing is attempted.
-        self.openArchive("test", "1.0", "amd64")
-        self.archive.add_file("1.0/empty.efi", "")
-        upload = self.process()
-        self.assertEqual(1, upload.signUefi.call_count)
-
-    def test_no_efi_files(self):
+        # nothing is signed.
+        self.pubconf = FakeConfigCopy(self.temp_dir)
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/empty.efi", "")
+        self.archive.add_file("1.0/empty.ko", "")
+        self.process_emulate()
+        self.assertCallCount(0, 'generateUefiKeys')
+        self.assertCallCount(0, 'generateKmodKeys')
+        self.assertCallCount(0, 'signUefi')
+        self.assertCallCount(0, 'signKmod')
+
+    def test_archive_primary_no_keys(self):
+        # If the configured key/cert are missing, processing succeeds but
+        # nothing is signed.
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/empty.efi", "")
+        self.archive.add_file("1.0/empty.ko", "")
+        self.process_emulate()
+        self.assertCallCount(0, 'generateUefiKeys')
+        self.assertCallCount(0, 'generateKmodKeys')
+        self.assertCallCount(0, 'signUefi')
+        self.assertCallCount(0, 'signKmod')
+
+    def test_archive_primary_keys(self):
+        # If the configured key/cert are missing, processing succeeds but
+        # nothing is signed.
+        self.setUpUefiKeys()
+        self.setUpKmodKeys()
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/empty.efi", "")
+        self.archive.add_file("1.0/empty.ko", "")
+        self.process_emulate()
+        self.assertCallCount(0, 'generateUefiKeys')
+        self.assertCallCount(0, 'generateKmodKeys')
+        self.assertCallCount(1, 'signUefi')
+        self.assertCallCount(1, 'signKmod')
+
+    def test_PPA_creates_keys(self):
+        # If the configured key/cert are missing, processing succeeds but
+        # nothing is signed.
+        self.setUpPPA()
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/empty.efi", "")
+        self.archive.add_file("1.0/empty.ko", "")
+        self.process_emulate()
+        self.assertCallCount(1, 'generateUefiKeys')
+        self.assertCallCount(2, 'generateKmodKeys')
+        self.assertCallCount(1, 'signUefi')
+        self.assertCallCount(1, 'signKmod')
+
+    def test_options_handling_none(self):
+        # If the configured key/cert are missing, processing succeeds but
+        # nothing is signed.
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/raw-signing.options", "")
+        upload = self.process_emulate()
+        self.assertEqual([], sorted(upload.signing_options.keys()))
+
+    def test_options_handling_single(self):
+        # If the configured key/cert are missing, processing succeeds but
+        # nothing is signed.
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/raw-signing.options", "first\n")
+        upload = self.process_emulate()
+        self.assertEqual(['first'], sorted(upload.signing_options.keys()))
+
+    def test_options_handling_multiple(self):
+        # If the configured key/cert are missing, processing succeeds but
+        # nothing is signed.
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/raw-signing.options", "first\nsecond\n")
+        upload = self.process_emulate()
+        self.assertEqual(['first', 'second'],
+            sorted(upload.signing_options.keys()))
+
+    def test_options_tarball(self):
+        # Specifying the "tarball" option should create an tarball in
+        # the tmpdir.
+        self.setUpUefiKeys()
+        self.setUpKmodKeys()
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/raw-signing.options", "tarball")
+        self.archive.add_file("1.0/empty.efi", "")
+        self.archive.add_file("1.0/empty.ko", "")
+        self.process_emulate()
+        self.assertFalse(os.path.exists(os.path.join(
+            self.getSignedPath("test", "amd64"), "1.0", "empty.efi")))
+        self.assertFalse(os.path.exists(os.path.join(
+            self.getSignedPath("test", "amd64"), "1.0", "empty.ko")))
+        self.assertTrue(os.path.exists(os.path.join(
+            self.getSignedPath("test", "amd64"), "1.0", "signed.tar.gz")))
+        with tarfile.open(os.path.join(self.getSignedPath("test", "amd64"),
+            "1.0", "signed.tar.gz")) as tarball:
+            self.assertEqual([
+                '1.0',
+                '1.0/empty.efi',
+                '1.0/empty.efi.signed',
+                '1.0/empty.ko',
+                '1.0/empty.ko.sig',
+                '1.0/raw-signing.options',
+                ], sorted(tarball.getnames()))
+
+    def test_options_sigonly(self):
+        # Specifying the "sigonly" option should trigger removal of
+        # the source files leaving signatures only.
+        self.setUpUefiKeys()
+        self.setUpKmodKeys()
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/raw-signing.options", "sigonly")
+        self.archive.add_file("1.0/empty.efi", "")
+        self.archive.add_file("1.0/empty.ko", "")
+        self.process_emulate()
+        self.assertFalse(os.path.exists(os.path.join(
+            self.getSignedPath("test", "amd64"), "1.0", "empty.efi")))
+        self.assertTrue(os.path.exists(os.path.join(
+            self.getSignedPath("test", "amd64"), "1.0", "empty.efi.signed")))
+        self.assertFalse(os.path.exists(os.path.join(
+            self.getSignedPath("test", "amd64"), "1.0", "empty.ko")))
+        self.assertTrue(os.path.exists(os.path.join(
+            self.getSignedPath("test", "amd64"), "1.0", "empty.ko.sig")))
+
+    def test_options_tarball_sigonly(self):
+        # Specifying the "tarball" option should create an tarball in
+        # the tmpdir.  Adding sigonly should trigger removal of the
+        # original files.
+        self.setUpUefiKeys()
+        self.setUpKmodKeys()
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/raw-signing.options", "tarball\nsigonly")
+        self.archive.add_file("1.0/empty.efi", "")
+        self.archive.add_file("1.0/empty.ko", "")
+        self.process_emulate()
+        with tarfile.open(os.path.join(self.getSignedPath("test", "amd64"),
+            "1.0", "signed.tar.gz")) as tarball:
+            self.assertEqual([
+                '1.0',
+                '1.0/empty.efi.signed',
+                '1.0/empty.ko.sig',
+                '1.0/raw-signing.options',
+                ], sorted(tarball.getnames()))
+
+    def test_no_signed_files(self):
         # Tarballs containing no *.efi files are extracted without complaint.
         # Nothing is signed.
-        self.setUpKeyAndCert()
+        self.setUpUefiKeys()
         self.openArchive("empty", "1.0", "amd64")
         self.archive.add_file("1.0/hello", "world")
         upload = self.process()
         self.assertTrue(os.path.exists(os.path.join(
             self.getSignedPath("empty", "amd64"), "1.0", "hello")))
         self.assertEqual(0, upload.signUefi.call_count)
+        self.assertEqual(0, upload.signKmod.call_count)
 
     def test_already_exists(self):
         # If the target directory already exists, processing fails.
-        self.setUpKeyAndCert()
+        self.setUpUefiKeys()
         self.openArchive("test", "1.0", "amd64")
         self.archive.add_file("1.0/empty.efi", "")
         os.makedirs(os.path.join(self.getSignedPath("test", "amd64"), "1.0"))
@@ -144,7 +371,7 @@
 
     def test_bad_umask(self):
         # The umask must be 0o022 to avoid incorrect permissions.
-        self.setUpKeyAndCert()
+        self.setUpUefiKeys()
         self.openArchive("test", "1.0", "amd64")
         self.archive.add_file("1.0/dir/file.efi", "foo")
         os.umask(0o002)  # cleanup already handled by setUp
@@ -153,8 +380,8 @@
     def test_correct_uefi_signing_command_executed(self):
         # Check that calling signUefi() will generate the expected command
         # when appropriate keys are present.
-        self.setUpKeyAndCert()
-        fake_call = FakeMethod()
+        self.setUpUefiKeys()
+        fake_call = FakeMethod(result=0)
         self.useFixture(MonkeyPatch("subprocess.call", fake_call))
         upload = SigningUpload()
         upload.generateUefiKeys = FakeMethod()
@@ -173,8 +400,8 @@
     def test_correct_uefi_signing_command_executed_no_keys(self):
         # Check that calling signUefi() will generate no commands when
         # no keys are present.
-        self.setUpKeyAndCert(create=False)
-        fake_call = FakeMethod()
+        self.setUpUefiKeys(create=False)
+        fake_call = FakeMethod(result=0)
         self.useFixture(MonkeyPatch("subprocess.call", fake_call))
         upload = SigningUpload()
         upload.generateUefiKeys = FakeMethod()
@@ -188,8 +415,8 @@
         # Check that calling generateUefiKeys() will generate the
         # expected command.
         self.setUpPPA()
-        self.setUpKeyAndCert(create=False)
-        fake_call = FakeMethod()
+        self.setUpUefiKeys(create=False)
+        fake_call = FakeMethod(result=0)
         self.useFixture(MonkeyPatch("subprocess.call", fake_call))
         upload = SigningUpload()
         upload.setTargetDirectory(
@@ -205,17 +432,99 @@
             ]
         self.assertEqual(expected_cmd, args)
 
-    def test_signs_image(self):
-        # Each image in the tarball is signed.
-        self.setUpKeyAndCert()
-        self.openArchive("test", "1.0", "amd64")
-        self.archive.add_file("1.0/empty.efi", "")
-        upload = self.process()
-        self.assertEqual(1, upload.signUefi.call_count)
+    def test_correct_kmod_signing_command_executed(self):
+        # Check that calling signKmod() will generate the expected command
+        # when appropriate keys are present.
+        self.setUpKmodKeys()
+        fake_call = FakeMethod(result=0)
+        self.useFixture(MonkeyPatch("subprocess.call", fake_call))
+        upload = SigningUpload()
+        upload.generateKmodKeys = FakeMethod()
+        upload.setTargetDirectory(
+            self.pubconf, "test_1.0_amd64.tar.gz", "distroseries")
+        upload.signKmod('t.ko')
+        self.assertEqual(1, fake_call.call_count)
+        # Assert command form.
+        args = fake_call.calls[0][0][0]
+        expected_cmd = [
+            'kmodsign', '-D', 'sha512', self.kmod_pem, self.kmod_x509,
+            't.ko', 't.ko.sig'
+            ]
+        self.assertEqual(expected_cmd, args)
+        self.assertEqual(0, upload.generateKmodKeys.call_count)
+
+    def test_correct_kmod_signing_command_executed_no_keys(self):
+        # Check that calling signKmod() will generate no commands when
+        # no keys are present.
+        self.setUpKmodKeys(create=False)
+        fake_call = FakeMethod(result=0)
+        self.useFixture(MonkeyPatch("subprocess.call", fake_call))
+        upload = SigningUpload()
+        upload.generateKmodKeys = FakeMethod()
+        upload.setTargetDirectory(
+            self.pubconf, "test_1.0_amd64.tar.gz", "distroseries")
+        upload.signKmod('t.ko')
+        self.assertEqual(0, fake_call.call_count)
+        self.assertEqual(0, upload.generateKmodKeys.call_count)
+
+    def test_correct_kmod_keygen_command_executed(self):
+        # Check that calling generateKmodKeys() will generate the
+        # expected commands.
+        self.setUpPPA()
+        self.setUpKmodKeys(create=False)
+        fake_call = FakeMethod(result=0)
+        self.useFixture(MonkeyPatch("subprocess.call", fake_call))
+        upload = SigningUpload()
+        upload.setTargetDirectory(
+            self.pubconf, "test_1.0_amd64.tar.gz", "distroseries")
+        upload.generateKmodKeys()
+        self.assertEqual(2, fake_call.call_count)
+        # Assert the actual command matches.
+        args = fake_call.calls[0][0][0]
+        expected_cmd = [
+            'openssl', 'req', '-new', '-nodes', '-utf8', '-sha512',
+            '-days', '3650', '-batch', '-x509',
+            '-config', self.kmod_genkey, '-outform', 'PEM',
+            '-out', self.kmod_pem, '-keyout', self.kmod_pem
+            ]
+        self.assertEqual(expected_cmd, args)
+        args = fake_call.calls[1][0][0]
+        expected_cmd = [
+            'openssl', 'x509', '-in', self.kmod_pem, '-outform', 'DER',
+            '-out', self.kmod_x509
+            ]
+        self.assertEqual(expected_cmd, args)
+
+    def test_signs_uefi_image(self):
+        # Each image in the tarball is signed.
+        self.setUpUefiKeys()
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/empty.efi", "")
+        upload = self.process()
+        self.assertEqual(1, upload.signUefi.call_count)
+
+    def test_signs_kmod_image(self):
+        # Each kernel module in the tarball is signed.
+        self.setUpKmodKeys()
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/empty.ko", "")
+        upload = self.process()
+        self.assertEqual(1, upload.signKmod.call_count)
+
+    def test_signs_combo_image(self):
+        # Each UEFI image and kernel module in the tarball is signed.
+        self.setUpKmodKeys()
+        self.openArchive("test", "1.0", "amd64")
+        self.archive.add_file("1.0/empty.efi", "")
+        self.archive.add_file("1.0/empty.ko", "")
+        self.archive.add_file("1.0/empty2.ko", "")
+        upload = self.process()
+        self.assertEqual(1, upload.signUefi.call_count)
+        self.assertEqual(2, upload.signKmod.call_count)
 
     def test_installed(self):
         # Files in the tarball are installed correctly.
-        self.setUpKeyAndCert()
+        self.setUpUefiKeys()
         self.openArchive("test", "1.0", "amd64")
         self.archive.add_file("1.0/empty.efi", "")
         self.process()
@@ -231,7 +540,7 @@
     def test_installed_existing_uefi(self):
         # Files in the tarball are installed correctly.
         os.makedirs(os.path.join(self.getDistsPath(), "uefi"))
-        self.setUpKeyAndCert()
+        self.setUpUefiKeys()
         self.openArchive("test", "1.0", "amd64")
         self.archive.add_file("1.0/empty.efi", "")
         self.process()
@@ -247,7 +556,7 @@
     def test_installed_existing_signing(self):
         # Files in the tarball are installed correctly.
         os.makedirs(os.path.join(self.getDistsPath(), "signing"))
-        self.setUpKeyAndCert()
+        self.setUpUefiKeys()
         self.openArchive("test", "1.0", "amd64")
         self.archive.add_file("1.0/empty.efi", "")
         self.process()
@@ -262,10 +571,10 @@
 
     def test_create_uefi_keys_autokey_off(self):
         # Keys are not created.
-        self.setUpKeyAndCert(create=False)
+        self.setUpUefiKeys(create=False)
         self.assertFalse(os.path.exists(self.key))
         self.assertFalse(os.path.exists(self.cert))
-        fake_call = FakeMethod()
+        fake_call = FakeMethod(result=0)
         self.useFixture(MonkeyPatch("subprocess.call", fake_call))
         upload = SigningUpload()
         upload.generateUefiKeys = FakeMethodGenUefiKeys(upload=upload)
@@ -279,10 +588,10 @@
     def test_create_uefi_keys_autokey_on(self):
         # Keys are created on demand.
         self.setUpPPA()
-        self.setUpKeyAndCert(create=False)
+        self.setUpUefiKeys(create=False)
         self.assertFalse(os.path.exists(self.key))
         self.assertFalse(os.path.exists(self.cert))
-        fake_call = FakeMethod()
+        fake_call = FakeMethod(result=0)
         self.useFixture(MonkeyPatch("subprocess.call", fake_call))
         upload = SigningUpload()
         upload.generateUefiKeys = FakeMethodGenUefiKeys(upload=upload)


Follow ups