launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #25367
[Merge] ~cjwatson/launchpad:archive-gpg-inject-into-signing-service into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:archive-gpg-inject-into-signing-service into launchpad:master.
Commit message:
Extend sync-signingkeys to inject archive GPG signing keys
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/391303
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:archive-gpg-inject-into-signing-service into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/sync_signingkeys.py b/lib/lp/archivepublisher/scripts/sync_signingkeys.py
index 32a43de..740a235 100644
--- a/lib/lp/archivepublisher/scripts/sync_signingkeys.py
+++ b/lib/lp/archivepublisher/scripts/sync_signingkeys.py
@@ -21,14 +21,21 @@ import transaction
from zope.component import getUtility
from lp.archivepublisher.config import getPubConfig
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ ISignableArchive,
+ )
from lp.archivepublisher.model.publisherconfig import PublisherConfig
from lp.services.database.interfaces import IStore
+from lp.services.gpg.interfaces import IGPGHandler
from lp.services.scripts.base import (
LaunchpadScript,
LaunchpadScriptFailure,
)
from lp.services.signing.enums import SigningKeyType
-from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.services.signing.interfaces.signingkey import (
+ IArchiveSigningKeySet,
+ ISigningKeySet,
+ )
from lp.soyuz.interfaces.archive import IArchiveSet
from lp.soyuz.model.archive import Archive
@@ -93,15 +100,13 @@ class SyncSigningKeysScript(LaunchpadScript):
self.options.type)
key_types = [key_type]
else:
- # While archives do have OpenPGP keys, they work in a rather
- # different way (and are used for signing the archive itself,
- # not its contents), so skip them for now.
key_types = [
SigningKeyType.UEFI,
SigningKeyType.KMOD,
SigningKeyType.OPAL,
SigningKeyType.SIPL,
SigningKeyType.FIT,
+ SigningKeyType.OPENPGP,
]
return key_types
@@ -122,6 +127,9 @@ class SyncSigningKeysScript(LaunchpadScript):
}
found_keys_per_type = {}
for key_type in self.getKeyTypes():
+ if key_type == SigningKeyType.OPENPGP:
+ # OpenPGP keys are handled separately.
+ continue
files = [os.path.join(dir, f) for f in keys_per_type[key_type]]
self.logger.debug("Checking files %s...", ', '.join(files))
if all(os.path.exists(f) for f in files):
@@ -138,7 +146,7 @@ class SyncSigningKeysScript(LaunchpadScript):
series_paths = {}
pubconf = getPubConfig(archive)
if pubconf is None or pubconf.signingroot is None:
- self.logger.info(
+ self.logger.debug(
"Skipping %s: no pubconfig or no signing root." %
archive.reference)
return {}
@@ -189,6 +197,35 @@ class SyncSigningKeysScript(LaunchpadScript):
description, now, archive,
earliest_distro_series=series)
+ def injectGPG(self, archive, secret_key_path):
+ with open(secret_key_path, "rb") as key_file:
+ secret_key_export = key_file.read()
+ gpg_handler = getUtility(IGPGHandler)
+ secret_key = gpg_handler.importSecretKey(secret_key_export)
+ signing_key_set = getUtility(ISigningKeySet)
+ existing_signing_key = signing_key_set.get(
+ SigningKeyType.OPENPGP, secret_key.fingerprint)
+ if existing_signing_key is not None:
+ # There's no point in honouring self.options.overwrite here,
+ # because we know we'll just end up with the same fingerprint
+ # anyway, and lp-signing will reject attempts to update an
+ # existing key with new key material.
+ self.logger.info(
+ "Signing key for %s / %s already exists",
+ SigningKeyType.OPENPGP, archive.reference)
+ return existing_signing_key
+
+ if self.options.dry_run:
+ self.logger.info(
+ "Would inject signing key for %s / %s",
+ SigningKeyType.OPENPGP, archive.reference)
+ else:
+ public_key = gpg_handler.retrieveKey(secret_key.fingerprint)
+ now = datetime.now().replace(tzinfo=utc)
+ return signing_key_set.inject(
+ SigningKeyType.OPENPGP, secret_key.export(),
+ public_key.export(), secret_key.uids[0].name, now)
+
def processArchive(self, archive):
for series, path in self.getSeriesPaths(archive).items():
keys_per_type = self.getKeysPerType(path)
@@ -198,10 +235,18 @@ class SyncSigningKeysScript(LaunchpadScript):
priv_key, pub_key, key_type,
series.name if series else None)
self.inject(archive, key_type, series, priv_key, pub_key)
+ if (SigningKeyType.OPENPGP in self.getKeyTypes() and
+ archive.signing_key is not None):
+ secret_key_path = ISignableArchive(archive).getPathForSecretKey(
+ archive.signing_key)
+ self.logger.info(
+ "Found key file %s (type=%s).",
+ secret_key_path, SigningKeyType.OPENPGP)
+ self.injectGPG(archive, secret_key_path)
def main(self):
for i, archive in enumerate(self.getArchives()):
- self.logger.info(
+ self.logger.debug(
"#%s - Processing keys for archive %s.", i, archive.reference)
self.processArchive(archive)
if self.options.dry_run:
diff --git a/lib/lp/archivepublisher/tests/test_sync_signingkeys.py b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
index 5023395..22e38f5 100644
--- a/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
+++ b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
@@ -1,3 +1,7 @@
+# -*- coding: utf-8 -*-
+# NOTE: The first line above must stay first; do not move the copyright
+# notice to the top. See http://www.python.org/dev/peps/pep-0263/.
+#
# Copyright 2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
@@ -24,11 +28,19 @@ from testtools.matchers import (
ContainsAll,
Equals,
MatchesDict,
+ MatchesListwise,
MatchesStructure,
+ StartsWith,
)
+from testtools.twistedsupport import AsynchronousDeferredRunTest
import transaction
+from twisted.internet import defer
from zope.component import getUtility
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ IArchiveGPGSigningKey,
+ ISignableArchive,
+ )
from lp.archivepublisher.model.publisherconfig import PublisherConfig
from lp.archivepublisher.scripts.sync_signingkeys import SyncSigningKeysScript
from lp.services.compat import mock
@@ -41,17 +53,22 @@ from lp.services.database.interfaces import IStore
from lp.services.log.logger import BufferLogger
from lp.services.signing.enums import SigningKeyType
from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.services.signing.model.signingkey import SigningKey
from lp.services.signing.testing.fixture import SigningServiceFixture
from lp.services.signing.tests.helpers import SigningServiceClientFixture
from lp.soyuz.model.archive import Archive
from lp.testing import TestCaseWithFactory
from lp.testing.dbuser import dbuser
+from lp.testing.gpgkeys import gpgkeysdir
+from lp.testing.keyserver import InProcessKeyServerFixture
from lp.testing.layers import ZopelessDatabaseLayer
from lp.testing.script import run_script
class TestSyncSigningKeysScript(TestCaseWithFactory):
+
layer = ZopelessDatabaseLayer
+ run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=10)
def setUp(self):
super(TestSyncSigningKeysScript, self).setUp()
@@ -130,6 +147,7 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
SigningKeyType.OPAL,
SigningKeyType.SIPL,
SigningKeyType.FIT,
+ SigningKeyType.OPENPGP,
]
self.assertEqual(expected_key_types, key_types)
@@ -232,7 +250,7 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
# Check the log messages.
content = script.logger.content.as_text()
self.assertIn(
- "INFO #0 - Processing keys for archive %s." % archive.reference,
+ "DEBUG #0 - Processing keys for archive %s." % archive.reference,
content)
tpl = "INFO Found key files %s / %s (type=%s, series=%s)."
@@ -329,7 +347,7 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
self.assertThat(
script.logger.content.as_text().splitlines(),
ContainsAll([
- "INFO #0 - Processing keys for archive %s." %
+ "DEBUG #0 - Processing keys for archive %s." %
archive.reference,
found_tpl % (
os.path.join(archive_root, "uefi.key"),
@@ -503,6 +521,75 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
(SigningKeyType.UEFI, archive.reference, series.name),
script.logger.content.as_text())
+ @defer.inlineCallbacks
+ def setUpArchiveKey(self, archive, secret_key_path):
+ with InProcessKeyServerFixture() as keyserver:
+ yield keyserver.start()
+ yield IArchiveGPGSigningKey(archive).setSigningKey(
+ secret_key_path, async_keyserver=True)
+
+ @defer.inlineCallbacks
+ def test_injectGPG(self):
+ signing_service_client = self.useFixture(
+ SigningServiceClientFixture(self.factory))
+ now = datetime.now()
+ mock_datetime = self.useFixture(MockPatch(
+ 'lp.archivepublisher.scripts.sync_signingkeys.datetime')).mock
+ mock_datetime.now = lambda: now
+ archive = self.factory.makeArchive()
+ secret_key_path = os.path.join(
+ gpgkeysdir, "ppa-sample@canonical.com.sec")
+ yield self.setUpArchiveKey(archive, secret_key_path)
+ self.assertIsNotNone(archive.signing_key)
+ script = self.makeScript([])
+
+ with dbuser(config.archivepublisher.dbuser):
+ secret_key_path = ISignableArchive(archive).getPathForSecretKey(
+ archive.signing_key)
+ signing_key = script.injectGPG(archive, secret_key_path)
+
+ self.assertThat(signing_key, MatchesStructure(
+ key_type=Equals(SigningKeyType.OPENPGP),
+ public_key=StartsWith(b"-----BEGIN PGP PUBLIC KEY BLOCK-----\n"),
+ date_created=Equals(now.replace(tzinfo=utc))))
+ with open(secret_key_path, "rb") as f:
+ secret_key_bytes = f.read()
+ self.assertEqual(1, signing_service_client.inject.call_count)
+ self.assertThat(
+ signing_service_client.inject.call_args[0], MatchesListwise([
+ Equals(SigningKeyType.OPENPGP),
+ Equals(secret_key_bytes),
+ StartsWith(b"-----BEGIN PGP PUBLIC KEY BLOCK-----\n"),
+ Equals("Launchpad PPA for Celso áéíóú Providelo"),
+ Equals(now.replace(tzinfo=utc)),
+ ]))
+
+ @defer.inlineCallbacks
+ def test_injectGPG_existing_key(self):
+ signing_service_client = self.useFixture(
+ SigningServiceClientFixture(self.factory))
+ archive = self.factory.makeArchive()
+ secret_key_path = os.path.join(
+ gpgkeysdir, "ppa-sample@canonical.com.sec")
+ yield self.setUpArchiveKey(archive, secret_key_path)
+ self.assertIsNotNone(archive.signing_key)
+ expected_signing_key = self.factory.makeSigningKey(
+ key_type=SigningKeyType.OPENPGP,
+ fingerprint=archive.signing_key_fingerprint)
+ script = self.makeScript([])
+
+ with dbuser(config.archivepublisher.dbuser):
+ secret_key_path = ISignableArchive(archive).getPathForSecretKey(
+ archive.signing_key)
+ signing_key = script.injectGPG(archive, secret_key_path)
+
+ self.assertEqual(expected_signing_key, signing_key)
+ self.assertEqual(0, signing_service_client.inject.call_count)
+ self.assertIn(
+ "Signing key for %s / %s already exists" %
+ (SigningKeyType.OPENPGP, archive.reference),
+ script.logger.content.as_text())
+
def runScript(self):
transaction.commit()
ret, out, err = run_script("scripts/sync-signingkeys.py")
@@ -511,6 +598,7 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
self.assertEqual(0, ret)
transaction.commit()
+ @defer.inlineCallbacks
def test_script(self):
self.useFixture(SigningServiceFixture())
series = self.factory.makeDistroSeries()
@@ -521,6 +609,10 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
fd.write(b"Private key content")
with open(os.path.join(archive_root, "uefi.crt"), "wb") as fd:
fd.write(b"Public key content")
+ secret_key_path = os.path.join(
+ gpgkeysdir, "ppa-sample@canonical.com.sec")
+ yield self.setUpArchiveKey(archive, secret_key_path)
+ self.assertIsNotNone(archive.signing_key)
self.runScript()
@@ -529,3 +621,11 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
self.assertThat(archive_signing_key, MatchesStructure(
key_type=Equals(SigningKeyType.UEFI),
public_key=Equals(b"Public key content")))
+ # We can't look the key up by fingerprint in this test, because the
+ # fake signing service makes up a random fingerprint. Just look for
+ # the most recently-added SigningKey.
+ gpg_signing_key = IStore(SigningKey).find(
+ SigningKey).order_by(SigningKey.date_created).last()
+ self.assertThat(gpg_signing_key, MatchesStructure(
+ key_type=Equals(SigningKeyType.OPENPGP),
+ public_key=StartsWith(b"-----BEGIN PGP PUBLIC KEY BLOCK-----\n")))