← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~lgp171188/launchpad:add-script-inject-extra-gpg-signing-key into launchpad:master

 

Guruprasad has proposed merging ~lgp171188/launchpad:add-script-inject-extra-gpg-signing-key into launchpad:master.

Commit message:
Add a script to inject an extra signing key to an archive

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/469914
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:add-script-inject-extra-gpg-signing-key into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/inject_extra_gpg_signing_key.py b/lib/lp/archivepublisher/scripts/inject_extra_gpg_signing_key.py
new file mode 100644
index 0000000..da677cc
--- /dev/null
+++ b/lib/lp/archivepublisher/scripts/inject_extra_gpg_signing_key.py
@@ -0,0 +1,192 @@
+# Copyright 2024 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Script to inject an extra archive GPG signing key into signing service."""
+
+__all__ = [
+    "InjectExtraGPGSigningKeyScript",
+]
+
+import os
+from datetime import datetime, timezone
+
+from zope.component import getUtility
+
+from lp.services.gpg.interfaces import IGPGHandler
+from lp.services.scripts.base import LaunchpadScript, LaunchpadScriptFailure
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import (
+    IArchiveSigningKeySet,
+    ISigningKeySet,
+)
+from lp.soyuz.interfaces.archive import IArchiveSet
+
+
+class InjectExtraGPGSigningKeyScript(LaunchpadScript):
+    description = (
+        "Injects an extra GPG signing key in this machine for the "
+        "specified archive into the signing service."
+    )
+
+    def add_my_options(self):
+        """Register the script's command-line options on the parser."""
+        self.parser.add_option(
+            "-A",
+            "--archive",
+            help=(
+                "The reference of the archive to process. Format: "
+                "~user/distribution/archive-name. Example: ~user/ubuntu/ppa."
+            ),
+        )
+        self.parser.add_option(
+            "-l",
+            "--local-keys-directory",
+            help="The local directory where keys are found.",
+        )
+        self.parser.add_option(
+            "-f",
+            "--fingerprint",
+            help=(
+                "The fingerprint of the GPG key to inject for "
+                "the specified archive."
+            ),
+        )
+        self.parser.add_option(
+            "-n",
+            "--dry-run",
+            action="store_true",
+            default=False,
+            help=(
+                "Report what would be done, but don't actually "
+                "inject the key."
+            ),
+        )
+
+    def getArchive(self):
+        """Look up --archive; None if unset, script failure if unknown."""
+        reference = self.options.archive
+        if not reference:
+            return None
+        archive = getUtility(IArchiveSet).getByReference(reference)
+        if archive is None:
+            raise LaunchpadScriptFailure(
+                f"Archive '{self.options.archive}' could not be found."
+            )
+        return archive
+
+    def injectGPG(self, archive, secret_key_path):
+        """Inject the secret key at the given path into the signing service.
+
+        Returns the injected signing key, or None when --dry-run is set.
+        """
+        with open(secret_key_path, "rb") as key_file:
+            secret_key_export = key_file.read()
+        gpg_handler = getUtility(IGPGHandler)
+        secret_key = gpg_handler.importSecretKey(secret_key_export)
+        if self.options.dry_run:
+            self.logger.info(
+                "Would inject signing key with fingerprint '%s' for '%s'.",
+                secret_key.fingerprint,
+                archive.reference,
+            )
+            return None
+        public_key = gpg_handler.retrieveKey(secret_key.fingerprint)
+        # NOTE(review): naive local now() is relabelled as UTC; confirm.
+        now = datetime.now().replace(tzinfo=timezone.utc)
+        signing_key = getUtility(ISigningKeySet).inject(
+            SigningKeyType.OPENPGP,
+            secret_key.export(),
+            public_key.export(),
+            secret_key.uids[0].name,
+            now,
+        )
+        self.logger.info("Injected signing key into the signing service.")
+        getUtility(IArchiveSigningKeySet).create(archive, None, signing_key)
+        self.logger.info(
+            "Associated the signing key with archive '%s'.",
+            archive.reference,
+        )
+        return signing_key
+
+    def getSigningKey(self, fingerprint):
+        """Return the OpenPGP signing key with this fingerprint, or None."""
+        key_set = getUtility(ISigningKeySet)
+        found = key_set.get(SigningKeyType.OPENPGP, fingerprint)
+        return found if found else None
+
+    def isSigningKeyAssociatedWithArchive(self, archive, fingerprint):
+        """Report whether a key with this fingerprint is tied to archive."""
+        key_set = getUtility(IArchiveSigningKeySet)
+        link = key_set.getByArchiveAndFingerprint(archive, fingerprint)
+        # Collapse whatever the lookup returned to a plain boolean.
+        return bool(link)
+
+    def processArchive(self, archive):
+        """Inject the requested key, or just link it if it already exists."""
+        fingerprint = self.options.fingerprint
+        known_key = self.getSigningKey(fingerprint)
+        if not known_key:
+            # Unknown to the signing service: inject it from the local file.
+            key_path = os.path.join(
+                self.options.local_keys_directory, f"{fingerprint}.gpg"
+            )
+            if os.path.exists(key_path):
+                self.logger.info("Found key file at '%s'.", key_path)
+                self.injectGPG(archive, key_path)
+            else:
+                self.logger.error(
+                    "Could not find key file at '%s'.", key_path
+                )
+            return
+        # Already in the signing service: only make sure it is linked.
+        self.logger.error(
+            "Signing key with fingerprint '%s' exists already.", fingerprint
+        )
+        already_linked = self.isSigningKeyAssociatedWithArchive(
+            archive, fingerprint
+        )
+        if not already_linked:
+            self.logger.error(
+                "Signing key with fingerprint '%s' not associated "
+                "with the archive '%s'. Adding the association.",
+                fingerprint,
+                archive.reference,
+            )
+            getUtility(IArchiveSigningKeySet).create(
+                archive, None, known_key
+            )
+        self.logger.error("Aborting key injection into the signing service.")
+
+    def _validateOptions(self):
+        """Raise LaunchpadScriptFailure naming the first missing option."""
+        if not self.options.archive:
+            complaint = "Specify an archive."
+        elif not self.options.local_keys_directory:
+            complaint = "Specify the directory containing the private keys."
+        elif not self.options.fingerprint:
+            complaint = "Specify the fingerprint of the GPG key to inject."
+        else:
+            return
+        raise LaunchpadScriptFailure(complaint)
+
+    def main(self):
+        """Run the script: validate, process, then commit (abort if dry)."""
+        self._validateOptions()
+        archive = self.getArchive()
+        if not archive.signing_key_fingerprint:
+            self.logger.error(
+                "Archive '%s' does not have a signing key generated "
+                "by Launchpad yet. Cannot inject an extra key without "
+                "an existing key. Aborting.",
+                archive.reference,
+            )
+            return
+        self.logger.debug("Processing keys for archive %s.", archive.reference)
+        self.processArchive(archive)
+        if self.options.dry_run:
+            outcome = "Aborting the transaction since this is a dry run."
+        else:
+            outcome = "Archive processed; committing."
+        self.logger.info(outcome)
+        finish = self.txn.abort if self.options.dry_run else self.txn.commit
+        finish()
diff --git a/lib/lp/archivepublisher/tests/test_inject_extra_gpg_signing_key.py b/lib/lp/archivepublisher/tests/test_inject_extra_gpg_signing_key.py
new file mode 100644
index 0000000..ddb3922
--- /dev/null
+++ b/lib/lp/archivepublisher/tests/test_inject_extra_gpg_signing_key.py
@@ -0,0 +1,308 @@
+# Copyright 2024 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""
+Test cases for the script that injects an extra GPG signing key for an
+archive into the signing service.
+"""
+import os
+import shutil
+from datetime import datetime, timezone
+from unittest import mock
+
+from fixtures import MockPatch, TempDir
+from testtools.matchers import (
+    Equals,
+    MatchesListwise,
+    MatchesStructure,
+    StartsWith,
+)
+from zope.component import getUtility
+
+from lp.archivepublisher.scripts.inject_extra_gpg_signing_key import (
+    InjectExtraGPGSigningKeyScript,
+)
+from lp.services.config import config
+from lp.services.log.logger import BufferLogger
+from lp.services.scripts.base import LaunchpadScriptFailure
+from lp.services.signing.enums import SigningKeyType
+from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.services.signing.tests.helpers import SigningServiceClientFixture
+from lp.testing import ExpectedException, TestCaseWithFactory
+from lp.testing.dbuser import dbuser
+from lp.testing.faketransaction import FakeTransaction
+from lp.testing.gpgkeys import gpgkeysdir
+from lp.testing.layers import ZopelessDatabaseLayer
+
+
+class TestInjectExtraGPGSigningKey(TestCaseWithFactory):
+    layer = ZopelessDatabaseLayer
+
+    def setUp(self):
+        super().setUp()  # keys_dir: temp dir passed as the keys directory.
+        self.keys_dir = self.useFixture(TempDir()).path
+
+    def makeScript(self, test_args):
+        """Build the script with a fake transaction and a buffer logger."""
+        publisher_user = config.archivepublisher.dbuser
+        script = InjectExtraGPGSigningKeyScript(
+            "test-inject", dbuser=publisher_user, test_args=test_args
+        )
+        script.logger = BufferLogger()
+        script.txn = FakeTransaction()
+        return script
+
+    def makeArchiveWithSigningKeyFingerprint(self):
+        """Make an archive whose fingerprint is a unique 40-digit hex."""
+        archive = self.factory.makeArchive()
+        fingerprint = self.factory.getUniqueHexString(digits=40).upper()
+        archive.signing_key_fingerprint = fingerprint
+        return archive
+
+    def test_archive_not_specified(self):
+        # No options at all: main() must fail asking for an archive.
+        with ExpectedException(LaunchpadScriptFailure, "Specify an archive."):
+            self.makeScript([]).main()
+
+    def test_local_keys_directory_not_specified(self):
+        # Only --archive is given, so the keys directory check must fire.
+        args = ["--archive", "~user/ubuntu/archive"]
+        script = self.makeScript(args)
+        message = "Specify the directory containing the private keys."
+        with ExpectedException(LaunchpadScriptFailure, message):
+            script.main()
+
+    def test_fingerprint_not_specified(self):
+        """main() fails when --fingerprint is the only option left out.
+
+        The archive reference does not need to exist: main() validates the
+        options before it looks the archive up.
+        """
+        # "--local-keys-dir" abbreviates --local-keys-directory; this relies
+        # on the option parser accepting unambiguous prefixes.
+        args = ["--archive", "~user/ubuntu/archive"]
+        args += ["--local-keys-dir", self.keys_dir]
+        script = self.makeScript(args)
+        message = "Specify the fingerprint of the GPG key to inject."
+        with ExpectedException(LaunchpadScriptFailure, message):
+            script.main()
+
+    def test_get_archive_with_reference(self):
+        expected = self.factory.makeArchive()  # resolved via its reference
+        script = self.makeScript(["--archive", expected.reference])
+        self.assertEqual(script.getArchive(), expected)
+
+    def test_get_archive_with_reference_archive_not_found(self):
+        # An unknown reference must be reported as a script failure.
+        reference = "~foo/ubuntu/bar"
+        message = f"Archive '{reference}' could not be found."
+        with ExpectedException(LaunchpadScriptFailure, message):
+            self.makeScript(["--archive", reference]).getArchive()
+
+    def test_archive_has_no_existing_signing_key(self):
+        """main() refuses to run for an archive with no signing key.
+
+        No signing_key_fingerprint is assigned to the archive here, so
+        main() must log the error and never reach injectGPG.
+        """
+        archive = self.factory.makeArchive()
+        fingerprint = self.factory.getUniqueHexString(digits=40).upper()
+        args = ["--archive", archive.reference]
+        args += ["--fingerprint", fingerprint]
+        args += ["--local-keys-dir", self.keys_dir]
+        script = self.makeScript(args)
+        script.injectGPG = mock.Mock()
+        script.main()
+        content = script.logger.content.as_text()
+        expected = (
+            f"Archive '{archive.reference}' does not have a signing key "
+            "generated by Launchpad yet. Cannot inject an extra key without "
+            "an existing key. Aborting."
+        )
+        self.assertIn(expected, content)
+        script.injectGPG.assert_not_called()
+
+    def test_signing_key_with_fingerprint_exists_already_unassociated(self):
+        """An existing but unlinked key is reported and nothing is injected.
+
+        The script logs that it is adding the missing association and
+        then stops before calling injectGPG.
+        """
+        archive = self.makeArchiveWithSigningKeyFingerprint()
+        signing_key = self.factory.makeSigningKey(
+            key_type=SigningKeyType.OPENPGP
+        )
+        fingerprint = signing_key.fingerprint
+        # No IArchiveSigningKeySet.create() call here: the key exists but
+        # is not yet associated with the archive.
+        args = ["--archive", archive.reference]
+        args += ["--fingerprint", fingerprint]
+        args += ["--local-keys-dir", self.keys_dir]
+        script = self.makeScript(args)
+        # Stub out the injection so that any call would be recorded.
+        script.injectGPG = mock.Mock()
+        script.main()
+        content = script.logger.content.as_text()
+        expected_messages = [
+            f"Signing key with fingerprint '{fingerprint}' exists already.",
+            (
+                f"Signing key with fingerprint '{fingerprint}' not "
+                f"associated with the archive '{archive.reference}'. Adding "
+                "the association."
+            ),
+            "Aborting key injection into the signing service.",
+        ]
+        for message in expected_messages:
+            self.assertIn(message, content)
+        script.injectGPG.assert_not_called()
+
+    def test_signing_key_with_fingerprint_exists_already_associated(self):
+        """An existing, already linked key is reported and left alone.
+
+        No "Adding the association" message may be logged, and injectGPG
+        must not be called.
+        """
+        archive = self.makeArchiveWithSigningKeyFingerprint()
+        signing_key = self.factory.makeSigningKey(
+            key_type=SigningKeyType.OPENPGP,
+            fingerprint=archive.signing_key_fingerprint,
+        )
+        # Link the key to the archive before the script runs.
+        getUtility(IArchiveSigningKeySet).create(archive, None, signing_key)
+        fingerprint = signing_key.fingerprint
+        args = ["--archive", archive.reference]
+        args += ["--fingerprint", fingerprint]
+        args += ["--local-keys-dir", self.keys_dir]
+        script = self.makeScript(args)
+        script.injectGPG = mock.Mock()
+        script.main()
+        content = script.logger.content.as_text()
+        self.assertIn(
+            f"Signing key with fingerprint '{fingerprint}' exists already.",
+            content,
+        )
+        self.assertNotIn(
+            f"Signing key with fingerprint '{fingerprint}' not "
+            f"associated with the archive '{archive.reference}'. Adding "
+            "the association.",
+            content,
+        )
+        self.assertIn(
+            "Aborting key injection into the signing service.", content
+        )
+        script.injectGPG.assert_not_called()
+
+    def test_secret_key_file_not_found(self):
+        """A missing key file is logged as an error and nothing is injected.
+
+        The script looks for <fingerprint>.gpg inside --local-keys-dir.
+        """
+        archive = self.makeArchiveWithSigningKeyFingerprint()
+        fingerprint = self.factory.getUniqueHexString(digits=40).upper()
+        # Nothing is written to the temporary keys directory in this test.
+        args = ["--archive", archive.reference]
+        args += ["--fingerprint", fingerprint]
+        args += ["--local-keys-dir", self.keys_dir]
+        script = self.makeScript(args)
+        script.injectGPG = mock.Mock()
+        script.main()
+        content = script.logger.content.as_text()
+        missing_path = f"{self.keys_dir}/{fingerprint}.gpg"
+        self.assertIn(
+            f"Could not find key file at '{missing_path}'.",
+            content,
+        )
+        script.injectGPG.assert_not_called()
+
+    def test_secret_key_file_exists(self):
+        """A present key file is logged and handed on to injectGPG.
+
+        injectGPG is mocked, so the file content is never parsed as a
+        real key.
+        """
+        archive = self.makeArchiveWithSigningKeyFingerprint()
+        fingerprint = self.factory.getUniqueHexString(digits=40).upper()
+        secret_key_path = os.path.join(self.keys_dir, f"{fingerprint}.gpg")
+        # Only the file's existence matters; the content is a placeholder.
+        with open(secret_key_path, "w") as key_file:
+            key_file.write(f"Private key {fingerprint}")
+        args = ["--archive", archive.reference]
+        args += ["--fingerprint", fingerprint]
+        args += ["--local-keys-dir", self.keys_dir]
+        script = self.makeScript(args)
+        script.injectGPG = mock.Mock()
+        script.main()
+        content = script.logger.content.as_text()
+        self.assertIn(
+            f"Found key file at '{self.keys_dir}/{fingerprint}.gpg'.",
+            content,
+        )
+        script.injectGPG.assert_called_once_with(archive, secret_key_path)
+
+    def test_injectGPG(self):
+        """injectGPG sends the sample PPA key to the signing service.
+
+        The signing service client and the script module's datetime are
+        replaced, so the injected arguments and the creation date can be
+        compared exactly.
+        """
+        signing_service_client = self.useFixture(
+            SigningServiceClientFixture(self.factory)
+        )
+        now = datetime.now()
+        mock_datetime = self.useFixture(
+            MockPatch(
+                "lp.archivepublisher.scripts"
+                ".inject_extra_gpg_signing_key.datetime"
+            )
+        ).mock
+        mock_datetime.now = lambda: now
+        expected_date = now.replace(tzinfo=timezone.utc)
+        archive = self.makeArchiveWithSigningKeyFingerprint()
+        fingerprint = self.factory.getUniqueHexString(digits=40).upper()
+        secret_key_path = os.path.join(self.keys_dir, f"{fingerprint}.gpg")
+        sample_key = os.path.join(gpgkeysdir, "ppa-sample@canonical.com.sec")
+        shutil.copyfile(sample_key, secret_key_path)
+        script = self.makeScript(
+            [
+                "--archive",
+                archive.reference,
+                "--fingerprint",
+                fingerprint,
+                "--local-keys-dir",
+                self.keys_dir,
+            ]
+        )
+        with dbuser(config.archivepublisher.dbuser):
+            signing_key = script.injectGPG(archive, secret_key_path)
+
+        public_key_header = b"-----BEGIN PGP PUBLIC KEY BLOCK-----\n"
+        self.assertThat(
+            signing_key,
+            MatchesStructure(
+                key_type=Equals(SigningKeyType.OPENPGP),
+                public_key=StartsWith(public_key_header),
+                date_created=Equals(expected_date),
+            ),
+        )
+        # Drop the GnuPG version line from the on-disk key: the export
+        # produced through the GPG library does not include it.
+        with open(secret_key_path, "rb") as key_file:
+            secret_key_bytes = b"".join(
+                line
+                for line in key_file
+                if not line.startswith(b"Version: GnuPG")
+            )
+        self.assertEqual(1, signing_service_client.inject.call_count)
+        self.assertThat(
+            signing_service_client.inject.call_args[0],
+            MatchesListwise(
+                [
+                    Equals(SigningKeyType.OPENPGP),
+                    Equals(secret_key_bytes),
+                    StartsWith(public_key_header),
+                    Equals("Launchpad PPA for Celso áéíóú Providelo"),
+                    Equals(expected_date),
+                ]
+            ),
+        )

References