launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20043
[Merge] lp:~thomir/launchpad/devel-add-read-ff into lp:launchpad
Thomi Richards has proposed merging lp:~thomir/launchpad/devel-add-read-ff into lp:launchpad.
Commit message:
Add feature flag to read GPG keys from gpgservice.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~thomir/launchpad/devel-add-read-ff/+merge/287694
Add feature flag to read GPG keys from gpgservice.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~thomir/launchpad/devel-add-read-ff into lp:launchpad.
=== modified file 'buildout.cfg'
--- buildout.cfg 2016-02-15 00:54:24 +0000
+++ buildout.cfg 2016-03-01 19:34:04 +0000
@@ -31,7 +31,7 @@
prefer-final = true
-develop = .
+develop = . /home/thomi/code/canonical/canonical-gpg-service
[configuration]
instance_name = development
=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg 2016-01-19 17:41:11 +0000
+++ database/schema/security.cfg 2016-03-01 19:34:04 +0000
@@ -893,8 +893,8 @@
public.bugnotificationrecipient = SELECT, INSERT
public.bugsubscription = SELECT
public.bugsubscriptionfilter = SELECT
+public.bugsubscriptionfilterimportance = SELECT
public.bugsubscriptionfilterinformationtype = SELECT
-public.bugsubscriptionfilterimportance = SELECT
public.bugsubscriptionfilterstatus = SELECT
public.bugsubscriptionfiltertag = SELECT
public.bugtag = SELECT
@@ -919,6 +919,7 @@
public.messagechunk = SELECT, INSERT
public.milestone = SELECT
public.milestonetag = SELECT
+public.openididentifier = SELECT
public.packagecopyjob = SELECT, INSERT, DELETE
public.packagecopyrequest = SELECT, INSERT, UPDATE
public.packagediff = SELECT, INSERT
=== modified file 'lib/lp/archivepublisher/archivesigningkey.py'
--- lib/lp/archivepublisher/archivesigningkey.py 2015-09-13 18:30:51 +0000
+++ lib/lp/archivepublisher/archivesigningkey.py 2016-03-01 19:34:04 +0000
@@ -103,11 +103,9 @@
pub_key = gpghandler.retrieveKey(secret_key.fingerprint)
gpghandler.uploadPublicKey(pub_key.fingerprint)
- algorithm = GPGKeyAlgorithm.items[pub_key.algorithm]
key_owner = getUtility(ILaunchpadCelebrities).ppa_key_guard
- self.archive.signing_key = getUtility(IGPGKeySet).new(
- key_owner, pub_key.keyid, pub_key.fingerprint, pub_key.keysize,
- algorithm, active=True, can_encrypt=pub_key.can_encrypt)
+ self.archive.signing_key, _ = getUtility(IGPGKeySet).activate(
+ key_owner, pub_key, pub_key.can_encrypt)
def signRepository(self, suite):
"""See `IArchiveSigningKey`."""
=== modified file 'lib/lp/archivepublisher/tests/test_publishdistro.py'
--- lib/lp/archivepublisher/tests/test_publishdistro.py 2016-02-04 19:46:52 +0000
+++ lib/lp/archivepublisher/tests/test_publishdistro.py 2016-03-01 19:34:04 +0000
@@ -24,6 +24,8 @@
from lp.registry.interfaces.person import IPersonSet
from lp.registry.interfaces.pocket import PackagePublishingPocket
from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
+from lp.services.gpg.interfaces import GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG
from lp.services.log.logger import (
BufferLogger,
DevNullLogger,
@@ -391,6 +393,21 @@
self.assertNotExists(index_path)
+class TestPublishDistroWithGPGService(TestPublishDistro):
+ """A copy of the TestPublishDistro tests, but with the gpgservice feature
+ flag enabled.
+
+ Once gpgservice is the default and launchpad no longer manages its own gpg
+ key storage, these tests can be removed.
+
+ """
+
+ def setUp(self):
+ super(TestPublishDistroWithGPGService, self).setUp()
+ self.useFixture(FeatureFixture(
+ {GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG: True}))
+
+
class FakeArchive:
"""A very simple fake `Archive`."""
def __init__(self, purpose=ArchivePurpose.PRIMARY):
=== modified file 'lib/lp/registry/browser/person.py'
--- lib/lp/registry/browser/person.py 2016-02-29 20:18:23 +0000
+++ lib/lp/registry/browser/person.py 2016-03-01 19:34:04 +0000
@@ -210,8 +210,10 @@
from lp.services.geoip.interfaces import IRequestPreferredLanguages
from lp.services.gpg.interfaces import (
GPG_DATABASE_READONLY_FEATURE_FLAG,
+ GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
GPGKeyNotFoundError,
GPGReadOnly,
+ IGPGClient,
IGPGHandler,
)
from lp.services.identity.interfaces.account import (
@@ -2565,6 +2567,8 @@
self.key_already_imported = True
return
+ # Launchpad talks to the keyserver directly to check if the key has been
+ # uploaded to the key server.
try:
key = gpghandler.retrieveKey(self.fingerprint)
except GPGKeyNotFoundError:
@@ -2589,7 +2593,6 @@
key_fingerprints = [key_fingerprints]
gpgkeyset = getUtility(IGPGKeySet)
-
deactivated_keys = []
for key_fingerprint in key_fingerprints:
gpgkey = gpgkeyset.getByFingerprint(key_fingerprint)
@@ -2599,7 +2602,7 @@
self.error_message = structured(
"Cannot deactivate someone else's key")
return
- gpgkey.active = False
+ gpgkeyset.deactivate(gpgkey)
deactivated_keys.append(gpgkey.displayname)
flush_database_updates()
=== modified file 'lib/lp/registry/configure.zcml'
--- lib/lp/registry/configure.zcml 2016-02-05 20:28:29 +0000
+++ lib/lp/registry/configure.zcml 2016-03-01 19:34:04 +0000
@@ -1284,6 +1284,14 @@
permission="launchpad.Edit"
set_attributes="active can_encrypt"/>
</class>
+ <class
+ class="lp.registry.model.gpgkey.GPGServiceKey">
+ <allow
+ interface="lp.registry.interfaces.gpg.IGPGKey"/>
+ <require
+ permission="launchpad.Edit"
+ set_attributes="active can_encrypt"/>
+ </class>
<!-- GPGKeySet -->
=== modified file 'lib/lp/registry/interfaces/gpg.py'
--- lib/lp/registry/interfaces/gpg.py 2016-03-01 14:15:26 +0000
+++ lib/lp/registry/interfaces/gpg.py 2016-03-01 19:34:04 +0000
@@ -73,11 +73,20 @@
reactivated an existing key.
"""
+ def deactivate(key):
+ """Deactivate a key.
+
+ :param key: An IGPGKey instance.
+ """
+
def getByFingerprint(fingerprint, default=None):
"""Return UNIQUE result for a given Key fingerprint including
inactive ones.
"""
+ def getOwnerIdForPerson(person):
+ """return an owner id string suitable for sending to gpgservice."""
+
def getGPGKeysForPerson(person, active=True):
"""Return OpenGPG keys for a person.
=== modified file 'lib/lp/registry/model/gpgkey.py'
--- lib/lp/registry/model/gpgkey.py 2016-03-01 14:15:26 +0000
+++ lib/lp/registry/model/gpgkey.py 2016-03-01 19:34:04 +0000
@@ -18,15 +18,22 @@
IGPGKey,
IGPGKeySet,
)
+from lp.registry.interfaces.person import IPersonSet
from lp.services.database.enumcol import EnumCol
from lp.services.database.sqlbase import (
SQLBase,
sqlvalues,
)
+from lp.services.features import getFeatureFlag
from lp.services.gpg.interfaces import (
+ GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
+ GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG,
GPGKeyAlgorithm,
+ IGPGClient,
IGPGHandler,
)
+from lp.services.openid.interfaces.openid import IOpenIDPersistentIdentity
+from lp.services.openid.model.openididentifier import OpenIdIdentifier
@implementer(IGPGKey)
@@ -59,56 +66,131 @@
return '%s%s/%s' % (self.keysize, self.algorithm.title, self.keyid)
+@implementer(IGPGKey)
+class GPGServiceKey:
+
+ def __init__(self, key_data):
+ self._key_data = key_data
+ self.active = key_data['enabled']
+
+ @property
+ def keysize(self):
+ return self._key_data['size']
+
+ @property
+ def algorithm(self):
+ return GPGKeyAlgorithm.items[self._key_data['algorithm']]
+
+ @property
+ def keyid(self):
+ return self._key_data['id']
+
+ @property
+ def fingerprint(self):
+ return self._key_data['fingerprint']
+
+ @property
+ def displayname(self):
+ return '%s%s/%s' % (self.keysize, self.algorithm.title, self.keyid)
+
+ @property
+ def keyserverURL(self):
+ return getUtility(
+ IGPGHandler).getURLForKeyInServer(self.fingerprint, public=True)
+
+ @property
+ def can_encrypt(self):
+ return self._key_data['can_encrypt']
+
+ @property
+ def owner(self):
+ return getUtility(IPersonSet).getByOpenIDIdentifier(
+ self._key_data['owner'])
+
+ @property
+ def ownerID(self):
+ return self.owner.id
+
+
+
@implementer(IGPGKeySet)
class GPGKeySet:
def new(self, ownerID, keyid, fingerprint, keysize,
algorithm, active=True, can_encrypt=False):
"""See `IGPGKeySet`"""
- return GPGKey(owner=ownerID, keyid=keyid,
+ key = GPGKey(owner=ownerID, keyid=keyid,
fingerprint=fingerprint, keysize=keysize,
algorithm=algorithm, active=active,
can_encrypt=can_encrypt)
+ return key
def activate(self, requester, key, can_encrypt):
"""See `IGPGKeySet`."""
fingerprint = key.fingerprint
lp_key = self.getByFingerprint(fingerprint)
if lp_key:
+ is_new = False
# Then the key already exists, so let's reactivate it.
lp_key.active = True
lp_key.can_encrypt = can_encrypt
- return lp_key, False
- ownerID = requester.id
- keyid = key.keyid
- keysize = key.keysize
- algorithm = GPGKeyAlgorithm.items[key.algorithm]
- lp_key = self.new(
- ownerID, keyid, fingerprint, keysize, algorithm,
- can_encrypt=can_encrypt)
- return lp_key, True
+ else:
+ is_new = True
+ ownerID = requester.id
+ keyid = key.keyid
+ keysize = key.keysize
+ algorithm = GPGKeyAlgorithm.items[key.algorithm]
+ lp_key = self.new(
+ ownerID, keyid, fingerprint, keysize, algorithm,
+ can_encrypt=can_encrypt)
+ if getFeatureFlag(GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG):
+ client = getUtility(IGPGClient)
+ openid_identifier = self.getOwnerIdForPerson(lp_key.owner)
+ client.addKeyForOwner(openid_identifier, key.fingerprint)
+ return lp_key, is_new
+
+ def deactivate(self, key):
+ key.active = False
+ if getFeatureFlag(GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG):
+ client = getUtility(IGPGClient)
+ openid_identifier = self.getOwnerIdForPerson(key.owner)
+ client.disableKeyForOwner(openid_identifier, key.fingerprint)
def getByFingerprint(self, fingerprint, default=None):
"""See `IGPGKeySet`"""
- result = GPGKey.selectOneBy(fingerprint=fingerprint)
- if result is None:
- return default
- return result
+ if getFeatureFlag(GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG):
+ key_data = getUtility(IGPGClient).getKeyByFingerprint(fingerprint)
+ return GPGServiceKey(key_data) if key_data else default
+ else:
+ result = GPGKey.selectOneBy(fingerprint=fingerprint)
+ if result is None:
+ return default
+ return result
def getGPGKeysForPerson(self, owner, active=True):
- if active is False:
- query = """
- active = false
- AND fingerprint NOT IN
- (SELECT fingerprint FROM LoginToken
- WHERE fingerprint IS NOT NULL
- AND requester = %s
- AND date_consumed is NULL
- )
- """ % sqlvalues(owner.id)
+ if getFeatureFlag(GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG):
+ client = getUtility(IGPGClient)
+ owner_id = self.getOwnerIdForPerson(owner)
+ keys = client.getKeysForOwner(owner_id)['keys']
+ return [GPGServiceKey(d) for d in keys if d['enabled'] == active]
else:
- query = 'active=true'
-
- query += ' AND owner=%s' % sqlvalues(owner.id)
-
- return list(GPGKey.select(query, orderBy='id'))
+ if active is False:
+ query = """
+ active = false
+ AND fingerprint NOT IN
+ (SELECT fingerprint FROM LoginToken
+ WHERE fingerprint IS NOT NULL
+ AND requester = %s
+ AND date_consumed is NULL
+ )
+ """ % sqlvalues(owner.id)
+ else:
+ query = 'active=true'
+
+ query += ' AND owner=%s' % sqlvalues(owner.id)
+
+ return list(GPGKey.select(query, orderBy='id'))
+
+ def getOwnerIdForPerson(self, owner):
+ """See IGPGKeySet."""
+ return IOpenIDPersistentIdentity(owner).openid_identity_url
=== added file 'lib/lp/registry/stories/gpg-coc/gpg-with-gpgservice-ff.txt'
--- lib/lp/registry/stories/gpg-coc/gpg-with-gpgservice-ff.txt 1970-01-01 00:00:00 +0000
+++ lib/lp/registry/stories/gpg-coc/gpg-with-gpgservice-ff.txt 2016-03-01 19:34:04 +0000
@@ -0,0 +1,691 @@
+= Claiming GPG Keys =
+
+XXX: This file was copied from 'xx-gpg-coc.txt'. The only difference is that we
+ set a feature flag that means that writes are sent to gpgservice. This file
+ was used because it contains a reasonably complete set of gpg-related
+ actions. Eventually gpg keys will be read-only, so most, if not all of these
+ tests will be deleted.
+
+ >>> from lp.services.features.testing import FeatureFixture
+ >>> from lp.services.gpg.interfaces import GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG
+ >>> feature_fixture = FeatureFixture(
+ ... {GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG: True})
+ >>> feature_fixture.setUp()
+
+== Setup ==
+
+ >>> import email
+ >>> from lp.testing.keyserver import KeyServerTac
+ >>> from lp.services.mail import stub
+ >>> from zope.component import getUtility
+ >>> from lp.registry.interfaces.person import IPersonSet
+ >>> from lp.testing.pages import setupBrowserFreshLogin
+
+Set up the stub KeyServer:
+
+ >>> tac = KeyServerTac()
+ >>> tac.setUp()
+
+
+== Claim an encrypting GPG key ==
+
+This test verifies the basic workflow for claiming a GPG key.
+
+Start out with a clean page containing no imported keys:
+
+ >>> login(ANONYMOUS)
+ >>> name12 = getUtility(IPersonSet).getByEmail('test@xxxxxxxxxxxxx')
+ >>> logout()
+ >>> browser = setupBrowserFreshLogin(name12)
+ >>> browser.open("http://launchpad.dev/~name12")
+ >>> browser.getLink(url='+editpgpkeys').click()
+ >>> print browser.title
+ Change your OpenPGP keys...
+
+ >>> browser.getControl(name='DEACTIVATE_GPGKEY')
+ Traceback (most recent call last):
+ ...
+ LookupError: name 'DEACTIVATE_GPGKEY'
+
+Claim OpenPGP key:
+
+ >>> key = "A419AE861E88BC9E04B9C26FBA2B9389DFD20543"
+ >>> browser.getControl(name='fingerprint').value = key
+ >>> browser.getControl(name='import').click()
+ >>> print_feedback_messages(browser.contents)
+ A message has been sent to test@xxxxxxxxxxxxx, encrypted
+ with the key 1024D/DFD20543.
+ To confirm the key is yours, decrypt the message and follow the
+ link inside.
+
+Recover token URL from the encrypted part, but also make sure there's a clear
+text part that provides useful information to users who -- for whatever reason
+-- cannot decrypt the token url. Start by grabbing the confirmation message.
+
+ >>> from_addr, to_addrs, raw_msg = stub.test_emails.pop()
+ >>> msg = email.message_from_string(raw_msg)
+ >>> msg.get_content_type()
+ 'text/plain'
+
+The message will be a single text/plain part with clear text instructions,
+followed by ASCII armored encrypted confirmation instructions. Ensure that
+the clear text instructions contain the expected URLs pointing to more help.
+
+ >>> cipher_body = msg.get_payload(decode=True)
+ >>> print cipher_body
+ Hello,
+ <BLANKLINE>
+ This message contains the instructions for confirming registration of an
+ OpenPGP key for use in Launchpad. The confirmation instructions have been
+ encrypted with the OpenPGP key you have attempted to register. If you cannot
+ read the unencrypted instructions below, it may be because your mail reader
+ does not support automatic decryption of "ASCII armored" encrypted text.
+ <BLANKLINE>
+ Exact instructions for enabling this depends on the specific mail reader you
+ are using. Please see this support page for more information:
+ <BLANKLINE>
+ https://help.launchpad.net/ReadingOpenPgpMail
+ <BLANKLINE>
+ For more general information on OpenPGP and related tools such as Gnu Privacy
+ Guard (GPG), please see:
+ <BLANKLINE>
+ https://help.ubuntu.com/community/GnuPrivacyGuardHowto
+ <BLANKLINE>
+ -----BEGIN PGP MESSAGE-----
+ ...
+ -----END PGP MESSAGE-----
+ <BLANKLINE>
+ <BLANKLINE>
+ Thanks,
+ <BLANKLINE>
+ The Launchpad Team
+
+Import the secret keys needed for this test:
+
+ >>> from lp.services.gpg.interfaces import IGPGHandler
+
+ >>> from lp.testing.gpgkeys import (
+ ... import_secret_test_key, decrypt_content)
+
+
+ >>> gpghandler = getUtility(IGPGHandler)
+
+ >>> login(ANONYMOUS)
+ >>> key = import_secret_test_key('test@xxxxxxxxxxxxxxxxx')
+
+'cipher_body' is a message encrypted with the just-imported
+1024D/DFD20543 OpenPGP key. We need to use the current IGPGHandler
+instance to access this key and decrypt the message.
+
+ >>> body = decrypt_content(cipher_body, 'test')
+
+Extract the token URL from the email:
+
+ >>> from lp.services.verification.tests.logintoken import (
+ ... get_token_url_from_string)
+ >>> token_url = get_token_url_from_string(body)
+
+Go to the link sent by email, to validate the email address.
+
+ >>> logout()
+ >>> browser.open(token_url)
+
+Get redirected to +validategpg, and confirm token:
+
+ >>> print browser.url
+ http://launchpad.dev/token/.../+validategpg
+ >>> browser.getControl('Continue').click()
+
+Get redirected to the user's homepage with a greeting:
+
+ >>> browser.url
+ 'http://launchpad.dev/~name12'
+ >>> print_feedback_messages(browser.contents)
+ The key 1024D/DFD20543 was successfully validated.
+
+Certify the key is imported:
+
+ >>> browser.open("http://launchpad.dev/~name12/+editpgpkeys")
+ >>> browser.getControl(name='DEACTIVATE_GPGKEY').displayOptions
+ ['1024D/DFD20543']
+
+Verify that the key was imported with the "can encrypt" flag set:
+
+ >>> from lp.registry.model.gpgkey import GPGKey
+ >>> key = GPGKey.selectOneBy(
+ ... fingerprint='A419AE861E88BC9E04B9C26FBA2B9389DFD20543')
+ >>> print key.owner.name
+ name12
+ >>> print key.can_encrypt
+ True
+
+
+== Claim a sign-only GPG key ==
+
+Here, Sample Person wants to claim a GPG key that can only sign
+content. They can't verify their key by decrypting content on demand, but
+they can verify it by signing content. Launchpad sends them an email
+token. The email step ensures that an attacker who knows Sample
+Person's Launchpad password can't associate arbitrary GPG keys with
+their Launchpad account.
+
+ >>> browser.open("http://launchpad.dev/~name12/+editpgpkeys")
+
+ >>> fingerprint = "447DBF38C4F9C4ED752246B77D88913717B05A8F"
+ >>> browser.getControl(name='fingerprint').value = fingerprint
+ >>> browser.getControl(name='import').click()
+ >>> print_feedback_messages(browser.contents)
+ A message has been sent to test@xxxxxxxxxxxxx. To
+ confirm the key 1024D/17B05A8F is yours, follow
+ the link inside.
+
+Sample Person checks their email.
+
+ >>> from_addr, to_addrs, raw_msg = stub.test_emails.pop()
+ >>> msg = email.message_from_string(raw_msg)
+ >>> msg.get_content_type()
+ 'text/plain'
+ >>> body = msg.get_payload(decode=True)
+
+The email is not encrypted, since Sample Person didn't claim the
+ability to decrypt text with this key.
+
+ >>> '-----BEGIN PGP MESSAGE-----' in body
+ False
+
+The email does contain some information about the key, and a token URL
+Sample Person should visit to verify their ownership of the key.
+
+ >>> print body
+ <BLANKLINE>
+ Hello,
+ ...
+ User name : Sample Person
+ Email address: test@xxxxxxxxxxxxx
+ ...
+ Fingerprint : 447DBF38C4F9C4ED752246B77D88913717B05A8F
+ Key type/ID : 1024D/17B05A8F
+ <BLANKLINE>
+ UIDs:
+ sign.only@xxxxxxxxxxxxx
+ ...
+ http://launchpad.dev/token/...
+
+ >>> token_url = get_token_url_from_string(body)
+
+Side note: in a little while, Sample User will be asked to sign some
+text which includes the date the token was generated (to avoid replay
+attacks). To make this testable, we set the creation date of this
+token to a fixed value:
+
+ >>> nothing, token_value = token_url.split('http://launchpad.dev/token/')
+
+ >>> import datetime, hashlib, pytz
+ >>> from lp.services.verification.model.logintoken import LoginToken
+ >>> logintoken = LoginToken.selectOneBy(
+ ... _token=hashlib.sha256(token_value).hexdigest())
+ >>> logintoken.date_created = datetime.datetime(
+ ... 2005, 4, 1, 12, 0, 0, tzinfo=pytz.timezone('UTC'))
+ >>> logintoken.sync()
+
+Back to Sample User. They visit the token URL and are asked to sign some
+text to prove they own the key.
+
+ >>> browser.open(token_url)
+ >>> browser.title
+ 'Confirm sign-only OpenPGP key'
+
+Let's look at the text.
+
+ >>> verification_content = find_main_content(
+ ... browser.contents).pre.string
+ >>> print verification_content
+ Please register 447DBF38C4F9C4ED752246B77D88913717B05A8F to the
+ Launchpad user name12. 2005-04-01 12:00:00 UTC
+
+If they refuse to sign the text, they get an error message.
+
+ >>> browser.getControl('Continue').click()
+ >>> browser.title
+ 'Confirm sign-only OpenPGP key'
+ >>> print_feedback_messages(browser.contents)
+ There is 1 error.
+ Required input is missing.
+
+If they sign a different text, they get an error message.
+
+ >>> login(ANONYMOUS)
+ >>> key = import_secret_test_key('sign.only@xxxxxxxxxxxxxxxxx')
+ >>> bad = gpghandler.signContent(
+ ... 'This is not the verification message!',
+ ... '447DBF38C4F9C4ED752246B77D88913717B05A8F', 'test')
+ >>> logout()
+
+ >>> browser.getControl('Signed text').value = bad
+ >>> browser.getControl('Continue').click()
+ >>> print_feedback_messages(browser.contents)
+ There is 1 error.
+ The signed content does not match the message found in the email.
+
+If they sign the text with a different key, they get an error
+message. The following text was signed with the key DFD20543:
+
+ >>> signed_content = """
+ ... -----BEGIN PGP SIGNED MESSAGE-----
+ ... Hash: SHA1
+ ...
+ ... Please register 447DBF38C4F9C4ED752246B77D88913717B05A8F to the
+ ... Launchpad user name12. 2005-04-01 12:00:00 UTC
+ ... -----BEGIN PGP SIGNATURE-----
+ ... Version: GnuPG v1.4.1 (GNU/Linux)
+ ...
+ ... iD8DBQFDcLOh2yWXVgK6XvYRAkpWAKDFHRpVJc2flFwpQMMxub4cl+TcCACgyciu
+ ... s7GH1fQGOQMqpvpinwOjGto=
+ ... =w7/b
+ ... -----END PGP SIGNATURE-----
+ ... """
+ >>> browser.getControl('Signed text').value = signed_content
+ >>> browser.getControl('Continue').click()
+ >>> print_feedback_messages(browser.contents)
+ There is 1 error.
+ The key used to sign the content (A419AE861E88BC9E04B9C26FBA2B9389DFD20543)
+ is not the key you were registering
+
+If they sign the text correctly, they are redirected to their home page.
+
+ >>> login(ANONYMOUS)
+ >>> good = gpghandler.signContent(
+ ... str(verification_content),
+ ... '447DBF38C4F9C4ED752246B77D88913717B05A8F', 'test')
+ >>> logout()
+
+ >>> browser.getControl('Signed text').value = good
+ >>> browser.getControl('Continue').click()
+ >>> browser.url
+ 'http://launchpad.dev/~name12'
+ >>> print_feedback_messages(browser.contents)
+ The key 1024D/17B05A8F was successfully validated.
+
+Now that the key has been validated, the login token is consumed:
+
+ >>> consumed_token = LoginToken.selectOneBy(
+ ... _token=hashlib.sha256(token_value).hexdigest())
+ >>> consumed_token.date_consumed is not None
+ True
+
+Now Sample Person's sign-only key is associated with their account. They
+verify this:
+
+ >>> browser.open("http://launchpad.dev/~name12/+editpgpkeys")
+
+ >>> content = find_main_content(browser.contents)
+ >>> browser.getControl(name='DEACTIVATE_GPGKEY').displayOptions
+ [...'1024D/17B05A8F (sign only)']
+
+On a mad whim they decide to de-activate the key they just imported.
+
+ >>> browser.getControl(name="DEACTIVATE_GPGKEY").displayValue = [
+ ... '1024D/17B05A8F (sign only)']
+ >>> browser.getControl('Deactivate Key').click()
+
+ >>> print_feedback_messages(browser.contents)
+ Deactivated key(s): 1024D/17B05A8F
+
+Coming to their senses, they ask for a re-validation of the key.
+
+ >>> browser.getControl(name="REACTIVATE_GPGKEY").value = ['3']
+ >>> browser.getControl('Reactivate Key').click()
+
+ >>> print_feedback_messages(browser.contents)
+ A message has been sent to test@xxxxxxxxxxxxx with instructions
+ to reactivate these key(s): 1024D/17B05A8F
+
+They open the page and check that the key is displayed as pending
+revalidation.
+
+ >>> browser.reload()
+ >>> browser.getControl(name='REMOVE_GPGTOKEN').displayOptions
+ ['447DBF38C4F9C4ED752246B77D88913717B05A8F']
+
+(We won't run through the whole validation process again, as this key isn't
+used in any more tests.)
+
+== Teardown ==
+
+ >>> tac.tearDown()
+
+=========================
+Signing a Code of Conduct
+=========================
+
+Sample person has never signed a code of conduct.
+
+ >>> browser = setupBrowser(auth='Basic test@xxxxxxxxxxxxx:test')
+ >>> browser.open('http://launchpad.dev/~name12/+codesofconduct')
+ >>> print extract_text(find_main_content(browser.contents))
+ Codes of Conduct for Sample Person
+ Launchpad records codes of conduct you sign as commitments to the
+ principles of collaboration, tolerance and open communication that
+ drive the open source community.
+ Sample Person has never signed a code
+ of conduct.
+ See or sign new code of conduct releases
+
+ # A helper function for reading a code-of-conduct file.
+ >>> import os
+ >>> def read_file(filename):
+ ... path = os.path.join(os.path.dirname(__file__), filename)
+ ... with open(path) as file_object:
+ ... return file_object.read()
+
+
+Code of Conduct registration problems
+=====================================
+
+Sample Person tries unsuccessfully to register a truncated code of conduct.
+
+ >>> truncated_coc = read_file('truncated_coc.asc')
+ >>> browser.open('http://launchpad.dev/codeofconduct/2.0/+sign')
+ >>> browser.getControl('Signed Code').value = truncated_coc
+ >>> browser.getControl('Continue').click()
+ >>> print_errors(browser.contents)
+ There is 1 error.
+ The signed text does not match the Code of Conduct. Make sure that you
+ signed the correct text (white space differences are acceptable).
+
+Sample Person tries unsuccessfully to register an old version of the code.
+
+ >>> coc_version_1_0 = read_file('10_coc.asc')
+ >>> browser.getControl('Signed Code').value = coc_version_1_0
+ >>> browser.getControl('Continue').click()
+ >>> print_errors(browser.contents)
+ There is 1 error.
+ The signed text does not match the Code of Conduct. Make sure that you
+ signed the correct text (white space differences are acceptable).
+
+
+Sample Person tries to access the old version page to sign it, and is informed
+that there is a new version available.
+
+ >>> browser.open('http://launchpad.dev/codeofconduct/1.0/+sign')
+ >>> browser.getLink('the current version').click()
+ >>> print browser.url
+ http://launchpad.dev/codeofconduct/2.0
+
+ >>> browser.getLink('Sign it').click()
+ >>> print browser.url
+ http://launchpad.dev/codeofconduct/2.0/+sign
+
+
+Code of Conduct registration
+============================
+
+Sample Person registers the code of conduct, using a reformatted copy which
+has leading spaces removed. This succeeds because the words are the same and
+appear in the same order.
+
+ >>> reformatted_coc = read_file('reformatted_20_coc.asc')
+ >>> browser.getControl('Signed Code').value = reformatted_coc
+ >>> browser.getControl('Continue').click()
+ >>> print browser.url
+ http://launchpad.dev/~name12/+codesofconduct
+
+And now Sample Person's Codes of Conduct page shows that they've signed it.
+
+ >>> browser.open('http://launchpad.dev/~name12/+codesofconduct')
+ >>> print extract_text(find_main_content(browser.contents))
+ Codes of Conduct for Sample Person
+ Launchpad records codes of conduct you sign as commitments to the
+ principles of collaboration, tolerance and open communication that
+ drive the open source community.
+ Active signatures
+ If you change your mind about agreeing to a code of conduct,
+ you can deactivate your signature.
+ ...: digitally signed by Sample Person (1024D/DFD20543)
+ ...
+
+
+Now Sample Person will deactivate their key...
+
+ >>> browser = setupBrowserFreshLogin(name12)
+ >>> browser.open('http://launchpad.dev/~name12/+editpgpkeys')
+ >>> browser.url
+ 'http://launchpad.dev/~name12/+editpgpkeys'
+
+ >>> print browser.contents
+ <...
+ ...Your active keys...
+ ...1024D/DFD20543...
+
+
+... but they forgot to select the checkbox of the key they want to remove.
+
+ >>> browser.getControl('Deactivate Key').click()
+ >>> for tag in find_main_content(browser.contents)('p', 'error message'):
+ ... print tag.renderContents()
+ No key(s) selected for deactivation.
+
+
+Now they select the checkbox and deactivate it.
+
+ >>> browser.getControl('1024D/DFD20543').selected = True
+ >>> browser.getControl('Deactivate Key').click()
+ >>> soup = find_main_content(browser.contents)
+ >>> for tag in soup('p', 'informational message'):
+ ... print tag.renderContents()
+ Deactivated key(s): 1024D/DFD20543
+
+
+Sample Person already has a deactivated key.
+
+ >>> browser.open('http://launchpad.dev/~name12/+editpgpkeys')
+ >>> browser.url
+ 'http://launchpad.dev/~name12/+editpgpkeys'
+
+ >>> print browser.contents
+ <...
+ ...Deactivated keys...
+ ...1024D/DFD20543...
+
+
+Now they'll request their key to be reactivated.
+
+ >>> browser.getControl('Reactivate Key').click()
+ >>> soup = find_main_content(browser.contents)
+ >>> for tag in soup('p', 'error message'):
+ ... print tag.renderContents()
+ No key(s) selected for reactivation.
+
+ >>> browser.getControl('1024D/DFD20543').selected = True
+ >>> browser.getControl('Reactivate Key').click()
+ >>> soup = find_main_content(browser.contents)
+ >>> for tag in soup('p', 'informational message'):
+ ... print tag.renderContents()
+ A message has been sent to test@xxxxxxxxxxxxx with instructions to reactivate...
+
+
+Get the token from the body of the email sent.
+
+ >>> import email, re
+ >>> from lp.services.mail import stub
+ >>> from_addr, to_addrs, raw_msg = stub.test_emails.pop()
+ >>> msg = email.message_from_string(raw_msg)
+ >>> cipher_body = msg.get_payload(decode=1)
+ >>> body = decrypt_content(cipher_body, 'test')
+ >>> link = re.findall(r'http.*/token/.*', body)[0]
+ >>> token = re.sub(r'.*token/', '', link)
+ >>> token_url = 'http://launchpad.dev/token/%s' % token.encode('ascii')
+
+
+Going to the token page will get us redirected to the page of that specific
+token type (+validategpg).
+
+ >>> browser.open(token_url)
+ >>> browser.url == '%s/+validategpg' % token_url
+ True
+
+ >>> print browser.contents
+ <...
+ ...Confirm the OpenPGP key...A419AE861E88BC9E04B9C26FBA2B9389DFD20543...
+ ...Sample Person...
+
+
+Now Sample Person confirms the reactivation.
+
+ >>> browser.getControl('Continue').click()
+ >>> browser.url
+ 'http://launchpad.dev/~name12'
+
+ >>> print browser.contents
+ <...
+ ...Key 1024D/DFD20543 successfully reactivated...
+
+
+And now we can see the key listed as one of Sample Person's active keys.
+
+ >>> browser.open('http://launchpad.dev/~name12/+editpgpkeys')
+ >>> print browser.contents
+ <...
+ ...Your active keys...
+ ...1024D/DFD20543...
+
+This test verifies that we correctly handle keys which are in some way
+special: either invalid, broken, revoked, expired, or already imported.
+
+ >>> import email
+ >>> from lp.testing.keyserver import KeyServerTac
+ >>> from lp.services.mail import stub
+
+ >>> tac = KeyServerTac()
+ >>> tac.setUp()
+
+ >>> sign_only = "447D BF38 C4F9 C4ED 7522 46B7 7D88 9137 17B0 5A8F"
+ >>> preimported = "A419AE861E88BC9E04B9C26FBA2B9389DFD20543"
+
+Try to import a key which is already imported:
+
+ >>> del stub.test_emails[:]
+ >>> browser.open('http://launchpad.dev/~name12/+editpgpkeys')
+ >>> browser.getControl(name='fingerprint').value = preimported
+ >>> browser.getControl(name='import').click()
+ >>> "A message has been sent" in browser.contents
+ False
+ >>> stub.test_emails
+ []
+ >>> print browser.contents
+ <BLANKLINE>
+ ...
+ ...has already been imported...
+
+ >>> tac.tearDown()
+
+
+
+Ensure we are raising 404 error instead of System Error
+
+ >>> print http(r"""
+ ... POST /codeofconduct/donkey HTTP/1.1
+ ... Authorization: Basic Zm9vLmJhckBjYW5vbmljYWwuY29tOnRlc3Q=
+ ... Referer: https://launchpad.dev/
+ ... """)
+ HTTP/1.1 404 Not Found
+ ...
+
+Check to see no CoC signature is registered for Mark:
+
+ >>> admin_browser.open('http://localhost:9000/codeofconduct/console')
+ >>> admin_browser.getControl(name='searchfor').value = ["all"]
+ >>> admin_browser.getControl(name='name').value = "mark"
+ >>> admin_browser.getControl(name='search').click()
+ >>> "No signatures found." in admin_browser.contents
+ True
+
+Perform Acknowledge process as Foo bar person:
+
+ >>> admin_browser.open('http://localhost:9000/codeofconduct/console/+new')
+ >>> admin_browser.title
+ 'Register a code of conduct signature'
+
+ >>> admin_browser.getControl(
+ ... name='field.owner').value = "mark@xxxxxxxxxxx"
+ >>> admin_browser.getControl('Register').click()
+ >>> admin_browser.url
+ 'http://localhost:9000/codeofconduct/console'
+
+Ensure the CoC was acknowledged by searching in the CoC Admin Console:
+
+ >>> admin_browser.open('http://launchpad.dev/codeofconduct/console')
+ >>> admin_browser.getControl(name='searchfor').value = ["all"]
+ >>> admin_browser.getControl(name='name').value = "mark"
+ >>> admin_browser.getControl(name='search').click()
+ >>> print extract_text(find_tag_by_id(admin_browser.contents, 'matches'))
+ Mark ... paper submission accepted by Foo Bar [ACTIVE]
+
+Test if the advertisement email was sent:
+
+ >>> import email
+ >>> from lp.services.mail import stub
+ >>> from_addr, to_addrs, raw_msg = stub.test_emails.pop()
+ >>> msg = email.message_from_string(raw_msg)
+ >>> print msg.get_payload(decode=True)
+ <BLANKLINE>
+ ...
+ User: 'Mark Shuttleworth'
+ Paper Submitted acknowledge by Foo Bar
+ ...
+
+ Let's log in with a Launchpad Admin
+
+ >>> for pos, (key, _) in enumerate(browser.mech_browser.addheaders):
+ ... if key == 'Authorization':
+ ... del browser.mech_browser.addheaders[pos]
+ ... break
+ >>> browser.addHeader(
+ ... 'Authorization', 'Basic guilherme.salgado@xxxxxxxxxxxxx:test')
+
+ Check if we can see the Code of conduct page
+
+ >>> browser.open('http://localhost:9000/codeofconduct')
+ >>> 'Ubuntu Codes of Conduct' in browser.contents
+ True
+
+ The link to the Administrator console
+
+ >>> admin_console_link = browser.getLink('Administration console')
+ >>> admin_console_link.url
+ 'http://localhost:9000/codeofconduct/console'
+
+ Let's follow the link
+
+ >>> admin_console_link.click()
+
+ We are in the Administration page
+
+ >>> browser.url
+ 'http://localhost:9000/codeofconduct/console'
+
+ >>> 'Administer code of conduct signatures' in browser.contents
+ True
+
+ >>> browser.getLink("register signatures").url
+ 'http://localhost:9000/codeofconduct/console/+new'
+
+
+ Back to the CoC front page let's see the current version of the CoC
+
+ >>> browser.open('http://localhost:9000/codeofconduct')
+ >>> browser.getLink('current version').click()
+
+ >>> 'Ubuntu Code of Conduct - 2.0' in browser.contents
+ True
+
+ >>> browser.getLink('Sign it').url
+ 'http://localhost:9000/codeofconduct/2.0/+sign'
+
+ >>> browser.getLink('Download this version').url
+ 'http://localhost:9000/codeofconduct/2.0/+download'
+
+
+= TearDown =
+
+ >>> feature_fixture.cleanUp()
=== modified file 'lib/lp/services/gpg/configure.zcml'
--- lib/lp/services/gpg/configure.zcml 2011-12-09 00:24:57 +0000
+++ lib/lp/services/gpg/configure.zcml 2016-03-01 19:34:04 +0000
@@ -19,6 +19,16 @@
<allow interface="lp.services.gpg.interfaces.IGPGHandler" />
</securedutility>
+ <class class="lp.services.gpg.handler.GPGClient">
+ <allow interface="lp.services.gpg.interfaces.IGPGClient" />
+ </class>
+
+ <securedutility
+ class="lp.services.gpg.handler.GPGClient"
+ provides="lp.services.gpg.interfaces.IGPGClient">
+ <allow interface="lp.services.gpg.interfaces.IGPGClient" />
+ </securedutility>
+
<class class="lp.services.gpg.handler.PymeSignature">
<allow interface="lp.services.gpg.interfaces.IPymeSignature" />
</class>
=== modified file 'lib/lp/services/gpg/handler.py'
--- lib/lp/services/gpg/handler.py 2015-09-26 02:55:36 +0000
+++ lib/lp/services/gpg/handler.py 2016-03-01 19:34:04 +0000
@@ -11,7 +11,9 @@
]
import atexit
+import base64
import httplib
+import json
import os
import shutil
import socket
@@ -21,9 +23,12 @@
import tempfile
import urllib
import urllib2
+from urlparse import urljoin
import gpgme
from lazr.restful.utils import get_current_browser_request
+import requests
+from requests.status_codes import codes as http_codes
from zope.interface import implementer
from lp.app.validators.email import valid_email
@@ -35,8 +40,10 @@
GPGKeyNotFoundError,
GPGKeyRevoked,
GPGKeyTemporarilyNotFoundError,
+ GPGServiceException,
GPGUploadFailure,
GPGVerificationError,
+ IGPGClient,
IGPGHandler,
IPymeKey,
IPymeSignature,
@@ -45,6 +52,7 @@
SecretGPGKeyImportDetected,
valid_fingerprint,
)
+from lp.services.openid.model.openididentifier import OpenIdIdentifier
from lp.services.timeline.requesttimeline import get_request_timeline
from lp.services.timeout import (
TimeoutError,
@@ -106,15 +114,7 @@
def sanitizeFingerprint(self, fingerprint):
"""See IGPGHandler."""
- # remove whitespaces, truncate to max of 40 (as per v4 keys) and
- # convert to upper case
- fingerprint = fingerprint.replace(' ', '')
- fingerprint = fingerprint[:40].upper()
-
- if not valid_fingerprint(fingerprint):
- return None
-
- return fingerprint
+ return sanitize_fingerprint(fingerprint)
def resetLocalState(self):
"""See IGPGHandler."""
@@ -619,3 +619,150 @@
self.name = uid.name
self.email = uid.email
self.comment = uid.comment
+
+
+def sanitize_fingerprint(fingerprint):
+ """Sanitize a GPG fingerprint.
+
+ This is the ultimate implementation of IGPGHandler.sanitizeFingerprint, and
+ is also used by the IGPGClient implementation.
+ """
+ # remove whitespaces, truncate to max of 40 (as per v4 keys) and
+ # convert to upper case
+ fingerprint = fingerprint.replace(' ', '')
+ fingerprint = fingerprint[:40].upper()
+
+ if not valid_fingerprint(fingerprint):
+ return None
+
+ return fingerprint
+
+
+def sanitize_fingerprint_or_raise(fingerprint):
+ """Check the sanity of 'fingerprint'.
+
+ If 'fingerprint' is a valid fingerprint, the sanitised version will be
+ returned (see sanitize_fingerprint).
+
+ Otherwise, a ValueError will be raised.
+ """
+ sane_fingerprint = sanitize_fingerprint(fingerprint)
+ if sane_fingerprint is None:
+ raise ValueError("Invalid fingerprint: %r." % fingerprint)
+ return sane_fingerprint
+
+
+@implementer(IGPGClient)
+class GPGClient:
+ """See IGPGClient."""
+
+ def __init__(self):
+ self.write_hooks = set()
+
+ def getKeysForOwner(self, owner_id):
+ """See IGPGClient."""
+ path = '/users/%s/keys' % self._encode_owner_id(owner_id)
+ resp = self._request('get', path)
+ if resp.status_code != http_codes['OK']:
+ self.raise_for_error(resp)
+ return resp.json()
+
+ def addKeyForOwner(self, owner_id, fingerprint):
+ """See IGPGClient."""
+ fingerprint = sanitize_fingerprint_or_raise(fingerprint)
+ path = '/users/%s/keys' % self._encode_owner_id(owner_id)
+ data = dict(fingerprint=fingerprint)
+ resp = self._request('post', path, data)
+ if resp.status_code == http_codes['CREATED']:
+ self._notify_writes()
+ elif resp.status_code != http_codes['OK']:
+ self.raise_for_error(resp)
+
+ def disableKeyForOwner(self, owner_id, fingerprint):
+ """See IGPGClient."""
+ fingerprint = sanitize_fingerprint_or_raise(fingerprint)
+ path = '/users/%s/keys/%s' % (self._encode_owner_id(owner_id), fingerprint)
+ resp = self._request('delete', path)
+ if resp.status_code == http_codes['OK']:
+ self._notify_writes()
+ else:
+ self.raise_for_error(resp)
+
+ def getKeyByFingerprint(self, fingerprint):
+ fingerprint = sanitize_fingerprint_or_raise(fingerprint)
+ path = '/keys/%s' % fingerprint
+ resp = self._request('get', path)
+ if resp.status_code == http_codes['OK']:
+ return resp.json()
+ elif resp.status_code == http_codes['NOT_FOUND']:
+ return None
+ else:
+ self.raise_for_error(resp)
+
+ def registerWriteHook(self, hook_callable):
+ """See IGPGClient."""
+ if not callable(hook_callable):
+ raise TypeError("'hook_callable' parameter must be a callable.")
+ self.write_hooks.add(hook_callable)
+
+ def unregisterWriteHook(self, hook_callable):
+ """See IGPGClient."""
+ if hook_callable not in self.write_hooks:
+ raise ValueError("%r not registered.")
+ self.write_hooks.remove(hook_callable)
+
+ def addKeyForTest(self, owner_id, keyid, fingerprint, keysize, algorithm, enabled,
+ can_encrypt):
+ """See IGPGClient."""
+ document = {'keys': [{
+ 'owner': owner_id,
+ 'id': keyid,
+ 'fingerprint': fingerprint,
+ 'size': keysize,
+ 'algorithm': algorithm,
+ 'enabled': enabled,
+ 'can_encrypt': can_encrypt}]}
+ path = '/test/add_keys'
+ resp = self._request('post', path, document)
+ if resp.status_code == http_codes['NOT_FOUND']:
+ raise RuntimeError(
+ "gpgservice was not configured with test endpoints enabled.")
+ elif resp.status_code != http_codes['OK']:
+ self.raise_for_error(resp)
+
+ def _notify_writes(self):
+ errors = []
+ for hook in self.write_hooks:
+ try:
+ hook()
+ except Exception as e:
+ errors.append(str(e))
+ if errors:
+ raise Exception("The operation succeeded, but one or more write"
+ " hooks failed: %s" % ', '.join(errors))
+
+ def _encode_owner_id(self, owner_id):
+ return base64.b64encode(owner_id, altchars='-_')
+
+ def raise_for_error(self, response):
+ """Raise GPGServiceException based on what's in 'response'."""
+ if response.headers['Content-Type'] == 'application/json':
+ message = response.json()['status']
+ else:
+ message = "Unhandled service error. HTTP Status: %d HTTP Body: %s" % (
+ response.status_code, response.content)
+ raise GPGServiceException(message)
+
+ @property
+ def timeout(self):
+ # Perhaps this should be from config?
+ return 30.0
+
+ @property
+ def endpoint(self):
+ return "http://{}".format(config.gpgservice.api_endpoint)
+
+ def _request(self, method, path, data=None, **kwargs):
+ response = getattr(requests, method)(
+ urljoin(self.endpoint, path), json=data, timeout=self.timeout, **kwargs)
+ return response
=== modified file 'lib/lp/services/gpg/interfaces.py'
--- lib/lp/services/gpg/interfaces.py 2016-02-10 00:51:55 +0000
+++ lib/lp/services/gpg/interfaces.py 2016-03-01 19:34:04 +0000
@@ -3,6 +3,8 @@
__all__ = [
'GPG_DATABASE_READONLY_FEATURE_FLAG',
+ 'GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG',
+ 'GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG',
'GPGKeyAlgorithm',
'GPGKeyDoesNotExistOnServer',
'GPGKeyExpired',
@@ -10,8 +12,10 @@
'GPGKeyRevoked',
'GPGKeyTemporarilyNotFoundError',
'GPGReadOnly',
+ 'GPGServiceException',
'GPGUploadFailure',
'GPGVerificationError',
+ 'IGPGClient',
'IGPGHandler',
'IPymeKey',
'IPymeSignature',
@@ -34,9 +38,7 @@
Attribute,
Interface,
)
-from zope.security.interfaces import (
- Forbidden,
- )
+from zope.security.interfaces import Forbidden
@error_status(httplib.FORBIDDEN)
@@ -50,6 +52,8 @@
GPG_DATABASE_READONLY_FEATURE_FLAG = u"gpg.database_read_only"
+GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG = u"gpg.write_to_gpgservice"
+GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG = u"gpg.read_from_gpgservice"
def valid_fingerprint(fingerprint):
@@ -421,3 +425,81 @@
name = Attribute("The name portion of this user ID")
email = Attribute("The email portion of this user ID")
comment = Attribute("The comment portion of this user ID")
+
+
+class GPGServiceException(Exception):
+
+ """Raised when we get an error from the gpgservice.
+
+ The exception message is either the status reported by the service or a
+ generic description quoting the HTTP status and body.
+
+ More specific errors for commonly encountered errors may be added once we
+ actually integrate gpgservice with the rest of Launchpad.
+ """
+
+
+class IGPGClient(Interface):
+
+ """A client for querying a gpgservice instance."""
+
+ def getKeysForOwner(owner_id):
+ """Get a list of keys for a given owner.
+
+ :raises GPGServiceException: If we get an error from the gpgservice.
+ :raises socket.error: on socket-level errors (connection timeouts etc)
+ """
+
+ def addKeyForOwner(owner_id, fingerprint):
+ """Add a GPG key.
+
+ :raises ValueError: if the fingerprint isn't valid.
+ :raises GPGServiceException: If we get an error from the gpgservice.
+ :raises socket.error: on socket-level errors (connection timeouts etc)
+ """
+
+ def disableKeyForOwner(owner_id, fingerprint):
+ """Disable a GPG key.
+
+ :raises ValueError: if the fingerprint isn't valid.
+ :raises GPGServiceException: If we get an error from the gpgservice.
+ :raises socket.error: on socket-level errors (connection timeouts etc)
+ """
+
+ def getKeyByFingerprint(fingerprint):
+ """Get a GPG key by its fingerprint.
+
+ :returns: The key data, or None if the gpgservice has no key with
+ that fingerprint.
+ :raises ValueError: if the fingerprint isn't valid.
+ :raises GPGServiceException: If we get an error from the gpgservice.
+ """
+
+ def registerWriteHook(hook_callable):
+ """Register a write hook.
+
+ The hook_callable will be called with no arguments whenever an operation
+ is performed that modifies the GPG database.
+
+ :raises TypeError: if hook_callable is not a callable.
+ :raises GPGServiceException: If we get an error from the gpgservice.
+ """
+
+ def unregisterWriteHook(hook_callable):
+ """Deregister a write hook that was registered with registerWriteHook.
+
+ :raises ValueError: if hook_callable was not registered.
+ """
+
+ def addKeyForTest(owner_id, keyid, fingerprint, keysize, algorithm, enabled,
+ can_encrypt):
+ """Add a key to the gpgservice without checking the keyserver.
+
+ This method is to be used for TESTING purposes only. The running
+ gpgservice instance must have its test methods configured - something
+ that should not be done in production. If this requirement is not met
+ a RuntimeError will be raised.
+
+ :param owner_id: A string representing the owner, as returned by
+ IGPGKeySet.getOwnerIdForPerson
+ :param keyid: A string describing the short-form gpg key id.
+ :param fingerprint: A string containing the full GPG fingerprint.
+ :param keysize: An integer, containing the keysize.
+ :param algorithm: The key algorithm code, a single letter.
+ :param enabled: Whether the key is enabled or not.
+ :param can_encrypt: Whether the key can be used for encryption.
+ :raises RuntimeError: if the gpgservice does not have its test methods
+ enabled.
+ :raises GPGServiceException: If we get an error from the gpgservice.
+ """
=== modified file 'lib/lp/services/gpg/tests/test_gpghandler.py'
--- lib/lp/services/gpg/tests/test_gpghandler.py 2015-09-26 02:55:36 +0000
+++ lib/lp/services/gpg/tests/test_gpghandler.py 2016-03-01 19:34:04 +0000
@@ -1,15 +1,37 @@
# Copyright 2009-2015 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
+import random
+import string
+
+from testtools.matchers import (
+ Contains,
+ ContainsDict,
+ Equals,
+ HasLength,
+ Not,
+ raises,
+ )
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
+from lp.registry.interfaces.gpg import IGPGKeySet
+from lp.registry.interfaces.person import IPersonSet
+from lp.services.config.fixture import (
+ ConfigFixture,
+ ConfigUseFixture,
+ )
+from lp.services.gpg.handler import GPGClient
from lp.services.gpg.interfaces import (
+ GPGKeyAlgorithm,
GPGKeyDoesNotExistOnServer,
GPGKeyTemporarilyNotFoundError,
+ GPGServiceException,
+ IGPGClient,
IGPGHandler,
)
from lp.services.log.logger import BufferLogger
+from lp.services.openid.model.openididentifier import OpenIdIdentifier
from lp.services.timeout import (
get_default_timeout_function,
set_default_timeout_function,
@@ -20,14 +42,21 @@
logout,
TestCase,
)
+from lp.testing.factory import BareLaunchpadObjectFactory
+from lp.testing.fakemethod import FakeMethod
from lp.testing.gpgkeys import (
import_secret_test_key,
iter_test_key_emails,
test_keyrings,
test_pubkey_from_email,
)
+from lp.testing.gpgservice import GPGKeyServiceFixture
from lp.testing.keyserver import KeyServerTac
-from lp.testing.layers import LaunchpadFunctionalLayer
+from lp.testing.layers import (
+ GPGServiceLayer,
+ LaunchpadFunctionalLayer,
+ ZopelessDatabaseLayer,
+ )
class TestImportKeyRing(TestCase):
@@ -184,3 +213,188 @@
self.assertRaises(
GPGKeyDoesNotExistOnServer,
removeSecurityProxy(self.gpg_handler)._getPubKey, fingerprint)
+
+
+class GPGServiceZopelessLayer(ZopelessDatabaseLayer, GPGServiceLayer):
+ """A layer specifically for running the IGPGClient utility tests."""
+
+ # This layer only combines its two bases and adds no set-up of its own,
+ # so each hook below is an explicit no-op.
+ # NOTE(review): presumably the test runner invokes the base layers' own
+ # hooks separately, and these overrides stop the inherited ones from
+ # being run a second time for this class - confirm.
+
+ @classmethod
+ def setUp(cls):
+ pass
+
+ @classmethod
+ def tearDown(cls):
+ pass
+
+ @classmethod
+ def testSetUp(cls):
+ pass
+
+ @classmethod
+ def testTearDown(cls):
+ pass
+
+
+class GPGClientTests(TestCase):
+ """Tests for the IGPGClient utility, run on GPGServiceZopelessLayer."""
+
+ layer = GPGServiceZopelessLayer
+
+ def setUp(self):
+ super(GPGClientTests, self).setUp()
+ self.factory = BareLaunchpadObjectFactory()
+
+ def test_can_get_utility(self):
+ client = getUtility(IGPGClient)
+ self.assertIsNot(None, client)
+
+ def get_random_owner_id_string(self):
+ """Get a random string that's representative of the owner id scheme."""
+ candidates = string.ascii_lowercase + string.digits
+ openid_id = ''.join((random.choice(candidates) for i in range(6)))
+ return 'http://testopenid.dev/+id/' + openid_id
+
+ def test_get_key_for_user_with_sampledata(self):
+ # The owner id for 'name16' is expected to have exactly one key.
+ client = getUtility(IGPGClient)
+ person = getUtility(IPersonSet).getByName('name16')
+ openid_id = getUtility(IGPGKeySet).getOwnerIdForPerson(person)
+ data = client.getKeysForOwner(openid_id)
+ self.assertThat(data, ContainsDict({'keys': HasLength(1)}))
+
+ def test_get_key_for_unknown_user(self):
+ # An owner the service has never seen yields an empty key list.
+ client = getUtility(IGPGClient)
+ user = self.get_random_owner_id_string()
+ data = client.getKeysForOwner(user)
+ self.assertThat(data, ContainsDict({'keys': HasLength(0)}))
+
+ def test_register_non_callable_raises_TypeError(self):
+ client = getUtility(IGPGClient)
+ self.assertThat(
+ lambda: client.registerWriteHook("not a callable"),
+ raises(TypeError))
+
+ def test_unregister_with_unregistered_hook_raises_ValueError(self):
+ client = getUtility(IGPGClient)
+ self.assertThat(
+ lambda: client.unregisterWriteHook("not registered"),
+ raises(ValueError))
+
+ def test_can_unregister_registered_write_hook(self):
+ client = getUtility(IGPGClient)
+ hook = FakeMethod()
+ client.registerWriteHook(hook)
+ client.unregisterWriteHook(hook)
+
+ # A second unregistration fails, proving the first one took effect.
+ self.assertThat(
+ lambda: client.unregisterWriteHook(hook),
+ raises(ValueError))
+
+ def test_can_add_new_fingerprint_for_user(self):
+ self.useFixture(KeyServerTac())
+ client = getUtility(IGPGClient)
+ fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
+ user = self.get_random_owner_id_string()
+ client.addKeyForOwner(user, fingerprint)
+ data = client.getKeysForOwner(user)
+ self.assertThat(data, ContainsDict({'keys': HasLength(1)}))
+ keys = data['keys']
+ self.assertThat(
+ keys[0],
+ ContainsDict({
+ 'fingerprint': Equals(fingerprint),
+ 'enabled': Equals(True)
+ }))
+
+ def test_adding_fingerprint_notifies_writes(self):
+ self.useFixture(KeyServerTac())
+ client = getUtility(IGPGClient)
+ hook = FakeMethod()
+ client.registerWriteHook(hook)
+ self.addCleanup(client.unregisterWriteHook, hook)
+ fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
+ user = self.get_random_owner_id_string()
+ client.addKeyForOwner(user, fingerprint)
+
+ self.assertThat(hook.call_count, Equals(1))
+
+ def test_adding_invalid_fingerprint_raises_ValueError(self):
+ client = getUtility(IGPGClient)
+ self.assertThat(
+ lambda: client.addKeyForOwner(self.get_random_owner_id_string(), ''),
+ raises(ValueError("Invalid fingerprint: ''.")))
+
+ def test_adding_duplicate_fingerprint_raises_GPGServiceException(self):
+ # The same fingerprint cannot be added for a second owner.
+ self.useFixture(KeyServerTac())
+ client = getUtility(IGPGClient)
+ fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
+ user_one = self.get_random_owner_id_string()
+ user_two = self.get_random_owner_id_string()
+ client.addKeyForOwner(user_one, fingerprint)
+ self.assertThat(
+ lambda: client.addKeyForOwner(user_two, fingerprint),
+ raises(GPGServiceException("Error: Fingerprint already in database.")))
+
+ def test_disabling_active_key(self):
+ # NOTE(review): unlike the other tests that call addKeyForOwner, this
+ # one does not use the KeyServerTac fixture - confirm that is intended.
+ client = getUtility(IGPGClient)
+ fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
+ user = self.get_random_owner_id_string()
+ client.addKeyForOwner(user, fingerprint)
+ client.disableKeyForOwner(user, fingerprint)
+ data = client.getKeysForOwner(user)
+
+ self.assertThat(data, ContainsDict({'keys': HasLength(1)}))
+ keys = data['keys']
+ self.assertThat(keys[0], ContainsDict({'enabled': Equals(False)}))
+
+ def test_disabling_key_notifies_writes(self):
+ # NOTE(review): no KeyServerTac fixture here either - confirm.
+ client = getUtility(IGPGClient)
+ fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
+ user = self.get_random_owner_id_string()
+ client.addKeyForOwner(user, fingerprint)
+
+ # Register the hook only after the add, so that just the disable
+ # operation is counted.
+ hook = FakeMethod()
+ client.registerWriteHook(hook)
+ self.addCleanup(client.unregisterWriteHook, hook)
+ client.disableKeyForOwner(user, fingerprint)
+ self.assertThat(hook.call_count, Equals(1))
+
+ def test_disabling_invalid_fingerprint_raises_ValueError(self):
+ client = getUtility(IGPGClient)
+ self.assertThat(
+ lambda: client.disableKeyForOwner(self.get_random_owner_id_string(), ''),
+ raises(ValueError("Invalid fingerprint: ''."))
+ )
+
+ def test_can_get_key_by_fingerprint(self):
+ # NOTE(review): no KeyServerTac fixture here either - confirm.
+ client = getUtility(IGPGClient)
+ fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
+ user = self.get_random_owner_id_string()
+ client.addKeyForOwner(user, fingerprint)
+
+ key = client.getKeyByFingerprint(fingerprint)
+ self.assertThat(
+ key, ContainsDict({'owner': Equals(user),
+ 'fingerprint': Equals(fingerprint)}))
+
+ def test_get_missing_key_by_fingerprint(self):
+ # A valid fingerprint that was never added yields None.
+ client = getUtility(IGPGClient)
+ fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
+ self.assertIsNone(client.getKeyByFingerprint(fingerprint))
+
+ def test_get_key_with_bad_fingerprint_raises_ValueError(self):
+ client = getUtility(IGPGClient)
+ self.assertThat(lambda: client.getKeyByFingerprint('bad fingerprint'),
+ raises(ValueError))
+
+ def test_can_add_IGPGKey_to_test_enabled_gpgservice(self):
+ # A key made by the factory can be pushed to the service through
+ # addKeyForTest and then fetched back by its fingerprint.
+ client = getUtility(IGPGClient)
+ person = self.factory.makePerson()
+ gpgkey = self.factory.makeGPGKey(person)
+ user = self.get_random_owner_id_string()
+ client.addKeyForTest(user, gpgkey.keyid, gpgkey.fingerprint,
+ gpgkey.keysize, gpgkey.algorithm.name,
+ gpgkey.active, gpgkey.can_encrypt)
+
+ key = client.getKeyByFingerprint(gpgkey.fingerprint)
+ self.assertThat(
+ key, ContainsDict({'owner': Equals(user),
+ 'fingerprint': Equals(gpgkey.fingerprint)}))
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2016-02-06 02:20:04 +0000
+++ lib/lp/testing/factory.py 2016-03-01 19:34:04 +0000
@@ -231,7 +231,9 @@
)
from lp.services.database.policy import MasterDatabasePolicy
from lp.services.database.sqlbase import flush_database_updates
+from lp.services.features import getFeatureFlag
from lp.services.gpg.interfaces import (
+ GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
GPGKeyAlgorithm,
IGPGHandler,
)
@@ -586,7 +588,8 @@
"""Give 'owner' a crappy GPG key for the purposes of testing."""
key_id = self.getUniqueHexString(digits=8).upper()
fingerprint = key_id + 'A' * 32
- return getUtility(IGPGKeySet).new(
+ keyset = getUtility(IGPGKeySet)
+ key = keyset.new(
owner.id,
keyid=key_id,
fingerprint=fingerprint,
@@ -594,6 +597,13 @@
algorithm=GPGKeyAlgorithm.R,
active=True,
can_encrypt=False)
+ if getFeatureFlag(GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG):
+ client = getUtility(IGPGClient)
+ openid_identifier = keyset.getOwnerIdForPerson(owner)
+ client.addKeyForTest(
+ openid_identifier, key.keyid, key.fingerprint, key.keysize,
+ key.algorithm.name, key.active, key.can_encrypt)
+ return key
def makePerson(
self, email=None, name=None, displayname=None, account_status=None,
=== modified file 'lib/lp/testing/gpgkeys/__init__.py'
--- lib/lp/testing/gpgkeys/__init__.py 2012-12-26 01:12:37 +0000
+++ lib/lp/testing/gpgkeys/__init__.py 2016-03-01 19:34:04 +0000
@@ -27,12 +27,13 @@
from lp.registry.interfaces.gpg import IGPGKeySet
from lp.registry.interfaces.person import IPersonSet
+from lp.services.features import getFeatureFlag
from lp.services.gpg.interfaces import (
+ GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
GPGKeyAlgorithm,
IGPGHandler,
)
-
gpgkeysdir = os.path.join(os.path.dirname(__file__), 'data')
@@ -64,13 +65,20 @@
return
# Insert the key into the database.
- getUtility(IGPGKeySet).new(
- ownerID=personset.getByEmail(email_addr).id,
+ keyset = getUtility(IGPGKeySet)
+ key = keyset.new(
+ ownerID=person.id,
keyid=key.keyid,
fingerprint=key.fingerprint,
keysize=key.keysize,
algorithm=GPGKeyAlgorithm.items[key.algorithm],
active=(not key.revoked))
+ if getFeatureFlag(GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG):
+ client = getUtility(IGPGClient)
+ openid_identifier = keyset.getOwnerIdForPerson(person)
+ client.addKeyForTest(
+ openid_identifier, key.keyid, key.fingerprint, key.keysize,
+ key.algorithm.name, key.active, key.can_encrypt)
def iter_test_key_emails():
=== modified file 'lib/lp/testing/gpgservice/_fixture.py'
--- lib/lp/testing/gpgservice/_fixture.py 2016-02-16 05:36:37 +0000
+++ lib/lp/testing/gpgservice/_fixture.py 2016-03-01 19:34:04 +0000
@@ -99,7 +99,7 @@
test_data = {
'keys': [
{
- 'owner': 'name16_oid',
+ 'owner': config.launchpad.openid_provider_root + '+id/name16_oid',
'id': '12345678',
'fingerprint': 'ABCDEF0123456789ABCDDCBA0000111112345678',
'size': 1024,
=== modified file 'lib/lp/testing/gpgservice/tests/test_fixture.py'
--- lib/lp/testing/gpgservice/tests/test_fixture.py 2016-02-16 03:12:53 +0000
+++ lib/lp/testing/gpgservice/tests/test_fixture.py 2016-03-01 19:34:04 +0000
@@ -3,6 +3,7 @@
from __future__ import absolute_import
+import base64
import json
import httplib
@@ -46,7 +47,9 @@
def test_fixture_can_create_test_data(self):
fixture = self.useFixture(GPGKeyServiceFixture())
conn = httplib.HTTPConnection(fixture.bind_address)
- conn.request('GET', '/users/name16_oid/keys')
+ user = base64.b64encode(
+ config.launchpad.openid_provider_root + '+id/name16_oid', altchars='-_')
+ conn.request('GET', '/users/%s/keys' % user)
resp = conn.getresponse()
self.assertEqual(200, resp.status)
data = json.loads(resp.read())
=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py 2015-12-16 11:48:10 +0000
+++ lib/lp/testing/layers.py 2016-03-01 19:34:04 +0000
@@ -28,6 +28,7 @@
'FunctionalLayer',
'GoogleLaunchpadFunctionalLayer',
'GoogleServiceLayer',
+ 'GPGServiceLayer',
'LaunchpadFunctionalLayer',
'LaunchpadLayer',
'LaunchpadScriptLayer',
@@ -98,6 +99,7 @@
endInteraction,
getSecurityPolicy,
)
+from zope.security.proxy import removeSecurityProxy
from zope.server.logger.pythonlogger import PythonLogger
from lp.services import pidfile
@@ -116,6 +118,7 @@
from lp.services.googlesearch.tests.googleserviceharness import (
GoogleServiceTestSetup,
)
+from lp.services.gpg.interfaces import IGPGClient
from lp.services.job.tests import celery_worker
from lp.services.librarian.model import LibraryFileAlias
from lp.services.librarianserver.testing.server import LibrarianServerFixture
@@ -147,6 +150,7 @@
logout,
reset_logging,
)
+from lp.testing.gpgservice import GPGKeyServiceFixture
from lp.testing.pgsql import PgTestSetup
from lp.testing.smtpd import SMTPController
@@ -1161,6 +1165,45 @@
logout()
+class GPGServiceLayer(BaseLayer):
+ """A layer that runs a gpgservice fixture for the tests.
+
+ The fixture is started once per layer. Its database is reset after a
+ test only if the IGPGClient utility reported a write during that test.
+ """
+
+ # The running GPGKeyServiceFixture, or None while the layer is not set up.
+ service_fixture = None
+ # Set by the write hook; checked and cleared in testTearDown.
+ gpgservice_needs_reset = False
+
+ @classmethod
+ @profiled
+ def setUp(cls):
+ # The security proxy is removed before registering the write hook.
+ gpg_client = removeSecurityProxy(getUtility(IGPGClient))
+ gpg_client.registerWriteHook(cls._on_gpgservice_write)
+ cls.service_fixture = GPGKeyServiceFixture(BaseLayer.config_fixture)
+ cls.service_fixture.setUp()
+
+ @classmethod
+ @profiled
+ def tearDown(cls):
+ gpg_client = removeSecurityProxy(getUtility(IGPGClient))
+ gpg_client.unregisterWriteHook(cls._on_gpgservice_write)
+ cls.service_fixture.cleanUp()
+ cls.service_fixture = None
+ logout()
+
+ @classmethod
+ @profiled
+ def testSetUp(cls):
+ pass
+
+ @classmethod
+ @profiled
+ def testTearDown(cls):
+ # Only reset the service database when a write was reported.
+ if cls.gpgservice_needs_reset:
+ cls.service_fixture.reset_service_database()
+ cls.gpgservice_needs_reset = False
+
+ @classmethod
+ def _on_gpgservice_write(cls):
+ """Write hook: flag that the service database needs a reset."""
+ cls.gpgservice_needs_reset = True
+
+
+
+
class TwistedLayer(BaseLayer):
"""A layer for cleaning up the Twisted thread pool."""
@@ -1289,7 +1332,7 @@
disconnect_stores()
-class LaunchpadFunctionalLayer(LaunchpadLayer, FunctionalLayer):
+class LaunchpadFunctionalLayer(LaunchpadLayer, FunctionalLayer, GPGServiceLayer):
"""Provides the Launchpad Zope3 application server environment."""
@classmethod
@@ -1453,7 +1496,7 @@
host = 'localhost'
-class LaunchpadZopelessLayer(LaunchpadScriptLayer):
+class LaunchpadZopelessLayer(LaunchpadScriptLayer, GPGServiceLayer):
"""Full Zopeless environment including Component Architecture and
database connections initialized.
"""
=== modified file 'versions.cfg'
--- versions.cfg 2016-02-15 00:54:24 +0000
+++ versions.cfg 2016-03-01 19:34:04 +0000
@@ -38,7 +38,7 @@
flask = 0.10.1
FormEncode = 1.2.4
funkload = 1.16.1
-gpgservice = 0.1.0
+gpgservice = 0.1.2
grokcore.component = 1.6
gunicorn = 19.4.5
html5browser = 0.0.9
References