← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:signing-client-openpgp into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:signing-client-openpgp into launchpad:master with ~cjwatson/launchpad:signing-client-payload-tests as a prerequisite.

Commit message:
Add OpenPGP support to signing service client

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/389007
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:signing-client-openpgp into launchpad:master.
diff --git a/lib/lp/services/signing/enums.py b/lib/lp/services/signing/enums.py
index fdffe5a..ef3b612 100644
--- a/lib/lp/services/signing/enums.py
+++ b/lib/lp/services/signing/enums.py
@@ -1,12 +1,12 @@
 # Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
-"""Enums for signing keys management
-"""
+"""Enums for signing keys management."""
 
 __metaclass__ = type
 
 __all__ = [
+    'OpenPGPKeyAlgorithm',
     'SigningKeyType',
     'SigningMode',
     ]
@@ -56,6 +56,21 @@ class SigningKeyType(DBEnumeratedType):
         A signing key for U-Boot Flat Image Tree images.
         """)
 
+    OPENPGP = DBItem(6, """
+        OpenPGP
+
+        An OpenPGP signing key.
+        """)
+
+
+class OpenPGPKeyAlgorithm(EnumeratedType):
+
+    RSA = Item("""
+        RSA
+
+        A Rivest-Shamir-Adleman key.
+        """)
+
 
 class SigningMode(EnumeratedType):
     """Archive file signing mode."""
diff --git a/lib/lp/services/signing/interfaces/signingkey.py b/lib/lp/services/signing/interfaces/signingkey.py
index 77814b8..1708ba2 100644
--- a/lib/lp/services/signing/interfaces/signingkey.py
+++ b/lib/lp/services/signing/interfaces/signingkey.py
@@ -59,12 +59,17 @@ class ISigningKeySet(Interface):
     """Interface to deal with the collection of signing keys
     """
 
-    def generate(key_type, description):
+    def generate(key_type, description,
+                 openpgp_key_algorithm=None, length=None):
         """Generates a new signing key on lp-signing and stores it in LP's
         database.
 
         :param key_type: One of the SigningKeyType enum's value
         :param description: The description associated with this key
+        :param openpgp_key_algorithm: One of `OpenPGPKeyAlgorithm` (required
+            if key_type is SigningKeyType.OPENPGP)
+        :param length: The key length (required if key_type is
+            SigningKeyType.OPENPGP)
         :return: The SigningKey object associated with the newly created
                  key at lp-signing
         """
diff --git a/lib/lp/services/signing/interfaces/signingserviceclient.py b/lib/lp/services/signing/interfaces/signingserviceclient.py
index 1ad7a1f..26dd69a 100644
--- a/lib/lp/services/signing/interfaces/signingserviceclient.py
+++ b/lib/lp/services/signing/interfaces/signingserviceclient.py
@@ -25,11 +25,16 @@ class ISigningServiceClient(Interface):
         """Get nonce, to be used when sending messages.
         """
 
-    def generate(key_type, description):
+    def generate(key_type, description,
+                 openpgp_key_algorithm=None, length=None):
         """Generate a key to be used when signing.
 
         :param key_type: One of available key types at SigningKeyType
         :param description: String description of the generated key
+        :param openpgp_key_algorithm: One of `OpenPGPKeyAlgorithm` (required
+            if key_type is SigningKeyType.OPENPGP)
+        :param length: The key length (required if key_type is
+            SigningKeyType.OPENPGP)
         :return: A dict with 'fingerprint' (str) and 'public-key' (bytes)
         """
 
diff --git a/lib/lp/services/signing/model/signingkey.py b/lib/lp/services/signing/model/signingkey.py
index 0c754f9..4ecbed9 100644
--- a/lib/lp/services/signing/model/signingkey.py
+++ b/lib/lp/services/signing/model/signingkey.py
@@ -87,9 +87,12 @@ class SigningKey(StormBase):
         self.date_created = date_created
 
     @classmethod
-    def generate(cls, key_type, description):
+    def generate(cls, key_type, description,
+                 openpgp_key_algorithm=None, length=None):
         signing_service = getUtility(ISigningServiceClient)
-        generated_key = signing_service.generate(key_type, description)
+        generated_key = signing_service.generate(
+            key_type, description,
+            openpgp_key_algorithm=openpgp_key_algorithm, length=length)
         signing_key = SigningKey(
             key_type=key_type, fingerprint=generated_key['fingerprint'],
             public_key=generated_key['public-key'],
diff --git a/lib/lp/services/signing/proxy.py b/lib/lp/services/signing/proxy.py
index 3147b5a..932a483 100644
--- a/lib/lp/services/signing/proxy.py
+++ b/lib/lp/services/signing/proxy.py
@@ -144,20 +144,33 @@ class SigningServiceClient:
         encrypted_message = box.encrypt(message, nonce, encoder=Base64Encoder)
         return encrypted_message.ciphertext
 
-    def generate(self, key_type, description):
+    def generate(self, key_type, description,
+                 openpgp_key_algorithm=None, length=None):
         if key_type not in SigningKeyType.items:
             raise ValueError("%s is not a valid key type" % key_type)
+        if key_type == SigningKeyType.OPENPGP:
+            if openpgp_key_algorithm is None:
+                raise ValueError(
+                    "SigningKeyType.OPENPGP requires openpgp_key_algorithm")
+            if length is None:
+                raise ValueError("SigningKeyType.OPENPGP requires length")
 
         nonce = self.getNonce()
         response_nonce = self._makeResponseNonce()
-        data = json.dumps({
+        raw_payload = {
             "key-type": key_type.name,
             "description": description,
-            }).encode("UTF-8")
+            }
+        if key_type == SigningKeyType.OPENPGP:
+            raw_payload.update({
+                "openpgp-key-algorithm": openpgp_key_algorithm.name,
+                "length": length,
+                })
+        payload = json.dumps(raw_payload).encode("UTF-8")
         ret = self._requestJson(
             "/generate", "POST",
             headers=self._getAuthHeaders(nonce, response_nonce),
-            data=self._encryptPayload(nonce, data))
+            data=self._encryptPayload(nonce, payload))
         return {
             "fingerprint": ret["fingerprint"],
             "public-key": base64.b64decode(ret["public-key"])}
diff --git a/lib/lp/services/signing/tests/test_proxy.py b/lib/lp/services/signing/tests/test_proxy.py
index 2e1e0df..d9c760d 100644
--- a/lib/lp/services/signing/tests/test_proxy.py
+++ b/lib/lp/services/signing/tests/test_proxy.py
@@ -30,6 +30,7 @@ from zope.security.proxy import removeSecurityProxy
 
 from lp.services.config import config
 from lp.services.signing.enums import (
+    OpenPGPKeyAlgorithm,
     SigningKeyType,
     SigningMode,
     )
@@ -317,6 +318,78 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
             ])
 
     @responses.activate
+    def test_generate_key_openpgp_missing_algorithm(self):
+        self.response_factory.addResponses(self)
+
+        signing = getUtility(ISigningServiceClient)
+        self.assertRaises(
+            ValueError, signing.generate,
+            SigningKeyType.OPENPGP, "Missing OpenPGP algorithm")
+        self.assertEqual(0, len(responses.calls))
+
+    @responses.activate
+    def test_generate_key_openpgp(self):
+        self.response_factory.addResponses(self)
+        # Generate the key, and checks if we got back the correct dict.
+        signing = getUtility(ISigningServiceClient)
+        generated = signing.generate(
+            SigningKeyType.OPENPGP, "my lp test key",
+            openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=4096)
+
+        self.assertEqual(generated, {
+            'public-key': bytes(self.response_factory.generated_public_key),
+            'fingerprint': self.response_factory.generated_fingerprint,
+            })
+
+        self.assertEqual(3, len(responses.calls))
+
+        # expected order of HTTP calls
+        http_nonce, http_service_key, http_generate = responses.calls
+
+        self.assertEqual("POST", http_nonce.request.method)
+        self.assertEqual(
+            self.response_factory.getUrl("/nonce"), http_nonce.request.url)
+
+        self.assertEqual("GET", http_service_key.request.method)
+        self.assertEqual(
+            self.response_factory.getUrl("/service-key"),
+            http_service_key.request.url)
+
+        self.assertEqual("POST", http_generate.request.method)
+        self.assertEqual(
+            self.response_factory.getUrl("/generate"),
+            http_generate.request.url)
+        self.assertThat(http_generate.request.headers, ContainsDict({
+            "Content-Type": Equals("application/x-boxed-json"),
+            "X-Client-Public-Key": Equals(config.signing.client_public_key),
+            "X-Nonce": Equals(self.response_factory.b64_nonce),
+            "X-Response-Nonce": Equals(
+                self.response_factory.b64_response_nonce),
+            }))
+        self.assertThat(http_generate.request.body, AfterPreprocessing(
+            self.response_factory._decryptPayload,
+            MatchesDict({
+                "key-type": Equals("OPENPGP"),
+                "description": Equals("my lp test key"),
+                "openpgp-key-algorithm": Equals("RSA"),
+                "length": Equals(4096),
+                })))
+
+        self.assertTimeline([
+            ("POST", "/nonce", {}),
+            ("GET", "/service-key", {}),
+            ("POST", "/generate", {
+                "headers": {
+                    "Content-Type": "application/x-boxed-json",
+                    "X-Client-Public-Key": config.signing.client_public_key,
+                    "X-Nonce": self.response_factory.b64_nonce,
+                    "X-Response-Nonce":
+                        self.response_factory.b64_response_nonce,
+                    },
+                }),
+            ])
+
+    @responses.activate
     def test_sign_invalid_mode(self):
         signing = getUtility(ISigningServiceClient)
         self.assertRaises(