← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/launchpad:fix-signingkey-duplication-on-inject into launchpad:master

 

Thiago F. Pappacena has proposed merging ~pappacena/launchpad:fix-signingkey-duplication-on-inject into launchpad:master.

Commit message:
Fixing duplicate key error when injecting the same signing key for different archives.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/384448
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:fix-signingkey-duplication-on-inject into launchpad:master.
diff --git a/lib/lp/services/signing/model/signingkey.py b/lib/lp/services/signing/model/signingkey.py
index 0479b61..5637f82 100644
--- a/lib/lp/services/signing/model/signingkey.py
+++ b/lib/lp/services/signing/model/signingkey.py
@@ -106,13 +106,21 @@ class SigningKey(StormBase):
         signing_service = getUtility(ISigningServiceClient)
         generated_key = signing_service.inject(
             key_type, private_key, public_key, description, created_at)
-        signing_key = SigningKey(
-            key_type=key_type, fingerprint=generated_key['fingerprint'],
-            public_key=bytes(public_key),
-            description=description, date_created=created_at)
+        fingerprint = generated_key['fingerprint']
+
         store = IMasterStore(SigningKey)
-        store.add(signing_key)
-        return signing_key
+        # Check if the key if already saved in the database
+        db_key = store.find(
+            SigningKey,
+            SigningKey.key_type == key_type,
+            SigningKey.fingerprint == fingerprint).one()
+        if db_key is None:
+            db_key = SigningKey(
+                key_type=key_type, fingerprint=fingerprint,
+                public_key=bytes(public_key),
+                description=description, date_created=created_at)
+            store.add(db_key)
+        return db_key
 
     def sign(self, message, message_name):
         if self.key_type in (SigningKeyType.UEFI, SigningKeyType.FIT):
diff --git a/lib/lp/services/signing/tests/test_signingkey.py b/lib/lp/services/signing/tests/test_signingkey.py
index 8613dd7..a5fe2b3 100644
--- a/lib/lp/services/signing/tests/test_signingkey.py
+++ b/lib/lp/services/signing/tests/test_signingkey.py
@@ -91,6 +91,31 @@ class TestSigningKey(TestCaseWithFactory, TestWithFixtures):
         self.assertEqual(created_at, db_key.date_created)
 
     @responses.activate
+    def test_inject_signing_key_dont_duplicate_same_fingerprint(self):
+        self.signing_service.addResponses(self)
+
+        priv_key = PrivateKey.generate()
+        pub_key = priv_key.public_key
+        created_at = datetime(2020, 4, 16, 16, 35).replace(tzinfo=utc)
+
+        key = SigningKey.inject(
+            SigningKeyType.KMOD, bytes(priv_key), bytes(pub_key),
+            u"This is a test key", created_at)
+        self.assertIsInstance(key, SigningKey)
+
+        store = IMasterStore(SigningKey)
+        store.flush()
+
+        # This should give back the same key
+        new_key = SigningKey.inject(
+            SigningKeyType.KMOD, bytes(priv_key), bytes(pub_key),
+            u"This is a test key with another description", created_at)
+        store.flush()
+
+        self.assertEqual(key.id, new_key.id)
+        self.assertEqual(5, len(responses.calls))
+
+    @responses.activate
     def test_sign_some_data(self):
         self.signing_service.addResponses(self)
 
@@ -175,6 +200,58 @@ class TestArchiveSigningKey(TestCaseWithFactory):
             fingerprint=self.signing_service.generated_fingerprint,
             public_key=bytes(pub_key)))
 
+    @responses.activate
+    def test_inject_same_key_for_different_archives(self):
+        self.signing_service.addResponses(self)
+
+        archive = self.factory.makeArchive()
+        distro_series = archive.distribution.series[0]
+
+        priv_key = PrivateKey.generate()
+        pub_key = priv_key.public_key
+
+        now = datetime.now().replace(tzinfo=utc)
+        arch_key = getUtility(IArchiveSigningKeySet).inject(
+            SigningKeyType.UEFI, bytes(priv_key), bytes(pub_key),
+            u"Some description", now, archive,
+            earliest_distro_series=distro_series)
+
+        store = Store.of(arch_key)
+        store.invalidate()
+
+        rs = store.find(ArchiveSigningKey)
+        self.assertEqual(1, rs.count())
+
+        db_arch_key = rs.one()
+        self.assertThat(db_arch_key, MatchesStructure.byEquality(
+            key_type=SigningKeyType.UEFI, archive=archive,
+            earliest_distro_series=distro_series))
+
+        self.assertThat(db_arch_key.signing_key, MatchesStructure.byEquality(
+            key_type=SigningKeyType.UEFI, description=u"Some description",
+            fingerprint=self.signing_service.generated_fingerprint,
+            public_key=bytes(pub_key)))
+
+        # Inject the same key for a new archive, and check that the
+        # corresponding SigningKey object is the same.
+        another_archive = self.factory.makeArchive()
+
+        another_arch_key = getUtility(IArchiveSigningKeySet).inject(
+            SigningKeyType.UEFI, bytes(priv_key), bytes(pub_key),
+            u"Another description", now, another_archive)
+        store.invalidate()
+
+        rs = store.find(ArchiveSigningKey)
+        self.assertEqual(2, rs.count())
+
+        another_db_arch_key = rs.order_by('id').last()
+        self.assertNotEqual(another_arch_key.id, arch_key.id)
+        self.assertEqual(another_arch_key.signing_key, arch_key.signing_key)
+
+        self.assertThat(another_db_arch_key, MatchesStructure.byEquality(
+            key_type=SigningKeyType.UEFI, archive=another_archive,
+            earliest_distro_series=None))
+
     def test_create(self):
         archive = self.factory.makeArchive()
         distro_series = archive.distribution.series[0]