launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24957
[Merge] ~cjwatson/lp-signing:openpgp into lp-signing:master
Colin Watson has proposed merging ~cjwatson/lp-signing:openpgp into lp-signing:master with ~cjwatson/lp-signing:signature-mode-clear as a prerequisite.
Commit message:
Add OpenPGP support
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/lp-signing/+git/lp-signing/+merge/386810
The gpg package doesn't support normal installation within a virtualenv, and needs to match the rest of the GnuPG stack, so it's simplest to symlink it in from a system package.
This currently only supports RSA keys, although adding other key algorithms in future should be straightforward.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/lp-signing:openpgp into lp-signing:master.
diff --git a/Makefile b/Makefile
index 0fc6bdd..c5c3751 100644
--- a/Makefile
+++ b/Makefile
@@ -9,6 +9,9 @@ BIND ?= 0.0.0.0:8000
GUNICORN = $(ENV)/bin/talisker
TESTS ?= $(SERVICE_PACKAGE).tests.test_suite
+SITE_PACKAGES := \
+ $$($(PYTHON3) -c 'from distutils.sysconfig import get_python_lib; print(get_python_lib())')
+
# development config
export SERVICE_CONFIG = $(CURDIR)/service.conf
@@ -40,6 +43,7 @@ SWIFT_OBJECT_PATH = \
$(ENV)/prod: | $(DEPENDENCY_DIR)
virtualenv $(ENV) --python=python3
$(PIP) install -f $(DEPENDENCY_DIR) --no-index -c requirements.txt -e .
+ ./link-system-packages.py "$(SITE_PACKAGES)" system-packages.txt
git rev-parse HEAD >version-info.txt
@touch $@
diff --git a/charm/lp-signing/layer.yaml b/charm/lp-signing/layer.yaml
index fcded8f..2ea516b 100644
--- a/charm/lp-signing/layer.yaml
+++ b/charm/lp-signing/layer.yaml
@@ -4,7 +4,9 @@ includes:
options:
apt:
packages:
+ - gnupg
- openssl
+ - python3-gpg
- sbsigntool
- u-boot-tools
ols:
diff --git a/charm/lp-signing/reactive/lp-signing.py b/charm/lp-signing/reactive/lp-signing.py
index 3e46c23..a563e26 100644
--- a/charm/lp-signing/reactive/lp-signing.py
+++ b/charm/lp-signing/reactive/lp-signing.py
@@ -3,6 +3,8 @@
from __future__ import absolute_import, print_function, unicode_literals
+import os.path
+import subprocess
from urllib.parse import (
urlparse,
urlunparse,
@@ -70,6 +72,14 @@ def configure(pgsql):
if pgsql.master is None:
return
+ site_packages = subprocess.check_output(
+ [os.path.join(base.code_dir(), 'env', 'bin', 'python'), '-c',
+ 'from distutils.sysconfig import get_python_lib; '
+ 'print(get_python_lib())'], universal_newlines=True).rstrip('\n')
+ subprocess.check_call(
+ [os.path.join(base.code_dir(), 'link-system-packages.py'),
+ site_packages, os.path.join(base.code_dir(), 'system-packages.txt')])
+
config = hookenv.config()
config_path = base.service_config_path()
svc_config = dict(config)
diff --git a/dependencies.txt b/dependencies.txt
index b4a682f..eebebee 100644
--- a/dependencies.txt
+++ b/dependencies.txt
@@ -1,3 +1,5 @@
+gnupg
openssl
+python3-gpg
sbsigntool
u-boot-tools
diff --git a/link-system-packages.py b/link-system-packages.py
new file mode 100755
index 0000000..d557835
--- /dev/null
+++ b/link-system-packages.py
@@ -0,0 +1,59 @@
+#! /usr/bin/python3
+
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Link system-installed Python modules into lp-signing's virtualenv."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+from argparse import ArgumentParser
+from distutils.sysconfig import get_python_lib
+import importlib
+import os.path
+import re
+
+
+def link_module(name, virtualenv_libdir, optional=False):
+ try:
+ module = importlib.import_module(name)
+ except ImportError:
+ if optional:
+ print("Skipping missing optional module %s." % name)
+ return
+ else:
+ raise
+ path = module.__file__
+ if os.path.basename(path).startswith("__init__."):
+ path = os.path.dirname(path)
+ system_libdir = get_python_lib(plat_specific=path.endswith(".so"))
+ if os.path.commonprefix([path, system_libdir]) != system_libdir:
+ raise RuntimeError(
+ "%s imported from outside %s (%s)" % (name, system_libdir, path))
+ target_path = os.path.join(
+ virtualenv_libdir, os.path.relpath(path, system_libdir))
+ if os.path.lexists(target_path) and os.path.islink(target_path):
+ os.unlink(target_path)
+ os.symlink(path, target_path)
+
+
+def main():
+ parser = ArgumentParser()
+ parser.add_argument("virtualenv_libdir")
+ parser.add_argument("module_file", type=open)
+ args = parser.parse_args()
+
+ for line in args.module_file:
+ line = re.sub(r"#.*", "", line).strip()
+ if not line:
+ continue
+ if line.endswith("?"):
+ line = line[:-1]
+ optional = True
+ else:
+ optional = False
+ link_module(line, args.virtualenv_libdir, optional=optional)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/lp_signing/enums.py b/lp_signing/enums.py
index 9318223..8d020d4 100644
--- a/lp_signing/enums.py
+++ b/lp_signing/enums.py
@@ -10,6 +10,7 @@ from lazr.enum import (
__all__ = [
"KeyType",
+ "OpenPGPKeyAlgorithm",
"SignatureMode",
]
@@ -46,6 +47,21 @@ class KeyType(DBEnumeratedType):
A signing key for U-Boot Flat Image Tree images.
""")
+ OPENPGP = DBItem(6, """
+ OpenPGP
+
+ An OpenPGP signing key.
+ """)
+
+
+class OpenPGPKeyAlgorithm(EnumeratedType):
+
+ RSA = Item("""
+ RSA
+
+ A Rivest-Shamir-Adleman key.
+ """)
+
class SignatureMode(EnumeratedType):
diff --git a/lp_signing/exceptions.py b/lp_signing/exceptions.py
index b7c9a0d..eab295d 100644
--- a/lp_signing/exceptions.py
+++ b/lp_signing/exceptions.py
@@ -88,6 +88,12 @@ class KeyGenerationError(APIError):
status_code = 500
+class KeyImportError(APIError):
+ """Importing a key failed."""
+
+ status_code = 500
+
+
class ClientNotAllowed(APIError):
"""The client is not authorized to use this key."""
diff --git a/lp_signing/model/key.py b/lp_signing/model/key.py
index a1363ea..741e6de 100644
--- a/lp_signing/model/key.py
+++ b/lp_signing/model/key.py
@@ -11,6 +11,7 @@ import base64
from contextlib import contextmanager
import json
import logging
+import os
from pathlib import Path
import re
import shutil
@@ -18,8 +19,10 @@ import subprocess
from subprocess import CalledProcessError
from tempfile import TemporaryDirectory
from textwrap import dedent
+import time
from flask_storm import store
+import gpg
from nacl.public import PrivateKey
import pytz
from storm.locals import (
@@ -38,11 +41,13 @@ from lp_signing.database.constants import DEFAULT
from lp_signing.database.enumcol import DBEnum
from lp_signing.enums import (
KeyType,
+ OpenPGPKeyAlgorithm,
SignatureMode,
)
from lp_signing.exceptions import (
Conflict,
KeyGenerationError,
+ KeyImportError,
NoSuchKey,
SignatureError,
UnsupportedSignatureMode,
@@ -111,6 +116,89 @@ def _log_subprocess_run(args, **kwargs):
return process
+_gpg_conf = dedent("""\
+ # Avoid wasting time verifying the local keyring's consistency.
+ no-auto-check-trustdb
+
+ # Prefer a SHA-2 hash where possible.
+ personal-digest-preferences SHA512 SHA384 SHA256 SHA224
+ """)
+
+
+# Based loosely on gpgme/lang/python/tests/support.py:EphemeralContext.
+@contextmanager
+def _gpg_context(tmp):
+ """An ephemeral GPG context, as a context manager."""
+ home = tmp / "gpg"
+ home.mkdir(mode=0o700)
+ try:
+ (home / "gpg.conf").write_text(_gpg_conf)
+ (home / "gpg-agent.conf").write_text(dedent(f"""\
+ disable-scdaemon
+ """))
+
+ with gpg.Context(home_dir=str(home)) as ctx:
+ ctx.armor = True
+ yield ctx
+
+ # Ask the agent to quit.
+ agent_socket = home / "S.gpg-agent"
+ ctx.protocol = gpg.constants.protocol.ASSUAN
+ ctx.set_engine_info(ctx.protocol, file_name=str(agent_socket))
+ try:
+ ctx.assuan_transact(["KILLAGENT"])
+ except gpg.errors.GPGMEError as e:
+ if e.getcode() == gpg.errors.ASS_CONNECT_FAILED:
+ # The agent was not running.
+ pass
+ else: # pragma: no cover
+ raise
+
+ # Block until it is really gone.
+ while agent_socket.exists():
+ time.sleep(.01)
+ finally:
+ shutil.rmtree(home, ignore_errors=True)
+
+
+def _gpg_key_import(ctx, data):
+ """Import data into a GPG context."""
+ # XXX cjwatson 2020-07-02: This is mostly copied from gpgme. We can
+ # remove this once we have python3-gpg >= 1.12.0.
+ try:
+ ctx.op_import(data)
+ result = ctx.op_import_result()
+ if result.considered == 0:
+ status = gpg.constants.STATUS_IMPORT_PROBLEM
+ else:
+ status = gpg.constants.STATUS_KEY_CONSIDERED
+ except gpg.errors.GPGMEError as e: # pragma: no cover
+ if e.code_str == "No data":
+ status = gpg.constants.STATUS_NODATA
+ else:
+ status = gpg.constants.STATUS_FILE_ERROR
+ except TypeError: # pragma: no cover
+ if hasattr(data, "decode"):
+ status = gpg.constants.STATUS_NO_PUBKEY
+ elif hasattr(data, "encode"):
+ status = gpg.constants.STATUS_FILE_ERROR
+ else:
+ status = gpg.constants.STATUS_ERROR
+ except Exception: # pragma: no cover
+ status = gpg.constants.STATUS_ERROR
+
+ if status == gpg.constants.STATUS_KEY_CONSIDERED:
+ import_result = result
+ else:
+ import_result = status
+
+ return import_result
+
+
+class GPGError(Exception):
+ pass
+
+
class KeyStorageEncryptedContainer(NaClEncryptedContainerBase):
@property
@@ -286,6 +374,48 @@ class Key(Storm):
return pem.read_bytes(), cert.read_bytes()
@classmethod
+ def _generateOpenPGP(cls, ctx, openpgp_key_algorithm, length, name):
+ """Generate a new OpenPGP key pair.
+
+ :param ctx: A GPG context.
+ :param openpgp_key_algorithm: The `OpenPGPKeyAlgorithm` to use.
+ :param length: The key length.
+ :param name: The name for the new key.
+ :return: A tuple of (private key, public_key, fingerprint).
+ """
+ if openpgp_key_algorithm == OpenPGPKeyAlgorithm.RSA:
+ algorithm = "rsa%d" % length
+ else:
+ raise KeyGenerationError.single(
+ f"Unknown OpenPGP key algorithm {openpgp_key_algorithm.name}")
+ key = ctx.create_key(
+ name, algorithm=algorithm, expires=False, sign=True)
+ if not key.primary:
+ raise GPGError("Secret key generation failed.")
+ if key.sub:
+ raise GPGError(
+ "Got an encryption subkey despite not asking for one.")
+
+ # XXX cjwatson 2020-07-02: This could be simplified using
+ # ctx.key_export_secret and ctx.key_export_minimal once we have
+ # python3-gpg >= 1.12.0.
+ secret_data = gpg.Data()
+ ctx.op_export(key.fpr, gpg.constants.EXPORT_MODE_SECRET, secret_data)
+ secret_data.seek(0, os.SEEK_SET)
+ export_secret_result = secret_data.read()
+ if not export_secret_result:
+ raise GPGError("Failed to export secret key.")
+
+ public_data = gpg.Data()
+ ctx.op_export(key.fpr, gpg.constants.EXPORT_MODE_MINIMAL, public_data)
+ public_data.seek(0, os.SEEK_SET)
+ export_public_result = public_data.read()
+ if not export_public_result:
+ raise GPGError("Failed to export public key.")
+
+ return export_secret_result, export_public_result, key.fpr
+
+ @classmethod
def _getX509Fingerprint(cls, key_type, public_key):
"""Get the fingerprint of an X509 "public key" (i.e. certificate).
@@ -303,11 +433,31 @@ class Key(Storm):
output.decode("UTF-8").rstrip("\n").split("=")[1].replace(":", ""))
@classmethod
- def generate(cls, key_type, description):
+ def _getOpenPGPFingerprint(cls, ctx, public_key):
+ """Get the fingerprint of an OpenPGP public key.
+
+ :param ctx: A GPG context.
+ :param public_key: The public key (`bytes`).
+ :return: The fingerprint (`str`).
+ """
+ result = _gpg_key_import(ctx, public_key)
+ if not getattr(result, "imported", 0):
+ _log.error("Failed to get fingerprint of new key: %s", result)
+ raise KeyImportError.single(
+ f"Failed to get fingerprint of new key: {result}")
+ return result.imports[0].fpr
+
+ @classmethod
+ def generate(cls, key_type, description, openpgp_key_algorithm=None,
+ length=None):
"""Generate a new key for an archive.
:param key_type: The `KeyType` to generate.
:param description: A description of the key (`str`).
+ :param openpgp_key_algorithm: The `OpenPGPKeyAlgorithm` to use (only
+ for `key_type=KeyType.OPENPGP`).
+ :param length: The key length (currently only for
+ `key_type=KeyType.OPENPGP`).
:return: A new `Key`.
"""
common_name = cls._generateKeyCommonName(description, str(key_type))
@@ -317,18 +467,31 @@ class Key(Storm):
if key_type in (KeyType.UEFI, KeyType.FIT):
private_key, public_key = cls._generateKeyCertPair(
tmp, key_type, common_name)
- else:
+ elif key_type in (KeyType.KMOD, KeyType.OPAL, KeyType.SIPL):
private_key, public_key = cls._generatePEMX509(
tmp, key_type, common_name)
- except subprocess.CalledProcessError as e:
+ elif key_type == KeyType.OPENPGP:
+ with _gpg_context(tmp) as ctx:
+ private_key, public_key, fingerprint = (
+ cls._generateOpenPGP(
+ ctx, openpgp_key_algorithm, length,
+ description))
+ else:
+ raise KeyGenerationError.single(
+ f"Unknown key type {key_type.name}")
+ except KeyGenerationError:
+ raise
+ except Exception as e:
_log.error("Failed to generate key: %s", e)
raise KeyGenerationError.single(f"Failed to generate key: {e}")
- try:
- fingerprint = cls._getX509Fingerprint(key_type, public_key)
- except subprocess.CalledProcessError as e:
- _log.error("Failed to get fingerprint of new key: %s", e)
- raise KeyGenerationError.single(
- f"Failed to get fingerprint of new key: {e}")
+ if key_type != KeyType.OPENPGP:
+ try:
+ fingerprint = cls._getX509Fingerprint(
+ key_type, public_key)
+ except subprocess.CalledProcessError as e:
+ _log.error("Failed to get fingerprint of new key: %s", e)
+ raise KeyGenerationError.single(
+ f"Failed to get fingerprint of new key: {e}")
_log.info("Generated new key with fingerprint %s", fingerprint)
return cls.new(key_type, fingerprint, private_key, public_key)
@@ -347,13 +510,17 @@ class Key(Storm):
:return: The injected `Key`.
"""
_log.info("Injecting %s key for %s", key_type, description)
- try:
- fingerprint = cls._getX509Fingerprint(key_type, public_key)
- except subprocess.CalledProcessError as e:
- _log.error("Failed to get fingerprint of new key: %s", e)
- raise KeyGenerationError.single(
+ if key_type == KeyType.OPENPGP:
+ with _temporary_path() as tmp, _gpg_context(Path(tmp)) as ctx:
+ fingerprint = cls._getOpenPGPFingerprint(ctx, public_key)
+ else:
+ try:
+ fingerprint = cls._getX509Fingerprint(key_type, public_key)
+ except subprocess.CalledProcessError as e:
+ _log.error("Failed to get fingerprint of new key: %s", e)
+ raise KeyImportError.single(
f"Failed to get fingerprint of new key: {e}")
- _log.info("Injected new key with fingerprint %s", fingerprint)
+ _log.info("Injecting new key with fingerprint %s", fingerprint)
try:
key = Key.getByTypeAndFingerprint(key_type, fingerprint)
if (key.getPrivateKey() == private_key) and (
@@ -408,6 +575,42 @@ class Key(Storm):
"""
return KeyAuthorization.get(self, client) is not None
+ def _signOpenPGP(self, tmp, message_name, message, mode):
+ """Sign a message with this OpenPGP key.
+
+ :param tmp: A `Path` to a temporary directory.
+ :param message_name: The base name of the message file to sign
+ (`str`).
+ :param message: The message to sign (`bytes`).
+ :param mode: A `SignatureMode`.
+ :return: The signed message (`bytes`).
+ """
+ if mode == SignatureMode.ATTACHED:
+ sig_mode = gpg.constants.SIG_MODE_NORMAL
+ elif mode == SignatureMode.DETACHED:
+ sig_mode = gpg.constants.SIG_MODE_DETACH
+ elif mode == SignatureMode.CLEAR:
+ sig_mode = gpg.constants.SIG_MODE_CLEAR
+ else:
+ raise UnsupportedSignatureMode.single(
+ f"Signature mode {mode.name} not supported with "
+ f"{self.key_type.name}")
+ _log.info(
+ "Making %s signature of %s with %s key %s",
+ mode.name.lower(), message_name, self.key_type, self.fingerprint)
+ with _gpg_context(tmp) as ctx:
+ import_result = _gpg_key_import(ctx, self.getPrivateKey())
+ if not getattr(import_result, "secret_imported", 0):
+ raise SignatureError.single(
+ "Failed to import stored OpenPGP key")
+ gpg_key = ctx.keylist(pattern=self.fingerprint, secret=True)
+ ctx.signers = list(gpg_key)
+ try:
+ return ctx.sign(message, mode=sig_mode)[0]
+ except gpg.errors.GpgError as e:
+ _log.error("Failed to sign message: %s", e)
+ raise SignatureError.single(f"Failed to sign message: {e}")
+
def sign(self, message_name, message, mode):
"""Sign a message with this key.
@@ -419,6 +622,11 @@ class Key(Storm):
:return: The signed message (`bytes`).
"""
with _temporary_path() as tmp:
+ if self.key_type == KeyType.OPENPGP:
+ # This works sufficiently differently from other key types
+ # that we handle it in a separate method.
+ return self._signOpenPGP(tmp, message_name, message, mode)
+
key = tmp / f"{self.key_type.name.lower()}.key"
key.write_bytes(self.getPrivateKey())
cert = tmp / f"{self.key_type.name.lower()}.crt"
diff --git a/lp_signing/model/tests/test_key.py b/lp_signing/model/tests/test_key.py
index 1e59fa5..0509593 100644
--- a/lp_signing/model/tests/test_key.py
+++ b/lp_signing/model/tests/test_key.py
@@ -5,13 +5,18 @@
import re
from datetime import datetime, timezone
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from fixtures import MockPatch
from flask_storm import store
+import gpg
from testtools import TestCase
from testtools.matchers import (
AfterPreprocessing,
EndsWith,
Equals,
+ HasLength,
Is,
MatchesListwise,
MatchesRegex,
@@ -22,15 +27,22 @@ from testtools.matchers import (
from lp_signing.database.helpers import get_transaction_timestamp
from lp_signing.enums import (
KeyType,
+ OpenPGPKeyAlgorithm,
SignatureMode,
)
from lp_signing.exceptions import (
NoSuchKey,
UnsupportedSignatureMode,
)
-from lp_signing.model.key import Key
+from lp_signing.model.key import (
+ _gpg_context,
+ Key,
+ )
from lp_signing.tests import factory
-from lp_signing.tests.matchers import RanCommand
+from lp_signing.tests.matchers import (
+ MatchesOpenPGPKey,
+ RanCommand,
+ )
from lp_signing.tests.testfixtures import (
ConfigOverrideFixture,
DatabaseFixture,
@@ -315,6 +327,48 @@ class TestKey(TestCase):
Equals(public_key))),
]))
+ def test_generate_openpgp(self):
+ key = Key.generate(
+ KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ now = get_transaction_timestamp(store)
+ self.assertThat(key, MatchesStructure(
+ key_type=Equals(KeyType.OPENPGP),
+ fingerprint=HasLength(40),
+ public_key=MatchesOpenPGPKey(
+ MatchesStructure(
+ fpr=Equals(key.fingerprint),
+ secret=Equals(0),
+ subkeys=MatchesListwise([
+ MatchesStructure.byEquality(
+ pubkey_algo=1,
+ length=1024),
+ ]),
+ uids=MatchesListwise([
+ MatchesStructure.byEquality(
+ uid="~signing-owner/ubuntu/testing"),
+ ]))),
+ created_at=Equals(now),
+ updated_at=Equals(now)))
+ self.assertThat(
+ key.getPrivateKey(),
+ MatchesOpenPGPKey(
+ MatchesStructure(
+ fpr=Equals(key.fingerprint),
+ secret=Equals(1),
+ subkeys=MatchesListwise([
+ MatchesStructure.byEquality(
+ pubkey_algo=1,
+ length=1024),
+ ]),
+ uids=MatchesListwise([
+ MatchesStructure.byEquality(
+ uid="~signing-owner/ubuntu/testing"),
+ ])),
+ secret=True))
+ self.assertEqual(
+ key, Key.getByTypeAndFingerprint(KeyType.OPENPGP, key.fingerprint))
+
def test_addAuthorization(self):
key = factory.create_key()
clients = [factory.create_client() for _ in range(2)]
@@ -495,6 +549,42 @@ class TestKey(TestCase):
UnsupportedSignatureMode,
key.sign, "t.fit", b"test data", SignatureMode.CLEAR)
+ def test_sign_openpgp_attached(self):
+ key = Key.generate(
+ KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ mock_sign = self.useFixture(MockPatch(
+ "gpg.Context.sign", return_value=(b"test signed data", None))).mock
+ self.assertEqual(
+ b"test signed data",
+ key.sign("t.gpg", b"test data", SignatureMode.ATTACHED))
+ mock_sign.assert_called_once_with(
+ b"test data", mode=gpg.constants.SIG_MODE_NORMAL)
+
+ def test_sign_openpgp_detached(self):
+ key = Key.generate(
+ KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ mock_sign = self.useFixture(MockPatch(
+ "gpg.Context.sign", return_value=(b"test signed data", None))).mock
+ self.assertEqual(
+ b"test signed data",
+ key.sign("t.gpg", b"test data", SignatureMode.DETACHED))
+ mock_sign.assert_called_once_with(
+ b"test data", mode=gpg.constants.SIG_MODE_DETACH)
+
+ def test_sign_openpgp_clear(self):
+ key = Key.generate(
+ KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ mock_sign = self.useFixture(MockPatch(
+ "gpg.Context.sign", return_value=(b"test signed data", None))).mock
+ self.assertEqual(
+ b"test signed data",
+ key.sign("t.gpg", b"test data", SignatureMode.CLEAR))
+ mock_sign.assert_called_once_with(
+ b"test data", mode=gpg.constants.SIG_MODE_CLEAR)
+
def test_inject_uefi(self):
private_key = factory.generate_random_bytes(size=64)
public_key = factory.generate_random_bytes(size=64)
@@ -649,3 +739,24 @@ class TestKey(TestCase):
fingerprint_args
)
]))
+
+ def test_inject_openpgp(self):
+ with TemporaryDirectory() as tmp, _gpg_context(Path(tmp)) as ctx:
+ # Generate a test key, but do not store it in the database.
+ private_key, public_key, fingerprint = Key._generateOpenPGP(
+ ctx, OpenPGPKeyAlgorithm.RSA, 1024,
+ "~signing-owner/ubuntu/testing")
+ description = "PPA signing-owner testing"
+ created_at = datetime.utcnow().replace(tzinfo=timezone.utc)
+ key = Key.inject(
+ KeyType.OPENPGP, private_key, public_key, description, created_at)
+ now = get_transaction_timestamp(store)
+ self.assertThat(key, MatchesStructure.byEquality(
+ key_type=KeyType.OPENPGP,
+ fingerprint=fingerprint,
+ public_key=public_key,
+ created_at=created_at,
+ updated_at=now))
+ self.assertEqual(private_key, key.getPrivateKey())
+ self.assertEqual(
+ key, Key.getByTypeAndFingerprint(KeyType.OPENPGP, fingerprint))
diff --git a/lp_signing/tests/factory.py b/lp_signing/tests/factory.py
index c79203c..596f7b4 100644
--- a/lp_signing/tests/factory.py
+++ b/lp_signing/tests/factory.py
@@ -34,9 +34,9 @@ def generate_unique_string(prefix=None):
return f"{prefix}-{generate_unique_integer()}"
-def generate_fingerprint():
- """Generate a random string that looks like an X.509 fingerprint."""
- return "".join(random.choices("0123456789ABCDEF", k=16))
+def generate_fingerprint(length=16):
+ """Generate a random string that looks like a fingerprint."""
+ return "".join(random.choices("0123456789ABCDEF", k=length))
def generate_random_bytes(size):
diff --git a/lp_signing/tests/matchers.py b/lp_signing/tests/matchers.py
index 0089fa1..187f054 100644
--- a/lp_signing/tests/matchers.py
+++ b/lp_signing/tests/matchers.py
@@ -5,7 +5,9 @@
import json
+import gpg
from testtools.matchers import (
+ AfterPreprocessing,
AnyMatch,
Contains,
Equals,
@@ -13,6 +15,14 @@ from testtools.matchers import (
MatchesListwise,
MatchesStructure,
Mismatch,
+ StartsWith,
+ )
+
+from lp_signing.enums import SignatureMode
+from lp_signing.model.key import (
+ _gpg_context,
+ _gpg_key_import,
+ _temporary_path,
)
@@ -72,3 +82,59 @@ class HasAPIError(BaseJSONResponseMatcher):
data = json.loads(matchee.data.decode(matchee.charset))
return AnyMatch(self.expected_message_matcher).match(
[error["message"] for error in data["error_list"]])
+
+
+class MatchesOpenPGPKey(AfterPreprocessing):
+
+ def __init__(self, matcher, secret=False):
+ super().__init__(self._load_gpg_key, matcher)
+ self.secret = secret
+
+ def _load_gpg_key(self, value):
+ with _temporary_path() as tmp, _gpg_context(tmp) as ctx:
+ result = _gpg_key_import(ctx, value)
+ return list(
+ ctx.keylist(result.imports[0].fpr, secret=self.secret))[0]
+
+ def match(self, matchee):
+ if self.secret:
+ expected_prefix = b"-----BEGIN PGP PRIVATE KEY BLOCK-----\n"
+ expected_key_type = "secret"
+ else:
+ expected_prefix = b"-----BEGIN PGP PUBLIC KEY BLOCK-----\n"
+ expected_key_type = "public"
+ if not matchee.startswith(expected_prefix):
+ return Mismatch(f"{matchee} is not a {expected_key_type} key")
+ return super().match(matchee)
+
+
+class OpenPGPSignatureVerifies(Matcher):
+
+ def __init__(self, public_key, message, mode):
+ self.public_key = public_key
+ self.message = message
+ self.mode = mode
+
+ def match(self, matchee):
+ if self.mode == SignatureMode.ATTACHED:
+ expected_prefix = b"-----BEGIN PGP MESSAGE-----\n"
+ elif self.mode == SignatureMode.DETACHED:
+ expected_prefix = b"-----BEGIN PGP SIGNATURE-----\n"
+ elif self.mode == SignatureMode.CLEAR:
+ expected_prefix = b"-----BEGIN PGP SIGNED MESSAGE-----\n"
+ else:
+ raise AssertionError(f"Unknown signature mode {self.mode}")
+ mismatch = StartsWith(expected_prefix).match(matchee)
+ if mismatch is not None:
+ return mismatch
+ with _temporary_path() as tmp, _gpg_context(tmp) as ctx:
+ _gpg_key_import(ctx, self.public_key)
+ try:
+ if self.mode == SignatureMode.DETACHED:
+ data, result = ctx.verify(self.message, matchee)
+ else:
+ data, result = ctx.verify(matchee)
+ except gpg.errors.BadSignatures as e:
+ return Mismatch(f"GPG verification of {matchee} failed: {e}")
+ if self.mode != SignatureMode.DETACHED:
+ return Equals(self.message).match(data)
diff --git a/lp_signing/tests/test_webapi.py b/lp_signing/tests/test_webapi.py
index 10426dd..2ae3bff 100644
--- a/lp_signing/tests/test_webapi.py
+++ b/lp_signing/tests/test_webapi.py
@@ -13,6 +13,9 @@ from cryptography.x509 import (
load_der_x509_certificate,
load_pem_x509_certificate,
)
+from fixtures import MockPatch
+import gpg
+from gpg.results import GenkeyResult
from flask_storm import store
from nacl.encoding import Base64Encoder
from nacl.public import (
@@ -36,14 +39,23 @@ from testtools.matchers import (
)
from lp_signing.auth import boxed_authentication
-from lp_signing.enums import KeyType
+from lp_signing.enums import (
+ KeyType,
+ OpenPGPKeyAlgorithm,
+ SignatureMode,
+ )
from lp_signing.exceptions import InvalidNonce
-from lp_signing.model.key import Key
+from lp_signing.model.key import (
+ _gpg_context,
+ Key,
+ )
from lp_signing.model.nonce import Nonce
from lp_signing.tests import factory
from lp_signing.tests.matchers import (
HasAPIError,
IsJSONResponse,
+ MatchesOpenPGPKey,
+ OpenPGPSignatureVerifies,
)
from lp_signing.tests.testfixtures import (
AppFixture,
@@ -207,7 +219,10 @@ class TestGenerateView(TestCase):
def test_missing_key_type(self):
resp = self.post_generate({"description": ""})
self.assertThat(
- resp, HasAPIError("'key-type' is a required property at /"))
+ resp,
+ HasAPIError(
+ "{'description': ''} is not valid under any of the given "
+ "schemas at /"))
self.assertNonceConsumed()
def test_invalid_key_type(self):
@@ -215,14 +230,59 @@ class TestGenerateView(TestCase):
self.assertThat(
resp,
HasAPIError(
- "'nonsense' is not one of ['UEFI', 'KMOD', 'OPAL', 'SIPL', "
- "'FIT']"))
+ "{'key-type': 'nonsense', 'description': ''} is not valid "
+ "under any of the given schemas at /"))
self.assertNonceConsumed()
def test_missing_description(self):
resp = self.post_generate({"key-type": "UEFI"})
self.assertThat(
- resp, HasAPIError("'description' is a required property at /"))
+ resp,
+ HasAPIError(
+ "{'key-type': 'UEFI'} is not valid under any of the given "
+ "schemas at /"))
+ self.assertNonceConsumed()
+
+ def test_openpgp_missing_openpgp_key_algorithm(self):
+ resp = self.post_generate({
+ "key-type": "OPENPGP",
+ "description": "",
+ "length": 1024,
+ })
+ self.assertThat(
+ resp,
+ HasAPIError(
+ "{'key-type': 'OPENPGP', 'description': '', 'length': 1024} "
+ "is not valid under any of the given schemas at /"))
+ self.assertNonceConsumed()
+
+ def test_openpgp_invalid_openpgp_key_algorithm(self):
+ resp = self.post_generate({
+ "key-type": "OPENPGP",
+ "description": "",
+ "openpgp-key-algorithm": "nonsense",
+ "length": 1024,
+ })
+ self.assertThat(
+ resp,
+ HasAPIError(
+ "{'key-type': 'OPENPGP', 'description': '', "
+ "'openpgp-key-algorithm': 'nonsense', 'length': 1024} is not "
+ "valid under any of the given schemas at /"))
+ self.assertNonceConsumed()
+
+ def test_openpgp_missing_length(self):
+ resp = self.post_generate({
+ "key-type": "OPENPGP",
+ "description": "",
+ "openpgp-key-algorithm": "RSA",
+ })
+ self.assertThat(
+ resp,
+ HasAPIError(
+ "{'key-type': 'OPENPGP', 'description': '', "
+ "'openpgp-key-algorithm': 'RSA'} is not valid under any of "
+ "the given schemas at /"))
self.assertNonceConsumed()
def test_generate_uefi(self):
@@ -541,6 +601,62 @@ class TestGenerateView(TestCase):
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
+ def test_generate_openpgp(self):
+ # Integration test: generate and return a real OpenPGP key.
+ resp = self.post_generate(
+ {
+ "key-type": "OPENPGP",
+ "description": "PPA test-owner test-archive",
+ "openpgp-key-algorithm": "RSA",
+ "length": 1024,
+ })
+ self.assertThat(resp, IsJSONResponse(
+ MatchesDict({
+ "fingerprint": HasLength(40),
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(data.encode("UTF-8")),
+ MatchesOpenPGPKey(
+ MatchesStructure(
+ fpr=Equals(resp.json["fingerprint"]),
+ secret=Equals(0),
+ subkeys=MatchesListwise([
+ MatchesStructure.byEquality(
+ pubkey_algo=1,
+ length=1024),
+ ]),
+ uids=MatchesListwise([
+ MatchesStructure.byEquality(
+ uid="PPA test-owner test-archive"),
+ ])))),
+ }),
+ expected_status=201))
+ self.assertNonceConsumed()
+ # The new key was committed to the database.
+ key = Key.getByTypeAndFingerprint(
+ KeyType.OPENPGP, resp.json["fingerprint"])
+ self.assertThat(key, MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ public_key=base64.b64decode(
+ resp.json["public-key"].encode("UTF-8")),
+ authorizations=[self.client]))
+
+ def test_generate_openpgp_create_error(self):
+ result = GenkeyResult(None)
+ result.primary = False
+ result.sub = False
+ self.useFixture(MockPatch(
+ "gpg.Context.create_key", return_value=result))
+ resp = self.post_generate(
+ {
+ "key-type": "OPENPGP",
+ "description": "PPA test-owner test-archive",
+ "openpgp-key-algorithm": "RSA",
+ "length": 1024,
+ })
+ error_re = r"Failed to generate key: Secret key generation failed."
+ self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
+ self.assertNonceConsumed()
+
class TestSignView(TestCase):
@@ -642,7 +758,7 @@ class TestSignView(TestCase):
resp,
HasAPIError(
"'nonsense' is not one of ['UEFI', 'KMOD', 'OPAL', 'SIPL', "
- "'FIT']"))
+ "'FIT', 'OPENPGP']"))
self.assertNonceConsumed()
def test_missing_fingerprint(self):
@@ -1140,6 +1256,105 @@ class TestSignView(TestCase):
resp,
HasAPIError("Signature mode CLEAR not supported with FIT"))
+ def test_sign_openpgp_attached(self):
+ key = Key.generate(
+ KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ key.addAuthorization(self.client)
+ store.commit()
+ resp = self.post_sign(
+ {
+ "key-type": "OPENPGP",
+ "fingerprint": key.fingerprint,
+ "message-name": "t.gpg",
+ "message": base64.b64encode(b"test data").decode("UTF-8"),
+ "mode": "ATTACHED",
+ })
+ self.assertThat(resp, IsJSONResponse(
+ MatchesDict({
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(data.encode("UTF-8")),
+ Equals(key.public_key)),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(data.encode("UTF-8")),
+ OpenPGPSignatureVerifies(
+ key.public_key, b"test data", SignatureMode.ATTACHED)),
+ })))
+ self.assertNonceConsumed()
+
+ def test_sign_openpgp_detached(self):
+ key = Key.generate(
+ KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ key.addAuthorization(self.client)
+ store.commit()
+ resp = self.post_sign(
+ {
+ "key-type": "OPENPGP",
+ "fingerprint": key.fingerprint,
+ "message-name": "t.gpg",
+ "message": base64.b64encode(b"test data").decode("UTF-8"),
+ "mode": "DETACHED",
+ })
+ self.assertThat(resp, IsJSONResponse(
+ MatchesDict({
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(data.encode("UTF-8")),
+ Equals(key.public_key)),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(data.encode("UTF-8")),
+ OpenPGPSignatureVerifies(
+ key.public_key, b"test data", SignatureMode.DETACHED)),
+ })))
+ self.assertNonceConsumed()
+
+ def test_sign_openpgp_clear(self):
+ key = Key.generate(
+ KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ key.addAuthorization(self.client)
+ store.commit()
+ resp = self.post_sign(
+ {
+ "key-type": "OPENPGP",
+ "fingerprint": key.fingerprint,
+ "message-name": "t.gpg",
+ "message": base64.b64encode(b"test data").decode("UTF-8"),
+ "mode": "CLEAR",
+ })
+ self.assertThat(resp, IsJSONResponse(
+ MatchesDict({
+ "public-key": AfterPreprocessing(
+ lambda data: base64.b64decode(data.encode("UTF-8")),
+ Equals(key.public_key)),
+ "signed-message": AfterPreprocessing(
+ lambda data: base64.b64decode(data.encode("UTF-8")),
+ OpenPGPSignatureVerifies(
+ key.public_key, b"test data\n", SignatureMode.CLEAR)),
+ })))
+ self.assertNonceConsumed()
+
+ def test_sign_openpgp_sign_error(self):
+ key = Key.generate(
+ KeyType.OPENPGP, "~signing-owner/ubuntu/testing",
+ openpgp_key_algorithm=OpenPGPKeyAlgorithm.RSA, length=1024)
+ key.addAuthorization(self.client)
+ store.commit()
+ self.useFixture(MockPatch(
+ "gpg.Context.sign",
+ side_effect=gpg.errors.UnsupportedAlgorithm("Boom")))
+ resp = self.post_sign(
+ {
+ "key-type": "OPENPGP",
+ "fingerprint": key.fingerprint,
+ "message-name": "t.gpg",
+ "message": base64.b64encode(b"test data").decode("UTF-8"),
+ "mode": "ATTACHED",
+ })
+ error_re = r"Failed to sign message: Boom"
+ self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
+ self.assertNonceConsumed()
+
class TestInjectView(TestCase):
@@ -1286,7 +1501,7 @@ class TestInjectView(TestCase):
resp,
HasAPIError(
"'nonsense' is not one of ['UEFI', 'KMOD', 'OPAL', 'SIPL', "
- "'FIT']"))
+ "'FIT', 'OPENPGP']"))
self.assertNonceConsumed()
def test_inject_uefi(self):
@@ -1558,6 +1773,47 @@ class TestInjectView(TestCase):
self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
self.assertNonceConsumed()
+ def test_inject_openpgp(self):
+ # Integration test: inject a real OpenPGP key and return its
+ # fingerprint.
+ with TemporaryDirectory() as tmp, _gpg_context(Path(tmp)) as ctx:
+ private_key, public_key, fingerprint = Key._generateOpenPGP(
+ ctx, OpenPGPKeyAlgorithm.RSA, 1024, "OpenPGP test description")
+
+ now_with_tz = datetime.now().replace(tzinfo=pytz.utc)
+ resp = self.post_inject({
+ "key-type": "OPENPGP",
+ "private-key": base64.b64encode(private_key).decode("UTF-8"),
+ "public-key": base64.b64encode(public_key).decode("UTF-8"),
+ "created-at": now_with_tz.isoformat(),
+ "description": "OpenPGP test description",
+ })
+
+ self.assertThat(resp, IsJSONResponse(
+ MatchesDict({
+ "fingerprint": HasLength(40),
+ }),
+ expected_status=200))
+ self.assertNonceConsumed()
+ # The new key was committed to the database.
+ key = Key.getByTypeAndFingerprint(
+ KeyType.OPENPGP, resp.json["fingerprint"])
+ self.assertThat(key, MatchesStructure.byEquality(
+ fingerprint=resp.json["fingerprint"],
+ authorizations=[self.clients[0]]))
+
+ def test_inject_openpgp_fingerprint_error(self):
+ resp = self.post_inject({
+ "key-type": "OPENPGP",
+ "private-key": base64.b64encode(b"").decode("UTF-8"),
+ "public-key": base64.b64encode(b"").decode("UTF-8"),
+ "created-at": datetime.utcnow().isoformat(),
+ "description": "OpenPGP test description",
+ })
+ error_re = r"Failed to get fingerprint of new key: IMPORT_PROBLEM"
+ self.assertThat(resp, HasAPIError(MatchesRegex(error_re), 500))
+ self.assertNonceConsumed()
+
def test_inject_duplicate_key_different_clients(self):
common_name = Key._generateKeyCommonName(
"PPA test-owner test-archive", 'FIT')
diff --git a/lp_signing/webapi.py b/lp_signing/webapi.py
index 2222806..ea8b92b 100644
--- a/lp_signing/webapi.py
+++ b/lp_signing/webapi.py
@@ -25,6 +25,7 @@ from nacl.encoding import Base64Encoder
from lp_signing.auth import boxed_authentication
from lp_signing.enums import (
KeyType,
+ OpenPGPKeyAlgorithm,
SignatureMode,
)
from lp_signing.exceptions import (
@@ -98,14 +99,43 @@ generate_api = service.api("/generate", "generate_key", methods=["POST"])
@encrypted_response
@validate_body({
"type": "object",
- "properties": {
- "key-type": {
- "type": "string",
- "enum": [item.token for item in KeyType],
+ # XXX cjwatson 2020-07-02: This produces suboptimal error messages; once
+ # acceptable supports JSON Schema draft 7, we can do a better job here
+ # using conditional schemas.
+ "oneOf": [
+ {
+ "properties": {
+ "key-type": {
+ "type": "string",
+ "enum": [
+ item.token for item in KeyType
+ if item.value != KeyType.OPENPGP
+ ],
+ },
+ "description": {"type": "string"},
+ },
+ "required": ["key-type", "description"],
+ "additionalProperties": False,
},
- "description": {"type": "string"},
- },
- "required": ["key-type", "description"],
+ {
+ "properties": {
+ "key-type": {
+ "type": "string",
+ "enum": [KeyType.OPENPGP.name],
+ },
+ "description": {"type": "string"},
+ "openpgp-key-algorithm": {
+ "type": "string",
+ "enum": [item.token for item in OpenPGPKeyAlgorithm],
+ },
+ "length": {"type": "integer"},
+ },
+ "required": [
+ "key-type", "description", "openpgp-key-algorithm", "length",
+ ],
+ "additionalProperties": False,
+ },
+ ],
})
@validate_output({
"type": "object",
@@ -118,8 +148,16 @@ generate_api = service.api("/generate", "generate_key", methods=["POST"])
def generate_key():
payload = request.get_json()
key_type = KeyType.items[payload["key-type"]]
+ if "openpgp-key-algorithm" in payload:
+ openpgp_key_algorithm = OpenPGPKeyAlgorithm.items[
+ payload["openpgp-key-algorithm"]]
+ else:
+ openpgp_key_algorithm = None
+ length = payload.get("length")
description = payload["description"]
- key = Key.generate(key_type, description)
+ key = Key.generate(
+ key_type, description, openpgp_key_algorithm=openpgp_key_algorithm,
+ length=length)
key.addAuthorization(request.client)
store.commit()
return {
diff --git a/system-packages.txt b/system-packages.txt
new file mode 100644
index 0000000..db65e70
--- /dev/null
+++ b/system-packages.txt
@@ -0,0 +1,11 @@
+# System-installed Python packages to link into our virtualenv. This
+# facility should be reserved for cases where installing them as a normal
+# Python dependency is impossible or unreliable (perhaps due to frequent ABI
+# changes in system libraries they depend on, or frequent security updates
+# managed by the distribution's security team).
+#
+# Package names that end with "?" are optional, in that link-system-packages
+# will not fail if they are missing; this should be reserved for packages
+# only used by the test suite.
+
+gpg