← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:copy-signingkeys into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:copy-signingkeys into launchpad:master.

Commit message:
Add a new copy-signingkeys script

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/396443

This copies ArchiveSigningKeys between archives, for situations where we want the same signing key to be usable by more than one archive.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:copy-signingkeys into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/copy_signingkeys.py b/lib/lp/archivepublisher/scripts/copy_signingkeys.py
new file mode 100644
index 0000000..7d7be1d
--- /dev/null
+++ b/lib/lp/archivepublisher/scripts/copy_signingkeys.py
@@ -0,0 +1,114 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Script to copy signing keys between archives."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+__all__ = [
+    'CopySigningKeysScript',
+    ]
+
+import sys
+
+import transaction
+from zope.component import getUtility
+
+from lp.app.errors import NotFoundError
+from lp.services.scripts.base import (
+    LaunchpadScript,
+    LaunchpadScriptFailure,
+    )
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.soyuz.interfaces.archive import IArchiveSet
+
+
+class CopySigningKeysScript(LaunchpadScript):
+
+    usage = "Usage: %prog [options] FROM_ARCHIVE TO_ARCHIVE"
+    description = "Copy signing keys between archives."
+
+    def add_my_options(self):
+        self.parser.add_option(
+            "-t", "--key-type",
+            help="The type of keys to copy (default: all types).")
+
+        self.parser.add_option("-s", "--series", help="Series name.")
+
+        self.parser.add_option(
+            "-n", "--dry-run", action="store_true", default=False,
+            help="Report what would be done, but don't actually copy keys.")
+
+    def getArchive(self, reference):
+        archive = getUtility(IArchiveSet).getByReference(reference)
+        if archive is None:
+            raise LaunchpadScriptFailure(
+                "Could not find archive '%s'." % reference)
+        return archive
+
+    def getKeyTypes(self, name):
+        if name is not None:
+            try:
+                return [SigningKeyType.getTermByToken(name).value]
+            except LookupError:
+                raise LaunchpadScriptFailure(
+                    "There is no signing key type named '%s'." % name)
+        else:
+            return list(SigningKeyType.items)
+
+    def getSeries(self, series_name):
+        if series_name is None:
+            return None
+        try:
+            return self.from_archive.distribution[series_name]
+        except NotFoundError:
+            raise LaunchpadScriptFailure(
+                "Could not find series '%s' in %s." %
+                (series_name, self.from_archive.distribution.display_name))
+
+    def handle_options(self):
+        if len(self.args) != 2:
+            self.parser.print_help()
+            sys.exit(1)
+        self.from_archive = self.getArchive(self.args[0])
+        self.to_archive = self.getArchive(self.args[1])
+        self.key_types = self.getKeyTypes(self.options.key_type)
+        self.series = self.getSeries(self.options.series)
+
+    def copy(self, from_archive, to_archive, key_type, series=None):
+        series_name = series.name if series else None
+        from_archive_signing_key = getUtility(IArchiveSigningKeySet).get(
+            key_type, from_archive, series, exact_match=True)
+        if from_archive_signing_key is None:
+            self.logger.info(
+                "No %s signing key for %s / %s",
+                key_type, from_archive.reference, series_name)
+            return
+        to_archive_signing_key = getUtility(IArchiveSigningKeySet).get(
+            key_type, to_archive, series, exact_match=True)
+        if to_archive_signing_key is not None:
+            self.logger.warning(
+                "%s signing key for %s / %s already exists",
+                key_type, to_archive.reference, series_name)
+            return
+        self.logger.info(
+            "Copying %s signing key %s from %s / %s to %s / %s",
+            key_type, from_archive_signing_key.signing_key.fingerprint,
+            from_archive.reference, series_name,
+            to_archive.reference, series_name)
+        getUtility(IArchiveSigningKeySet).create(
+            to_archive, series, from_archive_signing_key.signing_key)
+
+    def main(self):
+        for key_type in self.key_types:
+            self.copy(
+                self.from_archive, self.to_archive, key_type,
+                series=self.series)
+        if self.options.dry_run:
+            self.logger.info("Dry run requested.  Not committing changes.")
+            transaction.abort()
+        else:
+            transaction.commit()
diff --git a/lib/lp/archivepublisher/tests/test_copy_signingkeys.py b/lib/lp/archivepublisher/tests/test_copy_signingkeys.py
new file mode 100644
index 0000000..bc179ed
--- /dev/null
+++ b/lib/lp/archivepublisher/tests/test_copy_signingkeys.py
@@ -0,0 +1,302 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test cases for copying signing keys between archives."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+from testtools.content import text_content
+from testtools.matchers import (
+    MatchesSetwise,
+    MatchesStructure,
+    )
+
+from lp.archivepublisher.scripts.copy_signingkeys import CopySigningKeysScript
+from lp.services.config import config
+from lp.services.database.interfaces import IStore
+from lp.services.log.logger import BufferLogger
+from lp.services.scripts.base import LaunchpadScriptFailure
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.model.signingkey import ArchiveSigningKey
+from lp.services.utils import CapturedOutput
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import ZopelessDatabaseLayer
+
+
+class TestCopySigningKeysScript(TestCaseWithFactory):
+
+    layer = ZopelessDatabaseLayer
+
+    def makeScript(self, test_args=None, archives=None, expect_exit=False):
+        test_args = [] if test_args is None else list(test_args)
+        if archives is None:
+            archives = [self.factory.makeArchive() for _ in range(2)]
+        test_args.extend(archive.reference for archive in archives)
+        try:
+            with CapturedOutput() as captured:
+                script = CopySigningKeysScript(
+                    "copy-signingkeys", dbuser=config.archivepublisher.dbuser,
+                    test_args=test_args)
+        except SystemExit:
+            exited = True
+        else:
+            exited = False
+        stdout = captured.stdout.getvalue()
+        stderr = captured.stderr.getvalue()
+        if stdout:
+            self.addDetail("stdout", text_content(stdout))
+        if stderr:
+            self.addDetail("stderr", text_content(stderr))
+        if expect_exit:
+            if not exited:
+                raise AssertionError('Script unexpectedly exited successfully')
+        else:
+            if exited:
+                raise AssertionError(
+                    'Script unexpectedly exited unsuccessfully')
+            self.assertEqual("", stderr)
+            script.logger = BufferLogger()
+            return script
+
+    def findKeys(self, archives):
+        return IStore(ArchiveSigningKey).find(
+            ArchiveSigningKey,
+            ArchiveSigningKey.archive_id.is_in(
+                archive.id for archive in archives))
+
+    def test_getArchive(self):
+        archives = [self.factory.makeArchive() for _ in range(2)]
+        script = self.makeScript(archives=archives)
+        self.assertEqual(archives[0], script.from_archive)
+        self.assertEqual(archives[1], script.to_archive)
+
+    def test_getKeyTypes_all(self):
+        script = self.makeScript()
+        self.assertEqual(list(SigningKeyType.items), script.key_types)
+
+    def test_getKeyTypes_with_selection(self):
+        script = self.makeScript(test_args=["--key-type", "UEFI"])
+        self.assertEqual([SigningKeyType.UEFI], script.key_types)
+
+    def test_getSeries_none(self):
+        script = self.makeScript()
+        self.assertIsNone(script.series)
+
+    def test_getSeries_no_such_series(self):
+        archives = [self.factory.makeArchive() for _ in range(2)]
+        self.assertRaisesWithContent(
+            LaunchpadScriptFailure,
+            "Could not find series 'nonexistent' in %s." % (
+                archives[0].distribution.display_name),
+            self.makeScript,
+            test_args=["-s", "nonexistent"], archives=archives)
+
+    def test_getSeries(self):
+        archives = [self.factory.makeArchive() for _ in range(2)]
+        distro_series = self.factory.makeDistroSeries(
+            distribution=archives[0].distribution)
+        script = self.makeScript(
+            test_args=["-s", distro_series.name], archives=archives)
+        self.assertEqual(distro_series, script.series)
+
+    def test_wrong_number_of_arguments(self):
+        archives = [self.factory.makeArchive() for _ in range(3)]
+        self.makeScript(archives=archives[:1], expect_exit=True)
+        self.makeScript(archives=archives, expect_exit=True)
+
+    def test_copy_all_no_series(self):
+        archives = [self.factory.makeArchive() for _ in range(3)]
+        signing_keys = [
+            self.factory.makeSigningKey(key_type=key_type)
+            for key_type in (
+                SigningKeyType.UEFI, SigningKeyType.KMOD, SigningKeyType.OPAL)]
+        for signing_key in signing_keys[:2]:
+            self.factory.makeArchiveSigningKey(
+                archive=archives[0], signing_key=signing_key)
+        distro_series = self.factory.makeDistroSeries(
+            distribution=archives[0].distribution)
+        self.factory.makeArchiveSigningKey(
+            archive=archives[0], distro_series=distro_series,
+            signing_key=signing_keys[1])
+        self.factory.makeArchiveSigningKey(
+            archive=archives[2], signing_key=signing_keys[2])
+        script = self.makeScript(archives=archives[:2])
+        script.main()
+        expected_log = [
+            "INFO Copying UEFI signing key %s from %s / None to %s / None" % (
+                signing_keys[0].fingerprint,
+                archives[0].reference, archives[1].reference),
+            "INFO Copying Kmod signing key %s from %s / None to %s / None" % (
+                signing_keys[1].fingerprint,
+                archives[0].reference, archives[1].reference),
+            "INFO No OPAL signing key for %s / None" % archives[0].reference,
+            "INFO No SIPL signing key for %s / None" % archives[0].reference,
+            "INFO No FIT signing key for %s / None" % archives[0].reference,
+            "INFO No OpenPGP signing key for %s / None" %
+                archives[0].reference,
+            ]
+        self.assertEqual(
+            expected_log, script.logger.content.as_text().splitlines())
+        self.assertThat(
+            self.findKeys(archives),
+            MatchesSetwise(
+                MatchesStructure.byEquality(
+                    archive=archives[0], earliest_distro_series=None,
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[0], earliest_distro_series=None,
+                    key_type=SigningKeyType.KMOD, signing_key=signing_keys[1]),
+                MatchesStructure.byEquality(
+                    archive=archives[0], earliest_distro_series=distro_series,
+                    key_type=SigningKeyType.KMOD, signing_key=signing_keys[1]),
+                MatchesStructure.byEquality(
+                    archive=archives[1], earliest_distro_series=None,
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[1], earliest_distro_series=None,
+                    key_type=SigningKeyType.KMOD, signing_key=signing_keys[1]),
+                MatchesStructure.byEquality(
+                    archive=archives[2], earliest_distro_series=None,
+                    key_type=SigningKeyType.OPAL, signing_key=signing_keys[2]),
+                ))
+
+    def test_copy_by_key_type(self):
+        archives = [self.factory.makeArchive() for _ in range(3)]
+        signing_keys = [
+            self.factory.makeSigningKey(key_type=key_type)
+            for key_type in (SigningKeyType.UEFI, SigningKeyType.KMOD)]
+        for signing_key in signing_keys:
+            self.factory.makeArchiveSigningKey(
+                archive=archives[0], signing_key=signing_key)
+        distro_series = self.factory.makeDistroSeries(
+            distribution=archives[0].distribution)
+        self.factory.makeArchiveSigningKey(
+            archive=archives[0], distro_series=distro_series,
+            signing_key=signing_keys[0])
+        script = self.makeScript(
+            test_args=["--key-type", "UEFI"], archives=archives[:2])
+        script.main()
+        expected_log = [
+            "INFO Copying UEFI signing key %s from %s / None to %s / None" % (
+                signing_keys[0].fingerprint,
+                archives[0].reference, archives[1].reference),
+            ]
+        self.assertEqual(
+            expected_log, script.logger.content.as_text().splitlines())
+        self.assertThat(
+            self.findKeys(archives),
+            MatchesSetwise(
+                MatchesStructure.byEquality(
+                    archive=archives[0], earliest_distro_series=None,
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[0], earliest_distro_series=None,
+                    key_type=SigningKeyType.KMOD, signing_key=signing_keys[1]),
+                MatchesStructure.byEquality(
+                    archive=archives[0], earliest_distro_series=distro_series,
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[1], earliest_distro_series=None,
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                ))
+
+    def test_copy_by_series(self):
+        distribution = self.factory.makeDistribution()
+        archives = [
+            self.factory.makeArchive(distribution=distribution)
+            for _ in range(3)]
+        signing_keys = [
+            self.factory.makeSigningKey(key_type=key_type)
+            for key_type in (SigningKeyType.UEFI, SigningKeyType.KMOD)]
+        distro_serieses = [
+            self.factory.makeDistroSeries(distribution=distribution)
+            for _ in range(2)]
+        for signing_key in signing_keys:
+            self.factory.makeArchiveSigningKey(
+                archive=archives[0], distro_series=distro_serieses[0],
+                signing_key=signing_key)
+        self.factory.makeArchiveSigningKey(
+            archive=archives[0], signing_key=signing_keys[0])
+        self.factory.makeArchiveSigningKey(
+            archive=archives[0], distro_series=distro_serieses[1],
+            signing_key=signing_keys[0])
+        script = self.makeScript(
+            test_args=["-s", distro_serieses[0].name], archives=archives[:2])
+        script.main()
+        expected_log = [
+            "INFO Copying UEFI signing key %s from %s / %s to %s / %s" % (
+                signing_keys[0].fingerprint,
+                archives[0].reference, distro_serieses[0].name,
+                archives[1].reference, distro_serieses[0].name),
+            "INFO Copying Kmod signing key %s from %s / %s to %s / %s" % (
+                signing_keys[1].fingerprint,
+                archives[0].reference, distro_serieses[0].name,
+                archives[1].reference, distro_serieses[0].name),
+            "INFO No OPAL signing key for %s / %s" % (
+                archives[0].reference, distro_serieses[0].name),
+            "INFO No SIPL signing key for %s / %s" % (
+                archives[0].reference, distro_serieses[0].name),
+            "INFO No FIT signing key for %s / %s" % (
+                archives[0].reference, distro_serieses[0].name),
+            "INFO No OpenPGP signing key for %s / %s" % (
+                archives[0].reference, distro_serieses[0].name),
+            ]
+        self.assertEqual(
+            expected_log, script.logger.content.as_text().splitlines())
+        self.assertThat(
+            self.findKeys(archives),
+            MatchesSetwise(
+                MatchesStructure.byEquality(
+                    archive=archives[0], earliest_distro_series=None,
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[0],
+                    earliest_distro_series=distro_serieses[0],
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[0],
+                    earliest_distro_series=distro_serieses[0],
+                    key_type=SigningKeyType.KMOD, signing_key=signing_keys[1]),
+                MatchesStructure.byEquality(
+                    archive=archives[0],
+                    earliest_distro_series=distro_serieses[1],
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[1],
+                    earliest_distro_series=distro_serieses[0],
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[1],
+                    earliest_distro_series=distro_serieses[0],
+                    key_type=SigningKeyType.KMOD, signing_key=signing_keys[1]),
+                ))
+
+    def test_copy_refuses_overwrite(self):
+        archives = [self.factory.makeArchive() for _ in range(2)]
+        signing_keys = [
+            self.factory.makeSigningKey(key_type=SigningKeyType.UEFI)
+            for _ in range(2)]
+        for archive, signing_key in zip(archives, signing_keys):
+            self.factory.makeArchiveSigningKey(
+                archive=archive, signing_key=signing_key)
+        script = self.makeScript(
+            test_args=["--key-type", "UEFI"], archives=archives[:2])
+        script.main()
+        expected_log = [
+            "WARNING UEFI signing key for %s / None already exists" %
+                archives[1].reference,
+            ]
+        self.assertEqual(
+            expected_log, script.logger.content.as_text().splitlines())
+        self.assertThat(
+            self.findKeys(archives),
+            MatchesSetwise(
+                MatchesStructure.byEquality(
+                    archive=archives[0], earliest_distro_series=None,
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[0]),
+                MatchesStructure.byEquality(
+                    archive=archives[1], earliest_distro_series=None,
+                    key_type=SigningKeyType.UEFI, signing_key=signing_keys[1]),
+                ))
diff --git a/scripts/copy-signingkeys.py b/scripts/copy-signingkeys.py
new file mode 100755
index 0000000..4f226bf
--- /dev/null
+++ b/scripts/copy-signingkeys.py
@@ -0,0 +1,16 @@
+#!/usr/bin/python2 -S
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Script to copy signing keys between archives."""
+
+import _pythonpath
+
+from lp.archivepublisher.scripts.copy_signingkeys import CopySigningKeysScript
+from lp.services.config import config
+
+
+if __name__ == '__main__':
+    script = CopySigningKeysScript(
+        'copy-signingkeys', dbuser=config.archivepublisher.dbuser)
+    script.lock_and_run()