launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #25256
[Merge] ~cjwatson/launchpad:py3-gpg into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:py3-gpg into launchpad:master.
Commit message:
Make GPGHandler take content and signature as bytes
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/390273
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:py3-gpg into launchpad:master.
diff --git a/lib/lp/archivepublisher/archivegpgsigningkey.py b/lib/lp/archivepublisher/archivegpgsigningkey.py
index 7439a32..ead72b8 100644
--- a/lib/lp/archivepublisher/archivegpgsigningkey.py
+++ b/lib/lp/archivepublisher/archivegpgsigningkey.py
@@ -82,7 +82,7 @@ class SignableArchive:
if self.archive.signing_key is not None:
secret_key_path = self.getPathForSecretKey(
self.archive.signing_key)
- with open(secret_key_path) as secret_key_file:
+ with open(secret_key_path, "rb") as secret_key_file:
secret_key_export = secret_key_file.read()
gpghandler = getUtility(IGPGHandler)
secret_key = gpghandler.importSecretKey(secret_key_export)
@@ -92,11 +92,11 @@ class SignableArchive:
if mode not in {SigningMode.DETACHED, SigningMode.CLEAR}:
raise ValueError('Invalid signature mode for GPG: %s' % mode)
if self.archive.signing_key is not None:
- with open(input_path) as input_file:
+ with open(input_path, "rb") as input_file:
input_content = input_file.read()
signature = gpghandler.signContent(
input_content, secret_key, mode=self.gpgme_modes[mode])
- with open(output_path, "w") as output_file:
+ with open(output_path, "wb") as output_file:
output_file.write(signature)
output_paths.append(output_path)
elif find_run_parts_dir(
@@ -185,7 +185,7 @@ class ArchiveGPGSigningKey(SignableArchive):
if not os.path.exists(os.path.dirname(export_path)):
os.makedirs(os.path.dirname(export_path))
- with open(export_path, 'w') as export_file:
+ with open(export_path, 'wb') as export_file:
export_file.write(key.export())
def generateSigningKey(self):
@@ -218,7 +218,7 @@ class ArchiveGPGSigningKey(SignableArchive):
assert os.path.exists(key_path), (
"%s does not exist" % key_path)
- with open(key_path) as key_file:
+ with open(key_path, "rb") as key_file:
secret_key_export = key_file.read()
secret_key = getUtility(IGPGHandler).importSecretKey(secret_key_export)
return self._setupSigningKey(
diff --git a/lib/lp/services/gpg/doc/gpg-encryption.txt b/lib/lp/services/gpg/doc/gpg-encryption.txt
index 065bd35..1c79391 100644
--- a/lib/lp/services/gpg/doc/gpg-encryption.txt
+++ b/lib/lp/services/gpg/doc/gpg-encryption.txt
@@ -77,7 +77,7 @@ Let's try to pass unicode and see if it fails
>>> cipher = gpghandler.encryptContent(content, key)
Traceback (most recent call last):
...
- TypeError: Content cannot be Unicode.
+ TypeError: Content must be bytes.
Decrypt a unicode content:
@@ -87,7 +87,7 @@ Decrypt a unicode content:
>>> plain = decrypt_content(cipher, 'test')
Traceback (most recent call last):
...
- TypeError: Content cannot be Unicode.
+ TypeError: Content must be bytes.
What about a message encrypted for an unknown key.
diff --git a/lib/lp/services/gpg/handler.py b/lib/lp/services/gpg/handler.py
index 965282a..381f1cf 100644
--- a/lib/lp/services/gpg/handler.py
+++ b/lib/lp/services/gpg/handler.py
@@ -13,10 +13,10 @@ __all__ = [
import atexit
from contextlib import contextmanager
from datetime import datetime
+from io import BytesIO
import os
import shutil
import socket
-from StringIO import StringIO
import subprocess
import sys
import tempfile
@@ -173,16 +173,16 @@ class GPGHandler:
if signature:
# store detach-sig
- sig = StringIO(signature)
+ sig = BytesIO(signature)
# store the content
- plain = StringIO(content)
+ plain = BytesIO(content)
args = (sig, plain, None)
timeline_detail = "detached signature"
else:
# store clearsigned signature
- sig = StringIO(content)
+ sig = BytesIO(content)
# writeable content
- plain = StringIO()
+ plain = BytesIO()
args = (sig, None, plain)
timeline_detail = "clear signature"
@@ -220,8 +220,8 @@ class GPGHandler:
def getVerifiedSignature(self, content, signature=None):
"""See IGPGHandler."""
- assert not isinstance(content, six.text_type)
- assert not isinstance(signature, six.text_type)
+ assert isinstance(content, bytes)
+ assert signature is None or isinstance(signature, bytes)
ctx = get_gpgme_context()
@@ -265,10 +265,10 @@ class GPGHandler:
def importPublicKey(self, content):
"""See IGPGHandler."""
- assert isinstance(content, str)
+ assert isinstance(content, bytes)
context = get_gpgme_context()
- newkey = StringIO(content)
+ newkey = BytesIO(content)
with gpgme_timeline("import", "new public key"):
result = context.import_(newkey)
@@ -295,14 +295,14 @@ class GPGHandler:
def importSecretKey(self, content):
"""See `IGPGHandler`."""
- assert isinstance(content, str)
+ assert isinstance(content, bytes)
# Make sure that gpg-agent doesn't interfere.
if 'GPG_AGENT_INFO' in os.environ:
del os.environ['GPG_AGENT_INFO']
context = get_gpgme_context()
- newkey = StringIO(content)
+ newkey = BytesIO(content)
with gpgme_timeline("import", "new secret key"):
import_result = context.import_(newkey)
@@ -382,14 +382,14 @@ class GPGHandler:
def encryptContent(self, content, key):
"""See IGPGHandler."""
- if isinstance(content, six.text_type):
- raise TypeError('Content cannot be Unicode.')
+ if not isinstance(content, bytes):
+ raise TypeError('Content must be bytes.')
ctx = get_gpgme_context()
# setup containers
- plain = StringIO(content)
- cipher = StringIO()
+ plain = BytesIO(content)
+ cipher = BytesIO()
if key.key is None:
return None
@@ -408,8 +408,8 @@ class GPGHandler:
def signContent(self, content, key, password='', mode=None):
"""See IGPGHandler."""
- if not isinstance(content, str):
- raise TypeError('Content should be a string.')
+ if not isinstance(content, bytes):
+ raise TypeError('Content must be bytes.')
if mode is None:
mode = gpgme.SIG_MODE_CLEAR
@@ -420,8 +420,8 @@ class GPGHandler:
context.signers = [removeSecurityProxy(key.key)]
# Set up containers.
- plaintext = StringIO(content)
- signature = StringIO()
+ plaintext = BytesIO(content)
+ signature = BytesIO()
# Make sure that gpg-agent doesn't interfere.
if 'GPG_AGENT_INFO' in os.environ:
@@ -449,7 +449,7 @@ class GPGHandler:
# XXX michaeln 2010-05-07 bug=576405
# Currently gpgme.Context().keylist fails if passed a unicode
# string even though that's what is returned for fingerprints.
- if type(filter) == unicode:
+ if isinstance(filter, six.text_type):
filter = filter.encode('utf-8')
with gpgme_timeline(
@@ -667,7 +667,7 @@ class PymeKey:
return p.stdout.read()
context = get_gpgme_context()
- keydata = StringIO()
+ keydata = BytesIO()
with gpgme_timeline("export", self.fingerprint):
context.export(self.fingerprint.encode('ascii'), keydata)
diff --git a/lib/lp/testing/gpgkeys/__init__.py b/lib/lp/testing/gpgkeys/__init__.py
index d330c35..b2d4dca 100644
--- a/lib/lp/testing/gpgkeys/__init__.py
+++ b/lib/lp/testing/gpgkeys/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""OpenPGP keys used for testing.
@@ -19,12 +19,11 @@ decrypt data in pagetests.
__metaclass__ = type
-from cStringIO import StringIO
+from io import BytesIO
import os
import gpgme
import scandir
-import six
from zope.component import getUtility
from lp.registry.interfaces.gpg import IGPGKeySet
@@ -95,7 +94,7 @@ def import_secret_test_key(keyfile='test@xxxxxxxxxxxxxxxxx'):
:param keyfile: The name of the file to be imported.
"""
gpghandler = getUtility(IGPGHandler)
- with open(os.path.join(gpgkeysdir, keyfile)) as f:
+ with open(os.path.join(gpgkeysdir, keyfile), 'rb') as f:
seckey = f.read()
return gpghandler.importSecretKey(seckey)
@@ -107,7 +106,7 @@ def test_pubkey_file_from_email(email_addr):
def test_pubkey_from_email(email_addr):
"""Get the on disk content for a test pubkey by email address."""
- with open(test_pubkey_file_from_email(email_addr)) as f:
+ with open(test_pubkey_file_from_email(email_addr), 'rb') as f:
return f.read()
@@ -121,23 +120,23 @@ def test_keyrings():
def decrypt_content(content, password):
"""Return the decrypted content or None if failed
- content and password must be traditional strings. It's up to
- the caller to encode or decode properly.
+ content must be a byte string, and password must be a native string.
+ It's up to the caller to encode or decode properly.
:content: encrypted data content
- :password: unicode password to unlock the secret key in question
+ :password: password to unlock the secret key in question
"""
- if isinstance(password, six.text_type):
- raise TypeError('Password cannot be Unicode.')
+ if not isinstance(password, str):
+ raise TypeError('Password must be a str.')
- if isinstance(content, six.text_type):
- raise TypeError('Content cannot be Unicode.')
+ if not isinstance(content, bytes):
+ raise TypeError('Content must be bytes.')
ctx = get_gpgme_context()
# setup containers
- cipher = StringIO(content)
- plain = StringIO()
+ cipher = BytesIO(content)
+ plain = BytesIO()
def passphrase_cb(uid_hint, passphrase_info, prev_was_bad, fd):
os.write(fd, '%s\n' % password)