launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24540
[Merge] ~cjwatson/lp-signing:encrypt-responses into lp-signing:master
Colin Watson has proposed merging ~cjwatson/lp-signing:encrypt-responses into lp-signing:master.
Commit message:
Encrypt most responses
Requested reviews:
William Grant (wgrant): security
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/lp-signing/+git/lp-signing/+merge/381514
Since signing payloads may be confidential, responses to /generate, /sign, and /inject should be encrypted. Because each response is encrypted using a nonce freshly generated by the client for that request, this also defends against man-in-the-middle attacks that replay previous responses.
The response to /service-key can't usefully be encrypted, because the encryption itself depends on the service key returned in that response. There's no point in encrypting the response to /nonce, because it isn't confidential and is already protected from replay attacks in other ways.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/lp-signing:encrypt-responses into lp-signing:master.
diff --git a/README.rst b/README.rst
index 1e40995..4d990f7 100644
--- a/README.rst
+++ b/README.rst
@@ -52,12 +52,16 @@ Sample code for generating and signing keys then looks something like this:
.. code-block:: python
+ import base64
+ import json
+
from nacl.encoding import Base64Encoder
from nacl.public import (
Box,
PrivateKey,
PublicKey,
)
+ from nacl.utils import random
import requests
# Replace the IP address here with your signing service's IP address.
@@ -69,7 +73,8 @@ Sample code for generating and signing keys then looks something like this:
response.json()["service-key"], encoder=Base64Encoder)
private_key = PrivateKey.generate()
- encoded_public_key = private_key.public_key.encode(encoder=Base64Encoder)
+ encoded_public_key = (
+ private_key.public_key.encode(encoder=Base64Encoder).decode("UTF-8"))
print(
f"Run 'env/bin/lp-signing register-client test-client "
f"{encoded_public_key}' to register a test client.")
@@ -79,25 +84,40 @@ Sample code for generating and signing keys then looks something like this:
response.raise_for_status()
return base64.b64decode(response.json()["nonce"].encode("UTF-8"))
+ def make_response_nonce():
+ return random(Box.NONCE_SIZE)
+
+ def decrypt_response_json(response, response_nonce):
+ box = Box(private_key, service_key)
+ return json.loads(box.decrypt(
+ response.content, response_nonce, encoder=Base64Encoder))
+
def generate(key_type, description):
nonce = get_nonce()
+ response_nonce = make_response_nonce()
data = json.dumps({
"key-type": key_type,
"description": description,
}).encode("UTF-8")
+ box = Box(private_key, service_key)
encrypted_message = box.encrypt(data, nonce, encoder=Base64Encoder)
- return requests.post(
- f"http://{base_url}/generate",
+ response = requests.post(
+ f"{base_url}/generate",
headers={
"Content-Type": "application/x-boxed-json",
"X-Client-Public-Key": private_key.public_key.encode(
encoder=Base64Encoder).decode("UTF-8"),
"X-Nonce": encrypted_message.nonce.decode("UTF-8"),
+ "X-Response-Nonce": (
+ Base64Encoder.encode(response_nonce).decode("UTF-8")),
},
data=encrypted_message.ciphertext)
+ response.raise_for_status()
+ return decrypt_response_json(response, response_nonce)
def sign(key_type, fingerprint, message_name, message, mode):
nonce = get_nonce()
+ response_nonce = make_response_nonce()
data = json.dumps({
"key-type": key_type,
"fingerprint": fingerprint,
@@ -105,24 +125,27 @@ Sample code for generating and signing keys then looks something like this:
"message": base64.b64encode(message).decode("UTF-8"),
"mode": mode,
}).encode("UTF-8")
+ box = Box(private_key, service_key)
encrypted_message = box.encrypt(data, nonce, encoder=Base64Encoder)
- return requests.post(
+ response = requests.post(
f"{base_url}/sign",
headers={
"Content-Type": "application/x-boxed-json",
"X-Client-Public-Key": private_key.public_key.encode(
encoder=Base64Encoder).decode("UTF-8"),
"X-Nonce": encrypted_message.nonce.decode("UTF-8"),
+ "X-Response-Nonce": (
+ Base64Encoder.encode(response_nonce).decode("UTF-8")),
},
data=encrypted_message.ciphertext)
+ response.raise_for_status()
+ return decrypt_response_json(response, response_nonce)
- response = generate("UEFI", "test")
- response.raise_for_status()
- fingerprint = response.json()["fingerprint"]
+ response_json = generate("UEFI", "test")
+ fingerprint = response_json["fingerprint"]
with open("/path/to/test/image", "rb") as f:
- response = sign(
+ response_json = sign(
"UEFI", fingerprint, "test-image", f.read(), "ATTACHED")
- response.raise_for_status()
with open("/path/to/test/image.signed", "wb") as f:
- f.write(base64.b64decode(response.json()["signed-message"]))
+ f.write(base64.b64decode(response_json["signed-message"]))
diff --git a/lp_signing/auth.py b/lp_signing/auth.py
index e78a46b..f73ce77 100644
--- a/lp_signing/auth.py
+++ b/lp_signing/auth.py
@@ -6,7 +6,11 @@
import base64
import json
-from flask import Request
+from flask import (
+ Request,
+ request,
+ Response,
+ )
from nacl.encoding import Base64Encoder
from nacl.public import (
Box,
@@ -59,6 +63,7 @@ class BoxedRequest(Request):
Content-Type: application/x-boxed-json
X-Client-Public-Key: <base64-encoded NaCl public key of client>
X-Nonce: <base64-encoded nonce as retrieved from /nonce endpoint>
+ X-Response-Nonce: <base64-encoded client-generated nonce>
<base64-encoded ciphertext>
@@ -71,6 +76,8 @@ class BoxedRequest(Request):
"""
client = None
+ private_key_index = 0
+ encrypted_response = False
@property
def is_json(self):
@@ -78,6 +85,28 @@ class BoxedRequest(Request):
super().is_json or
self.mimetype == "application/x-boxed-json") # pragma: no cover
+ def _get_client_public_key(self):
+ try:
+ header = self.headers["X-Client-Public-Key"]
+ except KeyError:
+ raise DataValidationError.single("Client public key not provided")
+ try:
+ return PublicKey(header.encode("UTF-8"), encoder=Base64Encoder)
+ except Exception:
+ raise DataValidationError.single("Cannot decode client public key")
+
+ def _get_nonce(self, header_name):
+ try:
+ header = self.headers[header_name]
+ except KeyError:
+ raise DataValidationError.single(
+ "{} header not provided".format(header_name))
+ try:
+ return base64.b64decode(header.encode("UTF-8"), validate=True)
+ except Exception:
+ raise InvalidNonce.single(
+ "Cannot decode {} header".format(header_name))
+
def _get_data_for_json(self, cache):
"""Read, authenticate, and decrypt incoming data.
@@ -86,7 +115,10 @@ class BoxedRequest(Request):
that should not yet be committed.
As a side-effect, this sets `self.client` to the `Client` with the
- given registered public key, which must exist.
+ given registered public key, which must exist, and
+ `self.private_key_index` to the index of the service private key
+ that was used in `boxed_authentication.service_private_keys` (for
+ later response encryption).
See `flask.wrappers.JSONMixin.get_json`, on which this is based.
@@ -95,27 +127,17 @@ class BoxedRequest(Request):
failed.
"""
data = self.get_data(cache)
- if ("X-Client-Public-Key" not in self.headers or
- "X-Nonce" not in self.headers):
- raise DataValidationError.single("Request not authenticated")
- try:
- client_public_key = PublicKey(
- self.headers["X-Client-Public-Key"].encode("UTF-8"),
- encoder=Base64Encoder)
- except Exception:
- raise DataValidationError.single("Cannot decode client public key")
- try:
- nonce = base64.b64decode(
- self.headers["X-Nonce"].encode("UTF-8"), validate=True)
- except Exception:
- raise InvalidNonce.single("Cannot decode nonce")
+ client_public_key = self._get_client_public_key()
+ nonce = self._get_nonce("X-Nonce")
Nonce.check(nonce)
- for private_key in boxed_authentication.service_private_keys:
+ for i, private_key in enumerate(
+ boxed_authentication.service_private_keys):
try:
box = Box(private_key, client_public_key)
data = box.decrypt(data, nonce, encoder=Base64Encoder)
except Exception:
continue
+ self.private_key_index = i
self.client = Client.getByPublicKey(client_public_key)
if self.client is None:
raise DataValidationError.single(
@@ -124,3 +146,46 @@ class BoxedRequest(Request):
else:
raise DataValidationError.single("Authentication failed")
return data
+
+
+class BoxedResponse(Response):
+ """A response object that can send encrypted and authenticated data.
+
+ The response is only encrypted/authenticated if
+ `request.encrypted_response` is true and the response Content-Type
+ indicates that it is JSON.
+
+ The nonce is sent by the client in the X-Response-Nonce header (see
+ `BoxedRequest`), and the client is expected to decrypt the response
+ using that same nonce. This is much simpler than the request nonce
+ arrangements since the client only needs to store the nonce for the
+ lifetime of the request.
+
+ Note that it is safe for the nonce to be sent in the clear: NaCl does
+ not consider nonces to be secret, and attempts to inject a different
+ nonce in order to perform a replay attack will fail since they won't
+ match the nonce that the client is expecting to use.
+ """
+
+ def encrypt(self):
+ """Encrypt and authenticate outgoing data if necessary.
+
+ The response data is encrypted in place.
+
+ Only do this if `encrypted_response` is true and the response
+ Content-Type indicates that it is JSON.
+ """
+ if not request.encrypted_response or not self.is_json:
+ return
+ client_public_key = request._get_client_public_key()
+ nonce = request._get_nonce("X-Response-Nonce")
+ private_key = boxed_authentication.service_private_keys[
+ request.private_key_index]
+ box = Box(private_key, client_public_key)
+ data = self.get_data()
+ try:
+ encrypted_message = box.encrypt(data, nonce, encoder=Base64Encoder)
+ except Exception:
+ raise DataValidationError.single("Response encryption failed")
+ self.set_data(encrypted_message.ciphertext)
+ self.mimetype = "application/x-boxed-json"
diff --git a/lp_signing/tests/test_webapi.py b/lp_signing/tests/test_webapi.py
index 0201740..8841d10 100644
--- a/lp_signing/tests/test_webapi.py
+++ b/lp_signing/tests/test_webapi.py
@@ -3,6 +3,7 @@
import base64
from datetime import datetime, timedelta
+import json
from pathlib import Path
import pytz
from tempfile import TemporaryDirectory
@@ -19,6 +20,7 @@ from nacl.public import (
PrivateKey,
PublicKey,
)
+from nacl.utils import random
from testtools import TestCase
from testtools.matchers import (
AfterPreprocessing,
@@ -135,17 +137,19 @@ class TestGenerateView(TestCase):
self.private_key = PrivateKey.generate()
self.client.registerPublicKey(self.private_key.public_key)
self.nonce = Nonce.generate().nonce
+ self.response_nonce = random(Box.NONCE_SIZE)
store.commit()
def post_generate(self, json_data=None, **kwargs):
kwargs.setdefault("private_key", self.private_key)
kwargs.setdefault("nonce", self.nonce)
+ kwargs.setdefault("response_nonce", self.response_nonce)
return self.fixture.client.post(
"/generate", json_data=json_data, **kwargs)
def test_unauthenticated(self):
resp = self.post_generate(private_key=None)
- self.assertThat(resp, HasAPIError("Request not authenticated"))
+ self.assertThat(resp, HasAPIError("Client public key not provided"))
def test_undecodable_client_public_key(self):
resp = self.post_generate(
@@ -154,7 +158,19 @@ class TestGenerateView(TestCase):
def test_undecodable_nonce(self):
resp = self.post_generate(headers={"X-Nonce": "nonsense nonce"})
- self.assertThat(resp, HasAPIError("Cannot decode nonce"))
+ self.assertThat(resp, HasAPIError("Cannot decode X-Nonce header"))
+
+ def test_undecodable_response_nonce(self):
+ data = json.dumps({}).encode("UTF-8")
+ box = Box(
+ self.private_key,
+ boxed_authentication.service_private_keys[0].public_key)
+ message = box.encrypt(data, self.nonce, encoder=Base64Encoder)
+ resp = self.post_generate(
+ headers={"X-Response-Nonce": "nonsense nonce"},
+ encrypted_data=message.ciphertext)
+ self.assertThat(
+ resp, HasAPIError("Cannot decode X-Response-Nonce header"))
def test_nonce_already_used(self):
Nonce.check(self.nonce)
@@ -536,16 +552,18 @@ class TestSignView(TestCase):
self.private_key = PrivateKey.generate()
self.client.registerPublicKey(self.private_key.public_key)
self.nonce = Nonce.generate().nonce
+ self.response_nonce = random(Box.NONCE_SIZE)
store.commit()
def post_sign(self, json_data=None, **kwargs):
kwargs.setdefault("private_key", self.private_key)
kwargs.setdefault("nonce", self.nonce)
+ kwargs.setdefault("response_nonce", self.response_nonce)
return self.fixture.client.post("/sign", json_data=json_data, **kwargs)
def test_unauthenticated(self):
resp = self.post_sign(private_key=None)
- self.assertThat(resp, HasAPIError("Request not authenticated"))
+ self.assertThat(resp, HasAPIError("Client public key not provided"))
def test_undecodable_client_public_key(self):
resp = self.post_sign(headers={"X-Client-Public-Key": "nonsense key"})
@@ -553,7 +571,19 @@ class TestSignView(TestCase):
def test_undecodable_nonce(self):
resp = self.post_sign(headers={"X-Nonce": "nonsense nonce"})
- self.assertThat(resp, HasAPIError("Cannot decode nonce"))
+ self.assertThat(resp, HasAPIError("Cannot decode X-Nonce header"))
+
+ def test_undecodable_response_nonce(self):
+ data = json.dumps({}).encode("UTF-8")
+ box = Box(
+ self.private_key,
+ boxed_authentication.service_private_keys[0].public_key)
+ message = box.encrypt(data, self.nonce, encoder=Base64Encoder)
+ resp = self.post_sign(
+ headers={"X-Response-Nonce": "nonsense nonce"},
+ encrypted_data=message.ciphertext)
+ self.assertThat(
+ resp, HasAPIError("Cannot decode X-Response-Nonce header"))
def test_nonce_already_used(self):
Nonce.check(self.nonce)
@@ -1042,18 +1072,20 @@ class TestInjectView(TestCase):
self.clients[0].registerPublicKey(self.private_keys[0].public_key)
self.clients[1].registerPublicKey(self.private_keys[1].public_key)
self.nonces = [Nonce.generate().nonce for _ in range(2)]
+ self.response_nonce = random(Box.NONCE_SIZE)
store.commit()
def post_inject(self, json_data=None, index=0, **kwargs):
kwargs.setdefault("private_key", self.private_keys[index])
kwargs.setdefault("nonce", self.nonces[index])
+ kwargs.setdefault("response_nonce", self.response_nonce)
return self.fixture.client.post(
"/inject", json_data=json_data, **kwargs)
def test_unauthenticated(self):
resp = self.post_inject(private_key=None)
- self.assertThat(resp, HasAPIError("Request not authenticated"))
+ self.assertThat(resp, HasAPIError("Client public key not provided"))
def test_undecodable_client_public_key(self):
resp = self.post_inject(
@@ -1062,7 +1094,19 @@ class TestInjectView(TestCase):
def test_undecodable_nonce(self):
resp = self.post_inject(headers={"X-Nonce": "nonsense nonce"})
- self.assertThat(resp, HasAPIError("Cannot decode nonce"))
+ self.assertThat(resp, HasAPIError("Cannot decode X-Nonce header"))
+
+ def test_undecodable_response_nonce(self):
+ data = json.dumps({}).encode("UTF-8")
+ box = Box(
+ self.private_keys[0],
+ boxed_authentication.service_private_keys[0].public_key)
+ message = box.encrypt(data, self.nonces[0], encoder=Base64Encoder)
+ resp = self.post_inject(
+ headers={"X-Response-Nonce": "nonsense nonce"},
+ encrypted_data=message.ciphertext)
+ self.assertThat(
+ resp, HasAPIError("Cannot decode X-Response-Nonce header"))
def test_nonce_already_used(self):
Nonce.check(self.nonces[0])
diff --git a/lp_signing/tests/testfixtures.py b/lp_signing/tests/testfixtures.py
index 8cfb84d..ccaec93 100644
--- a/lp_signing/tests/testfixtures.py
+++ b/lp_signing/tests/testfixtures.py
@@ -75,7 +75,8 @@ class TestClient(FlaskClient):
"""A Flask client that understands JSON and boxed authentication."""
def post(self, *args, headers=None, data=None, json_data=None,
- encrypted_data=None, private_key=None, nonce=None, **kwargs):
+ encrypted_data=None, private_key=None, nonce=None,
+ response_nonce=None, **kwargs):
headers = dict(headers) if headers else {}
if json_data is not None:
if data is not None:
@@ -92,6 +93,10 @@ class TestClient(FlaskClient):
if nonce is not None:
headers.setdefault(
"X-Nonce", Base64Encoder.encode(nonce).decode("UTF-8"))
+ if response_nonce is not None:
+ headers.setdefault(
+ "X-Response-Nonce",
+ Base64Encoder.encode(response_nonce).decode("UTF-8"))
if (private_key is not None and nonce is not None and
encrypted_data is None):
data = data or b""
@@ -105,6 +110,15 @@ class TestClient(FlaskClient):
encrypted_data = data
resp = super().post(
*args, headers=headers, data=encrypted_data, **kwargs)
+ if (private_key is not None and response_nonce is not None and
+ resp.mimetype == "application/x-boxed-json"):
+ box = Box(
+ private_key,
+ boxed_authentication.service_private_keys[0].public_key)
+ resp.set_data(
+ box.decrypt(
+ resp.get_data(), response_nonce, encoder=Base64Encoder))
+ resp.mimetype = "application/json"
# If there's a current transaction, roll it back. The POST may have
# committed a transaction in another thread, so anything we try to
# commit might otherwise conflict with that.
diff --git a/lp_signing/webapi.py b/lp_signing/webapi.py
index 409fff6..5726bf6 100644
--- a/lp_signing/webapi.py
+++ b/lp_signing/webapi.py
@@ -14,6 +14,7 @@ from datetime import (
datetime,
timezone,
)
+import functools
from acceptable import (
AcceptableService,
@@ -84,10 +85,20 @@ def nonce():
return {"nonce": base64.b64encode(nonce.nonce).decode("UTF-8")}, 201
+def encrypted_response(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ request.encrypted_response = True
+ return func(*args, **kwargs)
+
+ return wrapper
+
+
generate_api = service.api("/generate", "generate_key", methods=["POST"])
@generate_api.view(introduced_at="1.0")
+@encrypted_response
@validate_body({
"type": "object",
"properties": {
@@ -124,6 +135,7 @@ sign_api = service.api("/sign", "sign_message", methods=["POST"])
@sign_api.view(introduced_at="1.0")
+@encrypted_response
@validate_body({
"type": "object",
"properties": {
@@ -175,6 +187,7 @@ inject_api = service.api("/inject", "inject_key", methods=["POST"])
@inject_api.view(introduced_at="1.0")
+@encrypted_response
@validate_body({
"type": "object",
"properties": {
diff --git a/lp_signing/webapp.py b/lp_signing/webapp.py
index 7867765..5ba5158 100644
--- a/lp_signing/webapp.py
+++ b/lp_signing/webapp.py
@@ -20,6 +20,7 @@ from lp_signing import webapi
from lp_signing.auth import (
boxed_authentication,
BoxedRequest,
+ BoxedResponse,
)
from lp_signing.config import read_config
from lp_signing.exceptions import APIError
@@ -55,8 +56,17 @@ def configure_app(app, config):
app.config["JSONIFY_PRETTYPRINT_REGULAR"] = False
app.config["ACCEPTABLE_VALIDATE_OUTPUT"] = False
- # We use a custom request class to handle authentication.
+ # We use custom request and response classes to handle authentication.
app.request_class = BoxedRequest
+ app.response_class = BoxedResponse
+
+ @app.after_request
+ def encrypt_response(response):
+ try:
+ response.encrypt()
+ return response
+ except Exception as e:
+ return flask.current_app.handle_user_exception(e)
def create_web_application():