launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #31077
[Merge] ~lgp171188/launchpad:signing-service-proxy-typo-fix into launchpad:master
Guruprasad has proposed merging ~lgp171188/launchpad:signing-service-proxy-typo-fix into launchpad:master.
Commit message:
Fix a typo in SigningServiceClient.sign
Also add tests for the SigningServiceClient to verify
the recent changes to this method.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/464957
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:signing-service-proxy-typo-fix into launchpad:master.
diff --git a/lib/lp/services/signing/proxy.py b/lib/lp/services/signing/proxy.py
index 43d01be..d70653d 100644
--- a/lib/lp/services/signing/proxy.py
+++ b/lib/lp/services/signing/proxy.py
@@ -186,6 +186,11 @@ class SigningServiceClient:
raise ValueError("%s is not a valid mode" % mode)
if key_type not in SigningKeyType.items:
raise ValueError("%s is not a valid key type" % key_type)
+ if not isinstance(fingerprints, list):
+ raise ValueError(
+ "Expected a list of fingerprints but got "
+ f"{type(fingerprints)} instead"
+ )
if not fingerprints:
raise ValueError("Not even one fingerprint was provided")
if len(fingerprints) > 1 and key_type != SigningKeyType.OPENPGP:
@@ -207,13 +212,12 @@ class SigningServiceClient:
"message": base64.b64encode(message).decode("UTF-8"),
"mode": mode.name,
}
-
ret = self._requestJson("/sign", "POST", encrypt=True, json=payload)
if isinstance(ret["public-key"], str):
public_key = base64.b64decode(ret["public-key"].encode("UTF-8"))
else: # is a list of public key strings
public_key = [
- base64.b64decode(x).encode("UTF-8") for x in ret["public-key"]
+ base64.b64decode(x.encode("UTF-8")) for x in ret["public-key"]
]
return {
"public-key": public_key,
diff --git a/lib/lp/services/signing/tests/test_proxy.py b/lib/lp/services/signing/tests/test_proxy.py
index c639f7e..6c8e97a 100644
--- a/lib/lp/services/signing/tests/test_proxy.py
+++ b/lib/lp/services/signing/tests/test_proxy.py
@@ -19,6 +19,7 @@ from testtools.matchers import (
MatchesListwise,
MatchesStructure,
)
+from testtools.testcase import ExpectedException
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
@@ -69,6 +70,10 @@ class SigningServiceResponseFactory:
self.b64_generated_public_key = base64.b64encode(
bytes(self.generated_public_key)
).decode("UTF-8")
+ self.generated_public_key_2 = PrivateKey.generate().public_key
+ self.b64_generated_public_key_2 = base64.b64encode(
+ bytes(self.generated_public_key_2)
+ ).decode("UTF-8")
self.generated_fingerprint = "338D218488DFD597D8FCB9C328C3E9D9ADA16CEE"
self.fingerprint_generator = fingerprint_generator
@@ -194,11 +199,24 @@ class SigningServiceResponseFactory:
call_counts["/sign"] += 1
signed_msg = self.signed_msg_template % call_counts["/sign"]
signed = base64.b64encode(signed_msg)
- data = {
+ payload = self._decryptPayload(request.body)
+
+ if isinstance(payload["fingerprint"], list):
+ public_key = [
+ self.b64_generated_public_key,
+ self.b64_generated_public_key_2,
+ ]
+ else:
+ public_key = self.b64_generated_public_key
+ response_payload = {
"signed-message": signed.decode("utf8"),
- "public-key": self.b64_generated_public_key,
+ "public-key": public_key,
}
- return 201, {}, self._encryptPayload(data, self.response_nonce)
+ return (
+ 201,
+ {},
+ self._encryptPayload(response_payload, self.response_nonce),
+ )
responses.add_callback(
responses.POST, self.getUrl("/sign"), callback=sign_callback
@@ -508,7 +526,7 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
ValueError,
signing.sign,
SigningKeyType.UEFI,
- "fingerprint",
+ ["fingerprint"],
"message_name",
"message",
"NO-MODE",
@@ -522,7 +540,7 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
ValueError,
signing.sign,
"shrug",
- "fingerprint",
+ ["fingerprint"],
"message_name",
"message",
SigningMode.ATTACHED,
@@ -530,6 +548,70 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
self.assertEqual(0, len(responses.calls))
@responses.activate
+ def test_sign_fingerprints_string_instead_of_list(self):
+ signing = getUtility(ISigningServiceClient)
+ with ExpectedException(
+ ValueError, "Expected a list of fingerprints but got"
+ ):
+ signing.sign(
+ SigningKeyType.UEFI,
+ "fingerprint",
+ "message_name",
+ "message",
+ SigningMode.ATTACHED,
+ )
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_sign_empty_list_of_fingerprints(self):
+ signing = getUtility(ISigningServiceClient)
+ with ExpectedException(
+ ValueError, "Not even one fingerprint was provided"
+ ):
+ signing.sign(
+ SigningKeyType.OPENPGP,
+ [],
+ "message_name",
+ "message",
+ SigningMode.ATTACHED,
+ )
+ self.assertRaises(
+ ValueError,
+ signing.sign,
+ SigningKeyType.OPENPGP,
+ [],
+ "message_name",
+ "message",
+ SigningMode.CLEAR,
+ )
+ self.assertEqual(0, len(responses.calls))
+
+ @responses.activate
+ def test_sign_multiple_fingerprints_non_openpgp(self):
+ signing = getUtility(ISigningServiceClient)
+ other_key_types = (
+ SigningKeyType.ANDROID_KERNEL,
+ SigningKeyType.CV2_KERNEL,
+ SigningKeyType.FIT,
+ SigningKeyType.KMOD,
+ SigningKeyType.OPAL,
+ SigningKeyType.SIPL,
+ SigningKeyType.UEFI,
+ )
+ for key_type in other_key_types:
+ with ExpectedException(
+ ValueError,
+ "Multi-signing is not supported for non-OpenPGP keys",
+ ):
+ signing.sign(
+ key_type,
+ ["fingerprint1", "fingerprint2"],
+ "message_name",
+ "message",
+ SigningMode.ATTACHED,
+ )
+
+ @responses.activate
def test_sign(self):
"""Runs through SignService.sign() flow"""
# Replace GET /service-key response by our mock.
@@ -634,6 +716,115 @@ class SigningServiceProxyTest(TestCaseWithFactory, TestWithFixtures):
)
@responses.activate
+ def test_multi_sign_openpgp(self):
+ response_factory = self.response_factory
+ response_factory.addResponses(self)
+ fingerprints = [
+ self.factory.getUniqueHexString(40).upper() for _ in range(2)
+ ]
+
+ key_type = SigningKeyType.OPENPGP
+ mode = SigningMode.DETACHED
+ message_name = "my test msg"
+ message = b"this is the message content"
+
+ signing = getUtility(ISigningServiceClient)
+ data = signing.sign(
+ key_type, fingerprints, message_name, message, mode
+ )
+ self.assertEqual(3, len(responses.calls))
+ # expected order of HTTP calls
+ http_nonce, http_service_key, http_sign = responses.calls
+
+ self.assertEqual("POST", http_nonce.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/nonce"), http_nonce.request.url
+ )
+
+ self.assertEqual("GET", http_service_key.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/service-key"),
+ http_service_key.request.url,
+ )
+
+ self.assertEqual("POST", http_sign.request.method)
+ self.assertEqual(
+ self.response_factory.getUrl("/sign"), http_sign.request.url
+ )
+ self.assertThat(
+ http_sign.request.headers,
+ ContainsDict(
+ {
+ "Content-Type": Equals("application/x-boxed-json"),
+ "X-Client-Public-Key": Equals(
+ config.signing.client_public_key
+ ),
+ "X-Nonce": Equals(self.response_factory.b64_nonce),
+ "X-Response-Nonce": Equals(
+ self.response_factory.b64_response_nonce
+ ),
+ }
+ ),
+ )
+ self.assertThat(
+ http_sign.request.body,
+ AfterPreprocessing(
+ self.response_factory._decryptPayload,
+ MatchesDict(
+ {
+ "key-type": Equals("OPENPGP"),
+ "fingerprint": Equals(fingerprints),
+ "message-name": Equals(message_name),
+ "message": Equals(
+ base64.b64encode(message).decode("UTF-8")
+ ),
+ "mode": Equals("DETACHED"),
+ }
+ ),
+ ),
+ )
+
+ self.assertTimeline(
+ [
+ ("POST", "/nonce", {}),
+ ("GET", "/service-key", {}),
+ (
+ "POST",
+ "/sign",
+ {
+ "headers": {
+ "Content-Type": "application/x-boxed-json",
+ "X-Client-Public-Key": (
+ config.signing.client_public_key
+ ),
+ "X-Nonce": self.response_factory.b64_nonce,
+ "X-Response-Nonce": (
+ self.response_factory.b64_response_nonce
+ ),
+ },
+ },
+ ),
+ ]
+ )
+
+ # It should have returned the correct JSON content, with signed
+ # message from the API and the public-key.
+ self.assertEqual(2, len(data))
+ public_key = removeSecurityProxy(data["public-key"])
+ self.assertTrue(isinstance(public_key, list))
+ self.assertEqual(2, len(public_key))
+ self.assertEqual(
+ self.response_factory.getAPISignedContent(), data["signed-message"]
+ )
+ self.assertEqual(
+ [
+ bytes(self.response_factory.generated_public_key),
+ bytes(self.response_factory.generated_public_key_2),
+ ],
+ public_key,
+ )
+
+ @responses.activate
def test_inject_key(self):
"""Makes sure that the SigningService.inject method calls the
correct endpoints, and actually injects key contents.