launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24610
[Merge] ~pappacena/launchpad:inject-lp-signing-generated-keys into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:inject-lp-signing-generated-keys into launchpad:master.
Commit message:
Add the ability to inject locally generated signing keys into lp-signing.
The key types to inject when keys are auto-generated are controlled by setting the feature flag `archivepublisher.signing_service.injection.enabled` to a comma-separated list of key types, e.g. "KMOD,UEFI".
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/382779
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:inject-lp-signing-generated-keys into launchpad:master.
diff --git a/lib/lp/archivepublisher/signing.py b/lib/lp/archivepublisher/signing.py
index 9a175db..6231775 100644
--- a/lib/lp/archivepublisher/signing.py
+++ b/lib/lp/archivepublisher/signing.py
@@ -18,8 +18,11 @@ __all__ = [
"UefiUpload",
]
+import base64
+from datetime import datetime
from functools import partial
import os
+import re
import shutil
import stat
import subprocess
@@ -27,6 +30,7 @@ import tarfile
import tempfile
import textwrap
+from pytz import utc
import scandir
from zope.component import getUtility
@@ -42,6 +46,8 @@ from lp.soyuz.interfaces.queue import CustomUploadError
PUBLISHER_USES_SIGNING_SERVICE = (
'archivepublisher.signing_service.enabled')
+PUBLISHER_SIGNING_SERVICE_INJECTS_KEYS = (
+ 'archivepublisher.signing_service.injection.enabled')
class SigningUploadPackError(CustomUploadError):
@@ -260,7 +266,7 @@ class SigningUpload(CustomUpload):
# tend to make the publisher rather upset.
if self.logger is not None:
self.logger.warning("%s Failed (cmd='%s')" %
- (description, " ".join(cmdl)))
+ (description, " ".join(cmdl)))
return status
def findSigningHandlers(self):
@@ -421,6 +427,40 @@ class SigningUpload(CustomUpload):
return [None for k in keynames]
return keynames
+ def injectIntoSigningService(
+ self, key_type, private_key_file, public_key_file):
+ """Injects the given key pair into signing service for current
+ archive."""
+ if key_type not in SigningKeyType:
+ raise ValueError("%s is not a valid key type to inject" % key_type)
+
+ feature_flag = (
+ getFeatureFlag(PUBLISHER_SIGNING_SERVICE_INJECTS_KEYS) or '')
+ key_types_to_inject = [i.strip() for i in feature_flag.split(',')]
+ if not key_types_to_inject:
+ return
+
+ if key_type.name not in key_types_to_inject:
+ if self.logger:
+ self.logger.info(
+ "Skipping injection for key type %s: not in %s" %
+ (key_type, key_types_to_inject))
+ return
+
+ if self.logger:
+ self.logger.info(
+ "Injecting key_type %s for archive %s into signing service" %
+ (key_type, self.archive.name))
+
+ private_key = LocalKeyFile(private_key_file).getPrivateKey()
+ public_key = LocalKeyFile(public_key_file).getPublicKey()
+
+ now = datetime.now().replace(tzinfo=utc)
+ getUtility(IArchiveSigningKeySet).inject(
+ key_type, private_key, public_key,
+ u"Auto-generated %s key" % key_type.name, now,
+ self.archive, earliest_distro_series=None)
+
def generateKeyCommonName(self, owner, archive, suffix=''):
# PPA <owner> <archive> <suffix>
# truncate <owner> <archive> to ensure the overall form is shorter
@@ -454,6 +494,10 @@ class SigningUpload(CustomUpload):
if os.path.exists(cert_filename):
os.chmod(cert_filename, 0o644)
+ signing_key_type = getattr(SigningKeyType, key_type.upper())
+ self.injectIntoSigningService(
+ signing_key_type, key_filename, cert_filename)
+
def generateUefiKeys(self):
"""Generate new UEFI Keys for this archive."""
self.generateKeyCrtPair("UEFI", self.uefi_key, self.uefi_cert)
@@ -541,6 +585,10 @@ class SigningUpload(CustomUpload):
if os.path.exists(x509_filename):
os.chmod(x509_filename, 0o644)
+ signing_key_type = getattr(SigningKeyType, key_type.upper())
+ self.injectIntoSigningService(
+ signing_key_type, pem_filename, x509_filename)
+
def generateKmodKeys(self):
"""Generate new Kernel Signing Keys for this archive."""
config = self.generateOpensslConfig("Kmod", self.openssl_config_kmod)
@@ -691,3 +739,44 @@ class UefiUpload(SigningUpload):
"""
custom_type = "uefi"
dists_directory = "uefi"
+
+
+class LocalKeyFile:
+ """Helper to extract content of locally generated key files."""
+ def __init__(self, filename):
+ self.filename = filename
+ self._content = None
+
+ @property
+ def content(self):
+ if self._content is None:
+ with open(self.filename) as fd:
+ self._content = fd.read()
+ return self._content
+
+ def getBase64KeyContent(self, tag="PRIVATE KEY"):
+ """
+ Extracts the base64 content of the given file content.
+
+ :param key_file_content: The content of a key file.
+ :param tag: Either 'PRIVATE KEY' or 'CERTIFICATE'.
+ :return: The binary content (base64-decoded).
+ """
+ m = re.search(
+ r"-----BEGIN %s-----\n(.*)?\n-----END %s-----" % (tag, tag),
+ self.content, flags=re.DOTALL)
+ if not m:
+ raise ValueError("No content between -----BEGIN/END %s-----" % tag)
+ return base64.b64decode(m.groups()[0])
+
+ def getPrivateKey(self):
+ return self.getBase64KeyContent("PRIVATE KEY")
+
+ def getPublicKey(self):
+ try:
+ return self.getBase64KeyContent("CERTIFICATE")
+ except ValueError:
+ # If there is no tag "CERTIFICATE" in the file, it should be a
+ # binary file already.
+ with open(self.filename, 'rb') as fd:
+ return fd.read()
diff --git a/lib/lp/archivepublisher/tests/test_signing.py b/lib/lp/archivepublisher/tests/test_signing.py
index 87cf391..837349d 100644
--- a/lib/lp/archivepublisher/tests/test_signing.py
+++ b/lib/lp/archivepublisher/tests/test_signing.py
@@ -7,14 +7,20 @@ from __future__ import absolute_import, print_function, unicode_literals
__metaclass__ = type
+import base64
+from datetime import datetime
import os
import re
import shutil
import stat
import tarfile
-from fixtures import MonkeyPatch
+from fixtures import (
+ MockPatch,
+ MonkeyPatch,
+ )
from mock import call
+from pytz import utc
import scandir
from testtools.matchers import (
Contains,
@@ -42,12 +48,14 @@ from lp.archivepublisher.interfaces.archivegpgsigningkey import (
)
from lp.archivepublisher.interfaces.publisherconfig import IPublisherConfigSet
from lp.archivepublisher.signing import (
+ PUBLISHER_SIGNING_SERVICE_INJECTS_KEYS,
PUBLISHER_USES_SIGNING_SERVICE,
SigningUpload,
UefiUpload,
)
from lp.archivepublisher.tests.test_run_parts import RunPartsMixin
from lp.services.features.testing import FeatureFixture
+from lp.services.log.logger import BufferLogger
from lp.services.osutils import write_file
from lp.services.signing.enums import SigningMode
from lp.services.signing.proxy import SigningKeyType
@@ -2009,3 +2017,58 @@ class TestSigningUploadWithSigningService(TestSigningHelpers):
self.assertEqual(
[(os.path.join(upload.tmpdir_used, "1.0/empty.efi"),)],
upload.signUefi.extract_args())
+
+ def test_fallback_injects_key(self):
+ self.useFixture(FeatureFixture({PUBLISHER_USES_SIGNING_SERVICE: ''}))
+ self.useFixture(FeatureFixture({
+ PUBLISHER_SIGNING_SERVICE_INJECTS_KEYS: 'SIPL,OPAL'}))
+
+ now = datetime.now()
+ mock_datetime = self.useFixture(MockPatch(
+ 'lp.archivepublisher.signing.datetime')).mock
+ mock_datetime.now = lambda: now
+
+ logger = BufferLogger()
+ upload = SigningUpload(logger=logger)
+
+ # Setup PPA to ensure it auto-generates keys.
+ self.setUpPPA()
+
+ filenames = ["1.0/empty.efi", "1.0/empty.opal"]
+
+ self.openArchive("test", "1.0", "amd64")
+ for filename in filenames:
+ self.tarfile.add_file(filename, b"data - %s" % filename)
+ self.tarfile.close()
+ self.buffer.close()
+
+ upload.process(self.archive, self.path, self.suite)
+ self.assertTrue(upload.autokey)
+
+ # Read the key file content
+ with open(upload.opal_pem) as fd:
+ pem = fd.read()
+ start_tag = "-----BEGIN PRIVATE KEY-----"
+ start = pem.index(start_tag) + len(start_tag)
+ end = pem.index("-----END PRIVATE KEY-----")
+ private_key = base64.b64decode(pem[start:end].strip())
+ with open(upload.opal_x509) as fd:
+ public_key = fd.read()
+
+ # Check if we called lp-signing's /inject endpoint correctly
+ self.assertEqual(1, self.signing_service_client.inject.call_count)
+ self.assertEqual(
+ (SigningKeyType.OPAL, private_key, public_key,
+ u"Auto-generated OPAL key", now.replace(tzinfo=utc)),
+ self.signing_service_client.inject.call_args[0])
+
+ log_content = logger.content.as_text()
+ self.assertIn(
+ "INFO Injecting key_type OPAL for archive %s into signing "
+ "service" % (self.archive.name),
+ log_content)
+
+ self.assertIn(
+ "INFO Skipping injection for key type UEFI: "
+ "not in [u'SIPL', u'OPAL']",
+ log_content)
diff --git a/lib/lp/services/signing/tests/helpers.py b/lib/lp/services/signing/tests/helpers.py
index e01730f..5675bc6 100644
--- a/lib/lp/services/signing/tests/helpers.py
+++ b/lib/lp/services/signing/tests/helpers.py
@@ -39,8 +39,12 @@ class SigningServiceClientFixture(fixtures.Fixture):
self.sign = mock.Mock()
self.sign.side_effect = self._sign
+ self.inject = mock.Mock()
+ self.inject.side_effect = self._inject
+
self.generate_returns = []
self.sign_returns = []
+ self.inject_returns = []
def _generate(self, key_type, description):
key = bytes(PrivateKey.generate().public_key)
@@ -59,6 +63,12 @@ class SigningServiceClientFixture(fixtures.Fixture):
self.sign_returns.append((key_type, data))
return data
+ def _inject(self, key_type, private_key, public_key, description,
+ created_at):
+ data = {'fingerprint': text_type(self.factory.getUniqueHexString(40))}
+ self.inject_returns.append(data)
+ return data
+
def _setUp(self):
self.useFixture(ZopeUtilityFixture(self, ISigningServiceClient))