launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24625
[Merge] ~pappacena/launchpad:bulk-inject-lp-signing into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:bulk-inject-lp-signing into launchpad:master with ~pappacena/launchpad:inject-lp-signing-generated-keys as a prerequisite.
Commit message:
Add a command to bulk-import the existing signing keys of all archives into the signing service.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/382948
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:bulk-inject-lp-signing into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/sync_signingkeys.py b/lib/lp/archivepublisher/scripts/sync_signingkeys.py
new file mode 100644
index 0000000..f25d5c2
--- /dev/null
+++ b/lib/lp/archivepublisher/scripts/sync_signingkeys.py
@@ -0,0 +1,132 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Script to inject archive keys into signing service."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+
+__all__ = [
+ 'SyncSigningKeysScript',
+ ]
+
+from datetime import datetime
+import os
+
+from pytz import utc
+from zope.component import getUtility
+
+from lp.archivepublisher.config import getPubConfig
+from lp.archivepublisher.model.publisherconfig import PublisherConfig
+from lp.services.database.interfaces import IStore
+from lp.services.scripts.base import LaunchpadScript
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.soyuz.model.archive import Archive
+
+
+class SyncSigningKeysScript(LaunchpadScript):
+ description = (
+ "Injects into signing services all key files currently in this "
+ "machine.")
+
+ def add_my_options(self):
+ self.parser.add_option(
+ "-l", "--limit", dest="limit", type=int,
+ help="How many archives to fetch.")
+
+ self.parser.add_option(
+ "-o", "--offset", dest="offset", type=int,
+ help="Offset on archives list.")
+
+ def getArchives(self):
+ """Gets the list of archives that should be processed."""
+ archives = IStore(Archive).find(
+ Archive,
+ PublisherConfig.distribution_id == Archive.distributionID)
+ archives = archives.order_by(Archive.id)
+ start = self.options.offset if self.options.offset else 0
+ end = start + self.options.limit if self.options.limit else None
+ return archives[start:end]
+
+ def getKeysPerType(self, dir):
+ """Returns the existing key files per type in the given directory.
+
+ :param dir: The directory path to scan for keys
+ :return: A dict where keys are SigningKeyTypes and the value is a
+ tuple of (key, cert) file names."""
+ keys_per_type = {
+ SigningKeyType.UEFI: ("uefi.key", "uefi.crt"),
+ SigningKeyType.KMOD: ("kmod.pem", "kmod.x509"),
+ SigningKeyType.OPAL: ("opal.pem", "opal.x509"),
+ SigningKeyType.SIPL: ("sipl.pem", "sipl.x509"),
+ SigningKeyType.FIT: ("fit.key", "fit.crt"),
+ }
+ for key_type in SigningKeyType.items:
+ files = [os.path.join(dir, f) for f in keys_per_type[key_type]]
+ if not all(os.path.exists(f) for f in files):
+ del keys_per_type[key_type]
+ continue
+ return keys_per_type
+
+ def getSeriesPaths(self, archive):
+ """Returns the directory of each series containing signing keys.
+
+ :param archive: The Archive object to search for signing keys.
+ :return: A dict where keys are DistroSeries objects (or None for the
+ archive's root signing) and the values are the directories
+ where the keys for that series are stored."""
+ series_paths = {}
+ pubconf = getPubConfig(archive)
+ if pubconf is None or pubconf.signingroot is None:
+ self.logger.info(
+ "Skipping %s: no pubconfig or no signing root." %
+ archive.reference)
+ return {}
+ for series in archive.distribution.series:
+ path = os.path.join(pubconf.signingroot, series.name)
+ if os.path.exists(path):
+ series_paths[series] = path
+ if os.path.exists(pubconf.signingroot):
+ series_paths[None] = pubconf.signingroot
+ return series_paths
+
+ def inject(self, archive, key_type, series, priv_key_path, pub_key_path):
+ arch_signing_key_set = getUtility(IArchiveSigningKeySet)
+ existing_signing_key = arch_signing_key_set.getSigningKey(
+ key_type, archive, series, exact_match=True)
+ if existing_signing_key is not None:
+ self.logger.info("Signing key for %s / %s / %s already exists",
+ key_type, archive.reference, series.name)
+ return existing_signing_key
+
+ with open(priv_key_path, 'rb') as fd:
+ private_key = fd.read()
+ with open(pub_key_path, 'rb') as fd:
+ public_key = fd.read()
+
+ now = datetime.now().replace(tzinfo=utc)
+ description = u"%s key for %s" % (key_type.name, archive.reference)
+ return arch_signing_key_set.inject(
+ key_type, private_key, public_key,
+ description, now, archive,
+ earliest_distro_series=series)
+
+ def processArchive(self, archive):
+ for series, path in self.getSeriesPaths(archive).items():
+ keys_per_type = self.getKeysPerType(path)
+ for key_type, (priv_key, pub_key) in keys_per_type.items():
+ self.logger.info(
+ "Found key files %s / %s (type=%s, series=%s)." %
+ (priv_key, pub_key, key_type,
+ series.name if series else None))
+ self.inject(archive, key_type, series, priv_key, pub_key)
+
+ def main(self):
+ for i, archive in enumerate(self.getArchives()):
+ self.logger.info(
+ "#%s - Processing keys for archive %s.", i, archive.reference)
+ self.processArchive(archive)
+ self.logger.info("Finished processing archives injections.")
diff --git a/lib/lp/archivepublisher/tests/test_sync_signingkeys.py b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
new file mode 100644
index 0000000..3a01658
--- /dev/null
+++ b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
@@ -0,0 +1,267 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test cases for the script that injects signing keys into signing service."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+
+__all__ = [
+ 'SyncSigningKeysScript',
+ ]
+
+from datetime import datetime
+import os
+
+from fixtures import (
+ MockPatch,
+ TempDir,
+ )
+from pytz import utc
+from testtools.matchers import (
+ Equals,
+ MatchesDict,
+ MatchesStructure,
+ )
+
+from lp.archivepublisher.model.publisherconfig import PublisherConfig
+from lp.archivepublisher.scripts.sync_signingkeys import SyncSigningKeysScript
+from lp.services.compat import mock
+from lp.services.database.interfaces import IStore
+from lp.services.log.logger import BufferLogger
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.tests.helpers import SigningServiceClientFixture
+from lp.soyuz.model.archive import Archive
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import ZopelessDatabaseLayer
+
+
+class TestSyncSigningKeysScript(TestCaseWithFactory):
+ layer = ZopelessDatabaseLayer
+
+ def setUp(self):
+ super(TestSyncSigningKeysScript, self).setUp()
+ self.signing_root_dir = self.useFixture(TempDir()).path
+ self.pushConfig(
+ "personalpackagearchive", signing_keys_root=self.signing_root_dir)
+
+ def makeScript(self, test_args):
+ script = SyncSigningKeysScript("test-sync", test_args=test_args)
+ script.logger = BufferLogger()
+ return script
+
+ def makeArchives(self):
+ for i in range(10):
+ self.factory.makeArchive()
+ conditions = PublisherConfig.distribution_id == Archive.distributionID
+ return IStore(Archive).find(Archive, conditions).order_by(Archive.id)
+
+ def makeArchiveSigningDir(self, ppa, series=None):
+ """Creates the directory tree to hold signing keys for the PPA
+ and specific list of DistroSeries provided.
+
+ :param ppa: The Archive that will hold the keys (should be a PPA)
+ :param series: A list of DistroSeries
+ :return: A dict with series as keys (and None for the root archive)
+ and the values being the directory where the keys should be.
+ """
+ archive_root = os.path.join(
+ self.signing_root_dir, "signing", ppa.owner.name, ppa.name)
+ os.makedirs(archive_root)
+
+ ret = {None: archive_root}
+ for series in (series or []):
+ path = os.path.join(archive_root, series.name)
+ ret[series] = path
+ os.makedirs(path)
+ return ret
+
+ def test_fetch_archives_without_limit_and_offset(self):
+ script = self.makeScript([])
+ all_archives = list(self.makeArchives())
+ archives = list(script.getArchives())
+ self.assertEqual(all_archives, archives)
+
+ def test_fetch_archives_with_limit_and_offset(self):
+ script = self.makeScript([
+ "--limit", "3",
+ "--offset", "2"
+ ])
+ all_archives = list(self.makeArchives())
+ archives = list(script.getArchives())
+ self.assertEqual(all_archives[2:5], archives)
+
+ def test_get_keys_per_type(self):
+ keys_dir = self.signing_root_dir
+
+ # Create fake uefi keys, and missing opal pem
+ for filename in ["uefi.key", "uefi.crt", "opal.x509"]:
+ with open(os.path.join(keys_dir, filename), 'wb') as fd:
+ fd.write(b"something something")
+
+ script = self.makeScript([])
+ self.assertThat(script.getKeysPerType(keys_dir), MatchesDict({
+ SigningKeyType.UEFI: Equals(("uefi.key", "uefi.crt"))
+ }))
+
+ def test_get_series_paths(self):
+ distro = self.factory.makeDistribution()
+ series1 = self.factory.makeDistroSeries(distribution=distro)
+ series2 = self.factory.makeDistroSeries(distribution=distro)
+ # For this series, we will not create the keys directory.
+ self.factory.makeDistroSeries(distribution=distro)
+
+ archive = self.factory.makeArchive(distribution=distro)
+ key_dirs = self.makeArchiveSigningDir(archive, [series1, series2])
+ archive_root = key_dirs[None]
+
+ script = self.makeScript([])
+ self.assertThat(script.getSeriesPaths(archive), MatchesDict({
+ series1: Equals(os.path.join(archive_root, series1.name)),
+ series2: Equals(os.path.join(archive_root, series2.name)),
+ None: Equals(archive_root)
+ }))
+
+ def test_process_archive(self):
+ distro = self.factory.makeDistribution()
+ series1 = self.factory.makeDistroSeries(distribution=distro)
+ series2 = self.factory.makeDistroSeries(distribution=distro)
+
+ archive = self.factory.makeArchive(distribution=distro)
+ key_dirs = self.makeArchiveSigningDir(archive, [series1, series2])
+
+ archive_root = key_dirs[None]
+
+ # Create fake uefi keys for the root
+ for filename in ["uefi.key", "uefi.crt"]:
+ with open(os.path.join(archive_root, filename), 'wb') as fd:
+ fd.write(b"Root %s" % filename)
+
+ # Create fake opal and kmod keys for series1
+ for filename in ["opal.pem", "opal.x509", "kmod.pem", "kmod.x509"]:
+ with open(os.path.join(key_dirs[series1], filename), 'wb') as fd:
+ fd.write(b"Series 1 %s" % filename)
+
+ script = self.makeScript([])
+ script.getArchives = mock.Mock(return_value=[archive])
+ script.inject = mock.Mock()
+ script.main()
+
+ self.assertItemsEqual([
+ mock.call(archive, SigningKeyType.KMOD, series1, "kmod.pem",
+ "kmod.x509"),
+ mock.call(archive, SigningKeyType.OPAL, series1, "opal.pem",
+ "opal.x509"),
+ mock.call(archive, SigningKeyType.UEFI, None, "uefi.key",
+ "uefi.crt")],
+ script.inject.call_args_list)
+
+ # Check the log messages.
+ content = script.logger.content.as_text()
+ self.assertIn(
+ "INFO #0 - Processing keys for archive %s." % archive.reference,
+ content)
+
+ tpl = "INFO Found key files %s / %s (type=%s, series=%s)."
+ self.assertIn(
+ tpl % ("kmod.pem", "kmod.x509", SigningKeyType.KMOD, series1.name),
+ content)
+ self.assertIn(
+ tpl % ("opal.pem", "opal.x509", SigningKeyType.OPAL, series1.name),
+ content)
+ self.assertIn(
+ tpl % ("uefi.key", "uefi.crt", SigningKeyType.UEFI, None),
+ content)
+
+ def test_inject(self):
+ signing_service_client = self.useFixture(
+ SigningServiceClientFixture(self.factory))
+
+ now = datetime.now()
+ mock_datetime = self.useFixture(MockPatch(
+ 'lp.archivepublisher.scripts.sync_signingkeys.datetime')).mock
+ mock_datetime.now = lambda: now
+
+ tmpdir = self.useFixture(TempDir()).path
+ priv_key_path = os.path.join(tmpdir, "priv.key")
+ pub_key_path = os.path.join(tmpdir, "pub.crt")
+
+ with open(priv_key_path, 'wb') as fd:
+ fd.write(b"Private key content")
+ with open(pub_key_path, 'wb') as fd:
+ fd.write(b"Public key content")
+
+ distro = self.factory.makeDistribution()
+ series = self.factory.makeDistroSeries(distribution=distro)
+ archive = self.factory.makeArchive(distribution=distro)
+
+ script = self.makeScript([])
+
+ result_with_series = script.inject(
+ archive, SigningKeyType.UEFI, series, priv_key_path, pub_key_path)
+
+ self.assertThat(result_with_series, MatchesStructure.byEquality(
+ archive=archive,
+ earliest_distro_series=series,
+ key_type=SigningKeyType.UEFI))
+ self.assertThat(
+ result_with_series.signing_key, MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI,
+ public_key=b"Public key content"))
+
+ # Check if we called lp-signing's /inject endpoint correctly
+ self.assertEqual(1, signing_service_client.inject.call_count)
+ self.assertEqual(
+ (SigningKeyType.UEFI, b"Private key content",
+ b"Public key content",
+ u"UEFI key for %s" % archive.reference, now.replace(tzinfo=utc)),
+ signing_service_client.inject.call_args[0])
+
+ result_no_series = script.inject(
+ archive, SigningKeyType.UEFI, None, priv_key_path, pub_key_path)
+
+ self.assertThat(result_no_series, MatchesStructure.byEquality(
+ archive=archive,
+ earliest_distro_series=None,
+ key_type=SigningKeyType.UEFI))
+ self.assertThat(
+ result_no_series.signing_key, MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI,
+ public_key=b"Public key content"))
+
+ # Check again lp-signing's /inject endpoint call
+ self.assertEqual(2, signing_service_client.inject.call_count)
+ self.assertEqual(
+ (SigningKeyType.UEFI, b"Private key content",
+ b"Public key content",
+ u"UEFI key for %s" % archive.reference, now.replace(tzinfo=utc)),
+ signing_service_client.inject.call_args[0])
+
+ def test_inject_existing_key(self):
+ distro = self.factory.makeDistribution()
+ series = self.factory.makeDistroSeries(distribution=distro)
+ archive = self.factory.makeArchive(distribution=distro)
+
+ tmpdir = self.useFixture(TempDir()).path
+ priv_key_path = os.path.join(tmpdir, "priv.key")
+ pub_key_path = os.path.join(tmpdir, "pub.crt")
+ with open(priv_key_path, 'wb') as fd:
+ fd.write(b"Private key content")
+ with open(pub_key_path, 'wb') as fd:
+ fd.write(b"Public key content")
+
+ expected_arch_signing_key = self.factory.makeArchiveSigningKey(
+ archive=archive, distro_series=series)
+ key_type = expected_arch_signing_key.key_type
+
+ script = self.makeScript([])
+ got_arch_key = script.inject(
+ archive, key_type, series, priv_key_path, pub_key_path)
+ self.assertEqual(expected_arch_signing_key, got_arch_key)
+
+ self.assertIn(
+ "Signing key for %s / %s / %s already exists" %
+ (key_type, archive.reference, series.name),
+ script.logger.content.as_text())
diff --git a/scripts/sync-signingkeys.py b/scripts/sync-signingkeys.py
new file mode 100755
index 0000000..59c5efb
--- /dev/null
+++ b/scripts/sync-signingkeys.py
@@ -0,0 +1,12 @@
+#!/usr/bin/python2 -S
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Script to inject archive keys into signing service."""
+import _pythonpath
+from lp.archivepublisher.scripts.sync_signingkeys import SyncSigningKeysScript
+
+if __name__ == '__main__':
+ script = SyncSigningKeysScript(
+ 'lp.archivepublisher.scripts.sync_signingkeys')
+ script.lock_and_run()