launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #29684
[Merge] ~cjwatson/lp-signing:black-isort into lp-signing:master
Colin Watson has proposed merging ~cjwatson/lp-signing:black-isort into lp-signing:master.
Commit message:
Apply black and isort
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/lp-signing/+git/lp-signing/+merge/437445
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/lp-signing:black-isort into lp-signing:master.
diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
new file mode 100644
index 0000000..ad2a26d
--- /dev/null
+++ b/.git-blame-ignore-revs
@@ -0,0 +1,2 @@
+# apply black and isort
+5a860fde26131e93fdd903e032b83f84582deccb
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 8984e28..edf5ffb 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -15,6 +15,14 @@ repos:
- id: check-xml
- id: check-yaml
- id: debug-statements
+- repo: https://github.com/psf/black
+ rev: 23.1.0
+ hooks:
+ - id: black
+- repo: https://github.com/PyCQA/isort
+ rev: 5.12.0
+ hooks:
+ - id: isort
- repo: https://github.com/PyCQA/flake8
rev: 3.9.2
hooks:
diff --git a/charm/lp-signing/reactive/lp-signing.py b/charm/lp-signing/reactive/lp-signing.py
index 23a594d..3aab8c8 100644
--- a/charm/lp-signing/reactive/lp-signing.py
+++ b/charm/lp-signing/reactive/lp-signing.py
@@ -5,113 +5,115 @@ from __future__ import absolute_import, print_function, unicode_literals
import os.path
import subprocess
-from urllib.parse import (
- urlparse,
- urlunparse,
- )
+from urllib.parse import urlparse, urlunparse
-from charmhelpers.core import (
- hookenv,
- host,
- templating,
- )
-from charms.reactive import (
- remove_state,
- set_state,
- when,
- when_not,
- )
-from ols import (
- base,
- http,
- postgres,
- )
+from charmhelpers.core import hookenv, host, templating
+from charms.reactive import remove_state, set_state, when, when_not
+from ols import base, http, postgres
def stormify_db_uri(uri):
"""Storm requires postgres:// rather than postgresql://."""
parsed_uri = urlparse(uri)
scheme = parsed_uri.scheme
- if scheme == 'postgresql':
- scheme = 'postgres'
+ if scheme == "postgresql":
+ scheme = "postgres"
return urlunparse((scheme, *parsed_uri[1:]))
def configure_rsync():
config = hookenv.config()
- if config['log_hosts_allow']:
+ if config["log_hosts_allow"]:
rsync_config = dict(config)
- rsync_config['base_dir'] = base.base_dir()
+ rsync_config["base_dir"] = base.base_dir()
templating.render(
- 'lp-signing-rsync.j2', '/etc/rsync-juju.d/010-lp-signing.conf',
- rsync_config, perms=0o644)
- if not host.service_restart('rsync'):
- raise RuntimeError('Failed to restart rsync')
+ "lp-signing-rsync.j2",
+ "/etc/rsync-juju.d/010-lp-signing.conf",
+ rsync_config,
+ perms=0o644,
+ )
+ if not host.service_restart("rsync"):
+ raise RuntimeError("Failed to restart rsync")
-@when('ols.configured', 'db-admin.master.available')
-@when_not('service.migrated')
+@when("ols.configured", "db-admin.master.available")
+@when_not("service.migrated")
def migrate(pgsql):
if pgsql.master is None:
return
- hookenv.log('Running migrations.')
+ hookenv.log("Running migrations.")
success, output = postgres.run_migration(pgsql, action=False)
if success:
- hookenv.log('Migrations run successfully.')
- set_state('service.migrated')
+ hookenv.log("Migrations run successfully.")
+ set_state("service.migrated")
else:
- hookenv.log('Failed to run migrations:')
+ hookenv.log("Failed to run migrations:")
for line in output.splitlines():
hookenv.log(line)
-@when('ols.configured', 'db.master.available', 'service.migrated')
-@when_not('service.configured')
+@when("ols.configured", "db.master.available", "service.migrated")
+@when_not("service.configured")
def configure(pgsql):
if pgsql.master is None:
return
- system_packages = os.path.join(base.code_dir(), 'system-packages.txt')
+ system_packages = os.path.join(base.code_dir(), "system-packages.txt")
if os.path.exists(system_packages):
site_packages = subprocess.check_output(
- [os.path.join(base.code_dir(), 'env', 'bin', 'python'), '-c',
- 'from distutils.sysconfig import get_python_lib; '
- 'print(get_python_lib())'], universal_newlines=True).rstrip('\n')
+ [
+ os.path.join(base.code_dir(), "env", "bin", "python"),
+ "-c",
+ "from distutils.sysconfig import get_python_lib; "
+ "print(get_python_lib())",
+ ],
+ universal_newlines=True,
+ ).rstrip("\n")
subprocess.check_call(
- [os.path.join(base.code_dir(), 'link-system-packages.py'),
- site_packages, system_packages])
+ [
+ os.path.join(base.code_dir(), "link-system-packages.py"),
+ site_packages,
+ system_packages,
+ ]
+ )
config = hookenv.config()
config_path = base.service_config_path()
svc_config = dict(config)
master, standbys = postgres.get_db_uris(pgsql)
- svc_config['master_url'] = stormify_db_uri(master)
- svc_config['standby_urls'] = [
- stormify_db_uri(standby) for standby in standbys]
+ svc_config["master_url"] = stormify_db_uri(master)
+ svc_config["standby_urls"] = [
+ stormify_db_uri(standby) for standby in standbys
+ ]
- hookenv.log('Writing service config.')
+ hookenv.log("Writing service config.")
templating.render(
- 'service.conf.j2', config_path, svc_config,
- owner='root', group=base.user(), perms=0o440)
+ "service.conf.j2",
+ config_path,
+ svc_config,
+ owner="root",
+ group=base.user(),
+ perms=0o440,
+ )
configure_rsync()
- set_state('service.configured')
+ set_state("service.configured")
-@when('service.configured', 'ols.wsgi.configured')
+@when("service.configured", "ols.wsgi.configured")
def check_is_running():
if http.is_listening():
- hookenv.status_set('active', 'Ready')
+ hookenv.status_set("active", "Ready")
else:
- hookenv.status_set('blocked', 'Service not running, check logs')
+ hookenv.status_set("blocked", "Service not running, check logs")
-@when('config.changed.build_label')
+@when("config.changed.build_label")
def build_label_changed():
- remove_state('ols.service.installed')
- remove_state('ols.configured')
- remove_state('service.migrated')
- remove_state('service.configured')
+ remove_state("ols.service.installed")
+ remove_state("ols.configured")
+ remove_state("service.migrated")
+ remove_state("service.configured")
diff --git a/link-system-packages.py b/link-system-packages.py
index d557835..3431c92 100755
--- a/link-system-packages.py
+++ b/link-system-packages.py
@@ -7,11 +7,11 @@
from __future__ import absolute_import, print_function, unicode_literals
-from argparse import ArgumentParser
-from distutils.sysconfig import get_python_lib
import importlib
import os.path
import re
+from argparse import ArgumentParser
+from distutils.sysconfig import get_python_lib
def link_module(name, virtualenv_libdir, optional=False):
@@ -29,9 +29,11 @@ def link_module(name, virtualenv_libdir, optional=False):
system_libdir = get_python_lib(plat_specific=path.endswith(".so"))
if os.path.commonprefix([path, system_libdir]) != system_libdir:
raise RuntimeError(
- "%s imported from outside %s (%s)" % (name, system_libdir, path))
+ "%s imported from outside %s (%s)" % (name, system_libdir, path)
+ )
target_path = os.path.join(
- virtualenv_libdir, os.path.relpath(path, system_libdir))
+ virtualenv_libdir, os.path.relpath(path, system_libdir)
+ )
if os.path.lexists(target_path) and os.path.islink(target_path):
os.unlink(target_path)
os.symlink(path, target_path)
diff --git a/lp_signing/__init__.py b/lp_signing/__init__.py
index b7d09b4..e536ff8 100644
--- a/lp_signing/__init__.py
+++ b/lp_signing/__init__.py
@@ -5,8 +5,10 @@
import warnings
-
# Silence a warning before doing anything else.
warnings.filterwarnings(
- "ignore", r".*The psycopg2 wheel package will be renamed.*",
- category=UserWarning, module="psycopg2")
+ "ignore",
+ r".*The psycopg2 wheel package will be renamed.*",
+ category=UserWarning,
+ module="psycopg2",
+)
diff --git a/lp_signing/auth.py b/lp_signing/auth.py
index 9865704..c35080d 100644
--- a/lp_signing/auth.py
+++ b/lp_signing/auth.py
@@ -6,23 +6,12 @@
import base64
import json
-from flask import (
- Request,
- request,
- Response,
- )
+from flask import Request, Response, request
from nacl.encoding import Base64Encoder
-from nacl.public import (
- Box,
- PrivateKey,
- PublicKey,
- )
+from nacl.public import Box, PrivateKey, PublicKey
from lp_signing.config import read_config
-from lp_signing.exceptions import (
- DataValidationError,
- InvalidNonce,
- )
+from lp_signing.exceptions import DataValidationError, InvalidNonce
from lp_signing.model.client import Client
from lp_signing.model.nonce import Nonce
@@ -36,7 +25,7 @@ class BoxedAuthentication:
self.service_private_keys = [
PrivateKey(key, encoder=Base64Encoder)
for key in json.loads(config["auth"]["service_private_keys"])
- ]
+ ]
except Exception: # pragma: no cover
# Yes, this is uninformative, but we don't want to accidentally
# dump private keys in exceptions.
@@ -82,8 +71,8 @@ class BoxedRequest(Request):
@property
def is_json(self):
return (
- super().is_json or
- self.mimetype == "application/x-boxed-json") # pragma: no cover
+ super().is_json or self.mimetype == "application/x-boxed-json"
+ ) # pragma: no cover
def get_client_public_key(self):
try:
@@ -100,12 +89,14 @@ class BoxedRequest(Request):
header = self.headers[header_name]
except KeyError:
raise DataValidationError.single(
- "{} header not provided".format(header_name))
+ "{} header not provided".format(header_name)
+ )
try:
return base64.b64decode(header.encode("UTF-8"), validate=True)
except Exception:
raise InvalidNonce.single(
- "Cannot decode {} header".format(header_name))
+ "Cannot decode {} header".format(header_name)
+ )
def _get_data_for_json(self, cache):
"""Read, authenticate, and decrypt incoming data.
@@ -131,7 +122,8 @@ class BoxedRequest(Request):
nonce = self.get_nonce("X-Nonce")
Nonce.check(nonce)
for i, private_key in enumerate(
- boxed_authentication.service_private_keys):
+ boxed_authentication.service_private_keys
+ ):
try:
box = Box(private_key, client_public_key)
data = box.decrypt(data, nonce, encoder=Base64Encoder)
@@ -141,7 +133,8 @@ class BoxedRequest(Request):
self.client = Client.getByPublicKey(client_public_key)
if self.client is None:
raise DataValidationError.single(
- "Unregistered client public key")
+ "Unregistered client public key"
+ )
break
else:
raise DataValidationError.single("Authentication failed")
@@ -180,7 +173,8 @@ class BoxedResponse(Response):
client_public_key = request.get_client_public_key()
nonce = request.get_nonce("X-Response-Nonce")
private_key = boxed_authentication.service_private_keys[
- request.private_key_index]
+ request.private_key_index
+ ]
box = Box(private_key, client_public_key)
data = self.get_data()
try:
diff --git a/lp_signing/cli.py b/lp_signing/cli.py
index 9b1d13c..c9cbe0f 100644
--- a/lp_signing/cli.py
+++ b/lp_signing/cli.py
@@ -9,17 +9,13 @@ import click
from flask.cli import FlaskGroup
from flask_storm import store
from nacl.encoding import Base64Encoder
-from nacl.public import (
- PrivateKey,
- PublicKey,
- )
+from nacl.public import PrivateKey, PublicKey
from lp_signing.model.client import Client
from lp_signing.webapp import app
class PublicKeyParamType(click.ParamType):
-
name = "public-key"
def convert(self, value, param, ctx):
@@ -45,11 +41,17 @@ def encode_key(key):
@cli.command("generate-key-pair")
@click.option(
- "--private-key-path", type=click.Path(dir_okay=False),
- help="Output path for private key", show_default="standard output")
+ "--private-key-path",
+ type=click.Path(dir_okay=False),
+ help="Output path for private key",
+ show_default="standard output",
+)
@click.option(
- "--public-key-path", type=click.Path(dir_okay=False),
- help="Output path for public key", show_default="standard output")
+ "--public-key-path",
+ type=click.Path(dir_okay=False),
+ help="Output path for public key",
+ show_default="standard output",
+)
def generate_key_pair(private_key_path, public_key_path):
"""Generate a NaCl key pair.
diff --git a/lp_signing/config.py b/lp_signing/config.py
index be8403a..f488894 100644
--- a/lp_signing/config.py
+++ b/lp_signing/config.py
@@ -12,7 +12,6 @@ from urllib.parse import quote
from nacl.encoding import Base64Encoder
from nacl.public import PrivateKey
-
ROOT = Path(__file__).parent.parent.resolve()
@@ -23,28 +22,37 @@ DEFAULT_TEST_CONFIG = {
"database": {
"master_url": f"postgres://{quote(_test_db_path, safe='')}/lp-signing",
"standby_urls": "[]",
- },
+ },
"auth": {
- "service_private_keys": json.dumps([
- PrivateKey.generate().encode(encoder=Base64Encoder).decode(
- "UTF-8"),
- ]),
- },
+ "service_private_keys": json.dumps(
+ [
+ PrivateKey.generate()
+ .encode(encoder=Base64Encoder)
+ .decode("UTF-8"),
+ ]
+ ),
+ },
"key_storage": {
- "private_keys": json.dumps([
- PrivateKey.generate().encode(encoder=Base64Encoder).decode(
- "UTF-8"),
- ]),
- },
- }
+ "private_keys": json.dumps(
+ [
+ PrivateKey.generate()
+ .encode(encoder=Base64Encoder)
+ .decode("UTF-8"),
+ ]
+ ),
+ },
+}
def _read_config():
path = Path(os.environ.get("SERVICE_CONFIG", ROOT / "service.conf"))
- config = configparser.ConfigParser({
- "QUOTED_TEST_DB_PATH": quote(_test_db_path, safe="").replace(
- "%", "%%"),
- })
+ config = configparser.ConfigParser(
+ {
+ "QUOTED_TEST_DB_PATH": quote(_test_db_path, safe="").replace(
+ "%", "%%"
+ ),
+ }
+ )
config.read(path)
return {sec: dict(config.items(sec)) for sec in config.sections()}
@@ -75,10 +83,12 @@ class ConfigProvider(object):
for key in new_config:
merged_value = new_config[key]
if key in config:
- if (isinstance(config[key], dict) and
- isinstance(new_config[key], dict)):
+ if isinstance(config[key], dict) and isinstance(
+ new_config[key], dict
+ ):
merged_value = self.merge_config(
- config[key], new_config[key])
+ config[key], new_config[key]
+ )
merged_config[key] = merged_value
return merged_config
diff --git a/lp_signing/crypto.py b/lp_signing/crypto.py
index 671338c..cf38693 100644
--- a/lp_signing/crypto.py
+++ b/lp_signing/crypto.py
@@ -4,17 +4,13 @@
"""A container for data encrypted at rest using configured keys."""
__all__ = [
- 'NaClEncryptedContainerBase',
- ]
+ "NaClEncryptedContainerBase",
+]
import base64
from nacl.exceptions import CryptoError as NaClCryptoError
-from nacl.public import (
- PrivateKey,
- PublicKey,
- SealedBox,
- )
+from nacl.public import PrivateKey, PublicKey, SealedBox
class CryptoError(Exception):
@@ -73,7 +69,8 @@ class NaClEncryptedContainerBase:
raise CryptoError(str(e)) from e
return (
base64.b64encode(self.public_key_bytes).decode("UTF-8"),
- base64.b64encode(data_encrypted).decode("UTF-8"))
+ base64.b64encode(data_encrypted).decode("UTF-8"),
+ )
@property
def private_key_bytes(self):
@@ -120,8 +117,9 @@ class NaClEncryptedContainerBase:
raise CryptoError(str(e)) from e
if public_key_bytes != self.public_key_bytes:
raise ValueError(
- "Public key %r does not match configured public key %r" %
- (public_key_bytes, self.public_key_bytes))
+ "Public key %r does not match configured public key %r"
+ % (public_key_bytes, self.public_key_bytes)
+ )
if self.private_key is None:
raise ValueError("No private key configured")
try:
diff --git a/lp_signing/database/constants.py b/lp_signing/database/constants.py
index fbee062..e65d75e 100644
--- a/lp_signing/database/constants.py
+++ b/lp_signing/database/constants.py
@@ -5,7 +5,6 @@
from storm.expr import SQL
-
DEFAULT = SQL("DEFAULT")
UTC_NOW = SQL("CURRENT_TIMESTAMP AT TIME ZONE 'UTC'")
diff --git a/lp_signing/database/enumcol.py b/lp_signing/database/enumcol.py
index 2892ccd..f3dbf8f 100644
--- a/lp_signing/database/enumcol.py
+++ b/lp_signing/database/enumcol.py
@@ -4,13 +4,10 @@
"""An enumerated variable type for Storm."""
__all__ = [
- 'DBEnum',
- ]
+ "DBEnum",
+]
-from lazr.enum import (
- DBEnumeratedType,
- DBItem,
- )
+from lazr.enum import DBEnumeratedType, DBItem
from storm.properties import SimpleProperty
from storm.variables import Variable
@@ -18,7 +15,8 @@ from storm.variables import Variable
def check_enum_type(enum):
if not issubclass(enum, DBEnumeratedType):
raise TypeError(
- '%r must be a DBEnumeratedType: %r' % (enum, type(enum)))
+ "%r must be a DBEnumeratedType: %r" % (enum, type(enum))
+ )
def check_type(enum):
@@ -28,6 +26,7 @@ def check_type(enum):
class DBEnumVariable(Variable):
"""A Storm variable class representing a DBEnumeratedType."""
+
__slots__ = ("_enum",)
def __init__(self, *args, **kwargs):
@@ -45,13 +44,15 @@ class DBEnumVariable(Variable):
return enum.items[value]
except KeyError:
pass
- raise KeyError('%r not present in any of %r' % (value, self._enum))
+ raise KeyError("%r not present in any of %r" % (value, self._enum))
else:
if not isinstance(value, DBItem):
raise TypeError("Not a DBItem: %r" % (value,))
if value.enum not in self._enum:
- raise TypeError("DBItem from unknown enum %r, not in %r" % (
- value.enum.name, self._enum))
+ raise TypeError(
+ "DBItem from unknown enum %r, not in %r"
+ % (value.enum.name, self._enum)
+ )
return value
def parse_get(self, value, to_db):
diff --git a/lp_signing/database/helpers.py b/lp_signing/database/helpers.py
index 39ec67b..b7f4723 100644
--- a/lp_signing/database/helpers.py
+++ b/lp_signing/database/helpers.py
@@ -2,8 +2,8 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
__all__ = [
- 'get_transaction_timestamp',
- ]
+ "get_transaction_timestamp",
+]
import pytz
@@ -11,5 +11,6 @@ import pytz
def get_transaction_timestamp(store):
"""Get the timestamp for the current transaction on `store`."""
timestamp = store.execute(
- "SELECT CURRENT_TIMESTAMP AT TIME ZONE 'UTC'").get_one()[0]
- return timestamp.replace(tzinfo=pytz.timezone('UTC'))
+ "SELECT CURRENT_TIMESTAMP AT TIME ZONE 'UTC'"
+ ).get_one()[0]
+ return timestamp.replace(tzinfo=pytz.timezone("UTC"))
diff --git a/lp_signing/database/tests/test_enumcol.py b/lp_signing/database/tests/test_enumcol.py
index ea5ba1f..c87a397 100644
--- a/lp_signing/database/tests/test_enumcol.py
+++ b/lp_signing/database/tests/test_enumcol.py
@@ -10,16 +10,9 @@ from lazr.enum import (
EnumeratedType,
Item,
use_template,
- )
-from storm.locals import (
- AutoReload,
- Int,
- Storm,
- )
-from testtools import (
- ExpectedException,
- TestCase,
- )
+)
+from storm.locals import AutoReload, Int, Storm
+from testtools import ExpectedException, TestCase
from lp_signing.database.constants import DEFAULT
from lp_signing.database.enumcol import DBEnum
@@ -29,17 +22,23 @@ from lp_signing.tests.testfixtures import DatabaseFixture
class FooType(DBEnumeratedType):
"""Enumerated type for the foo column."""
- ONE = DBItem(1, """
+ ONE = DBItem(
+ 1,
+ """
One
Number one.
- """)
+ """,
+ )
- TWO = DBItem(2, """
+ TWO = DBItem(
+ 2,
+ """
Two
Number two.
- """)
+ """,
+ )
class BarType(DBEnumeratedType):
@@ -51,7 +50,6 @@ class BarType(DBEnumeratedType):
class FooTest(Storm):
-
__storm_table__ = "FooTest"
id = Int(primary=True)
@@ -59,16 +57,17 @@ class FooTest(Storm):
class TestDBEnum(TestCase):
-
def setUp(self):
super().setUp()
self.useFixture(DatabaseFixture())
- store.execute("""
+ store.execute(
+ """
CREATE TABLE FooTest (
id serial NOT NULL,
foo integer DEFAULT 1 NOT NULL
);
- """)
+ """
+ )
store.execute('GRANT ALL ON TABLE FooTest TO "lp-signing";')
store.execute('GRANT ALL ON footest_id_seq TO "lp-signing";')
self.addCleanup(store.execute, "DROP TABLE FooTest;")
@@ -81,9 +80,11 @@ class TestDBEnum(TestCase):
TWO = Item("Two")
with ExpectedException(
- TypeError,
- r"<EnumeratedType 'PlainFooType'> must be a DBEnumeratedType: "
- r"<class 'lazr.enum.*MetaEnum'>"):
+ TypeError,
+ r"<EnumeratedType 'PlainFooType'> must be a DBEnumeratedType: "
+ r"<class 'lazr.enum.*MetaEnum'>",
+ ):
+
class BadFooTest(Storm):
__storm_table__ = "FooTest"
id = Int(primary=True)
@@ -117,17 +118,21 @@ class TestDBEnum(TestCase):
def test_rejects_items_from_incorrect_enum(self):
# DBEnum variables reject items from other enumerations.
class AnotherType(DBEnumeratedType):
- ONE = DBItem(1, """
+ ONE = DBItem(
+ 1,
+ """
One
Number one.
- """)
+ """,
+ )
t = FooTest()
with ExpectedException(
- TypeError,
- r"DBItem from unknown enum 'AnotherType', not in "
- r"\(<DBEnumeratedType 'FooType'>,\)"):
+ TypeError,
+ r"DBItem from unknown enum 'AnotherType', not in "
+ r"\(<DBEnumeratedType 'FooType'>,\)",
+ ):
t.foo = AnotherType.ONE
def test_rejects_derived_enum(self):
@@ -137,9 +142,10 @@ class TestDBEnum(TestCase):
t = FooTest()
with ExpectedException(
- TypeError,
- r"DBItem from unknown enum 'DerivedType', not in "
- r"\(<DBEnumeratedType 'FooType'>,\)"):
+ TypeError,
+ r"DBItem from unknown enum 'DerivedType', not in "
+ r"\(<DBEnumeratedType 'FooType'>,\)",
+ ):
t.foo = DerivedType.ONE
def test_multiple_enums(self):
@@ -173,7 +179,8 @@ class TestDBEnum(TestCase):
store.execute("UPDATE FooTest SET foo = 10 WHERE id = ?", (t.id,))
t.foo = AutoReload
with ExpectedException(
- KeyError,
- r"\"10 not present in any of "
- r"\(<DBEnumeratedType 'FooType'>,\)\""):
+ KeyError,
+ r"\"10 not present in any of "
+ r"\(<DBEnumeratedType 'FooType'>,\)\"",
+ ):
t.foo
diff --git a/lp_signing/enums.py b/lp_signing/enums.py
index 43828c0..f05b8fc 100644
--- a/lp_signing/enums.py
+++ b/lp_signing/enums.py
@@ -1,99 +1,123 @@
# Copyright 2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-from lazr.enum import (
- DBEnumeratedType,
- DBItem,
- EnumeratedType,
- Item,
- )
+from lazr.enum import DBEnumeratedType, DBItem, EnumeratedType, Item
__all__ = [
"KeyType",
"OpenPGPKeyAlgorithm",
"SignatureMode",
- ]
+]
class KeyType(DBEnumeratedType):
-
- UEFI = DBItem(1, """
+ UEFI = DBItem(
+ 1,
+ """
UEFI
A signing key for UEFI Secure Boot images.
- """)
+ """,
+ )
- KMOD = DBItem(2, """
+ KMOD = DBItem(
+ 2,
+ """
Kmod
A signing key for kernel modules.
- """)
+ """,
+ )
- OPAL = DBItem(3, """
+ OPAL = DBItem(
+ 3,
+ """
OPAL
A signing key for OPAL kernel images.
- """)
+ """,
+ )
- SIPL = DBItem(4, """
+ SIPL = DBItem(
+ 4,
+ """
SIPL
A signing key for Secure Initial Program Load kernel images.
- """)
+ """,
+ )
- FIT = DBItem(5, """
+ FIT = DBItem(
+ 5,
+ """
FIT
A signing key for U-Boot Flat Image Tree images.
- """)
+ """,
+ )
- OPENPGP = DBItem(6, """
+ OPENPGP = DBItem(
+ 6,
+ """
OpenPGP
An OpenPGP signing key.
- """)
+ """,
+ )
- CV2_KERNEL = DBItem(7, """
+ CV2_KERNEL = DBItem(
+ 7,
+ """
CV2 Kernel
An Ambarella CV2 kernel signing key.
- """)
+ """,
+ )
- ANDROID_KERNEL = DBItem(8, """
+ ANDROID_KERNEL = DBItem(
+ 8,
+ """
Android Kernel
An Android kernel signing key.
- """)
+ """,
+ )
class OpenPGPKeyAlgorithm(EnumeratedType):
-
- RSA = Item("""
+ RSA = Item(
+ """
RSA
A Rivest-Shamir-Adleman key.
- """)
+ """
+ )
class SignatureMode(EnumeratedType):
-
- ATTACHED = Item("""
+ ATTACHED = Item(
+ """
Attached
The result of signing should consist of both the original message
and the signature.
- """)
+ """
+ )
- DETACHED = Item("""
+ DETACHED = Item(
+ """
Detached
The result of signing should be just the signature.
- """)
+ """
+ )
- CLEAR = Item("""
+ CLEAR = Item(
+ """
Clear
The result of signing should consist of both the original message
and the signature, encapsulated in a plain-text format. This is
only meaningful for OpenPGP signatures.
- """)
+ """
+ )
diff --git a/lp_signing/model/client.py b/lp_signing/model/client.py
index 90f3aae..1618ff9 100644
--- a/lp_signing/model/client.py
+++ b/lp_signing/model/client.py
@@ -5,20 +5,13 @@
__all__ = [
"Client",
- ]
+]
+import pytz
from flask_storm import store
from nacl.encoding import Base64Encoder
from nacl.public import PublicKey
-import pytz
-from storm.locals import (
- DateTime,
- Int,
- RawStr,
- Reference,
- Storm,
- Unicode,
- )
+from storm.locals import DateTime, Int, RawStr, Reference, Storm, Unicode
from lp_signing.database.constants import DEFAULT
@@ -48,9 +41,8 @@ class Client(Storm):
:return: A new `Client`.
"""
client = Client(
- name=name,
- created_at=created_at,
- updated_at=updated_at)
+ name=name, created_at=created_at, updated_at=updated_at
+ )
store.add(client)
return client
@@ -65,7 +57,9 @@ class Client(Storm):
return [
client_public_key.public_key
for client_public_key in store.find(
- ClientPublicKey, ClientPublicKey.client == self)]
+ ClientPublicKey, ClientPublicKey.client == self
+ )
+ ]
def registerPublicKey(self, public_key):
"""Register a public key for this client.
@@ -102,8 +96,9 @@ class Client(Storm):
return store.find(
Client,
Client.id == ClientPublicKey.client_id,
- ClientPublicKey._public_key == public_key.encode(
- encoder=Base64Encoder)).one()
+ ClientPublicKey._public_key
+ == public_key.encode(encoder=Base64Encoder),
+ ).one()
class ClientPublicKey(Storm):
@@ -134,5 +129,6 @@ class ClientPublicKey(Storm):
store.find(
ClientPublicKey,
ClientPublicKey.client == client,
- ClientPublicKey._public_key == public_key.encode(
- encoder=Base64Encoder)).remove()
+ ClientPublicKey._public_key
+ == public_key.encode(encoder=Base64Encoder),
+ ).remove()
diff --git a/lp_signing/model/key.py b/lp_signing/model/key.py
index 2f74892..c43a7f5 100644
--- a/lp_signing/model/key.py
+++ b/lp_signing/model/key.py
@@ -5,46 +5,35 @@
__all__ = [
"Key",
- ]
+]
import base64
-from contextlib import contextmanager
import hashlib
import json
import logging
import os
-from pathlib import Path
import re
import shutil
import subprocess
+import time
+from contextlib import contextmanager
+from pathlib import Path
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
from textwrap import dedent
-import time
-from flask_storm import store
import gpg
-from nacl.public import PrivateKey
import pytz
-from storm.locals import (
- DateTime,
- Int,
- RawStr,
- Reference,
- Storm,
- Unicode,
- )
+from flask_storm import store
+from nacl.public import PrivateKey
from storm.databases.postgres import JSON
+from storm.locals import DateTime, Int, RawStr, Reference, Storm, Unicode
from lp_signing.config import read_config
from lp_signing.crypto import NaClEncryptedContainerBase
from lp_signing.database.constants import DEFAULT
from lp_signing.database.enumcol import DBEnum
-from lp_signing.enums import (
- KeyType,
- OpenPGPKeyAlgorithm,
- SignatureMode,
- )
+from lp_signing.enums import KeyType, OpenPGPKeyAlgorithm, SignatureMode
from lp_signing.exceptions import (
Conflict,
KeyGenerationError,
@@ -52,15 +41,15 @@ from lp_signing.exceptions import (
NoSuchKey,
SignatureError,
UnsupportedSignatureMode,
- )
+)
from lp_signing.model.client import Client
-
_log = logging.getLogger(__name__)
# {common_name} will be substituted using `str.format`.
-_keygen_template_base = dedent("""
+_keygen_template_base = dedent(
+ """
[ req ]
default_bits = 4096
distinguished_name = req_distinguished_name
@@ -76,19 +65,23 @@ _keygen_template_base = dedent("""
keyUsage=digitalSignature
subjectKeyIdentifier=hash
authorityKeyIdentifier=keyid
- """)
+ """
+)
_keygen_templates = {
- KeyType.KMOD: _keygen_template_base + dedent("""
+ KeyType.KMOD: _keygen_template_base
+ + dedent(
+ """
# codeSigning: specifies that this key is used to sign code.
# 1.3.6.1.4.1.2312.16.1.2: defines this key as used for
# module signing only. See https://lkml.org/lkml/2015/8/26/741.
extendedKeyUsage = codeSigning,1.3.6.1.4.1.2312.16.1.2
- """),
+ """
+ ),
KeyType.OPAL: _keygen_template_base,
KeyType.SIPL: _keygen_template_base,
- }
+}
# XXX see if we can do all disk operations in a tmpfs
@@ -100,9 +93,10 @@ def _temporary_path():
def _log_subprocess_run(args, **kwargs):
- check = kwargs.pop('check', False)
+ check = kwargs.pop("check", False)
process = subprocess.run(
- args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs)
+ args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs
+ )
for line in process.stdout.splitlines():
if isinstance(line, bytes):
line = line.decode("UTF-8", errors="replace")
@@ -112,18 +106,23 @@ def _log_subprocess_run(args, **kwargs):
# opportunity to see the command output in a debug log above.
if check and process.returncode:
raise CalledProcessError(
- process.returncode, process.args, output=process.stdout,
- stderr=process.stdout)
+ process.returncode,
+ process.args,
+ output=process.stdout,
+ stderr=process.stdout,
+ )
return process
-_gpg_conf = dedent("""\
+_gpg_conf = dedent(
+ """\
# Avoid wasting time verifying the local keyring's consistency.
no-auto-check-trustdb
# Prefer a SHA-2 hash where possible.
personal-digest-preferences SHA512 SHA384 SHA256 SHA224
- """)
+ """
+)
# Based loosely on gpgme/lang/python/tests/support.py:EphemeralContext.
@@ -134,9 +133,13 @@ def _gpg_context(tmp):
home.mkdir(mode=0o700)
try:
(home / "gpg.conf").write_text(_gpg_conf)
- (home / "gpg-agent.conf").write_text(dedent("""\
+ (home / "gpg-agent.conf").write_text(
+ dedent(
+ """\
disable-scdaemon
- """))
+ """
+ )
+ )
with gpg.Context(home_dir=str(home)) as ctx:
ctx.armor = True
@@ -157,7 +160,7 @@ def _gpg_context(tmp):
# Block until it is really gone.
while agent_socket.exists():
- time.sleep(.01)
+ time.sleep(0.01)
finally:
shutil.rmtree(home, ignore_errors=True)
@@ -201,7 +204,6 @@ class GPGError(Exception):
class KeyStorageEncryptedContainer(NaClEncryptedContainerBase):
-
@property
def public_key_bytes(self):
private_key_bytes = self.private_key_bytes
@@ -217,7 +219,7 @@ class KeyStorageEncryptedContainer(NaClEncryptedContainerBase):
private_keys = [
base64.b64decode(key.encode("UTF-8"))
for key in json.loads(config["key_storage"]["private_keys"])
- ]
+ ]
except Exception: # pragma: no cover
# Yes, this is uninformative, but we don't want to accidentally
# dump private keys in exceptions.
@@ -248,8 +250,15 @@ class Key(Storm):
created_at = DateTime(name="created_at", tzinfo=pytz.UTC, allow_none=False)
updated_at = DateTime(name="updated_at", tzinfo=pytz.UTC, allow_none=False)
- def __init__(self, key_type, fingerprint, private_key, public_key,
- created_at=DEFAULT, updated_at=DEFAULT):
+ def __init__(
+ self,
+ key_type,
+ fingerprint,
+ private_key,
+ public_key,
+ created_at=DEFAULT,
+ updated_at=DEFAULT,
+ ):
self.key_type = key_type
self.fingerprint = fingerprint
self.setPrivateKey(private_key)
@@ -258,8 +267,15 @@ class Key(Storm):
self.updated_at = updated_at
@classmethod
- def new(cls, key_type, fingerprint, private_key, public_key,
- created_at=DEFAULT, updated_at=DEFAULT):
+ def new(
+ cls,
+ key_type,
+ fingerprint,
+ private_key,
+ public_key,
+ created_at=DEFAULT,
+ updated_at=DEFAULT,
+ ):
"""Insert a key into the database.
:param key_type: The `KeyType` of this key.
@@ -274,13 +290,13 @@ class Key(Storm):
private_key=private_key,
public_key=public_key,
created_at=created_at,
- updated_at=updated_at)
+ updated_at=updated_at,
+ )
store.add(key)
return key
def __str__(self):
- return (
- f"<Key key_type={self.key_type} fingerprint={self.fingerprint}>")
+ return f"<Key key_type={self.key_type} fingerprint={self.fingerprint}>"
__repr__ = __str__
@@ -310,11 +326,12 @@ class Key(Storm):
:raises NoSuchKey: if the requested key does not exist.
"""
key = store.find(
- Key,
- Key.key_type == key_type, Key.fingerprint == fingerprint).one()
+ Key, Key.key_type == key_type, Key.fingerprint == fingerprint
+ ).one()
if key is None:
raise NoSuchKey.single(
- f"No {key_type} key with fingerprint {fingerprint}")
+ f"No {key_type} key with fingerprint {fingerprint}"
+ )
return key
@classmethod
@@ -328,7 +345,7 @@ class Key(Storm):
"""
if suffix:
suffix = f" {suffix}"
- return description[:64 - len(suffix)] + suffix
+ return description[: 64 - len(suffix)] + suffix
@classmethod
def _generateKeyCertPair(cls, tmp, key_type, common_name):
@@ -341,12 +358,28 @@ class Key(Storm):
"""
key = tmp / f"{key_type.name.lower()}.key"
cert = tmp / f"{key_type.name.lower()}.crt"
- common_name_esc = re.sub(r'([/=])', r'\\\1', common_name)
- _log_subprocess_run([
- "openssl", "req", "-new", "-x509", "-newkey", "rsa:2048",
- "-subj", f"/CN={common_name_esc}/", "-keyout", str(key),
- "-out", str(cert), "-days", "9131", "-nodes", "-sha256",
- ], check=True)
+ common_name_esc = re.sub(r"([/=])", r"\\\1", common_name)
+ _log_subprocess_run(
+ [
+ "openssl",
+ "req",
+ "-new",
+ "-x509",
+ "-newkey",
+ "rsa:2048",
+ "-subj",
+ f"/CN={common_name_esc}/",
+ "-keyout",
+ str(key),
+ "-out",
+ str(cert),
+ "-days",
+ "9131",
+ "-nodes",
+ "-sha256",
+ ],
+ check=True,
+ )
return key.read_bytes(), cert.read_bytes()
@classmethod
@@ -363,10 +396,17 @@ class Key(Storm):
cert_file_name = tmp / f"{key_type.name.lower()}.crt"
# Convert private key from PKCS#8 to PKCS#1 format
PKCS1_key = tmp / f"{key_type.name.lower()}.key"
- _log_subprocess_run([
- "openssl", "rsa", "-in", str(key_file_name),
- "-out", str(PKCS1_key),
- ], check=True)
+ _log_subprocess_run(
+ [
+ "openssl",
+ "rsa",
+ "-in",
+ str(key_file_name),
+ "-out",
+ str(PKCS1_key),
+ ],
+ check=True,
+ )
return PKCS1_key.read_bytes(), cert_file_name.read_bytes()
@classmethod
@@ -383,15 +423,42 @@ class Key(Storm):
keygen.write_text(keygen_template.format(common_name=common_name))
pem = tmp / f"{key_type.name.lower()}.pem"
cert = tmp / f"{key_type.name.lower()}.x509"
- _log_subprocess_run([
- "openssl", "req", "-new", "-nodes", "-utf8", "-sha512",
- "-days", "9131", "-batch", "-x509", "-config", str(keygen),
- "-outform", "PEM", "-out", str(pem), "-keyout", str(pem),
- ], check=True)
- _log_subprocess_run([
- "openssl", "x509", "-in", str(pem), "-outform", "DER",
- "-out", str(cert),
- ], check=True)
+ _log_subprocess_run(
+ [
+ "openssl",
+ "req",
+ "-new",
+ "-nodes",
+ "-utf8",
+ "-sha512",
+ "-days",
+ "9131",
+ "-batch",
+ "-x509",
+ "-config",
+ str(keygen),
+ "-outform",
+ "PEM",
+ "-out",
+ str(pem),
+ "-keyout",
+ str(pem),
+ ],
+ check=True,
+ )
+ _log_subprocess_run(
+ [
+ "openssl",
+ "x509",
+ "-in",
+ str(pem),
+ "-outform",
+ "DER",
+ "-out",
+ str(cert),
+ ],
+ check=True,
+ )
return pem.read_bytes(), cert.read_bytes()
@classmethod
@@ -408,14 +475,17 @@ class Key(Storm):
algorithm = "rsa%d" % length
else:
raise KeyGenerationError.single(
- f"Unknown OpenPGP key algorithm {openpgp_key_algorithm.name}")
+ f"Unknown OpenPGP key algorithm {openpgp_key_algorithm.name}"
+ )
key = ctx.create_key(
- name, algorithm=algorithm, expires=False, sign=True)
+ name, algorithm=algorithm, expires=False, sign=True
+ )
if not key.primary:
raise GPGError("Secret key generation failed.")
if key.sub:
raise GPGError(
- "Got an encryption subkey despite not asking for one.")
+ "Got an encryption subkey despite not asking for one."
+ )
# XXX cjwatson 2020-07-02: This could be simplified using
# ctx.key_export_secret and ctx.key_export_minimal once we have
@@ -446,16 +516,33 @@ class Key(Storm):
"""
private_key = tmp / f"{key_type.name.lower()}.priv"
public_key = tmp / f"{key_type.name.lower()}.pub"
- _log_subprocess_run([
- "openssl", "genpkey", "-algorithm", "RSA",
- "-out", str(private_key),
- "-pkeyopt", "rsa_keygen_bits:2048",
- "-pkeyopt", "rsa_keygen_pubexp:65537",
- ], check=True)
- _log_subprocess_run([
- "openssl", "pkey", "-in", str(private_key), "-pubout",
- "-out", str(public_key),
- ], check=True)
+ _log_subprocess_run(
+ [
+ "openssl",
+ "genpkey",
+ "-algorithm",
+ "RSA",
+ "-out",
+ str(private_key),
+ "-pkeyopt",
+ "rsa_keygen_bits:2048",
+ "-pkeyopt",
+ "rsa_keygen_pubexp:65537",
+ ],
+ check=True,
+ )
+ _log_subprocess_run(
+ [
+ "openssl",
+ "pkey",
+ "-in",
+ str(private_key),
+ "-pubout",
+ "-out",
+ str(public_key),
+ ],
+ check=True,
+ )
return private_key.read_bytes(), public_key.read_bytes()
@classmethod
@@ -466,15 +553,25 @@ class Key(Storm):
:param public_key: The public key (`bytes`).
:return: The fingerprint (`str`).
"""
- output = subprocess.run([
- "openssl", "x509",
- "-inform",
- "PEM" if key_type in (
- KeyType.UEFI, KeyType.FIT, KeyType.ANDROID_KERNEL) else "DER",
- "-noout", "-fingerprint",
- ], input=public_key, stdout=subprocess.PIPE, check=True).stdout
+ output = subprocess.run(
+ [
+ "openssl",
+ "x509",
+ "-inform",
+ "PEM"
+ if key_type
+ in (KeyType.UEFI, KeyType.FIT, KeyType.ANDROID_KERNEL)
+ else "DER",
+ "-noout",
+ "-fingerprint",
+ ],
+ input=public_key,
+ stdout=subprocess.PIPE,
+ check=True,
+ ).stdout
return (
- output.decode("UTF-8").rstrip("\n").split("=")[1].replace(":", ""))
+ output.decode("UTF-8").rstrip("\n").split("=")[1].replace(":", "")
+ )
@classmethod
def _getOpenPGPFingerprint(cls, ctx, public_key):
@@ -488,7 +585,8 @@ class Key(Storm):
if not getattr(result, "imported", 0):
_log.error("Failed to get fingerprint of new key: %s", result)
raise KeyImportError.single(
- f"Failed to get fingerprint of new key: {result}")
+ f"Failed to get fingerprint of new key: {result}"
+ )
return result.imports[0].fpr
@classmethod
@@ -498,14 +596,24 @@ class Key(Storm):
:param public_key: The public key (`bytes`).
:return: The fingerprint (`str`).
"""
- output = subprocess.run([
- "openssl", "pkey", "-pubin", "-outform", "DER",
- ], input=public_key, stdout=subprocess.PIPE, check=True).stdout
+ output = subprocess.run(
+ [
+ "openssl",
+ "pkey",
+ "-pubin",
+ "-outform",
+ "DER",
+ ],
+ input=public_key,
+ stdout=subprocess.PIPE,
+ check=True,
+ ).stdout
return hashlib.sha1(output).hexdigest().upper()
@classmethod
- def generate(cls, key_type, description, openpgp_key_algorithm=None,
- length=None):
+ def generate(
+ cls, key_type, description, openpgp_key_algorithm=None, length=None
+ ):
"""Generate a new key for an archive.
:param key_type: The `KeyType` to generate.
@@ -522,52 +630,66 @@ class Key(Storm):
try:
if key_type in (KeyType.UEFI, KeyType.FIT):
private_key, public_key = cls._generateKeyCertPair(
- tmp, key_type, common_name)
+ tmp, key_type, common_name
+ )
elif key_type in (KeyType.KMOD, KeyType.OPAL, KeyType.SIPL):
private_key, public_key = cls._generatePEMX509(
- tmp, key_type, common_name)
+ tmp, key_type, common_name
+ )
elif key_type == KeyType.OPENPGP:
with _gpg_context(tmp) as ctx:
- private_key, public_key, fingerprint = (
- cls._generateOpenPGP(
- ctx, openpgp_key_algorithm, length,
- description))
+ (
+ private_key,
+ public_key,
+ fingerprint,
+ ) = cls._generateOpenPGP(
+ ctx, openpgp_key_algorithm, length, description
+ )
elif key_type == KeyType.CV2_KERNEL:
private_key, public_key = cls._generateRSA(tmp, key_type)
elif key_type == KeyType.ANDROID_KERNEL:
private_key, public_key = cls._generateAndroidKernelKeys(
- tmp, key_type, common_name)
+ tmp, key_type, common_name
+ )
else:
raise KeyGenerationError.single(
- f"Unknown key type {key_type.name}")
+ f"Unknown key type {key_type.name}"
+ )
except KeyGenerationError:
raise
except Exception as e:
_log.error("Failed to generate key: %s", e)
raise KeyGenerationError.single(f"Failed to generate key: {e}")
if key_type in (
- KeyType.UEFI, KeyType.KMOD, KeyType.OPAL, KeyType.SIPL,
- KeyType.FIT, KeyType.ANDROID_KERNEL):
+ KeyType.UEFI,
+ KeyType.KMOD,
+ KeyType.OPAL,
+ KeyType.SIPL,
+ KeyType.FIT,
+ KeyType.ANDROID_KERNEL,
+ ):
try:
- fingerprint = cls._getX509Fingerprint(
- key_type, public_key)
+ fingerprint = cls._getX509Fingerprint(key_type, public_key)
except subprocess.CalledProcessError as e:
_log.error("Failed to get fingerprint of new key: %s", e)
raise KeyGenerationError.single(
- f"Failed to get fingerprint of new key: {e}")
+ f"Failed to get fingerprint of new key: {e}"
+ )
elif key_type == KeyType.CV2_KERNEL:
try:
fingerprint = cls._getRSAFingerprint(public_key)
except subprocess.CalledProcessError as e:
_log.error("Failed to get fingerprint of new key: %s", e)
raise KeyGenerationError.single(
- f"Failed to get fingerprint of new key: {e}")
+ f"Failed to get fingerprint of new key: {e}"
+ )
_log.info("Generated new key with fingerprint %s", fingerprint)
return cls.new(key_type, fingerprint, private_key, public_key)
@classmethod
- def inject(cls, key_type, private_key, public_key,
- description, created_at):
+ def inject(
+ cls, key_type, private_key, public_key, description, created_at
+ ):
"""Inject a key that was created in Launchpad.
:param key_type: The `KeyType` to inject.
@@ -581,14 +703,20 @@ class Key(Storm):
"""
_log.info("Injecting %s key", key_type)
if key_type in (
- KeyType.UEFI, KeyType.KMOD, KeyType.OPAL, KeyType.SIPL,
- KeyType.FIT, KeyType.ANDROID_KERNEL):
+ KeyType.UEFI,
+ KeyType.KMOD,
+ KeyType.OPAL,
+ KeyType.SIPL,
+ KeyType.FIT,
+ KeyType.ANDROID_KERNEL,
+ ):
try:
fingerprint = cls._getX509Fingerprint(key_type, public_key)
except subprocess.CalledProcessError as e:
_log.error("Failed to get fingerprint of new key: %s", e)
raise KeyImportError.single(
- f"Failed to get fingerprint of new key: {e}")
+ f"Failed to get fingerprint of new key: {e}"
+ )
elif key_type == KeyType.OPENPGP:
with _temporary_path() as tmp, _gpg_context(Path(tmp)) as ctx:
fingerprint = cls._getOpenPGPFingerprint(ctx, public_key)
@@ -598,24 +726,28 @@ class Key(Storm):
except subprocess.CalledProcessError as e:
_log.error("Failed to get fingerprint of new key: %s", e)
raise KeyImportError.single(
- f"Failed to get fingerprint of new key: {e}")
+ f"Failed to get fingerprint of new key: {e}"
+ )
else:
raise KeyImportError.single(f"Unknown key type {key_type.name}")
_log.info("Injecting new key with fingerprint %s", fingerprint)
try:
key = Key.getByTypeAndFingerprint(key_type, fingerprint)
if (key.getPrivateKey() == private_key) and (
- key.public_key == public_key):
+ key.public_key == public_key
+ ):
if key.created_at > created_at:
key.created_at = created_at
return key
else:
raise Conflict.single(
f"Rejected attempt to update already existent private "
- f"and public key material with fingerprint {fingerprint}")
+ f"and public key material with fingerprint {fingerprint}"
+ )
except NoSuchKey:
- return cls.new(key_type, fingerprint, private_key, public_key,
- created_at)
+ return cls.new(
+ key_type, fingerprint, private_key, public_key, created_at
+ )
@property
def authorizations(self):
@@ -623,7 +755,9 @@ class Key(Storm):
return [
key_authorization.client
for key_authorization in store.find(
- KeyAuthorization, KeyAuthorization.key == self)]
+ KeyAuthorization, KeyAuthorization.key == self
+ )
+ ]
def addAuthorization(self, client):
"""Authorize a client to sign with this key. If the client is
@@ -634,7 +768,8 @@ class Key(Storm):
existing_auth = store.find(
KeyAuthorization,
KeyAuthorization.key == self,
- KeyAuthorization.client == client).one()
+ KeyAuthorization.client == client,
+ ).one()
if existing_auth:
return
key_authorization = KeyAuthorization(self, client)
@@ -675,17 +810,22 @@ class Key(Storm):
else:
raise UnsupportedSignatureMode.single(
f"Signature mode {mode.name} not supported with "
- f"{self.key_type.name}")
+ f"{self.key_type.name}"
+ )
_log.info(
"Making %s signature of %s (SHA-256: %s) with %s key %s",
- mode.name.lower(), message_name,
- hashlib.sha256(message).hexdigest(), self.key_type,
- self.fingerprint)
+ mode.name.lower(),
+ message_name,
+ hashlib.sha256(message).hexdigest(),
+ self.key_type,
+ self.fingerprint,
+ )
with _gpg_context(tmp) as ctx:
import_result = _gpg_key_import(ctx, self.getPrivateKey())
if not getattr(import_result, "secret_imported", 0):
raise SignatureError.single(
- "Failed to import stored OpenPGP key")
+ "Failed to import stored OpenPGP key"
+ )
gpg_key = ctx.keylist(pattern=self.fingerprint, secret=True)
ctx.signers = list(gpg_key)
try:
@@ -733,38 +873,62 @@ class Key(Storm):
if self.key_type == KeyType.UEFI:
if mode == SignatureMode.ATTACHED:
cmd = [
- "sbsign", "--key", str(key), "--cert", str(cert),
+ "sbsign",
+ "--key",
+ str(key),
+ "--cert",
+ str(cert),
str(message_path),
- ]
+ ]
elif self.key_type in (KeyType.KMOD, KeyType.OPAL, KeyType.SIPL):
if mode == SignatureMode.DETACHED:
cmd = [
- "kmodsign", "-D", "sha512", str(key), str(cert),
- str(message_path), str(sig_path),
- ]
+ "kmodsign",
+ "-D",
+ "sha512",
+ str(key),
+ str(cert),
+ str(message_path),
+ str(sig_path),
+ ]
elif self.key_type == KeyType.FIT:
if mode == SignatureMode.ATTACHED:
# mkimage signs in place.
shutil.move(str(message_path), str(sig_path))
cmd = [
- "mkimage", "-F", "-k", str(tmp), "-r", str(sig_path),
- ]
+ "mkimage",
+ "-F",
+ "-k",
+ str(tmp),
+ "-r",
+ str(sig_path),
+ ]
elif self.key_type in (KeyType.CV2_KERNEL, KeyType.ANDROID_KERNEL):
if mode == SignatureMode.DETACHED:
cmd = [
- "openssl", "dgst", "-sha256", "-sign", str(key),
- "-out", str(sig_path), str(message_path),
- ]
+ "openssl",
+ "dgst",
+ "-sha256",
+ "-sign",
+ str(key),
+ "-out",
+ str(sig_path),
+ str(message_path),
+ ]
if cmd is None:
raise UnsupportedSignatureMode.single(
f"Signature mode {mode.name} not supported with "
- f"{self.key_type.name}")
+ f"{self.key_type.name}"
+ )
_log.info(
"Making %s signature of %s (SHA-256: %s) with %s key %s",
- mode.name.lower(), message_name,
- hashlib.sha256(message).hexdigest(), self.key_type,
- self.fingerprint)
+ mode.name.lower(),
+ message_name,
+ hashlib.sha256(message).hexdigest(),
+ self.key_type,
+ self.fingerprint,
+ )
try:
_log_subprocess_run(cmd, check=True)
except subprocess.CalledProcessError as e:
@@ -798,11 +962,13 @@ class KeyAuthorization(Storm):
return store.find(
KeyAuthorization,
KeyAuthorization.key == key,
- KeyAuthorization.client == client).one()
+ KeyAuthorization.client == client,
+ ).one()
@classmethod
def remove(cls, key, client):
store.find(
KeyAuthorization,
KeyAuthorization.key == key,
- KeyAuthorization.client == client).remove()
+ KeyAuthorization.client == client,
+ ).remove()
diff --git a/lp_signing/model/nonce.py b/lp_signing/model/nonce.py
index 578d7e7..3467ec8 100644
--- a/lp_signing/model/nonce.py
+++ b/lp_signing/model/nonce.py
@@ -5,27 +5,19 @@
__all__ = [
"Nonce",
- ]
+]
from datetime import timedelta
+import pytz
from flask_storm import store
from nacl.public import Box
from nacl.utils import random
-import pytz
-from storm.locals import (
- Bool,
- DateTime,
- Int,
- Not,
- Storm,
- RawStr,
- )
+from storm.locals import Bool, DateTime, Int, Not, RawStr, Storm
from lp_signing.database.constants import UTC_NOW
from lp_signing.exceptions import InvalidNonce
-
_lifetime = timedelta(seconds=60)
@@ -80,7 +72,8 @@ class Nonce(Storm):
Nonce,
Nonce.nonce == nonce,
Not(Nonce.used),
- Nonce.expires > UTC_NOW).one()
+ Nonce.expires > UTC_NOW,
+ ).one()
if row is None:
raise InvalidNonce.single("Invalid nonce")
row.used = True
diff --git a/lp_signing/model/tests/test_client.py b/lp_signing/model/tests/test_client.py
index 82a0177..1e72d65 100644
--- a/lp_signing/model/tests/test_client.py
+++ b/lp_signing/model/tests/test_client.py
@@ -12,7 +12,6 @@ from lp_signing.tests.testfixtures import DatabaseFixture
class TestClient(TestCase):
-
def setUp(self):
super().setUp()
self.useFixture(DatabaseFixture())
@@ -30,7 +29,8 @@ class TestClient(TestCase):
client.registerPublicKey(private_keys[1].public_key)
self.assertItemsEqual(
[private_key.public_key for private_key in private_keys],
- client.public_keys)
+ client.public_keys,
+ )
def test_unregisterPublicKey(self):
client = factory.create_client()
@@ -53,9 +53,12 @@ class TestClient(TestCase):
clients[0].registerPublicKey(private_keys[1].public_key)
clients[1].registerPublicKey(private_keys[2].public_key)
self.assertEqual(
- clients[0], Client.getByPublicKey(private_keys[0].public_key))
+ clients[0], Client.getByPublicKey(private_keys[0].public_key)
+ )
self.assertEqual(
- clients[0], Client.getByPublicKey(private_keys[1].public_key))
+ clients[0], Client.getByPublicKey(private_keys[1].public_key)
+ )
self.assertEqual(
- clients[1], Client.getByPublicKey(private_keys[2].public_key))
+ clients[1], Client.getByPublicKey(private_keys[2].public_key)
+ )
self.assertIsNone(Client.getByPublicKey(private_keys[3].public_key))
diff --git a/lp_signing/model/tests/test_key.py b/lp_signing/model/tests/test_key.py
index ab051b2..85a4178 100644
--- a/lp_signing/model/tests/test_key.py
+++ b/lp_signing/model/tests/test_key.py
@@ -3,18 +3,15 @@
"""Test the database model for signing keys."""
-from datetime import (
- datetime,
- timezone,
- )
import hashlib
-from pathlib import Path
import re
+from datetime import datetime, timezone
+from pathlib import Path
from tempfile import TemporaryDirectory
+import gpg
from fixtures import MockPatch
from flask_storm import store
-import gpg
from talisker.testing import TestContext
from testtools import TestCase
from testtools.matchers import (
@@ -27,27 +24,14 @@ from testtools.matchers import (
MatchesRegex,
MatchesStructure,
Not,
- )
+)
from lp_signing.database.helpers import get_transaction_timestamp
-from lp_signing.enums import (
- KeyType,
- OpenPGPKeyAlgorithm,
- SignatureMode,
- )
-from lp_signing.exceptions import (
- NoSuchKey,
- UnsupportedSignatureMode,
- )
-from lp_signing.model.key import (
- _gpg_context,
- Key,
- )
+from lp_signing.enums import KeyType, OpenPGPKeyAlgorithm, SignatureMode
+from lp_signing.exceptions import NoSuchKey, UnsupportedSignatureMode
+from lp_signing.model.key import Key, _gpg_context
from lp_signing.tests import factory
-from lp_signing.tests.matchers import (
- MatchesOpenPGPKey,
- RanCommand,
- )
+from lp_signing.tests.matchers import MatchesOpenPGPKey, RanCommand
from lp_signing.tests.testfixtures import (
ConfigOverrideFixture,
DatabaseFixture,
@@ -57,11 +41,10 @@ from lp_signing.tests.testfixtures import (
FakeOpenSSLSign,
FakeProcesses,
FakeSBSign,
- )
+)
class TestKey(TestCase):
-
def setUp(self):
super().setUp()
self.useFixture(ConfigOverrideFixture({}))
@@ -71,19 +54,22 @@ class TestKey(TestCase):
def test___str__(self):
key = factory.create_key(key_type=KeyType.UEFI, fingerprint="0" * 16)
self.assertEqual(
- "<Key key_type=UEFI fingerprint=0000000000000000>", str(key))
+ "<Key key_type=UEFI fingerprint=0000000000000000>", str(key)
+ )
def test_getByTypeAndFingerprint(self):
keys = []
for key_type in KeyType.items:
- keys.extend([
- factory.create_key(key_type=key_type) for _ in range(2)])
+ keys.extend(
+ [factory.create_key(key_type=key_type) for _ in range(2)]
+ )
for key in keys:
self.assertEqual(
- key,
- Key.getByTypeAndFingerprint(key.key_type, key.fingerprint))
+ key, Key.getByTypeAndFingerprint(key.key_type, key.fingerprint)
+ )
self.assertRaises(
- NoSuchKey, Key.getByTypeAndFingerprint, KeyType.UEFI, "0" * 16)
+ NoSuchKey, Key.getByTypeAndFingerprint, KeyType.UEFI, "0" * 16
+ )
def test__generateKeyCommonName_plain(self):
common_name = Key._generateKeyCommonName("PPA testing-team ppa")
@@ -91,7 +77,8 @@ class TestKey(TestCase):
def test__generateKeyCommonName_suffix(self):
common_name = Key._generateKeyCommonName(
- "PPA testing-team ppa", suffix="kmod")
+ "PPA testing-team ppa", suffix="kmod"
+ )
self.assertEqual("PPA testing-team ppa kmod", common_name)
def test__generateKeyCommonName_plain_just_short(self):
@@ -101,7 +88,8 @@ class TestKey(TestCase):
def test__generateKeyCommonName_suffix_just_short(self):
common_name = Key._generateKeyCommonName(
- f"PPA {'t' * 30} {'p' * 24}", suffix="kmod")
+ f"PPA {'t' * 30} {'p' * 24}", suffix="kmod"
+ )
self.assertEqual(f"PPA {'t' * 30} {'p' * 24} kmod", common_name)
self.assertEqual(64, len(common_name))
@@ -112,7 +100,8 @@ class TestKey(TestCase):
def test__generateKeyCommonName_suffix_long(self):
common_name = Key._generateKeyCommonName(
- f"PPA {'t' * 40} {'p' * 40}", suffix="kmod-plus")
+ f"PPA {'t' * 40} {'p' * 40}", suffix="kmod-plus"
+ )
self.assertEqual(f"PPA {'t' * 40} {'p' * 9} kmod-plus", common_name)
self.assertEqual(64, len(common_name))
@@ -124,34 +113,60 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
key = Key.generate(KeyType.UEFI, "~signing-owner/ubuntu/testing")
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.UEFI,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=now,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.UEFI,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=now,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.UEFI, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.UEFI, fingerprint)
+ )
req_args = [
- "openssl", "req", "-new", "-x509", "-newkey", "rsa:2048",
- "-subj", r"/CN=~signing-owner\/ubuntu\/testing UEFI/",
- "-keyout", EndsWith("uefi.key"), "-out", EndsWith("uefi.crt"),
- "-days", "9131", "-nodes", "-sha256",
- ]
+ "openssl",
+ "req",
+ "-new",
+ "-x509",
+ "-newkey",
+ "rsa:2048",
+ "-subj",
+ r"/CN=~signing-owner\/ubuntu\/testing UEFI/",
+ "-keyout",
+ EndsWith("uefi.key"),
+ "-out",
+ EndsWith("uefi.crt"),
+ "-days",
+ "9131",
+ "-nodes",
+ "-sha256",
+ ]
x509_args = [
- "openssl", "x509", "-inform", "PEM", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "PEM",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(req_args, stdin=Is(None)),
- RanCommand(
- x509_args,
- stdin=AfterPreprocessing(
- lambda f: f.getvalue(),
- Equals(public_key))),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(req_args, stdin=Is(None)),
+ RanCommand(
+ x509_args,
+ stdin=AfterPreprocessing(
+ lambda f: f.getvalue(), Equals(public_key)
+ ),
+ ),
+ ]
+ ),
+ )
def test_generate_kmod(self):
private_key = factory.generate_random_bytes(size=64)
@@ -161,46 +176,88 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
key = Key.generate(KeyType.KMOD, "~signing-owner/ubuntu/testing")
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.KMOD,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=now,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.KMOD,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=now,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.KMOD, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.KMOD, fingerprint)
+ )
self.assertIn("[ req ]", fake_openssl.keygen_text)
- self.assertThat(fake_openssl.keygen_text, MatchesRegex(
- r".*\bCN\s*=\s*~signing-owner/ubuntu/testing\b", flags=re.S))
- self.assertThat(fake_openssl.keygen_text, MatchesRegex(
- r".*\bextendedKeyUsage\s*=\s*"
- r"codeSigning,1.3.6.1.4.1.2312.16.1.2\s*\b", flags=re.S))
+ self.assertThat(
+ fake_openssl.keygen_text,
+ MatchesRegex(
+ r".*\bCN\s*=\s*~signing-owner/ubuntu/testing\b", flags=re.S
+ ),
+ )
+ self.assertThat(
+ fake_openssl.keygen_text,
+ MatchesRegex(
+ r".*\bextendedKeyUsage\s*=\s*"
+ r"codeSigning,1.3.6.1.4.1.2312.16.1.2\s*\b",
+ flags=re.S,
+ ),
+ )
req_args = [
- "openssl", "req", "-new", "-nodes", "-utf8", "-sha512",
- "-days", "9131", "-batch", "-x509",
- "-config", EndsWith("kmod.keygen"),
- "-outform", "PEM", "-out", EndsWith("kmod.pem"),
- "-keyout", EndsWith("kmod.pem"),
- ]
+ "openssl",
+ "req",
+ "-new",
+ "-nodes",
+ "-utf8",
+ "-sha512",
+ "-days",
+ "9131",
+ "-batch",
+ "-x509",
+ "-config",
+ EndsWith("kmod.keygen"),
+ "-outform",
+ "PEM",
+ "-out",
+ EndsWith("kmod.pem"),
+ "-keyout",
+ EndsWith("kmod.pem"),
+ ]
x509_args = [
- "openssl", "x509", "-in", EndsWith("kmod.pem"),
- "-outform", "DER", "-out", EndsWith("kmod.x509"),
- ]
+ "openssl",
+ "x509",
+ "-in",
+ EndsWith("kmod.pem"),
+ "-outform",
+ "DER",
+ "-out",
+ EndsWith("kmod.x509"),
+ ]
fingerprint_args = [
- "openssl", "x509", "-inform", "DER", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "DER",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(req_args, stdin=Is(None)),
- RanCommand(x509_args, stdin=Is(None)),
- RanCommand(
- fingerprint_args,
- stdin=AfterPreprocessing(
- lambda f: f.getvalue(),
- Equals(public_key))),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(req_args, stdin=Is(None)),
+ RanCommand(x509_args, stdin=Is(None)),
+ RanCommand(
+ fingerprint_args,
+ stdin=AfterPreprocessing(
+ lambda f: f.getvalue(), Equals(public_key)
+ ),
+ ),
+ ]
+ ),
+ )
def test_generate_opal(self):
private_key = factory.generate_random_bytes(size=64)
@@ -210,44 +267,81 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
key = Key.generate(KeyType.OPAL, "~signing-owner/ubuntu/testing")
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.OPAL,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=now,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.OPAL,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=now,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.OPAL, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.OPAL, fingerprint)
+ )
self.assertIn("[ req ]", fake_openssl.keygen_text)
- self.assertThat(fake_openssl.keygen_text, MatchesRegex(
- r".*\bCN\s*=\s*~signing-owner/ubuntu/testing\b", flags=re.S))
+ self.assertThat(
+ fake_openssl.keygen_text,
+ MatchesRegex(
+ r".*\bCN\s*=\s*~signing-owner/ubuntu/testing\b", flags=re.S
+ ),
+ )
self.assertNotIn("extendedKeyUsage", fake_openssl.keygen_text)
req_args = [
- "openssl", "req", "-new", "-nodes", "-utf8", "-sha512",
- "-days", "9131", "-batch", "-x509",
- "-config", EndsWith("opal.keygen"),
- "-outform", "PEM", "-out", EndsWith("opal.pem"),
- "-keyout", EndsWith("opal.pem"),
- ]
+ "openssl",
+ "req",
+ "-new",
+ "-nodes",
+ "-utf8",
+ "-sha512",
+ "-days",
+ "9131",
+ "-batch",
+ "-x509",
+ "-config",
+ EndsWith("opal.keygen"),
+ "-outform",
+ "PEM",
+ "-out",
+ EndsWith("opal.pem"),
+ "-keyout",
+ EndsWith("opal.pem"),
+ ]
x509_args = [
- "openssl", "x509", "-in", EndsWith("opal.pem"),
- "-outform", "DER", "-out", EndsWith("opal.x509"),
- ]
+ "openssl",
+ "x509",
+ "-in",
+ EndsWith("opal.pem"),
+ "-outform",
+ "DER",
+ "-out",
+ EndsWith("opal.x509"),
+ ]
fingerprint_args = [
- "openssl", "x509", "-inform", "DER", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "DER",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(req_args, stdin=Is(None)),
- RanCommand(x509_args, stdin=Is(None)),
- RanCommand(
- fingerprint_args,
- stdin=AfterPreprocessing(
- lambda f: f.getvalue(),
- Equals(public_key))),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(req_args, stdin=Is(None)),
+ RanCommand(x509_args, stdin=Is(None)),
+ RanCommand(
+ fingerprint_args,
+ stdin=AfterPreprocessing(
+ lambda f: f.getvalue(), Equals(public_key)
+ ),
+ ),
+ ]
+ ),
+ )
def test_generate_sipl(self):
private_key = factory.generate_random_bytes(size=64)
@@ -257,44 +351,81 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
key = Key.generate(KeyType.SIPL, "~signing-owner/ubuntu/testing")
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.SIPL,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=now,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.SIPL,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=now,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.SIPL, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.SIPL, fingerprint)
+ )
self.assertIn("[ req ]", fake_openssl.keygen_text)
- self.assertThat(fake_openssl.keygen_text, MatchesRegex(
- r".*\bCN\s*=\s*~signing-owner/ubuntu/testing\b", flags=re.S))
+ self.assertThat(
+ fake_openssl.keygen_text,
+ MatchesRegex(
+ r".*\bCN\s*=\s*~signing-owner/ubuntu/testing\b", flags=re.S
+ ),
+ )
self.assertNotIn("extendedKeyUsage", fake_openssl.keygen_text)
req_args = [
- "openssl", "req", "-new", "-nodes", "-utf8", "-sha512",
- "-days", "9131", "-batch", "-x509",
- "-config", EndsWith("sipl.keygen"),
- "-outform", "PEM", "-out", EndsWith("sipl.pem"),
- "-keyout", EndsWith("sipl.pem"),
- ]
+ "openssl",
+ "req",
+ "-new",
+ "-nodes",
+ "-utf8",
+ "-sha512",
+ "-days",
+ "9131",
+ "-batch",
+ "-x509",
+ "-config",
+ EndsWith("sipl.keygen"),
+ "-outform",
+ "PEM",
+ "-out",
+ EndsWith("sipl.pem"),
+ "-keyout",
+ EndsWith("sipl.pem"),
+ ]
x509_args = [
- "openssl", "x509", "-in", EndsWith("sipl.pem"),
- "-outform", "DER", "-out", EndsWith("sipl.x509"),
- ]
+ "openssl",
+ "x509",
+ "-in",
+ EndsWith("sipl.pem"),
+ "-outform",
+ "DER",
+ "-out",
+ EndsWith("sipl.x509"),
+ ]
fingerprint_args = [
- "openssl", "x509", "-inform", "DER", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "DER",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(req_args, stdin=Is(None)),
- RanCommand(x509_args, stdin=Is(None)),
- RanCommand(
- fingerprint_args,
- stdin=AfterPreprocessing(
- lambda f: f.getvalue(),
- Equals(public_key))),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(req_args, stdin=Is(None)),
+ RanCommand(x509_args, stdin=Is(None)),
+ RanCommand(
+ fingerprint_args,
+ stdin=AfterPreprocessing(
+ lambda f: f.getvalue(), Equals(public_key)
+ ),
+ ),
+ ]
+ ),
+ )
def test_generate_fit(self):
private_key = factory.generate_random_bytes(size=64)
@@ -304,76 +435,125 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
key = Key.generate(KeyType.FIT, "~signing-owner/ubuntu/testing")
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.FIT,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=now,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.FIT,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=now,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.FIT, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.FIT, fingerprint)
+ )
req_args = [
- "openssl", "req", "-new", "-x509", "-newkey", "rsa:2048",
- "-subj", r"/CN=~signing-owner\/ubuntu\/testing FIT/",
- "-keyout", EndsWith("fit.key"), "-out", EndsWith("fit.crt"),
- "-days", "9131", "-nodes", "-sha256",
- ]
+ "openssl",
+ "req",
+ "-new",
+ "-x509",
+ "-newkey",
+ "rsa:2048",
+ "-subj",
+ r"/CN=~signing-owner\/ubuntu\/testing FIT/",
+ "-keyout",
+ EndsWith("fit.key"),
+ "-out",
+ EndsWith("fit.crt"),
+ "-days",
+ "9131",
+ "-nodes",
+ "-sha256",
+ ]
x509_args = [
- "openssl", "x509", "-inform", "PEM", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "PEM",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(req_args, stdin=Is(None)),
- RanCommand(
- x509_args,
- stdin=AfterPreprocessing(
- lambda f: f.getvalue(),
- Equals(public_key))),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(req_args, stdin=Is(None)),
+ RanCommand(
+ x509_args,
+ stdin=AfterPreprocessing(
+ lambda f: f.getvalue(), Equals(public_key)
+ ),
+ ),
+ ]
+ ),
+ )
def test_generate_openpgp(self):
key = Key.generate(
- KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
- openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ KeyType.OPENPGP,
+ "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA,
+ length=1024,
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure(
- key_type=Equals(KeyType.OPENPGP),
- fingerprint=HasLength(40),
- public_key=MatchesOpenPGPKey(
- MatchesStructure(
- fpr=Equals(key.fingerprint),
- secret=Equals(0),
- subkeys=MatchesListwise([
- MatchesStructure.byEquality(
- pubkey_algo=1,
- length=1024),
- ]),
- uids=MatchesListwise([
- MatchesStructure.byEquality(
- uid="~signing-owner/ubuntu/testing"),
- ]))),
- created_at=Equals(now),
- updated_at=Equals(now)))
+ self.assertThat(
+ key,
+ MatchesStructure(
+ key_type=Equals(KeyType.OPENPGP),
+ fingerprint=HasLength(40),
+ public_key=MatchesOpenPGPKey(
+ MatchesStructure(
+ fpr=Equals(key.fingerprint),
+ secret=Equals(0),
+ subkeys=MatchesListwise(
+ [
+ MatchesStructure.byEquality(
+ pubkey_algo=1, length=1024
+ ),
+ ]
+ ),
+ uids=MatchesListwise(
+ [
+ MatchesStructure.byEquality(
+ uid="~signing-owner/ubuntu/testing"
+ ),
+ ]
+ ),
+ )
+ ),
+ created_at=Equals(now),
+ updated_at=Equals(now),
+ ),
+ )
self.assertThat(
key.getPrivateKey(),
MatchesOpenPGPKey(
MatchesStructure(
fpr=Equals(key.fingerprint),
secret=Equals(1),
- subkeys=MatchesListwise([
- MatchesStructure.byEquality(
- pubkey_algo=1,
- length=1024),
- ]),
- uids=MatchesListwise([
- MatchesStructure.byEquality(
- uid="~signing-owner/ubuntu/testing"),
- ])),
- secret=True))
+ subkeys=MatchesListwise(
+ [
+ MatchesStructure.byEquality(
+ pubkey_algo=1, length=1024
+ ),
+ ]
+ ),
+ uids=MatchesListwise(
+ [
+ MatchesStructure.byEquality(
+ uid="~signing-owner/ubuntu/testing"
+ ),
+ ]
+ ),
+ ),
+ secret=True,
+ ),
+ )
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.OPENPGP, key.fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.OPENPGP, key.fingerprint)
+ )
def test_generate_cv2_kernel(self):
private_key = factory.generate_random_bytes(size=64)
@@ -383,37 +563,57 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
key = Key.generate(KeyType.CV2_KERNEL, "~signing-owner/ubuntu/testing")
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.CV2_KERNEL,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=now,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.CV2_KERNEL,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=now,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.CV2_KERNEL, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.CV2_KERNEL, fingerprint)
+ )
genpkey_args = [
- "openssl", "genpkey", "-algorithm", "RSA",
- "-out", EndsWith("cv2_kernel.priv"),
- "-pkeyopt", "rsa_keygen_bits:2048",
- "-pkeyopt", "rsa_keygen_pubexp:65537",
- ]
+ "openssl",
+ "genpkey",
+ "-algorithm",
+ "RSA",
+ "-out",
+ EndsWith("cv2_kernel.priv"),
+ "-pkeyopt",
+ "rsa_keygen_bits:2048",
+ "-pkeyopt",
+ "rsa_keygen_pubexp:65537",
+ ]
pkey_args = [
- "openssl", "pkey", "-in", EndsWith("cv2_kernel.priv"),
- "-pubout", "-out", EndsWith("cv2_kernel.pub"),
- ]
+ "openssl",
+ "pkey",
+ "-in",
+ EndsWith("cv2_kernel.priv"),
+ "-pubout",
+ "-out",
+ EndsWith("cv2_kernel.pub"),
+ ]
pkey_der_args = ["openssl", "pkey", "-pubin", "-outform", "DER"]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(genpkey_args, stdin=Is(None)),
- RanCommand(pkey_args, stdin=Is(None)),
- RanCommand(
- pkey_der_args,
- stdin=AfterPreprocessing(
- lambda f: f.getvalue(),
- Equals(public_key))),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(genpkey_args, stdin=Is(None)),
+ RanCommand(pkey_args, stdin=Is(None)),
+ RanCommand(
+ pkey_der_args,
+ stdin=AfterPreprocessing(
+ lambda f: f.getvalue(), Equals(public_key)
+ ),
+ ),
+ ]
+ ),
+ )
def test_generate_android_kernel(self):
private_key = factory.generate_random_bytes(size=64)
@@ -421,50 +621,82 @@ class TestKey(TestCase):
public_key = factory.generate_random_bytes(size=64)
fingerprint = hashlib.sha1(public_key).hexdigest().upper()
fake_openssl = FakeOpenSSL(
- private_key, public_key, fingerprint,
- private_key_pkcs1=private_key_pkcs1)
+ private_key,
+ public_key,
+ fingerprint,
+ private_key_pkcs1=private_key_pkcs1,
+ )
self.processes_fixture.add(fake_openssl)
key = Key.generate(
- KeyType.ANDROID_KERNEL,
- "~signing-owner/ubuntu/testing")
+ KeyType.ANDROID_KERNEL, "~signing-owner/ubuntu/testing"
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.ANDROID_KERNEL,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=now,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.ANDROID_KERNEL,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=now,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key_pkcs1, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(
- KeyType.ANDROID_KERNEL, fingerprint))
+ key,
+ Key.getByTypeAndFingerprint(KeyType.ANDROID_KERNEL, fingerprint),
+ )
req_args = [
- "openssl", "req", "-new", "-x509", "-newkey", "rsa:2048",
- "-subj", r"/CN=~signing-owner\/ubuntu\/testing Android Kernel/",
- "-keyout", EndsWith("android_kernel.key"),
- "-out", EndsWith("android_kernel.crt"), "-days", "9131",
- "-nodes", "-sha256",
- ]
+ "openssl",
+ "req",
+ "-new",
+ "-x509",
+ "-newkey",
+ "rsa:2048",
+ "-subj",
+ r"/CN=~signing-owner\/ubuntu\/testing Android Kernel/",
+ "-keyout",
+ EndsWith("android_kernel.key"),
+ "-out",
+ EndsWith("android_kernel.crt"),
+ "-days",
+ "9131",
+ "-nodes",
+ "-sha256",
+ ]
rsa_args = [
- "openssl", "rsa", "-in", EndsWith("android_kernel.key"),
- "-out", EndsWith("android_kernel.key"),
- ]
+ "openssl",
+ "rsa",
+ "-in",
+ EndsWith("android_kernel.key"),
+ "-out",
+ EndsWith("android_kernel.key"),
+ ]
x509_args = [
- "openssl", "x509", "-inform", "PEM", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "PEM",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(req_args, stdin=Is(None)),
- RanCommand(rsa_args, stdin=Is(None)),
- RanCommand(
- x509_args,
- stdin=AfterPreprocessing(
- lambda f: f.getvalue(),
- Equals(public_key))),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(req_args, stdin=Is(None)),
+ RanCommand(rsa_args, stdin=Is(None)),
+ RanCommand(
+ x509_args,
+ stdin=AfterPreprocessing(
+ lambda f: f.getvalue(), Equals(public_key)
+ ),
+ ),
+ ]
+ ),
+ )
def test_addAuthorization(self):
key = factory.create_key()
@@ -511,40 +743,60 @@ class TestKey(TestCase):
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.efi", message, SignatureMode.ATTACHED))
+ key.sign("t.efi", message, SignatureMode.ATTACHED),
+ )
self.assertEqual(message, fake_sbsign.message_bytes)
sbsign_args = [
- "sbsign", "--key", EndsWith("uefi.key"),
- "--cert", EndsWith("uefi.crt"), EndsWith("t.efi"),
- ]
+ "sbsign",
+ "--key",
+ EndsWith("uefi.key"),
+ "--cert",
+ EndsWith("uefi.crt"),
+ EndsWith("t.efi"),
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(sbsign_args)]))
+ MatchesListwise([RanCommand(sbsign_args)]),
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making attached signature of t.efi "
f"(SHA-256: {message_sha256}) with UEFI key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_uefi_detached_unsupported(self):
key = factory.create_key(KeyType.UEFI)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.efi", b"test data", SignatureMode.DETACHED)
+ key.sign,
+ "t.efi",
+ b"test data",
+ SignatureMode.DETACHED,
+ )
def test_sign_uefi_clear_unsupported(self):
key = factory.create_key(KeyType.UEFI)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.efi", b"test data", SignatureMode.CLEAR)
+ key.sign,
+ "t.efi",
+ b"test data",
+ SignatureMode.CLEAR,
+ )
def test_sign_kmod_attached_unsupported(self):
key = factory.create_key(KeyType.KMOD)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.ko", b"test data", SignatureMode.ATTACHED)
+ key.sign,
+ "t.ko",
+ b"test data",
+ SignatureMode.ATTACHED,
+ )
def test_sign_kmod_detached(self):
key = factory.create_key(KeyType.KMOD)
@@ -554,35 +806,51 @@ class TestKey(TestCase):
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.ko", message, SignatureMode.DETACHED))
+ key.sign("t.ko", message, SignatureMode.DETACHED),
+ )
self.assertEqual(message, fake_kmodsign.message_bytes)
kmodsign_args = [
- "kmodsign", "-D", "sha512",
- EndsWith("kmod.key"), EndsWith("kmod.crt"),
- EndsWith("t.ko"), EndsWith("t.ko.sig"),
- ]
+ "kmodsign",
+ "-D",
+ "sha512",
+ EndsWith("kmod.key"),
+ EndsWith("kmod.crt"),
+ EndsWith("t.ko"),
+ EndsWith("t.ko.sig"),
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(kmodsign_args)]))
+ MatchesListwise([RanCommand(kmodsign_args)]),
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making detached signature of t.ko "
f"(SHA-256: {message_sha256}) with Kmod key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_kmod_clear_unsupported(self):
key = factory.create_key(KeyType.KMOD)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.ko", b"test data", SignatureMode.CLEAR)
+ key.sign,
+ "t.ko",
+ b"test data",
+ SignatureMode.CLEAR,
+ )
def test_sign_opal_attached_unsupported(self):
key = factory.create_key(KeyType.OPAL)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.opal", b"test data", SignatureMode.ATTACHED)
+ key.sign,
+ "t.opal",
+ b"test data",
+ SignatureMode.ATTACHED,
+ )
def test_sign_opal_detached(self):
key = factory.create_key(KeyType.OPAL)
@@ -592,35 +860,51 @@ class TestKey(TestCase):
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.opal", message, SignatureMode.DETACHED))
+ key.sign("t.opal", message, SignatureMode.DETACHED),
+ )
self.assertEqual(message, fake_kmodsign.message_bytes)
kmodsign_args = [
- "kmodsign", "-D", "sha512",
- EndsWith("opal.key"), EndsWith("opal.crt"),
- EndsWith("t.opal"), EndsWith("t.opal.sig"),
- ]
+ "kmodsign",
+ "-D",
+ "sha512",
+ EndsWith("opal.key"),
+ EndsWith("opal.crt"),
+ EndsWith("t.opal"),
+ EndsWith("t.opal.sig"),
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(kmodsign_args)]))
+ MatchesListwise([RanCommand(kmodsign_args)]),
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making detached signature of t.opal "
f"(SHA-256: {message_sha256}) with OPAL key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_opal_clear_unsupported(self):
key = factory.create_key(KeyType.OPAL)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.opal", b"test data", SignatureMode.CLEAR)
+ key.sign,
+ "t.opal",
+ b"test data",
+ SignatureMode.CLEAR,
+ )
def test_sign_sipl_attached_unsupported(self):
key = factory.create_key(KeyType.SIPL)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.sipl", b"test data", SignatureMode.ATTACHED)
+ key.sign,
+ "t.sipl",
+ b"test data",
+ SignatureMode.ATTACHED,
+ )
def test_sign_sipl_detached(self):
key = factory.create_key(KeyType.SIPL)
@@ -630,29 +914,41 @@ class TestKey(TestCase):
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.sipl", message, SignatureMode.DETACHED))
+ key.sign("t.sipl", message, SignatureMode.DETACHED),
+ )
self.assertEqual(message, fake_kmodsign.message_bytes)
kmodsign_args = [
- "kmodsign", "-D", "sha512",
- EndsWith("sipl.key"), EndsWith("sipl.crt"),
- EndsWith("t.sipl"), EndsWith("t.sipl.sig"),
- ]
+ "kmodsign",
+ "-D",
+ "sha512",
+ EndsWith("sipl.key"),
+ EndsWith("sipl.crt"),
+ EndsWith("t.sipl"),
+ EndsWith("t.sipl.sig"),
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(kmodsign_args)]))
+ MatchesListwise([RanCommand(kmodsign_args)]),
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making detached signature of t.sipl "
f"(SHA-256: {message_sha256}) with SIPL key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_sipl_clear_unsupported(self):
key = factory.create_key(KeyType.SIPL)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.sipl", b"test data", SignatureMode.CLEAR)
+ key.sign,
+ "t.sipl",
+ b"test data",
+ SignatureMode.CLEAR,
+ )
def test_sign_fit_attached(self):
key = factory.create_key(KeyType.FIT)
@@ -662,103 +958,153 @@ class TestKey(TestCase):
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.fit", message, SignatureMode.ATTACHED))
+ key.sign("t.fit", message, SignatureMode.ATTACHED),
+ )
self.assertEqual(message, fake_mkimage.message_bytes)
mkimage_args = [
- "mkimage", "-F", "-k", Not(Is(None)), "-r",
+ "mkimage",
+ "-F",
+ "-k",
+ Not(Is(None)),
+ "-r",
EndsWith("t.fit.signed"),
- ]
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(mkimage_args)]))
+ MatchesListwise([RanCommand(mkimage_args)]),
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making attached signature of t.fit "
f"(SHA-256: {message_sha256}) with FIT key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_fit_detached_unsupported(self):
key = factory.create_key(KeyType.FIT)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.fit", b"test data", SignatureMode.DETACHED)
+ key.sign,
+ "t.fit",
+ b"test data",
+ SignatureMode.DETACHED,
+ )
def test_sign_fit_clear_unsupported(self):
key = factory.create_key(KeyType.FIT)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.fit", b"test data", SignatureMode.CLEAR)
+ key.sign,
+ "t.fit",
+ b"test data",
+ SignatureMode.CLEAR,
+ )
def test_sign_openpgp_attached(self):
key = Key.generate(
- KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
- openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ KeyType.OPENPGP,
+ "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA,
+ length=1024,
+ )
message = b"test data"
- mock_sign = self.useFixture(MockPatch(
- "gpg.Context.sign", return_value=(b"test signed data", None))).mock
+ mock_sign = self.useFixture(
+ MockPatch(
+ "gpg.Context.sign", return_value=(b"test signed data", None)
+ )
+ ).mock
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.gpg", message, SignatureMode.ATTACHED))
+ key.sign("t.gpg", message, SignatureMode.ATTACHED),
+ )
mock_sign.assert_called_once_with(
- message, mode=gpg.constants.SIG_MODE_NORMAL)
+ message, mode=gpg.constants.SIG_MODE_NORMAL
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making attached signature of t.gpg "
f"(SHA-256: {message_sha256}) with OpenPGP key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_openpgp_detached(self):
key = Key.generate(
- KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
- openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ KeyType.OPENPGP,
+ "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA,
+ length=1024,
+ )
message = b"test data"
- mock_sign = self.useFixture(MockPatch(
- "gpg.Context.sign", return_value=(b"test signed data", None))).mock
+ mock_sign = self.useFixture(
+ MockPatch(
+ "gpg.Context.sign", return_value=(b"test signed data", None)
+ )
+ ).mock
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.gpg", message, SignatureMode.DETACHED))
+ key.sign("t.gpg", message, SignatureMode.DETACHED),
+ )
mock_sign.assert_called_once_with(
- message, mode=gpg.constants.SIG_MODE_DETACH)
+ message, mode=gpg.constants.SIG_MODE_DETACH
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making detached signature of t.gpg "
f"(SHA-256: {message_sha256}) with OpenPGP key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_openpgp_clear(self):
key = Key.generate(
- KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
- openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ KeyType.OPENPGP,
+ "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA,
+ length=1024,
+ )
message = b"test data"
- mock_sign = self.useFixture(MockPatch(
- "gpg.Context.sign", return_value=(b"test signed data", None))).mock
+ mock_sign = self.useFixture(
+ MockPatch(
+ "gpg.Context.sign", return_value=(b"test signed data", None)
+ )
+ ).mock
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.gpg", message, SignatureMode.CLEAR))
+ key.sign("t.gpg", message, SignatureMode.CLEAR),
+ )
mock_sign.assert_called_once_with(
- message, mode=gpg.constants.SIG_MODE_CLEAR)
+ message, mode=gpg.constants.SIG_MODE_CLEAR
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making clear signature of t.gpg "
f"(SHA-256: {message_sha256}) with OpenPGP key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_cv2_kernel_attached_unsupported(self):
key = factory.create_key(KeyType.CV2_KERNEL)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.cv2_kernel", b"test data", SignatureMode.ATTACHED)
+ key.sign,
+ "t.cv2_kernel",
+ b"test data",
+ SignatureMode.ATTACHED,
+ )
def test_sign_cv2_kernel_detached(self):
key = factory.create_key(KeyType.CV2_KERNEL)
@@ -768,34 +1114,52 @@ class TestKey(TestCase):
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.cv2_kernel", message, SignatureMode.DETACHED))
+ key.sign("t.cv2_kernel", message, SignatureMode.DETACHED),
+ )
self.assertEqual(message, fake_openssl_sign.message_bytes)
openssl_sign_args = [
- "openssl", "dgst", "-sha256", "-sign", EndsWith("cv2_kernel.priv"),
- "-out", EndsWith("t.cv2_kernel.sig"), EndsWith("t.cv2_kernel"),
- ]
+ "openssl",
+ "dgst",
+ "-sha256",
+ "-sign",
+ EndsWith("cv2_kernel.priv"),
+ "-out",
+ EndsWith("t.cv2_kernel.sig"),
+ EndsWith("t.cv2_kernel"),
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(openssl_sign_args)]))
+ MatchesListwise([RanCommand(openssl_sign_args)]),
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making detached signature of t.cv2_kernel "
f"(SHA-256: {message_sha256}) with CV2 Kernel key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_cv2_kernel_clear_unsupported(self):
key = factory.create_key(KeyType.CV2_KERNEL)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.cv2_kernel", b"test data", SignatureMode.CLEAR)
+ key.sign,
+ "t.cv2_kernel",
+ b"test data",
+ SignatureMode.CLEAR,
+ )
def test_sign_android_kernel_attached_unsupported(self):
key = factory.create_key(KeyType.ANDROID_KERNEL)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.android_kernel", b"test data", SignatureMode.ATTACHED)
+ key.sign,
+ "t.android_kernel",
+ b"test data",
+ SignatureMode.ATTACHED,
+ )
def test_sign_android_kernel_detached(self):
key = factory.create_key(KeyType.ANDROID_KERNEL)
@@ -805,30 +1169,42 @@ class TestKey(TestCase):
with TestContext() as ctx:
self.assertEqual(
b"test signed data",
- key.sign("t.android_kernel", message, SignatureMode.DETACHED))
+ key.sign("t.android_kernel", message, SignatureMode.DETACHED),
+ )
self.assertEqual(message, fake_openssl_sign.message_bytes)
openssl_sign_args = [
- "openssl", "dgst", "-sha256", "-sign",
+ "openssl",
+ "dgst",
+ "-sha256",
+ "-sign",
EndsWith("android_kernel.key"),
- "-out", EndsWith("t.android_kernel.sig"),
+ "-out",
+ EndsWith("t.android_kernel.sig"),
EndsWith("t.android_kernel"),
- ]
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(openssl_sign_args)]))
+ MatchesListwise([RanCommand(openssl_sign_args)]),
+ )
message_sha256 = hashlib.sha256(message).hexdigest()
ctx.assert_log(
level="info",
message=(
f"Making detached signature of t.android_kernel "
f"(SHA-256: {message_sha256}) with Android Kernel key "
- f"{key.fingerprint}"))
+ f"{key.fingerprint}"
+ ),
+ )
def test_sign_android_kernel_clear_unsupported(self):
key = factory.create_key(KeyType.ANDROID_KERNEL)
self.assertRaises(
UnsupportedSignatureMode,
- key.sign, "t.android_kernel", b"test data", SignatureMode.CLEAR)
+ key.sign,
+ "t.android_kernel",
+ b"test data",
+ SignatureMode.CLEAR,
+ )
def test_inject_uefi(self):
private_key = factory.generate_random_bytes(size=64)
@@ -838,28 +1214,42 @@ class TestKey(TestCase):
created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
fake_openssl = FakeOpenSSL(private_key, public_key, fingerprint)
self.processes_fixture.add(fake_openssl)
- key = Key.inject(KeyType.UEFI, private_key, public_key, description,
- created_at)
+ key = Key.inject(
+ KeyType.UEFI, private_key, public_key, description, created_at
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.UEFI,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=created_at,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.UEFI,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.UEFI, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.UEFI, fingerprint)
+ )
fingerprint_args = [
- "openssl", "x509", "-inform", "PEM", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "PEM",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(
- fingerprint_args,
- ),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(
+ fingerprint_args,
+ ),
+ ]
+ ),
+ )
def test_inject_kmod(self):
private_key = factory.generate_random_bytes(size=64)
@@ -869,28 +1259,42 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
description = "PPA signing-owner testing"
created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
- key = Key.inject(KeyType.KMOD, private_key, public_key, description,
- created_at)
+ key = Key.inject(
+ KeyType.KMOD, private_key, public_key, description, created_at
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.KMOD,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=created_at,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.KMOD,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.KMOD, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.KMOD, fingerprint)
+ )
fingerprint_args = [
- "openssl", "x509", "-inform", "DER", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "DER",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(
- fingerprint_args,
- ),
- ]))
+ MatchesListwise(
+ [
+ RanCommand(
+ fingerprint_args,
+ ),
+ ]
+ ),
+ )
def test_inject_opal(self):
private_key = factory.generate_random_bytes(size=64)
@@ -900,28 +1304,36 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
description = "PPA signing-owner testing"
created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
- key = Key.inject(KeyType.OPAL, private_key, public_key, description,
- created_at)
+ key = Key.inject(
+ KeyType.OPAL, private_key, public_key, description, created_at
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.OPAL,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=created_at,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.OPAL,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.OPAL, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.OPAL, fingerprint)
+ )
fingerprint_args = [
- "openssl", "x509", "-inform", "DER", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "DER",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(
- fingerprint_args
- )
- ]))
+ MatchesListwise([RanCommand(fingerprint_args)]),
+ )
def test_inject_sipl(self):
private_key = factory.generate_random_bytes(size=64)
@@ -931,28 +1343,36 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
description = "PPA signing-owner testing"
created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
- key = Key.inject(KeyType.SIPL, private_key, public_key, description,
- created_at)
+ key = Key.inject(
+ KeyType.SIPL, private_key, public_key, description, created_at
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.SIPL,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=created_at,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.SIPL,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.SIPL, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.SIPL, fingerprint)
+ )
fingerprint_args = [
- "openssl", "x509", "-inform", "DER", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "DER",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(
- fingerprint_args
- )
- ]))
+ MatchesListwise([RanCommand(fingerprint_args)]),
+ )
def test_inject_fit(self):
private_key = factory.generate_random_bytes(size=64)
@@ -962,49 +1382,66 @@ class TestKey(TestCase):
self.processes_fixture.add(fake_openssl)
description = "PPA signing-owner testing"
created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
- key = Key.inject(KeyType.FIT, private_key, public_key, description,
- created_at)
+ key = Key.inject(
+ KeyType.FIT, private_key, public_key, description, created_at
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.FIT,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=created_at,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.FIT,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.FIT, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.FIT, fingerprint)
+ )
fingerprint_args = [
- "openssl", "x509", "-inform", "PEM", "-noout", "-fingerprint",
- ]
+ "openssl",
+ "x509",
+ "-inform",
+ "PEM",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([
- RanCommand(
- fingerprint_args
- )
- ]))
+ MatchesListwise([RanCommand(fingerprint_args)]),
+ )
def test_inject_openpgp(self):
with TemporaryDirectory() as tmp, _gpg_context(Path(tmp)) as ctx:
# Generate a test key, but do not store it in the database.
private_key, public_key, fingerprint = Key._generateOpenPGP(
- ctx, OpenPGPKeyAlgorithm.RSA, 1024,
- "~signing-owner/ubuntu/testing")
+ ctx,
+ OpenPGPKeyAlgorithm.RSA,
+ 1024,
+ "~signing-owner/ubuntu/testing",
+ )
description = "PPA signing-owner testing"
created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
key = Key.inject(
- KeyType.OPENPGP, private_key, public_key, description, created_at)
+ KeyType.OPENPGP, private_key, public_key, description, created_at
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.OPENPGP,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=created_at,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.OPENPGP,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.OPENPGP, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.OPENPGP, fingerprint)
+ )
def test_inject_cv2_kernel(self):
private_key = factory.generate_random_bytes(size=64)
@@ -1015,22 +1452,32 @@ class TestKey(TestCase):
description = "PPA signing-owner testing"
created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
key = Key.inject(
- KeyType.CV2_KERNEL, private_key, public_key, description,
- created_at)
+ KeyType.CV2_KERNEL,
+ private_key,
+ public_key,
+ description,
+ created_at,
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.CV2_KERNEL,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=created_at,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.CV2_KERNEL,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(KeyType.CV2_KERNEL, fingerprint))
+ key, Key.getByTypeAndFingerprint(KeyType.CV2_KERNEL, fingerprint)
+ )
pkey_der_args = ["openssl", "pkey", "-pubin", "-outform", "DER"]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(pkey_der_args)]))
+ MatchesListwise([RanCommand(pkey_der_args)]),
+ )
def test_inject_android_kernel(self):
private_key = factory.generate_random_bytes(size=64)
@@ -1041,21 +1488,37 @@ class TestKey(TestCase):
description = "PPA signing-owner testing"
created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
key = Key.inject(
- KeyType.ANDROID_KERNEL, private_key, public_key, description,
- created_at)
+ KeyType.ANDROID_KERNEL,
+ private_key,
+ public_key,
+ description,
+ created_at,
+ )
now = get_transaction_timestamp(store)
- self.assertThat(key, MatchesStructure.byEquality(
- key_type=KeyType.ANDROID_KERNEL,
- fingerprint=fingerprint,
- public_key=public_key,
- created_at=created_at,
- updated_at=now))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ key_type=KeyType.ANDROID_KERNEL,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now,
+ ),
+ )
self.assertEqual(private_key, key.getPrivateKey())
self.assertEqual(
- key, Key.getByTypeAndFingerprint(
- KeyType.ANDROID_KERNEL, fingerprint))
- pkey_der_args = ["openssl", "x509", "-inform",
- "PEM", "-noout", "-fingerprint"]
+ key,
+ Key.getByTypeAndFingerprint(KeyType.ANDROID_KERNEL, fingerprint),
+ )
+ pkey_der_args = [
+ "openssl",
+ "x509",
+ "-inform",
+ "PEM",
+ "-noout",
+ "-fingerprint",
+ ]
self.assertThat(
self.processes_fixture.procs,
- MatchesListwise([RanCommand(pkey_der_args)]))
+ MatchesListwise([RanCommand(pkey_der_args)]),
+ )
diff --git a/lp_signing/model/tests/test_nonce.py b/lp_signing/model/tests/test_nonce.py
index c2c5177..2911ab0 100644
--- a/lp_signing/model/tests/test_nonce.py
+++ b/lp_signing/model/tests/test_nonce.py
@@ -8,12 +8,7 @@ from datetime import timedelta
from flask_storm import store
from storm.expr import Cast
from testtools import TestCase
-from testtools.matchers import (
- Equals,
- HasLength,
- Is,
- MatchesStructure,
- )
+from testtools.matchers import Equals, HasLength, Is, MatchesStructure
from lp_signing.database.constants import UTC_NOW
from lp_signing.database.helpers import get_transaction_timestamp
@@ -23,18 +18,22 @@ from lp_signing.tests.testfixtures import DatabaseFixture
class TestNonce(TestCase):
-
def setUp(self):
super().setUp()
self.useFixture(DatabaseFixture())
def test_generate(self):
nonce1 = Nonce.generate()
- self.assertThat(nonce1, MatchesStructure(
- nonce=HasLength(24),
- used=Is(False),
- expires=Equals(
- get_transaction_timestamp(store) + timedelta(seconds=60))))
+ self.assertThat(
+ nonce1,
+ MatchesStructure(
+ nonce=HasLength(24),
+ used=Is(False),
+ expires=Equals(
+ get_transaction_timestamp(store) + timedelta(seconds=60)
+ ),
+ ),
+ )
nonce2 = Nonce.generate()
self.assertNotEqual(nonce1.nonce, nonce2.nonce)
diff --git a/lp_signing/tests/__init__.py b/lp_signing/tests/__init__.py
index f28f71f..7f54ebd 100644
--- a/lp_signing/tests/__init__.py
+++ b/lp_signing/tests/__init__.py
@@ -8,7 +8,6 @@ import talisker.logs
import lp_signing
-
talisker.logs.configure_test_logging()
diff --git a/lp_signing/tests/factory.py b/lp_signing/tests/factory.py
index 596f7b4..e02d895 100644
--- a/lp_signing/tests/factory.py
+++ b/lp_signing/tests/factory.py
@@ -3,12 +3,12 @@
"""Test factory."""
-from datetime import datetime
import inspect
-from itertools import count
-from pathlib import Path
import random
import re
+from datetime import datetime
+from itertools import count
+from pathlib import Path
import pytz
@@ -16,7 +16,6 @@ from lp_signing.enums import KeyType
from lp_signing.model.client import Client
from lp_signing.model.key import Key
-
_unique_int_counter = count(100000)
@@ -54,12 +53,17 @@ def create_client(name=None, created_at=None, updated_at=None):
name,
created_at=created_at,
updated_at=updated_at,
- )
+ )
-def create_key(key_type=None, fingerprint=None,
- private_key=None, public_key=None,
- created_at=None, updated_at=None):
+def create_key(
+ key_type=None,
+ fingerprint=None,
+ private_key=None,
+ public_key=None,
+ created_at=None,
+ updated_at=None,
+):
"""Create a Key database object."""
key_type = key_type or KeyType.UEFI
fingerprint = fingerprint or generate_fingerprint()
@@ -74,4 +78,4 @@ def create_key(key_type=None, fingerprint=None,
public_key=public_key,
created_at=created_at,
updated_at=updated_at,
- )
+ )
diff --git a/lp_signing/tests/matchers.py b/lp_signing/tests/matchers.py
index 187f054..293e6d8 100644
--- a/lp_signing/tests/matchers.py
+++ b/lp_signing/tests/matchers.py
@@ -16,26 +16,21 @@ from testtools.matchers import (
MatchesStructure,
Mismatch,
StartsWith,
- )
+)
from lp_signing.enums import SignatureMode
-from lp_signing.model.key import (
- _gpg_context,
- _gpg_key_import,
- _temporary_path,
- )
+from lp_signing.model.key import _gpg_context, _gpg_key_import, _temporary_path
class RanCommand(MatchesStructure):
-
def __init__(self, args, **kwargs):
args_matchers = [
- Equals(arg) if isinstance(arg, str) else arg for arg in args]
+ Equals(arg) if isinstance(arg, str) else arg for arg in args
+ ]
super().__init__(args=MatchesListwise(args_matchers), **kwargs)
class BaseJSONResponseMatcher(Matcher):
-
def __init__(self, expected_status=200):
super().__init__()
self.expected_status = expected_status
@@ -43,9 +38,9 @@ class BaseJSONResponseMatcher(Matcher):
def match(self, matchee):
if matchee.status_code != self.expected_status:
return Mismatch(
- "Expected status code %d, got %d instead" % (
- self.expected_status, matchee.status_code
- ))
+ "Expected status code %d, got %d instead"
+ % (self.expected_status, matchee.status_code)
+ )
if matchee.content_type != "application/json":
return Mismatch(
"Expected 'application/json' content type. Got '%s' instead."
@@ -54,7 +49,6 @@ class BaseJSONResponseMatcher(Matcher):
class IsJSONResponse(BaseJSONResponseMatcher):
-
def __init__(self, json_matcher, expected_status=200):
super().__init__(expected_status=expected_status)
self.json_matcher = json_matcher
@@ -67,7 +61,6 @@ class IsJSONResponse(BaseJSONResponseMatcher):
class HasAPIError(BaseJSONResponseMatcher):
-
def __init__(self, expected_message, expected_status=400):
super().__init__(expected_status=expected_status)
if isinstance(expected_message, str):
@@ -81,11 +74,11 @@ class HasAPIError(BaseJSONResponseMatcher):
return err
data = json.loads(matchee.data.decode(matchee.charset))
return AnyMatch(self.expected_message_matcher).match(
- [error["message"] for error in data["error_list"]])
+ [error["message"] for error in data["error_list"]]
+ )
class MatchesOpenPGPKey(AfterPreprocessing):
-
def __init__(self, matcher, secret=False):
super().__init__(self._load_gpg_key, matcher)
self.secret = secret
@@ -94,7 +87,8 @@ class MatchesOpenPGPKey(AfterPreprocessing):
with _temporary_path() as tmp, _gpg_context(tmp) as ctx:
result = _gpg_key_import(ctx, value)
return list(
- ctx.keylist(result.imports[0].fpr, secret=self.secret))[0]
+ ctx.keylist(result.imports[0].fpr, secret=self.secret)
+ )[0]
def match(self, matchee):
if self.secret:
@@ -109,7 +103,6 @@ class MatchesOpenPGPKey(AfterPreprocessing):
class OpenPGPSignatureVerifies(Matcher):
-
def __init__(self, public_key, message, mode):
self.public_key = public_key
self.message = message
diff --git a/lp_signing/tests/test_crypto.py b/lp_signing/tests/test_crypto.py
index 4c1987e..bfe2085 100644
--- a/lp_signing/tests/test_crypto.py
+++ b/lp_signing/tests/test_crypto.py
@@ -6,14 +6,10 @@
from nacl.public import PrivateKey
from testtools import TestCase
-from lp_signing.crypto import (
- CryptoError,
- NaClEncryptedContainerBase,
- )
+from lp_signing.crypto import CryptoError, NaClEncryptedContainerBase
class FakeEncryptedContainer(NaClEncryptedContainerBase):
-
def __init__(self, public_key_bytes, private_key_bytes=None):
self._public_key_bytes = public_key_bytes
self._private_key_bytes = private_key_bytes
@@ -28,7 +24,6 @@ class FakeEncryptedContainer(NaClEncryptedContainerBase):
class TestNaClEncryptedContainerBase(TestCase):
-
def test_public_key_valid(self):
public_key = PrivateKey.generate().public_key
container = FakeEncryptedContainer(bytes(public_key))
@@ -54,7 +49,8 @@ class TestNaClEncryptedContainerBase(TestCase):
def test_private_key_valid(self):
private_key = PrivateKey.generate()
container = FakeEncryptedContainer(
- bytes(private_key.public_key), bytes(private_key))
+ bytes(private_key.public_key), bytes(private_key)
+ )
self.assertEqual(private_key, container.private_key)
self.assertTrue(container.can_decrypt)
@@ -73,6 +69,8 @@ class TestNaClEncryptedContainerBase(TestCase):
def test_encrypt_decrypt(self):
private_key = PrivateKey.generate()
container = FakeEncryptedContainer(
- bytes(private_key.public_key), bytes(private_key))
+ bytes(private_key.public_key), bytes(private_key)
+ )
self.assertEqual(
- b"plaintext", container.decrypt(container.encrypt(b"plaintext")))
+ b"plaintext", container.decrypt(container.encrypt(b"plaintext"))
+ )
diff --git a/lp_signing/tests/test_talisker.py b/lp_signing/tests/test_talisker.py
index 9abf818..dd58a25 100644
--- a/lp_signing/tests/test_talisker.py
+++ b/lp_signing/tests/test_talisker.py
@@ -12,8 +12,8 @@ import socket
import subprocess
import tempfile
import time
-from urllib.parse import urlunsplit
from pathlib import Path
+from urllib.parse import urlunsplit
import fixtures
import requests
@@ -26,7 +26,7 @@ from testtools.matchers import (
Equals,
MatchesRegex,
MatchesStructure,
- )
+)
class FixtureResource(testresources.TestResourceManager):
@@ -55,13 +55,15 @@ class FixtureResource(testresources.TestResourceManager):
class TaliskerFixture(fixtures.Fixture):
-
def _setUp(self):
self.bind = f"localhost:{_get_unused_port()}"
root = Path(__file__).parent.parent.parent.resolve()
talisker = root / "env" / "bin" / "talisker"
talisker_cmd = [
- str(talisker), "-b", self.bind, "lp_signing.webapp:app",
+ str(talisker),
+ "-b",
+ self.bind,
+ "lp_signing.webapp:app",
]
stdout_fd, self._stdout_name = tempfile.mkstemp()
self.addCleanup(os.remove, self._stdout_name)
@@ -107,17 +109,21 @@ class TaliskerFixture(fixtures.Fixture):
class TaliskerTests(testresources.ResourcedTestCase, TestCase):
-
resources = [("talisker", FixtureResource(TaliskerFixture()))]
def test_check_returns_200_and_revision(self):
path = self.talisker.url("/_status/check")
resp = requests.get(path)
- self.assertThat(resp, MatchesStructure(
- status_code=Equals(200),
- headers=ContainsDict(
- {"Content-Type": Equals("application/json")})))
+ self.assertThat(
+ resp,
+ MatchesStructure(
+ status_code=Equals(200),
+ headers=ContainsDict(
+ {"Content-Type": Equals("application/json")}
+ ),
+ ),
+ )
self.assertThat(resp.json(), Equals({"ok": True}))
def test_ping_returns_200_and_revision(self):
@@ -131,9 +137,12 @@ class TaliskerTests(testresources.ResourcedTestCase, TestCase):
path = self.talisker.url("/")
resp = requests.get(path)
- self.assertThat(resp, MatchesStructure(
- status_code=Equals(200),
- headers=IncludesGitRevisionHeader()))
+ self.assertThat(
+ resp,
+ MatchesStructure(
+ status_code=Equals(200), headers=IncludesGitRevisionHeader()
+ ),
+ )
def test_sentry_endpoint(self):
path = self.talisker.url("/_status/test/sentry")
@@ -143,7 +152,9 @@ class TaliskerTests(testresources.ResourcedTestCase, TestCase):
self.assertThat(
self.talisker.stderr.strip(),
Contains(
- "talisker.endpoints.TestException: this is a test, ignore"))
+ "talisker.endpoints.TestException: this is a test, ignore"
+ ),
+ )
def IsGitHash():
@@ -171,4 +182,5 @@ def _get_unused_port():
def load_tests(loader, standard_tests, pattern):
from testresources import OptimisingTestSuite
+
return OptimisingTestSuite(standard_tests)
diff --git a/lp_signing/tests/test_webapi.py b/lp_signing/tests/test_webapi.py
index 468d91f..c5933f2 100644
--- a/lp_signing/tests/test_webapi.py
+++ b/lp_signing/tests/test_webapi.py
@@ -2,29 +2,25 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
import base64
-from datetime import datetime, timedelta
import json
+from datetime import datetime, timedelta
from pathlib import Path
-import pytz
from tempfile import TemporaryDirectory
+import gpg
+import pytz
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from cryptography.x509 import (
load_der_x509_certificate,
load_pem_x509_certificate,
- )
+)
from fixtures import MockPatch
-import gpg
-from gpg.results import GenkeyResult
from flask_storm import store
+from gpg.results import GenkeyResult
from nacl.encoding import Base64Encoder
-from nacl.public import (
- Box,
- PrivateKey,
- PublicKey,
- )
+from nacl.public import Box, PrivateKey, PublicKey
from nacl.utils import random
from testtools import TestCase
from testtools.matchers import (
@@ -40,19 +36,12 @@ from testtools.matchers import (
MatchesRegex,
MatchesStructure,
Not,
- )
+)
from lp_signing.auth import boxed_authentication
-from lp_signing.enums import (
- KeyType,
- OpenPGPKeyAlgorithm,
- SignatureMode,
- )
+from lp_signing.enums import KeyType, OpenPGPKeyAlgorithm, SignatureMode
from lp_signing.exceptions import InvalidNonce
-from lp_signing.model.key import (
- _gpg_context,
- Key,
- )
+from lp_signing.model.key import Key, _gpg_context
from lp_signing.model.nonce import Nonce
from lp_signing.tests import factory
from lp_signing.tests.matchers import (
@@ -60,7 +49,7 @@ from lp_signing.tests.matchers import (
IsJSONResponse,
MatchesOpenPGPKey,
OpenPGPSignatureVerifies,
- )
+)
from lp_signing.tests.testfixtures import (
AppFixture,
DatabaseFixture,
@@ -70,31 +59,32 @@ from lp_signing.tests.testfixtures import (
FakeOpenSSLSign,
FakeProcesses,
FakeSBSign,
- )
+)
class TestRootView(TestCase):
-
def setUp(self):
super().setUp()
self.fixture = self.useFixture(AppFixture())
def test_root(self):
resp = self.fixture.client.get("/")
- self.assertThat(resp, MatchesStructure.byEquality(
- status_code=200,
- content_type="text/plain",
- data=b"Launchpad signing service.\n"))
+ self.assertThat(
+ resp,
+ MatchesStructure.byEquality(
+ status_code=200,
+ content_type="text/plain",
+ data=b"Launchpad signing service.\n",
+ ),
+ )
class MatchesOID(MatchesStructure):
-
def __init__(self, dotted_string):
super().__init__(dotted_string=Equals(dotted_string))
class MatchesExtendedKeyUsageExtension(Matcher):
-
def __init__(self, usage_matcher=None):
self.usage_matcher = usage_matcher
@@ -107,26 +97,33 @@ class MatchesExtendedKeyUsageExtension(Matcher):
class TestServiceKeyView(TestCase):
-
def setUp(self):
super().setUp()
self.fixture = self.useFixture(AppFixture())
def test_service_key(self):
resp = self.fixture.client.get("/service-key")
- expected_preferred_key = (
- boxed_authentication.service_private_keys[0].public_key)
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "service-key": AfterPreprocessing(
- lambda data: PublicKey(
- data.encode("UTF-8"), encoder=Base64Encoder),
- Equals(expected_preferred_key)),
- })))
+ expected_preferred_key = boxed_authentication.service_private_keys[
+ 0
+ ].public_key
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "service-key": AfterPreprocessing(
+ lambda data: PublicKey(
+ data.encode("UTF-8"), encoder=Base64Encoder
+ ),
+ Equals(expected_preferred_key),
+ ),
+ }
+ )
+ ),
+ )
class TestNonceView(TestCase):
-
def setUp(self):
super().setUp()
self.fixture = self.useFixture(AppFixture())
@@ -134,18 +131,26 @@ class TestNonceView(TestCase):
def test_nonce(self):
resp = self.fixture.client.post("/nonce")
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "nonce": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- HasLength(Box.NONCE_SIZE)),
- }),
- expected_status=201))
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "nonce": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ HasLength(Box.NONCE_SIZE),
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
Nonce.check(base64.b64decode(resp.json["nonce"].encode("UTF-8")))
class TestGenerateView(TestCase):
-
def setUp(self):
super().setUp()
self.fixture = self.useFixture(AppFixture())
@@ -162,7 +167,8 @@ class TestGenerateView(TestCase):
kwargs.setdefault("nonce", self.nonce)
kwargs.setdefault("response_nonce", self.response_nonce)
return self.fixture.client.post(
- "/generate", json_data=json_data, **kwargs)
+ "/generate", json_data=json_data, **kwargs
+ )
def test_unauthenticated(self):
resp = self.post_generate(private_key=None)
@@ -170,7 +176,8 @@ class TestGenerateView(TestCase):
def test_undecodable_client_public_key(self):
resp = self.post_generate(
- headers={"X-Client-Public-Key": "nonsense key"})
+ headers={"X-Client-Public-Key": "nonsense key"}
+ )
self.assertThat(resp, HasAPIError("Cannot decode client public key"))
def test_undecodable_nonce(self):
@@ -181,13 +188,16 @@ class TestGenerateView(TestCase):
data = json.dumps({}).encode("UTF-8")
box = Box(
self.private_key,
- boxed_authentication.service_private_keys[0].public_key)
+ boxed_authentication.service_private_keys[0].public_key,
+ )
message = box.encrypt(data, self.nonce, encoder=Base64Encoder)
resp = self.post_generate(
headers={"X-Response-Nonce": "nonsense nonce"},
- encrypted_data=message.ciphertext)
+ encrypted_data=message.ciphertext,
+ )
self.assertThat(
- resp, HasAPIError("Cannot decode X-Response-Nonce header"))
+ resp, HasAPIError("Cannot decode X-Response-Nonce header")
+ )
def test_nonce_already_used(self):
Nonce.check(self.nonce)
@@ -227,7 +237,9 @@ class TestGenerateView(TestCase):
resp,
HasAPIError(
"{'description': ''} is not valid under any of the given "
- "schemas at /"))
+ "schemas at /"
+ ),
+ )
self.assertNonceConsumed()
def test_invalid_key_type(self):
@@ -236,7 +248,9 @@ class TestGenerateView(TestCase):
resp,
HasAPIError(
"{'key-type': 'nonsense', 'description': ''} is not valid "
- "under any of the given schemas at /"))
+ "under any of the given schemas at /"
+ ),
+ )
self.assertNonceConsumed()
def test_missing_description(self):
@@ -245,49 +259,63 @@ class TestGenerateView(TestCase):
resp,
HasAPIError(
"{'key-type': 'UEFI'} is not valid under any of the given "
- "schemas at /"))
+ "schemas at /"
+ ),
+ )
self.assertNonceConsumed()
def test_openpgp_missing_openpgp_key_algorithm(self):
- resp = self.post_generate({
- "key-type": "OPENPGP",
- "description": "",
- "length": 1024,
- })
+ resp = self.post_generate(
+ {
+ "key-type": "OPENPGP",
+ "description": "",
+ "length": 1024,
+ }
+ )
self.assertThat(
resp,
HasAPIError(
"{'key-type': 'OPENPGP', 'description': '', 'length': 1024} "
- "is not valid under any of the given schemas at /"))
+ "is not valid under any of the given schemas at /"
+ ),
+ )
self.assertNonceConsumed()
def test_openpgp_invalid_openpgp_key_algorithm(self):
- resp = self.post_generate({
- "key-type": "OPENPGP",
- "description": "",
- "openpgp-key-algorithm": "nonsense",
- "length": 1024,
- })
+ resp = self.post_generate(
+ {
+ "key-type": "OPENPGP",
+ "description": "",
+ "openpgp-key-algorithm": "nonsense",
+ "length": 1024,
+ }
+ )
self.assertThat(
resp,
HasAPIError(
"{'key-type': 'OPENPGP', 'description': '', "
"'openpgp-key-algorithm': 'nonsense', 'length': 1024} is not "
- "valid under any of the given schemas at /"))
+ "valid under any of the given schemas at /"
+ ),
+ )
self.assertNonceConsumed()
def test_openpgp_missing_length(self):
- resp = self.post_generate({
- "key-type": "OPENPGP",
- "description": "",
- "openpgp-key-algorithm": "RSA",
- })
+ resp = self.post_generate(
+ {
+ "key-type": "OPENPGP",
+ "description": "",
+ "openpgp-key-algorithm": "RSA",
+ }
+ )
self.assertThat(
resp,
HasAPIError(
"{'key-type': 'OPENPGP', 'description': '', "
"'openpgp-key-algorithm': 'RSA'} is not valid under any of "
- "the given schemas at /"))
+ "the given schemas at /"
+ ),
+ )
self.assertNonceConsumed()
def test_generate_uefi(self):
@@ -296,29 +324,48 @@ class TestGenerateView(TestCase):
{
"key-type": "UEFI",
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- "public-key": AfterPreprocessing(
- lambda data: load_pem_x509_certificate(
- base64.b64decode(data.encode("UTF-8")),
- default_backend()),
- MatchesStructure(
- subject=AfterPreprocessing(
- lambda subject: subject.rfc4514_string(),
- Equals("CN=PPA test-owner test-archive UEFI")))),
- }),
- expected_status=201))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: load_pem_x509_certificate(
+ base64.b64decode(data.encode("UTF-8")),
+ default_backend(),
+ ),
+ MatchesStructure(
+ subject=AfterPreprocessing(
+ lambda subject: subject.rfc4514_string(),
+ Equals(
+ "CN=PPA test-owner test-archive UEFI"
+ ),
+ )
+ ),
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.UEFI, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- public_key=base64.b64decode(
- resp.json["public-key"].encode("UTF-8")),
- authorizations=[self.client]))
+ KeyType.UEFI, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")
+ ),
+ authorizations=[self.client],
+ ),
+ )
def test_generate_uefi_req_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -327,10 +374,12 @@ class TestGenerateView(TestCase):
{
"key-type": "UEFI",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to generate key: "
- r"Command .*'openssl', 'req'.* returned non-zero exit status 1")
+ r"Command .*'openssl', 'req'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -344,10 +393,12 @@ class TestGenerateView(TestCase):
{
"key-type": "UEFI",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -357,36 +408,63 @@ class TestGenerateView(TestCase):
{
"key-type": "KMOD",
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- "public-key": AfterPreprocessing(
- lambda data: load_der_x509_certificate(
- base64.b64decode(data.encode("UTF-8")),
- default_backend()),
- MatchesStructure(
- subject=AfterPreprocessing(
- lambda subject: subject.rfc4514_string(),
- Equals("CN=PPA test-owner test-archive Kmod")),
- extensions=AnyMatch(
- MatchesExtendedKeyUsageExtension(
- MatchesListwise([
- # codeSigning
- MatchesOID("1.3.6.1.5.5.7.3.3"),
- MatchesOID("1.3.6.1.4.1.2312.16.1.2"),
- ]))))),
- }),
- expected_status=201))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: load_der_x509_certificate(
+ base64.b64decode(data.encode("UTF-8")),
+ default_backend(),
+ ),
+ MatchesStructure(
+ subject=AfterPreprocessing(
+ lambda subject: subject.rfc4514_string(),
+ Equals(
+ "CN=PPA test-owner test-archive Kmod"
+ ),
+ ),
+ extensions=AnyMatch(
+ MatchesExtendedKeyUsageExtension(
+ MatchesListwise(
+ [
+ # codeSigning
+ MatchesOID(
+ "1.3.6.1.5.5.7.3.3"
+ ),
+ MatchesOID(
+ "1.3.6.1.4.1.2312.16.1.2"
+ ),
+ ]
+ )
+ )
+ ),
+ ),
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.KMOD, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- public_key=base64.b64decode(
- resp.json["public-key"].encode("UTF-8")),
- authorizations=[self.client]))
+ KeyType.KMOD, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")
+ ),
+ authorizations=[self.client],
+ ),
+ )
def test_generate_kmod_req_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -395,10 +473,12 @@ class TestGenerateView(TestCase):
{
"key-type": "KMOD",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to generate key: "
- r"Command .*'openssl', 'req'.* returned non-zero exit status 1")
+ r"Command .*'openssl', 'req'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -412,10 +492,12 @@ class TestGenerateView(TestCase):
{
"key-type": "KMOD",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -425,31 +507,53 @@ class TestGenerateView(TestCase):
{
"key-type": "OPAL",
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- "public-key": AfterPreprocessing(
- lambda data: load_der_x509_certificate(
- base64.b64decode(data.encode("UTF-8")),
- default_backend()),
- MatchesStructure(
- subject=AfterPreprocessing(
- lambda subject: subject.rfc4514_string(),
- Equals("CN=PPA test-owner test-archive OPAL")),
- extensions=Not(AnyMatch(
- MatchesExtendedKeyUsageExtension())))),
- }),
- expected_status=201))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: load_der_x509_certificate(
+ base64.b64decode(data.encode("UTF-8")),
+ default_backend(),
+ ),
+ MatchesStructure(
+ subject=AfterPreprocessing(
+ lambda subject: subject.rfc4514_string(),
+ Equals(
+ "CN=PPA test-owner test-archive OPAL"
+ ),
+ ),
+ extensions=Not(
+ AnyMatch(
+ MatchesExtendedKeyUsageExtension()
+ )
+ ),
+ ),
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.OPAL, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- public_key=base64.b64decode(
- resp.json["public-key"].encode("UTF-8")),
- authorizations=[self.client]))
+ KeyType.OPAL, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")
+ ),
+ authorizations=[self.client],
+ ),
+ )
def test_generate_opal_req_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -458,10 +562,12 @@ class TestGenerateView(TestCase):
{
"key-type": "OPAL",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to generate key: "
- r"Command .*'openssl', 'req'.* returned non-zero exit status 1")
+ r"Command .*'openssl', 'req'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -475,10 +581,12 @@ class TestGenerateView(TestCase):
{
"key-type": "OPAL",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -488,31 +596,53 @@ class TestGenerateView(TestCase):
{
"key-type": "SIPL",
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- "public-key": AfterPreprocessing(
- lambda data: load_der_x509_certificate(
- base64.b64decode(data.encode("UTF-8")),
- default_backend()),
- MatchesStructure(
- subject=AfterPreprocessing(
- lambda subject: subject.rfc4514_string(),
- Equals("CN=PPA test-owner test-archive SIPL")),
- extensions=Not(AnyMatch(
- MatchesExtendedKeyUsageExtension())))),
- }),
- expected_status=201))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: load_der_x509_certificate(
+ base64.b64decode(data.encode("UTF-8")),
+ default_backend(),
+ ),
+ MatchesStructure(
+ subject=AfterPreprocessing(
+ lambda subject: subject.rfc4514_string(),
+ Equals(
+ "CN=PPA test-owner test-archive SIPL"
+ ),
+ ),
+ extensions=Not(
+ AnyMatch(
+ MatchesExtendedKeyUsageExtension()
+ )
+ ),
+ ),
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.SIPL, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- public_key=base64.b64decode(
- resp.json["public-key"].encode("UTF-8")),
- authorizations=[self.client]))
+ KeyType.SIPL, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")
+ ),
+ authorizations=[self.client],
+ ),
+ )
def test_generate_sipl_req_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -521,10 +651,12 @@ class TestGenerateView(TestCase):
{
"key-type": "SIPL",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to generate key: "
- r"Command .*'openssl', 'req'.* returned non-zero exit status 1")
+ r"Command .*'openssl', 'req'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -538,10 +670,12 @@ class TestGenerateView(TestCase):
{
"key-type": "SIPL",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -551,29 +685,48 @@ class TestGenerateView(TestCase):
{
"key-type": "FIT",
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- "public-key": AfterPreprocessing(
- lambda data: load_pem_x509_certificate(
- base64.b64decode(data.encode("UTF-8")),
- default_backend()),
- MatchesStructure(
- subject=AfterPreprocessing(
- lambda subject: subject.rfc4514_string(),
- Equals("CN=PPA test-owner test-archive FIT")))),
- }),
- expected_status=201))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: load_pem_x509_certificate(
+ base64.b64decode(data.encode("UTF-8")),
+ default_backend(),
+ ),
+ MatchesStructure(
+ subject=AfterPreprocessing(
+ lambda subject: subject.rfc4514_string(),
+ Equals(
+ "CN=PPA test-owner test-archive FIT"
+ ),
+ )
+ ),
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.FIT, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- public_key=base64.b64decode(
- resp.json["public-key"].encode("UTF-8")),
- authorizations=[self.client]))
+ KeyType.FIT, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")
+ ),
+ authorizations=[self.client],
+ ),
+ )
def test_generate_fit_req_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -582,10 +735,12 @@ class TestGenerateView(TestCase):
{
"key-type": "FIT",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to generate key: "
- r"Command .*'openssl', 'req'.* returned non-zero exit status 1")
+ r"Command .*'openssl', 'req'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -599,10 +754,12 @@ class TestGenerateView(TestCase):
{
"key-type": "FIT",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -614,93 +771,139 @@ class TestGenerateView(TestCase):
"description": "PPA test-owner test-archive",
"openpgp-key-algorithm": "RSA",
"length": 1024,
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- MatchesOpenPGPKey(
- MatchesStructure(
- fpr=Equals(resp.json["fingerprint"]),
- secret=Equals(0),
- subkeys=MatchesListwise([
- MatchesStructure.byEquality(
- pubkey_algo=1,
- length=1024),
- ]),
- uids=MatchesListwise([
- MatchesStructure.byEquality(
- uid="PPA test-owner test-archive"),
- ])))),
- }),
- expected_status=201))
+ }
+ )
+ key_matcher = MatchesOpenPGPKey(
+ MatchesStructure(
+ fpr=Equals(resp.json["fingerprint"]),
+ secret=Equals(0),
+ subkeys=MatchesListwise(
+ [
+ MatchesStructure.byEquality(
+ pubkey_algo=1, length=1024
+ ),
+ ]
+ ),
+ uids=MatchesListwise(
+ [
+ MatchesStructure.byEquality(
+ uid="PPA test-owner test-archive"
+ ),
+ ]
+ ),
+ )
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ key_matcher,
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.OPENPGP, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- public_key=base64.b64decode(
- resp.json["public-key"].encode("UTF-8")),
- authorizations=[self.client]))
+ KeyType.OPENPGP, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")
+ ),
+ authorizations=[self.client],
+ ),
+ )
def test_generate_openpgp_create_error(self):
result = GenkeyResult(None)
result.primary = False
result.sub = False
- self.useFixture(MockPatch(
- "gpg.Context.create_key", return_value=result))
+ self.useFixture(
+ MockPatch("gpg.Context.create_key", return_value=result)
+ )
resp = self.post_generate(
{
"key-type": "OPENPGP",
"description": "PPA test-owner test-archive",
"openpgp-key-algorithm": "RSA",
"length": 1024,
- })
+ }
+ )
error_re = r"Failed to generate key: Secret key generation failed."
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
def test_generate_cv2_kernel(self):
# Integration test: generate and return a real CV2 kernel key.
- resp = self.post_generate({
- "key-type": "CV2_KERNEL",
- "description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- "public-key": AfterPreprocessing(
- lambda data: load_pem_public_key(
- base64.b64decode(data.encode("UTF-8")),
- default_backend()),
- MatchesAll(
- IsInstance(RSAPublicKey),
- MatchesStructure(key_size=Equals(2048)))),
- }),
- expected_status=201))
+ resp = self.post_generate(
+ {
+ "key-type": "CV2_KERNEL",
+ "description": "PPA test-owner test-archive",
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: load_pem_public_key(
+ base64.b64decode(data.encode("UTF-8")),
+ default_backend(),
+ ),
+ MatchesAll(
+ IsInstance(RSAPublicKey),
+ MatchesStructure(key_size=Equals(2048)),
+ ),
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.CV2_KERNEL, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- public_key=base64.b64decode(
- resp.json["public-key"].encode("UTF-8")),
- authorizations=[self.client]))
+ KeyType.CV2_KERNEL, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")
+ ),
+ authorizations=[self.client],
+ ),
+ )
def test_generate_cv2_kernel_genpkey_error(self):
processes_fixture = self.useFixture(FakeProcesses())
processes_fixture.add(lambda _: {"returncode": 1}, name="openssl")
- resp = self.post_generate({
- "key-type": "CV2_KERNEL",
- "description": "PPA test-owner test-archive",
- })
+ resp = self.post_generate(
+ {
+ "key-type": "CV2_KERNEL",
+ "description": "PPA test-owner test-archive",
+ }
+ )
error_re = (
r"Failed to generate key: "
r"Command .*'openssl', 'genpkey'.* returned non-zero exit status "
- r"1")
+ r"1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -710,13 +913,16 @@ class TestGenerateView(TestCase):
public_key = factory.generate_random_bytes(size=64)
fake_openssl = FakeOpenSSL(private_key, public_key, None)
processes_fixture.add(fake_openssl)
- resp = self.post_generate({
- "key-type": "CV2_KERNEL",
- "description": "PPA test-owner test-archive",
- })
+ resp = self.post_generate(
+ {
+ "key-type": "CV2_KERNEL",
+ "description": "PPA test-owner test-archive",
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-outform', 'DER'.* returned non-zero exit status 1")
+ r"Command .*'-outform', 'DER'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -726,32 +932,51 @@ class TestGenerateView(TestCase):
{
"key-type": "ANDROID_KERNEL",
"description": "PPA test-owner test-archive",
- })
-
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- "public-key": AfterPreprocessing(
- lambda data: load_pem_x509_certificate(
- base64.b64decode(data.encode("UTF-8")),
- default_backend()),
- MatchesStructure(
- subject=AfterPreprocessing(
- lambda subject: subject.rfc4514_string(),
- Equals("CN=PPA test-owner test-archive "
- "Android Kernel")))),
- }),
- expected_status=201))
+ }
+ )
+
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: load_pem_x509_certificate(
+ base64.b64decode(data.encode("UTF-8")),
+ default_backend(),
+ ),
+ MatchesStructure(
+ subject=AfterPreprocessing(
+ lambda subject: subject.rfc4514_string(),
+ Equals(
+ "CN=PPA test-owner test-archive "
+ "Android Kernel"
+ ),
+ )
+ ),
+ ),
+ }
+ ),
+ expected_status=201,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.ANDROID_KERNEL, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- public_key=base64.b64decode(
- resp.json["public-key"].encode("UTF-8")),
- authorizations=[self.client]))
+ KeyType.ANDROID_KERNEL, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")
+ ),
+ authorizations=[self.client],
+ ),
+ )
def test_generate_android_kernel_req_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -760,10 +985,12 @@ class TestGenerateView(TestCase):
{
"key-type": "ANDROID_KERNEL",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to generate key: "
- r"Command .*'openssl', 'req'.* returned non-zero exit status 1")
+ r"Command .*'openssl', 'req'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -777,10 +1004,12 @@ class TestGenerateView(TestCase):
{
"key-type": "ANDROID_KERNEL",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to generate key: "
- r"Command .*'openssl', 'rsa'.* returned non-zero exit status 1")
+ r"Command .*'openssl', 'rsa'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -790,22 +1019,24 @@ class TestGenerateView(TestCase):
private_key_pkcs1 = factory.generate_random_bytes(size=64)
public_key = factory.generate_random_bytes(size=64)
fake_openssl = FakeOpenSSL(
- private_key, public_key, None, private_key_pkcs1=private_key_pkcs1)
+ private_key, public_key, None, private_key_pkcs1=private_key_pkcs1
+ )
processes_fixture.add(fake_openssl)
resp = self.post_generate(
{
"key-type": "ANDROID_KERNEL",
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
class TestSignView(TestCase):
-
def setUp(self):
super().setUp()
self.fixture = self.useFixture(AppFixture())
@@ -839,13 +1070,16 @@ class TestSignView(TestCase):
data = json.dumps({}).encode("UTF-8")
box = Box(
self.private_key,
- boxed_authentication.service_private_keys[0].public_key)
+ boxed_authentication.service_private_keys[0].public_key,
+ )
message = box.encrypt(data, self.nonce, encoder=Base64Encoder)
resp = self.post_sign(
headers={"X-Response-Nonce": "nonsense nonce"},
- encrypted_data=message.ciphertext)
+ encrypted_data=message.ciphertext,
+ )
self.assertThat(
- resp, HasAPIError("Cannot decode X-Response-Nonce header"))
+ resp, HasAPIError("Cannot decode X-Response-Nonce header")
+ )
def test_nonce_already_used(self):
Nonce.check(self.nonce)
@@ -886,9 +1120,11 @@ class TestSignView(TestCase):
"message-name": "",
"message": "",
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
- resp, HasAPIError("'key-type' is a required property at /"))
+ resp, HasAPIError("'key-type' is a required property at /")
+ )
self.assertNonceConsumed()
def test_invalid_key_type(self):
@@ -899,12 +1135,15 @@ class TestSignView(TestCase):
"message-name": "",
"message": "",
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
resp,
HasAPIError(
"'nonsense' is not one of ['UEFI', 'KMOD', 'OPAL', 'SIPL', "
- "'FIT', 'OPENPGP', 'CV2_KERNEL', 'ANDROID_KERNEL']"))
+ "'FIT', 'OPENPGP', 'CV2_KERNEL', 'ANDROID_KERNEL']"
+ ),
+ )
self.assertNonceConsumed()
def test_missing_fingerprint(self):
@@ -914,9 +1153,11 @@ class TestSignView(TestCase):
"message-name": "",
"message": "",
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
- resp, HasAPIError("'fingerprint' is a required property at /"))
+ resp, HasAPIError("'fingerprint' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_message_name(self):
@@ -926,9 +1167,11 @@ class TestSignView(TestCase):
"fingerprint": "",
"message": "",
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
- resp, HasAPIError("'message-name' is a required property at /"))
+ resp, HasAPIError("'message-name' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_message(self):
@@ -938,9 +1181,11 @@ class TestSignView(TestCase):
"fingerprint": "",
"message-name": "",
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
- resp, HasAPIError("'message' is a required property at /"))
+ resp, HasAPIError("'message' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_mode(self):
@@ -949,10 +1194,12 @@ class TestSignView(TestCase):
"key-type": "UEFI",
"fingerprint": "",
"message-name": "",
- "message": ""
- })
+ "message": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'mode' is a required property at /"))
+ resp, HasAPIError("'mode' is a required property at /")
+ )
self.assertNonceConsumed()
def test_invalid_mode(self):
@@ -963,11 +1210,14 @@ class TestSignView(TestCase):
"message-name": "",
"message": "",
"mode": "nonsense",
- })
+ }
+ )
self.assertThat(
resp,
HasAPIError(
- "'nonsense' is not one of ['ATTACHED', 'DETACHED', 'CLEAR']"))
+ "'nonsense' is not one of ['ATTACHED', 'DETACHED', 'CLEAR']"
+ ),
+ )
self.assertNonceConsumed()
def test_unknown_key(self):
@@ -978,10 +1228,12 @@ class TestSignView(TestCase):
"message-name": "",
"message": "",
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
resp,
- HasAPIError("No UEFI key with fingerprint 0000000000000000", 404))
+ HasAPIError("No UEFI key with fingerprint 0000000000000000", 404),
+ )
self.assertNonceConsumed()
def test_bad_message_encoding(self):
@@ -992,7 +1244,8 @@ class TestSignView(TestCase):
"message-name": "",
"message": "some nonsense",
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(resp, HasAPIError("Cannot decode message"))
self.assertNonceConsumed()
@@ -1006,10 +1259,12 @@ class TestSignView(TestCase):
"message-name": "t.efi",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
resp,
- HasAPIError(f"{self.client} is not allowed to use {key}", 403))
+ HasAPIError(f"{self.client} is not allowed to use {key}", 403),
+ )
self.assertNonceConsumed()
def test_sign_uefi_attached(self):
@@ -1026,16 +1281,29 @@ class TestSignView(TestCase):
"message-name": "t.efi",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(b"test signed data")),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(b"test signed data"),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_uefi_attached_sbsign_error(self):
@@ -1051,10 +1319,12 @@ class TestSignView(TestCase):
"message-name": "t.efi",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
+ }
+ )
error_re = (
r"Failed to sign message: "
- r"Command .*'sbsign'.* returned non-zero exit status 1")
+ r"Command .*'sbsign'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1069,10 +1339,12 @@ class TestSignView(TestCase):
"message-name": "t.efi",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
+ }
+ )
self.assertThat(
resp,
- HasAPIError("Signature mode DETACHED not supported with UEFI"))
+ HasAPIError("Signature mode DETACHED not supported with UEFI"),
+ )
def test_sign_uefi_clear_unsupported(self):
key = factory.create_key(key_type=KeyType.UEFI)
@@ -1085,10 +1357,11 @@ class TestSignView(TestCase):
"message-name": "t.efi",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "CLEAR",
- })
+ }
+ )
self.assertThat(
- resp,
- HasAPIError("Signature mode CLEAR not supported with UEFI"))
+ resp, HasAPIError("Signature mode CLEAR not supported with UEFI")
+ )
def test_sign_kmod_attached_unsupported(self):
key = factory.create_key(key_type=KeyType.KMOD)
@@ -1101,10 +1374,12 @@ class TestSignView(TestCase):
"message-name": "t.ko",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
resp,
- HasAPIError("Signature mode ATTACHED not supported with KMOD"))
+ HasAPIError("Signature mode ATTACHED not supported with KMOD"),
+ )
def test_sign_kmod_detached(self):
key = factory.create_key(key_type=KeyType.KMOD)
@@ -1120,16 +1395,29 @@ class TestSignView(TestCase):
"message-name": "t.ko",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(b"test signed data")),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(b"test signed data"),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_kmod_clear_unsupported(self):
@@ -1143,10 +1431,11 @@ class TestSignView(TestCase):
"message-name": "t.ko",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "CLEAR",
- })
+ }
+ )
self.assertThat(
- resp,
- HasAPIError("Signature mode CLEAR not supported with KMOD"))
+ resp, HasAPIError("Signature mode CLEAR not supported with KMOD")
+ )
def test_sign_kmod_kmodsign_error(self):
key = factory.create_key(key_type=KeyType.KMOD)
@@ -1161,10 +1450,12 @@ class TestSignView(TestCase):
"message-name": "t.ko",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
+ }
+ )
error_re = (
r"Failed to sign message: "
- r"Command .*'kmodsign'.* returned non-zero exit status 1")
+ r"Command .*'kmodsign'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1179,10 +1470,12 @@ class TestSignView(TestCase):
"message-name": "t.opal",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
resp,
- HasAPIError("Signature mode ATTACHED not supported with OPAL"))
+ HasAPIError("Signature mode ATTACHED not supported with OPAL"),
+ )
def test_sign_opal_detached(self):
key = factory.create_key(key_type=KeyType.OPAL)
@@ -1198,16 +1491,29 @@ class TestSignView(TestCase):
"message-name": "t.opal",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(b"test signed data")),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(b"test signed data"),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_opal_detached_kmodsign_error(self):
@@ -1223,10 +1529,12 @@ class TestSignView(TestCase):
"message-name": "t.opal",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
+ }
+ )
error_re = (
r"Failed to sign message: "
- r"Command .*'kmodsign'.* returned non-zero exit status 1")
+ r"Command .*'kmodsign'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1241,10 +1549,11 @@ class TestSignView(TestCase):
"message-name": "t.opal",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "CLEAR",
- })
+ }
+ )
self.assertThat(
- resp,
- HasAPIError("Signature mode CLEAR not supported with OPAL"))
+ resp, HasAPIError("Signature mode CLEAR not supported with OPAL")
+ )
def test_sign_sipl_attached_unsupported(self):
key = factory.create_key(key_type=KeyType.SIPL)
@@ -1257,10 +1566,12 @@ class TestSignView(TestCase):
"message-name": "t.sipl",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
resp,
- HasAPIError("Signature mode ATTACHED not supported with SIPL"))
+ HasAPIError("Signature mode ATTACHED not supported with SIPL"),
+ )
def test_sign_sipl_detached(self):
key = factory.create_key(key_type=KeyType.SIPL)
@@ -1276,16 +1587,29 @@ class TestSignView(TestCase):
"message-name": "t.sipl",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(b"test signed data")),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(b"test signed data"),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_sipl_detached_kmodsign_error(self):
@@ -1301,10 +1625,12 @@ class TestSignView(TestCase):
"message-name": "t.sipl",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
+ }
+ )
error_re = (
r"Failed to sign message: "
- r"Command .*'kmodsign'.* returned non-zero exit status 1")
+ r"Command .*'kmodsign'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1319,10 +1645,11 @@ class TestSignView(TestCase):
"message-name": "t.sipl",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "CLEAR",
- })
+ }
+ )
self.assertThat(
- resp,
- HasAPIError("Signature mode CLEAR not supported with SIPL"))
+ resp, HasAPIError("Signature mode CLEAR not supported with SIPL")
+ )
def test_sign_fit_attached(self):
key = factory.create_key(key_type=KeyType.FIT)
@@ -1338,16 +1665,29 @@ class TestSignView(TestCase):
"message-name": "t.fit",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(b"test signed data")),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(b"test signed data"),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_fit_attached_sbsign_error(self):
@@ -1363,10 +1703,12 @@ class TestSignView(TestCase):
"message-name": "t.fit",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
+ }
+ )
error_re = (
r"Failed to sign message: "
- r"Command .*'mkimage'.* returned non-zero exit status 1")
+ r"Command .*'mkimage'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1381,10 +1723,11 @@ class TestSignView(TestCase):
"message-name": "t.fit",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
+ }
+ )
self.assertThat(
- resp,
- HasAPIError("Signature mode DETACHED not supported with FIT"))
+ resp, HasAPIError("Signature mode DETACHED not supported with FIT")
+ )
def test_sign_fit_clear_unsupported(self):
key = factory.create_key(key_type=KeyType.FIT)
@@ -1397,15 +1740,19 @@ class TestSignView(TestCase):
"message-name": "t.fit",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "CLEAR",
- })
+ }
+ )
self.assertThat(
- resp,
- HasAPIError("Signature mode CLEAR not supported with FIT"))
+ resp, HasAPIError("Signature mode CLEAR not supported with FIT")
+ )
def test_sign_openpgp_attached(self):
key = Key.generate(
- KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
- openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ KeyType.OPENPGP,
+ "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA,
+ length=1024,
+ )
key.addAuthorization(self.client)
store.commit()
resp = self.post_sign(
@@ -1415,23 +1762,42 @@ class TestSignView(TestCase):
"message-name": "t.gpg",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- OpenPGPSignatureVerifies(
- key.public_key, b"test data", SignatureMode.ATTACHED)),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ OpenPGPSignatureVerifies(
+ key.public_key,
+ b"test data",
+ SignatureMode.ATTACHED,
+ ),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_openpgp_detached(self):
key = Key.generate(
- KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
- openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ KeyType.OPENPGP,
+ "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA,
+ length=1024,
+ )
key.addAuthorization(self.client)
store.commit()
resp = self.post_sign(
@@ -1441,23 +1807,42 @@ class TestSignView(TestCase):
"message-name": "t.gpg",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- OpenPGPSignatureVerifies(
- key.public_key, b"test data", SignatureMode.DETACHED)),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ OpenPGPSignatureVerifies(
+ key.public_key,
+ b"test data",
+ SignatureMode.DETACHED,
+ ),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_openpgp_clear(self):
key = Key.generate(
- KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
- openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ KeyType.OPENPGP,
+ "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA,
+ length=1024,
+ )
key.addAuthorization(self.client)
store.commit()
resp = self.post_sign(
@@ -1467,28 +1852,50 @@ class TestSignView(TestCase):
"message-name": "t.gpg",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "CLEAR",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- OpenPGPSignatureVerifies(
- key.public_key, b"test data\n", SignatureMode.CLEAR)),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ OpenPGPSignatureVerifies(
+ key.public_key,
+ b"test data\n",
+ SignatureMode.CLEAR,
+ ),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_openpgp_sign_error(self):
key = Key.generate(
- KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
- openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ KeyType.OPENPGP,
+ "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA,
+ length=1024,
+ )
key.addAuthorization(self.client)
store.commit()
- self.useFixture(MockPatch(
- "gpg.Context.sign",
- side_effect=gpg.errors.UnsupportedAlgorithm("Boom")))
+ self.useFixture(
+ MockPatch(
+ "gpg.Context.sign",
+ side_effect=gpg.errors.UnsupportedAlgorithm("Boom"),
+ )
+ )
resp = self.post_sign(
{
"key-type": "OPENPGP",
@@ -1496,7 +1903,8 @@ class TestSignView(TestCase):
"message-name": "t.gpg",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
+ }
+ )
error_re = r"Failed to sign message: Boom"
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1512,11 +1920,14 @@ class TestSignView(TestCase):
"message-name": "t.cv2_kernel",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "ATTACHED",
- })
+ }
+ )
self.assertThat(
resp,
HasAPIError(
- "Signature mode ATTACHED not supported with CV2_KERNEL"))
+ "Signature mode ATTACHED not supported with CV2_KERNEL"
+ ),
+ )
def test_sign_cv2_kernel_detached(self):
key = factory.create_key(key_type=KeyType.CV2_KERNEL)
@@ -1532,16 +1943,29 @@ class TestSignView(TestCase):
"message-name": "t.cv2_kernel",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "public-key": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(key.public_key)),
- "signed-message": AfterPreprocessing(
- lambda data: base64.b64decode(data.encode("UTF-8")),
- Equals(b"test signed data")),
- })))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(key.public_key),
+ ),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(
+ data.encode("UTF-8")
+ ),
+ Equals(b"test signed data"),
+ ),
+ }
+ )
+ ),
+ )
self.assertNonceConsumed()
def test_sign_cv2_kernel_detached_openssl_sign_error(self):
@@ -1557,10 +1981,12 @@ class TestSignView(TestCase):
"message-name": "t.cv2_kernel",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "DETACHED",
- })
+ }
+ )
error_re = (
r"Failed to sign message: "
- r"Command .*'openssl', 'dgst'.* returned non-zero exit status 1")
+ r"Command .*'openssl', 'dgst'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1575,14 +2001,15 @@ class TestSignView(TestCase):
"message-name": "t.cv2_kernel",
"message": base64.b64encode(b"test data").decode("UTF-8"),
"mode": "CLEAR",
- })
+ }
+ )
self.assertThat(
resp,
- HasAPIError("Signature mode CLEAR not supported with CV2_KERNEL"))
+ HasAPIError("Signature mode CLEAR not supported with CV2_KERNEL"),
+ )
class TestInjectView(TestCase):
-
def setUp(self):
super().setUp()
self.fixture = self.useFixture(AppFixture())
@@ -1601,7 +2028,8 @@ class TestInjectView(TestCase):
kwargs.setdefault("nonce", self.nonces[index])
kwargs.setdefault("response_nonce", self.response_nonce)
return self.fixture.client.post(
- "/inject", json_data=json_data, **kwargs)
+ "/inject", json_data=json_data, **kwargs
+ )
def test_unauthenticated(self):
resp = self.post_inject(private_key=None)
@@ -1609,7 +2037,8 @@ class TestInjectView(TestCase):
def test_undecodable_client_public_key(self):
resp = self.post_inject(
- headers={"X-Client-Public-Key": "nonsense key"})
+ headers={"X-Client-Public-Key": "nonsense key"}
+ )
self.assertThat(resp, HasAPIError("Cannot decode client public key"))
def test_undecodable_nonce(self):
@@ -1620,13 +2049,16 @@ class TestInjectView(TestCase):
data = json.dumps({}).encode("UTF-8")
box = Box(
self.private_keys[0],
- boxed_authentication.service_private_keys[0].public_key)
+ boxed_authentication.service_private_keys[0].public_key,
+ )
message = box.encrypt(data, self.nonces[0], encoder=Base64Encoder)
resp = self.post_inject(
headers={"X-Response-Nonce": "nonsense nonce"},
- encrypted_data=message.ciphertext)
+ encrypted_data=message.ciphertext,
+ )
self.assertThat(
- resp, HasAPIError("Cannot decode X-Response-Nonce header"))
+ resp, HasAPIError("Cannot decode X-Response-Nonce header")
+ )
def test_nonce_already_used(self):
Nonce.check(self.nonces[0])
@@ -1661,63 +2093,92 @@ class TestInjectView(TestCase):
self.assertNonceConsumed()
def test_missing_key_type(self):
- resp = self.post_inject({"private-key": "",
- "public-key": "",
- "created-at": "",
- "description": ""})
+ resp = self.post_inject(
+ {
+ "private-key": "",
+ "public-key": "",
+ "created-at": "",
+ "description": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'key-type' is a required property at /"))
+ resp, HasAPIError("'key-type' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_private_key(self):
- resp = self.post_inject({"key-type": "UEFI",
- "public-key": "",
- "created-at": "",
- "description": ""})
+ resp = self.post_inject(
+ {
+ "key-type": "UEFI",
+ "public-key": "",
+ "created-at": "",
+ "description": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'private-key' is a required property at /"))
+ resp, HasAPIError("'private-key' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_public_key(self):
- resp = self.post_inject({"key-type": "UEFI",
- "private-key": "",
- "created-at": "",
- "description": ""})
+ resp = self.post_inject(
+ {
+ "key-type": "UEFI",
+ "private-key": "",
+ "created-at": "",
+ "description": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'public-key' is a required property at /"))
+ resp, HasAPIError("'public-key' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_created_at(self):
- resp = self.post_inject({"key-type": "UEFI",
- "public-key": "",
- "private-key": "",
- "description": ""})
+ resp = self.post_inject(
+ {
+ "key-type": "UEFI",
+ "public-key": "",
+ "private-key": "",
+ "description": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'created-at' is a required property at /"))
+ resp, HasAPIError("'created-at' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_description(self):
- resp = self.post_inject({"key-type": "UEFI",
- "public-key": "",
- "private-key": "",
- "created-at": ""})
+ resp = self.post_inject(
+ {
+ "key-type": "UEFI",
+ "public-key": "",
+ "private-key": "",
+ "created-at": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'description' is a required property at /"))
+ resp, HasAPIError("'description' is a required property at /")
+ )
self.assertNonceConsumed()
def test_invalid_created_at_format(self):
private_key = factory.generate_random_bytes(size=64)
public_key = factory.generate_random_bytes(size=64)
- resp = self.post_inject({
- "key-type": "UEFI",
- "private-key": base64.b64encode(private_key).decode("UTF-8"),
- "public-key": base64.b64encode(public_key).decode("UTF-8"),
- "description": "",
- "created-at": "xxx"})
+ resp = self.post_inject(
+ {
+ "key-type": "UEFI",
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
+ "description": "",
+ "created-at": "xxx",
+ }
+ )
self.assertThat(
resp,
- HasAPIError("created-at 'xxx' is not a valid datetime format"))
+ HasAPIError("created-at 'xxx' is not a valid datetime format"),
+ )
self.assertNonceConsumed()
def test_invalid_key_type(self):
@@ -1726,7 +2187,9 @@ class TestInjectView(TestCase):
resp,
HasAPIError(
"'nonsense' is not one of ['UEFI', 'KMOD', 'OPAL', 'SIPL', "
- "'FIT', 'OPENPGP', 'CV2_KERNEL', 'ANDROID_KERNEL']"))
+ "'FIT', 'OPENPGP', 'CV2_KERNEL', 'ANDROID_KERNEL']"
+ ),
+ )
self.assertNonceConsumed()
def test_inject_uefi(self):
@@ -1734,36 +2197,47 @@ class TestInjectView(TestCase):
# fingerprint
common_name = Key._generateKeyCommonName(
- "UEFI test description",
- 'UEFI')
+ "UEFI test description", "UEFI"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generateKeyCertPair(
- Path(tmp), KeyType.UEFI, common_name)
+ Path(tmp), KeyType.UEFI, common_name
+ )
now_with_tz = datetime.now().replace(tzinfo=pytz.utc)
resp = self.post_inject(
{
"key-type": "UEFI",
- "private-key": base64.b64encode(
- private_key).decode("UTF-8"),
- "public-key": base64.b64encode(
- public_key).decode("UTF-8"),
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
"created-at": now_with_tz.isoformat(),
"description": "UEFI test description",
- })
+ }
+ )
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.UEFI, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.UEFI, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
def test_inject_uefi_fingerprint_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -1774,16 +2248,16 @@ class TestInjectView(TestCase):
resp = self.post_inject(
{
"key-type": "UEFI",
- "private-key": base64.b64encode(
- private_key).decode("UTF-8"),
- "public-key": base64.b64encode(
- public_key).decode("UTF-8"),
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
"created-at": (datetime.utcnow()).isoformat(),
"description": "UEFI test description",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1791,34 +2265,45 @@ class TestInjectView(TestCase):
# Integration test: generate and return a real KMOD key.
common_name = Key._generateKeyCommonName(
- "PPA test-owner test-archive",
- 'KMOD')
+ "PPA test-owner test-archive", "KMOD"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generatePEMX509(
- Path(tmp), KeyType.KMOD, common_name)
+ Path(tmp), KeyType.KMOD, common_name
+ )
resp = self.post_inject(
{
"key-type": "KMOD",
- "private-key": base64.b64encode(
- private_key).decode("UTF-8"),
- "public-key": base64.b64encode(
- public_key).decode("UTF-8"),
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
"created-at": (datetime.utcnow()).isoformat(),
"description": "KMOD test description",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.KMOD, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.KMOD, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
def test_inject_kmod_fingerprint_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -1829,16 +2314,16 @@ class TestInjectView(TestCase):
resp = self.post_inject(
{
"key-type": "KMOD",
- "private-key": base64.b64encode(
- private_key).decode("UTF-8"),
- "public-key": base64.b64encode(
- public_key).decode("UTF-8"),
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1846,33 +2331,45 @@ class TestInjectView(TestCase):
# Integration test: generate and return a real OPAL key.
common_name = Key._generateKeyCommonName(
- "PPA test-owner test-archive", 'OPAL')
+ "PPA test-owner test-archive", "OPAL"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generatePEMX509(
- Path(tmp), KeyType.OPAL, common_name)
+ Path(tmp), KeyType.OPAL, common_name
+ )
resp = self.post_inject(
{
"key-type": "OPAL",
- "private-key": base64.b64encode(
- private_key).decode("UTF-8"),
- "public-key": base64.b64encode(
- public_key).decode("UTF-8"),
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.OPAL, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.OPAL, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
def test_inject_opal_fingerprint_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -1887,10 +2384,12 @@ class TestInjectView(TestCase):
"public-key": "",
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1898,33 +2397,45 @@ class TestInjectView(TestCase):
# Integration test: generate and return a real SIPL key.
common_name = Key._generateKeyCommonName(
- "PPA test-owner test-archive", 'SIPL')
+ "PPA test-owner test-archive", "SIPL"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generatePEMX509(
- Path(tmp), KeyType.SIPL, common_name)
+ Path(tmp), KeyType.SIPL, common_name
+ )
resp = self.post_inject(
{
"key-type": "SIPL",
- "private-key": base64.b64encode(
- private_key).decode("UTF-8"),
- "public-key": base64.b64encode(
- public_key).decode("UTF-8"),
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.SIPL, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.SIPL, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
def test_inject_sipl_fingerprint_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -1939,10 +2450,12 @@ class TestInjectView(TestCase):
"public-key": "",
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -1950,33 +2463,45 @@ class TestInjectView(TestCase):
# Integration test: generate and return a real FIT key.
common_name = Key._generateKeyCommonName(
- "PPA test-owner test-archive", 'FIT')
+ "PPA test-owner test-archive", "FIT"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generateKeyCertPair(
- Path(tmp), KeyType.FIT, common_name)
+ Path(tmp), KeyType.FIT, common_name
+ )
resp = self.post_inject(
{
"key-type": "FIT",
- "private-key": base64.b64encode(
- private_key).decode("UTF-8"),
- "public-key": base64.b64encode(
- public_key).decode("UTF-8"),
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.FIT, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.FIT, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
def test_inject_fit_fingerprint_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -1991,10 +2516,12 @@ class TestInjectView(TestCase):
"public-key": "",
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-fingerprint'.* returned non-zero exit status 1")
+ r"Command .*'-fingerprint'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -2003,38 +2530,54 @@ class TestInjectView(TestCase):
# fingerprint.
with TemporaryDirectory() as tmp, _gpg_context(Path(tmp)) as ctx:
private_key, public_key, fingerprint = Key._generateOpenPGP(
- ctx, OpenPGPKeyAlgorithm.RSA, 1024, "OpenPGP test description")
+ ctx, OpenPGPKeyAlgorithm.RSA, 1024, "OpenPGP test description"
+ )
now_with_tz = datetime.now().replace(tzinfo=pytz.utc)
- resp = self.post_inject({
- "key-type": "OPENPGP",
- "private-key": base64.b64encode(private_key).decode("UTF-8"),
- "public-key": base64.b64encode(public_key).decode("UTF-8"),
- "created-at": now_with_tz.isoformat(),
- "description": "OpenPGP test description",
- })
-
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ resp = self.post_inject(
+ {
+ "key-type": "OPENPGP",
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
+ "created-at": now_with_tz.isoformat(),
+ "description": "OpenPGP test description",
+ }
+ )
+
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.OPENPGP, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.OPENPGP, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
def test_inject_openpgp_fingerprint_error(self):
- resp = self.post_inject({
- "key-type": "OPENPGP",
- "private-key": base64.b64encode(b"").decode("UTF-8"),
- "public-key": base64.b64encode(b"").decode("UTF-8"),
- "created-at": datetime.utcnow().isoformat(),
- "description": "OpenPGP test description",
- })
+ resp = self.post_inject(
+ {
+ "key-type": "OPENPGP",
+ "private-key": base64.b64encode(b"").decode("UTF-8"),
+ "public-key": base64.b64encode(b"").decode("UTF-8"),
+ "created-at": datetime.utcnow().isoformat(),
+ "description": "OpenPGP test description",
+ }
+ )
error_re = r"Failed to get fingerprint of new key: IMPORT_PROBLEM"
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
@@ -2044,25 +2587,37 @@ class TestInjectView(TestCase):
# fingerprint.
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generateRSA(
- Path(tmp), KeyType.CV2_KERNEL)
-
- resp = self.post_inject({
- "key-type": "CV2_KERNEL",
- "private-key": base64.b64encode(private_key).decode("UTF-8"),
- "public-key": base64.b64encode(public_key).decode("UTF-8"),
- "created-at": datetime.utcnow().isoformat(),
- "description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({"fingerprint": HasLength(40)}),
- expected_status=200))
+ Path(tmp), KeyType.CV2_KERNEL
+ )
+
+ resp = self.post_inject(
+ {
+ "key-type": "CV2_KERNEL",
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
+ "created-at": datetime.utcnow().isoformat(),
+ "description": "PPA test-owner test-archive",
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict({"fingerprint": HasLength(40)}),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
# The new key was committed to the database.
key = Key.getByTypeAndFingerprint(
- KeyType.CV2_KERNEL, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.CV2_KERNEL, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
def test_inject_cv2_kernel_fingerprint_error(self):
processes_fixture = self.useFixture(FakeProcesses())
@@ -2070,218 +2625,328 @@ class TestInjectView(TestCase):
public_key = factory.generate_random_bytes(size=64)
fake_openssl = FakeOpenSSL(private_key, public_key, None)
processes_fixture.add(fake_openssl)
- resp = self.post_inject({
- "key-type": "CV2_KERNEL",
- "private-key": "",
- "public-key": "",
- "created-at": datetime.utcnow().isoformat(),
- "description": "PPA test-owner test-archive",
- })
+ resp = self.post_inject(
+ {
+ "key-type": "CV2_KERNEL",
+ "private-key": "",
+ "public-key": "",
+ "created-at": datetime.utcnow().isoformat(),
+ "description": "PPA test-owner test-archive",
+ }
+ )
error_re = (
r"Failed to get fingerprint of new key: "
- r"Command .*'-outform', 'DER'.* returned non-zero exit status 1")
+ r"Command .*'-outform', 'DER'.* returned non-zero exit status 1"
+ )
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
def test_inject_duplicate_key_different_clients(self):
common_name = Key._generateKeyCommonName(
- "PPA test-owner test-archive", 'FIT')
+ "PPA test-owner test-archive", "FIT"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generateKeyCertPair(
- Path(tmp), KeyType.FIT, common_name)
+ Path(tmp), KeyType.FIT, common_name
+ )
resp = self.post_inject(
{
"key-type": "FIT",
- "private-key": base64.b64encode(
- bytes(private_key)).decode("UTF-8"),
- "public-key": base64.b64encode(
- bytes(public_key)).decode("UTF-8"),
+ "private-key": base64.b64encode(bytes(private_key)).decode(
+ "UTF-8"
+ ),
+ "public-key": base64.b64encode(bytes(public_key)).decode(
+ "UTF-8"
+ ),
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
key = Key.getByTypeAndFingerprint(
- KeyType.FIT, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.FIT, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
resp = self.post_inject(
{
"key-type": "FIT",
- "private-key": base64.b64encode(
- bytes(private_key)).decode("UTF-8"),
- "public-key": base64.b64encode(
- bytes(public_key)).decode("UTF-8"),
+ "private-key": base64.b64encode(bytes(private_key)).decode(
+ "UTF-8"
+ ),
+ "public-key": base64.b64encode(bytes(public_key)).decode(
+ "UTF-8"
+ ),
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- }, index=1)
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ },
+ index=1,
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
key = Key.getByTypeAndFingerprint(
- KeyType.FIT, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0], self.clients[1]]))
+ KeyType.FIT, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0], self.clients[1]],
+ ),
+ )
def test_inject_duplicate_key_from_same_client(self):
common_name = Key._generateKeyCommonName(
- "PPA test-owner test-archive", 'FIT')
+ "PPA test-owner test-archive", "FIT"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generateKeyCertPair(
- Path(tmp), KeyType.FIT, common_name)
+ Path(tmp), KeyType.FIT, common_name
+ )
private_key = bytes(private_key)
public_key = bytes(public_key)
client = self.clients[0]
- resp = self.post_inject({
- "key-type": "FIT",
- "private-key": base64.b64encode(private_key).decode("UTF-8"),
- "public-key": base64.b64encode(public_key).decode("UTF-8"),
- "created-at": (datetime.utcnow()).isoformat(),
- "description": "PPA test-owner test-archive"})
- self.assertThat(resp, IsJSONResponse(MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ resp = self.post_inject(
+ {
+ "key-type": "FIT",
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
+ "created-at": (datetime.utcnow()).isoformat(),
+ "description": "PPA test-owner test-archive",
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
key = Key.getByTypeAndFingerprint(
- KeyType.FIT, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[client]))
+ KeyType.FIT, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"], authorizations=[client]
+ ),
+ )
# Regenerate nonces (consumed by the previous request).
self.nonces = [Nonce.generate().nonce for _ in range(2)]
store.commit()
- resp = self.post_inject({
- "key-type": "FIT",
- "private-key": base64.b64encode(private_key).decode("UTF-8"),
- "public-key": base64.b64encode(public_key).decode("UTF-8"),
- "created-at": (datetime.utcnow()).isoformat(),
- "description": "PPA test-owner test-archive"})
- self.assertThat(resp, IsJSONResponse(MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ resp = self.post_inject(
+ {
+ "key-type": "FIT",
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
+ "created-at": (datetime.utcnow()).isoformat(),
+ "description": "PPA test-owner test-archive",
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
key = Key.getByTypeAndFingerprint(
- KeyType.FIT, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[client]))
+ KeyType.FIT, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"], authorizations=[client]
+ ),
+ )
def test_inject_update_private_public_material(self):
common_name = Key._generateKeyCommonName(
- "PPA test-owner test-archive", 'FIT')
+ "PPA test-owner test-archive", "FIT"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generateKeyCertPair(
- Path(tmp), KeyType.FIT, common_name)
+ Path(tmp), KeyType.FIT, common_name
+ )
resp = self.post_inject(
{
"key-type": "FIT",
- "private-key": base64.b64encode(
- bytes(private_key)).decode("UTF-8"),
- "public-key": base64.b64encode(
- bytes(public_key)).decode("UTF-8"),
+ "private-key": base64.b64encode(bytes(private_key)).decode(
+ "UTF-8"
+ ),
+ "public-key": base64.b64encode(bytes(public_key)).decode(
+ "UTF-8"
+ ),
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
key = Key.getByTypeAndFingerprint(
- KeyType.FIT, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.FIT, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
private_key_material = factory.generate_random_bytes(size=64)
resp = self.post_inject(
{
"key-type": "FIT",
"private-key": base64.b64encode(
- bytes(private_key_material)).decode("UTF-8"),
- "public-key": base64.b64encode(
- bytes(public_key)).decode("UTF-8"),
+ bytes(private_key_material)
+ ).decode("UTF-8"),
+ "public-key": base64.b64encode(bytes(public_key)).decode(
+ "UTF-8"
+ ),
"created-at": (datetime.utcnow()).isoformat(),
"description": "PPA test-owner test-archive",
- }, index=1)
+ },
+ index=1,
+ )
self.assertNonceConsumed()
- self.assertThat(resp, HasAPIError("Rejected attempt to update already "
- "existent private and public key "
- "material with fingerprint %s"
- % key.fingerprint,
- expected_status=409))
+ self.assertThat(
+ resp,
+ HasAPIError(
+ "Rejected attempt to update already "
+ "existent private and public key "
+ "material with fingerprint %s" % key.fingerprint,
+ expected_status=409,
+ ),
+ )
def test_inject_update_created_at(self):
common_name = Key._generateKeyCommonName(
- "PPA test-owner test-archive", 'FIT')
+ "PPA test-owner test-archive", "FIT"
+ )
with TemporaryDirectory() as tmp:
private_key, public_key = Key._generateKeyCertPair(
- Path(tmp), KeyType.FIT, common_name)
+ Path(tmp), KeyType.FIT, common_name
+ )
existent_date = datetime.utcnow()
resp = self.post_inject(
{
"key-type": "FIT",
- "private-key": base64.b64encode(
- bytes(private_key)).decode("UTF-8"),
- "public-key": base64.b64encode(
- bytes(public_key)).decode("UTF-8"),
+ "private-key": base64.b64encode(bytes(private_key)).decode(
+ "UTF-8"
+ ),
+ "public-key": base64.b64encode(bytes(public_key)).decode(
+ "UTF-8"
+ ),
"created-at": existent_date.isoformat(),
"description": "PPA test-owner test-archive",
- })
- self.assertThat(resp, IsJSONResponse(
- MatchesDict({
- "fingerprint": HasLength(40),
- }),
- expected_status=200))
+ }
+ )
+ self.assertThat(
+ resp,
+ IsJSONResponse(
+ MatchesDict(
+ {
+ "fingerprint": HasLength(40),
+ }
+ ),
+ expected_status=200,
+ ),
+ )
self.assertNonceConsumed()
key = Key.getByTypeAndFingerprint(
- KeyType.FIT, resp.json["fingerprint"])
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0]]))
+ KeyType.FIT, resp.json["fingerprint"]
+ )
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]],
+ ),
+ )
older_date = datetime.utcnow() - timedelta(days=3)
resp = self.post_inject(
{
"key-type": "FIT",
- "private-key": base64.b64encode(
- bytes(private_key)).decode("UTF-8"),
- "public-key": base64.b64encode(
- bytes(public_key)).decode("UTF-8"),
+ "private-key": base64.b64encode(bytes(private_key)).decode(
+ "UTF-8"
+ ),
+ "public-key": base64.b64encode(bytes(public_key)).decode(
+ "UTF-8"
+ ),
"created-at": older_date.isoformat(),
"description": "PPA test-owner test-archive",
- }, index=1)
+ },
+ index=1,
+ )
self.assertNonceConsumed()
- self.assertThat(key, MatchesStructure.byEquality(
- fingerprint=resp.json["fingerprint"],
- authorizations=[self.clients[0],
- self.clients[1]]))
+ self.assertThat(
+ key,
+ MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0], self.clients[1]],
+ ),
+ )
self.assertEqual(pytz.utc.localize(older_date), key.created_at)
class TestAddAuthorizationView(TestCase):
-
def setUp(self):
super().setUp()
self.fixture = self.useFixture(AppFixture())
@@ -2299,7 +2964,8 @@ class TestAddAuthorizationView(TestCase):
kwargs.setdefault("nonce", self.nonces[index])
kwargs.setdefault("response_nonce", self.response_nonce)
return self.fixture.client.post(
- "/authorizations/add", json_data=json_data, **kwargs)
+ "/authorizations/add", json_data=json_data, **kwargs
+ )
def test_unauthenticated(self):
resp = self.post_add_authorization(private_key=None)
@@ -2307,25 +2973,30 @@ class TestAddAuthorizationView(TestCase):
def test_undecodable_client_public_key(self):
resp = self.post_add_authorization(
- headers={"X-Client-Public-Key": "nonsense key"})
+ headers={"X-Client-Public-Key": "nonsense key"}
+ )
self.assertThat(resp, HasAPIError("Cannot decode client public key"))
def test_undecodable_nonce(self):
resp = self.post_add_authorization(
- headers={"X-Nonce": "nonsense nonce"})
+ headers={"X-Nonce": "nonsense nonce"}
+ )
self.assertThat(resp, HasAPIError("Cannot decode X-Nonce header"))
def test_undecodable_response_nonce(self):
data = json.dumps({}).encode("UTF-8")
box = Box(
self.private_keys[0],
- boxed_authentication.service_private_keys[0].public_key)
+ boxed_authentication.service_private_keys[0].public_key,
+ )
message = box.encrypt(data, self.nonces[0], encoder=Base64Encoder)
resp = self.post_add_authorization(
headers={"X-Response-Nonce": "nonsense nonce"},
- encrypted_data=message.ciphertext)
+ encrypted_data=message.ciphertext,
+ )
self.assertThat(
- resp, HasAPIError("Cannot decode X-Response-Nonce header"))
+ resp, HasAPIError("Cannot decode X-Response-Nonce header")
+ )
def test_nonce_already_used(self):
Nonce.check(self.nonces[0])
@@ -2360,80 +3031,101 @@ class TestAddAuthorizationView(TestCase):
self.assertNonceConsumed()
def test_missing_key_type(self):
- resp = self.post_add_authorization({
- "fingerprint": "",
- "client-name": "",
- })
+ resp = self.post_add_authorization(
+ {
+ "fingerprint": "",
+ "client-name": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'key-type' is a required property at /"))
+ resp, HasAPIError("'key-type' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_fingerprint(self):
- resp = self.post_add_authorization({
- "key-type": "",
- "client-name": "",
- })
+ resp = self.post_add_authorization(
+ {
+ "key-type": "",
+ "client-name": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'fingerprint' is a required property at /"))
+ resp, HasAPIError("'fingerprint' is a required property at /")
+ )
self.assertNonceConsumed()
def test_missing_client_name(self):
- resp = self.post_add_authorization({
- "key-type": "",
- "fingerprint": "",
- })
+ resp = self.post_add_authorization(
+ {
+ "key-type": "",
+ "fingerprint": "",
+ }
+ )
self.assertThat(
- resp, HasAPIError("'client-name' is a required property at /"))
+ resp, HasAPIError("'client-name' is a required property at /")
+ )
self.assertNonceConsumed()
def test_invalid_key_type(self):
- resp = self.post_add_authorization({
- "key-type": "nonsense",
- "fingerprint": "",
- "client-name": "",
- })
+ resp = self.post_add_authorization(
+ {
+ "key-type": "nonsense",
+ "fingerprint": "",
+ "client-name": "",
+ }
+ )
self.assertThat(
resp,
HasAPIError(
"'nonsense' is not one of ['UEFI', 'KMOD', 'OPAL', 'SIPL', "
- "'FIT', 'OPENPGP', 'CV2_KERNEL', 'ANDROID_KERNEL']"))
+ "'FIT', 'OPENPGP', 'CV2_KERNEL', 'ANDROID_KERNEL']"
+ ),
+ )
self.assertNonceConsumed()
def test_unknown_key(self):
- resp = self.post_add_authorization({
- "key-type": "UEFI",
- "fingerprint": "0" * 16,
- "client-name": "",
- })
+ resp = self.post_add_authorization(
+ {
+ "key-type": "UEFI",
+ "fingerprint": "0" * 16,
+ "client-name": "",
+ }
+ )
self.assertThat(
resp,
- HasAPIError("No UEFI key with fingerprint 0000000000000000", 404))
+ HasAPIError("No UEFI key with fingerprint 0000000000000000", 404),
+ )
self.assertNonceConsumed()
def test_unknown_client(self):
key = factory.create_key(key_type=KeyType.UEFI)
store.commit()
- resp = self.post_add_authorization({
- "key-type": "UEFI",
- "fingerprint": key.fingerprint,
- "client-name": "nonsense",
- })
+ resp = self.post_add_authorization(
+ {
+ "key-type": "UEFI",
+ "fingerprint": key.fingerprint,
+ "client-name": "nonsense",
+ }
+ )
self.assertThat(
- resp, HasAPIError("No registered client named nonsense"))
+ resp, HasAPIError("No registered client named nonsense")
+ )
self.assertNonceConsumed()
def test_client_not_allowed(self):
key = factory.create_key(key_type=KeyType.UEFI)
store.commit()
- resp = self.post_add_authorization({
- "key-type": "UEFI",
- "fingerprint": key.fingerprint,
- "client-name": self.clients[1].name,
- })
+ resp = self.post_add_authorization(
+ {
+ "key-type": "UEFI",
+ "fingerprint": key.fingerprint,
+ "client-name": self.clients[1].name,
+ }
+ )
self.assertThat(
resp,
- HasAPIError(
- f"{self.clients[0]} is not allowed to use {key}", 403))
+ HasAPIError(f"{self.clients[0]} is not allowed to use {key}", 403),
+ )
self.assertNonceConsumed()
def test_add_authorization(self):
@@ -2441,11 +3133,13 @@ class TestAddAuthorizationView(TestCase):
key.addAuthorization(self.clients[0])
self.assertFalse(key.isAuthorized(self.clients[1]))
store.commit()
- resp = self.post_add_authorization({
- "key-type": "UEFI",
- "fingerprint": key.fingerprint,
- "client-name": self.clients[1].name,
- })
+ resp = self.post_add_authorization(
+ {
+ "key-type": "UEFI",
+ "fingerprint": key.fingerprint,
+ "client-name": self.clients[1].name,
+ }
+ )
self.assertThat(resp, IsJSONResponse(MatchesDict({})))
self.assertTrue(key.isAuthorized(self.clients[1]))
self.assertNonceConsumed()
diff --git a/lp_signing/tests/test_webapp.py b/lp_signing/tests/test_webapp.py
index 9857263..6cfdba8 100644
--- a/lp_signing/tests/test_webapp.py
+++ b/lp_signing/tests/test_webapp.py
@@ -3,34 +3,25 @@
import json
from pathlib import Path
-from urllib.parse import (
- quote,
- urlparse,
- )
+from urllib.parse import quote, urlparse
from flask import make_response
from flask_storm import store
from testtools import TestCase
from testtools.matchers import MatchesStructure
-from lp_signing.tests.testfixtures import (
- AppFixture,
- DatabaseFixture,
- )
-from lp_signing.webapp import (
- app,
- health_check,
- )
+from lp_signing.tests.testfixtures import AppFixture, DatabaseFixture
+from lp_signing.webapp import app, health_check
class TestHealthCheck(TestCase):
-
def test_success(self):
self.useFixture(DatabaseFixture())
resp = make_response(health_check())
- self.assertThat(resp, MatchesStructure.byEquality(
- status_code=200,
- json={"ok": True}))
+ self.assertThat(
+ resp,
+ MatchesStructure.byEquality(status_code=200, json={"ok": True}),
+ )
def test_store_closed(self):
app_context = app.app_context()
@@ -38,13 +29,13 @@ class TestHealthCheck(TestCase):
self.addCleanup(app_context.pop)
store.close()
resp = make_response(health_check())
- self.assertThat(resp, MatchesStructure.byEquality(
- status_code=500,
- json={"ok": False}))
+ self.assertThat(
+ resp,
+ MatchesStructure.byEquality(status_code=500, json={"ok": False}),
+ )
class TestConfiguration(TestCase):
-
def test_database_uri(self):
fixture = self.useFixture(AppFixture())
root = Path(__file__).parent.parent.parent.resolve()
@@ -53,14 +44,18 @@ class TestConfiguration(TestCase):
MatchesStructure.byEquality(
scheme="postgres",
hostname=quote(str(root / "dev-db" / "localhost"), safe=""),
- path="/lp-signing"))
+ path="/lp-signing",
+ ),
+ )
def test_binds_no_standbys(self):
fixture = self.useFixture(AppFixture())
self.assertEqual({}, fixture.app.config["STORM_BINDS"])
def test_binds_with_standbys(self):
- fixture = self.useFixture(AppFixture(
- {"database": {"standby_urls": json.dumps(["sqlite:"])}}))
+ fixture = self.useFixture(
+ AppFixture({"database": {"standby_urls": json.dumps(["sqlite:"])}})
+ )
self.assertEqual(
- {"standby0": "sqlite:"}, fixture.app.config["STORM_BINDS"])
+ {"standby0": "sqlite:"}, fixture.app.config["STORM_BINDS"]
+ )
diff --git a/lp_signing/tests/testfixtures.py b/lp_signing/tests/testfixtures.py
index 766772a..112e3ea 100644
--- a/lp_signing/tests/testfixtures.py
+++ b/lp_signing/tests/testfixtures.py
@@ -6,14 +6,11 @@
import hashlib
import io
import json
-from pathlib import Path
import re
import subprocess
+from pathlib import Path
-from fixtures import (
- Fixture,
- MockPatch,
- )
+from fixtures import Fixture, MockPatch
from fixtures._fixtures import popen
from flask.testing import FlaskClient
from flask_storm import store
@@ -21,24 +18,14 @@ from nacl.encoding import Base64Encoder
from nacl.public import Box
from systemfixtures import FakeProcesses as _FakeProcesses
-from lp_signing import (
- config,
- webapp,
- )
+from lp_signing import config, webapp
from lp_signing.auth import boxed_authentication
-from lp_signing.model.client import (
- Client,
- ClientPublicKey,
- )
-from lp_signing.model.key import (
- Key,
- KeyAuthorization,
- )
+from lp_signing.model.client import Client, ClientPublicKey
+from lp_signing.model.key import Key, KeyAuthorization
from lp_signing.model.nonce import Nonce
class ConfigOverrideFixture(Fixture):
-
def __init__(self, new_config):
super().__init__()
self._new_config = new_config
@@ -48,7 +35,6 @@ class ConfigOverrideFixture(Fixture):
class DatabaseFixture(Fixture):
-
def _setUp(self):
self._app_context = webapp.app.app_context()
self._app_context.push()
@@ -66,7 +52,7 @@ class DatabaseFixture(Fixture):
ClientPublicKey,
Client,
Nonce,
- ]
+ ]
for model in models_to_remove:
store.find(model).remove()
store.commit()
@@ -75,50 +61,75 @@ class DatabaseFixture(Fixture):
class TestClient(FlaskClient):
"""A Flask client that understands JSON and boxed authentication."""
- def post(self, *args, headers=None, data=None, json_data=None,
- encrypted_data=None, private_key=None, nonce=None,
- response_nonce=None, **kwargs):
+ def post(
+ self,
+ *args,
+ headers=None,
+ data=None,
+ json_data=None,
+ encrypted_data=None,
+ private_key=None,
+ nonce=None,
+ response_nonce=None,
+ **kwargs,
+ ):
headers = dict(headers) if headers else {}
if json_data is not None:
if data is not None:
raise AssertionError(
- "Cannot pass both data and json_data") # pragma: no cover
+ "Cannot pass both data and json_data"
+ ) # pragma: no cover
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
data = json.dumps(json_data).encode("UTF-8")
if private_key is not None:
headers.setdefault(
"X-Client-Public-Key",
- private_key.public_key.encode(
- encoder=Base64Encoder).decode("UTF-8"))
+ private_key.public_key.encode(encoder=Base64Encoder).decode(
+ "UTF-8"
+ ),
+ )
if nonce is not None:
headers.setdefault(
- "X-Nonce", Base64Encoder.encode(nonce).decode("UTF-8"))
+ "X-Nonce", Base64Encoder.encode(nonce).decode("UTF-8")
+ )
if response_nonce is not None:
headers.setdefault(
"X-Response-Nonce",
- Base64Encoder.encode(response_nonce).decode("UTF-8"))
- if (private_key is not None and nonce is not None and
- encrypted_data is None):
+ Base64Encoder.encode(response_nonce).decode("UTF-8"),
+ )
+ if (
+ private_key is not None
+ and nonce is not None
+ and encrypted_data is None
+ ):
data = data or b""
box = Box(
private_key,
- boxed_authentication.service_private_keys[0].public_key)
+ boxed_authentication.service_private_keys[0].public_key,
+ )
message = box.encrypt(data, nonce, encoder=Base64Encoder)
headers["Content-Type"] = "application/x-boxed-json"
encrypted_data = message.ciphertext
else:
encrypted_data = data
resp = super().post(
- *args, headers=headers, data=encrypted_data, **kwargs)
- if (private_key is not None and response_nonce is not None and
- resp.mimetype == "application/x-boxed-json"):
+ *args, headers=headers, data=encrypted_data, **kwargs
+ )
+ if (
+ private_key is not None
+ and response_nonce is not None
+ and resp.mimetype == "application/x-boxed-json"
+ ):
box = Box(
private_key,
- boxed_authentication.service_private_keys[0].public_key)
+ boxed_authentication.service_private_keys[0].public_key,
+ )
resp.set_data(
box.decrypt(
- resp.get_data(), response_nonce, encoder=Base64Encoder))
+ resp.get_data(), response_nonce, encoder=Base64Encoder
+ )
+ )
resp.mimetype = "application/json"
# If there's a current transaction, roll it back. The POST may have
# committed a transaction in another thread, so anything we try to
@@ -128,7 +139,6 @@ class TestClient(FlaskClient):
class AppFixture(Fixture):
-
def __init__(self, nondefault_config={}):
"""Enable the engine to be recreated in tests where required."""
super().__init__()
@@ -153,7 +163,6 @@ class AppFixture(Fixture):
# the time we get to here, popen.FakeProcess has already been set to
# systemfixtures.processes.fixture._FakeProcessWithMissingAPIs.
class FakeProcessWithWorkingInput(popen.FakeProcess):
-
def communicate(self, input=None, timeout=None):
if self.stdin and input:
self.stdin.write(input)
@@ -161,20 +170,22 @@ class FakeProcessWithWorkingInput(popen.FakeProcess):
class FakeProcesses(_FakeProcesses):
-
def _setUp(self):
super()._setUp()
- self.useFixture(MockPatch(
- "fixtures._fixtures.popen.FakeProcess",
- FakeProcessWithWorkingInput))
+ self.useFixture(
+ MockPatch(
+ "fixtures._fixtures.popen.FakeProcess",
+ FakeProcessWithWorkingInput,
+ )
+ )
class FakeOpenSSL:
-
name = "openssl"
- def __init__(self, private_key, public_key, fingerprint,
- private_key_pkcs1=None):
+ def __init__(
+ self, private_key, public_key, fingerprint, private_key_pkcs1=None
+ ):
self.private_key = private_key
self.public_key = public_key
self.fingerprint = fingerprint
@@ -194,8 +205,10 @@ class FakeOpenSSL:
key_path = args[args.index("-keyout") + 1]
Path(key_path).write_bytes(self.private_key)
if "-out" in args:
- if ("-outform" not in args or
- args[args.index("-outform") + 1] != "PEM"):
+ if (
+ "-outform" not in args
+ or args[args.index("-outform") + 1] != "PEM"
+ ):
cert_path = args[args.index("-out") + 1]
Path(cert_path).write_bytes(self.public_key)
elif args[1] == "rsa":
@@ -212,7 +225,8 @@ class FakeOpenSSL:
if "-fingerprint" in args:
if self.fingerprint is not None:
fingerprint_colons = ":".join(
- re.findall("..", self.fingerprint))
+ re.findall("..", self.fingerprint)
+ )
output = f"SHA1 Fingerprint={fingerprint_colons}\n"
info["stdout"] = io.BytesIO(output.encode("UTF-8"))
else:
@@ -225,20 +239,24 @@ class FakeOpenSSL:
if "-out" in args:
public_key_path = args[args.index("-out") + 1]
Path(public_key_path).write_bytes(self.public_key)
- if ("-outform" in args and
- args[args.index("-outform") + 1] == "DER"):
+ if (
+ "-outform" in args
+ and args[args.index("-outform") + 1] == "DER"
+ ):
if self.fingerprint is not None:
# For plain RSA keys, Key._getRSAFingerprint computes
# the fingerprint in Python based on the DER encoding of
# the public key. Check that this matches
# self.fingerprint to avoid confusion.
- public_key_fingerprint = hashlib.sha1(
- self.public_key).hexdigest().upper()
+ public_key_fingerprint = (
+ hashlib.sha1(self.public_key).hexdigest().upper()
+ )
if public_key_fingerprint != self.fingerprint:
raise AssertionError(
f"Fingerprint of self.public_key does not match "
f"self.fingerprint ({public_key_fingerprint} != "
- f"{self.fingerprint}")
+ f"{self.fingerprint}"
+ )
info["stdout"] = io.BytesIO(self.public_key)
else:
info["returncode"] = 1
@@ -246,7 +264,6 @@ class FakeOpenSSL:
class FakeSBSign:
-
name = "sbsign"
def __init__(self, sig_data):
@@ -262,7 +279,6 @@ class FakeSBSign:
class FakeKmodSign:
-
name = "kmodsign"
def __init__(self, sig_data):
@@ -278,7 +294,6 @@ class FakeKmodSign:
class FakeMkimage:
-
name = "mkimage"
def __init__(self, sig_data):
@@ -294,7 +309,6 @@ class FakeMkimage:
class FakeOpenSSLSign:
-
name = "openssl"
def __init__(self, sig_data):
diff --git a/lp_signing/webapi.py b/lp_signing/webapi.py
index ff35566..e45232e 100644
--- a/lp_signing/webapi.py
+++ b/lp_signing/webapi.py
@@ -12,31 +12,19 @@ import base64
import binascii
import functools
-from acceptable import (
- AcceptableService,
- validate_body,
- validate_output,
- )
+import iso8601
+from acceptable import AcceptableService, validate_body, validate_output
from flask import request
from flask_storm import store
-import iso8601
from nacl.encoding import Base64Encoder
from lp_signing.auth import boxed_authentication
-from lp_signing.enums import (
- KeyType,
- OpenPGPKeyAlgorithm,
- SignatureMode,
- )
-from lp_signing.exceptions import (
- ClientNotAllowed,
- DataValidationError,
- )
+from lp_signing.enums import KeyType, OpenPGPKeyAlgorithm, SignatureMode
+from lp_signing.exceptions import ClientNotAllowed, DataValidationError
from lp_signing.model.client import Client
from lp_signing.model.key import Key
from lp_signing.model.nonce import Nonce
-
service = AcceptableService(__name__)
@@ -52,31 +40,36 @@ service_key_api = service.api("/service-key", "service_key", methods=["GET"])
@service_key_api.view(introduced_at="1.0")
-@validate_output({
- "type": "object",
- "properties": {"service-key": {"type": "string"}}, # base64
- "required": ["service-key"],
- "additionalProperties": False,
- })
+@validate_output(
+ {
+ "type": "object",
+ "properties": {"service-key": {"type": "string"}}, # base64
+ "required": ["service-key"],
+ "additionalProperties": False,
+ }
+)
def service_key():
"""Return the current preferred public key for this service."""
preferred_key = boxed_authentication.service_private_keys[0].public_key
return {
"service-key": preferred_key.encode(encoder=Base64Encoder).decode(
- "UTF-8"),
- }, 200
+ "UTF-8"
+ ),
+ }, 200
nonce_api = service.api("/nonce", "nonce", methods=["POST"])
@nonce_api.view(introduced_at="1.0")
-@validate_output({
- "type": "object",
- "properties": {"nonce": {"type": "string"}}, # base64
- "required": ["nonce"],
- "additionalProperties": False,
- })
+@validate_output(
+ {
+ "type": "object",
+ "properties": {"nonce": {"type": "string"}}, # base64
+ "required": ["nonce"],
+ "additionalProperties": False,
+ }
+)
def nonce():
"""Generate a nonce, used to make requests to other endpoints."""
nonce = Nonce.generate()
@@ -98,73 +91,85 @@ generate_api = service.api("/generate", "generate_key", methods=["POST"])
@generate_api.view(introduced_at="1.0")
@encrypted_response
-@validate_body({
- "type": "object",
- # XXX cjwatson 2020-07-02: This produces suboptimal error messages; once
- # acceptable supports JSON Schema draft 7, we can do a better job here
- # using conditional schemas.
- "oneOf": [
- {
- "properties": {
- "key-type": {
- "type": "string",
- "enum": [
- item.token for item in KeyType
- if item.value != KeyType.OPENPGP
+@validate_body(
+ {
+ "type": "object",
+ # XXX cjwatson 2020-07-02: This produces suboptimal error messages;
+ # once acceptable supports JSON Schema draft 7, we can do a better
+ # job here using conditional schemas.
+ "oneOf": [
+ {
+ "properties": {
+ "key-type": {
+ "type": "string",
+ "enum": [
+ item.token
+ for item in KeyType
+ if item.value != KeyType.OPENPGP
],
},
- "description": {"type": "string"},
+ "description": {"type": "string"},
},
- "required": ["key-type", "description"],
- "additionalProperties": False,
+ "required": ["key-type", "description"],
+ "additionalProperties": False,
},
- {
- "properties": {
- "key-type": {
- "type": "string",
- "enum": [KeyType.OPENPGP.name],
+ {
+ "properties": {
+ "key-type": {
+ "type": "string",
+ "enum": [KeyType.OPENPGP.name],
},
- "description": {"type": "string"},
- "openpgp-key-algorithm": {
- "type": "string",
- "enum": [item.token for item in OpenPGPKeyAlgorithm],
+ "description": {"type": "string"},
+ "openpgp-key-algorithm": {
+ "type": "string",
+ "enum": [item.token for item in OpenPGPKeyAlgorithm],
},
- "length": {"type": "integer"},
+ "length": {"type": "integer"},
},
- "required": [
- "key-type", "description", "openpgp-key-algorithm", "length",
+ "required": [
+ "key-type",
+ "description",
+ "openpgp-key-algorithm",
+ "length",
],
- "additionalProperties": False,
+ "additionalProperties": False,
},
],
- })
-@validate_output({
- "type": "object",
- "properties": {
- "fingerprint": {"type": "string"},
- "public-key": {"type": "string"}, # base64
+ }
+)
+@validate_output(
+ {
+ "type": "object",
+ "properties": {
+ "fingerprint": {"type": "string"},
+ "public-key": {"type": "string"}, # base64
},
- "required": ["fingerprint", "public-key"],
- })
+ "required": ["fingerprint", "public-key"],
+ }
+)
def generate_key():
payload = request.get_json()
key_type = KeyType.items[payload["key-type"]]
if "openpgp-key-algorithm" in payload:
openpgp_key_algorithm = OpenPGPKeyAlgorithm.items[
- payload["openpgp-key-algorithm"]]
+ payload["openpgp-key-algorithm"]
+ ]
else:
openpgp_key_algorithm = None
length = payload.get("length")
description = payload["description"]
key = Key.generate(
- key_type, description, openpgp_key_algorithm=openpgp_key_algorithm,
- length=length)
+ key_type,
+ description,
+ openpgp_key_algorithm=openpgp_key_algorithm,
+ length=length,
+ )
key.addAuthorization(request.client)
store.commit()
return {
"fingerprint": key.fingerprint,
"public-key": base64.b64encode(key.public_key).decode("UTF-8"),
- }, 201
+ }, 201
sign_api = service.api("/sign", "sign_message", methods=["POST"])
@@ -172,31 +177,41 @@ sign_api = service.api("/sign", "sign_message", methods=["POST"])
@sign_api.view(introduced_at="1.0")
@encrypted_response
-@validate_body({
- "type": "object",
- "properties": {
- "key-type": {
- "type": "string",
- "enum": [item.token for item in KeyType],
+@validate_body(
+ {
+ "type": "object",
+ "properties": {
+ "key-type": {
+ "type": "string",
+ "enum": [item.token for item in KeyType],
},
- "fingerprint": {"type": "string"},
- "message-name": {"type": "string"},
- "message": {"type": "string"}, # base64-encoded
- "mode": {
- "type": "string",
- "enum": [item.token for item in SignatureMode],
+ "fingerprint": {"type": "string"},
+ "message-name": {"type": "string"},
+ "message": {"type": "string"}, # base64-encoded
+ "mode": {
+ "type": "string",
+ "enum": [item.token for item in SignatureMode],
},
},
- "required": ["key-type", "fingerprint", "message-name", "message", "mode"],
- })
-@validate_output({
- "type": "object",
- "properties": {
- "public-key": {"type": "string"}, # base64
- "signed-message": {"type": "string"}, # base64
+ "required": [
+ "key-type",
+ "fingerprint",
+ "message-name",
+ "message",
+ "mode",
+ ],
+ }
+)
+@validate_output(
+ {
+ "type": "object",
+ "properties": {
+ "public-key": {"type": "string"}, # base64
+ "signed-message": {"type": "string"}, # base64
},
- "required": ["signed_message"],
- })
+ "required": ["signed_message"],
+ }
+)
def sign_message():
payload = request.get_json()
key_type = KeyType.items[payload["key-type"]]
@@ -204,19 +219,21 @@ def sign_message():
message_name = payload["message-name"]
try:
message = base64.b64decode(
- payload["message"].encode("UTF-8"), validate=True)
+ payload["message"].encode("UTF-8"), validate=True
+ )
except binascii.Error:
raise DataValidationError.single("Cannot decode message")
mode = SignatureMode.items[payload["mode"]]
key = Key.getByTypeAndFingerprint(key_type, fingerprint)
if not key.isAuthorized(request.client):
raise ClientNotAllowed.single(
- f"{request.client} is not allowed to use {key}")
+ f"{request.client} is not allowed to use {key}"
+ )
signed_message = key.sign(message_name, message, mode)
return {
"public-key": base64.b64encode(key.public_key).decode("UTF-8"),
"signed-message": base64.b64encode(signed_message).decode("UTF-8"),
- }, 200
+ }, 200
inject_api = service.api("/inject", "inject_key", methods=["POST"])
@@ -224,28 +241,37 @@ inject_api = service.api("/inject", "inject_key", methods=["POST"])
@inject_api.view(introduced_at="1.0")
@encrypted_response
-@validate_body({
- "type": "object",
- "properties": {
- "key-type": {
- "type": "string",
- "enum": [item.token for item in KeyType],
+@validate_body(
+ {
+ "type": "object",
+ "properties": {
+ "key-type": {
+ "type": "string",
+ "enum": [item.token for item in KeyType],
},
- "private-key": {"type": "string"},
- "public-key": {"type": "string"},
- "created-at": {"type": "string"},
- "description": {"type": "string"},
+ "private-key": {"type": "string"},
+ "public-key": {"type": "string"},
+ "created-at": {"type": "string"},
+ "description": {"type": "string"},
},
- "required": ["key-type", "private-key", "public-key",
- "created-at", "description"],
- })
-@validate_output({
- "type": "object",
- "properties": {
- "fingerprint": {"type": "string"},
+ "required": [
+ "key-type",
+ "private-key",
+ "public-key",
+ "created-at",
+ "description",
+ ],
+ }
+)
+@validate_output(
+ {
+ "type": "object",
+ "properties": {
+ "fingerprint": {"type": "string"},
},
- "required": ["fingerprint"],
- })
+ "required": ["fingerprint"],
+ }
+)
def inject_key():
payload = request.get_json()
key_type = KeyType.items[payload["key-type"]]
@@ -254,9 +280,11 @@ def inject_key():
try:
private_key = base64.b64decode(
- payload["private-key"].encode("UTF-8"), validate=True)
+ payload["private-key"].encode("UTF-8"), validate=True
+ )
public_key = base64.b64decode(
- payload["public-key"].encode("UTF-8"), validate=True)
+ payload["public-key"].encode("UTF-8"), validate=True
+ )
except binascii.Error:
raise DataValidationError.single("Cannot decode message")
@@ -264,37 +292,42 @@ def inject_key():
created_at = iso8601.parse_date(created_at)
except iso8601.ParseError:
raise DataValidationError.single(
- "created-at '%s' is not a valid datetime format" % created_at)
+ "created-at '%s' is not a valid datetime format" % created_at
+ )
- key = Key.inject(key_type, private_key, public_key,
- description, created_at)
+ key = Key.inject(
+ key_type, private_key, public_key, description, created_at
+ )
key.addAuthorization(request.client)
store.commit()
return {
"fingerprint": key.fingerprint,
- }, 200
+ }, 200
add_authorization_api = service.api(
- "/authorizations/add", "add_authorization", methods=["POST"])
+ "/authorizations/add", "add_authorization", methods=["POST"]
+)
@add_authorization_api.view(introduced_at="1.0")
@encrypted_response
-@validate_body({
- "type": "object",
- "properties": {
- "key-type": {
- "type": "string",
- "enum": [item.token for item in KeyType],
+@validate_body(
+ {
+ "type": "object",
+ "properties": {
+ "key-type": {
+ "type": "string",
+ "enum": [item.token for item in KeyType],
},
- "fingerprint": {"type": "string"},
- "client-name": {"type": "string"},
+ "fingerprint": {"type": "string"},
+ "client-name": {"type": "string"},
},
- "required": ["key-type", "fingerprint", "client-name"],
- })
+ "required": ["key-type", "fingerprint", "client-name"],
+ }
+)
@validate_output({"type": "object"})
def add_authorization():
payload = request.get_json()
@@ -305,10 +338,12 @@ def add_authorization():
client = Client.getByName(client_name)
if client is None:
raise DataValidationError.single(
- f"No registered client named {client_name}")
+ f"No registered client named {client_name}"
+ )
if not key.isAuthorized(request.client):
raise ClientNotAllowed.single(
- f"{request.client} is not allowed to use {key}")
+ f"{request.client} is not allowed to use {key}"
+ )
key.addAuthorization(client)
store.commit()
return {}, 200
diff --git a/lp_signing/webapp.py b/lp_signing/webapp.py
index 5ba5158..ed39e4c 100644
--- a/lp_signing/webapp.py
+++ b/lp_signing/webapp.py
@@ -4,28 +4,20 @@
"""Entry point for the Launchpad signing service."""
import json
+import logging
-from acceptable import DataValidationError
import flask
-from flask_storm import (
- FlaskStorm,
- store,
- )
-import logging
-from storm.locals import StormError
-import talisker.statsd
import talisker.flask
+import talisker.statsd
+from acceptable import DataValidationError
+from flask_storm import FlaskStorm, store
+from storm.locals import StormError
from lp_signing import webapi
-from lp_signing.auth import (
- boxed_authentication,
- BoxedRequest,
- BoxedResponse,
- )
+from lp_signing.auth import BoxedRequest, BoxedResponse, boxed_authentication
from lp_signing.config import read_config
from lp_signing.exceptions import APIError
-
_log = logging.getLogger(__name__)
@@ -101,8 +93,12 @@ def create_web_application():
if statsd_client is not None:
if flask.request.endpoint is not None:
name = ".".join(
- ("views", flask.request.endpoint, flask.request.method,
- str(response.status_code))
+ (
+ "views",
+ flask.request.endpoint,
+ flask.request.method,
+ str(response.status_code),
+ )
)
flask._request_ctx_stack.top.timer.stat = name
flask._request_ctx_stack.top.timer.stop()
diff --git a/publish-to-swift b/publish-to-swift
index e5cfd59..21264a5 100755
--- a/publish-to-swift
+++ b/publish-to-swift
@@ -2,11 +2,11 @@
"""Publish a built tarball to Swift for deployment."""
-from argparse import ArgumentParser
import os
import re
import subprocess
import sys
+from argparse import ArgumentParser
def ensure_container_privs(container_name):
@@ -23,15 +23,20 @@ def get_swift_storage_url():
# with swiftclient.
auth = subprocess.run(
["swift", "auth"],
- stdout=subprocess.PIPE, check=True,
- universal_newlines=True).stdout.splitlines()
+ stdout=subprocess.PIPE,
+ check=True,
+ universal_newlines=True,
+ ).stdout.splitlines()
return [
- line.split("=", 1)[1] for line in auth
- if line.startswith("export OS_STORAGE_URL=")][0]
+ line.split("=", 1)[1]
+ for line in auth
+ if line.startswith("export OS_STORAGE_URL=")
+ ][0]
-def publish_file_to_swift(container_name, object_path, local_path,
- overwrite=True):
+def publish_file_to_swift(
+ container_name, object_path, local_path, overwrite=True
+):
"""Publish a file to a Swift container."""
storage_url = get_swift_storage_url()
@@ -41,32 +46,55 @@ def publish_file_to_swift(container_name, object_path, local_path,
try:
stats = subprocess.run(
["swift", "stat", container_name, object_path],
- stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=True,
- universal_newlines=True).stdout
+ stdout=subprocess.PIPE,
+ stderr=subprocess.DEVNULL,
+ check=True,
+ universal_newlines=True,
+ ).stdout
if re.search(
- r"Object: %s$" % re.escape(object_path), stats, flags=re.M):
+ r"Object: %s$" % re.escape(object_path), stats, flags=re.M
+ ):
already_published = True
except subprocess.CalledProcessError:
pass
if already_published:
- print("Object {} already published to {}.".format(
- object_path, container_name))
+ print(
+ "Object {} already published to {}.".format(
+ object_path, container_name
+ )
+ )
if not overwrite:
return
- print("Publishing {} to {} as {}.".format(
- local_path, container_name, object_path))
+ print(
+ "Publishing {} to {} as {}.".format(
+ local_path, container_name, object_path
+ )
+ )
try:
subprocess.run(
- ["swift", "upload", "--object-name", object_path,
- container_name, local_path])
+ [
+ "swift",
+ "upload",
+ "--object-name",
+ object_path,
+ container_name,
+ local_path,
+ ]
+ )
except subprocess.CalledProcessError:
- sys.exit("Failed to upload {} to {} as {}".format(
- local_path, container_name, object_path))
+ sys.exit(
+ "Failed to upload {} to {} as {}".format(
+ local_path, container_name, object_path
+ )
+ )
- print("Published file: {}/{}/{}".format(
- storage_url, container_name, object_path))
+ print(
+ "Published file: {}/{}/{}".format(
+ storage_url, container_name, object_path
+ )
+ )
def main():
@@ -105,7 +133,7 @@ def main():
"OS_USER_DOMAIN_ID",
"OS_USER_DOMAIN_NAME",
"OS_USER_ID",
- }
+ }
for key, value in sorted(os.environ.items()):
if key.startswith("OS_"):
if key not in safe_keys:
@@ -115,8 +143,11 @@ def main():
overwrite = "FORCE_REBUILD" in os.environ
ensure_container_privs(args.container_name)
publish_file_to_swift(
- args.container_name, args.swift_object_path, args.local_path,
- overwrite=overwrite)
+ args.container_name,
+ args.swift_object_path,
+ args.local_path,
+ overwrite=overwrite,
+ )
if __name__ == "__main__":
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..10ee528
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,7 @@
+[tool.black]
+line-length = 79
+
+[tool.isort]
+known_first_party = ["lp_signing"]
+line_length = 79
+profile = "black"
diff --git a/setup.cfg b/setup.cfg
index 6a93b1a..feb8a3d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,9 +1,5 @@
[flake8]
ignore =
- # will be fixed by using black
- E121
- E123
- E126
# incompatible with black
E203
# binary operator on same or next line; mutually exclusive
diff --git a/setup.py b/setup.py
index ac4020f..a76c95c 100755
--- a/setup.py
+++ b/setup.py
@@ -3,11 +3,7 @@
# Copyright 2017-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-from setuptools import (
- find_packages,
- setup,
- )
-
+from setuptools import find_packages, setup
requires = [
"acceptable",
@@ -20,7 +16,7 @@ requires = [
"pytz",
"storm",
"talisker[flask,pg]",
- ]
+]
test_requires = [
"coverage",
"cryptography",
@@ -29,7 +25,7 @@ test_requires = [
"systemfixtures",
"testresources",
"testtools",
- ]
+]
setup(
name="lp-signing",
@@ -48,7 +44,7 @@ setup(
"License :: OSI Approved :: GNU Affero General Public License v3",
"Operating System :: OS Independent",
"Programming Language :: Python",
- ],
+ ],
install_requires=requires,
tests_require=test_requires,
extras_require={"test": test_requires},
@@ -56,6 +52,6 @@ setup(
entry_points={
"console_scripts": [
"lp-signing = lp_signing.cli:cli",
- ],
- },
- )
+ ],
+ },
+)