← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] lp:~apw/launchpad/signing-add-kernel-module-signing into lp:launchpad

 

Review: Approve



Diff comments:

> === modified file 'lib/lp/archivepublisher/signing.py'
> --- lib/lp/archivepublisher/signing.py	2016-05-11 15:31:20 +0000
> +++ lib/lp/archivepublisher/signing.py	2016-05-19 10:26:50 +0000
> @@ -107,81 +140,164 @@
>          except ValueError:
>              return None
>  
> -    def findEfiFilenames(self):
> -        """Find all the *.efi files in an extracted tarball."""
> +    def getArchiveOwnerAndName(self):
> +        # XXX apw 2016-05-18: pull out the PPA owner and name to seed key CN
> +        archive_name = os.path.dirname(self.archiveroot)
> +        owner_name = os.path.basename(os.path.dirname(archive_name))
> +        archive_name = os.path.basename(archive_name)
> +
> +        return owner_name + ' ' + archive_name
> +
> +    def callLog(self, description, cmdl):
> +        status = subprocess.call(cmdl)
> +        if status != 0:
> +            # Just log this rather than failing, since custom upload errors
> +            # tend to make the publisher rather upset.
> +            if self.logger is not None:
> +                self.logger.warning("%s Failed (cmd='%s')" %
> +                    (description, " ".join(cmdl)))
> +        return status
> +
> +    def findSigningHandlers(self):
> +        """Find all the signable files in an extracted tarball."""
>          for dirpath, dirnames, filenames in os.walk(self.tmpdir):
>              for filename in filenames:
>                  if filename.endswith(".efi"):
> -                    yield os.path.join(dirpath, filename)
> +                    yield (os.path.join(dirpath, filename), self.signUefi)
> +                elif filename.endswith(".ko"):
> +                    yield (os.path.join(dirpath, filename), self.signKmod)
> +
> +    def getKeys(self, which, generate, *keynames):
> +        """Validate and return the uefi key and cert for encryption."""
> +
> +        if self.autokey:
> +            for keyfile in keynames:
> +                if keyfile and not os.path.exists(keyfile):
> +                    generate()
> +                    break
> +
> +        valid = True
> +        for keyfile in keynames:
> +            if keyfile and not os.access(keyfile, os.R_OK):
> +                if self.logger is not None:
> +                    self.logger.warning(
> +                        "%s key %s not readable" % (which, keyfile))
> +                valid = False
> +
> +        if not valid:
> +            return [None for k in keynames]
> +        return keynames
>  
>      def generateUefiKeys(self):
>          """Generate new UEFI Keys for this archive."""
> -        directory = os.path.dirname(self.key)
> +        directory = os.path.dirname(self.uefi_key)
>          if not os.path.exists(directory):
>              os.makedirs(directory)
>  
> -        # XXX: pull out the PPA owner and name to seed key CN
> -        archive_name = os.path.dirname(self.archiveroot)
> -        owner_name = os.path.basename(os.path.dirname(archive_name))
> -        archive_name = os.path.basename(archive_name)
> -        common_name = '/CN=PPA ' + owner_name + ' ' + archive_name + '/'
> +        common_name = '/CN=PPA ' + self.getArchiveOwnerAndName() + '/'
>  
>          old_mask = os.umask(0o077)
>          try:
>              new_key_cmd = [
>                  'openssl', 'req', '-new', '-x509', '-newkey', 'rsa:2048',
> -                '-subj', common_name, '-keyout', self.key, '-out', self.cert,
> -                '-days', '3650', '-nodes', '-sha256',
> +                '-subj', common_name, '-keyout', self.uefi_key,
> +                '-out', self.uefi_cert, '-days', '3650', '-nodes', '-sha256',
>                  ]
> -            if subprocess.call(new_key_cmd) != 0:
> -                # Just log this rather than failing, since custom upload errors
> -                # tend to make the publisher rather upset.
> -                if self.logger is not None:
> -                    self.logger.warning(
> -                        "Failed to generate UEFI signing keys for %s" %
> -                        common_name)
> +            self.callLog("UEFI keygen", new_key_cmd)
>          finally:
>              os.umask(old_mask)
>  
> -        if os.path.exists(self.cert):
> -            os.chmod(self.cert, 0o644)
> -
> -    def getUefiKeys(self):
> -        """Validate and return the uefi key and cert for encryption."""
> -
> -        if self.key and self.cert:
> -            # If neither of the key files exists then attempt to
> -            # generate them.
> -            if (self.autokey and not os.path.exists(self.key)
> -                and not os.path.exists(self.cert)):
> -                self.generateUefiKeys()
> -
> -            # If we have keys, but cannot read them they are dead to us.
> -            if not os.access(self.key, os.R_OK):
> -                if self.logger is not None:
> -                    self.logger.warning(
> -                        "UEFI private key %s not readable" % self.key)
> -                self.key = None
> -            if not os.access(self.cert, os.R_OK):
> -                if self.logger is not None:
> -                    self.logger.warning(
> -                        "UEFI certificate %s not readable" % self.cert)
> -                self.cert = None
> -
> -        return (self.key, self.cert)
> +        if os.path.exists(self.uefi_cert):
> +            os.chmod(self.uefi_cert, 0o644)
>  
>      def signUefi(self, image):
>          """Attempt to sign an image."""
> -        (key, cert) = self.getUefiKeys()
> +        remove_if_exists("%s.signed" % image)
> +        (key, cert) = self.getKeys('UEFI', self.generateUefiKeys,
> +            self.uefi_key, self.uefi_cert)
>          if not key or not cert:
>              return
>          cmdl = ["sbsign", "--key", key, "--cert", cert, image]
> -        if subprocess.call(cmdl) != 0:
> -            # Just log this rather than failing, since custom upload errors
> -            # tend to make the publisher rather upset.
> -            if self.logger is not None:
> -                self.logger.warning("UEFI Signing Failed '%s'" %
> -                    " ".join(cmdl))
> +        return self.callLog("UEFI signing", cmdl)
> +
> +    def generateKmodKeys(self):
> +        """Generate new Kernel Signing Keys for this archive."""
> +        directory = os.path.dirname(self.kmod_pem)
> +        if not os.path.exists(directory):
> +            os.makedirs(directory)
> +
> +        old_mask = os.umask(0o077)
> +        try:
> +            with tempfile.NamedTemporaryFile(suffix='.keygen') as tf:
> +                common_name = self.getArchiveOwnerAndName()
> +
> +                genkey_text = textwrap.dedent("""\
> +                    [ req ]
> +                    default_bits = 4096
> +                    distinguished_name = req_distinguished_name
> +                    prompt = no
> +                    string_mask = utf8only
> +                    x509_extensions = myexts
> +
> +                    [ req_distinguished_name ]
> +                    CN = /CN=PPA """ + common_name + """ kmod/
> +
> +                    [ myexts ]
> +                    basicConstraints=critical,CA:FALSE
> +                    keyUsage=digitalSignature
> +                    subjectKeyIdentifier=hash
> +                    authorityKeyIdentifier=keyid
> +                    """)
> +
> +                print(genkey_text, file=tf)
> +
> +                # Close out the underlying file so we know it is complete.
> +                tf.file.close()
> +
> +                new_key_cmd = [
> +                    'openssl', 'req', '-new', '-nodes', '-utf8', '-sha512',
> +                    '-days', '3650', '-batch', '-x509', '-config', tf.name,
> +                    '-outform', 'PEM', '-out', self.kmod_pem,
> +                    '-keyout', self.kmod_pem
> +                    ]
> +                if self.callLog("Kmod keygen key", new_key_cmd) == 0:
> +                    new_x509_cmd = [
> +                        'openssl', 'x509', '-in', self.kmod_pem,
> +                        '-outform', 'DER', '-out', self.kmod_x509
> +                        ]
> +                    if self.callLog("Kmod keygen cert", new_x509_cmd) != 0:
> +                        os.unlink(self.kmod_pem)
> +        finally:
> +            os.umask(old_mask)
> +
> +    def signKmod(self, image):
> +        """Attempt to sign a kernel module."""
> +        remove_if_exists("%s.sig" % image)
> +        (pem, cert) = self.getKeys('Kernel Module', self.generateKmodKeys,
> +            self.kmod_pem, self.kmod_x509)
> +        if not pem or not cert:
> +            return
> +        cmdl = ["kmodsign", "-D", "sha512", pem, cert, image, image + ".sig"]
> +        return self.callLog("Kmod signing", cmdl)
> +
> +    def convertToTarball(self):
> +        """Convert unpacked output to signing tarball."""
> +        tarfilename = os.path.join(self.tmpdir, "signed.tar.gz")
> +        versiondir = os.path.join(self.tmpdir, self.version)
> +
> +        try:
> +            with tarfile.open(tarfilename, "w:gz") as tarball:
> +                tarball.add(versiondir, arcname=self.version)
> +        except tarfile.TarError as exc:
> +            raise SigningUploadPackError(tarfilename, exc)
> +
> +        # Clean out the original tree and move the signing tarball in.
> +        try:
> +            shutil.rmtree(versiondir)
> +            os.mkdir(versiondir)
> +            os.rename(tarfilename, os.path.join(versiondir, "signed.tar.gz"))
> +        except os.OSError as exc:

This is usually just spelled OSError (the built-in), not os.OSError.

> +            raise SigningUploadPackError(tarfilename, exc)
>  
>      def extract(self):
>          """Copy the custom upload to a temporary directory, and sign it.
> 
> === modified file 'lib/lp/archivepublisher/tests/test_signing.py'
> --- lib/lp/archivepublisher/tests/test_signing.py	2016-05-11 12:59:17 +0000
> +++ lib/lp/archivepublisher/tests/test_signing.py	2016-05-19 10:26:50 +0000
> @@ -20,20 +21,55 @@
>  from lp.testing.fakemethod import FakeMethod
>  
>  
> -class FakeMethodGenUefiKeys(FakeMethod):
> -    """Fake execution of generation of Uefi keys pairs."""
> +class FakeMethodCallLog(FakeMethod):
> +    """Fake execution general commands."""
>      def __init__(self, upload=None, *args, **kwargs):
> -        super(FakeMethodGenUefiKeys, self).__init__(*args, **kwargs)
> +        super(FakeMethodCallLog, self).__init__(*args, **kwargs)
>          self.upload = upload
> +        self.callers = {
> +            "UEFI signing": 0,
> +            "Kmod signing": 0,
> +            "UEFI keygen": 0,
> +            "Kmod keygen key": 0,
> +            "Kmod keygen cert": 0,
> +            }
>  
>      def __call__(self, *args, **kwargs):
> -        super(FakeMethodGenUefiKeys, self).__call__(*args, **kwargs)
> -
> -        write_file(self.upload.key, "")
> -        write_file(self.upload.cert, "")
> -
> -
> -class FakeConfig:
> +        super(FakeMethodCallLog, self).__call__(*args, **kwargs)
> +
> +        description = args[0]
> +        cmdl = args[1]
> +        self.callers[description] += 1
> +        if description == "UEFI signing":
> +            filename = cmdl[-1]
> +            if filename.endswith(".efi"):
> +                write_file(filename + ".signed", "")
> +
> +        elif description == "Kmod signing":
> +            filename = cmdl[-1]
> +            if filename.endswith(".ko.sig"):
> +                write_file(filename, "")
> +
> +        elif description == "Kmod keygen cert":
> +            write_file(self.upload.kmod_x509, "")
> +
> +        elif description == "Kmod keygen key":
> +            write_file(self.upload.kmod_pem, "")
> +
> +        elif description == "UEFI keygen":
> +            write_file(self.upload.uefi_key, "")
> +            write_file(self.upload.uefi_cert, "")
> +
> +        else:
> +            self.fail()

Is self.fail() a thing here?  I don't see it defined anywhere.  I'd probably 'raise AssertionError("some useful message")' instead.

> +
> +        return 0
> +
> +    def caller_count(self, caller):
> +        return self.callers.get(caller, 0)
> +
> +
> +class FakeConfigPrimary:
>      """A fake publisher configuration for the main archive."""
>      def __init__(self, distroroot, signingroot):
>          self.distroroot = distroroot
> @@ -205,17 +398,102 @@
>              ]
>          self.assertEqual(expected_cmd, args)
>  
> -    def test_signs_image(self):
> -        # Each image in the tarball is signed.
> -        self.setUpKeyAndCert()
> -        self.openArchive("test", "1.0", "amd64")
> -        self.archive.add_file("1.0/empty.efi", "")
> -        upload = self.process()
> -        self.assertEqual(1, upload.signUefi.call_count)
> +    def test_correct_kmod_signing_command_executed(self):
> +        # Check that calling signKmod() will generate the expected command
> +        # when appropriate keys are present.
> +        self.setUpKmodKeys()
> +        fake_call = FakeMethod(result=0)
> +        self.useFixture(MonkeyPatch("subprocess.call", fake_call))
> +        upload = SigningUpload()
> +        upload.generateKmodKeys = FakeMethod()
> +        upload.setTargetDirectory(
> +            self.pubconf, "test_1.0_amd64.tar.gz", "distroseries")
> +        upload.signKmod('t.ko')
> +        self.assertEqual(1, fake_call.call_count)
> +        # Assert command form.
> +        args = fake_call.calls[0][0][0]
> +        expected_cmd = [
> +            'kmodsign', '-D', 'sha512', self.kmod_pem, self.kmod_x509,
> +            't.ko', 't.ko.sig'
> +            ]
> +        self.assertEqual(expected_cmd, args)
> +        self.assertEqual(0, upload.generateKmodKeys.call_count)
> +
> +    def test_correct_kmod_signing_command_executed_no_keys(self):
> +        # Check that calling signKmod() will generate no commands when
> +        # no keys are present.
> +        self.setUpKmodKeys(create=False)
> +        fake_call = FakeMethod(result=0)
> +        self.useFixture(MonkeyPatch("subprocess.call", fake_call))
> +        upload = SigningUpload()
> +        upload.generateKmodKeys = FakeMethod()
> +        upload.setTargetDirectory(
> +            self.pubconf, "test_1.0_amd64.tar.gz", "distroseries")
> +        upload.signUefi('t.ko')
> +        self.assertEqual(0, fake_call.call_count)
> +        self.assertEqual(0, upload.generateKmodKeys.call_count)
> +
> +    def test_correct_kmod_keygen_command_executed(self):
> +        # Check that calling generateUefiKeys() will generate the
> +        # expected command.
> +        self.setUpPPA()
> +        self.setUpKmodKeys(create=False)
> +        fake_call = FakeMethod(result=0)
> +        self.useFixture(MonkeyPatch("subprocess.call", fake_call))
> +        upload = SigningUpload()
> +        upload.setTargetDirectory(
> +            self.pubconf, "test_1.0_amd64.tar.gz", "distroseries")
> +        upload.generateKmodKeys()
> +        self.assertEqual(2, fake_call.call_count)
> +        # Assert the actual command matches.
> +        args = fake_call.calls[0][0][0]
> +        # Sanitise the keygen tmp file.
> +        if args[11] .endswith('.keygen'):

Lose the space before '.endswith'.

> +            args[11] = 'XXX.keygen'
> +        expected_cmd = [
> +            'openssl', 'req', '-new', '-nodes', '-utf8', '-sha512',
> +            '-days', '3650', '-batch', '-x509',
> +            '-config', 'XXX.keygen', '-outform', 'PEM',
> +            '-out', self.kmod_pem, '-keyout', self.kmod_pem
> +            ]
> +        self.assertEqual(expected_cmd, args)
> +        args = fake_call.calls[1][0][0]
> +        expected_cmd = [
> +            'openssl', 'x509', '-in', self.kmod_pem, '-outform', 'DER',
> +            '-out', self.kmod_x509
> +            ]
> +        self.assertEqual(expected_cmd, args)
> +
> +    def test_signs_uefi_image(self):
> +        # Each image in the tarball is signed.
> +        self.setUpUefiKeys()
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        upload = self.process()
> +        self.assertEqual(1, upload.signUefi.call_count)
> +
> +    def test_signs_kmod_image(self):
> +        # Each image in the tarball is signed.
> +        self.setUpKmodKeys()
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        upload = self.process()
> +        self.assertEqual(1, upload.signKmod.call_count)
> +
> +    def test_signs_combo_image(self):
> +        # Each image in the tarball is signed.
> +        self.setUpKmodKeys()
> +        self.openArchive("test", "1.0", "amd64")
> +        self.archive.add_file("1.0/empty.efi", "")
> +        self.archive.add_file("1.0/empty.ko", "")
> +        self.archive.add_file("1.0/empty2.ko", "")
> +        upload = self.process()
> +        self.assertEqual(1, upload.signUefi.call_count)
> +        self.assertEqual(2, upload.signKmod.call_count)
>  
>      def test_installed(self):
>          # Files in the tarball are installed correctly.
> -        self.setUpKeyAndCert()
> +        self.setUpUefiKeys()
>          self.openArchive("test", "1.0", "amd64")
>          self.archive.add_file("1.0/empty.efi", "")
>          self.process()


-- 
https://code.launchpad.net/~apw/launchpad/signing-add-kernel-module-signing/+merge/294948
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.


References