launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #31224
[Merge] ~lgp171188/launchpad:add-script-inject-extra-gpg-signing-key into launchpad:master
Guruprasad has proposed merging ~lgp171188/launchpad:add-script-inject-extra-gpg-signing-key into launchpad:master.
Commit message:
Add a script to inject an extra signing key to an archive
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/469914
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:add-script-inject-extra-gpg-signing-key into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/inject_extra_gpg_signing_key.py b/lib/lp/archivepublisher/scripts/inject_extra_gpg_signing_key.py
new file mode 100644
index 0000000..da677cc
--- /dev/null
+++ b/lib/lp/archivepublisher/scripts/inject_extra_gpg_signing_key.py
@@ -0,0 +1,192 @@
+# Copyright 2024 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Script to inject an extra archive GPG signing key into signing service."""
+
+__all__ = [
+ "InjectExtraGPGSigningKeyScript",
+]
+
+import os
+from datetime import datetime, timezone
+
+from zope.component import getUtility
+
+from lp.services.gpg.interfaces import IGPGHandler
+from lp.services.scripts.base import LaunchpadScript, LaunchpadScriptFailure
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import (
+ IArchiveSigningKeySet,
+ ISigningKeySet,
+)
+from lp.soyuz.interfaces.archive import IArchiveSet
+
+
+class InjectExtraGPGSigningKeyScript(LaunchpadScript):
+ description = (
+ "Injects an extra GPG signing key in this machine for the "
+ "specified archive into the signing service."
+ )
+
+ def add_my_options(self):
+ self.parser.add_option(
+ "-A",
+ "--archive",
+ help=(
+ "The reference of the archive to process "
+ "Format: ~user/distribution/archive-name. Example: "
+ "~user/ubuntu/ppa."
+ ),
+ )
+ self.parser.add_option(
+ "-l",
+ "--local-keys-directory",
+ help="The local directory where keys are found.",
+ )
+ self.parser.add_option(
+ "-f",
+ "--fingerprint",
+ help=(
+ "The fingerprint of the GPG key to inject for "
+ "the specified archive."
+ ),
+ )
+ self.parser.add_option(
+ "-n",
+ "--dry-run",
+ action="store_true",
+ default=False,
+ help=(
+ "Report what would be done, but don't actually "
+ "inject the key."
+ ),
+ )
+
+ def getArchive(self):
+ """Get the archive for the given archive reference."""
+ if self.options.archive:
+ archive = getUtility(IArchiveSet).getByReference(
+ self.options.archive
+ )
+ if archive is None:
+ raise LaunchpadScriptFailure(
+ f"Archive '{self.options.archive}' could not be found."
+ )
+ return archive
+
+ def injectGPG(self, archive, secret_key_path):
+ """Inject the secret key at the given path into the signing service."""
+ with open(secret_key_path, "rb") as key_file:
+ secret_key_export = key_file.read()
+ gpg_handler = getUtility(IGPGHandler)
+ secret_key = gpg_handler.importSecretKey(secret_key_export)
+ signing_key_set = getUtility(ISigningKeySet)
+
+ if self.options.dry_run:
+ self.logger.info(
+ "Would inject signing key with fingerprint '%s' for '%s',",
+ SigningKeyType.OPENPGP,
+ archive.reference,
+ )
+ else:
+ public_key = gpg_handler.retrieveKey(secret_key.fingerprint)
+ now = datetime.now().replace(tzinfo=timezone.utc)
+ signing_key = signing_key_set.inject(
+ SigningKeyType.OPENPGP,
+ secret_key.export(),
+ public_key.export(),
+ secret_key.uids[0].name,
+ now,
+ )
+ self.logger.info("Injected signing key into the signing service.")
+ getUtility(IArchiveSigningKeySet).create(
+ archive, None, signing_key
+ )
+ self.logger.info(
+ "Associated the signing key with archive '%s'.",
+ archive.reference,
+ )
+ return signing_key
+
+ def getSigningKey(self, fingerprint):
+ return (
+ getUtility(ISigningKeySet).get(SigningKeyType.OPENPGP, fingerprint)
+ or None
+ )
+
+ def isSigningKeyAssociatedWithArchive(self, archive, fingerprint):
+ return bool(
+ getUtility(IArchiveSigningKeySet).getByArchiveAndFingerprint(
+ archive, fingerprint
+ )
+ )
+
+ def processArchive(self, archive):
+ fingerprint = self.options.fingerprint
+ existing_signing_key = self.getSigningKey(fingerprint)
+ if existing_signing_key:
+ self.logger.error(
+ "Signing key with fingerprint '%s' exists already.",
+ fingerprint,
+ )
+ if not self.isSigningKeyAssociatedWithArchive(
+ archive, fingerprint
+ ):
+ self.logger.error(
+ "Signing key with fingerprint '%s' not associated "
+ "with the archive '%s'. Adding the association.",
+ fingerprint,
+ archive.reference,
+ )
+ getUtility(IArchiveSigningKeySet).create(
+ archive, None, existing_signing_key
+ )
+ self.logger.error(
+ "Aborting key injection into the signing service."
+ )
+ return
+ secret_key_path = os.path.join(
+ self.options.local_keys_directory, f"{fingerprint}.gpg"
+ )
+ if not os.path.exists(secret_key_path):
+ self.logger.error(
+ "Could not find key file at '%s'.", secret_key_path
+ )
+ return
+ else:
+ self.logger.info("Found key file at '%s'.", secret_key_path)
+ self.injectGPG(archive, secret_key_path)
+
+ def _validateOptions(self):
+ if not self.options.archive:
+ raise LaunchpadScriptFailure("Specify an archive.")
+ if not self.options.local_keys_directory:
+ raise LaunchpadScriptFailure(
+ "Specify the directory containing the private keys."
+ )
+ if not self.options.fingerprint:
+ raise LaunchpadScriptFailure(
+ "Specify the fingerprint of the GPG key to inject."
+ )
+
+ def main(self):
+ self._validateOptions()
+ archive = self.getArchive()
+ if not archive.signing_key_fingerprint:
+ self.logger.error(
+ "Archive '%s' does not have a signing key generated "
+ "by Launchpad yet. Cannot inject an extra key without "
+ "an existing key. Aborting.",
+ archive.reference,
+ )
+ return
+ self.logger.debug("Processing keys for archive %s.", archive.reference)
+ self.processArchive(archive)
+ if self.options.dry_run:
+ self.logger.info(
+ "Aborting the transaction since this is a dry run."
+ )
+ self.txn.abort()
+ else:
+ self.logger.info("Archive processed; committing.")
+ self.txn.commit()
diff --git a/lib/lp/archivepublisher/tests/test_inject_extra_gpg_signing_key.py b/lib/lp/archivepublisher/tests/test_inject_extra_gpg_signing_key.py
new file mode 100644
index 0000000..ddb3922
--- /dev/null
+++ b/lib/lp/archivepublisher/tests/test_inject_extra_gpg_signing_key.py
@@ -0,0 +1,308 @@
+# Copyright 2024 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""
+Test cases for the script that injects an extra GPG signing key for an
+archive into signing service.
+"""
+import os
+import shutil
+from datetime import datetime, timezone
+from unittest import mock
+
+from fixtures import MockPatch, TempDir
+from testtools.matchers import (
+ Equals,
+ MatchesListwise,
+ MatchesStructure,
+ StartsWith,
+)
+from zope.component import getUtility
+
+from lp.archivepublisher.scripts.inject_extra_gpg_signing_key import (
+ InjectExtraGPGSigningKeyScript,
+)
+from lp.services.config import config
+from lp.services.log.logger import BufferLogger
+from lp.services.scripts.base import LaunchpadScriptFailure
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.services.signing.tests.helpers import SigningServiceClientFixture
+from lp.testing import ExpectedException, TestCaseWithFactory
+from lp.testing.dbuser import dbuser
+from lp.testing.faketransaction import FakeTransaction
+from lp.testing.gpgkeys import gpgkeysdir
+from lp.testing.layers import ZopelessDatabaseLayer
+
+
+class TestInjectExtraGPGSigningKey(TestCaseWithFactory):
+ layer = ZopelessDatabaseLayer
+
+ def setUp(self):
+ super().setUp()
+ self.keys_dir = self.useFixture(TempDir()).path
+
+ def makeScript(self, test_args):
+ script = InjectExtraGPGSigningKeyScript(
+ "test-inject",
+ dbuser=config.archivepublisher.dbuser,
+ test_args=test_args,
+ )
+ script.txn = FakeTransaction()
+ script.logger = BufferLogger()
+ return script
+
+ def makeArchiveWithSigningKeyFingerprint(self):
+ archive = self.factory.makeArchive()
+ archive.signing_key_fingerprint = self.factory.getUniqueHexString(
+ digits=40
+ ).upper()
+ return archive
+
+ def test_archive_not_specified(self):
+ script = self.makeScript([])
+ with ExpectedException(LaunchpadScriptFailure, "Specify an archive."):
+ script.main()
+
+ def test_local_keys_directory_not_specified(self):
+ script = self.makeScript(["--archive", "~user/ubuntu/archive"])
+ with ExpectedException(
+ LaunchpadScriptFailure,
+ "Specify the directory containing the private keys.",
+ ):
+ script.main()
+
+ def test_fingerprint_not_specified(self):
+ script = self.makeScript(
+ [
+ "--archive",
+ "~user/ubuntu/archive",
+ "--local-keys-dir",
+ self.keys_dir,
+ ]
+ )
+ with ExpectedException(
+ LaunchpadScriptFailure,
+ "Specify the fingerprint of the GPG key to inject.",
+ ):
+ script.main()
+
+ def test_get_archive_with_reference(self):
+ archive = self.factory.makeArchive()
+ script = self.makeScript(["--archive", archive.reference])
+ self.assertEqual(script.getArchive(), archive)
+
+ def test_get_archive_with_reference_archive_not_found(self):
+ with ExpectedException(
+ LaunchpadScriptFailure,
+ "Archive '~foo/ubuntu/bar' could not be found.",
+ ):
+ self.makeScript(["--archive", "~foo/ubuntu/bar"]).getArchive()
+
+ def test_archive_has_no_existing_signing_key(self):
+ archive = self.factory.makeArchive()
+ script = self.makeScript(
+ [
+ "--archive",
+ archive.reference,
+ "--fingerprint",
+ self.factory.getUniqueHexString(digits=40).upper(),
+ "--local-keys-dir",
+ self.keys_dir,
+ ]
+ )
+ script.injectGPG = mock.Mock()
+ script.main()
+ content = script.logger.content.as_text()
+ self.assertIn(
+ f"Archive '{archive.reference}' does not have a signing key "
+ "generated by Launchpad yet. Cannot inject an extra key without "
+ "an existing key. Aborting.",
+ content,
+ )
+ script.injectGPG.assert_not_called()
+
+ def test_signing_key_with_fingerprint_exists_already_unassociated(self):
+ archive = self.makeArchiveWithSigningKeyFingerprint()
+ signing_key = self.factory.makeSigningKey(
+ key_type=SigningKeyType.OPENPGP
+ )
+ script = self.makeScript(
+ [
+ "--archive",
+ archive.reference,
+ "--fingerprint",
+ signing_key.fingerprint,
+ "--local-keys-dir",
+ self.keys_dir,
+ ]
+ )
+ script.injectGPG = mock.Mock()
+ script.main()
+ content = script.logger.content.as_text()
+ self.assertIn(
+ f"Signing key with fingerprint '{signing_key.fingerprint}' "
+ "exists already.",
+ content,
+ )
+ self.assertIn(
+ f"Signing key with fingerprint '{signing_key.fingerprint}' not "
+ f"associated with the archive '{archive.reference}'. Adding "
+ "the association.",
+ content,
+ )
+ self.assertIn(
+ "Aborting key injection into the signing service.", content
+ )
+ script.injectGPG.assert_not_called()
+
+ def test_signing_key_with_fingerprint_exists_already_associated(self):
+ archive = self.makeArchiveWithSigningKeyFingerprint()
+ signing_key = self.factory.makeSigningKey(
+ key_type=SigningKeyType.OPENPGP,
+ fingerprint=archive.signing_key_fingerprint,
+ )
+ getUtility(IArchiveSigningKeySet).create(archive, None, signing_key)
+ script = self.makeScript(
+ [
+ "--archive",
+ archive.reference,
+ "--fingerprint",
+ signing_key.fingerprint,
+ "--local-keys-dir",
+ self.keys_dir,
+ ]
+ )
+ script.injectGPG = mock.Mock()
+ script.main()
+ content = script.logger.content.as_text()
+ self.assertIn(
+ f"Signing key with fingerprint '{signing_key.fingerprint}' "
+ "exists already.",
+ content,
+ )
+ self.assertNotIn(
+ f"Signing key with fingerprint '{signing_key.fingerprint}' not "
+ f"associated with the archive '{archive.reference}'. Adding "
+ "the association.",
+ content,
+ )
+ self.assertIn(
+ "Aborting key injection into the signing service.", content
+ )
+ script.injectGPG.assert_not_called()
+
+ def test_secret_key_file_not_found(self):
+ archive = self.makeArchiveWithSigningKeyFingerprint()
+ fingerprint = self.factory.getUniqueHexString(digits=40).upper()
+ script = self.makeScript(
+ [
+ "--archive",
+ archive.reference,
+ "--fingerprint",
+ fingerprint,
+ "--local-keys-dir",
+ self.keys_dir,
+ ]
+ )
+ script.injectGPG = mock.Mock()
+ script.main()
+ content = script.logger.content.as_text()
+ self.assertIn(
+ f"Could not find key file at '{self.keys_dir}/{fingerprint}.gpg'.",
+ content,
+ )
+ script.injectGPG.assert_not_called()
+
+ def test_secret_key_file_exists(self):
+ archive = self.makeArchiveWithSigningKeyFingerprint()
+ fingerprint = self.factory.getUniqueHexString(digits=40).upper()
+ secret_key_path = os.path.join(self.keys_dir, f"{fingerprint}.gpg")
+ with open(secret_key_path, "w") as fd:
+ fd.write(f"Private key {fingerprint}")
+ script = self.makeScript(
+ [
+ "--archive",
+ archive.reference,
+ "--fingerprint",
+ fingerprint,
+ "--local-keys-dir",
+ self.keys_dir,
+ ]
+ )
+ script.injectGPG = mock.Mock()
+ script.main()
+ content = script.logger.content.as_text()
+ self.assertIn(
+ f"Found key file at '{self.keys_dir}/{fingerprint}.gpg'.",
+ content,
+ )
+ script.injectGPG.assert_called_once_with(archive, secret_key_path)
+
+ def test_injectGPG(self):
+ signing_service_client = self.useFixture(
+ SigningServiceClientFixture(self.factory)
+ )
+ now = datetime.now()
+ mock_datetime = self.useFixture(
+ MockPatch(
+ "lp.archivepublisher.scripts"
+ ".inject_extra_gpg_signing_key.datetime"
+ )
+ ).mock
+ mock_datetime.now = lambda: now
+ archive = self.makeArchiveWithSigningKeyFingerprint()
+ fingerprint = self.factory.getUniqueHexString(digits=40).upper()
+ secret_key_path = os.path.join(self.keys_dir, f"{fingerprint}.gpg")
+ shutil.copyfile(
+ os.path.join(gpgkeysdir, "ppa-sample@xxxxxxxxxxxxxxxxx"),
+ secret_key_path,
+ )
+
+ script = self.makeScript(
+ [
+ "--archive",
+ archive.reference,
+ "--fingerprint",
+ fingerprint,
+ "--local-keys-dir",
+ self.keys_dir,
+ ]
+ )
+ with dbuser(config.archivepublisher.dbuser):
+ signing_key = script.injectGPG(archive, secret_key_path)
+
+ self.assertThat(
+ signing_key,
+ MatchesStructure(
+ key_type=Equals(SigningKeyType.OPENPGP),
+ public_key=StartsWith(
+ b"-----BEGIN PGP PUBLIC KEY BLOCK-----\n"
+ ),
+ date_created=Equals(now.replace(tzinfo=timezone.utc)),
+ ),
+ )
+ with open(secret_key_path, "rb") as f:
+ # Remove the GnuPG version line from the secret key bytes
+ # since the GPG private key export using the GPG library
+ # excludes that.
+ secret_key_bytes = b"".join(
+ [
+ line
+ for line in f.readlines()
+ if not line.startswith(b"Version: GnuPG")
+ ]
+ )
+ self.assertEqual(1, signing_service_client.inject.call_count)
+ self.assertThat(
+ signing_service_client.inject.call_args[0],
+ MatchesListwise(
+ [
+ Equals(SigningKeyType.OPENPGP),
+ Equals(secret_key_bytes),
+ StartsWith(b"-----BEGIN PGP PUBLIC KEY BLOCK-----\n"),
+ Equals("Launchpad PPA for Celso áéíóú Providelo"),
+ Equals(now.replace(tzinfo=timezone.utc)),
+ ]
+ ),
+ )
References