launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #26127
[Merge] ~cjwatson/launchpad:py3-signed-message-from-bytes into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:py3-signed-message-from-bytes into launchpad:master.
Commit message:
lp.services.mail.signedmessage: Recast using bytes
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/397084
On Python 3, we generally want to deal with email messages as bytes. One fiddly bit of this is lp.services.mail.signedmessage.signed_message_from_string, which has the wrong API for the new world. Recast this as signed_message_from_bytes, and update all callers and associated code.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:py3-signed-message-from-bytes into launchpad:master.
diff --git a/lib/lp/answers/mail/handler.py b/lib/lp/answers/mail/handler.py
index 8c28968..f2084bf 100644
--- a/lib/lp/answers/mail/handler.py
+++ b/lib/lp/answers/mail/handler.py
@@ -45,7 +45,7 @@ class AnswerTrackerHandler:
messageset = getUtility(IMessageSet)
message = messageset.fromEmail(
- signed_msg.parsed_string,
+ signed_msg.parsed_bytes,
owner=getUtility(ILaunchBag).user,
filealias=filealias,
parsed_message=signed_msg)
diff --git a/lib/lp/answers/tests/emailinterface.txt b/lib/lp/answers/tests/emailinterface.txt
index f30e23e..462df34 100644
--- a/lib/lp/answers/tests/emailinterface.txt
+++ b/lib/lp/answers/tests/emailinterface.txt
@@ -31,7 +31,7 @@ AnswerTrackerHandler.
# Define a helper function to send email to the Answer Tracker handler.
>>> from lp.answers.mail.handler import AnswerTrackerHandler
>>> from email.utils import formatdate, make_msgid, mktime_tz
- >>> from lp.services.mail.signedmessage import signed_message_from_string
+ >>> from lp.services.mail.signedmessage import signed_message_from_bytes
>>> handler = AnswerTrackerHandler()
>>> def send_question_email(question_id, from_addr, subject, body):
... login(from_addr)
@@ -46,7 +46,7 @@ AnswerTrackerHandler.
... lines.append('')
... lines.append(body)
... raw_msg = '\n'.join(lines)
- ... msg = signed_message_from_string(raw_msg.encode('UTF-8'))
+ ... msg = signed_message_from_bytes(raw_msg.encode('UTF-8'))
... if handler.process(msg, msg['To']):
... # Ensures that the DB user has the correct permission to \
... # saves the changes.
@@ -62,12 +62,12 @@ configuration variable.)
All other email addresses are ignored:
- >>> raw_msg = """From: test@xxxxxxxxxxxxx
+ >>> raw_msg = b"""From: test@xxxxxxxxxxxxx
... To: foo@xxxxxxxxxxxxxxxxxxxxx
... Subject: Hello
...
... Hello there."""
- >>> msg = signed_message_from_string(raw_msg)
+ >>> msg = signed_message_from_bytes(raw_msg)
>>> handler.process(msg, msg['To'])
False
diff --git a/lib/lp/bugs/tests/bugs-emailinterface.txt b/lib/lp/bugs/tests/bugs-emailinterface.txt
index ccb90bc..498ad18 100644
--- a/lib/lp/bugs/tests/bugs-emailinterface.txt
+++ b/lib/lp/bugs/tests/bugs-emailinterface.txt
@@ -1820,8 +1820,8 @@ signing the mail:
>>> login(sampledata.USER_EMAIL)
>>> simulate_receiving_untrusted_mail()
- >>> from lp.services.mail.signedmessage import signed_message_from_string
- >>> msg = signed_message_from_string(submit_mail)
+ >>> from lp.services.mail.signedmessage import signed_message_from_bytes
+ >>> msg = signed_message_from_bytes(submit_mail)
>>> import email.utils
>>> msg['Message-Id'] = email.utils.make_msgid()
>>> handler.process(
diff --git a/lib/lp/code/mail/codehandler.py b/lib/lp/code/mail/codehandler.py
index 80eceff..15f288e 100644
--- a/lib/lp/code/mail/codehandler.py
+++ b/lib/lp/code/mail/codehandler.py
@@ -306,7 +306,7 @@ class CodeHandler:
ensure_not_weakly_authenticated(mail, 'code review')
message = getUtility(IMessageSet).fromEmail(
- mail.parsed_string,
+ mail.parsed_bytes,
owner=getUtility(ILaunchBag).user,
filealias=file_alias,
parsed_message=mail)
diff --git a/lib/lp/code/model/codereviewcomment.py b/lib/lp/code/model/codereviewcomment.py
index 1029b4e..68b9d78 100644
--- a/lib/lp/code/model/codereviewcomment.py
+++ b/lib/lp/code/model/codereviewcomment.py
@@ -26,7 +26,7 @@ from lp.code.interfaces.codereviewcomment import (
)
from lp.services.database.enumcol import DBEnum
from lp.services.database.stormbase import StormBase
-from lp.services.mail.signedmessage import signed_message_from_string
+from lp.services.mail.signedmessage import signed_message_from_bytes
def quote_text_as_email(text, width=80):
@@ -135,7 +135,7 @@ class CodeReviewComment(StormBase):
"""See `ICodeReviewComment`."""
if self.message.raw is None:
return None
- return signed_message_from_string(self.message.raw.read())
+ return signed_message_from_bytes(self.message.raw.read())
@property
def visible(self):
diff --git a/lib/lp/registry/model/distroseries.py b/lib/lp/registry/model/distroseries.py
index ae88622..3242c33 100644
--- a/lib/lp/registry/model/distroseries.py
+++ b/lib/lp/registry/model/distroseries.py
@@ -116,7 +116,7 @@ from lp.services.librarian.model import (
LibraryFileAlias,
LibraryFileContent,
)
-from lp.services.mail.signedmessage import signed_message_from_string
+from lp.services.mail.signedmessage import signed_message_from_bytes
from lp.services.propertycache import (
cachedproperty,
get_property_cache,
@@ -1316,7 +1316,7 @@ class DistroSeries(SQLBase, BugTargetBase, HasSpecificationsMixin,
# The PGP signature is stripped from all changesfiles
# to avoid replay attacks (see bugs 159304 and 451396).
- signed_message = signed_message_from_string(changesfilecontent)
+ signed_message = signed_message_from_bytes(changesfilecontent)
if signed_message is not None:
# Overwrite `changesfilecontent` with the text stripped
# of the PGP signature.
diff --git a/lib/lp/services/mail/doc/emailauthentication.txt b/lib/lp/services/mail/doc/emailauthentication.txt
index e875e8c..fc37e8a 100644
--- a/lib/lp/services/mail/doc/emailauthentication.txt
+++ b/lib/lp/services/mail/doc/emailauthentication.txt
@@ -108,12 +108,16 @@ be canonicalised to \r\n. In order to ensure that the line endings in
signed_canonicalised.txt are not already '\r\n', we recreate the test
message.
+ >>> from lp.services.compat import (
+ ... message_as_bytes,
+ ... message_from_bytes,
+ ... )
>>> from lp.services.mail.signedmessage import SignedMessage
>>> msg = read_test_message('signed_canonicalised.txt')
- >>> msg_lines = msg.as_string().splitlines()
- >>> msg = email.message_from_string(
- ... '\n'.join(msg_lines), _class=SignedMessage)
- >>> msg.parsed_string = msg.as_string()
+ >>> msg_lines = message_as_bytes(msg).splitlines()
+ >>> msg = message_from_bytes(
+ ... b'\n'.join(msg_lines), _class=SignedMessage)
+ >>> msg.parsed_bytes = message_as_bytes(msg)
>>> from lp.services.gpg.interfaces import IGPGHandler
>>> getUtility(IGPGHandler).getVerifiedSignature(
@@ -129,10 +133,10 @@ message.
authenticateEmail() doesn't have any problems verifying the signature:
>>> from lp.registry.interfaces.person import IPerson
- >>> for line_ending in '\n', '\r\n':
- ... msg = email.message_from_string(
+ >>> for line_ending in b'\n', b'\r\n':
+ ... msg = message_from_bytes(
... line_ending.join(msg_lines), _class=SignedMessage)
- ... msg.parsed_string = msg.as_string()
+ ... msg.parsed_bytes = message_as_bytes(msg)
... principal = authenticateEmail(msg, accept_any_timestamp)
... authenticated_person = IPerson(principal)
... print(authenticated_person.preferredemail.email)
@@ -147,7 +151,7 @@ starts failing, Python is probably fixed, so the manual boundary parsing
hack can be removed.
>>> msg = read_test_message('signed_folded_header.txt')
- >>> print(msg.parsed_string) #doctest: -NORMALIZE_WHITESPACE
+ >>> print(six.ensure_str(msg.parsed_bytes)) #doctest: -NORMALIZE_WHITESPACE
Date:...
...
Content-Type: multipart/mixed;
diff --git a/lib/lp/services/mail/doc/signedmessage.txt b/lib/lp/services/mail/doc/signedmessage.txt
index f80d1ef..6234367 100644
--- a/lib/lp/services/mail/doc/signedmessage.txt
+++ b/lib/lp/services/mail/doc/signedmessage.txt
@@ -1,20 +1,20 @@
SignedMessage extends email.message.Message in order to provide easy
access to signed content and the signature of messages.
-You can create it from a string using signed_message_from_string. It
-basically the same as using email.message_from_string and passing
+You can create it from a byte string using signed_message_from_bytes. It
+is basically the same as using email.message_from_bytes and passing
SignedMessage as the _class parameter, but it also ensures that all
the attributes are correctly set.
>>> from lp.services.mail.interfaces import ISignedMessage
- >>> from lp.services.mail.signedmessage import signed_message_from_string
+ >>> from lp.services.mail.signedmessage import signed_message_from_bytes
>>> from lp.testing import verifyObject
- >>> msg = signed_message_from_string('To: someone\n\nHello.')
+ >>> msg = signed_message_from_bytes(b'To: someone\n\nHello.')
>>> verifyObject(ISignedMessage, msg)
True
>>> msg['To']
'someone'
- >>> print(msg.parsed_string)
+ >>> print(six.ensure_text(msg.parsed_bytes))
To: someone
<BLANKLINE>
Hello.
@@ -32,9 +32,11 @@ You can access the headers of the message:
>>> print(msg['From'])
Sample Person <test@xxxxxxxxxxxxx>
-The raw text that was signed is available as msg.signedContent:
+The raw byte string that was signed is available as msg.signedContent:
- >>> print(msg.signedContent)
+ >>> isinstance(msg.signedContent, bytes)
+ True
+ >>> print(six.ensure_text(msg.signedContent))
Some signed content.
<BLANKLINE>
With multiple paragraphs.
@@ -50,7 +52,9 @@ object as well:
Finally the signature can be accessed via msg.signature:
- >>> print(msg.signature)
+ >>> isinstance(msg.signature, bytes)
+ True
+ >>> print(six.ensure_text(msg.signature))
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.2.5 (GNU/Linux)
<BLANKLINE>
@@ -72,7 +76,7 @@ unescaped.
Sample Person
...
- >>> print(msg.signedContent)
+ >>> print(six.ensure_text(msg.signedContent))
Some signed content.
<BLANKLINE>
--
@@ -86,7 +90,7 @@ contains of two MIME parts, the signed text, and the signature:
The signed content includes the MIME headers as well:
- >>> print(msg.signedContent)
+ >>> print(six.ensure_text(msg.signedContent))
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
<BLANKLINE>
@@ -103,7 +107,7 @@ separately:
And of course the signature is accessible as well:
- >>> print(msg.signature)
+ >>> print(six.ensure_text(msg.signature))
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.2.5 (GNU/Linux)
<BLANKLINE>
@@ -133,7 +137,7 @@ It handles signed multipart messages as well:
A signed attachment.
<BLANKLINE>
- >>> print(msg.signature)
+ >>> print(six.ensure_text(msg.signature))
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.2.5 (GNU/Linux)
<BLANKLINE>
@@ -143,7 +147,8 @@ It handles signed multipart messages as well:
-----END PGP SIGNATURE-----
>>> msg = read_test_message('signed_folded_header.txt')
- >>> print(msg.signedContent) #doctest: -NORMALIZE_WHITESPACE
+ >>> print(six.ensure_text(msg.signedContent))
+ ... #doctest: -NORMALIZE_WHITESPACE
Content-Type: multipart/mixed;
boundary="--------------------EuxKj2iCbKjpUGkD"
...
diff --git a/lib/lp/services/mail/incoming.py b/lib/lp/services/mail/incoming.py
index 6c29e20..7bbe006 100644
--- a/lib/lp/services/mail/incoming.py
+++ b/lib/lp/services/mail/incoming.py
@@ -16,6 +16,7 @@ import sys
import dkim
import dns.exception
+import six
import transaction
from zope.component import getUtility
from zope.interface import (
@@ -49,7 +50,7 @@ from lp.services.mail.interfaces import IWeaklyAuthenticatedPrincipal
from lp.services.mail.mailbox import IMailBox
from lp.services.mail.notification import send_process_error_notification
from lp.services.mail.sendmail import do_paranoid_envelope_to_validation
-from lp.services.mail.signedmessage import signed_message_from_string
+from lp.services.mail.signedmessage import signed_message_from_bytes
from lp.services.webapp import canonical_url
from lp.services.webapp.errorlog import (
ErrorReportingUtility,
@@ -66,33 +67,42 @@ from lp.services.webapp.interfaces import (
# Match '\n' and '\r' line endings. That is, all '\r' that are not
# followed by a '\n', and all '\n' that are not preceded by a '\r'.
-non_canonicalised_line_endings = re.compile('((?<!\r)\n)|(\r(?!\n))')
+non_canonicalised_line_endings = re.compile(br'((?<!\r)\n)|(\r(?!\n))')
# Match trailing whitespace.
-trailing_whitespace = re.compile(r'[ \t]*((?=\r\n)|$)')
+trailing_whitespace = re.compile(br'[ \t]*((?=\r\n)|$)')
# this is a hard limit on the size of email we will be willing to store in
# the database.
MAX_EMAIL_SIZE = 10 * 1024 * 1024
-def canonicalise_line_endings(text):
+def canonicalise_line_endings(buf):
r"""Canonicalise the line endings to '\r\n'.
- >>> canonicalise_line_endings('\n\nfoo\nbar\rbaz\r\n')
+ >>> b = canonicalise_line_endings(b'\n\nfoo\nbar\rbaz\r\n')
+ >>> isinstance(b, bytes)
+ True
+ >>> six.ensure_str(b)
'\r\n\r\nfoo\r\nbar\r\nbaz\r\n'
- >>> canonicalise_line_endings('\r\rfoo\r\nbar\rbaz\n')
+ >>> b = canonicalise_line_endings(b'\r\rfoo\r\nbar\rbaz\n')
+ >>> isinstance(b, bytes)
+ True
+ >>> six.ensure_str(b)
'\r\n\r\nfoo\r\nbar\r\nbaz\r\n'
- >>> canonicalise_line_endings('\r\nfoo\r\nbar\nbaz\r')
+ >>> b = canonicalise_line_endings(b'\r\nfoo\r\nbar\nbaz\r')
+ >>> isinstance(b, bytes)
+ True
+ >>> six.ensure_str(b)
'\r\nfoo\r\nbar\r\nbaz\r\n'
"""
- if non_canonicalised_line_endings.search(text):
- text = non_canonicalised_line_endings.sub('\r\n', text)
- if trailing_whitespace.search(text):
- text = trailing_whitespace.sub('', text)
- return text
+ if non_canonicalised_line_endings.search(buf):
+ buf = non_canonicalised_line_endings.sub(b'\r\n', buf)
+ if trailing_whitespace.search(buf):
+ buf = trailing_whitespace.sub(b'', buf)
+ return buf
class InvalidSignature(Exception):
@@ -144,7 +154,7 @@ def _verifyDkimOrigin(signed_message):
dkim_checker = None
dkim_result = False
try:
- dkim_checker = dkim.DKIM(signed_message.parsed_string, logger=dkim_log)
+ dkim_checker = dkim.DKIM(signed_message.parsed_bytes, logger=dkim_log)
dkim_result = dkim_checker.verify()
except dkim.DKIMException as e:
log.info('DKIM error: %r' % (e,))
@@ -168,7 +178,7 @@ def _verifyDkimOrigin(signed_message):
return None
# in addition to the dkim signature being valid, we have to check that it
# was actually signed by the user's domain.
- signing_domain = dkim_checker.domain
+ signing_domain = six.ensure_str(dkim_checker.domain)
if not _isDkimDomainTrusted(signing_domain):
log.info("valid DKIM signature from untrusted domain %s"
% (signing_domain,))
@@ -426,7 +436,7 @@ def handleMail(trans=transaction, signature_timestamp_checker=None):
log.exception('Upload to Librarian failed')
continue
try:
- mail = signed_message_from_string(raw_mail)
+ mail = signed_message_from_bytes(raw_mail)
except email.errors.MessageError:
# If we can't parse the message, we can't send a reply back to
# the user, but logging an exception will let us investigate.
diff --git a/lib/lp/services/mail/interfaces.py b/lib/lp/services/mail/interfaces.py
index cca09df..41818a4 100644
--- a/lib/lp/services/mail/interfaces.py
+++ b/lib/lp/services/mail/interfaces.py
@@ -25,8 +25,8 @@ from zope.interface import (
Interface,
)
from zope.schema import (
- ASCII,
Bool,
+ Bytes,
)
from lp import _
@@ -52,15 +52,15 @@ class ISignedMessage(Interface):
signedMessage = Attribute("The part that was signed, represented "
"as an email.message.Message.")
- signedContent = ASCII(title=_("Signed Content"),
- description=_("The text that was signed."))
+ signedContent = Bytes(title=_("Signed Content"),
+ description=_("The byte string that was signed."))
- signature = ASCII(title=_("Signature"),
+ signature = Bytes(title=_("Signature"),
description=_("The OpenPGP signature used to sign "
"the message."))
- parsed_string = Attribute(
- "The string that was parsed to create the SignedMessage.")
+ parsed_bytes = Attribute(
+ "The byte string that was parsed to create the SignedMessage.")
class IMailHandler(Interface):
diff --git a/lib/lp/services/mail/signedmessage.py b/lib/lp/services/mail/signedmessage.py
index e549bc6..02eeb90 100644
--- a/lib/lp/services/mail/signedmessage.py
+++ b/lib/lp/services/mail/signedmessage.py
@@ -7,25 +7,25 @@ __metaclass__ = type
__all__ = [
'SignedMessage',
- 'signed_message_from_string',
+ 'signed_message_from_bytes',
'strip_pgp_signature',
]
-import email
from email.message import Message
import re
from zope.interface import implementer
+from lp.services.compat import message_from_bytes
from lp.services.mail.interfaces import ISignedMessage
clearsigned_re = re.compile(
- r'-----BEGIN PGP SIGNED MESSAGE-----'
- '.*?(?:\r\n|\n)(?:\r\n|\n)(.*)(?:\r\n|\n)'
- '(-----BEGIN PGP SIGNATURE-----'
- '.*'
- '-----END PGP SIGNATURE-----)',
+ br'-----BEGIN PGP SIGNED MESSAGE-----'
+ br'.*?(?:\r\n|\n)(?:\r\n|\n)(.*)(?:\r\n|\n)'
+ br'(-----BEGIN PGP SIGNATURE-----'
+ br'.*'
+ br'-----END PGP SIGNATURE-----)',
re.DOTALL)
# Regexp for matching the signed content in multipart messages.
@@ -33,17 +33,17 @@ multipart_signed_content = (
r'%(boundary)s\n(?P<signed_content>.*?)\n%(boundary)s\n.*?\n%(boundary)s')
# Lines that start with '-' are escaped with '- '.
-dash_escaped = re.compile('^- ', re.MULTILINE)
+dash_escaped = re.compile(b'^- ', re.MULTILINE)
-def signed_message_from_string(string):
- """Parse the string and return a SignedMessage.
+def signed_message_from_bytes(buf):
+ """Parse the byte string and return a SignedMessage.
It makes sure that the SignedMessage instance has access to the
- parsed string.
+ parsed bytes.
"""
- msg = email.message_from_string(string, _class=SignedMessage)
- msg.parsed_string = string
+ msg = message_from_bytes(buf, _class=SignedMessage)
+ msg.parsed_bytes = buf
return msg
@@ -51,19 +51,19 @@ def signed_message_from_string(string):
class SignedMessage(Message):
"""Provides easy access to signed content and the signature"""
- parsed_string = None
+ parsed_bytes = None
def _getSignatureAndSignedContent(self):
"""Returns the PGP signature and the content that's signed.
- The signature is returned as a string, and the content is
- returned as a string.
+ The signature is returned as a byte string, and the content is
+ returned as a byte string.
If the message isn't signed, both signature and the content is
None.
"""
- assert self.parsed_string is not None, (
- 'Use signed_message_from_string() to create the message.')
+ assert self.parsed_bytes is not None, (
+ 'Use signed_message_from_bytes() to create the message.')
signed_content = signature = None
# Check for MIME/PGP signed message first.
# See: RFC3156 - MIME Security with OpenPGP
@@ -76,17 +76,16 @@ class SignedMessage(Message):
content_part, signature_part = payload
sig_content_type = signature_part.get_content_type()
if sig_content_type == 'application/pgp-signature':
- # We need to extract the signed content from the
- # parsed string, since content_part.as_string()
- # isn't guarenteed to return the exact string it was
- # created from.
+ # We need to extract the signed content from the parsed
+ # bytes, since content_part.as_bytes() isn't guaranteed
+ # to return the exact byte string it was created from.
boundary = '--' + self.get_boundary()
match = re.search(
- multipart_signed_content % {
- 'boundary': re.escape(boundary)},
- self.parsed_string, re.DOTALL)
+ (multipart_signed_content % {
+ 'boundary': re.escape(boundary)}).encode('ASCII'),
+ self.parsed_bytes, re.DOTALL)
signed_content = match.group('signed_content')
- signature = signature_part.get_payload()
+ signature = signature_part.get_payload(decode=True)
return signature, signed_content
# If we still have no signature, then we have one of several cases:
# 1) We do not have a multipart message
@@ -108,11 +107,11 @@ class SignedMessage(Message):
for part in self.walk():
if part.is_multipart():
continue
- match = clearsigned_re.search(part.get_payload())
+ match = clearsigned_re.search(part.get_payload(decode=True))
if match is not None:
signed_content_unescaped = match.group(1)
signed_content = dash_escaped.sub(
- '', signed_content_unescaped)
+ b'', signed_content_unescaped)
signature = match.group(2)
return signature, signed_content
# Stop processing after the first non-multipart part.
@@ -130,15 +129,15 @@ class SignedMessage(Message):
return None
else:
if (not self.is_multipart() and
- clearsigned_re.search(self.get_payload())):
+ clearsigned_re.search(self.get_payload(decode=True))):
# Add a new line so that a message with no headers will
# be created.
- signed_content = '\n' + signed_content
- return signed_message_from_string(signed_content)
+ signed_content = b'\n' + signed_content
+ return signed_message_from_bytes(signed_content)
@property
def signedContent(self):
- """Returns the PGP signed content as a string.
+ """Returns the PGP signed content as a byte string.
Returns None if the message wasn't signed.
"""
@@ -157,14 +156,14 @@ class SignedMessage(Message):
@property
def raw_length(self):
"""Return the length in bytes of the underlying raw form."""
- return len(self.parsed_string)
+ return len(self.parsed_bytes)
-def strip_pgp_signature(text):
- """Strip any PGP signature from the supplied text."""
- signed_message = signed_message_from_string(text)
+def strip_pgp_signature(buf):
+ """Strip any PGP signature from the supplied byte string."""
+ signed_message = signed_message_from_bytes(buf)
# For unsigned text the signedContent will be None.
if signed_message.signedContent is not None:
return signed_message.signedContent
else:
- return text
+ return buf
diff --git a/lib/lp/services/mail/tests/helpers.py b/lib/lp/services/mail/tests/helpers.py
index 600794f..42b8a4a 100644
--- a/lib/lp/services/mail/tests/helpers.py
+++ b/lib/lp/services/mail/tests/helpers.py
@@ -9,7 +9,7 @@ __metaclass__ = type
import os.path
-from lp.services.mail.signedmessage import signed_message_from_string
+from lp.services.mail.signedmessage import signed_message_from_bytes
testmails_path = os.path.join(os.path.dirname(__file__), 'emails')
@@ -20,6 +20,6 @@ def read_test_message(filename):
The test messages are located in lp/services/mail/tests/emails
"""
- with open(os.path.join(testmails_path, filename)) as f:
+ with open(os.path.join(testmails_path, filename), 'rb') as f:
message_string = f.read()
- return signed_message_from_string(message_string)
+ return signed_message_from_bytes(message_string)
diff --git a/lib/lp/services/mail/tests/test_dkim.py b/lib/lp/services/mail/tests/test_dkim.py
index 26d3d33..2adb272 100644
--- a/lib/lp/services/mail/tests/test_dkim.py
+++ b/lib/lp/services/mail/tests/test_dkim.py
@@ -20,13 +20,13 @@ from lp.services.mail.incoming import (
InactiveAccount,
)
from lp.services.mail.interfaces import IWeaklyAuthenticatedPrincipal
-from lp.services.mail.signedmessage import signed_message_from_string
+from lp.services.mail.signedmessage import signed_message_from_bytes
from lp.testing import TestCaseWithFactory
from lp.testing.layers import DatabaseFunctionalLayer
# sample private key made with 'openssl genrsa' and public key using 'openssl
# rsa -pubout'. Not really the key for canonical.com ;-)
-sample_privkey = """\
+sample_privkey = b"""\
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQC7ozYozzZuLYQi2DXSMtI3wWzWd7tAJfg+zwbOcNS4Aib6lo3R
y6ansi+fOhHSwgeOrkBGKzgHi2T8iDPzpUFhAZuOFsQaVY6yHzhXwPFi/nKYtFxU
@@ -44,7 +44,7 @@ fTl3LEI1X53HlyAUuQJALHmpSB2Qq0DqmgcAGXkDsh3GRfMv91a7u99VDT/fe+J0
-----END RSA PRIVATE KEY-----
"""
-sample_pubkey = """\
+sample_pubkey = b"""\
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC7ozYozzZuLYQi2DXSMtI3wWzW
d7tAJfg+zwbOcNS4Aib6lo3Ry6ansi+fOhHSwgeOrkBGKzgHi2T8iDPzpUFhAZuO
@@ -53,7 +53,7 @@ FsQaVY6yHzhXwPFi/nKYtFxUX0DE4/GxkmNDgBOPqIpyEUQJvf5+byvb5mI85AS0
-----END PUBLIC KEY-----
"""
-sample_dns = """\
+sample_dns = b"""\
k=rsa; \
p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC7ozYozzZuLYQi2DXSMtI3wWzW\
d7tAJfg+zwbOcNS4Aib6lo3Ry6ansi+fOhHSwgeOrkBGKzgHi2T8iDPzpUFhAZuO\
@@ -80,12 +80,12 @@ class TestDKIM(TestCaseWithFactory):
if canonicalize is None:
canonicalize = (dkim.Relaxed, dkim.Relaxed)
dkim_line = dkim.sign(plain_message,
- selector='example',
- domain='canonical.com',
+ selector=b'example',
+ domain=b'canonical.com',
privkey=sample_privkey,
logger=self.logger,
canonicalize=canonicalize)
- assert dkim_line[-1] == '\n'
+ assert dkim_line[-1:] == b'\n'
return dkim_line + plain_message
def monkeypatch_dns(self):
@@ -116,9 +116,9 @@ class TestDKIM(TestCaseWithFactory):
if response_type == 'valid':
key = sample_dns
elif response_type == 'broken':
- key = sample_dns.replace(';', '')
+ key = sample_dns.replace(b';', b'')
elif response_type == 'garbage':
- key = 'abcdefg'
+ key = b'abcdefg'
else:
raise ValueError(response_type)
self._dns_responses[u'example._domainkey.canonical.com.'] = key
@@ -129,22 +129,24 @@ class TestDKIM(TestCaseWithFactory):
def assertStronglyAuthenticated(self, principal, signed_message):
if IWeaklyAuthenticatedPrincipal.providedBy(principal):
self.fail('expected strong authentication; got weak:\n'
- + self.get_dkim_log() + '\n\n' + signed_message)
+ + self.get_dkim_log() + '\n\n'
+ + six.ensure_text(signed_message, errors='replace'))
def assertWeaklyAuthenticated(self, principal, signed_message):
if not IWeaklyAuthenticatedPrincipal.providedBy(principal):
self.fail('expected weak authentication; got strong:\n'
- + self.get_dkim_log() + '\n\n' + signed_message)
+ + self.get_dkim_log() + '\n\n'
+ + six.ensure_text(signed_message, errors='replace'))
def assertDkimLogContains(self, substring):
l = self.get_dkim_log()
if l.find(substring) == -1:
self.fail("didn't find %r in log: %s" % (substring, l))
- def makeMessageText(self, sender=None, from_address=None):
+ def makeMessageBytes(self, sender=None, from_address=None):
if from_address is None:
from_address = "Foo Bar <foo.bar@xxxxxxxxxxxxx>"
- text = ("From: " + from_address + "\n" + """\
+ buf = (b"From: " + six.ensure_binary(from_address) + b"\n" + b"""\
Date: Fri, 1 Apr 2010 00:00:00 +1000
Subject: yet another comment
To: 1@xxxxxxxxxxxxxxxxxxxxxxxxxx
@@ -153,8 +155,8 @@ To: 1@xxxxxxxxxxxxxxxxxxxxxxxxxx
Why isn't this fixed yet?""")
if sender is not None:
- text = "Sender: " + sender + "\n" + text
- return text
+ buf = b"Sender: " + six.ensure_binary(sender) + b"\n" + buf
+ return buf
def test_dkim_broken_pubkey(self):
"""Handle a subtly-broken pubkey like qq.com, see bug 881237.
@@ -162,10 +164,10 @@ Why isn't this fixed yet?""")
The message is not trusted but inbound message processing does not
abort either.
"""
- signed_message = self.fake_signing(self.makeMessageText())
+ signed_message = self.fake_signing(self.makeMessageBytes())
self.preload_dns_response('broken')
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertWeaklyAuthenticated(principal, signed_message)
self.assertEqual(principal.person.preferredemail.email,
'foo.bar@xxxxxxxxxxxxx')
@@ -173,17 +175,17 @@ Why isn't this fixed yet?""")
"ERROR unknown algorithm in k= tag")
def test_dkim_garbage_pubkey(self):
- signed_message = self.fake_signing(self.makeMessageText())
+ signed_message = self.fake_signing(self.makeMessageBytes())
self.preload_dns_response('garbage')
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertWeaklyAuthenticated(principal, signed_message)
self.assertEqual(principal.person.preferredemail.email,
'foo.bar@xxxxxxxxxxxxx')
# We seem to just get the public key as the error message, which
# isn't the most informative of errors, but this is buried inside
# dkimpy and there isn't much we can do about it.
- self.assertDkimLogContains("ERROR abcdefg")
+ self.assertDkimLogContains("ERROR %s" % b"abcdefg")
def test_dkim_disabled(self):
"""With disabling flag set, mail isn't trusted."""
@@ -194,27 +196,27 @@ Why isn't this fixed yet?""")
self.test_dkim_valid_strict)
def test_dkim_valid_strict(self):
- signed_message = self.fake_signing(self.makeMessageText(),
+ signed_message = self.fake_signing(self.makeMessageBytes(),
canonicalize=(dkim.Simple, dkim.Simple))
self.preload_dns_response()
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertStronglyAuthenticated(principal, signed_message)
self.assertEqual(principal.person.preferredemail.email,
'foo.bar@xxxxxxxxxxxxx')
def test_dkim_valid(self):
- signed_message = self.fake_signing(self.makeMessageText())
+ signed_message = self.fake_signing(self.makeMessageBytes())
self.preload_dns_response()
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertStronglyAuthenticated(principal, signed_message)
self.assertEqual(principal.person.preferredemail.email,
'foo.bar@xxxxxxxxxxxxx')
def test_dkim_untrusted_signer(self):
# Valid signature from an untrusted domain -> untrusted
- signed_message = self.fake_signing(self.makeMessageText())
+ signed_message = self.fake_signing(self.makeMessageBytes())
self.preload_dns_response()
saved_domains = incoming._trusted_dkim_domains[:]
@@ -224,7 +226,7 @@ Why isn't this fixed yet?""")
self.addCleanup(restore)
incoming._trusted_dkim_domains = []
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertWeaklyAuthenticated(principal, signed_message)
self.assertEqual(principal.person.preferredemail.email,
'foo.bar@xxxxxxxxxxxxx')
@@ -234,12 +236,12 @@ Why isn't this fixed yet?""")
# that of the From-sender, if that domain is relaying the message.
# However, we shouldn't then trust the purported sender, because they
# might have just made it up rather than relayed it.
- tweaked_message = self.makeMessageText(
+ tweaked_message = self.makeMessageBytes(
from_address='steve.alexander@xxxxxxxxxxxxxxx')
signed_message = self.fake_signing(tweaked_message)
self.preload_dns_response()
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertWeaklyAuthenticated(principal, signed_message)
# should come from From, not the dkim signature
self.assertEqual(principal.person.preferredemail.email,
@@ -250,13 +252,13 @@ Why isn't this fixed yet?""")
# We still treat this as weakly authenticated by the purported
# From-header sender, though perhaps in future we would prefer
# to reject these messages.
- signed_message = self.fake_signing(self.makeMessageText())
+ signed_message = self.fake_signing(self.makeMessageBytes())
self.preload_dns_response()
fiddled_message = signed_message.replace(
- 'From: Foo Bar <foo.bar@xxxxxxxxxxxxx>',
- 'From: Carlos <carlos@xxxxxxxxxxxxx>')
+ b'From: Foo Bar <foo.bar@xxxxxxxxxxxxx>',
+ b'From: Carlos <carlos@xxxxxxxxxxxxx>')
principal = authenticateEmail(
- signed_message_from_string(fiddled_message))
+ signed_message_from_bytes(fiddled_message))
self.assertWeaklyAuthenticated(principal, fiddled_message)
# should come from From, not the dkim signature
self.assertEqual(principal.person.preferredemail.email,
@@ -264,13 +266,13 @@ Why isn't this fixed yet?""")
def test_dkim_changed_from_realname(self):
# If the real name part of the message has changed, it's detected.
- signed_message = self.fake_signing(self.makeMessageText())
+ signed_message = self.fake_signing(self.makeMessageBytes())
self.preload_dns_response()
fiddled_message = signed_message.replace(
- 'From: Foo Bar <foo.bar@xxxxxxxxxxxxx>',
- 'From: Evil Foo <foo.bar@xxxxxxxxxxxxx>')
+ b'From: Foo Bar <foo.bar@xxxxxxxxxxxxx>',
+ b'From: Evil Foo <foo.bar@xxxxxxxxxxxxx>')
principal = authenticateEmail(
- signed_message_from_string(fiddled_message))
+ signed_message_from_bytes(fiddled_message))
# We don't care about the real name for determining the principal.
self.assertWeaklyAuthenticated(principal, fiddled_message)
self.assertEqual(principal.person.preferredemail.email,
@@ -279,9 +281,9 @@ Why isn't this fixed yet?""")
def test_dkim_nxdomain(self):
# If there's no DNS entry for the pubkey it should be handled
# decently.
- signed_message = self.fake_signing(self.makeMessageText())
+ signed_message = self.fake_signing(self.makeMessageBytes())
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertWeaklyAuthenticated(principal, signed_message)
self.assertEqual(principal.person.preferredemail.email,
'foo.bar@xxxxxxxxxxxxx')
@@ -291,8 +293,8 @@ Why isn't this fixed yet?""")
# treated as weakly authenticated.
# The library doesn't log anything if there's no header at all.
principal = authenticateEmail(
- signed_message_from_string(self.makeMessageText()))
- self.assertWeaklyAuthenticated(principal, self.makeMessageText())
+ signed_message_from_bytes(self.makeMessageBytes()))
+ self.assertWeaklyAuthenticated(principal, self.makeMessageBytes())
self.assertEqual(principal.person.preferredemail.email,
'foo.bar@xxxxxxxxxxxxx')
@@ -300,11 +302,11 @@ Why isn't this fixed yet?""")
# The message has a syntactically valid DKIM signature that
# doesn't actually correspond to what was signed. We log
# something about this but we don't want to drop the message.
- signed_message = self.fake_signing(self.makeMessageText())
- signed_message += 'blah blah'
+ signed_message = self.fake_signing(self.makeMessageBytes())
+ signed_message += b'blah blah'
self.preload_dns_response()
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertWeaklyAuthenticated(principal, signed_message)
self.assertEqual(principal.person.preferredemail.email,
'foo.bar@xxxxxxxxxxxxx')
@@ -324,12 +326,12 @@ Why isn't this fixed yet?""")
person=person,
address='dkimtest@xxxxxxxxxxx')
self.preload_dns_response()
- tweaked_message = self.makeMessageText(
+ tweaked_message = self.makeMessageBytes(
sender="dkimtest@xxxxxxxxxxxxx",
from_address="DKIM Test <dkimtest@xxxxxxxxxxx>")
signed_message = self.fake_signing(tweaked_message)
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertStronglyAuthenticated(principal, signed_message)
def test_dkim_signed_but_from_unknown_address(self):
@@ -342,12 +344,12 @@ Why isn't this fixed yet?""")
name='dkimtest',
displayname='DKIM Test')
self.preload_dns_response()
- tweaked_message = self.makeMessageText(
+ tweaked_message = self.makeMessageBytes(
sender="dkimtest@xxxxxxxxxxxxx",
from_address="DKIM Test <dkimtest@xxxxxxxxxxx>")
signed_message = self.fake_signing(tweaked_message)
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertEqual(principal.person.preferredemail.email,
'dkimtest@xxxxxxxxxxx')
self.assertWeaklyAuthenticated(principal, signed_message)
@@ -369,12 +371,12 @@ Why isn't this fixed yet?""")
displayname='DKIM Test')
self.factory.makeEmail(sender_address, person, EmailAddressStatus.NEW)
self.preload_dns_response()
- tweaked_message = self.makeMessageText(
+ tweaked_message = self.makeMessageBytes(
sender=sender_address,
from_address="DKIM Test <dkimtest@xxxxxxxxxxx>")
signed_message = self.fake_signing(tweaked_message)
principal = authenticateEmail(
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
self.assertEqual(principal.person.preferredemail.email,
from_address)
self.assertWeaklyAuthenticated(principal, signed_message)
@@ -397,11 +399,11 @@ Why isn't this fixed yet?""")
displayname='DKIM Test',
account_status=AccountStatus.NOACCOUNT)
self.preload_dns_response()
- message_text = self.makeMessageText(
+ message_text = self.makeMessageBytes(
sender=from_address,
from_address=from_address)
signed_message = self.fake_signing(message_text)
self.assertRaises(
InactiveAccount,
authenticateEmail,
- signed_message_from_string(signed_message))
+ signed_message_from_bytes(signed_message))
diff --git a/lib/lp/services/mail/tests/test_signedmessage.py b/lib/lp/services/mail/tests/test_signedmessage.py
index 140f629..05146ea 100644
--- a/lib/lp/services/mail/tests/test_signedmessage.py
+++ b/lib/lp/services/mail/tests/test_signedmessage.py
@@ -18,6 +18,7 @@ import gpgme
from zope.component import getUtility
from lp.registry.interfaces.person import IPersonSet
+from lp.services.compat import message_as_bytes
from lp.services.gpg.interfaces import IGPGHandler
from lp.services.mail.helpers import IncomingEmailError
from lp.services.mail.incoming import (
@@ -25,7 +26,7 @@ from lp.services.mail.incoming import (
canonicalise_line_endings,
)
from lp.services.mail.interfaces import IWeaklyAuthenticatedPrincipal
-from lp.services.mail.signedmessage import signed_message_from_string
+from lp.services.mail.signedmessage import signed_message_from_bytes
from lp.testing import TestCaseWithFactory
from lp.testing.factory import GPGSigningContext
from lp.testing.gpgkeys import (
@@ -54,7 +55,7 @@ class TestSignedMessage(TestCaseWithFactory):
# and generates a weakly authenticated principal.
sender = self.factory.makePerson()
email_message = self.factory.makeEmailMessage(sender=sender)
- msg = signed_message_from_string(email_message.as_string())
+ msg = signed_message_from_bytes(message_as_bytes(email_message))
self.assertIs(None, msg.signedContent)
self.assertIs(None, msg.signature)
principal = authenticateEmail(msg)
@@ -79,7 +80,7 @@ class TestSignedMessage(TestCaseWithFactory):
body=body, signing_context=signing_context)
getUtility(IGPGHandler).resetLocalState()
self.assertFalse(msg.is_multipart())
- return signed_message_from_string(msg.as_string())
+ return signed_message_from_bytes(message_as_bytes(msg))
def test_clearsigned_message_wrong_sender(self):
# If the message is signed, but the key doesn't belong to the sender,
@@ -137,7 +138,7 @@ class TestSignedMessage(TestCaseWithFactory):
key = import_secret_test_key()
gpghandler = getUtility(IGPGHandler)
signature = gpghandler.signContent(
- canonicalise_line_endings(body_text.as_string()),
+ canonicalise_line_endings(message_as_bytes(body_text)),
key, 'test', gpgme.SIG_MODE_DETACH)
gpghandler.resetLocalState()
@@ -146,7 +147,7 @@ class TestSignedMessage(TestCaseWithFactory):
attachment['Content-Type'] = 'application/pgp-signature'
msg.attach(attachment)
self.assertTrue(msg.is_multipart())
- return signed_message_from_string(msg.as_string())
+ return signed_message_from_bytes(message_as_bytes(msg))
def test_detached_signature_message_wrong_sender(self):
# If the message is signed, but the key doesn't belong to the sender,
@@ -180,7 +181,7 @@ class TestSignedMessage(TestCaseWithFactory):
sender = getUtility(IPersonSet).getByEmail('test@xxxxxxxxxxxxx')
sender.require_strong_email_authentication = True
email_message = self.factory.makeEmailMessage(sender=sender)
- msg = signed_message_from_string(email_message.as_string())
+ msg = signed_message_from_bytes(message_as_bytes(email_message))
error = self.assertRaises(IncomingEmailError, authenticateEmail, msg)
expected_message = (
"Launchpad only accepts signed email from your address.\n\n"
diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py
index 7d60973..0bd8f83 100644
--- a/lib/lp/testing/factory.py
+++ b/lib/lp/testing/factory.py
@@ -240,6 +240,7 @@ from lp.registry.model.commercialsubscription import CommercialSubscription
from lp.registry.model.karma import KarmaTotalCache
from lp.registry.model.milestone import Milestone
from lp.registry.model.suitesourcepackage import SuiteSourcePackage
+from lp.services.compat import message_as_bytes
from lp.services.config import config
from lp.services.database.constants import (
DEFAULT,
@@ -2223,7 +2224,7 @@ class BareLaunchpadObjectFactory(ObjectFactory):
if force_transfer_encoding:
encode_base64(body_part)
body_part.set_charset(charset)
- mail.parsed_string = mail.as_string()
+ mail.parsed_bytes = message_as_bytes(mail)
return mail
def makeSpecification(self, product=None, title=None, distribution=None,
diff --git a/scripts/process-one-mail.py b/scripts/process-one-mail.py
index 95ae16f..3557d8d 100755
--- a/scripts/process-one-mail.py
+++ b/scripts/process-one-mail.py
@@ -12,7 +12,7 @@ import sys
from lp.services.config import config
from lp.services.mail.helpers import save_mail_to_librarian
from lp.services.mail.incoming import handle_one_mail
-from lp.services.mail.signedmessage import signed_message_from_string
+from lp.services.mail.signedmessage import signed_message_from_bytes
from lp.services.scripts.base import LaunchpadScript
@@ -31,6 +31,8 @@ class ProcessMail(LaunchpadScript):
# with handling a mailbox, which we're avoiding here.
if len(self.args) >= 1:
from_file = open(self.args[0], 'rb')
+ elif sys.version_info[0] >= 3:
+ from_file = sys.stdin.buffer
else:
from_file = sys.stdin
self.logger.debug("reading message from %r" % (from_file,))
@@ -40,7 +42,7 @@ class ProcessMail(LaunchpadScript):
self.logger.debug("got %d bytes" % len(raw_mail))
file_alias = save_mail_to_librarian(raw_mail)
self.logger.debug("saved to librarian as %r" % (file_alias,))
- parsed_mail = signed_message_from_string(raw_mail)
+ parsed_mail = signed_message_from_bytes(raw_mail)
# Kinda kludgey way to cause sendmail to just print it.
config.sendmail_to_stdout = True
handle_one_mail(