launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #21154
[Merge] lp:~thomir/launchpad/kill-gpgservice-with-fire-and-stakes into lp:launchpad
Thomi Richards has proposed merging lp:~thomir/launchpad/kill-gpgservice-with-fire-and-stakes into lp:launchpad.
Commit message:
Remove gpgservice integration.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~thomir/launchpad/kill-gpgservice-with-fire-and-stakes/+merge/309968
This branch removes Launchpad's gpgservice integration.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~thomir/launchpad/kill-gpgservice-with-fire-and-stakes into lp:launchpad.
=== modified file 'buildout.cfg'
--- buildout.cfg 2016-03-10 01:52:49 +0000
+++ buildout.cfg 2016-11-03 15:22:08 +0000
@@ -10,7 +10,6 @@
i18n
txlongpoll
txpkgupload
- gpgservice
unzip = true
eggs-directory = eggs
download-cache = download-cache
@@ -106,12 +105,3 @@
initialization = ${scripts:initialization}
entry-points = twistd-for-txpkgupload=twisted.scripts.twistd:run
scripts = twistd-for-txpkgupload
-
-[gpgservice]
-recipe = z3c.recipe.scripts
-eggs = ${scripts:eggs}
- gpgservice
-include-site-packages = false
-initialization = ${scripts:initialization}
-entry-points = gunicorn-for-gpgservice=gunicorn.app.wsgiapp:run
-scripts = gunicorn-for-gpgservice
=== modified file 'lib/lp/archivepublisher/tests/test_publishdistro.py'
--- lib/lp/archivepublisher/tests/test_publishdistro.py 2016-03-21 15:45:25 +0000
+++ lib/lp/archivepublisher/tests/test_publishdistro.py 2016-11-03 15:22:08 +0000
@@ -28,8 +28,6 @@
from lp.registry.interfaces.person import IPersonSet
from lp.registry.interfaces.pocket import PackagePublishingPocket
from lp.services.config import config
-from lp.services.features.testing import FeatureFixture
-from lp.services.gpg.interfaces import GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG
from lp.services.log.logger import (
BufferLogger,
DevNullLogger,
@@ -439,21 +437,6 @@
self.assertThat(breezy_inrelease_path, PathExists())
-class TestPublishDistroWithGPGService(TestPublishDistro):
- """A copy of the TestPublishDistro tests, but with the gpgservice feature
- flag enabled.
-
- Once gpgservice is the default and launchpad no longer manages it's own gpg
- key storage, these tests can be removed.
-
- """
-
- def setUp(self):
- super(TestPublishDistroWithGPGService, self).setUp()
- self.useFixture(FeatureFixture(
- {GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG: True}))
-
-
class FakeArchive:
"""A very simple fake `Archive`."""
def __init__(self, purpose=ArchivePurpose.PRIMARY):
=== modified file 'lib/lp/registry/browser/person.py'
--- lib/lp/registry/browser/person.py 2016-09-24 06:30:20 +0000
+++ lib/lp/registry/browser/person.py 2016-11-03 15:22:08 +0000
@@ -195,10 +195,7 @@
from lp.services.feeds.browser import FeedsMixin
from lp.services.geoip.interfaces import IRequestPreferredLanguages
from lp.services.gpg.interfaces import (
- GPG_DATABASE_READONLY_FEATURE_FLAG,
- GPG_HIDE_PERSON_KEY_LISTING,
GPGKeyNotFoundError,
- GPGReadOnly,
IGPGHandler,
)
from lp.services.identity.interfaces.account import (
@@ -1690,8 +1687,6 @@
It's shown when the person has OpenPGP keys registered or has rights
to register new ones.
"""
- if getFeatureFlag(GPG_HIDE_PERSON_KEY_LISTING):
- return False
return bool(self.gpg_keys) or (
check_permission('launchpad.Edit', self.context))
@@ -2522,8 +2517,6 @@
def form_action(self):
if self.request.method != "POST":
return ''
- if getFeatureFlag(GPG_DATABASE_READONLY_FEATURE_FLAG):
- raise GPGReadOnly()
permitted_actions = [
'claim_gpg',
'deactivate_gpg',
=== modified file 'lib/lp/registry/browser/tests/test_gpgkey.py'
--- lib/lp/registry/browser/tests/test_gpgkey.py 2016-07-25 00:07:01 +0000
+++ lib/lp/registry/browser/tests/test_gpgkey.py 2016-11-03 15:22:08 +0000
@@ -5,17 +5,6 @@
__metaclass__ = type
-from testtools.matchers import (
- Not,
- Raises,
- raises,
- )
-
-from lp.services.features.testing import FeatureFixture
-from lp.services.gpg.interfaces import (
- GPG_DATABASE_READONLY_FEATURE_FLAG,
- GPGReadOnly,
- )
from lp.services.webapp import canonical_url
from lp.testing import (
login_person,
@@ -53,25 +42,3 @@
expected_url = (
'%s/+editpgpkeys/+login?reauth=1' % canonical_url(person))
self.assertEqual(expected_url, response.getHeader('location'))
-
- def test_gpgkeys_POST_readonly_with_feature_flag_set(self):
- self.useFixture(FeatureFixture({
- GPG_DATABASE_READONLY_FEATURE_FLAG: True,
- }))
- person = self.factory.makePerson()
- login_person(person)
- view = create_initialized_view(
- person, "+editpgpkeys", principal=person, method='POST',
- have_fresh_login=True)
- self.assertThat(view.render, raises(GPGReadOnly))
-
- def test_gpgkeys_GET_readonly_with_feature_flag_set(self):
- self.useFixture(FeatureFixture({
- GPG_DATABASE_READONLY_FEATURE_FLAG: True,
- }))
- person = self.factory.makePerson()
- login_person(person)
- view = create_initialized_view(
- person, "+editpgpkeys", principal=person, method='GET',
- have_fresh_login=True)
- self.assertThat(view.render, Not(Raises()))
=== modified file 'lib/lp/registry/browser/tests/test_person.py'
--- lib/lp/registry/browser/tests/test_person.py 2016-06-21 01:32:35 +0000
+++ lib/lp/registry/browser/tests/test_person.py 2016-11-03 15:22:08 +0000
@@ -42,11 +42,6 @@
from lp.registry.model.milestone import milestone_sort_key
from lp.scripts.garbo import PopulateLatestPersonSourcePackageReleaseCache
from lp.services.config import config
-from lp.services.features.testing import FeatureFixture
-from lp.services.gpg.interfaces import (
- GPG_HIDE_PERSON_KEY_LISTING,
- GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG,
- )
from lp.services.identity.interfaces.account import AccountStatus
from lp.services.identity.interfaces.emailaddress import IEmailAddressSet
from lp.services.log.logger import FakeLogger
@@ -355,68 +350,6 @@
view = create_initialized_view(person, '+index')
self.assertTrue(view.should_show_gpgkeys_section)
- def test_gpg_keys_now_shown_for_owner_with_hide_keys_ff(self):
- self.useFixture(FeatureFixture({
- GPG_HIDE_PERSON_KEY_LISTING: True,
- }))
- person = self.factory.makePerson()
- with person_logged_in(person):
- view = create_initialized_view(person, '+index')
- self.assertFalse(view.should_show_gpgkeys_section)
-
- def test_gpg_keys_not_shown_for_user_with_gpg_keys_with_hide_keys_ff(self):
- self.useFixture(FeatureFixture({
- GPG_HIDE_PERSON_KEY_LISTING: True,
- }))
- person = self.factory.makePerson()
- self.factory.makeGPGKey(person)
- view = create_initialized_view(person, '+index')
- self.assertFalse(view.should_show_gpgkeys_section)
-
-
-class PersonIndexViewGPGTimelineTests(BrowserTestCase):
-
- layer = LaunchpadFunctionalLayer
-
- def get_markup(self, view, person):
- def fake_method():
- return canonical_url(person)
- with monkey_patch(view, _getURL=fake_method):
- markup = view.render()
- return markup
-
- def get_timeline_for_person_index_view_render(self):
- person = self.factory.makePerson()
- self.factory.makeGPGKey(person)
- view = create_initialized_view(person, '+index')
- self.get_markup(view, person)
- return get_request_timeline(
- get_current_browser_request())
-
- def assert_num_gpgservice_timeline_actions(self, expected_count, timeline):
- self.assertEqual(
- expected_count,
- len([a for a in timeline.actions if 'gpgservice' in a.category]))
-
- def test_hide_keys_ff_no_timeline_action(self):
- self.useFixture(FeatureFixture({
- GPG_HIDE_PERSON_KEY_LISTING: True,
- GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG: True,
- }))
- timeline = self.get_timeline_for_person_index_view_render()
-
- self.assert_num_gpgservice_timeline_actions(0, timeline)
-
- def test_one_gpg_timeline_action_when_rendering_view(self):
- self.useFixture(FeatureFixture({
- GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG: True,
- }))
- timeline = self.get_timeline_for_person_index_view_render()
-
- self.assert_num_gpgservice_timeline_actions(1, timeline)
-
-
-
class TestPersonViewKarma(TestCaseWithFactory):
=== modified file 'lib/lp/registry/interfaces/gpg.py'
--- lib/lp/registry/interfaces/gpg.py 2016-07-22 03:07:45 +0000
+++ lib/lp/registry/interfaces/gpg.py 2016-11-03 15:22:08 +0000
@@ -83,9 +83,6 @@
inactive ones.
"""
- def getOwnerIdForPerson(person):
- """return an owner id string suitable for sending to gpgservice."""
-
def getAllOwnerIdsForPerson(person):
"""Return all owner ids for a given person."""
=== modified file 'lib/lp/registry/model/gpgkey.py'
--- lib/lp/registry/model/gpgkey.py 2016-07-22 03:07:45 +0000
+++ lib/lp/registry/model/gpgkey.py 2016-11-03 15:22:08 +0000
@@ -25,17 +25,12 @@
SQLBase,
sqlvalues,
)
-from lp.services.features import getFeatureFlag
from lp.services.gpg.interfaces import (
- GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
- GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG,
GPGKeyAlgorithm,
- IGPGClient,
IGPGHandler,
)
from lp.services.openid.interfaces.openid import IOpenIDPersistentIdentity
from lp.services.openid.model.openididentifier import OpenIdIdentifier
-from lp.services.verification.interfaces.logintoken import ILoginTokenSet
@implementer(IGPGKey)
@@ -154,27 +149,6 @@
lp_key = self.new(
ownerID, keyid, fingerprint, keysize, algorithm,
can_encrypt=can_encrypt)
- if getFeatureFlag(GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG):
- # XXX: Further to the comment above, if READ_FROM_GPGSERVICE FF is
- # set then we need to duplicate the block above but reading from
- # the gpgservice instead of the database:
- client = getUtility(IGPGClient)
- owner_id = self.getOwnerIdForPerson(requester)
- # Users with more than one openid identifier may be re-activating
- # a key that was previously deactivated with their non-default
- # openid identifier. If that's the case, use the same openid
- # identifier rather than the default one - this happens even if the
- # read FF is not set:
- key_data = client.getKeyByFingerprint(fingerprint)
- if key_data:
- owner_id = key_data['owner']
- allowed_owner_ids = self.getAllOwnerIdsForPerson(requester)
- assert owner_id in allowed_owner_ids
- gpgservice_key = GPGServiceKey(
- client.addKeyForOwner(owner_id, key.fingerprint))
- if getFeatureFlag(GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG):
- is_new = key_data is None
- lp_key = gpgservice_key
return lp_key, is_new
def deactivate(self, key):
@@ -182,84 +156,35 @@
# active attribute. Retrieve it by fingerprint:
lp_key = GPGKey.selectOneBy(fingerprint=key.fingerprint)
lp_key.active = False
- if getFeatureFlag(GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG):
- # Users with more than one openid identifier may be
- # deactivating a key that is associated with their
- # non-default openid identifier. If that's the case, use
- # the same openid identifier rather than the default one:
- client = getUtility(IGPGClient)
- key_data = client.getKeyByFingerprint(key.fingerprint)
- if not key_data:
- # We get here if we're asked to deactivate a key that was never
- # activated. This should probably never happen.
- return
- openid_identifier = key_data['owner']
- client.disableKeyForOwner(openid_identifier, key.fingerprint)
def getByFingerprint(self, fingerprint, default=None):
"""See `IGPGKeySet`"""
- if getFeatureFlag(GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG):
- key_data = getUtility(IGPGClient).getKeyByFingerprint(fingerprint)
- return GPGServiceKey(key_data) if key_data else default
- else:
- result = GPGKey.selectOneBy(fingerprint=fingerprint)
- if result is None:
- return default
- return result
+ result = GPGKey.selectOneBy(fingerprint=fingerprint)
+ if result is None:
+ return default
+ return result
def getByFingerprints(self, fingerprints):
"""See `IGPGKeySet`"""
fingerprints = list(fingerprints)
- if getFeatureFlag(GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG):
- client = getUtility(IGPGClient)
- return [
- GPGServiceKey(key_data)
- for key_data in client.getKeysByFingerprints(fingerprints)]
- else:
- return list(IStore(GPGKey).find(
- GPGKey, GPGKey.fingerprint.is_in(fingerprints)))
+ return list(IStore(GPGKey).find(
+ GPGKey, GPGKey.fingerprint.is_in(fingerprints)))
def getGPGKeysForPerson(self, owner, active=True):
- if getFeatureFlag(GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG):
- client = getUtility(IGPGClient)
- owner_ids = self.getAllOwnerIdsForPerson(owner)
- if not owner_ids:
- return []
- gpg_keys = []
- for owner_id in owner_ids:
- key_data_list = client.getKeysForOwner(owner_id)['keys']
- gpg_keys.extend([
- GPGServiceKey(d) for d in key_data_list
- if d['enabled'] == active])
- if active is False:
- login_tokens = getUtility(ILoginTokenSet).getPendingGPGKeys(
- owner.id)
- token_fingerprints = [t.fingerprint for t in login_tokens]
- return [
- k for k in gpg_keys
- if k.fingerprint not in token_fingerprints]
- return gpg_keys
+ if active is False:
+ query = """
+ active = false
+ AND fingerprint NOT IN
+ (SELECT fingerprint FROM LoginToken
+ WHERE fingerprint IS NOT NULL
+ AND requester = %s
+ AND date_consumed is NULL
+ )
+ """ % sqlvalues(owner.id)
else:
- if active is False:
- query = """
- active = false
- AND fingerprint NOT IN
- (SELECT fingerprint FROM LoginToken
- WHERE fingerprint IS NOT NULL
- AND requester = %s
- AND date_consumed is NULL
- )
- """ % sqlvalues(owner.id)
- else:
- query = 'active=true'
- query += ' AND owner=%s' % sqlvalues(owner.id)
- return list(GPGKey.select(query, orderBy='id'))
-
- def getOwnerIdForPerson(self, owner):
- """See IGPGKeySet."""
- url = IOpenIDPersistentIdentity(owner).openid_canonical_url
- assert url is not None
- return url
+ query = 'active=true'
+ query += ' AND owner=%s' % sqlvalues(owner.id)
+ return list(GPGKey.select(query, orderBy='id'))
def getAllOwnerIdsForPerson(self, owner):
identifiers = IStore(OpenIdIdentifier).find(
=== removed file 'lib/lp/registry/stories/gpg-coc/gpg-with-gpgservice-ff.txt'
--- lib/lp/registry/stories/gpg-coc/gpg-with-gpgservice-ff.txt 2016-03-23 17:55:39 +0000
+++ lib/lp/registry/stories/gpg-coc/gpg-with-gpgservice-ff.txt 1970-01-01 00:00:00 +0000
@@ -1,692 +0,0 @@
-= Claiming GPG Keys =
-
-XXX: This file was copied from 'xx-gpg-coc.txt'. The only difference is that we
- set a feature flag that means that writes are sent to gpgservice. This file
- was used because it contains a reasonably complete set of gpg-related
- actions. Eventually gpg keys will be read-only, so most, if not all of these
- tests will be deleted.
-
- >>> from lp.services.features.testing import FeatureFixture
- >>> from lp.services.gpg.interfaces import (
- ... GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
- ... GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG,
- ... )
- >>> feature_fixture = FeatureFixture(
- ... {GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG: True,
- ... GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG: True})
- >>> feature_fixture.setUp()
-
-== Setup ==
-
- >>> import email
- >>> from lp.testing.keyserver import KeyServerTac
- >>> from lp.services.mail import stub
- >>> from zope.component import getUtility
- >>> from lp.registry.interfaces.person import IPersonSet
- >>> from lp.testing.pages import setupBrowserFreshLogin
-
-Set up the stub KeyServer:
-
- >>> tac = KeyServerTac()
- >>> tac.setUp()
-
-
-== Claim an encrypting GPG key ==
-
-This test verifies the basic claim a GPG key workflow.
-
-Start out with a clean page containing no imported keys:
-
- >>> login(ANONYMOUS)
- >>> name12 = getUtility(IPersonSet).getByEmail('test@xxxxxxxxxxxxx')
- >>> logout()
- >>> browser = setupBrowserFreshLogin(name12)
- >>> browser.open("http://launchpad.dev/~name12")
- >>> browser.getLink(url='+editpgpkeys').click()
- >>> print browser.title
- Change your OpenPGP keys...
-
- >>> browser.getControl(name='DEACTIVATE_GPGKEY')
- Traceback (most recent call last):
- ...
- LookupError: name 'DEACTIVATE_GPGKEY'
-
-Claim OpenPGP key:
-
- >>> key = "A419AE861E88BC9E04B9C26FBA2B9389DFD20543"
- >>> browser.getControl(name='fingerprint').value = key
- >>> browser.getControl(name='import').click()
- >>> print_feedback_messages(browser.contents)
- A message has been sent to test@xxxxxxxxxxxxx, encrypted
- with the key 1024D/DFD20543.
- To confirm the key is yours, decrypt the message and follow the
- link inside.
-
-Recover token URL from the encrypted part, but also make sure there's a clear
-text part that provides useful information to users who -- for whatever reason
--- cannot decrypt the token url. Start by grabbing the confirmation message.
-
- >>> from_addr, to_addrs, raw_msg = stub.test_emails.pop()
- >>> msg = email.message_from_string(raw_msg)
- >>> msg.get_content_type()
- 'text/plain'
-
-The message will be a single text/plain part with clear text instructions,
-followed by ASCII armored encrypted confirmation instructions. Ensure that
-the clear text instructions contain the expected URLs pointing to more help.
-
- >>> cipher_body = msg.get_payload(decode=True)
- >>> print cipher_body
- Hello,
- <BLANKLINE>
- This message contains the instructions for confirming registration of an
- OpenPGP key for use in Launchpad. The confirmation instructions have been
- encrypted with the OpenPGP key you have attempted to register. If you cannot
- read the unencrypted instructions below, it may be because your mail reader
- does not support automatic decryption of "ASCII armored" encrypted text.
- <BLANKLINE>
- Exact instructions for enabling this depends on the specific mail reader you
- are using. Please see this support page for more information:
- <BLANKLINE>
- https://help.launchpad.net/ReadingOpenPgpMail
- <BLANKLINE>
- For more general information on OpenPGP and related tools such as Gnu Privacy
- Guard (GPG), please see:
- <BLANKLINE>
- https://help.ubuntu.com/community/GnuPrivacyGuardHowto
- <BLANKLINE>
- -----BEGIN PGP MESSAGE-----
- ...
- -----END PGP MESSAGE-----
- <BLANKLINE>
- <BLANKLINE>
- Thanks,
- <BLANKLINE>
- The Launchpad Team
-
-Import the secret keys needed for this test:
-
- >>> from lp.services.gpg.interfaces import IGPGHandler
-
- >>> from lp.testing.gpgkeys import (
- ... import_secret_test_key, decrypt_content)
-
-
- >>> gpghandler = getUtility(IGPGHandler)
-
- >>> login(ANONYMOUS)
- >>> key = import_secret_test_key('test@xxxxxxxxxxxxxxxxx')
-
-'cipher_body' is a message encrypted with the just-imported
-1024D/DFD20543 OpenPGP key, we need to access the current IGpghandler
-instance to access this key and decrypt the message.
-
- >>> body = decrypt_content(cipher_body, 'test')
-
-Extract the token URL from the email:
-
- >>> from lp.services.verification.tests.logintoken import (
- ... get_token_url_from_string)
- >>> token_url = get_token_url_from_string(body)
-
-Go to the link sent by email, to validate the email address.
-
- >>> logout()
- >>> browser.open(token_url)
-
-Get redirected to +validategpg, and confirm token:
-
- >>> print browser.url
- http://launchpad.dev/token/.../+validategpg
- >>> browser.getControl('Continue').click()
-
-Get redirected to the user's homepage with a greeting:
-
- >>> browser.url
- 'http://launchpad.dev/~name12'
- >>> print_feedback_messages(browser.contents)
- The key 1024D/DFD20543 was successfully validated.
-
-Certify the key is imported:
-
- >>> browser.open("http://launchpad.dev/~name12/+editpgpkeys")
- >>> browser.getControl(name='DEACTIVATE_GPGKEY').displayOptions
- ['1024D/DFD20543']
-
-Verify that the key was imported with the "can encrypt" flag set:
-
- >>> from lp.registry.model.gpgkey import GPGKey
- >>> key = GPGKey.selectOneBy(
- ... fingerprint='A419AE861E88BC9E04B9C26FBA2B9389DFD20543')
- >>> print key.owner.name
- name12
- >>> print key.can_encrypt
- True
-
-
-== Claim a sign-only GPG key ==
-
-Here, Sample Person wants to claim a GPG key that can only sign
-content. They can't verify their key by decrypting content on demand, but
-they can verify it by signing content. Launchpad sends them an email
-token. The email step ensures that an attacker who knows Sample
-Person's Launchpad password can't associate arbitrary GPG keys with
-their Launchpad account.
-
- >>> browser.open("http://launchpad.dev/~name12/+editpgpkeys")
-
- >>> fingerprint = "447DBF38C4F9C4ED752246B77D88913717B05A8F"
- >>> browser.getControl(name='fingerprint').value = fingerprint
- >>> browser.getControl(name='import').click()
- >>> print_feedback_messages(browser.contents)
- A message has been sent to test@xxxxxxxxxxxxx. To
- confirm the key 1024D/17B05A8F is yours, follow
- the link inside.
-
-Sample Person checks their email.
-
- >>> from_addr, to_addrs, raw_msg = stub.test_emails.pop()
- >>> msg = email.message_from_string(raw_msg)
- >>> msg.get_content_type()
- 'text/plain'
- >>> body = msg.get_payload(decode=True)
-
-The email is not encrypted, since Sample Person didn't claim the
-ability to decrypt text with this key.
-
- >>> '-----BEGIN PGP MESSAGE-----' in body
- False
-
-The email does contain some information about the key, and a token URL
-Sample Person should visit to verify their ownership of the key.
-
- >>> print body
- <BLANKLINE>
- Hello,
- ...
- User name : Sample Person
- Email address: test@xxxxxxxxxxxxx
- ...
- Fingerprint : 447DBF38C4F9C4ED752246B77D88913717B05A8F
- Key type/ID : 1024D/17B05A8F
- <BLANKLINE>
- UIDs:
- sign.only@xxxxxxxxxxxxx
- ...
- http://launchpad.dev/token/...
-
- >>> token_url = get_token_url_from_string(body)
-
-Side note: in a little while, Sample User will be asked to sign some
-text which includes the date the token was generated (to avoid replay
-attacks). To make this testable, we set the creation date of this
-token to a fixed value:
-
- >>> nothing, token_value = token_url.split('http://launchpad.dev/token/')
-
- >>> import datetime, hashlib, pytz
- >>> from lp.services.verification.model.logintoken import LoginToken
- >>> logintoken = LoginToken.selectOneBy(
- ... _token=hashlib.sha256(token_value).hexdigest())
- >>> logintoken.date_created = datetime.datetime(
- ... 2005, 4, 1, 12, 0, 0, tzinfo=pytz.timezone('UTC'))
- >>> logintoken.sync()
-
-Back to Sample User. They visit the token URL and is asked to sign some
-text to prove they own the key.
-
- >>> browser.open(token_url)
- >>> browser.title
- 'Confirm sign-only OpenPGP key'
-
-Let's look at the text.
-
- >>> verification_content = find_main_content(
- ... browser.contents).pre.string
- >>> print verification_content
- Please register 447DBF38C4F9C4ED752246B77D88913717B05A8F to the
- Launchpad user name12. 2005-04-01 12:00:00 UTC
-
-If they refuse to sign the text, they get an error message.
-
- >>> browser.getControl('Continue').click()
- >>> browser.title
- 'Confirm sign-only OpenPGP key'
- >>> print_feedback_messages(browser.contents)
- There is 1 error.
- Required input is missing.
-
-If they sign a different text, they get an error message.
-
- >>> login(ANONYMOUS)
- >>> key = import_secret_test_key('sign.only@xxxxxxxxxxxxxxxxx')
- >>> bad = gpghandler.signContent(
- ... 'This is not the verification message!', key, 'test')
- >>> logout()
-
- >>> browser.getControl('Signed text').value = bad
- >>> browser.getControl('Continue').click()
- >>> print_feedback_messages(browser.contents)
- There is 1 error.
- The signed content does not match the message found in the email.
-
-If they sign the text with a different key, they get an error
-message. The following text was signed with the key DFD20543:
-
- >>> signed_content = """
- ... -----BEGIN PGP SIGNED MESSAGE-----
- ... Hash: SHA1
- ...
- ... Please register 447DBF38C4F9C4ED752246B77D88913717B05A8F to the
- ... Launchpad user name12. 2005-04-01 12:00:00 UTC
- ... -----BEGIN PGP SIGNATURE-----
- ... Version: GnuPG v1.4.1 (GNU/Linux)
- ...
- ... iD8DBQFDcLOh2yWXVgK6XvYRAkpWAKDFHRpVJc2flFwpQMMxub4cl+TcCACgyciu
- ... s7GH1fQGOQMqpvpinwOjGto=
- ... =w7/b
- ... -----END PGP SIGNATURE-----
- ... """
- >>> browser.getControl('Signed text').value = signed_content
- >>> browser.getControl('Continue').click()
- >>> print_feedback_messages(browser.contents)
- There is 1 error.
- The key used to sign the content (A419AE861E88BC9E04B9C26FBA2B9389DFD20543)
- is not the key you were registering
-
-If they sign the text correctly, they are redirected to their home page.
-
- >>> login(ANONYMOUS)
- >>> good = gpghandler.signContent(str(verification_content), key, 'test')
- >>> logout()
-
- >>> browser.getControl('Signed text').value = good
- >>> browser.getControl('Continue').click()
- >>> browser.url
- 'http://launchpad.dev/~name12'
- >>> print_feedback_messages(browser.contents)
- The key 1024D/17B05A8F was successfully validated.
-
-Now that the key has been validated, the login token is consumed:
-
- >>> consumed_token = LoginToken.selectOneBy(
- ... _token=hashlib.sha256(token_value).hexdigest())
- >>> consumed_token.date_consumed is not None
- True
-
-Now Sample Person's sign-only key is associated with their account. They
-verify this:
-
- >>> browser.open("http://launchpad.dev/~name12/+editpgpkeys")
-
- >>> content = find_main_content(browser.contents)
- >>> browser.getControl(name='DEACTIVATE_GPGKEY').displayOptions
- [...'1024D/17B05A8F (sign only)']
-
-On a mad whim they decide to de-activate the key they just imported.
-
- >>> browser.getControl(name="DEACTIVATE_GPGKEY").displayValue = [
- ... '1024D/17B05A8F (sign only)']
- >>> browser.getControl('Deactivate Key').click()
-
- >>> print_feedback_messages(browser.contents)
- Deactivated key(s): 1024D/17B05A8F
-
-Coming to their senses, they ask for a re-validation of the key.
-
- >>> browser.getControl(name="REACTIVATE_GPGKEY").value = ['3']
- >>> browser.getControl('Reactivate Key').click()
-
- >>> print_feedback_messages(browser.contents)
- A message has been sent to test@xxxxxxxxxxxxx with instructions
- to reactivate these key(s): 1024D/17B05A8F
-
-They open the page and checks that the key is displayed as pending
-revalidation.
-
- >>> browser.reload()
- >>> browser.getControl(name='REMOVE_GPGTOKEN').displayOptions
- ['447DBF38C4F9C4ED752246B77D88913717B05A8F']
-
-(We won't run through the whole validation process again, as this key isn't
-used in any more tests.)
-
-== Teardown ==
-
- >>> tac.tearDown()
-
-=========================
-Signing a Code of Conduct
-=========================
-
-Sample person has never signed a code of conduct.
-
- >>> browser = setupBrowser(auth='Basic test@xxxxxxxxxxxxx:test')
- >>> browser.open('http://launchpad.dev/~name12/+codesofconduct')
- >>> print extract_text(find_main_content(browser.contents))
- Codes of Conduct for Sample Person
- Launchpad records codes of conduct you sign as commitments to the
- principles of collaboration, tolerance and open communication that
- drive the open source community.
- Sample Person has never signed a code
- of conduct.
- See or sign new code of conduct releases
-
- # A helper function for reading a code-of-conduct file.
- >>> import os
- >>> def read_file(filename):
- ... path = os.path.join(os.path.dirname(__file__), filename)
- ... with open(path) as file_object:
- ... return file_object.read()
-
-
-Code of Conduct registration problems
-=====================================
-
-Sample Person tries unsuccessfully to register a truncated code of conduct.
-
- >>> truncated_coc = read_file('truncated_coc.asc')
- >>> browser.open('http://launchpad.dev/codeofconduct/2.0/+sign')
- >>> browser.getControl('Signed Code').value = truncated_coc
- >>> browser.getControl('Continue').click()
- >>> print_errors(browser.contents)
- There is 1 error.
- The signed text does not match the Code of Conduct. Make sure that you
- signed the correct text (white space differences are acceptable).
-
-Sample Person tries unsuccessfully to register an old version of the code.
-
- >>> coc_version_1_0 = read_file('10_coc.asc')
- >>> browser.getControl('Signed Code').value = coc_version_1_0
- >>> browser.getControl('Continue').click()
- >>> print_errors(browser.contents)
- There is 1 error.
- The signed text does not match the Code of Conduct. Make sure that you
- signed the correct text (white space differences are acceptable).
-
-
-Sample Person tries to access the old version page to sign it, and is informed
-that there is a new version available.
-
- >>> browser.open('http://launchpad.dev/codeofconduct/1.0/+sign')
- >>> browser.getLink('the current version').click()
- >>> print browser.url
- http://launchpad.dev/codeofconduct/2.0
-
- >>> browser.getLink('Sign it').click()
- >>> print browser.url
- http://launchpad.dev/codeofconduct/2.0/+sign
-
-
-Code of Conduct registration
-============================
-
-Sample Person registers the code of conduct, using a reformatted copy which
-has leading spaces removed. This succeeds because the words the same and
-appear in the same order.
-
- >>> reformatted_coc = read_file('reformatted_20_coc.asc')
- >>> browser.getControl('Signed Code').value = reformatted_coc
- >>> browser.getControl('Continue').click()
- >>> print browser.url
- http://launchpad.dev/~name12/+codesofconduct
-
-And now Sample Person's Codes of Conduct page shows that they've signed it.
-
- >>> browser.open('http://launchpad.dev/~name12/+codesofconduct')
- >>> print extract_text(find_main_content(browser.contents))
- Codes of Conduct for Sample Person
- Launchpad records codes of conduct you sign as commitments to the
- principles of collaboration, tolerance and open communication that
- drive the open source community.
- Active signatures
- If you change your mind about agreeing to a code of conduct,
- you can deactivate your signature.
- ...: digitally signed by Sample Person (1024D/DFD20543)
- ...
-
-
-Now Sample Person will deactivate their key...
-
- >>> browser = setupBrowserFreshLogin(name12)
- >>> browser.open('http://launchpad.dev/~name12/+editpgpkeys')
- >>> browser.url
- 'http://launchpad.dev/~name12/+editpgpkeys'
-
- >>> print browser.contents
- <...
- ...Your active keys...
- ...1024D/DFD20543...
-
-
-... but they forgot to select the checkbox of the key they want to remove.
-
- >>> browser.getControl('Deactivate Key').click()
- >>> for tag in find_main_content(browser.contents)('p', 'error message'):
- ... print tag.renderContents()
- No key(s) selected for deactivation.
-
-
-Now they select the checkbox and deactivate it.
-
- >>> browser.getControl('1024D/DFD20543').selected = True
- >>> browser.getControl('Deactivate Key').click()
- >>> soup = find_main_content(browser.contents)
- >>> for tag in soup('p', 'informational message'):
- ... print tag.renderContents()
- Deactivated key(s): 1024D/DFD20543
-
-
-Sample Person already has a deactivated key.
-
- >>> browser.open('http://launchpad.dev/~name12/+editpgpkeys')
- >>> browser.url
- 'http://launchpad.dev/~name12/+editpgpkeys'
-
- >>> print browser.contents
- <...
- ...Deactivated keys...
- ...1024D/DFD20543...
-
-
-Now they'll request their key to be reactivated.
-
- >>> browser.getControl('Reactivate Key').click()
- >>> soup = find_main_content(browser.contents)
- >>> for tag in soup('p', 'error message'):
- ... print tag.renderContents()
- No key(s) selected for reactivation.
-
- >>> browser.getControl('1024D/DFD20543').selected = True
- >>> browser.getControl('Reactivate Key').click()
- >>> soup = find_main_content(browser.contents)
- >>> for tag in soup('p', 'informational message'):
- ... print tag.renderContents()
- A message has been sent to test@xxxxxxxxxxxxx with instructions to reactivate...
-
-
-Get the token from the body of the email sent.
-
- >>> import email, re
- >>> from lp.services.mail import stub
- >>> from_addr, to_addrs, raw_msg = stub.test_emails.pop()
- >>> msg = email.message_from_string(raw_msg)
- >>> cipher_body = msg.get_payload(decode=1)
- >>> body = decrypt_content(cipher_body, 'test')
- >>> link = re.findall(r'http.*/token/.*', body)[0]
- >>> token = re.sub(r'.*token/', '', link)
- >>> token_url = 'http://launchpad.dev/token/%s' % token.encode('ascii')
-
-
-Going to the token page will get us redirected to the page of that specific
-token type (+validategpg).
-
- >>> browser.open(token_url)
- >>> browser.url == '%s/+validategpg' % token_url
- True
-
- >>> print browser.contents
- <...
- ...Confirm the OpenPGP key...A419AE861E88BC9E04B9C26FBA2B9389DFD20543...
- ...Sample Person...
-
-
-Now Sample Person confirms the reactivation.
-
- >>> browser.getControl('Continue').click()
- >>> browser.url
- 'http://launchpad.dev/~name12'
-
- >>> print browser.contents
- <...
- ...Key 1024D/DFD20543 successfully reactivated...
-
-
-And now we can see the key listed as one of Sample Person's active keys.
-
- >>> browser.open('http://launchpad.dev/~name12/+editpgpkeys')
- >>> print browser.contents
- <...
- ...Your active keys...
- ...1024D/DFD20543...
-
-This test verifies that we correctly handle keys which are in some way
-special: either invalid, broken, revoked, expired, or already imported.
-
- >>> import email
- >>> from lp.testing.keyserver import KeyServerTac
- >>> from lp.services.mail import stub
-
- >>> tac = KeyServerTac()
- >>> tac.setUp()
-
- >>> sign_only = "447D BF38 C4F9 C4ED 7522 46B7 7D88 9137 17B0 5A8F"
- >>> preimported = "A419AE861E88BC9E04B9C26FBA2B9389DFD20543"
-
-Try to import a key which is already imported:
-
- >>> del stub.test_emails[:]
- >>> browser.open('http://launchpad.dev/~name12/+editpgpkeys')
- >>> browser.getControl(name='fingerprint').value = preimported
- >>> browser.getControl(name='import').click()
- >>> "A message has been sent" in browser.contents
- False
- >>> stub.test_emails
- []
- >>> print browser.contents
- <BLANKLINE>
- ...
- ...has already been imported...
-
- >>> tac.tearDown()
-
-
-
-Ensure we are raising 404 error instead of System Error
-
- >>> print http(r"""
- ... POST /codeofconduct/donkey HTTP/1.1
- ... Authorization: Basic Zm9vLmJhckBjYW5vbmljYWwuY29tOnRlc3Q=
- ... Referer: https://launchpad.dev/
- ... """)
- HTTP/1.1 404 Not Found
- ...
-
-Check to see no CoC signature is registered for Mark:
-
- >>> admin_browser.open('http://localhost:9000/codeofconduct/console')
- >>> admin_browser.getControl(name='searchfor').value = ["all"]
- >>> admin_browser.getControl(name='name').value = "mark"
- >>> admin_browser.getControl(name='search').click()
- >>> "No signatures found." in admin_browser.contents
- True
-
-Perform Acknowledge process as Foo bar person:
-
- >>> admin_browser.open('http://localhost:9000/codeofconduct/console/+new')
- >>> admin_browser.title
- 'Register a code of conduct signature'
-
- >>> admin_browser.getControl(
- ... name='field.owner').value = "mark@xxxxxxxxxxx"
- >>> admin_browser.getControl('Register').click()
- >>> admin_browser.url
- 'http://localhost:9000/codeofconduct/console'
-
-Ensure the CoC was acknowledge by searching in the CoC Admin Console:
-
- >>> admin_browser.open('http://launchpad.dev/codeofconduct/console')
- >>> admin_browser.getControl(name='searchfor').value = ["all"]
- >>> admin_browser.getControl(name='name').value = "mark"
- >>> admin_browser.getControl(name='search').click()
- >>> print extract_text(find_tag_by_id(admin_browser.contents, 'matches'))
- Mark ... paper submission accepted by Foo Bar [ACTIVE]
-
-Test if the advertisement email was sent:
-
- >>> import email
- >>> from lp.services.mail import stub
- >>> from_addr, to_addrs, raw_msg = stub.test_emails.pop()
- >>> msg = email.message_from_string(raw_msg)
- >>> print msg.get_payload(decode=True)
- <BLANKLINE>
- ...
- User: 'Mark Shuttleworth'
- Paper Submitted acknowledge by Foo Bar
- ...
-
- Let's login with an Launchpad Admin
-
- >>> for pos, (key, _) in enumerate(browser.mech_browser.addheaders):
- ... if key == 'Authorization':
- ... del browser.mech_browser.addheaders[pos]
- ... break
- >>> browser.addHeader(
- ... 'Authorization', 'Basic guilherme.salgado@xxxxxxxxxxxxx:test')
-
- Check if we can see the Code of conduct page
-
- >>> browser.open('http://localhost:9000/codeofconduct')
- >>> 'Ubuntu Codes of Conduct' in browser.contents
- True
-
- The link to the Administrator console
-
- >>> admin_console_link = browser.getLink('Administration console')
- >>> admin_console_link.url
- 'http://localhost:9000/codeofconduct/console'
-
- Let's follow the link
-
- >>> admin_console_link.click()
-
- We are in the Administration page
-
- >>> browser.url
- 'http://localhost:9000/codeofconduct/console'
-
- >>> 'Administer code of conduct signatures' in browser.contents
- True
-
- >>> browser.getLink("register signatures").url
- 'http://localhost:9000/codeofconduct/console/+new'
-
-
- Back to the CoC front page let's see the current version of the CoC
-
- >>> browser.open('http://localhost:9000/codeofconduct')
- >>> browser.getLink('current version').click()
-
- >>> 'Ubuntu Code of Conduct - 2.0' in browser.contents
- True
-
- >>> browser.getLink('Sign it').url
- 'http://localhost:9000/codeofconduct/2.0/+sign'
-
- >>> browser.getLink('Download this version').url
- 'http://localhost:9000/codeofconduct/2.0/+download'
-
-
-= TearDown =
-
- >>> feature_fixture.cleanUp()
=== modified file 'lib/lp/registry/tests/test_gpgkey.py'
--- lib/lp/registry/tests/test_gpgkey.py 2016-07-25 00:20:12 +0000
+++ lib/lp/registry/tests/test_gpgkey.py 2016-11-03 15:22:08 +0000
@@ -19,12 +19,7 @@
ConfigFixture,
ConfigUseFixture,
)
-from lp.services.features.testing import FeatureFixture
-from lp.services.gpg.interfaces import (
- GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG,
- GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
- GPGKeyAlgorithm,
- )
+from lp.services.gpg.interfaces import GPGKeyAlgorithm
from lp.services.verification.interfaces.authtoken import LoginTokenType
from lp.services.verification.interfaces.logintoken import ILoginTokenSet
from lp.testing import TestCaseWithFactory
@@ -36,12 +31,6 @@
layer = LaunchpadFunctionalLayer
def test_can_add_keys_for_test(self):
- # IGPGKeySet.new _only_ writes to the launchpad database, so this test
- # only works if the read_from_gpgservice FF is *not* set. Once this is
- # the default codepath this test should be deleted.
- self.useFixture(FeatureFixture({
- GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG: None,
- }))
keyset = getUtility(IGPGKeySet)
person = self.factory.makePerson()
fingerprint = "DEADBEEF12345678DEADBEEF12345678DEADBEEF"
@@ -120,17 +109,6 @@
config_fixture.add_section('\n'.join(setting_lines))
self.useFixture(ConfigUseFixture(config_name))
- def test_getOwnerIdForPerson_uses_canonical_url(self):
- keyset = getUtility(IGPGKeySet)
- email = 'foo@xxxxxxx'
- person = self.factory.makePerson(email)
- openid_canonical_root = self.getUniqueString()
- self.set_config_parameters(openid_canonical_root=openid_canonical_root)
-
- self.assertThat(
- keyset.getOwnerIdForPerson(person),
- StartsWith(openid_canonical_root))
-
def test_getAllOwnerIdsForPerson_uses_canonical_url(self):
keyset = getUtility(IGPGKeySet)
email = 'foo@xxxxxxx'
@@ -140,15 +118,3 @@
for id in keyset.getAllOwnerIdsForPerson(person):
self.assertThat(id, StartsWith(openid_canonical_root))
-
-
-class GPGKeySetWithGPGServiceTests(GPGKeySetTests):
-
- """A copy of GPGKeySetTests, but with gpgservice used."""
-
- def setUp(self):
- super(GPGKeySetWithGPGServiceTests, self).setUp()
- self.useFixture(FeatureFixture({
- GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG: True,
- GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG: True,
- }))
=== modified file 'lib/lp/services/gpg/configure.zcml'
--- lib/lp/services/gpg/configure.zcml 2016-03-18 04:20:26 +0000
+++ lib/lp/services/gpg/configure.zcml 2016-11-03 15:22:08 +0000
@@ -19,16 +19,6 @@
<allow interface="lp.services.gpg.interfaces.IGPGHandler" />
</securedutility>
- <class class="lp.services.gpg.handler.GPGClient">
- <allow interface="lp.services.gpg.interfaces.IGPGClient" />
- </class>
-
- <securedutility
- class="lp.services.gpg.handler.LPGPGClient"
- provides="lp.services.gpg.interfaces.IGPGClient">
- <allow interface="lp.services.gpg.interfaces.IGPGClient" />
- </securedutility>
-
<class class="lp.services.gpg.handler.PymeSignature">
<allow interface="lp.services.gpg.interfaces.IPymeSignature" />
</class>
=== modified file 'lib/lp/services/gpg/handler.py'
--- lib/lp/services/gpg/handler.py 2016-07-20 05:24:30 +0000
+++ lib/lp/services/gpg/handler.py 2016-11-03 15:22:08 +0000
@@ -12,7 +12,6 @@
import atexit
import httplib
-import json
import os
import shutil
import socket
@@ -23,7 +22,6 @@
import urllib
import gpgme
-from gpgservice_client import GPGClient
from lazr.restful.utils import get_current_browser_request
import requests
from zope.interface import implementer
@@ -40,7 +38,6 @@
GPGKeyTemporarilyNotFoundError,
GPGUploadFailure,
GPGVerificationError,
- IGPGClient,
IGPGHandler,
IPymeKey,
IPymeSignature,
@@ -622,8 +619,7 @@
def sanitize_fingerprint(fingerprint):
"""Sanitize a GPG fingerprint.
- This is the ultimate implementation of IGPGHandler.sanitizeFingerprint, and
- is also used by the IGPGClient implementation.
+ This is the ultimate implementation of IGPGHandler.sanitizeFingerprint.
"""
# remove whitespaces, truncate to max of 40 (as per v4 keys) and
# convert to upper case
@@ -634,43 +630,3 @@
return None
return fingerprint
-
-
-@implementer(IGPGClient)
-class LPGPGClient(GPGClient):
- """See IGPGClient."""
-
- def __init__(self):
- super(LPGPGClient, self).__init__(bypass_proxy=True)
- self.action = None
-
- def get_endpoint(self):
- return config.gpgservice.api_endpoint
-
- def get_timeout(self):
- return 30.0
-
- def on_request_start(self, method, path, data=None,
- headers=dict()):
- assert self.action is None, "Error: Overlapping requests to gpgservice"
- timeline = get_request_timeline(
- get_current_browser_request())
- if data:
- data_summary = '%d byte body' % len(data)
- else:
- data_summary = 'no body'
- header_whitelist = (
- 'content-type',
- 'x-gpg-fingerprint',
- )
- headers = dict(
- [(k, v) for k, v in headers.items() if k.lower() in header_whitelist]
- )
- self.action = timeline.start(
- "gpgservice-%s" % method.upper(),
- ' '.join((path, data_summary, json.dumps(headers)))
- )
-
- def on_request_end(self):
- self.action.finish()
- self.action = None
=== modified file 'lib/lp/services/gpg/interfaces.py'
--- lib/lp/services/gpg/interfaces.py 2016-06-13 03:15:44 +0000
+++ lib/lp/services/gpg/interfaces.py 2016-11-03 15:22:08 +0000
@@ -2,21 +2,14 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
__all__ = [
- 'GPG_DATABASE_READONLY_FEATURE_FLAG',
- 'GPG_HIDE_PERSON_KEY_LISTING',
- 'GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG',
- 'GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG',
'GPGKeyAlgorithm',
'GPGKeyDoesNotExistOnServer',
'GPGKeyExpired',
'GPGKeyNotFoundError',
'GPGKeyRevoked',
'GPGKeyTemporarilyNotFoundError',
- 'GPGReadOnly',
- 'GPGServiceException',
'GPGUploadFailure',
'GPGVerificationError',
- 'IGPGClient',
'IGPGHandler',
'IPymeKey',
'IPymeSignature',
@@ -27,36 +20,16 @@
'valid_keyid',
]
-import httplib
import re
-from gpgservice_client import GPGServiceException
from lazr.enum import (
DBEnumeratedType,
DBItem,
)
-from lazr.restful.declarations import error_status
from zope.interface import (
Attribute,
Interface,
)
-from zope.security.interfaces import Forbidden
-
-
-@error_status(httplib.FORBIDDEN)
-class GPGReadOnly(Forbidden):
- """GPG Service is in read-only mode."""
-
- def __init__(self):
- super(GPGReadOnly, self).__init__(
- "The GPG key storage facilities of Launchpad are currently "
- "read-only. Please try again later.")
-
-
-GPG_DATABASE_READONLY_FEATURE_FLAG = u"gpg.database_read_only"
-GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG = u"gpg.write_to_gpgservice"
-GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG = u"gpg.read_from_gpgservice"
-GPG_HIDE_PERSON_KEY_LISTING = u"gpg.hide_person_key_listing"
def valid_fingerprint(fingerprint):
@@ -428,79 +401,3 @@
name = Attribute("The name portion of this user ID")
email = Attribute("The email portion of this user ID")
comment = Attribute("The comment portion of this user ID")
-
-
-class IGPGClient(Interface):
-
- """A client for querying a gpgservice instance."""
-
- def getKeysForOwner(owner_id):
- """Get a list of keys for a given owner.
-
- :raises GPGServiceException: If we get an error from the gpgservice.
- :raises socket.error" on socket-level errors (connection timeouts etc)
- """
-
- def addKeyForOwner(owner_id, fingerprint):
- """Add a GPG key.
-
- :raises ValueError: if the fingerprint isn't valid.
- :raises GPGServiceException: If we get an error from the gpgservice.
- :raises socket.error" on socket-level errors (connection timeouts etc)
- """
-
- def disableKeyForOwner(owner_id, fingerprint):
- """Disable a GPG key.
-
- :raises ValueError: if the fingerprint isn't valid.
- :raises GPGServiceException: If we get an error from the gpgservice.
- :raises socket.error" on socket-level errors (connection timeouts etc)
- """
-
- def getKeyByFingerprint(fingerprint):
- """Get a GPG key by its fingerprint.
-
- :raises ValueError: if the fingerprint isn't valid.
- """
-
- def getKeysByFingerprints(fingerprints):
- """Bulk retrieve GPG keys by a list of fingerprints.
-
- :param fingerprints: A list of fingerprints to retrieve.
- :returns: A list of keys that were found.
- """
-
- def registerWriteHook(hook_callable):
- """Register a write hook.
-
- The hook_callable will be called with no arguments whenever an operation
- is performed that modifies the GPG database.
-
- :raises TypeError: if hook_callable is not a callable.
- :raises GPGServiceException: If we get an error from the gpgservice.
- """
-
- def unregisterWriteHook(hook_callable):
- """Deregister a write hook that was registered with register_write_hook.
-
- :raises ValueError: if hook_callable was not registered.
- """
-
- def addKeyForTest(owner_id, keyid, fingerprint, keysize, algorithm, enabled,
- can_encrypt):
- """Add a key to the gpgservice without checking the keyserver.
-
- This method is to be used for TESTING purposes only. The running
- gpgservice instance must have its test methods configured - something
- that should not be done in production. If this requirement is not met
- a RuntimeError will be raised.
-
- :param owner_id: A string representing the owner, as returned by
- IGPGKeySet.getOwnerIdForPerson
- :param keyid: A string describing the short-form gpg key id.
- :param fingerprint: A string containing the full GPG fingerprint.
- :param keysize: An integer, containing the keysize.
- :param algorithm: The key algorithm code, a single letter.
- :param enabled: Whether the key is enabled or not.
- :param can_encrypt: Whether the key can be used for encryption.
- """
=== modified file 'lib/lp/services/gpg/tests/test_gpghandler.py'
--- lib/lp/services/gpg/tests/test_gpghandler.py 2016-06-16 23:11:32 +0000
+++ lib/lp/services/gpg/tests/test_gpghandler.py 2016-11-03 15:22:08 +0000
@@ -2,38 +2,19 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
import base64
-import json
import os
-import random
-import string
import subprocess
import gpgme
-from lazr.restful.utils import get_current_browser_request
-from testtools.matchers import (
- ContainsDict,
- Equals,
- HasLength,
- raises,
- )
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
-from lp.registry.interfaces.gpg import IGPGKeySet
-from lp.registry.interfaces.person import IPersonSet
-from lp.services.database.constants import THIRTY_DAYS_AGO
-from lp.services.database.interfaces import IMasterStore
-from lp.services.features.testing import FeatureFixture
from lp.services.gpg.interfaces import (
GPGKeyDoesNotExistOnServer,
GPGKeyTemporarilyNotFoundError,
- GPGServiceException,
- IGPGClient,
IGPGHandler,
)
from lp.services.log.logger import BufferLogger
-from lp.services.openid.model.openididentifier import OpenIdIdentifier
-from lp.services.timeline.requesttimeline import get_request_timeline
from lp.services.timeout import (
get_default_timeout_function,
set_default_timeout_function,
@@ -44,8 +25,6 @@
logout,
TestCase,
)
-from lp.testing.factory import BareLaunchpadObjectFactory
-from lp.testing.fakemethod import FakeMethod
from lp.testing.gpgkeys import (
import_secret_test_key,
iter_test_key_emails,
@@ -53,11 +32,7 @@
test_pubkey_from_email,
)
from lp.testing.keyserver import KeyServerTac
-from lp.testing.layers import (
- GPGServiceLayer,
- LaunchpadFunctionalLayer,
- ZopelessDatabaseLayer,
- )
+from lp.testing.layers import LaunchpadFunctionalLayer
class TestGPGHandler(TestCase):
@@ -247,330 +222,6 @@
self.assertEqual(gpgme.MD_SHA512, int(validsig_tokens[7]))
-class GPGServiceZopelessLayer(ZopelessDatabaseLayer, GPGServiceLayer):
- """A layer specifically for running the IGPGClient utility tests."""
-
- @classmethod
- def setUp(cls):
- pass
-
- @classmethod
- def tearDown(cls):
- pass
-
- @classmethod
- def testSetUp(cls):
- pass
-
- @classmethod
- def testTearDown(cls):
- pass
-
-
-class GPGClientTests(TestCase):
-
- layer = GPGServiceZopelessLayer
-
- def setUp(self):
- super(GPGClientTests, self).setUp()
- self.factory = BareLaunchpadObjectFactory()
-
- def test_can_get_utility(self):
- client = getUtility(IGPGClient)
- self.assertIsNot(None, client)
-
- def get_random_owner_id_string(self):
- """Get a random string that's representative of the owner id scheme."""
- candidates = string.ascii_lowercase + string.digits
- openid_id = ''.join((random.choice(candidates) for i in range(6)))
- return 'http://testopenid.dev/+id/' + openid_id
-
- def test_get_key_for_user_with_sampledata(self):
- client = getUtility(IGPGClient)
- person = getUtility(IPersonSet).getByName('name16')
- openid_id = getUtility(IGPGKeySet).getOwnerIdForPerson(person)
- data = client.getKeysForOwner(openid_id)
- self.assertThat(data, ContainsDict({'keys': HasLength(1)}))
-
- def test_get_key_for_unknown_user(self):
- client = getUtility(IGPGClient)
- user = self.get_random_owner_id_string()
- data = client.getKeysForOwner(user)
- self.assertThat(data, ContainsDict({'keys': HasLength(0)}))
-
- def test_register_non_callable_raises_TypeError(self):
- client = getUtility(IGPGClient)
- self.assertThat(
- lambda: client.registerWriteHook("not a callable"),
- raises(TypeError))
-
- def test_unregister_with_unregistered_hook_raises_ValueError(self):
- client = getUtility(IGPGClient)
- self.assertThat(
- lambda: client.unregisterWriteHook("not registered"),
- raises(ValueError))
-
- def test_can_unregister_registered_write_hook(self):
- client = getUtility(IGPGClient)
- hook = FakeMethod()
- client.registerWriteHook(hook)
- client.unregisterWriteHook(hook)
-
- self.assertThat(
- lambda: client.unregisterWriteHook(hook),
- raises(ValueError))
-
- def test_can_add_new_fingerprint_for_user(self):
- self.useFixture(KeyServerTac())
- client = getUtility(IGPGClient)
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- user = self.get_random_owner_id_string()
- client.addKeyForOwner(user, fingerprint)
- data = client.getKeysForOwner(user)
- self.assertThat(data, ContainsDict({'keys': HasLength(1)}))
- keys = data['keys']
- self.assertThat(
- keys[0],
- ContainsDict({
- 'fingerprint': Equals(fingerprint),
- 'enabled': Equals(True)
- }))
-
- def test_adding_fingerprint_notifies_writes(self):
- self.useFixture(KeyServerTac())
- client = getUtility(IGPGClient)
- hook = FakeMethod()
- client.registerWriteHook(hook)
- self.addCleanup(client.unregisterWriteHook, hook)
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- user = self.get_random_owner_id_string()
- client.addKeyForOwner(user, fingerprint)
-
- self.assertThat(hook.call_count, Equals(1))
-
- def test_adding_invalid_fingerprint_raises_ValueError(self):
- client = getUtility(IGPGClient)
- self.assertThat(
- lambda: client.addKeyForOwner(
- self.get_random_owner_id_string(), ''),
- raises(ValueError("Invalid fingerprint: ''.")))
-
- def test_adding_duplicate_fingerprint_raises_GPGServiceException(self):
- self.useFixture(KeyServerTac())
- client = getUtility(IGPGClient)
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- user_one = self.get_random_owner_id_string()
- user_two = self.get_random_owner_id_string()
- client.addKeyForOwner(user_one, fingerprint)
- self.assertThat(
- lambda: client.addKeyForOwner(user_two, fingerprint),
- raises(GPGServiceException(
- "Error: Fingerprint already in database.")))
-
- def test_disabling_active_key(self):
- self.useFixture(KeyServerTac())
- client = getUtility(IGPGClient)
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- user = self.get_random_owner_id_string()
- client.addKeyForOwner(user, fingerprint)
- client.disableKeyForOwner(user, fingerprint)
- data = client.getKeysForOwner(user)
-
- self.assertThat(data, ContainsDict({'keys': HasLength(1)}))
- keys = data['keys']
- self.assertThat(keys[0], ContainsDict({'enabled': Equals(False)}))
-
- def test_disabling_key_notifies_writes(self):
- self.useFixture(KeyServerTac())
- client = getUtility(IGPGClient)
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- user = self.get_random_owner_id_string()
- client.addKeyForOwner(user, fingerprint)
-
- hook = FakeMethod()
- client.registerWriteHook(hook)
- self.addCleanup(client.unregisterWriteHook, hook)
- client.disableKeyForOwner(user, fingerprint)
- self.assertThat(hook.call_count, Equals(1))
-
- def test_disabling_invalid_fingerprint_raises_ValueError(self):
- client = getUtility(IGPGClient)
- self.assertThat(
- lambda: client.disableKeyForOwner(
- self.get_random_owner_id_string(), ''),
- raises(ValueError("Invalid fingerprint: ''."))
- )
-
- def test_can_get_key_by_fingerprint(self):
- self.useFixture(KeyServerTac())
- client = getUtility(IGPGClient)
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- user = self.get_random_owner_id_string()
- client.addKeyForOwner(user, fingerprint)
-
- key = client.getKeyByFingerprint(fingerprint)
- self.assertThat(
- key, ContainsDict({'owner': Equals(user),
- 'fingerprint': Equals(fingerprint)}))
-
- def test_get_missing_key_by_fingerprint(self):
- client = getUtility(IGPGClient)
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- self.assertIsNone(client.getKeyByFingerprint(fingerprint))
-
- def test_get_key_with_bad_fingerprint_raises_ValueError(self):
- client = getUtility(IGPGClient)
- self.assertThat(lambda: client.getKeyByFingerprint('bad fingerprint'),
- raises(ValueError))
-
- def test_can_add_IGPGKey_to_test_enabled_gpgservice(self):
- self.useFixture(
- FeatureFixture({'gpg.write_to_gpgservice': True}))
- client = getUtility(IGPGClient)
- person = self.factory.makePerson()
- # With the feature flag enabled, the following creates a
- # gpg key on the gpgservice.
- gpgkey = self.factory.makeGPGKey(person)
- user = getUtility(IGPGKeySet).getOwnerIdForPerson(person)
- key = client.getKeyByFingerprint(gpgkey.fingerprint)
- self.assertThat(
- key, ContainsDict({'owner': Equals(user),
- 'fingerprint': Equals(gpgkey.fingerprint)}))
-
- def makePersonWithMultipleGPGKeysInDifferentOpenids(self):
- """Make a person with multiple GPG keys owned by
- different openid identifiers. This happens as a result
- of an account merge.
-
- :returns: an IPerson instance with two keys under
- different openid identifiers.
- """
- person = self.factory.makePerson()
- self.factory.makeGPGKey(person)
- # Create a second openid identifier from 30 days ago.
- # This simulates the account merge:
- identifier = OpenIdIdentifier()
- identifier.account = person.account
- identifier.identifier = u'openid_identifier'
- identifier.date_created = THIRTY_DAYS_AGO
- IMasterStore(OpenIdIdentifier).add(identifier)
- self.factory.makeGPGKey(person)
- return person
-
- def test_can_retrieve_keys_for_all_openid_identifiers(self):
- person = self.makePersonWithMultipleGPGKeysInDifferentOpenids()
- keys = getUtility(IGPGKeySet).getGPGKeysForPerson(person)
- self.assertThat(keys, HasLength(2))
-
- def test_can_deactivate_all_keys_with_multiple_openid_identifiers(self):
- person = self.makePersonWithMultipleGPGKeysInDifferentOpenids()
- keyset = getUtility(IGPGKeySet)
- key_one, key_two = keyset.getGPGKeysForPerson(person)
- keyset.deactivate(key_one)
- keyset.deactivate(key_two)
- key_one, key_two = keyset.getGPGKeysForPerson(person, active=False)
-
- self.assertFalse(key_one.active)
- self.assertFalse(key_two.active)
-
- def test_can_reactivate_all_keys_with_multiple_openid_identifiers(self):
- person = self.makePersonWithMultipleGPGKeysInDifferentOpenids()
- keyset = getUtility(IGPGKeySet)
- for k in keyset.getGPGKeysForPerson(person):
- keyset.deactivate(k)
- for k in keyset.getGPGKeysForPerson(person, active=False):
- keyset.activate(person, k, k.can_encrypt)
- key_one, key_two = keyset.getGPGKeysForPerson(person)
-
- self.assertTrue(key_one.active)
- self.assertTrue(key_two.active)
-
- def test_cannot_reactivate_someone_elses_key(self):
- person1 = self.factory.makePerson()
- key = self.factory.makeGPGKey(person1)
- person2 = self.factory.makePerson()
-
- keyset = getUtility(IGPGKeySet)
- keyset.deactivate(key)
- self.assertRaises(
- AssertionError,
- keyset.activate,
- person2, key, key.can_encrypt
- )
-
- def assert_last_timeline_action(self, expected_method, expected_url):
- timeline = get_request_timeline(get_current_browser_request())
- action = timeline.actions[-1]
- self.assertEqual("gpgservice-%s" % expected_method, action.category)
- self.assertEqual(expected_url, action.detail.split(" ", 1)[0])
-
- def test_get_keys_for_owner_has_timeline_support(self):
- client = getUtility(IGPGClient)
- user = self.get_random_owner_id_string()
- client.getKeysForOwner(user)
-
- self.assert_last_timeline_action(
- 'GET', construct_url("/users/{owner_id}/keys", user))
-
- def test_add_key_for_owner_has_timeline_support(self):
- self.useFixture(KeyServerTac())
- client = getUtility(IGPGClient)
- user = self.get_random_owner_id_string()
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- client.addKeyForOwner(user, fingerprint)
-
- self.assert_last_timeline_action(
- 'POST', construct_url("/users/{owner_id}/keys", user))
-
- def test_disable_key_for_owner_has_timeline_support(self):
- self.useFixture(KeyServerTac())
- client = getUtility(IGPGClient)
- user = self.get_random_owner_id_string()
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- client.addKeyForOwner(user, fingerprint)
- client.disableKeyForOwner(user, fingerprint)
-
- self.assert_last_timeline_action(
- 'DELETE',
- construct_url(
- "/users/{owner_id}/keys/{fingerprint}", user, fingerprint))
-
- def test_get_key_by_fingerprint_has_timeline_support(self):
- client = getUtility(IGPGClient)
- fingerprint = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
- client.getKeyByFingerprint(fingerprint)
-
- self.assert_last_timeline_action(
- 'GET',
- construct_url("/keys/{fingerprint}", fingerprint=fingerprint))
-
- def test_get_keys_by_fingerprints_has_timeline_support(self):
- client = getUtility(IGPGClient)
- fingerprints = [
- 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543',
- 'B439AF863EDEFC9E04FAB26FBA2B7289DF324545',
- ]
- client.getKeysByFingerprints(fingerprints)
-
- self.assert_last_timeline_action(
- 'GET',
- construct_url(
- "/keys/{fingerprint}", fingerprint=','.join(fingerprints)))
-
- def test_timeline_support_filters_unknown_headers(self):
- client = removeSecurityProxy(getUtility(IGPGClient))
- client._request(
- 'get', '/', headers={'X-Foo': 'bar', 'Content-Type': 'baz'})
-
- expected_headers = {'Content-Type': 'baz'}
- timeline = get_request_timeline(get_current_browser_request())
- action = timeline.actions[-1]
- self.assertEqual(
- '/ no body ' + json.dumps(expected_headers),
- action.detail
- )
-
-
def construct_url(template, owner_id='', fingerprint=''):
owner_id = base64.b64encode(owner_id, altchars='-_')
return template.format(owner_id=owner_id, fingerprint=fingerprint)
=== modified file 'lib/lp/services/verification/browser/logintoken.py'
--- lib/lp/services/verification/browser/logintoken.py 2016-02-10 02:37:07 +0000
+++ lib/lp/services/verification/browser/logintoken.py 2016-11-03 15:22:08 +0000
@@ -40,13 +40,10 @@
ITeam,
)
from lp.services.database.sqlbase import flush_database_updates
-from lp.services.features import getFeatureFlag
from lp.services.gpg.interfaces import (
- GPG_DATABASE_READONLY_FEATURE_FLAG,
GPGKeyExpired,
GPGKeyNotFoundError,
GPGKeyRevoked,
- GPGReadOnly,
GPGVerificationError,
IGPGHandler,
)
@@ -262,8 +259,6 @@
super(ValidateGPGKeyView, self).initialize()
def validate(self, data):
- if getFeatureFlag(GPG_DATABASE_READONLY_FEATURE_FLAG):
- raise GPGReadOnly()
self.gpg_key = self._getGPGKey()
if self.context.tokentype == LoginTokenType.VALIDATESIGNONLYGPG:
self._validateSignOnlyGPGKey(data)
=== modified file 'lib/lp/services/verification/browser/tests/test_logintoken.py'
--- lib/lp/services/verification/browser/tests/test_logintoken.py 2016-03-21 05:37:40 +0000
+++ lib/lp/services/verification/browser/tests/test_logintoken.py 2016-11-03 15:22:08 +0000
@@ -1,16 +1,9 @@
# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-from testtools.matchers import raises
-
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
-from lp.services.features.testing import FeatureFixture
-from lp.services.gpg.interfaces import (
- GPG_DATABASE_READONLY_FEATURE_FLAG,
- GPGReadOnly,
- )
from lp.services.identity.interfaces.account import AccountStatus
from lp.services.identity.interfaces.emailaddress import EmailAddressStatus
from lp.services.verification.browser.logintoken import (
@@ -82,28 +75,6 @@
self.assertEquals(harness.view.next_url, self.expected_next_url)
-class LoginTokenReadOnlyTests(TestCaseWithFactory):
-
- layer = LaunchpadFunctionalLayer
-
- def test_continue_action_failed_with_gpg_database_in_ro_mode(self):
- self.useFixture(FeatureFixture({
- GPG_DATABASE_READONLY_FEATURE_FLAG: True,
- }))
- person = self.factory.makePerson(name='test-user')
- email = removeSecurityProxy(person).preferredemail.email
- gpg_key = self.factory.makeGPGKey(person)
- token = getUtility(ILoginTokenSet).new(
- person, email, email, LoginTokenType.VALIDATEGPG,
- fingerprint=gpg_key.fingerprint)
-
- harness = LaunchpadFormHarness(token, ValidateGPGKeyView)
- self.assertThat(
- lambda: harness.submit('continue', {}),
- raises(GPGReadOnly)
- )
-
-
class TestClaimTeamView(TestCaseWithFactory):
"""Test the claiming of a team via login token."""
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2016-10-11 15:28:25 +0000
+++ lib/lp/testing/factory.py 2016-11-03 15:22:08 +0000
@@ -224,7 +224,6 @@
SSHKeyType,
)
from lp.registry.model.commercialsubscription import CommercialSubscription
-from lp.registry.model.gpgkey import GPGServiceKey
from lp.registry.model.karma import KarmaTotalCache
from lp.registry.model.milestone import Milestone
from lp.registry.model.suitesourcepackage import SuiteSourcePackage
@@ -240,12 +239,8 @@
)
from lp.services.database.policy import MasterDatabasePolicy
from lp.services.database.sqlbase import flush_database_updates
-from lp.services.features import getFeatureFlag
from lp.services.gpg.interfaces import (
- GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG,
- GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
GPGKeyAlgorithm,
- IGPGClient,
IGPGHandler,
)
from lp.services.identity.interfaces.account import (
@@ -613,17 +608,6 @@
algorithm=GPGKeyAlgorithm.R,
active=True,
can_encrypt=False)
- if getFeatureFlag(GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG):
- client = getUtility(IGPGClient)
- openid_identifier = keyset.getOwnerIdForPerson(owner)
- client.addKeyForTest(
- openid_identifier, key.keyid, key.fingerprint, key.keysize,
- key.algorithm.name, key.active, key.can_encrypt)
- # Sadly client.addKeyForTest does not return the key that
- # was added:
- if getFeatureFlag(GPG_READ_FROM_GPGSERVICE_FEATURE_FLAG):
- return GPGServiceKey(
- client.getKeyByFingerprint(key.fingerprint))
return key
def makePerson(
=== modified file 'lib/lp/testing/gpgkeys/__init__.py'
--- lib/lp/testing/gpgkeys/__init__.py 2016-03-17 23:32:28 +0000
+++ lib/lp/testing/gpgkeys/__init__.py 2016-11-03 15:22:08 +0000
@@ -27,11 +27,8 @@
from lp.registry.interfaces.gpg import IGPGKeySet
from lp.registry.interfaces.person import IPersonSet
-from lp.services.features import getFeatureFlag
from lp.services.gpg.interfaces import (
- GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG,
GPGKeyAlgorithm,
- IGPGClient,
IGPGHandler,
)
@@ -74,12 +71,6 @@
keysize=key.keysize,
algorithm=GPGKeyAlgorithm.items[key.algorithm],
active=(not key.revoked))
- if getFeatureFlag(GPG_WRITE_TO_GPGSERVICE_FEATURE_FLAG):
- client = getUtility(IGPGClient)
- openid_identifier = keyset.getOwnerIdForPerson(person)
- client.addKeyForTest(
- openid_identifier, key.keyid, key.fingerprint, key.keysize,
- key.algorithm.name, key.active, key.can_encrypt)
def iter_test_key_emails():
=== removed directory 'lib/lp/testing/gpgservice'
=== removed file 'lib/lp/testing/gpgservice/__init__.py'
--- lib/lp/testing/gpgservice/__init__.py 2016-02-16 03:12:53 +0000
+++ lib/lp/testing/gpgservice/__init__.py 1970-01-01 00:00:00 +0000
@@ -1,10 +0,0 @@
-# Copyright 2016 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-from __future__ import absolute_import
-
-from . _fixture import GPGKeyServiceFixture
-
-__all__ = [
- 'GPGKeyServiceFixture'
- ]
=== removed file 'lib/lp/testing/gpgservice/_fixture.py'
--- lib/lp/testing/gpgservice/_fixture.py 2016-03-30 03:14:02 +0000
+++ lib/lp/testing/gpgservice/_fixture.py 1970-01-01 00:00:00 +0000
@@ -1,157 +0,0 @@
-# Copyright 2016 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-from __future__ import absolute_import
-
-from ConfigParser import SafeConfigParser
-import httplib
-import json
-import os.path
-import socket
-from StringIO import StringIO
-import subprocess
-from tempfile import NamedTemporaryFile
-from textwrap import dedent
-import time
-
-from fixtures import Fixture
-
-from lp.services.config import config
-
-__metaclass__ = type
-
-
-class GPGKeyServiceFixture(Fixture):
-
- """Run the gpgservice webapp and test key server."""
-
- def __init__(self, config_fixture=None):
- """Create a new GPGKeyServiceFixture.
-
- :param config_fixture: If specified, this must be a ConfigFixture
- instance. It will be updated with the relevant GPG service config
- details.
- """
- self._config_fixture = config_fixture
-
- def setUp(self):
- super(GPGKeyServiceFixture, self).setUp()
- # Write service config to a file on disk. This file gets deleted
- # when the fixture ends.
- service_config = _get_default_service_config()
- self._config_file = NamedTemporaryFile()
- self.addCleanup(self._config_file.close)
- service_config.write(self._config_file)
- self._config_file.flush()
-
- # Set the environment variable that tells gpgservice where to read its
- # config file from:
- env = os.environ.copy()
- env['GPGSERVICE_CONFIG_PATH'] = self._config_file.name
-
- gunicorn_path = os.path.join(
- config.root, 'bin', 'gunicorn-for-gpgservice')
- self.interface = '127.0.0.1'
- self.port = _get_unused_port()
- gunicorn_options = ['-b', self.bind_address]
- wsgi_app_name = 'gpgservice.webapp:app'
-
- self._process = subprocess.Popen(
- args=[gunicorn_path] + gunicorn_options + [wsgi_app_name],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=env)
- self.addCleanup(self._kill_server)
- self._wait_for_service_start()
- self.reset_service_database()
- if self._config_fixture is not None:
- config_section = service_config = dedent("""\
- [gpgservice]
- api_endpoint: http://%s/
- """ % self.bind_address)
- self._config_fixture.add_section(config_section)
- config.reloadConfig()
-
- def _kill_server(self):
- self._process.terminate()
- stdout, stderr = self._process.communicate()
- self.addDetail('gpgservice-stdout', stdout)
- self.addDetail('gpgservice-stderr', stderr)
-
- def _wait_for_service_start(self):
- errors = []
- for i in range(50):
- conn = httplib.HTTPConnection(self.bind_address)
- try:
- conn.request('GET', '/')
- except socket.error as e:
- errors.append(e)
- else:
- resp = conn.getresponse()
- if resp.status == 200:
- return
- time.sleep(0.1)
- raise RuntimeError("Service not responding: %r" % errors)
-
- def reset_service_database(self):
- """Reset the gpgservice instance database to the launchpad sampledata.
- """
- conn = httplib.HTTPConnection(self.bind_address)
- test_data = {
- 'keys': [
- {
- 'owner':
- config.launchpad.openid_provider_root
- + '+id/name16_oid',
- 'id': '12345678',
- 'fingerprint': 'ABCDEF0123456789ABCDDCBA0000111112345678',
- 'size': 1024,
- 'algorithm': 'D',
- 'can_encrypt': True,
- 'enabled': True,
- }
- ]
- }
- headers = {'Content-Type': 'application/json'}
- conn.request('POST', '/test/reset_db', json.dumps(test_data), headers)
- resp = conn.getresponse()
- body = resp.read()
- if resp.status != 200:
- raise RuntimeError("Could not reset database: %s" % body)
-
- @property
- def bind_address(self):
- return '%s:%d' % (self.interface, self.port)
-
-
-def _get_default_service_config():
- config = SafeConfigParser()
- config.readfp(StringIO(dedent("""\
- [gpghandler]
- host: localhost
- public_host: keyserver.ubuntu.com
- upload_keys: True
- port: 11371
- timeout: 5.0
- maximum_upload_size: 16777216
- enable_test_endpoint: true
-
- [database]
- type: sqlite
- """)))
- return config
-
-
-def _get_unused_port():
- """Find and return an unused port
-
- There is a small race condition here (between the time we allocate the
- port, and the time it actually gets used), but for the purposes for which
- this function gets used it isn't a problem in practice.
- """
- s = socket.socket()
- try:
- s.bind(('localhost', 0))
- return s.getsockname()[1]
- finally:
- s.close()
=== removed directory 'lib/lp/testing/gpgservice/tests'
=== removed file 'lib/lp/testing/gpgservice/tests/__init__.py'
=== removed file 'lib/lp/testing/gpgservice/tests/test_fixture.py'
--- lib/lp/testing/gpgservice/tests/test_fixture.py 2016-03-30 03:14:02 +0000
+++ lib/lp/testing/gpgservice/tests/test_fixture.py 1970-01-01 00:00:00 +0000
@@ -1,69 +0,0 @@
-# Copyright 2016 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-from __future__ import absolute_import
-
-import base64
-import json
-import httplib
-
-from testtools import TestCase
-from testtools.matchers import (
- Contains,
- HasLength,
- Not,
- PathExists,
- StartsWith,
-)
-
-from lp.services.config import config
-from lp.services.config.fixture import (
- ConfigFixture,
- ConfigUseFixture,
- )
-from lp.testing.layers import BaseLayer
-from lp.testing.gpgservice import GPGKeyServiceFixture
-
-
-class GPGServiceFixtureTests(TestCase):
-
- layer = BaseLayer
-
- def test_fixture_writes_and_deletes_service_config_file(self):
- fixture = GPGKeyServiceFixture()
- with fixture:
- config_file_path = fixture._config_file.name
- self.assertThat(config_file_path, PathExists())
- self.assertThat(config_file_path, Not(PathExists()))
-
- def test_fixture_starts_gpgservice(self):
- fixture = self.useFixture(GPGKeyServiceFixture())
- conn = httplib.HTTPConnection(fixture.bind_address)
- conn.request('GET', '/')
- resp = conn.getresponse()
- self.assertEqual(200, resp.status)
- self.assertThat(resp.read(), StartsWith('gpgservice - Copyright'))
-
- def test_fixture_can_create_test_data(self):
- fixture = self.useFixture(GPGKeyServiceFixture())
- conn = httplib.HTTPConnection(fixture.bind_address)
- user = base64.b64encode(
- config.launchpad.openid_provider_root + '+id/name16_oid',
- altchars='-_')
- conn.request('GET', '/users/%s/keys' % user)
- resp = conn.getresponse()
- self.assertEqual(200, resp.status)
- data = json.loads(resp.read())
- self.assertThat(data, Contains('keys'))
- self.assertThat(data['keys'], HasLength(1))
-
- def test_fixture_can_set_config_data(self):
- config_name = self.getUniqueString()
- config_fixture = self.useFixture(
- ConfigFixture(config_name, BaseLayer.config_fixture.instance_name))
- self.useFixture(ConfigUseFixture(config_name))
- gpg_fixture = self.useFixture(GPGKeyServiceFixture(config_fixture))
-
- self.assertEqual(
- 'http://%s/' % gpg_fixture.bind_address,
- config.gpgservice.api_endpoint)
=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py 2016-02-22 23:43:33 +0000
+++ lib/lp/testing/layers.py 2016-11-03 15:22:08 +0000
@@ -28,7 +28,6 @@
'FunctionalLayer',
'GoogleLaunchpadFunctionalLayer',
'GoogleServiceLayer',
- 'GPGServiceLayer',
'LaunchpadFunctionalLayer',
'LaunchpadLayer',
'LaunchpadScriptLayer',
@@ -118,7 +117,6 @@
from lp.services.googlesearch.tests.googleserviceharness import (
GoogleServiceTestSetup,
)
-from lp.services.gpg.interfaces import IGPGClient
from lp.services.job.tests import celery_worker
from lp.services.librarian.model import LibraryFileAlias
from lp.services.librarianserver.testing.server import LibrarianServerFixture
@@ -150,7 +148,6 @@
logout,
reset_logging,
)
-from lp.testing.gpgservice import GPGKeyServiceFixture
from lp.testing.pgsql import PgTestSetup
from lp.testing.smtpd import SMTPController
@@ -1165,45 +1162,6 @@
logout()
-class GPGServiceLayer(BaseLayer):
-
- service_fixture = None
- gpgservice_needs_reset = False
-
- @classmethod
- @profiled
- def setUp(cls):
- gpg_client = removeSecurityProxy(getUtility(IGPGClient))
- gpg_client.registerWriteHook(cls._on_gpgservice_write)
- cls.service_fixture = GPGKeyServiceFixture(BaseLayer.config_fixture)
- cls.service_fixture.setUp()
-
- @classmethod
- @profiled
- def tearDown(cls):
- gpg_client = removeSecurityProxy(getUtility(IGPGClient))
- gpg_client.unregisterWriteHook(cls._on_gpgservice_write)
- cls.service_fixture.cleanUp()
- cls.service_fixture = None
- logout()
-
- @classmethod
- @profiled
- def testSetUp(cls):
- pass
-
- @classmethod
- @profiled
- def testTearDown(cls):
- if cls.gpgservice_needs_reset:
- cls.service_fixture.reset_service_database()
- cls.gpgservice_needs_reset = False
-
- @classmethod
- def _on_gpgservice_write(cls):
- cls.gpgservice_needs_reset = True
-
-
class TwistedLayer(BaseLayer):
"""A layer for cleaning up the Twisted thread pool."""
@@ -1332,7 +1290,7 @@
disconnect_stores()
-class LaunchpadFunctionalLayer(LaunchpadLayer, FunctionalLayer, GPGServiceLayer):
+class LaunchpadFunctionalLayer(LaunchpadLayer, FunctionalLayer):
"""Provides the Launchpad Zope3 application server environment."""
@classmethod
@@ -1496,7 +1454,7 @@
host = 'localhost'
-class LaunchpadZopelessLayer(LaunchpadScriptLayer, GPGServiceLayer):
+class LaunchpadZopelessLayer(LaunchpadScriptLayer):
"""Full Zopeless environment including Component Architecture and
database connections initialized.
"""
=== removed file 'scripts/gpgkey-export.py'
--- scripts/gpgkey-export.py 2016-03-01 21:20:12 +0000
+++ scripts/gpgkey-export.py 1970-01-01 00:00:00 +0000
@@ -1,53 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2016 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-import _pythonpath
-
-import sys
-import json
-
-from zope.component import getUtility
-
-from lp.registry.interfaces.gpg import IGPGKeySet
-from lp.registry.model.gpgkey import GPGKey
-from lp.services.database.interfaces import IStore
-from lp.services.scripts.base import LaunchpadScript
-
-
-class GPGKeyExportScript(LaunchpadScript):
-
- description = "Export GPG keys as json."
-
- def add_my_options(self):
- self.parser.add_option(
- '-o', '--output', metavar='FILE', action='store',
- help='Export keys to this file', type='string', dest='output')
-
- def main(self):
- output = sys.stdout
- if self.options.output is not None:
- output = open(self.options.output, 'wb')
- json.dump(get_keys_as_json(), output)
- output.write('\n')
-
-
-def get_keys_as_json():
- keys = []
- keyset = getUtility(IGPGKeySet)
- for gpg_key in IStore(GPGKey).find(GPGKey):
- key_data = {
- 'owner': keyset.getOwnerIdForPerson(gpg_key.owner),
- 'id': gpg_key.id,
- 'fingerprint': gpg_key.fingerprint,
- 'size': gpg_key.keysize,
- 'algorithm': gpg_key.algorithm.name,
- 'enabled': gpg_key.active,
- 'can_encrypt': gpg_key.can_encrypt,
- }
- keys.append(key_data)
- return keys
-
-
-if __name__ == '__main__':
- GPGKeyExportScript("gpgkey-export").run()
=== modified file 'setup.py'
--- setup.py 2016-07-13 10:13:30 +0000
+++ setup.py 2016-11-03 15:22:08 +0000
@@ -41,7 +41,6 @@
'FeedParser',
'feedvalidator',
'funkload',
- 'gpgservice-client',
'html5browser',
'httmock',
'pygpgme',
=== modified file 'versions.cfg'
--- versions.cfg 2016-09-21 02:51:38 +0000
+++ versions.cfg 2016-11-03 15:22:08 +0000
@@ -38,8 +38,6 @@
flask = 0.10.1
FormEncode = 1.2.4
funkload = 1.16.1
-gpgservice = 0.1.2
-gpgservice-client = 0.0.10
grokcore.component = 1.6
gunicorn = 19.4.5
html5browser = 0.0.9
Follow ups