← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~lgp171188/launchpad:signing-service-proxy-typo-fix into launchpad:master

 

Guruprasad has proposed merging ~lgp171188/launchpad:signing-service-proxy-typo-fix into launchpad:master.

Commit message:
Fix a typo in SigningServiceClient.sign

Also add tests for the SigningServiceClient to verify
the recent changes to this method.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/464957
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:signing-service-proxy-typo-fix into launchpad:master.
diff --git a/lib/lp/services/signing/proxy.py b/lib/lp/services/signing/proxy.py
index 43d01be..d70653d 100644
--- a/lib/lp/services/signing/proxy.py
+++ b/lib/lp/services/signing/proxy.py
@@ -186,6 +186,11 @@ class SigningServiceClient:
             raise ValueError("%s is not a valid mode" % mode)
         if key_type not in SigningKeyType.items:
             raise ValueError("%s is not a valid key type" % key_type)
+        if not isinstance(fingerprints, list):
+            raise ValueError(
+                "Expected a list of fingerprints but got "
+                f"{type(fingerprints)} instead"
+            )
         if not fingerprints:
             raise ValueError("Not even one fingerprint was provided")
         if len(fingerprints) > 1 and key_type != SigningKeyType.OPENPGP:
@@ -207,13 +212,12 @@ class SigningServiceClient:
             "message": base64.b64encode(message).decode("UTF-8"),
             "mode": mode.name,
         }
-
         ret = self._requestJson("/sign", "POST", encrypt=True, json=payload)
         if isinstance(ret["public-key"], str):
             public_key = base64.b64decode(ret["public-key"].encode("UTF-8"))
         else:  # is a list of public key strings
             public_key = [
-                base64.b64decode(x).encode("UTF-8") for x in ret["public-key"]
+                base64.b64decode(x.encode("UTF-8")) for x in ret["public-key"]
             ]
         return {
             "public-key": public_key,
diff --git a/lib/lp/services/signing/tests/test_proxy.py b/lib/lp/services/signing/tests/test_proxy.py
index c639f7e..6c8e97a 100644
--- a/lib/lp/services/signing/tests/test_proxy.py
+++ b/lib/lp/services/signing/tests/test_proxy.py
@@ -19,6 +19,7 @@ from testtools.matchers import (
     MatchesListwise,
     MatchesStructure,
 )
+from testtools.testcase import ExpectedException
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
@@ -69,6 +70,10 @@ class SigningServiceResponseFactory:
         self.b64_generated_public_key = base64.b64encode(
             bytes(self.generated_public_key)
         ).decode("UTF-8")
+        self.generated_public_key_2 = PrivateKey.generate().public_key
+        self.b64_generated_public_key_2 = base64.b64encode(
+            bytes(self.generated_public_key_2)
+        ).decode("UTF-8")
         self.generated_fingerprint = "338D218488DFD597D8FCB9C328C3E9D9ADA16CEE"
         self.fingerprint_generator = fingerprint_generator
 
@@ -194,11 +199,24 @@ class SigningServiceResponseFactory:
             call_counts["/sign"] += 1
             signed_msg = self.signed_msg_template % call_counts["/sign"]
             signed = base64.b64encode(signed_msg)
-            data = {
+            payload = self._decryptPayload(request.body)
+
+            if isinstance(payload["fingerprint"], list):
+                public_key = [
+                    self.b64_generated_public_key,
+                    self.b64_generated_public_key_2,
+                ]
+            else:
+                public_key = self.b64_generated_public_key
+            response_payload = {
                 "signed-message": signed.decode("utf8"),
-                "public-key": self.b64_generated_public_key,
+                "public-key": public_key,
             }
-            return 201, {}, self._encryptPayload(data, self.response_nonce)
+            return (
+                201,
+                {},
+                self._encryptPayload(response_payload, self.response_nonce),
+            )
 
         responses.add_callback(
             responses.POST, self.getUrl("/sign"), callback=sign_callback
@@ -508,7 +526,7 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
             ValueError,
             signing.sign,
             SigningKeyType.UEFI,
-            "fingerprint",
+            ["fingerprint"],
             "message_name",
             "message",
             "NO-MODE",
@@ -522,7 +540,7 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
             ValueError,
             signing.sign,
             "shrug",
-            "fingerprint",
+            ["fingerprint"],
             "message_name",
             "message",
             SigningMode.ATTACHED,
@@ -530,6 +548,70 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
         self.assertEqual(0, len(responses.calls))
 
     @responses.activate
+    def test_sign_fingerprints_string_instead_of_list(self):
+        signing = getUtility(ISigningServiceClient)
+        with ExpectedException(
+            ValueError, "Expected a list of fingerprints but got"
+        ):
+            signing.sign(
+                SigningKeyType.UEFI,
+                "fingerprint",
+                "message_name",
+                "message",
+                SigningMode.ATTACHED,
+            )
+        self.assertEqual(0, len(responses.calls))
+
+    @responses.activate
+    def test_sign_empty_list_of_fingerprints(self):
+        signing = getUtility(ISigningServiceClient)
+        with ExpectedException(
+            ValueError, "Not even one fingerprint was provided"
+        ):
+            signing.sign(
+                SigningKeyType.OPENPGP,
+                [],
+                "message_name",
+                "message",
+                SigningMode.ATTACHED,
+            )
+        self.assertRaises(
+            ValueError,
+            signing.sign,
+            SigningKeyType.OPENPGP,
+            [],
+            "message_name",
+            "message",
+            SigningMode.CLEAR,
+        )
+        self.assertEqual(0, len(responses.calls))
+
+    @responses.activate
+    def test_sign_multiple_fingerprints_non_openpgp(self):
+        signing = getUtility(ISigningServiceClient)
+        other_key_types = (
+            SigningKeyType.ANDROID_KERNEL,
+            SigningKeyType.CV2_KERNEL,
+            SigningKeyType.FIT,
+            SigningKeyType.KMOD,
+            SigningKeyType.OPAL,
+            SigningKeyType.SIPL,
+            SigningKeyType.UEFI,
+        )
+        for key_type in other_key_types:
+            with ExpectedException(
+                ValueError,
+                "Multi-signing is not supported for non-OpenPGP keys",
+            ):
+                signing.sign(
+                    key_type,
+                    ["fingerprint1", "fingerprint2"],
+                    "message_name",
+                    "message",
+                    SigningMode.ATTACHED,
+                )
+
+    @responses.activate
     def test_sign(self):
         """Runs through SignService.sign() flow"""
         # Replace GET /service-key response by our mock.
@@ -634,6 +716,115 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
         )
 
     @responses.activate
+    def test_multi_sign_openpgp(self):
+        response_factory = self.response_factory
+        response_factory.addResponses(self)
+        fingerprints = [
+            self.factory.getUniqueHexString(40).upper() for _ in range(2)
+        ]
+
+        key_type = SigningKeyType.OPENPGP
+        mode = SigningMode.DETACHED
+        message_name = "my test msg"
+        message = b"this is the message content"
+
+        signing = getUtility(ISigningServiceClient)
+        data = signing.sign(
+            key_type, fingerprints, message_name, message, mode
+        )
+        self.assertEqual(3, len(responses.calls))
+        # expected order of HTTP calls
+        http_nonce, http_service_key, http_sign = responses.calls
+
+        self.assertEqual("POST", http_nonce.request.method)
+        self.assertEqual(
+            self.response_factory.getUrl("/nonce"), http_nonce.request.url
+        )
+
+        self.assertEqual("GET", http_service_key.request.method)
+        self.assertEqual(
+            self.response_factory.getUrl("/service-key"),
+            http_service_key.request.url,
+        )
+
+        self.assertEqual("POST", http_sign.request.method)
+        self.assertEqual(
+            self.response_factory.getUrl("/sign"), http_sign.request.url
+        )
+        self.assertThat(
+            http_sign.request.headers,
+            ContainsDict(
+                {
+                    "Content-Type": Equals("application/x-boxed-json"),
+                    "X-Client-Public-Key": Equals(
+                        config.signing.client_public_key
+                    ),
+                    "X-Nonce": Equals(self.response_factory.b64_nonce),
+                    "X-Response-Nonce": Equals(
+                        self.response_factory.b64_response_nonce
+                    ),
+                }
+            ),
+        )
+        self.assertThat(
+            http_sign.request.body,
+            AfterPreprocessing(
+                self.response_factory._decryptPayload,
+                MatchesDict(
+                    {
+                        "key-type": Equals("OPENPGP"),
+                        "fingerprint": Equals(fingerprints),
+                        "message-name": Equals(message_name),
+                        "message": Equals(
+                            base64.b64encode(message).decode("UTF-8")
+                        ),
+                        "mode": Equals("DETACHED"),
+                    }
+                ),
+            ),
+        )
+
+        self.assertTimeline(
+            [
+                ("POST", "/nonce", {}),
+                ("GET", "/service-key", {}),
+                (
+                    "POST",
+                    "/sign",
+                    {
+                        "headers": {
+                            "Content-Type": "application/x-boxed-json",
+                            "X-Client-Public-Key": (
+                                config.signing.client_public_key
+                            ),
+                            "X-Nonce": self.response_factory.b64_nonce,
+                            "X-Response-Nonce": (
+                                self.response_factory.b64_response_nonce
+                            ),
+                        },
+                    },
+                ),
+            ]
+        )
+
+        # It should have returned the correct JSON content, with signed
+        # message from the API and the public-key.
+        self.assertEqual(2, len(data))
+        public_key = removeSecurityProxy(data["public-key"])
+        self.assertTrue(isinstance(public_key, list))
+        self.assertEqual(2, len(public_key))
+        self.assertEqual(
+            self.response_factory.getAPISignedContent(), data["signed-message"]
+        )
+        self.assertEqual(
+            [
+                bytes(self.response_factory.generated_public_key),
+                bytes(self.response_factory.generated_public_key_2),
+            ],
+            public_key,
+        )
+
+    @responses.activate
     def test_inject_key(self):
         """Makes sure that the SigningService.inject method calls the
         correct endpoints, and actually injects key contents.