← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] lp:~apw/launchpad/signing-add-kernel-module-signing into lp:launchpad

 

Review: Needs Fixing



Diff comments:

> === modified file 'lib/lp/archivepublisher/signing.py'
> --- lib/lp/archivepublisher/signing.py	2016-05-11 15:31:20 +0000
> +++ lib/lp/archivepublisher/signing.py	2016-05-17 18:09:54 +0000
> @@ -18,11 +18,24 @@
>  
>  import os
>  import subprocess
> +import inspect
> +import tarfile
> +import shutil

Sort these.

>  
> -from lp.archivepublisher.customupload import CustomUpload
> +from lp.archivepublisher.customupload import (
> +    CustomUpload,
> +    CustomUploadError,
> +    )
>  from lp.services.osutils import remove_if_exists
>  
>  
> +class SigningUploadPackError(CustomUploadError):
> +    def __init__(self, tarfile_path, exc):
> +        message = "Problem building tarball '%s': %s" % (
> +            tarfile_path, str(exc))

No need for str() here, as %s has that effect already.

> +        CustomUploadError.__init__(self, message)
> +
> +
>  class SigningUpload(CustomUpload):
>      """Signing custom upload.
>  
> @@ -99,6 +116,19 @@
>              dists_signed, "%s-%s" % (self.package, self.arch))
>          self.archiveroot = pubconf.archiveroot
>  
> +    def setSigningOptions(self):
> +        """ find and extract raw-signing.options from the tarball."""

Docstrings should be sentence case and should have no leading whitespace.

> +        self.signing_options = {}
> +
> +        options_file = os.path.join(self.tmpdir, self.version,
> +            "raw-signing.options")
> +        if not os.path.exists(options_file):
> +            return
> +
> +        with open(options_file) as options_fd:
> +            for option in options_fd:
> +                self.signing_options[option.strip()] = True
> +
>      @classmethod
>      def getSeriesKey(cls, tarfile_path):
>          try:
> @@ -107,31 +137,58 @@
>          except ValueError:
>              return None
>  
> -    def findEfiFilenames(self):
> -        """Find all the *.efi files in an extracted tarball."""
> +    def getArchiveOwner(self):

getArchiveOwnerAndName, I think.

> +        # XXX: pull out the PPA owner and name to seed key CN

It's usual to sign and date XXX comments, e.g. "XXX apw 2016-05-18".

> +        archive_name = os.path.dirname(self.archiveroot)
> +        owner_name = os.path.basename(os.path.dirname(archive_name))
> +        archive_name = os.path.basename(archive_name)
> +
> +        return owner_name + ' ' + archive_name
> +
> +    def findSigningHandlers(self):
> +        """Find all the signable files in an extracted tarball."""
>          for dirpath, dirnames, filenames in os.walk(self.tmpdir):
>              for filename in filenames:
>                  if filename.endswith(".efi"):
> -                    yield os.path.join(dirpath, filename)
> +                    yield (os.path.join(dirpath, filename), self.signUefi)
> +                elif filename.endswith(".ko"):
> +                    yield (os.path.join(dirpath, filename), self.signKmod)
> +
> +    def getKeys(self, which, generate, *keynames):
> +        """Validate and return the uefi key and cert for encryption."""
> +
> +        if self.autokey:
> +            for keyfile in keynames:
> +                if keyfile and not os.path.exists(keyfile):
> +                    generate()
> +                    break
> +
> +        valid = True
> +        for keyfile in keynames:
> +            if keyfile and not os.access(keyfile, os.R_OK):
> +                if self.logger is not None:
> +                    self.logger.warning(
> +                        "%s key %s not readable" % (which, keyfile))
> +                valid = False
> +
> +        if not valid:
> +            return [None for k in keynames]
> +        return keynames
>  
>      def generateUefiKeys(self):
>          """Generate new UEFI Keys for this archive."""
> -        directory = os.path.dirname(self.key)
> +        directory = os.path.dirname(self.uefi_key)
>          if not os.path.exists(directory):
>              os.makedirs(directory)
>  
> -        # XXX: pull out the PPA owner and name to seed key CN
> -        archive_name = os.path.dirname(self.archiveroot)
> -        owner_name = os.path.basename(os.path.dirname(archive_name))
> -        archive_name = os.path.basename(archive_name)
> -        common_name = '/CN=PPA ' + owner_name + ' ' + archive_name + '/'
> +        common_name = '/CN=PPA ' + self.getArchiveOwner() + '/'
>  
>          old_mask = os.umask(0o077)
>          try:
>              new_key_cmd = [
>                  'openssl', 'req', '-new', '-x509', '-newkey', 'rsa:2048',
> -                '-subj', common_name, '-keyout', self.key, '-out', self.cert,
> -                '-days', '3650', '-nodes', '-sha256',
> +                '-subj', common_name, '-keyout', self.uefi_key,
> +                '-out', self.uefi_cert, '-days', '3650', '-nodes', '-sha256',
>                  ]
>              if subprocess.call(new_key_cmd) != 0:
>                  # Just log this rather than failing, since custom upload errors
> @@ -143,45 +200,121 @@
>          finally:
>              os.umask(old_mask)
>  
> -        if os.path.exists(self.cert):
> -            os.chmod(self.cert, 0o644)
> -
> -    def getUefiKeys(self):
> -        """Validate and return the uefi key and cert for encryption."""
> -
> -        if self.key and self.cert:
> -            # If neither of the key files exists then attempt to
> -            # generate them.
> -            if (self.autokey and not os.path.exists(self.key)
> -                and not os.path.exists(self.cert)):
> -                self.generateUefiKeys()
> -
> -            # If we have keys, but cannot read them they are dead to us.
> -            if not os.access(self.key, os.R_OK):
> -                if self.logger is not None:
> -                    self.logger.warning(
> -                        "UEFI private key %s not readable" % self.key)
> -                self.key = None
> -            if not os.access(self.cert, os.R_OK):
> -                if self.logger is not None:
> -                    self.logger.warning(
> -                        "UEFI certificate %s not readable" % self.cert)
> -                self.cert = None
> -
> -        return (self.key, self.cert)
> +        if os.path.exists(self.uefi_cert):
> +            os.chmod(self.uefi_cert, 0o644)
>  
>      def signUefi(self, image):
>          """Attempt to sign an image."""
> -        (key, cert) = self.getUefiKeys()
> +        remove_if_exists("%s.signed" % image)
> +        (key, cert) = self.getKeys('UEFI', self.generateUefiKeys,
> +            self.uefi_key, self.uefi_cert)
>          if not key or not cert:
>              return
>          cmdl = ["sbsign", "--key", key, "--cert", cert, image]
> -        if subprocess.call(cmdl) != 0:
> +        status = subprocess.call(cmdl)
> +        if status != 0:
>              # Just log this rather than failing, since custom upload errors
>              # tend to make the publisher rather upset.
>              if self.logger is not None:
>                  self.logger.warning("UEFI Signing Failed '%s'" %
>                      " ".join(cmdl))
> +        return status
> +
> +    def generateKmodKeys(self):
> +        """Generate new Kernel Signing Keys for this archive."""
> +        directory = os.path.dirname(self.kmod_pem)
> +        if not os.path.exists(directory):
> +            os.makedirs(directory)
> +
> +        genkey_text = inspect.cleandoc("""\

This isn't a docstring, so inspect.cleandoc is inappropriate.  I think you were probably looking for textwrap.dedent here.

I'd prefer this to be a temporary file (using the tempfile module) rather than remaining on the filesystem forever.

> +            [ req ]
> +            default_bits = 4096
> +            distinguished_name = req_distinguished_name
> +            prompt = no
> +            string_mask = utf8only
> +            x509_extensions = myexts
> +
> +            [ req_distinguished_name ]
> +            CN = /CN=PPA """ + self.getArchiveOwner() + """ kmod/
> +
> +            [ myexts ]
> +            basicConstraints=critical,CA:FALSE
> +            keyUsage=digitalSignature
> +            subjectKeyIdentifier=hash
> +            authorityKeyIdentifier=keyid
> +            """)
> +
> +        genkey_filename = os.path.join(directory, "kmod.genkey")
> +        with open(genkey_filename, "w") as genkey_fd:
> +            print >> genkey_fd, genkey_text

New code should do "from __future__ import print_function" immediately after the module docstring, and then use "print(genkey_text, file=genkey_fd)".  Or you could use write_file from lp.services.osutils.

> +
> +        old_mask = os.umask(0o077)
> +        try:
> +            new_key_cmd = [
> +                'openssl', 'req', '-new', '-nodes', '-utf8', '-sha512',
> +                '-days', '3650', '-batch', '-x509', '-config', genkey_filename,
> +                '-outform', 'PEM', '-out', self.kmod_pem,
> +                '-keyout', self.kmod_pem
> +                ]
> +            if subprocess.call(new_key_cmd) != 0:
> +                # Just log this rather than failing, since custom upload errors
> +                # tend to make the publisher rather upset.
> +                if self.logger is not None:
> +                    self.logger.warning(
> +                        "Failed to generate Kmod signing key for %s" %
> +                        common_name)
> +            else:
> +                new_x509_cmd = [
> +                    'openssl', 'x509', '-in', self.kmod_pem,
> +                    '-outform', 'DER', '-out', self.kmod_x509
> +                    ]
> +                if subprocess.call(new_x509_cmd) != 0:
> +                    # Just log this rather than failing (as above).
> +                    if self.logger is not None:
> +                        self.logger.warning(
> +                            "Failed to generate Kmod x509 certificate for %s" %
> +                            common_name)
> +                    os.unlink(self.kmod_pem)
> +        finally:
> +            os.umask(old_mask)
> +
> +    def signKmod(self, image):
> +        """Attempt to sign a kernel module."""
> +        remove_if_exists("%s.sig" % image)
> +        (pem, cert) = self.getKeys('Kernel Module', self.generateKmodKeys,
> +            self.kmod_pem, self.kmod_x509)
> +        if not pem or not cert:
> +            return
> +        cmdl = ["kmodsign", "-D", "sha512", pem, cert, image, image + ".sig"]
> +        status = subprocess.call(cmdl)
> +        if status != 0:
> +            # Just log this rather than failing, since custom upload errors
> +            # tend to make the publisher rather upset.
> +            if self.logger is not None:
> +                self.logger.warning("Kmod Signing Failed '%s'" %
> +                    " ".join(cmdl))
> +        return status
> +
> +    def convertToTarball(self):
> +        tarfilename = os.path.join(self.tmpdir, "signed.tar.gz")
> +        versiondir = os.path.join(self.tmpdir, self.version)
> +
> +        try:
> +            try:
> +                tarball = tarfile.open(tarfilename, "w:gz")
> +                tarball.add(versiondir, arcname=self.version)
> +            finally:
> +                tarball.close()

Simpler:

  with tarfile.open(tarfilename, "w:gz") as tarball:
      tarball.add(versiondir, arcname=self.version)

> +        except tarfile.TarError as exc:
> +            raise SigningUploadPackError(tarfilename, exc)
> +
> +        # Clean out the original tree and move the signing tarball in.
> +        try:
> +            shutil.rmtree(versiondir)
> +            os.mkdir(versiondir)
> +            os.rename(tarfilename, os.path.join(versiondir, "signed.tar.gz"))
> +        except os.OSError as exc:
> +            raise SigningUploadPackError(tarfilename, exc)
>  
>      def extract(self):
>          """Copy the custom upload to a temporary directory, and sign it.
> 
> === modified file 'lib/lp/archivepublisher/tests/test_signing.py'
> --- lib/lp/archivepublisher/tests/test_signing.py	2016-05-11 12:59:17 +0000
> +++ lib/lp/archivepublisher/tests/test_signing.py	2016-05-17 18:09:54 +0000
> @@ -20,6 +21,60 @@
>  from lp.testing.fakemethod import FakeMethod
>  
>  
> +class FakeMethodCallers(FakeMethod):
> +    """Fake execution general commands."""
> +    def __init__(self, upload=None, *args, **kwargs):
> +        super(FakeMethodCallers, self).__init__(*args, **kwargs)
> +        self.upload = upload
> +        self.callers = {}
> +
> +    def __call__(self, *args, **kwargs):
> +        super(FakeMethodCallers, self).__call__(*args, **kwargs)
> +
> +        import inspect
> +        frame = inspect.currentframe()
> +        try:
> +            frame = frame.f_back
> +            caller = frame.f_code.co_name
> +        finally:
> +            del frame  # As per no-leak stack inspection in Python reference.
> +
> +        self.callers[caller] = self.callers.get(caller, 0) + 1
> +
> +        if hasattr(self, caller):
> +            return getattr(self, caller)(*args, **kwargs)
> +
> +        return 0

Wow.  Um.

I would greatly prefer test code to be rather less opaque and more straightforward, so that we don't have to spend too much effort proving that the test code is correct.  It looks like you're using this in order to behave differently depending on who called your mock subprocess.call.  Instead, I would suggest having a single FakeMethod for that, and having it just look at its arguments to decide what to do.  That seems both simpler and more in line with how an actual subprocess invocation works.

> +
> +    def caller_call_count(self, caller):
> +        return self.callers.get(caller, 0)
> +
> +
> +class FakeMethodGenerateKeys(FakeMethodCallers):
> +    """ Fake UEFI/Kmod key Generators."""
> +    def generateUefiKeys(self, *args, **kwargs):
> +            write_file(self.upload.uefi_key, "")
> +            write_file(self.upload.uefi_cert, "")
> +            return 0
> +
> +    def generateKmodKeys(self, *args, **kwargs):
> +            write_file(self.upload.kmod_pem, "")
> +            write_file(self.upload.kmod_x509, "")
> +            return 0
> +
> +    def signUefi(self, *args, **kwargs):
> +            filename = args[0][-1]
> +            if filename.endswith(".efi"):
> +                write_file(filename + ".signed", "")
> +            return 0
> +
> +    def signKmod(self, *args, **kwargs):
> +            filename = args[0][-1]
> +            if filename.endswith(".ko.sig"):
> +                write_file(filename, "")
> +            return 0
> +
> +
>  class FakeMethodGenUefiKeys(FakeMethod):
>      """Fake execution of generation of Uefi keys pairs."""
>      def __init__(self, upload=None, *args, **kwargs):
> @@ -106,37 +206,164 @@
>          return os.path.join(self.getDistsPath(), "uefi",
>              "%s-%s" % (loader_type, arch))
>  
> -    def test_unconfigured(self):
> +    def test_archive_copy(self):
>          # If there is no key/cert configuration, processing succeeds but
> -        # nothing is signed.  Signing is attempted.
> -        self.pubconf = FakeConfig(self.temp_dir, None)
> -        self.openArchive("test", "1.0", "amd64")
> -        self.archive.add_file("1.0/empty.efi", "")
> -        upload = self.process()
> -        self.assertEqual(1, upload.signUefi.call_count)
> -
> -    def test_missing_key_and_cert(self):
> -        # If the configured key/cert are missing, processing succeeds but
> -        # nothing is signed.  Signing is attempted.
> -        self.openArchive("test", "1.0", "amd64")
> -        self.archive.add_file("1.0/empty.efi", "")
> -        upload = self.process()
> -        self.assertEqual(1, upload.signUefi.call_count)
> -
> -    def test_no_efi_files(self):
> +        # nothing is signed.
> +        self.pubconf = FakeConfigCopy(self.temp_dir)
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        self.process_emulate()
> +        self.assertCallCount(0, 'generateUefiKeys')
> +        self.assertCallCount(0, 'generateKmodKeys')
> +        self.assertCallCount(0, 'signUefi')
> +        self.assertCallCount(0, 'signKmod')
> +
> +    def test_archive_primary_no_keys(self):
> +        # If the configured key/cert are missing, processing succeeds but
> +        # nothing is signed.
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        self.process_emulate()
> +        self.assertCallCount(0, 'generateUefiKeys')
> +        self.assertCallCount(0, 'generateKmodKeys')
> +        self.assertCallCount(0, 'signUefi')
> +        self.assertCallCount(0, 'signKmod')
> +
> +    def test_archive_primary_keys(self):
> +        # If the configured key/cert are missing, processing succeeds but
> +        # nothing is signed.
> +        self.setUpUefiKeys()
> +        self.setUpKmodKeys()
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        self.process_emulate()
> +        self.assertCallCount(0, 'generateUefiKeys')
> +        self.assertCallCount(0, 'generateKmodKeys')
> +        self.assertCallCount(1, 'signUefi')
> +        self.assertCallCount(1, 'signKmod')
> +
> +    def test_PPA_creates_keys(self):
> +        # If the configured key/cert are missing, processing succeeds but
> +        # nothing is signed.
> +        self.setUpPPA()
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        self.process_emulate()
> +        self.assertCallCount(1, 'generateUefiKeys')
> +        self.assertCallCount(2, 'generateKmodKeys')
> +        self.assertCallCount(1, 'signUefi')
> +        self.assertCallCount(1, 'signKmod')
> +
> +    def test_options_handling_none(self):
> +        # If the configured key/cert are missing, processing succeeds but
> +        # nothing is signed.
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/raw-signing.options", "")
> +        upload = self.process_emulate()
> +        self.assertEqual([], sorted(upload.signing_options.keys()))

You can use self.assertContentEqual([], upload.signing_options.keys()) and similar below rather than the explicit sorted().

> +
> +    def test_options_handling_single(self):
> +        # If the configured key/cert are missing, processing succeeds but
> +        # nothing is signed.
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/raw-signing.options", "first\n")
> +        upload = self.process_emulate()
> +        self.assertEqual(['first'], sorted(upload.signing_options.keys()))
> +
> +    def test_options_handling_multiple(self):
> +        # If the configured key/cert are missing, processing succeeds but
> +        # nothing is signed.
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/raw-signing.options", "first\nsecond\n")
> +        upload = self.process_emulate()
> +        self.assertEqual(['first', 'second'],
> +            sorted(upload.signing_options.keys()))
> +
> +    def test_options_tarball(self):
> +        # Specifying the "tarball" option should create an tarball in
> +        # the tmpdir.
> +        self.setUpUefiKeys()
> +        self.setUpKmodKeys()
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/raw-signing.options", "tarball")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        self.process_emulate()
> +        self.assertFalse(os.path.exists(os.path.join(
> +            self.getSignedPath("test", "amd64"), "1.0", "empty.efi")))
> +        self.assertFalse(os.path.exists(os.path.join(
> +            self.getSignedPath("test", "amd64"), "1.0", "empty.ko")))
> +        self.assertTrue(os.path.exists(os.path.join(
> +            self.getSignedPath("test", "amd64"), "1.0", "signed.tar.gz")))
> +        with tarfile.open(os.path.join(self.getSignedPath("test", "amd64"),
> +            "1.0", "signed.tar.gz")) as tarball:
> +            self.assertEqual([

Confusing indentation, and similar below.  Consider assigning the path to signed.tar.gz to a local variable so that this is a bit more readable.

> +                '1.0',
> +                '1.0/empty.efi',
> +                '1.0/empty.efi.signed',
> +                '1.0/empty.ko',
> +                '1.0/empty.ko.sig',
> +                '1.0/raw-signing.options',
> +                ], sorted(tarball.getnames()))
> +
> +    def test_options_sigonly(self):
> +        # Specifying the "sigonly" option should trigger removal of
> +        # the source files leaving signatures only.
> +        self.setUpUefiKeys()
> +        self.setUpKmodKeys()
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/raw-signing.options", "sigonly")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        self.process_emulate()
> +        self.assertFalse(os.path.exists(os.path.join(
> +            self.getSignedPath("test", "amd64"), "1.0", "empty.efi")))
> +        self.assertTrue(os.path.exists(os.path.join(
> +            self.getSignedPath("test", "amd64"), "1.0", "empty.efi.signed")))
> +        self.assertFalse(os.path.exists(os.path.join(
> +            self.getSignedPath("test", "amd64"), "1.0", "empty.ko")))
> +        self.assertTrue(os.path.exists(os.path.join(
> +            self.getSignedPath("test", "amd64"), "1.0", "empty.ko.sig")))
> +
> +    def test_options_tarball_sigonly(self):
> +        # Specifying the "tarball" option should create an tarball in
> +        # the tmpdir.  Adding sigonly should trigger removal of the
> +        # original files.
> +        self.setUpUefiKeys()
> +        self.setUpKmodKeys()
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/raw-signing.options", "tarball\nsigonly")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        self.process_emulate()
> +        with tarfile.open(os.path.join(self.getSignedPath("test", "amd64"),
> +            "1.0", "signed.tar.gz")) as tarball:
> +            self.assertEqual([
> +                '1.0',
> +                '1.0/empty.efi.signed',
> +                '1.0/empty.ko.sig',
> +                '1.0/raw-signing.options',
> +                ], sorted(tarball.getnames()))
> +
> +    def test_no_signed_files(self):
>          # Tarballs containing no *.efi files are extracted without complaint.
>          # Nothing is signed.
> -        self.setUpKeyAndCert()
> +        self.setUpUefiKeys()
>          self.openArchive("empty", "1.0", "amd64")
>          self.archive.add_file("1.0/hello", "world")
>          upload = self.process()
>          self.assertTrue(os.path.exists(os.path.join(
>              self.getSignedPath("empty", "amd64"), "1.0", "hello")))
>          self.assertEqual(0, upload.signUefi.call_count)
> +        self.assertEqual(0, upload.signKmod.call_count)
>  
>      def test_already_exists(self):
>          # If the target directory already exists, processing fails.
> -        self.setUpKeyAndCert()
> +        self.setUpUefiKeys()
>          self.openArchive("test", "1.0", "amd64")
>          self.archive.add_file("1.0/empty.efi", "")
>          os.makedirs(os.path.join(self.getSignedPath("test", "amd64"), "1.0"))


-- 
https://code.launchpad.net/~apw/launchpad/signing-add-kernel-module-signing/+merge/294948
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.


References