← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/lp-signing:add-authorization-api into lp-signing:master

 

Colin Watson has proposed merging ~cjwatson/lp-signing:add-authorization-api into lp-signing:master.

Commit message:
Add /authorizations/add endpoint

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/lp-signing/+git/lp-signing/+merge/396383

This adds an /authorizations/add endpoint, which allows authorizing an additional client to use a given key.  The client making the request must already be authorized to use the key in question.
-- 
Your team, Launchpad code reviewers, is requested to review the proposed merge of ~cjwatson/lp-signing:add-authorization-api into lp-signing:master.
diff --git a/lp_signing/model/client.py b/lp_signing/model/client.py
index 560770e..90f3aae 100644
--- a/lp_signing/model/client.py
+++ b/lp_signing/model/client.py
@@ -83,6 +83,15 @@ class Client(Storm):
         ClientPublicKey.remove(self, public_key)
 
     @classmethod
+    def getByName(cls, name):
+        """Get the client with a given name.
+
+        :param name: A client name (`str`).
+        :return: A `Client` with the given name, or None.
+        """
+        return store.find(Client, Client.name == name).one()
+
+    @classmethod
     def getByPublicKey(cls, public_key):
         """Get the client with a given registered public key.
 
diff --git a/lp_signing/model/tests/test_client.py b/lp_signing/model/tests/test_client.py
index 5d89ed9..82a0177 100644
--- a/lp_signing/model/tests/test_client.py
+++ b/lp_signing/model/tests/test_client.py
@@ -40,6 +40,12 @@ class TestClient(TestCase):
         client.unregisterPublicKey(private_keys[0].public_key)
         self.assertEqual([private_keys[1].public_key], client.public_keys)
 
+    def test_getByName(self):
+        clients = [factory.create_client(name) for name in ("foo", "bar")]
+        self.assertEqual(clients[0], Client.getByName("foo"))
+        self.assertEqual(clients[1], Client.getByName("bar"))
+        self.assertIsNone(Client.getByName("baz"))
+
     def test_getByPublicKey(self):
         clients = [factory.create_client() for _ in range(3)]
         private_keys = [PrivateKey.generate() for _ in range(4)]
diff --git a/lp_signing/tests/test_webapi.py b/lp_signing/tests/test_webapi.py
index 2ae3bff..c4ad4ac 100644
--- a/lp_signing/tests/test_webapi.py
+++ b/lp_signing/tests/test_webapi.py
@@ -2009,3 +2009,174 @@ class TestInjectView(TestCase):
             authorizations=[self.clients[0],
                             self.clients[1]]))
         self.assertEqual(pytz.utc.localize(older_date), key.created_at)
+
+
+class TestAddAuthorizationView(TestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.fixture = self.useFixture(AppFixture())
+        self.useFixture(DatabaseFixture())
+        self.clients = [factory.create_client() for _ in range(2)]
+        self.private_keys = [PrivateKey.generate() for _ in range(2)]
+        self.clients[0].registerPublicKey(self.private_keys[0].public_key)
+        self.clients[1].registerPublicKey(self.private_keys[1].public_key)
+        self.nonces = [Nonce.generate().nonce for _ in range(2)]
+        self.response_nonce = random(Box.NONCE_SIZE)
+        store.commit()
+
+    def post_add_authorization(self, json_data=None, index=0, **kwargs):
+        kwargs.setdefault("private_key", self.private_keys[index])
+        kwargs.setdefault("nonce", self.nonces[index])
+        kwargs.setdefault("response_nonce", self.response_nonce)
+        return self.fixture.client.post(
+            "/authorizations/add", json_data=json_data, **kwargs)
+
+    def test_unauthenticated(self):
+        resp = self.post_add_authorization(private_key=None)
+        self.assertThat(resp, HasAPIError("Client public key not provided"))
+
+    def test_undecodable_client_public_key(self):
+        resp = self.post_add_authorization(
+            headers={"X-Client-Public-Key": "nonsense key"})
+        self.assertThat(resp, HasAPIError("Cannot decode client public key"))
+
+    def test_undecodable_nonce(self):
+        resp = self.post_add_authorization(
+            headers={"X-Nonce": "nonsense nonce"})
+        self.assertThat(resp, HasAPIError("Cannot decode X-Nonce header"))
+
+    def test_undecodable_response_nonce(self):
+        data = json.dumps({}).encode("UTF-8")
+        box = Box(
+            self.private_keys[0],
+            boxed_authentication.service_private_keys[0].public_key)
+        message = box.encrypt(data, self.nonces[0], encoder=Base64Encoder)
+        resp = self.post_add_authorization(
+            headers={"X-Response-Nonce": "nonsense nonce"},
+            encrypted_data=message.ciphertext)
+        self.assertThat(
+            resp, HasAPIError("Cannot decode X-Response-Nonce header"))
+
+    def test_nonce_already_used(self):
+        Nonce.check(self.nonces[0])
+        resp = self.post_add_authorization()
+        self.assertThat(resp, HasAPIError("Invalid nonce"))
+
+    def assertNonceConsumed(self):
+        self.assertRaises(InvalidNonce, Nonce.check, self.nonces[0])
+
+    def test_unboxable_data(self):
+        resp = self.post_add_authorization(encrypted_data=b"data")
+        self.assertThat(resp, HasAPIError("Authentication failed"))
+        self.assertNonceConsumed()
+
+    def test_unregistered_private_key(self):
+        private_key = PrivateKey.generate()
+        resp = self.post_add_authorization(private_key=private_key)
+        self.assertThat(resp, HasAPIError("Unregistered client public key"))
+        self.assertNonceConsumed()
+
+    def test_wrong_service_public_key(self):
+        private_key = PrivateKey.generate()
+        box = Box(self.private_keys[0], private_key.public_key)
+        message = box.encrypt(b"{}", self.nonces[0], encoder=Base64Encoder)
+        resp = self.post_add_authorization(encrypted_data=message.ciphertext)
+        self.assertThat(resp, HasAPIError("Authentication failed"))
+        self.assertNonceConsumed()
+
+    def test_no_json(self):
+        resp = self.post_add_authorization()
+        self.assertThat(resp, HasAPIError("Error decoding JSON request body"))
+        self.assertNonceConsumed()
+
+    def test_missing_key_type(self):
+        resp = self.post_add_authorization({
+            "fingerprint": "",
+            "client-name": "",
+            })
+        self.assertThat(
+            resp, HasAPIError("'key-type' is a required property at /"))
+        self.assertNonceConsumed()
+
+    def test_missing_fingerprint(self):
+        resp = self.post_add_authorization({
+            "key-type": "",
+            "client-name": "",
+            })
+        self.assertThat(
+            resp, HasAPIError("'fingerprint' is a required property at /"))
+        self.assertNonceConsumed()
+
+    def test_missing_client_name(self):
+        resp = self.post_add_authorization({
+            "key-type": "",
+            "fingerprint": "",
+            })
+        self.assertThat(
+            resp, HasAPIError("'client-name' is a required property at /"))
+        self.assertNonceConsumed()
+
+    def test_invalid_key_type(self):
+        resp = self.post_add_authorization({
+            "key-type": "nonsense",
+            "fingerprint": "",
+            "client-name": "",
+            })
+        self.assertThat(
+            resp,
+            HasAPIError(
+                "'nonsense' is not one of ['UEFI', 'KMOD', 'OPAL', 'SIPL', "
+                "'FIT', 'OPENPGP']"))
+        self.assertNonceConsumed()
+
+    def test_unknown_key(self):
+        resp = self.post_add_authorization({
+            "key-type": "UEFI",
+            "fingerprint": "0" * 16,
+            "client-name": "",
+            })
+        self.assertThat(
+            resp,
+            HasAPIError("No UEFI key with fingerprint 0000000000000000", 404))
+        self.assertNonceConsumed()
+
+    def test_unknown_client(self):
+        key = factory.create_key(key_type=KeyType.UEFI)
+        store.commit()
+        resp = self.post_add_authorization({
+            "key-type": "UEFI",
+            "fingerprint": key.fingerprint,
+            "client-name": "nonsense",
+            })
+        self.assertThat(
+            resp, HasAPIError("No registered client named nonsense"))
+        self.assertNonceConsumed()
+
+    def test_client_not_allowed(self):
+        key = factory.create_key(key_type=KeyType.UEFI)
+        store.commit()
+        resp = self.post_add_authorization({
+            "key-type": "UEFI",
+            "fingerprint": key.fingerprint,
+            "client-name": self.clients[1].name,
+            })
+        self.assertThat(
+            resp,
+            HasAPIError(
+                f"{self.clients[0]} is not allowed to use {key}", 403))
+        self.assertNonceConsumed()
+
+    def test_add_authorization(self):
+        key = factory.create_key(key_type=KeyType.UEFI)
+        key.addAuthorization(self.clients[0])
+        self.assertFalse(key.isAuthorized(self.clients[1]))
+        store.commit()
+        resp = self.post_add_authorization({
+            "key-type": "UEFI",
+            "fingerprint": key.fingerprint,
+            "client-name": self.clients[1].name,
+            })
+        self.assertThat(resp, IsJSONResponse(MatchesDict({})))
+        self.assertTrue(key.isAuthorized(self.clients[1]))
+        self.assertNonceConsumed()
diff --git a/lp_signing/webapi.py b/lp_signing/webapi.py
index ea8b92b..ff35566 100644
--- a/lp_signing/webapi.py
+++ b/lp_signing/webapi.py
@@ -32,6 +32,7 @@ from lp_signing.exceptions import (
     ClientNotAllowed,
     DataValidationError,
     )
+from lp_signing.model.client import Client
 from lp_signing.model.key import Key
 from lp_signing.model.nonce import Nonce
 
@@ -274,3 +275,40 @@ def inject_key():
     return {
         "fingerprint": key.fingerprint,
         }, 200
+
+
+add_authorization_api = service.api(
+    "/authorizations/add", "add_authorization", methods=["POST"])
+
+
+@add_authorization_api.view(introduced_at="1.0")
+@encrypted_response
+@validate_body({
+    "type": "object",
+    "properties": {
+        "key-type": {
+            "type": "string",
+            "enum": [item.token for item in KeyType],
+            },
+        "fingerprint": {"type": "string"},
+        "client-name": {"type": "string"},
+        },
+    "required": ["key-type", "fingerprint", "client-name"],
+    })
+@validate_output({"type": "object"})
+def add_authorization():
+    payload = request.get_json()
+    key_type = KeyType.items[payload["key-type"]]
+    fingerprint = payload["fingerprint"]
+    client_name = payload["client-name"]
+    key = Key.getByTypeAndFingerprint(key_type, fingerprint)
+    client = Client.getByName(client_name)
+    if client is None:
+        raise DataValidationError.single(
+            f"No registered client named {client_name}")
+    if not key.isAuthorized(request.client):
+        raise ClientNotAllowed.single(
+            f"{request.client} is not allowed to use {key}")
+    key.addAuthorization(client)
+    store.commit()
+    return {}, 200