launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24756
[Merge] ~pappacena/launchpad:fix-signingkey-duplication-on-inject into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:fix-signingkey-duplication-on-inject into launchpad:master.
Commit message:
Fixing duplicate key error when injecting the same signing key for different archives.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/384448
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:fix-signingkey-duplication-on-inject into launchpad:master.
diff --git a/lib/lp/services/signing/model/signingkey.py b/lib/lp/services/signing/model/signingkey.py
index 0479b61..5637f82 100644
--- a/lib/lp/services/signing/model/signingkey.py
+++ b/lib/lp/services/signing/model/signingkey.py
@@ -106,13 +106,21 @@ class SigningKey(StormBase):
signing_service = getUtility(ISigningServiceClient)
generated_key = signing_service.inject(
key_type, private_key, public_key, description, created_at)
- signing_key = SigningKey(
- key_type=key_type, fingerprint=generated_key['fingerprint'],
- public_key=bytes(public_key),
- description=description, date_created=created_at)
+ fingerprint = generated_key['fingerprint']
+
store = IMasterStore(SigningKey)
- store.add(signing_key)
- return signing_key
+ # Check if the key is already saved in the database
+ db_key = store.find(
+ SigningKey,
+ SigningKey.key_type == key_type,
+ SigningKey.fingerprint == fingerprint).one()
+ if db_key is None:
+ db_key = SigningKey(
+ key_type=key_type, fingerprint=fingerprint,
+ public_key=bytes(public_key),
+ description=description, date_created=created_at)
+ store.add(db_key)
+ return db_key
def sign(self, message, message_name):
if self.key_type in (SigningKeyType.UEFI, SigningKeyType.FIT):
diff --git a/lib/lp/services/signing/tests/test_signingkey.py b/lib/lp/services/signing/tests/test_signingkey.py
index 8613dd7..a5fe2b3 100644
--- a/lib/lp/services/signing/tests/test_signingkey.py
+++ b/lib/lp/services/signing/tests/test_signingkey.py
@@ -91,6 +91,31 @@ class TestSigningKey(TestCaseWithFactory, TestWithFixtures):
self.assertEqual(created_at, db_key.date_created)
@responses.activate
+ def test_inject_signing_key_dont_duplicate_same_fingerprint(self):
+ self.signing_service.addResponses(self)
+
+ priv_key = PrivateKey.generate()
+ pub_key = priv_key.public_key
+ created_at = datetime(2020, 4, 16, 16, 35).replace(tzinfo=utc)
+
+ key = SigningKey.inject(
+ SigningKeyType.KMOD, bytes(priv_key), bytes(pub_key),
+ u"This is a test key", created_at)
+ self.assertIsInstance(key, SigningKey)
+
+ store = IMasterStore(SigningKey)
+ store.flush()
+
+ # This should give back the same key
+ new_key = SigningKey.inject(
+ SigningKeyType.KMOD, bytes(priv_key), bytes(pub_key),
+ u"This is a test key with another description", created_at)
+ store.flush()
+
+ self.assertEqual(key.id, new_key.id)
+ self.assertEqual(5, len(responses.calls))
+
+ @responses.activate
def test_sign_some_data(self):
self.signing_service.addResponses(self)
@@ -175,6 +200,58 @@ class TestArchiveSigningKey(TestCaseWithFactory):
fingerprint=self.signing_service.generated_fingerprint,
public_key=bytes(pub_key)))
+ @responses.activate
+ def test_inject_same_key_for_different_archives(self):
+ self.signing_service.addResponses(self)
+
+ archive = self.factory.makeArchive()
+ distro_series = archive.distribution.series[0]
+
+ priv_key = PrivateKey.generate()
+ pub_key = priv_key.public_key
+
+ now = datetime.now().replace(tzinfo=utc)
+ arch_key = getUtility(IArchiveSigningKeySet).inject(
+ SigningKeyType.UEFI, bytes(priv_key), bytes(pub_key),
+ u"Some description", now, archive,
+ earliest_distro_series=distro_series)
+
+ store = Store.of(arch_key)
+ store.invalidate()
+
+ rs = store.find(ArchiveSigningKey)
+ self.assertEqual(1, rs.count())
+
+ db_arch_key = rs.one()
+ self.assertThat(db_arch_key, MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI, archive=archive,
+ earliest_distro_series=distro_series))
+
+ self.assertThat(db_arch_key.signing_key, MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI, description=u"Some description",
+ fingerprint=self.signing_service.generated_fingerprint,
+ public_key=bytes(pub_key)))
+
+ # Inject the same key for a new archive, and check that the
+ # corresponding SigningKey object is the same.
+ another_archive = self.factory.makeArchive()
+
+ another_arch_key = getUtility(IArchiveSigningKeySet).inject(
+ SigningKeyType.UEFI, bytes(priv_key), bytes(pub_key),
+ u"Another description", now, another_archive)
+ store.invalidate()
+
+ rs = store.find(ArchiveSigningKey)
+ self.assertEqual(2, rs.count())
+
+ another_db_arch_key = rs.order_by('id').last()
+ self.assertNotEqual(another_arch_key.id, arch_key.id)
+ self.assertEqual(another_arch_key.signing_key, arch_key.signing_key)
+
+ self.assertThat(another_db_arch_key, MatchesStructure.byEquality(
+ key_type=SigningKeyType.UEFI, archive=another_archive,
+ earliest_distro_series=None))
+
def test_create(self):
archive = self.factory.makeArchive()
distro_series = archive.distribution.series[0]