launchpad-reviewers team mailing list archive
  
  - 
     launchpad-reviewers team launchpad-reviewers team
- 
    Mailing list archive
  
- 
    Message #25256
  
 [Merge] ~cjwatson/launchpad:py3-gpg into	launchpad:master
  
Colin Watson has proposed merging ~cjwatson/launchpad:py3-gpg into launchpad:master.
Commit message:
Make GPGHandler take content and signature as bytes
Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/390273
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:py3-gpg into launchpad:master.
diff --git a/lib/lp/archivepublisher/archivegpgsigningkey.py b/lib/lp/archivepublisher/archivegpgsigningkey.py
index 7439a32..ead72b8 100644
--- a/lib/lp/archivepublisher/archivegpgsigningkey.py
+++ b/lib/lp/archivepublisher/archivegpgsigningkey.py
@@ -82,7 +82,7 @@ class SignableArchive:
         if self.archive.signing_key is not None:
             secret_key_path = self.getPathForSecretKey(
                 self.archive.signing_key)
-            with open(secret_key_path) as secret_key_file:
+            with open(secret_key_path, "rb") as secret_key_file:
                 secret_key_export = secret_key_file.read()
             gpghandler = getUtility(IGPGHandler)
             secret_key = gpghandler.importSecretKey(secret_key_export)
@@ -92,11 +92,11 @@ class SignableArchive:
             if mode not in {SigningMode.DETACHED, SigningMode.CLEAR}:
                 raise ValueError('Invalid signature mode for GPG: %s' % mode)
             if self.archive.signing_key is not None:
-                with open(input_path) as input_file:
+                with open(input_path, "rb") as input_file:
                     input_content = input_file.read()
                 signature = gpghandler.signContent(
                     input_content, secret_key, mode=self.gpgme_modes[mode])
-                with open(output_path, "w") as output_file:
+                with open(output_path, "wb") as output_file:
                     output_file.write(signature)
                 output_paths.append(output_path)
             elif find_run_parts_dir(
@@ -185,7 +185,7 @@ class ArchiveGPGSigningKey(SignableArchive):
         if not os.path.exists(os.path.dirname(export_path)):
             os.makedirs(os.path.dirname(export_path))
 
-        with open(export_path, 'w') as export_file:
+        with open(export_path, 'wb') as export_file:
             export_file.write(key.export())
 
     def generateSigningKey(self):
@@ -218,7 +218,7 @@ class ArchiveGPGSigningKey(SignableArchive):
         assert os.path.exists(key_path), (
             "%s does not exist" % key_path)
 
-        with open(key_path) as key_file:
+        with open(key_path, "rb") as key_file:
             secret_key_export = key_file.read()
         secret_key = getUtility(IGPGHandler).importSecretKey(secret_key_export)
         return self._setupSigningKey(
diff --git a/lib/lp/services/gpg/doc/gpg-encryption.txt b/lib/lp/services/gpg/doc/gpg-encryption.txt
index 065bd35..1c79391 100644
--- a/lib/lp/services/gpg/doc/gpg-encryption.txt
+++ b/lib/lp/services/gpg/doc/gpg-encryption.txt
@@ -77,7 +77,7 @@ Let's try to pass unicode and see if it fails
     >>> cipher = gpghandler.encryptContent(content, key)
     Traceback (most recent call last):
     ...
-    TypeError: Content cannot be Unicode.
+    TypeError: Content must be bytes.
 
 Decrypt a unicode content:
 
@@ -87,7 +87,7 @@ Decrypt a unicode content:
     >>> plain = decrypt_content(cipher, 'test')    
     Traceback (most recent call last):
     ...
-    TypeError: Content cannot be Unicode.
+    TypeError: Content must be bytes.
 
 What about a message encrypted for an unknown key.
 
diff --git a/lib/lp/services/gpg/handler.py b/lib/lp/services/gpg/handler.py
index 965282a..381f1cf 100644
--- a/lib/lp/services/gpg/handler.py
+++ b/lib/lp/services/gpg/handler.py
@@ -13,10 +13,10 @@ __all__ = [
 import atexit
 from contextlib import contextmanager
 from datetime import datetime
+from io import BytesIO
 import os
 import shutil
 import socket
-from StringIO import StringIO
 import subprocess
 import sys
 import tempfile
@@ -173,16 +173,16 @@ class GPGHandler:
 
         if signature:
             # store detach-sig
-            sig = StringIO(signature)
+            sig = BytesIO(signature)
             # store the content
-            plain = StringIO(content)
+            plain = BytesIO(content)
             args = (sig, plain, None)
             timeline_detail = "detached signature"
         else:
             # store clearsigned signature
-            sig = StringIO(content)
+            sig = BytesIO(content)
             # writeable content
-            plain = StringIO()
+            plain = BytesIO()
             args = (sig, None, plain)
             timeline_detail = "clear signature"
 
@@ -220,8 +220,8 @@ class GPGHandler:
     def getVerifiedSignature(self, content, signature=None):
         """See IGPGHandler."""
 
-        assert not isinstance(content, six.text_type)
-        assert not isinstance(signature, six.text_type)
+        assert isinstance(content, bytes)
+        assert signature is None or isinstance(signature, bytes)
 
         ctx = get_gpgme_context()
 
@@ -265,10 +265,10 @@ class GPGHandler:
 
     def importPublicKey(self, content):
         """See IGPGHandler."""
-        assert isinstance(content, str)
+        assert isinstance(content, bytes)
         context = get_gpgme_context()
 
-        newkey = StringIO(content)
+        newkey = BytesIO(content)
         with gpgme_timeline("import", "new public key"):
             result = context.import_(newkey)
 
@@ -295,14 +295,14 @@ class GPGHandler:
 
     def importSecretKey(self, content):
         """See `IGPGHandler`."""
-        assert isinstance(content, str)
+        assert isinstance(content, bytes)
 
         # Make sure that gpg-agent doesn't interfere.
         if 'GPG_AGENT_INFO' in os.environ:
             del os.environ['GPG_AGENT_INFO']
 
         context = get_gpgme_context()
-        newkey = StringIO(content)
+        newkey = BytesIO(content)
         with gpgme_timeline("import", "new secret key"):
             import_result = context.import_(newkey)
 
@@ -382,14 +382,14 @@ class GPGHandler:
 
     def encryptContent(self, content, key):
         """See IGPGHandler."""
-        if isinstance(content, six.text_type):
-            raise TypeError('Content cannot be Unicode.')
+        if not isinstance(content, bytes):
+            raise TypeError('Content must be bytes.')
 
         ctx = get_gpgme_context()
 
         # setup containers
-        plain = StringIO(content)
-        cipher = StringIO()
+        plain = BytesIO(content)
+        cipher = BytesIO()
 
         if key.key is None:
             return None
@@ -408,8 +408,8 @@ class GPGHandler:
 
     def signContent(self, content, key, password='', mode=None):
         """See IGPGHandler."""
-        if not isinstance(content, str):
-            raise TypeError('Content should be a string.')
+        if not isinstance(content, bytes):
+            raise TypeError('Content must be bytes.')
 
         if mode is None:
             mode = gpgme.SIG_MODE_CLEAR
@@ -420,8 +420,8 @@ class GPGHandler:
         context.signers = [removeSecurityProxy(key.key)]
 
         # Set up containers.
-        plaintext = StringIO(content)
-        signature = StringIO()
+        plaintext = BytesIO(content)
+        signature = BytesIO()
 
         # Make sure that gpg-agent doesn't interfere.
         if 'GPG_AGENT_INFO' in os.environ:
@@ -449,7 +449,7 @@ class GPGHandler:
         # XXX michaeln 2010-05-07 bug=576405
         # Currently gpgme.Context().keylist fails if passed a unicode
         # string even though that's what is returned for fingerprints.
-        if type(filter) == unicode:
+        if isinstance(filter, six.text_type):
             filter = filter.encode('utf-8')
 
         with gpgme_timeline(
@@ -667,7 +667,7 @@ class PymeKey:
             return p.stdout.read()
 
         context = get_gpgme_context()
-        keydata = StringIO()
+        keydata = BytesIO()
         with gpgme_timeline("export", self.fingerprint):
             context.export(self.fingerprint.encode('ascii'), keydata)
 
diff --git a/lib/lp/testing/gpgkeys/__init__.py b/lib/lp/testing/gpgkeys/__init__.py
index d330c35..b2d4dca 100644
--- a/lib/lp/testing/gpgkeys/__init__.py
+++ b/lib/lp/testing/gpgkeys/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """OpenPGP keys used for testing.
@@ -19,12 +19,11 @@ decrypt data in pagetests.
 
 __metaclass__ = type
 
-from cStringIO import StringIO
+from io import BytesIO
 import os
 
 import gpgme
 import scandir
-import six
 from zope.component import getUtility
 
 from lp.registry.interfaces.gpg import IGPGKeySet
@@ -95,7 +94,7 @@ def import_secret_test_key(keyfile='test@xxxxxxxxxxxxxxxxx'):
     :param keyfile: The name of the file to be imported.
     """
     gpghandler = getUtility(IGPGHandler)
-    with open(os.path.join(gpgkeysdir, keyfile)) as f:
+    with open(os.path.join(gpgkeysdir, keyfile), 'rb') as f:
         seckey = f.read()
     return gpghandler.importSecretKey(seckey)
 
@@ -107,7 +106,7 @@ def test_pubkey_file_from_email(email_addr):
 
 def test_pubkey_from_email(email_addr):
     """Get the on disk content for a test pubkey by email address."""
-    with open(test_pubkey_file_from_email(email_addr)) as f:
+    with open(test_pubkey_file_from_email(email_addr), 'rb') as f:
         return f.read()
 
 
@@ -121,23 +120,23 @@ def test_keyrings():
 def decrypt_content(content, password):
     """Return the decrypted content or None if failed
 
-    content and password must be traditional strings. It's up to
-    the caller to encode or decode properly.
+    content must be a byte string, and password must be a native string.
+    It's up to the caller to encode or decode properly.
 
     :content: encrypted data content
-    :password: unicode password to unlock the secret key in question
+    :password: password to unlock the secret key in question
     """
-    if isinstance(password, six.text_type):
-        raise TypeError('Password cannot be Unicode.')
+    if not isinstance(password, str):
+        raise TypeError('Password must be a str.')
 
-    if isinstance(content, six.text_type):
-        raise TypeError('Content cannot be Unicode.')
+    if not isinstance(content, bytes):
+        raise TypeError('Content must be bytes.')
 
     ctx = get_gpgme_context()
 
     # setup containers
-    cipher = StringIO(content)
-    plain = StringIO()
+    cipher = BytesIO(content)
+    plain = BytesIO()
 
     def passphrase_cb(uid_hint, passphrase_info, prev_was_bad, fd):
         os.write(fd, '%s\n' % password)