launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #00092
[Merge] lp:~jameinel/launchpad/correct-default-codebrowse into lp:launchpad
John A Meinel has proposed merging lp:~jameinel/launchpad/correct-default-codebrowse into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
In setting up a new Launchpad devel environment, I found that "make schema" created /var/tmp/bazaar.launchpad.dev but "make run_codebrowse" wanted to have "/var/tmp/codebrowse.launchpad.dev" created.
It seemed reasonable to set it to bazaar.launchpad.dev, since that is the URL you access it at.
--
https://code.launchpad.net/~jameinel/launchpad/correct-default-codebrowse/+merge/29895
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/launchpad/correct-default-codebrowse into lp:launchpad.
=== modified file 'configs/development/launchpad-lazr.conf'
--- configs/development/launchpad-lazr.conf 2010-07-01 06:31:55 +0000
+++ configs/development/launchpad-lazr.conf 2010-07-14 15:35:59 +0000
@@ -48,8 +48,8 @@
bugzilla-3.4.example.com.password: test
[codebrowse]
-cachepath: /var/tmp/codebrowse.launchpad.dev/cache
-log_folder: /var/tmp/codebrowse.launchpad.dev/logs
+cachepath: /var/tmp/bazaar.launchpad.dev/cache
+log_folder: /var/tmp/bazaar.launchpad.dev/logs
launchpad_root: https://code.launchpad.dev/
secret_path: configs/development/codebrowse-secret
=== modified file 'lib/canonical/launchpad/browser/oauth.py'
--- lib/canonical/launchpad/browser/oauth.py 2010-03-30 15:36:27 +0000
+++ lib/canonical/launchpad/browser/oauth.py 2010-07-14 15:35:59 +0000
@@ -245,15 +245,23 @@
token = consumer.getRequestToken(form.get('oauth_token'))
if token is None:
self.request.unauthorized(OAUTH_CHALLENGE)
- return u''
+ return u'No request token specified.'
if not check_oauth_signature(self.request, consumer, token):
- return u''
+ return u'Invalid OAuth signature.'
- if (not token.is_reviewed
- or token.permission == OAuthPermission.UNAUTHORIZED):
+ if not token.is_reviewed:
self.request.unauthorized(OAUTH_CHALLENGE)
- return u''
+ return u'Request token has not yet been reviewed. Try again later.'
+
+ if token.permission == OAuthPermission.UNAUTHORIZED:
+ # The end-user explicitly refused to authorize this
+ # token. We send 403 ("Forbidden") instead of 401
+ # ("Unauthorized") to distinguish this case and to
+ # indicate that, as RFC2616 says, "authorization will not
+ # help."
+ self.request.response.setStatus(403)
+ return u'End-user refused to authorize request token.'
access_token = token.createAccessToken()
context_name = None
=== modified file 'lib/canonical/launchpad/mail/incoming.py'
--- lib/canonical/launchpad/mail/incoming.py 2010-06-04 16:15:38 +0000
+++ lib/canonical/launchpad/mail/incoming.py 2010-07-14 15:35:59 +0000
@@ -7,13 +7,16 @@
__metaclass__ = type
-from logging import getLogger
from cStringIO import StringIO as cStringIO
from email.utils import getaddresses, parseaddr
import email.errors
+import logging
import re
import sys
+import dkim
+import dns.exception
+
import transaction
from zope.component import getUtility
from zope.interface import directlyProvides, directlyProvidedBy
@@ -65,6 +68,77 @@
"""The account for the person sending this email is inactive."""
+def extract_address_domain(address):
+    """Return the domain part of the email address found in `address`.
+
+    :param address: an address header value, e.g. 'Foo <foo@example.com>'.
+    :return: the text after the last '@' of the parsed address, or the
+        empty string if the parsed address contains no '@'.
+    """
+    realname, email_address = email.utils.parseaddr(address)
+    # Split on the last '@' only: a quoted local part may itself contain
+    # '@', and parseaddr returns '' for input it cannot parse, in which
+    # case indexing the result of a plain split would raise IndexError.
+    if '@' not in email_address:
+        return ''
+    return email_address.rsplit('@', 1)[1]
+
+
+# Being listed here means that we trust the domain not to be an open relay
+# or to allow arbitrary intra-domain spoofing.
+_trusted_dkim_domains = [
+    'gmail.com',
+    'google.com',
+    'mail.google.com',
+    'canonical.com',
+    ]
+
+
+def _isDkimDomainTrusted(domain):
+    """Return True if `domain` is listed in _trusted_dkim_domains."""
+    # Really this should come from a dynamically-modifiable
+    # configuration, but we don't have such a thing yet.  The module-level
+    # list is looked up on every call, so rebinding it takes effect at once.
+    trusted_domains = _trusted_dkim_domains
+    return domain in trusted_domains
+
+
+def _authenticateDkim(signed_message):
+    """Attempt DKIM authentication of email; return True if known authentic.
+
+    A message counts as authentic only if dkim.verify accepts it, exactly
+    one signing-details record is reported, the signing domain equals the
+    domain of the From address, and that domain is trusted according to
+    _isDkimDomainTrusted.  DKIM and DNS errors are logged and treated as a
+    failed verification rather than propagated.
+
+    :param signed_message: ISignedMessage
+    :return: True if every check above passes, otherwise False.
+    """
+
+    log = logging.getLogger('mail-authenticate-dkim')
+    log.setLevel(logging.DEBUG)
+    # uncomment this for easier test debugging
+    # log.addHandler(logging.FileHandler('/tmp/dkim.log'))
+
+    # Collects the debug output that the dkim library writes while verifying.
+    dkim_log = cStringIO()
+    log.info('Attempting DKIM authentication of message %s from %s'
+        % (signed_message['Message-ID'], signed_message['From']))
+    signing_details = []
+    try:
+        # NB: if this fails with a keyword argument error, you need the
+        # python-dkim 0.3-3.2 that adds it
+        dkim_result = dkim.verify(
+            signed_message.parsed_string, dkim_log, details=signing_details)
+    except dkim.DKIMException, e:
+        log.warning('DKIM error: %r' % (e,))
+        dkim_result = False
+    except dns.exception.DNSException, e:
+        # many of them have lame messages, thus %r
+        log.warning('DNS exception: %r' % (e,))
+        dkim_result = False
+    else:
+        log.info('DKIM verification result=%s' % (dkim_result,))
+    # Emit whatever the library logged, whether or not verification raised.
+    log.debug('DKIM debug log: %s' % (dkim_log.getvalue(),))
+    if not dkim_result:
+        return False
+    # in addition to the dkim signature being valid, we have to check that it
+    # was actually signed by the user's domain.
+    if len(signing_details) != 1:
+        log.error(
+            'expected exactly one DKIM details record: %r'
+            % (signing_details,))
+        return False
+    signing_domain = signing_details[0]['d']
+    from_domain = extract_address_domain(signed_message['From'])
+    if signing_domain != from_domain:
+        log.warning("DKIM signing domain %s doesn't match From address %s; "
+            "disregarding signature"
+            % (signing_domain, from_domain))
+        return False
+    if not _isDkimDomainTrusted(signing_domain):
+        log.warning("valid DKIM signature from untrusted domain %s"
+            % (signing_domain,))
+        return False
+    return True
+
+
def authenticateEmail(mail):
"""Authenticates an email by verifying the PGP signature.
@@ -89,6 +163,17 @@
raise InactiveAccount(
"Mail from a user with an inactive account.")
+ dkim_result = _authenticateDkim(mail)
+
+ if dkim_result:
+ if mail.signature is not None:
+ log = logging.getLogger('process-mail')
+ log.info('message has gpg signature, therefore not treating DKIM '
+ 'success as conclusive')
+ else:
+ setupInteraction(principal, email_addr)
+ return principal
+
if signature is None:
# Mark the principal so that application code can check that the
# user was weakly authenticated.
@@ -184,7 +269,7 @@
mail['From'], 'Submit Request Failure', str(error), mail)
trans.commit()
- log = getLogger('process-mail')
+ log = logging.getLogger('process-mail')
mailbox = getUtility(IMailBox)
log.info("Opening the mail box.")
mailbox.open()
@@ -224,7 +309,7 @@
mail = signed_message_from_string(raw_mail)
except email.Errors.MessageError, error:
mailbox.delete(mail_id)
- log = getLogger('canonical.launchpad.mail')
+ log = logging.getLogger('canonical.launchpad.mail')
log.warn(
"Couldn't convert email to email.Message: %s" % (
file_alias_url, ),
@@ -329,7 +414,7 @@
# from being processed.
_handle_error(
"Unhandled exception", file_alias_url)
- log = getLogger('canonical.launchpad.mail')
+ log = logging.getLogger('canonical.launchpad.mail')
if file_alias_url is not None:
email_info = file_alias_url
else:
=== modified file 'lib/canonical/launchpad/pagetests/oauth/access-token.txt'
--- lib/canonical/launchpad/pagetests/oauth/access-token.txt 2009-10-22 13:02:12 +0000
+++ lib/canonical/launchpad/pagetests/oauth/access-token.txt 2010-07-14 15:35:59 +0000
@@ -74,9 +74,10 @@
... """ % urlencode(data2))
HTTP/1.1 401 Unauthorized
...
+ Request token has not yet been reviewed. Try again later.
-The same will happen if the signature is wrong or if the token's
-permission is UNAUTHORIZED.
+If the token is missing or the signature is wrong the response will
+also be 401.
>>> data2['oauth_signature'] = '&'.join(['foobar', token.secret])
>>> print http(r"""
@@ -85,6 +86,22 @@
... """ % urlencode(data2))
HTTP/1.1 401 Unauthorized
...
+ Invalid OAuth signature.
+
+ >>> data3 = data.copy()
+ >>> del(data3['oauth_token'])
+ >>> print http(r"""
+ ... GET /+access-token?%s HTTP/1.1
+ ... Host: launchpad.dev
+ ... """ % urlencode(data3))
+ HTTP/1.1 401 Unauthorized
+ ...
+ No request token specified.
+
+If the token's permission is set to UNAUTHORIZED, the response code is
+403 ("Forbidden"). This conveys that (to quote the HTTP RFC)
+"authorization will not help" and that the token can _never_ be
+exchanged for an access token.
>>> token.review(salgado, OAuthPermission.UNAUTHORIZED)
>>> data2['oauth_signature'] = '&'.join([consumer.secret, token.secret])
@@ -92,6 +109,7 @@
... GET /+access-token?%s HTTP/1.1
... Host: launchpad.dev
... """ % urlencode(data2))
- HTTP/1.1 401 Unauthorized
+ HTTP/1.1 403 Forbidden
...
+ End-user refused to authorize request token.
=== modified file 'lib/lp/bugs/mail/bugnotificationrecipients.py'
--- lib/lp/bugs/mail/bugnotificationrecipients.py 2010-06-16 08:20:23 +0000
+++ lib/lp/bugs/mail/bugnotificationrecipients.py 2010-07-14 15:35:59 +0000
@@ -73,7 +73,7 @@
reason = "You received this bug notification because you %s." % reason
self.add(person, reason, header)
- def addDupeSubscriber(self, person):
+ def addDupeSubscriber(self, person, duplicate_bug=None):
"""Registers a subscriber of a duplicate of this bug."""
reason = "Subscriber of Duplicate"
if person.isTeam():
@@ -82,6 +82,8 @@
reason += " @%s" % person.name
else:
text = "are a direct subscriber of a duplicate bug"
+ if duplicate_bug is not None:
+ text += " (%s)" % duplicate_bug.id
self._addReason(person, text, reason)
def addDirectSubscriber(self, person):
=== added file 'lib/lp/bugs/mail/tests/test_bug_duplicate_notifications.py'
--- lib/lp/bugs/mail/tests/test_bug_duplicate_notifications.py 1970-01-01 00:00:00 +0000
+++ lib/lp/bugs/mail/tests/test_bug_duplicate_notifications.py 2010-07-14 15:35:59 +0000
@@ -0,0 +1,69 @@
+# Copyright 2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for notification strings of duplicate Bugs."""
+
+from unittest import TestLoader
+
+import transaction
+
+from zope.component import getUtility
+from zope.interface import providedBy
+from zope.event import notify
+
+from canonical.testing import DatabaseFunctionalLayer
+from canonical.launchpad.database import BugNotification
+from canonical.launchpad.interfaces import BugTaskStatus
+from canonical.launchpad.webapp.interfaces import ILaunchBag
+
+from lp.services.mail import stub
+from lp.bugs.scripts.bugnotification import construct_email_notifications
+from lp.testing import TestCaseWithFactory
+
+from lazr.lifecycle.event import (ObjectModifiedEvent)
+from lazr.lifecycle.snapshot import Snapshot
+
+
+class TestAssignmentNotification(TestCaseWithFactory):
+ """Test emails sent when a bug is a duplicate of another."""
+ # NOTE(review): the class name says "Assignment" but the docstring and the
+ # test below are about duplicate-bug notifications -- looks like a
+ # copy-paste leftover; confirm before renaming.
+
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self):
+ # Run the tests as a logged-in user.
+ super(TestAssignmentNotification, self).setUp(
+ user='test@xxxxxxxxxxxxx')
+ self.user = getUtility(ILaunchBag).user
+ self.product = self.factory.makeProduct(owner=self.user,
+ name='project')
+ self.master_bug = self.factory.makeBug(product=self.product)
+ self.dup_bug = self.factory.makeBug(product=self.product)
+ self.master_bug_task = self.master_bug.getBugTask(self.product)
+ # Snapshot the task before it is modified; the test passes this as the
+ # "before" object of the ObjectModifiedEvent it fires.
+ self.master_bug_task_before_modification = Snapshot(
+ self.master_bug_task,
+ providing=providedBy(self.master_bug_task))
+ self.person_subscribed_email = 'person@xxxxxxxxxxx'
+ self.person_subscribed = self.factory.makePerson(
+ name='subscribed', displayname='Person',
+ email=self.person_subscribed_email)
+ # The person is subscribed to the duplicate only; the duplicate is then
+ # attached to the master bug.
+ self.dup_bug.subscribe(self.person_subscribed, subscribed_by=self.user)
+ self.dup_bug.markAsDuplicate(self.master_bug)
+
+ def test_dup_subscriber_change_notification_message(self):
+ """Duplicate bug number in the reason (email footer) for
+ duplicate subscribers when a master bug is modified."""
+ self.assertEqual(len(stub.test_emails), 0, 'emails in queue')
+ self.master_bug_task.transitionToStatus(BugTaskStatus.CONFIRMED, self.user)
+ notify(ObjectModifiedEvent(
+ self.master_bug_task, self.master_bug_task_before_modification,
+ ['status'], user=self.user))
+ transaction.commit()
+ # Exactly two queued emails are expected; only the last one is inspected.
+ self.assertEqual(len(stub.test_emails), 2, 'email not sent')
+ # The expected rationale text carries the id of the duplicate bug.
+ rationale = 'duplicate bug (%i)' % self.dup_bug.id
+ # presumably element 2 of a stub.test_emails entry is the raw message
+ # text -- verify against lp.services.mail.stub.
+ msg = stub.test_emails[-1][2]
+ self.assertTrue(rationale in msg,
+ '%s not in\n%s\n' % (rationale, msg))
+
+
+def test_suite():
+    """Return a suite holding every test found in this module."""
+    loader = TestLoader()
+    suite = loader.loadTestsFromName(__name__)
+    return suite
=== modified file 'lib/lp/bugs/model/bug.py'
--- lib/lp/bugs/model/bug.py 2010-07-08 13:10:41 +0000
+++ lib/lp/bugs/model/bug.py 2010-07-14 15:35:59 +0000
@@ -673,12 +673,14 @@
if self.private:
return []
- dupe_subscribers = set(
- Person.select("""
- Person.id = BugSubscription.person AND
- BugSubscription.bug = Bug.id AND
- Bug.duplicateof = %d""" % self.id,
- clauseTables=["Bug", "BugSubscription"]))
+ dupe_details = dict(
+ Store.of(self).find(
+ (Person, Bug),
+ BugSubscription.person == Person.id,
+ BugSubscription.bug == Bug.id,
+ Bug.duplicateof == self.id))
+
+ dupe_subscribers = set([person for person in dupe_details.keys()])
# Direct and "also notified" subscribers take precedence over
# subscribers from dupes. Note that we don't supply recipients
@@ -688,7 +690,7 @@
if recipients is not None:
for subscriber in dupe_subscribers:
- recipients.addDupeSubscriber(subscriber)
+ recipients.addDupeSubscriber(subscriber, dupe_details[subscriber])
return sorted(
dupe_subscribers, key=operator.attrgetter("displayname"))
=== added file 'lib/lp/services/mail/tests/test_dkim.py'
--- lib/lp/services/mail/tests/test_dkim.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/mail/tests/test_dkim.py 2010-07-14 15:35:59 +0000
@@ -0,0 +1,257 @@
+# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test DKIM-signed messages"""
+
+__metaclass__ = type
+
+import logging
+import unittest
+
+from StringIO import StringIO
+
+import dkim
+import dns.resolver
+
+from zope.component import getUtility
+
+from canonical.launchpad.mail import (
+ incoming,
+ signed_message_from_string,
+ )
+from canonical.launchpad.mail.incoming import authenticateEmail
+from canonical.launchpad.interfaces.mail import IWeaklyAuthenticatedPrincipal
+from lp.testing import TestCaseWithFactory
+from canonical.testing.layers import DatabaseFunctionalLayer
+
+
+# sample private key made with 'openssl genrsa' and public key using 'openssl
+# rsa -pubout'. Not really the key for canonical.com ;-)
+sample_privkey = """\
+-----BEGIN RSA PRIVATE KEY-----
+MIIBOwIBAAJBANmBe10IgY+u7h3enWTukkqtUD5PR52Tb/mPfjC0QJTocVBq6Za/
+PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQJAYFUKsD+uMlcFu1D3YNaR
+EGYGXjJ6w32jYGJ/P072M3yWOq2S1dvDthI3nRT8MFjZ1wHDAYHrSpfDNJ3v2fvZ
+cQIhAPgRPmVYn+TGd59asiqG1SZqh+p+CRYHW7B8BsicG5t3AiEA4HYNOohlgWan
+8tKgqLJgUdPFbaHZO1nDyBgvV8hvWZUCIQDDdCq6hYKuKeYUy8w3j7cgJq3ih922
+2qNWwdJCfCWQbwIgTY0cBvQnNe0067WQIpj2pG7pkHZR6qqZ9SE+AjNTHX0CIQCI
+Mgq55Y9MCq5wqzy141rnxrJxTwK9ABo3IAFMWEov3g==
+-----END RSA PRIVATE KEY-----
+"""
+
+# PEM form of the matching public key; not referenced elsewhere in the
+# visible part of this module.
+sample_pubkey = """\
+-----BEGIN PUBLIC KEY-----
+MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T
+b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ==
+-----END PUBLIC KEY-----
+"""
+
+# Value the tests register as the fake DNS answer for the signing key; its p=
+# field is the base64 body of sample_pubkey joined onto a single line.
+sample_dns = """\
+k=rsa; \
+p=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANmBe10IgY+u7h3enWTukkqtUD5PR52T\
+b/mPfjC0QJTocVBq6Za/PlzfV+Py92VaCak19F4WrbVTK5Gg5tW220MCAwEAAQ=="""
+
+
+# Unsigned message that every test starts from; the tests either sign it with
+# fake_signing, alter it, or feed it in unsigned.
+plain_content = """\
+From: Foo Bar <foo.bar@xxxxxxxxxxxxx>
+Date: Fri, 1 Apr 2010 00:00:00 +1000
+Subject: yet another comment
+To: 1@xxxxxxxxxxxxxxxxxxxxxxxxxx
+
+ importance critical
+
+Why isn't this fixed yet?"""
+
+
+class TestDKIM(TestCaseWithFactory):
+ """Messages can be strongly authenticated by DKIM."""
+
+ layer = DatabaseFunctionalLayer
+
+ def setUp(self):
+ # Login with admin roles as we aren't testing access here.
+ TestCaseWithFactory.setUp(self, 'admin@xxxxxxxxxxxxx')
+ # Route the 'mail-authenticate-dkim' logger into a buffer the assertions
+ # can inspect; the handler is removed again on cleanup.
+ self._log_output = StringIO()
+ handler = logging.StreamHandler(self._log_output)
+ logger = logging.getLogger('mail-authenticate-dkim')
+ logger.addHandler(handler)
+ self.addCleanup(lambda: logger.removeHandler(handler))
+ self.monkeypatch_dns()
+
+ def fake_signing(self, plain_message, canonicalize=None):
+ # Sign plain_message as selector 'example' of canonical.com using
+ # sample_privkey, and return the text produced by dkim.sign followed by
+ # the unchanged message. Relaxed/relaxed canonicalization is the default.
+ if canonicalize is None:
+ canonicalize = (dkim.Relaxed, dkim.Relaxed)
+ dkim_line = dkim.sign(plain_message,
+ selector='example',
+ domain='canonical.com',
+ privkey=sample_privkey,
+ debuglog=self._log_output,
+ canonicalize=canonicalize
+ )
+ assert dkim_line[-1] == '\n'
+ return dkim_line + plain_message
+
+ def monkeypatch_dns(self):
+ # Replace dkim.dnstxt with a lookup in self._dns_responses; names that
+ # were not registered raise NXDOMAIN. The original function is restored
+ # on cleanup.
+ self._dns_responses = {}
+ def my_lookup(name):
+ try:
+ return self._dns_responses[name]
+ except KeyError:
+ raise dns.resolver.NXDOMAIN()
+ orig_dnstxt = dkim.dnstxt
+ dkim.dnstxt = my_lookup
+ def restore():
+ dkim.dnstxt = orig_dnstxt
+ self.addCleanup(restore)
+
+ def get_dkim_log(self):
+ return self._log_output.getvalue()
+
+ def assertStronglyAuthenticated(self, principal, signed_message):
+ # Strong authentication means the principal does not provide
+ # IWeaklyAuthenticatedPrincipal.
+ if IWeaklyAuthenticatedPrincipal.providedBy(principal):
+ self.fail('expected strong authentication; got weak:\n'
+ + self.get_dkim_log() + '\n\n' + signed_message)
+
+ def assertWeaklyAuthenticated(self, principal, signed_message):
+ if not IWeaklyAuthenticatedPrincipal.providedBy(principal):
+ self.fail('expected weak authentication; got strong:\n'
+ + self.get_dkim_log() + '\n\n' + signed_message)
+
+ def assertDkimLogContains(self, substring):
+ l = self.get_dkim_log()
+ if l.find(substring) == -1:
+ self.fail("didn't find %r in log: %s" % (substring, l))
+
+ def test_dkim_garbage_pubkey(self):
+ # The DNS record for the key exists but is not a parseable key record.
+ signed_message = self.fake_signing(plain_content)
+ self._dns_responses['example._domainkey.canonical.com.'] = \
+ 'aothuaonu'
+ principal = authenticateEmail(
+ signed_message_from_string(signed_message))
+ self.assertWeaklyAuthenticated(principal, signed_message)
+ self.assertEqual(principal.person.preferredemail.email,
+ 'foo.bar@xxxxxxxxxxxxx')
+ self.assertDkimLogContains('invalid format in _domainkey txt record')
+
+ def test_dkim_valid_strict(self):
+ # Same as test_dkim_valid, but signed with simple/simple
+ # canonicalization.
+ signed_message = self.fake_signing(plain_content,
+ canonicalize=(dkim.Simple, dkim.Simple))
+ self._dns_responses['example._domainkey.canonical.com.'] = \
+ sample_dns
+ principal = authenticateEmail(
+ signed_message_from_string(signed_message))
+ self.assertStronglyAuthenticated(principal, signed_message)
+ self.assertEqual(principal.person.preferredemail.email,
+ 'foo.bar@xxxxxxxxxxxxx')
+
+ def test_dkim_valid(self):
+ signed_message = self.fake_signing(plain_content)
+ self._dns_responses['example._domainkey.canonical.com.'] = \
+ sample_dns
+ principal = authenticateEmail(
+ signed_message_from_string(signed_message))
+ self.assertStronglyAuthenticated(principal, signed_message)
+ self.assertEqual(principal.person.preferredemail.email,
+ 'foo.bar@xxxxxxxxxxxxx')
+
+ def test_dkim_untrusted_signer(self):
+ # Valid signature from an untrusted domain -> untrusted
+ signed_message = self.fake_signing(plain_content)
+ self._dns_responses['example._domainkey.canonical.com.'] = \
+ sample_dns
+ # Empty the trusted-domain list for this test by rebinding the module
+ # attribute; a copy of the original list is put back on cleanup.
+ saved_domains = incoming._trusted_dkim_domains[:]
+ def restore():
+ incoming._trusted_dkim_domains = saved_domains
+ self.addCleanup(restore)
+ incoming._trusted_dkim_domains = []
+ principal = authenticateEmail(
+ signed_message_from_string(signed_message))
+ self.assertWeaklyAuthenticated(principal, signed_message)
+ self.assertEqual(principal.person.preferredemail.email,
+ 'foo.bar@xxxxxxxxxxxxx')
+
+ def test_dkim_signing_irrelevant(self):
+ # It's totally valid for a message to be signed by a domain other than
+ # that of the From-sender, if that domain is relaying the message.
+ # However, we shouldn't then trust the purported sender, because they
+ # might have just made it up rather than relayed it.
+ tweaked_message = plain_content.replace('foo.bar@xxxxxxxxxxxxx',
+ 'steve.alexander@xxxxxxxxxxxxxxx')
+ signed_message = self.fake_signing(tweaked_message)
+ self._dns_responses['example._domainkey.canonical.com.'] = \
+ sample_dns
+ principal = authenticateEmail(
+ signed_message_from_string(signed_message))
+ self.assertWeaklyAuthenticated(principal, signed_message)
+ # should come from From, not the dkim signature
+ self.assertEqual(principal.person.preferredemail.email,
+ 'steve.alexander@xxxxxxxxxxxxxxx')
+
+ def test_dkim_changed_from_address(self):
+ # if the address part of the message has changed, it's detected. we
+ # still treat this as weakly authenticated by the purported From-header
+ # sender, though perhaps in future we would prefer to reject these
+ # messages.
+ signed_message = self.fake_signing(plain_content)
+ self._dns_responses['example._domainkey.canonical.com.'] = \
+ sample_dns
+ fiddled_message = signed_message.replace('From: Foo Bar <foo.bar@xxxxxxxxxxxxx>',
+ 'From: Carlos <carlos@xxxxxxxxxxxxx>')
+ principal = authenticateEmail(
+ signed_message_from_string(fiddled_message))
+ self.assertWeaklyAuthenticated(principal, fiddled_message)
+ # should come from From, not the dkim signature
+ self.assertEqual(principal.person.preferredemail.email,
+ 'carlos@xxxxxxxxxxxxx')
+
+ def test_dkim_changed_from_realname(self):
+ # if the real name part of the message has changed, it's detected
+ signed_message = self.fake_signing(plain_content)
+ self._dns_responses['example._domainkey.canonical.com.'] = \
+ sample_dns
+ fiddled_message = signed_message.replace('From: Foo Bar <foo.bar@xxxxxxxxxxxxx>',
+ 'From: Evil Foo <foo.bar@xxxxxxxxxxxxx>')
+ principal = authenticateEmail(
+ signed_message_from_string(fiddled_message))
+ # we don't care about the real name for determining the principal
+ self.assertWeaklyAuthenticated(principal, fiddled_message)
+ self.assertEqual(principal.person.preferredemail.email,
+ 'foo.bar@xxxxxxxxxxxxx')
+
+ def test_dkim_nxdomain(self):
+ # if there's no DNS entry for the pubkey
+ # it should be handled decently
+ signed_message = self.fake_signing(plain_content)
+ principal = authenticateEmail(
+ signed_message_from_string(signed_message))
+ self.assertWeaklyAuthenticated(principal, signed_message)
+ self.assertEqual(principal.person.preferredemail.email,
+ 'foo.bar@xxxxxxxxxxxxx')
+
+ def test_dkim_message_unsigned(self):
+ # degenerate case: no signature treated as weakly authenticated
+ principal = authenticateEmail(
+ signed_message_from_string(plain_content))
+ self.assertWeaklyAuthenticated(principal, plain_content)
+ self.assertEqual(principal.person.preferredemail.email,
+ 'foo.bar@xxxxxxxxxxxxx')
+ # the library doesn't log anything if there's no header at all
+
+ def test_dkim_body_mismatch(self):
+ # The message has a syntactically valid DKIM signature that doesn't
+ # actually correspond to what was signed. We log something about
+ # this but we don't want to drop the message.
+ signed_message = self.fake_signing(plain_content)
+ signed_message += 'blah blah'
+ self._dns_responses['example._domainkey.canonical.com.'] = \
+ sample_dns
+ principal = authenticateEmail(
+ signed_message_from_string(signed_message))
+ self.assertWeaklyAuthenticated(principal, signed_message)
+ self.assertEqual(principal.person.preferredemail.email,
+ 'foo.bar@xxxxxxxxxxxxx')
+ self.assertDkimLogContains('body hash mismatch')
+
+
+def test_suite():
+    """Return a suite holding every test found in this module."""
+    loader = unittest.TestLoader()
+    suite = loader.loadTestsFromName(__name__)
+    return suite
=== modified file 'lib/lp/services/mail/tests/test_signedmessage.py'
--- lib/lp/services/mail/tests/test_signedmessage.py 2009-06-30 16:56:07 +0000
+++ lib/lp/services/mail/tests/test_signedmessage.py 2010-07-14 15:35:59 +0000
@@ -28,7 +28,7 @@
from canonical.testing.layers import DatabaseFunctionalLayer
class TestSignedMessage(TestCaseWithFactory):
- """Test SignedMessage class correctly extracts the GPG signatures."""
+ """Test SignedMessage class correctly extracts and verifies the GPG signatures."""
layer = DatabaseFunctionalLayer
=== modified file 'setup.py'
--- setup.py 2010-07-06 08:58:00 +0000
+++ setup.py 2010-07-14 15:35:59 +0000
@@ -29,6 +29,8 @@
'chameleon.core',
'chameleon.zpt',
'cssutils',
+ # Required for pydkim
+ 'dnspython',
'feedvalidator',
'funkload',
'launchpadlib',
@@ -51,6 +53,7 @@
'paramiko',
'python-memcached',
'pyasn1',
+ 'pydkim',
'python-openid',
'pytz',
# This appears to be a broken indirect dependency from zope.security:
=== modified file 'versions.cfg'
--- versions.cfg 2010-07-09 15:10:07 +0000
+++ versions.cfg 2010-07-14 15:35:59 +0000
@@ -13,6 +13,8 @@
# Required by Windmill to run on 2.4
ctypes = 1.0.2
docutils = 0.5
+# Required by pydkim
+dnspython = 1.7.1
elementtree = 1.2.6-20050316
epydoc = 3.0.1
feedvalidator = 0.0.0DEV-r1049
@@ -47,6 +49,7 @@
PasteDeploy = 1.3.3
pyasn1 = 0.0.9a
pycrypto = 2.0.1
+pydkim = 0.3-mbp-r7
pyOpenSSL = 0.10
python-memcached = 1.45
python-openid = 2.2.1