← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/launchpad:bulk-inject-lp-signing into launchpad:master

 

Thiago F. Pappacena has proposed merging ~pappacena/launchpad:bulk-inject-lp-signing into launchpad:master with ~pappacena/launchpad:inject-lp-signing-generated-keys as a prerequisite.

Commit message:
Adding the command to bulk import to signing service the existing signing keys for all archives.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/382948
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:bulk-inject-lp-signing into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/sync_signingkeys.py b/lib/lp/archivepublisher/scripts/sync_signingkeys.py
new file mode 100644
index 0000000..f25d5c2
--- /dev/null
+++ b/lib/lp/archivepublisher/scripts/sync_signingkeys.py
@@ -0,0 +1,132 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Script to inject archive keys into signing service."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+
+__all__ = [
+    'SyncSigningKeysScript',
+    ]
+
+from datetime import datetime
+import os
+
+from pytz import utc
+from zope.component import getUtility
+
+from lp.archivepublisher.config import getPubConfig
+from lp.archivepublisher.model.publisherconfig import PublisherConfig
+from lp.services.database.interfaces import IStore
+from lp.services.scripts.base import LaunchpadScript
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.soyuz.model.archive import Archive
+
+
+class SyncSigningKeysScript(LaunchpadScript):
+    description = (
+        "Injects into signing services all key files currently in this "
+        "machine.")
+
+    def add_my_options(self):
+        """Registers the --limit and --offset command line options."""
+        self.parser.add_option(
+            "-l", "--limit", dest="limit", type=int,
+            help="How many archives to fetch.")
+        self.parser.add_option(
+            "-o", "--offset", dest="offset", type=int,
+            help="Offset on archives list.")
+
+    def getArchives(self):
+        """Returns the archives to process, honouring --offset/--limit."""
+        offset = self.options.offset or 0
+        limit = self.options.limit
+        stop = offset + limit if limit else None
+        matching = IStore(Archive).find(
+            Archive,
+            PublisherConfig.distribution_id == Archive.distributionID)
+        return matching.order_by(Archive.id)[offset:stop]
+
+    def getKeysPerType(self, dir):
+        """Returns the existing key files per type in the given directory.
+
+        :param dir: The directory path to scan for keys
+        :return: A dict where keys are SigningKeyTypes and the value is a
+                 tuple of (key, cert) file names, both present in dir."""
+        keys_per_type = {
+            SigningKeyType.UEFI: ("uefi.key", "uefi.crt"),
+            SigningKeyType.KMOD: ("kmod.pem", "kmod.x509"),
+            SigningKeyType.OPAL: ("opal.pem", "opal.x509"),
+            SigningKeyType.SIPL: ("sipl.pem", "sipl.x509"),
+            SigningKeyType.FIT: ("fit.key", "fit.crt"),
+        }
+        # Only known types: SigningKeyType may grow types not listed above.
+        for key_type in list(keys_per_type):
+            files = [os.path.join(dir, f) for f in keys_per_type[key_type]]
+            if not all(os.path.exists(f) for f in files):
+                del keys_per_type[key_type]
+        return keys_per_type
+
+    def getSeriesPaths(self, archive):
+        """Returns the directory of each series containing signing keys.
+
+        :param archive: The Archive object to search for signing keys.
+        :return: A dict mapping each DistroSeries (or None, for the archive's
+                 root signing directory) to its existing keys directory;
+                 empty when there is no pubconfig or no signing root."""
+        series_paths = {}
+        pubconf = getPubConfig(archive)
+        if pubconf is None or pubconf.signingroot is None:
+            self.logger.info(
+                "Skipping %s: no pubconfig or no signing root." %
+                archive.reference)
+            return {}
+        for series in archive.distribution.series:
+            path = os.path.join(pubconf.signingroot, series.name)
+            if os.path.exists(path):
+                series_paths[series] = path
+        if os.path.exists(pubconf.signingroot):
+            series_paths[None] = pubconf.signingroot
+        return series_paths
+
+    def inject(self, archive, key_type, series, priv_key_path, pub_key_path):
+        """Injects the key pair unless one exists; series may be None."""
+        arch_signing_key_set = getUtility(IArchiveSigningKeySet)
+        existing_signing_key = arch_signing_key_set.getSigningKey(
+            key_type, archive, series, exact_match=True)
+        if existing_signing_key is not None:
+            self.logger.info(
+                "Signing key for %s / %s / %s already exists", key_type,
+                archive.reference, series.name if series else None)
+            return existing_signing_key
+
+        with open(priv_key_path, 'rb') as fd:
+            private_key = fd.read()
+        with open(pub_key_path, 'rb') as fd:
+            public_key = fd.read()
+        now = datetime.now().replace(tzinfo=utc)
+        description = u"%s key for %s" % (key_type.name, archive.reference)
+        return arch_signing_key_set.inject(
+            key_type, private_key, public_key, description, now, archive,
+            earliest_distro_series=series)
+
+    def processArchive(self, archive):
+        """Injects every key pair found in the archive's signing dirs."""
+        for series, path in self.getSeriesPaths(archive).items():
+            # NOTE(review): inject() open()s these bare names -- join to path?
+            for key_type, names in self.getKeysPerType(path).items():
+                self.logger.info(
+                    "Found key files %s / %s (type=%s, series=%s)." %
+                    (names + (key_type, series.name if series else None)))
+                self.inject(archive, key_type, series, *names)
+
+    def main(self):
+        for i, archive in enumerate(self.getArchives()):
+            self.logger.info(
+                "#%s - Processing keys for archive %s.", i, archive.reference)
+            self.processArchive(archive)
+        self.logger.info("Finished processing archives injections.")
diff --git a/lib/lp/archivepublisher/tests/test_sync_signingkeys.py b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
new file mode 100644
index 0000000..3a01658
--- /dev/null
+++ b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
@@ -0,0 +1,267 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test cases for the script that injects signing keys into signing service."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+
+__all__ = [
+    'TestSyncSigningKeysScript',
+    ]
+
+from datetime import datetime
+import os
+
+from fixtures import (
+    MockPatch,
+    TempDir,
+    )
+from pytz import utc
+from testtools.matchers import (
+    Equals,
+    MatchesDict,
+    MatchesStructure,
+    )
+
+from lp.archivepublisher.model.publisherconfig import PublisherConfig
+from lp.archivepublisher.scripts.sync_signingkeys import SyncSigningKeysScript
+from lp.services.compat import mock
+from lp.services.database.interfaces import IStore
+from lp.services.log.logger import BufferLogger
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.tests.helpers import SigningServiceClientFixture
+from lp.soyuz.model.archive import Archive
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import ZopelessDatabaseLayer
+
+
+class TestSyncSigningKeysScript(TestCaseWithFactory):
+    layer = ZopelessDatabaseLayer
+
+    def setUp(self):
+        super(TestSyncSigningKeysScript, self).setUp()
+        self.signing_root_dir = self.useFixture(TempDir()).path
+        self.pushConfig(
+            "personalpackagearchive", signing_keys_root=self.signing_root_dir)
+
+    def makeScript(self, test_args):
+        script = SyncSigningKeysScript("test-sync", test_args=test_args)
+        script.logger = BufferLogger()
+        return script
+
+    def makeArchives(self):
+        for i in range(10):
+            self.factory.makeArchive()
+        conditions = PublisherConfig.distribution_id == Archive.distributionID
+        return IStore(Archive).find(Archive, conditions).order_by(Archive.id)
+
+    def makeArchiveSigningDir(self, ppa, series=None):
+        """Creates the directory tree to hold signing keys for the PPA
+        and specific list of DistroSeries provided.
+
+        :param ppa: The Archive that will hold the keys (should be PPA)
+        :param series: A list of DistroSeries (optional)
+        :return: A dict with series as keys (and None for the root archive)
+                 and the values being the directory where the keys should be.
+        """
+        archive_root = os.path.join(
+            self.signing_root_dir, "signing", ppa.owner.name, ppa.name)
+        os.makedirs(archive_root)
+
+        ret = {None: archive_root}
+        for series in (series or []):
+            path = os.path.join(archive_root, series.name)
+            ret[series] = path
+            os.makedirs(path)
+        return ret
+
+    def test_fetch_archives_without_limit_and_offset(self):
+        script = self.makeScript([])
+        all_archives = list(self.makeArchives())
+        archives = list(script.getArchives())
+        self.assertEqual(all_archives, archives)
+
+    def test_fetch_archives_with_limit_and_offset(self):
+        script = self.makeScript([
+            "--limit", "3",
+            "--offset", "2"
+        ])
+        all_archives = list(self.makeArchives())
+        archives = list(script.getArchives())
+        self.assertEqual(all_archives[2:5], archives)
+
+    def test_get_keys_per_type(self):
+        keys_dir = self.signing_root_dir
+
+        # Create fake uefi keys, and missing opal pem
+        for filename in ["uefi.key", "uefi.crt", "opal.x509"]:
+            with open(os.path.join(keys_dir, filename), 'wb') as fd:
+                fd.write(b"something something")
+
+        script = self.makeScript([])
+        self.assertThat(script.getKeysPerType(keys_dir), MatchesDict({
+            SigningKeyType.UEFI: Equals(("uefi.key", "uefi.crt"))
+        }))
+
+    def test_get_series_paths(self):
+        distro = self.factory.makeDistribution()
+        series1 = self.factory.makeDistroSeries(distribution=distro)
+        series2 = self.factory.makeDistroSeries(distribution=distro)
+        # For this series, we will not create the keys directory.
+        self.factory.makeDistroSeries(distribution=distro)
+
+        archive = self.factory.makeArchive(distribution=distro)
+        key_dirs = self.makeArchiveSigningDir(archive, [series1, series2])
+        archive_root = key_dirs[None]
+
+        script = self.makeScript([])
+        self.assertThat(script.getSeriesPaths(archive), MatchesDict({
+            series1: Equals(os.path.join(archive_root, series1.name)),
+            series2: Equals(os.path.join(archive_root, series2.name)),
+            None: Equals(archive_root)
+        }))
+
+    def test_process_archive(self):
+        distro = self.factory.makeDistribution()
+        series1 = self.factory.makeDistroSeries(distribution=distro)
+        series2 = self.factory.makeDistroSeries(distribution=distro)
+
+        archive = self.factory.makeArchive(distribution=distro)
+        key_dirs = self.makeArchiveSigningDir(archive, [series1, series2])
+
+        archive_root = key_dirs[None]
+
+        # Create fake uefi keys for the root
+        for filename in ["uefi.key", "uefi.crt"]:
+            with open(os.path.join(archive_root, filename), 'wb') as fd:
+                fd.write(b"Root %s" % filename)
+
+        # Create fake opal keys for series1
+        for filename in ["opal.pem", "opal.x509", "kmod.pem", "kmod.x509"]:
+            with open(os.path.join(key_dirs[series1], filename), 'wb') as fd:
+                fd.write(b"Series 1 %s" % filename)
+
+        script = self.makeScript([])
+        script.getArchives = mock.Mock(return_value=[archive])
+        script.inject = mock.Mock()
+        script.main()
+
+        self.assertItemsEqual([
+            mock.call(archive, SigningKeyType.KMOD, series1, "kmod.pem",
+                      "kmod.x509"),
+            mock.call(archive, SigningKeyType.OPAL, series1, "opal.pem",
+                      "opal.x509"),
+            mock.call(archive, SigningKeyType.UEFI, None, "uefi.key",
+                      "uefi.crt")],
+            script.inject.call_args_list)
+
+        # Check the log messages.
+        content = script.logger.content.as_text()
+        self.assertIn(
+            "INFO #0 - Processing keys for archive %s." % archive.reference,
+            content)
+
+        tpl = "INFO Found key files %s / %s (type=%s, series=%s)."
+        self.assertIn(
+            tpl % ("kmod.pem", "kmod.x509", SigningKeyType.KMOD, series1.name),
+            content)
+        self.assertIn(
+            tpl % ("opal.pem", "opal.x509", SigningKeyType.OPAL, series1.name),
+            content)
+        self.assertIn(
+            tpl % ("uefi.key", "uefi.crt", SigningKeyType.UEFI, None),
+            content)
+
+    def test_inject(self):
+        signing_service_client = self.useFixture(
+            SigningServiceClientFixture(self.factory))
+
+        now = datetime.now()
+        mock_datetime = self.useFixture(MockPatch(
+            'lp.archivepublisher.scripts.sync_signingkeys.datetime')).mock
+        mock_datetime.now = lambda: now
+
+        tmpdir = self.useFixture(TempDir()).path
+        priv_key_path = os.path.join(tmpdir, "priv.key")
+        pub_key_path = os.path.join(tmpdir, "pub.crt")
+
+        with open(priv_key_path, 'wb') as fd:
+            fd.write(b"Private key content")
+        with open(pub_key_path, 'wb') as fd:
+            fd.write(b"Public key content")
+
+        distro = self.factory.makeDistribution()
+        series = self.factory.makeDistroSeries(distribution=distro)
+        archive = self.factory.makeArchive(distribution=distro)
+
+        script = self.makeScript([])
+
+        result_with_series = script.inject(
+            archive, SigningKeyType.UEFI, series, priv_key_path, pub_key_path)
+
+        self.assertThat(result_with_series, MatchesStructure.byEquality(
+            archive=archive,
+            earliest_distro_series=series,
+            key_type=SigningKeyType.UEFI))
+        self.assertThat(
+            result_with_series.signing_key, MatchesStructure.byEquality(
+                key_type=SigningKeyType.UEFI,
+                public_key=b"Public key content"))
+
+        # Check if we called lp-signing's /inject endpoint correctly
+        self.assertEqual(1, signing_service_client.inject.call_count)
+        self.assertEqual(
+            (SigningKeyType.UEFI, b"Private key content",
+             b"Public key content",
+             u"UEFI key for %s" % archive.reference, now.replace(tzinfo=utc)),
+            signing_service_client.inject.call_args[0])
+
+        result_no_series = script.inject(
+            archive, SigningKeyType.UEFI, None, priv_key_path, pub_key_path)
+
+        self.assertThat(result_no_series, MatchesStructure.byEquality(
+            archive=archive,
+            earliest_distro_series=None,
+            key_type=SigningKeyType.UEFI))
+        self.assertThat(
+            result_no_series.signing_key, MatchesStructure.byEquality(
+                key_type=SigningKeyType.UEFI,
+                public_key=b"Public key content"))
+
+        # Check again lp-signing's /inject endpoint call
+        self.assertEqual(2, signing_service_client.inject.call_count)
+        self.assertEqual(
+            (SigningKeyType.UEFI, b"Private key content",
+             b"Public key content",
+             u"UEFI key for %s" % archive.reference, now.replace(tzinfo=utc)),
+            signing_service_client.inject.call_args[0])
+
+    def test_inject_existing_key(self):
+        distro = self.factory.makeDistribution()
+        series = self.factory.makeDistroSeries(distribution=distro)
+        archive = self.factory.makeArchive(distribution=distro)
+
+        tmpdir = self.useFixture(TempDir()).path
+        priv_key_path = os.path.join(tmpdir, "priv.key")
+        pub_key_path = os.path.join(tmpdir, "pub.crt")
+        with open(priv_key_path, 'wb') as fd:
+            fd.write(b"Private key content")
+        with open(pub_key_path, 'wb') as fd:
+            fd.write(b"Public key content")
+
+        expected_arch_signing_key = self.factory.makeArchiveSigningKey(
+            archive=archive, distro_series=series)
+        key_type = expected_arch_signing_key.key_type
+
+        script = self.makeScript([])
+        got_arch_key = script.inject(
+            archive, key_type, series, priv_key_path, pub_key_path)
+        self.assertEqual(expected_arch_signing_key, got_arch_key)
+
+        self.assertIn(
+            "Signing key for %s / %s / %s already exists" %
+            (key_type, archive.reference, series.name),
+            script.logger.content.as_text())
diff --git a/scripts/sync-signingkeys.py b/scripts/sync-signingkeys.py
new file mode 100755
index 0000000..59c5efb
--- /dev/null
+++ b/scripts/sync-signingkeys.py
@@ -0,0 +1,12 @@
+#!/usr/bin/python2 -S
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Script to inject archive keys into signing service."""
+import _pythonpath
+from lp.archivepublisher.scripts.sync_signingkeys import SyncSigningKeysScript
+
+if __name__ == '__main__':
+    script = SyncSigningKeysScript(
+        'lp.archivepublisher.scripts.sync_signingkeys')
+    script.lock_and_run()