launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24994
[Merge] ~cjwatson/launchpad:sync-signingkeys-options into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:sync-signingkeys-options into launchpad:master.
Commit message:
Add several new options to sync-signingkeys
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/387203
--archive and --type allow limiting processing to a single archive and/or signing key type.
--overwrite allows overwriting keys that already exist on the signing service. Use with care, and probably only in conjunction with --archive and --type.
--dry-run just reports what would be done, without injecting any keys or committing any database changes. This may be useful when preparing for a run with --overwrite.
I had to do some rearrangement of ArchiveSigningKeySet in preparation for this.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:sync-signingkeys-options into launchpad:master.
diff --git a/database/schema/security.cfg b/database/schema/security.cfg
index 93dc9f8..0b3996c 100644
--- a/database/schema/security.cfg
+++ b/database/schema/security.cfg
@@ -1207,7 +1207,7 @@ public.account = SELECT, INSERT, UPDATE
public.archive = SELECT, INSERT, UPDATE
public.archivearch = SELECT, INSERT, UPDATE, DELETE
public.archivejob = SELECT, INSERT
-public.archivesigningkey = SELECT, INSERT, UPDATE
+public.archivesigningkey = SELECT, INSERT, UPDATE, DELETE
public.binarypackagebuild = SELECT, INSERT, UPDATE
public.binarypackagefile = SELECT, INSERT, UPDATE
public.binarypackagename = SELECT, INSERT, UPDATE
diff --git a/lib/lp/archivepublisher/scripts/sync_signingkeys.py b/lib/lp/archivepublisher/scripts/sync_signingkeys.py
index e0b89d2..aa7fb53 100644
--- a/lib/lp/archivepublisher/scripts/sync_signingkeys.py
+++ b/lib/lp/archivepublisher/scripts/sync_signingkeys.py
@@ -16,15 +16,20 @@ from datetime import datetime
import os
from pytz import utc
+from storm.locals import Store
import transaction
from zope.component import getUtility
from lp.archivepublisher.config import getPubConfig
from lp.archivepublisher.model.publisherconfig import PublisherConfig
from lp.services.database.interfaces import IStore
-from lp.services.scripts.base import LaunchpadScript
+from lp.services.scripts.base import (
+ LaunchpadScript,
+ LaunchpadScriptFailure,
+ )
from lp.services.signing.enums import SigningKeyType
from lp.services.signing.interfaces.signingkey import IArchiveSigningKeySet
+from lp.soyuz.interfaces.archive import IArchiveSet
from lp.soyuz.model.archive import Archive
@@ -35,23 +40,62 @@ class SyncSigningKeysScript(LaunchpadScript):
def add_my_options(self):
self.parser.add_option(
+ "-A", "--archive",
+ help=(
+ "The reference of the archive to process "
+ "(default: all archives)."))
+ self.parser.add_option(
+ "-t", "--type",
+ help="The type of keys to process (default: all types).")
+
+ self.parser.add_option(
"-l", "--limit", dest="limit", type=int,
help="How many archives to fetch.")
-
self.parser.add_option(
"-o", "--offset", dest="offset", type=int,
help="Offset on archives list.")
+ self.parser.add_option(
+ "--overwrite", action="store_true", default=False,
+ help="Overwrite keys that already exist on the signing service.")
+ self.parser.add_option(
+ "-n", "--dry-run", action="store_true", default=False,
+ help="Report what would be done, but don't actually inject keys.")
+
def getArchives(self):
"""Gets the list of archives that should be processed."""
- archives = IStore(Archive).find(
- Archive,
- PublisherConfig.distribution_id == Archive.distributionID)
- archives = archives.order_by(Archive.id)
+ if self.options.archive is not None:
+ archive = getUtility(IArchiveSet).getByReference(
+ self.options.archive)
+ if archive is None:
+ raise LaunchpadScriptFailure(
+ "No archive named '%s' could be found." %
+ self.options.archive)
+ archives = [archive]
+ else:
+ archives = IStore(Archive).find(
+ Archive,
+ PublisherConfig.distribution_id == Archive.distributionID)
+ archives = archives.order_by(Archive.id)
start = self.options.offset if self.options.offset else 0
end = start + self.options.limit if self.options.limit else None
return archives[start:end]
+ def getKeyTypes(self):
+ """Gets the list of key types that should be processed."""
+ if self.options.type is not None:
+ try:
+ key_type = SigningKeyType.getTermByToken(
+ self.options.type).value
+ except LookupError:
+ raise LaunchpadScriptFailure(
+ "There is no signing key type named '%s'." %
+ self.options.type)
+ key_types = [key_type]
+ else:
+ key_types = SigningKeyType.items
+ return key_types
+
def getKeysPerType(self, dir):
"""Returns the existing key files per type in the given directory.
@@ -68,7 +112,7 @@ class SyncSigningKeysScript(LaunchpadScript):
os.path.join("fit", "fit.crt")),
}
found_keys_per_type = {}
- for key_type in SigningKeyType.items:
+ for key_type in self.getKeyTypes():
files = [os.path.join(dir, f) for f in keys_per_type[key_type]]
self.logger.debug("Checking files %s...", ', '.join(files))
if all(os.path.exists(f) for f in files):
@@ -102,25 +146,39 @@ class SyncSigningKeysScript(LaunchpadScript):
def inject(self, archive, key_type, series, priv_key_path, pub_key_path):
arch_signing_key_set = getUtility(IArchiveSigningKeySet)
- existing_signing_key = arch_signing_key_set.getSigningKey(
+ existing_archive_signing_key = arch_signing_key_set.get(
key_type, archive, series, exact_match=True)
- if existing_signing_key is not None:
- self.logger.info("Signing key for %s / %s / %s already exists",
- key_type, archive.reference,
- series.name if series else None)
- return existing_signing_key
-
- with open(priv_key_path, 'rb') as fd:
- private_key = fd.read()
- with open(pub_key_path, 'rb') as fd:
- public_key = fd.read()
-
- now = datetime.now().replace(tzinfo=utc)
- description = u"%s key for %s" % (key_type.name, archive.reference)
- return arch_signing_key_set.inject(
- key_type, private_key, public_key,
- description, now, archive,
- earliest_distro_series=series)
+ if existing_archive_signing_key is not None:
+ if self.options.overwrite:
+ self.logger.info(
+ "Overwriting existing signing key for %s / %s / %s",
+ key_type, archive.reference,
+ series.name if series else None)
+ Store.of(existing_archive_signing_key).remove(
+ existing_archive_signing_key)
+ else:
+ self.logger.info(
+ "Signing key for %s / %s / %s already exists",
+ key_type, archive.reference,
+ series.name if series else None)
+ return existing_archive_signing_key
+
+ if self.options.dry_run:
+ self.logger.info(
+ "Would inject signing key for %s / %s / %s",
+ key_type, archive.reference, series.name if series else None)
+ else:
+ with open(priv_key_path, 'rb') as fd:
+ private_key = fd.read()
+ with open(pub_key_path, 'rb') as fd:
+ public_key = fd.read()
+
+ now = datetime.now().replace(tzinfo=utc)
+ description = u"%s key for %s" % (key_type.name, archive.reference)
+ return arch_signing_key_set.inject(
+ key_type, private_key, public_key,
+ description, now, archive,
+ earliest_distro_series=series)
def processArchive(self, archive):
for series, path in self.getSeriesPaths(archive).items():
@@ -137,5 +195,8 @@ class SyncSigningKeysScript(LaunchpadScript):
self.logger.info(
"#%s - Processing keys for archive %s.", i, archive.reference)
self.processArchive(archive)
- transaction.commit()
+ if self.options.dry_run:
+ transaction.abort()
+ else:
+ transaction.commit()
self.logger.info("Finished processing archives injections.")
diff --git a/lib/lp/archivepublisher/signing.py b/lib/lp/archivepublisher/signing.py
index 1eca0a4..0425242 100644
--- a/lib/lp/archivepublisher/signing.py
+++ b/lib/lp/archivepublisher/signing.py
@@ -453,7 +453,7 @@ class SigningUpload(CustomUpload):
return
key_set = getUtility(IArchiveSigningKeySet)
- current_key = key_set.getSigningKey(
+ current_key = key_set.get(
key_type, self.archive, None, exact_match=True)
if current_key is not None:
self.logger.info("Skipping injection for key type %s: archive "
diff --git a/lib/lp/archivepublisher/tests/test_sync_signingkeys.py b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
index eb185de..d20e676 100644
--- a/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
+++ b/lib/lp/archivepublisher/tests/test_sync_signingkeys.py
@@ -21,6 +21,7 @@ from fixtures import (
from pytz import utc
from testtools.content import text_content
from testtools.matchers import (
+ ContainsAll,
Equals,
MatchesDict,
MatchesStructure,
@@ -31,6 +32,7 @@ from zope.component import getUtility
from lp.archivepublisher.model.publisherconfig import PublisherConfig
from lp.archivepublisher.scripts.sync_signingkeys import SyncSigningKeysScript
from lp.services.compat import mock
+from lp.services.config import config
from lp.services.config.fixture import (
ConfigFixture,
ConfigUseFixture,
@@ -43,6 +45,7 @@ from lp.services.signing.testing.fixture import SigningServiceFixture
from lp.services.signing.tests.helpers import SigningServiceClientFixture
from lp.soyuz.model.archive import Archive
from lp.testing import TestCaseWithFactory
+from lp.testing.dbuser import dbuser
from lp.testing.layers import ZopelessDatabaseLayer
from lp.testing.script import run_script
@@ -65,7 +68,9 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
self.useFixture(ConfigUseFixture(config_name))
def makeScript(self, test_args):
- script = SyncSigningKeysScript("test-sync", test_args=test_args)
+ script = SyncSigningKeysScript(
+ "test-sync", dbuser=config.archivepublisher.dbuser,
+ test_args=test_args)
script.logger = BufferLogger()
return script
@@ -110,6 +115,22 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
archives = list(script.getArchives())
self.assertEqual(all_archives[2:5], archives)
+ def test_fetch_archives_with_reference(self):
+ all_archives = list(self.makeArchives())
+ script = self.makeScript(["--archive", all_archives[0].reference])
+ archives = list(script.getArchives())
+ self.assertEqual([all_archives[0]], archives)
+
+ def test_get_key_types(self):
+ script = self.makeScript([])
+ key_types = script.getKeyTypes()
+ self.assertEqual(SigningKeyType.items, key_types)
+
+ def test_get_key_types_with_selection(self):
+ script = self.makeScript(["--type", "UEFI"])
+ key_types = script.getKeyTypes()
+ self.assertEqual([SigningKeyType.UEFI], key_types)
+
def test_get_keys_per_type(self):
keys_dir = self.signing_root_dir
@@ -178,8 +199,7 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
'wb') as fd:
fd.write(b"Series 1 %s" % filename)
- script = self.makeScript([])
- script.getArchives = mock.Mock(return_value=[archive])
+ script = self.makeScript(["--archive", archive.reference])
script.inject = mock.Mock()
script.main()
@@ -234,6 +254,103 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
SigningKeyType.UEFI, None),
content)
+ def test_process_archive_dry_run(self):
+ signing_service_client = self.useFixture(
+ SigningServiceClientFixture(self.factory))
+
+ distro = self.factory.makeDistribution()
+ series1 = self.factory.makeDistroSeries(distribution=distro)
+ series2 = self.factory.makeDistroSeries(distribution=distro)
+
+ archive = self.factory.makeArchive(distribution=distro)
+ key_dirs = self.makeArchiveSigningDir(archive, [series1, series2])
+
+ archive_root = key_dirs[None]
+
+ archive_signing_key_fit = self.factory.makeArchiveSigningKey(
+ archive=archive, distro_series=series1,
+ signing_key=self.factory.makeSigningKey(
+ key_type=SigningKeyType.FIT))
+ fingerprint_fit = archive_signing_key_fit.signing_key.fingerprint
+
+ transaction.commit()
+
+ # Create fake UEFI keys for the root
+ for filename in ("uefi.key", "uefi.crt"):
+ with open(os.path.join(archive_root, filename), 'wb') as fd:
+ fd.write(b"Root %s" % filename)
+
+ # Create fake OPAL and Kmod keys for series1
+ for filename in ("opal.pem", "opal.x509", "kmod.pem", "kmod.x509"):
+ with open(os.path.join(key_dirs[series1], filename), 'wb') as fd:
+ fd.write(b"Series 1 %s" % filename)
+
+ # Create fake FIT keys for series1
+ os.makedirs(os.path.join(key_dirs[series1], "fit"))
+ for filename in ("fit.key", "fit.crt"):
+ with open(os.path.join(key_dirs[series1], "fit", filename),
+ 'wb') as fd:
+ fd.write(b"Series 1 %s" % filename)
+
+ script = self.makeScript(
+ ["--archive", archive.reference, "--overwrite", "--dry-run"])
+ script.main()
+
+ self.assertEqual(0, signing_service_client.inject.call_count)
+
+ # No changes are committed to the database.
+ archive_signing_key_set = getUtility(IArchiveSigningKeySet)
+ self.assertIsNone(
+ archive_signing_key_set.getSigningKey(
+ SigningKeyType.UEFI, archive, None, exact_match=True))
+ self.assertIsNone(
+ archive_signing_key_set.getSigningKey(
+ SigningKeyType.KMOD, archive, series1, exact_match=True))
+ self.assertIsNone(
+ archive_signing_key_set.getSigningKey(
+ SigningKeyType.OPAL, archive, series1, exact_match=True))
+ self.assertThat(
+ archive_signing_key_set.getSigningKey(
+ SigningKeyType.FIT, archive, series1, exact_match=True),
+ MatchesStructure.byEquality(fingerprint=fingerprint_fit))
+
+ # Check the log messages.
+ found_tpl = "INFO Found key files %s / %s (type=%s, series=%s)."
+ overwrite_tpl = (
+ "INFO Overwriting existing signing key for %s / %s / %s")
+ inject_tpl = "INFO Would inject signing key for %s / %s / %s"
+ self.assertThat(
+ script.logger.content.as_text().splitlines(),
+ ContainsAll([
+ "INFO #0 - Processing keys for archive %s." %
+ archive.reference,
+ found_tpl % (
+ os.path.join(archive_root, "uefi.key"),
+ os.path.join(archive_root, "uefi.crt"),
+ SigningKeyType.UEFI, None),
+ inject_tpl % (SigningKeyType.UEFI, archive.reference, None),
+ found_tpl % (
+ os.path.join(key_dirs[series1], "kmod.pem"),
+ os.path.join(key_dirs[series1], "kmod.x509"),
+ SigningKeyType.KMOD, series1.name),
+ inject_tpl % (
+ SigningKeyType.KMOD, archive.reference, series1.name),
+ found_tpl % (
+ os.path.join(key_dirs[series1], "opal.pem"),
+ os.path.join(key_dirs[series1], "opal.x509"),
+ SigningKeyType.OPAL, series1.name),
+ inject_tpl % (
+ SigningKeyType.OPAL, archive.reference, series1.name),
+ found_tpl % (
+ os.path.join(key_dirs[series1], "fit", "fit.key"),
+ os.path.join(key_dirs[series1], "fit", "fit.crt"),
+ SigningKeyType.FIT, series1.name),
+ overwrite_tpl % (
+ SigningKeyType.FIT, archive.reference, series1.name),
+ inject_tpl % (
+ SigningKeyType.FIT, archive.reference, series1.name),
+ ]))
+
def test_inject(self):
signing_service_client = self.useFixture(
SigningServiceClientFixture(self.factory))
@@ -258,8 +375,10 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
script = self.makeScript([])
- result_with_series = script.inject(
- archive, SigningKeyType.UEFI, series, priv_key_path, pub_key_path)
+ with dbuser(config.archivepublisher.dbuser):
+ result_with_series = script.inject(
+ archive, SigningKeyType.UEFI, series,
+ priv_key_path, pub_key_path)
self.assertThat(result_with_series, MatchesStructure.byEquality(
archive=archive,
@@ -278,8 +397,10 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
u"UEFI key for %s" % archive.reference, now.replace(tzinfo=utc)),
signing_service_client.inject.call_args[0])
- result_no_series = script.inject(
- archive, SigningKeyType.UEFI, None, priv_key_path, pub_key_path)
+ with dbuser(config.archivepublisher.dbuser):
+ result_no_series = script.inject(
+ archive, SigningKeyType.UEFI, None,
+ priv_key_path, pub_key_path)
self.assertThat(result_no_series, MatchesStructure.byEquality(
archive=archive,
@@ -312,12 +433,13 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
fd.write(b"Public key content")
expected_arch_signing_key = self.factory.makeArchiveSigningKey(
- archive=archive, distro_series=series).signing_key
+ archive=archive, distro_series=series)
key_type = expected_arch_signing_key.key_type
script = self.makeScript([])
- got_arch_key = script.inject(
- archive, key_type, series, priv_key_path, pub_key_path)
+ with dbuser(config.archivepublisher.dbuser):
+ got_arch_key = script.inject(
+ archive, key_type, series, priv_key_path, pub_key_path)
self.assertEqual(expected_arch_signing_key, got_arch_key)
self.assertIn(
@@ -325,6 +447,55 @@ class TestSyncSigningKeysScript(TestCaseWithFactory):
(key_type, archive.reference, series.name),
script.logger.content.as_text())
+ def test_inject_existing_key_with_overwrite(self):
+ signing_service_client = self.useFixture(
+ SigningServiceClientFixture(self.factory))
+
+ now = datetime.now()
+ mock_datetime = self.useFixture(MockPatch(
+ 'lp.archivepublisher.scripts.sync_signingkeys.datetime')).mock
+ mock_datetime.now = lambda: now
+
+ distro = self.factory.makeDistribution()
+ series = self.factory.makeDistroSeries(distribution=distro)
+ archive = self.factory.makeArchive(distribution=distro)
+
+ tmpdir = self.useFixture(TempDir()).path
+ priv_key_path = os.path.join(tmpdir, "priv.key")
+ pub_key_path = os.path.join(tmpdir, "pub.crt")
+ with open(priv_key_path, 'wb') as fd:
+ fd.write(b"Private key content")
+ with open(pub_key_path, 'wb') as fd:
+ fd.write(b"Public key content")
+
+ self.factory.makeArchiveSigningKey(
+ archive=archive, distro_series=series,
+ signing_key=self.factory.makeSigningKey(
+ key_type=SigningKeyType.UEFI))
+
+ script = self.makeScript(["--overwrite"])
+ with dbuser(config.archivepublisher.dbuser):
+ result = script.inject(
+ archive, SigningKeyType.UEFI, series,
+ priv_key_path, pub_key_path)
+
+ self.assertThat(result, MatchesStructure(
+ archive=Equals(archive),
+ earliest_distro_series=Equals(series),
+ key_type=Equals(SigningKeyType.UEFI),
+ signing_key=MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI,
+ public_key=b"Public key content")))
+ self.assertEqual(
+ [(SigningKeyType.UEFI, b"Private key content",
+ b"Public key content",
+ "UEFI key for %s" % archive.reference, now.replace(tzinfo=utc))],
+ signing_service_client.inject.call_args)
+ self.assertIn(
+ "Overwriting existing signing key for %s / %s / %s" %
+ (SigningKeyType.UEFI, archive.reference, series.name),
+ script.logger.content.as_text())
+
def runScript(self):
transaction.commit()
ret, out, err = run_script("scripts/sync-signingkeys.py")
diff --git a/lib/lp/services/signing/interfaces/signingkey.py b/lib/lp/services/signing/interfaces/signingkey.py
index eccf2a0..77814b8 100644
--- a/lib/lp/services/signing/interfaces/signingkey.py
+++ b/lib/lp/services/signing/interfaces/signingkey.py
@@ -117,14 +117,22 @@ class IArchiveSigningKeySet(Interface):
False if it was updated).
"""
- def getSigningKey(key_type, archive, distro_series, exact_match=False):
- """Get the most suitable key for a given archive / distro series
- pair.
+ def get(key_type, archive, distro_series, exact_match=False):
+ """Get the most suitable ArchiveSigningKey for a given context.
:param exact_match: If True, returns the ArchiveSigningKey matching
- exactly the given key_type, archive and
- distro_series. If False, gets the best match.
- :return: The most suitable key
+ exactly the given key_type, archive and distro_series. If False,
+ gets the best match.
+ :return: The most suitable key, or None.
+ """
+
+ def getSigningKey(key_type, archive, distro_series, exact_match=False):
+ """Get the most suitable SigningKey for a given context.
+
+ :param exact_match: If True, returns the SigningKey matching exactly
+ the given key_type, archive and distro_series. If False, gets
+ the best match.
+ :return: The most suitable key, or None.
"""
def generate(key_type, description, archive, earliest_distro_series=None):
diff --git a/lib/lp/services/signing/model/signingkey.py b/lib/lp/services/signing/model/signingkey.py
index 5557e7e..0c754f9 100644
--- a/lib/lp/services/signing/model/signingkey.py
+++ b/lib/lp/services/signing/model/signingkey.py
@@ -12,8 +12,6 @@ __all__ = [
'SigningKey',
]
-from collections import defaultdict
-
import pytz
from storm.locals import (
Bytes,
@@ -175,46 +173,48 @@ class ArchiveSigningKeySet:
return obj
@classmethod
- def getSigningKey(cls, key_type, archive, distro_series,
- exact_match=False):
+ def get(cls, key_type, archive, distro_series, exact_match=False):
store = IStore(ArchiveSigningKey)
- # Gets all the keys of the given key_type available for the archive
- rs = store.find(ArchiveSigningKey,
- SigningKey.id == ArchiveSigningKey.signing_key_id,
- SigningKey.key_type == key_type,
- ArchiveSigningKey.key_type == key_type,
- ArchiveSigningKey.archive == archive)
+
+ # Get all the keys of the given key_type available for the archive.
+ archive_signing_keys = store.find(
+ ArchiveSigningKey,
+ ArchiveSigningKey.key_type == key_type,
+ ArchiveSigningKey.archive == archive)
if exact_match:
- rs = rs.find(
+ archive_signing_keys = archive_signing_keys.find(
ArchiveSigningKey.earliest_distro_series == distro_series)
- # prefetch related signing keys to avoid extra queries.
- signing_keys = store.find(SigningKey, [
- SigningKey.id.is_in([i.signing_key_id for i in rs])])
- signing_keys_by_id = {i.id: i for i in signing_keys}
-
- # Group keys per type, and per distro series
- keys_per_series = defaultdict(dict)
- for i in rs:
- signing_key = signing_keys_by_id[i.signing_key_id]
- keys_per_series[i.earliest_distro_series] = signing_key
+ # Group keys per distro series.
+ keys_per_series = {
+ archive_signing_key.earliest_distro_series_id: archive_signing_key
+ for archive_signing_key in archive_signing_keys}
- # Let's search the most suitable per key type.
+ # Find the most suitable per key type.
found_series = False
# Note that archive.distribution.series is, by default, sorted by
# "version", reversed.
for series in archive.distribution.series:
if series == distro_series:
found_series = True
- if found_series and series in keys_per_series:
- return keys_per_series[series]
+ if found_series and series.id in keys_per_series:
+ return keys_per_series[series.id]
# If no specific key for distro_series was found, returns
# the keys for the archive itself (or None if no key is
# available for the archive either).
return keys_per_series.get(None)
@classmethod
+ def getSigningKey(cls, key_type, archive, distro_series,
+ exact_match=False):
+ archive_signing_key = cls.get(
+ key_type, archive, distro_series, exact_match=exact_match)
+ return (
+ None if archive_signing_key is None
+ else archive_signing_key.signing_key)
+
+ @classmethod
def generate(cls, key_type, description, archive,
earliest_distro_series=None):
signing_key = SigningKey.generate(key_type, description)
diff --git a/lib/lp/services/signing/tests/test_signingkey.py b/lib/lp/services/signing/tests/test_signingkey.py
index 81376b1..a7d7360 100644
--- a/lib/lp/services/signing/tests/test_signingkey.py
+++ b/lib/lp/services/signing/tests/test_signingkey.py
@@ -141,6 +141,22 @@ class TestArchiveSigningKey(TestCaseWithFactory):
client = removeSecurityProxy(getUtility(ISigningServiceClient))
self.addCleanup(client._cleanCaches)
+ def assertGet(self, expected_archive_signing_key,
+ key_type, archive, distro_series, exact_match=False):
+ # get and getSigningKey return the expected results.
+ arch_signing_key_set = getUtility(IArchiveSigningKeySet)
+ expected_signing_key = (
+ None if expected_archive_signing_key is None
+ else expected_archive_signing_key.signing_key)
+ self.assertEqual(
+ expected_archive_signing_key,
+ arch_signing_key_set.get(
+ key_type, archive, distro_series, exact_match=exact_match))
+ self.assertEqual(
+ expected_signing_key,
+ arch_signing_key_set.getSigningKey(
+ key_type, archive, distro_series, exact_match=exact_match))
+
@responses.activate
def test_generate_saves_correctly(self):
self.signing_service.addResponses(self)
@@ -280,9 +296,6 @@ class TestArchiveSigningKey(TestCaseWithFactory):
self.assertEqual(2, store.find(ArchiveSigningKey).count())
def test_get_signing_keys_without_distro_series_configured(self):
- UEFI = SigningKeyType.UEFI
- KMOD = SigningKeyType.KMOD
-
archive = self.factory.makeArchive()
distro_series = archive.distribution.series[0]
uefi_key = self.factory.makeSigningKey(
@@ -304,25 +317,16 @@ class TestArchiveSigningKey(TestCaseWithFactory):
archive, None, kmod_key)
# Should find the keys if we ask for the archive key
- self.assertEqual(
- arch_uefi_key.signing_key,
- arch_signing_key_set.getSigningKey(UEFI, archive, None))
- self.assertEqual(
- arch_kmod_key.signing_key,
- arch_signing_key_set.getSigningKey(KMOD, archive, None))
+ self.assertGet(arch_uefi_key, SigningKeyType.UEFI, archive, None)
+ self.assertGet(arch_kmod_key, SigningKeyType.KMOD, archive, None)
# Should find the key if we ask for archive + distro_series key
- self.assertEqual(
- arch_uefi_key.signing_key,
- arch_signing_key_set.getSigningKey(UEFI, archive, distro_series))
- self.assertEqual(
- arch_kmod_key.signing_key,
- arch_signing_key_set.getSigningKey(KMOD, archive, distro_series))
+ self.assertGet(
+ arch_uefi_key, SigningKeyType.UEFI, archive, distro_series)
+ self.assertGet(
+ arch_kmod_key, SigningKeyType.KMOD, archive, distro_series)
def test_get_signing_key_exact_match(self):
- UEFI = SigningKeyType.UEFI
- KMOD = SigningKeyType.KMOD
-
archive = self.factory.makeArchive()
distro_series1 = archive.distribution.series[0]
distro_series2 = archive.distribution.series[1]
@@ -342,38 +346,27 @@ class TestArchiveSigningKey(TestCaseWithFactory):
archive, None, kmod_key)
# Should get the UEFI key for distro_series1
- self.assertEqual(
- series1_uefi_key.signing_key,
- arch_signing_key_set.getSigningKey(
- UEFI, archive, distro_series1, exact_match=True)
- )
+ self.assertGet(
+ series1_uefi_key,
+ SigningKeyType.UEFI, archive, distro_series1, exact_match=True)
# Should get the archive's KMOD key.
- self.assertEqual(
- arch_kmod_key.signing_key,
- arch_signing_key_set.getSigningKey(
- KMOD, archive, None, exact_match=True)
- )
+ self.assertGet(
+ arch_kmod_key,
+ SigningKeyType.KMOD, archive, None, exact_match=True)
# distro_series1 has no KMOD key.
- self.assertEqual(
+ self.assertGet(
None,
- arch_signing_key_set.getSigningKey(
- KMOD, archive, distro_series1, exact_match=True)
- )
+ SigningKeyType.KMOD, archive, distro_series1, exact_match=True)
# distro_series2 has no key at all.
- self.assertEqual(
+ self.assertGet(
None,
- arch_signing_key_set.getSigningKey(
- KMOD, archive, distro_series2, exact_match=True)
- )
+ SigningKeyType.KMOD, archive, distro_series2, exact_match=True)
def test_get_signing_keys_with_distro_series_configured(self):
- UEFI = SigningKeyType.UEFI
- KMOD = SigningKeyType.KMOD
-
archive = self.factory.makeArchive()
series = archive.distribution.series
- uefi_key = self.factory.makeSigningKey(key_type=UEFI)
- kmod_key = self.factory.makeSigningKey(key_type=KMOD)
+ uefi_key = self.factory.makeSigningKey(key_type=SigningKeyType.UEFI)
+ kmod_key = self.factory.makeSigningKey(key_type=SigningKeyType.KMOD)
# Fill the database with keys from other archives to make sure we
# are filtering it out
@@ -395,34 +388,19 @@ class TestArchiveSigningKey(TestCaseWithFactory):
# If no distroseries is specified, it should give back no KMOD key,
# since we don't have a default
- self.assertEqual(
- arch_uefi_key.signing_key,
- arch_signing_key_set.getSigningKey(UEFI, archive, None))
- self.assertEqual(
- None,
- arch_signing_key_set.getSigningKey(KMOD, archive, None))
+ self.assertGet(arch_uefi_key, SigningKeyType.UEFI, archive, None)
+ self.assertGet(None, SigningKeyType.KMOD, archive, None)
# For the most recent series, use the KMOD key we've set for the
# previous one
- self.assertEqual(
- arch_uefi_key.signing_key,
- arch_signing_key_set.getSigningKey(UEFI, archive, series[0]))
- self.assertEqual(
- arch_kmod_key.signing_key,
- arch_signing_key_set.getSigningKey(KMOD, archive, series[0]))
+ self.assertGet(arch_uefi_key, SigningKeyType.UEFI, archive, series[0])
+ self.assertGet(arch_kmod_key, SigningKeyType.KMOD, archive, series[0])
# For the previous series, we have a KMOD key configured
- self.assertEqual(
- arch_uefi_key.signing_key,
- arch_signing_key_set.getSigningKey(UEFI, archive, series[1]))
- self.assertEqual(
- arch_kmod_key.signing_key,
- arch_signing_key_set.getSigningKey(KMOD, archive, series[1]))
+ self.assertGet(arch_uefi_key, SigningKeyType.UEFI, archive, series[1])
+ self.assertGet(arch_kmod_key, SigningKeyType.KMOD, archive, series[1])
# For the old series, we have an old KMOD key configured
- self.assertEqual(
- arch_uefi_key.signing_key,
- arch_signing_key_set.getSigningKey(UEFI, archive, series[2]))
- self.assertEqual(
- old_arch_kmod_key.signing_key,
- arch_signing_key_set.getSigningKey(KMOD, archive, series[2]))
+ self.assertGet(arch_uefi_key, SigningKeyType.UEFI, archive, series[2])
+ self.assertGet(
+ old_arch_kmod_key, SigningKeyType.KMOD, archive, series[2])
diff --git a/scripts/sync-signingkeys.py b/scripts/sync-signingkeys.py
index 4cdb5cb..84310b0 100755
--- a/scripts/sync-signingkeys.py
+++ b/scripts/sync-signingkeys.py
@@ -3,9 +3,14 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Script to inject archive keys into signing service."""
+
import _pythonpath
+
from lp.archivepublisher.scripts.sync_signingkeys import SyncSigningKeysScript
+from lp.services.config import config
+
if __name__ == '__main__':
- script = SyncSigningKeysScript('sync-signingkeys')
+ script = SyncSigningKeysScript(
+ 'sync-signingkeys', dbuser=config.archivepublisher.dbuser)
script.lock_and_run()