launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #04700
[Merge] lp:~lifeless/launchpad/bug-830789 into lp:launchpad
Robert Collins has proposed merging lp:~lifeless/launchpad/bug-830789 into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #830789 in Launchpad itself: "keyringtrustanalyzer unused"
https://bugs.launchpad.net/launchpad/+bug/830789
For more details, see:
https://code.launchpad.net/~lifeless/launchpad/bug-830789/+merge/72371
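Unused code can go to code heaven. This branch removes the unused keyringtrustanalyser module, its tests and the find-email-clusters.py / merge-email-clusters.py scripts, together with the GPG handler methods that supported it (importKeyringFile, checkTrustDb and setOwnerTrust).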
--
https://code.launchpad.net/~lifeless/launchpad/bug-830789/+merge/72371
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~lifeless/launchpad/bug-830789 into lp:launchpad.
=== modified file 'lib/canonical/launchpad/interfaces/gpghandler.py'
--- lib/canonical/launchpad/interfaces/gpghandler.py 2011-05-30 12:45:29 +0000
+++ lib/canonical/launchpad/interfaces/gpghandler.py 2011-08-22 01:00:28 +0000
@@ -199,14 +199,6 @@
:return: a `PymeKey` object for the just-generated secret key.
"""
- def importKeyringFile(filepath):
- """Import the keyring filepath into the local key database.
-
- :param filepath: the path to a keyring to import.
-
- :returns: a list of the imported keys.
- """
-
def encryptContent(content, fingerprint):
"""Encrypt the given content for the given fingerprint.
@@ -271,15 +263,6 @@
:raise AssertionError: if the POST request doesn't succeed.
"""
- def checkTrustDb():
- """Check whether the OpenPGP trust database is up to date.
-
- The method automatically rebuild the trust values if necessary.
-
- The results will be visible in any new retrieved key objects.
- Existing key objects will not reflect the new trust value.
- """
-
def localKeys(filter=None, secret=False):
"""Return an iterator of all keys locally known about.
@@ -340,9 +323,6 @@
can_authenticate = Attribute(
"Whether the key can be used for authentication")
- def setOwnerTrust(value):
- """Set the owner_trust value for this key."""
-
def export():
"""Export the context key in ASCII-armored mode.
=== removed file 'lib/canonical/launchpad/scripts/ftests/test_keyringtrustanalyser.py'
--- lib/canonical/launchpad/scripts/ftests/test_keyringtrustanalyser.py 2011-08-12 11:19:40 +0000
+++ lib/canonical/launchpad/scripts/ftests/test_keyringtrustanalyser.py 1970-01-01 00:00:00 +0000
@@ -1,292 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-import logging
-import unittest
-
-import gpgme
-from zope.component import getUtility
-
-from canonical.launchpad.ftests import keys_for_tests
-from canonical.launchpad.interfaces.emailaddress import (
- EmailAddressStatus,
- IEmailAddressSet,
- )
-from canonical.launchpad.interfaces.gpghandler import IGPGHandler
-from canonical.testing.layers import LaunchpadZopelessLayer
-from lp.registry.interfaces.person import IPersonSet
-from lp.registry.scripts.keyringtrustanalyser import (
- addOtherKeyring,
- addTrustedKeyring,
- findEmailClusters,
- getValidUids,
- mergeClusters,
- )
-
-
-test_fpr = 'A419AE861E88BC9E04B9C26FBA2B9389DFD20543'
-foobar_fpr = '340CA3BB270E2716C9EE0B768E7EB7086C64A8C5'
-
-
-class LogCollector(logging.Handler):
-
- def __init__(self):
- logging.Handler.__init__(self)
- self.records = []
-
- def emit(self, record):
- self.records.append(self.format(record))
-
-
-def setupLogger(name='test_keyringtrustanalyser'):
- """Set up the named logger to collect log messages.
-
- Returns (logger, handler)
- """
- logger = logging.getLogger(name)
- for handler in logger.handlers[:]:
- logger.removeHandler(handler)
- handler.flush()
- handler.close()
- handler = LogCollector()
- handler.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
- logger.addHandler(handler)
- return logger, handler
-
-
-class TestKeyringTrustAnalyser(unittest.TestCase):
- layer = LaunchpadZopelessLayer
-
- def setUp(self):
- self.gpg_handler = getUtility(IGPGHandler)
-
- def tearDown(self):
- # XXX stub 2005-10-27: this should be a zope test cleanup
- # thing per SteveA.
- self.gpg_handler.resetLocalState()
-
- def _addTrustedKeys(self):
- # Add trusted key with ULTIMATE validity. This will mark UIDs as
- # valid with a single signature, which is appropriate with the
- # small amount of test data.
- filename = keys_for_tests.test_pubkey_file_from_email(
- 'test@xxxxxxxxxxxxx')
- addTrustedKeyring(filename, gpgme.VALIDITY_ULTIMATE)
-
- def _addUntrustedKeys(self):
- for ring in keys_for_tests.test_keyrings():
- addOtherKeyring(ring)
-
- def testAddTrustedKeyring(self):
- """Test addTrustedKeyring"""
- self._addTrustedKeys()
-
- # get key from keyring
- keys = [key for key in self.gpg_handler.localKeys()
- if key.fingerprint == test_fpr]
- self.assertEqual(len(keys), 1)
- key = keys[0]
- self.assertTrue('test@xxxxxxxxxxxxx' in key.emails)
- self.assertEqual(key.owner_trust, gpgme.VALIDITY_ULTIMATE)
-
- def testAddOtherKeyring(self):
- """Test addOtherKeyring"""
- self._addUntrustedKeys()
- fingerprints = set(key.fingerprint
- for key in self.gpg_handler.localKeys())
- self.assertTrue(test_fpr in fingerprints)
- self.assertTrue(foobar_fpr in fingerprints)
-
- def testGetValidUids(self):
- """Test getValidUids"""
- self._addTrustedKeys()
- self._addUntrustedKeys()
-
- # calculate valid UIDs
- validuids = list(getValidUids())
-
- # test@xxxxxxxxxxxxx's non-revoked UIDs are valid
- self.assertTrue((test_fpr, 'test@xxxxxxxxxxxxx') in validuids)
- self.assertTrue((test_fpr,
- 'sample.person@xxxxxxxxxxxxx') in validuids)
- self.assertTrue((test_fpr, 'sample.revoked@xxxxxxxxxxxxx')
- not in validuids)
-
- # foo.bar@xxxxxxxxxxxxx's non-revoked signed UIDs are valid
- self.assertTrue((foobar_fpr, 'foo.bar@xxxxxxxxxxxxx') in validuids)
- self.assertTrue((foobar_fpr,
- 'revoked@xxxxxxxxxxxxx') not in validuids)
- self.assertTrue((foobar_fpr, 'untrusted@xxxxxxxxxxxxx')
- not in validuids)
-
- def testFindEmailClusters(self):
- """Test findEmailClusters"""
- self._addTrustedKeys()
- self._addUntrustedKeys()
-
- clusters = list(findEmailClusters())
-
- # test@xxxxxxxxxxxxx is ultimately trusted, so its non-revoked keys
- # form a cluster
- self.assertTrue(set(['test@xxxxxxxxxxxxx',
- 'sample.person@xxxxxxxxxxxxx']) in clusters)
-
- # foobar has only one signed, non-revoked key
- self.assertTrue(set(['foo.bar@xxxxxxxxxxxxx']) in clusters)
-
-
-class TestMergeClusters(unittest.TestCase):
- """Tests of the mergeClusters() routine."""
- layer = LaunchpadZopelessLayer
-
- def _getEmails(self, person):
- emailset = getUtility(IEmailAddressSet)
- return set(address.email for address in emailset.getByPerson(person))
-
- def testNullMerge(self):
- """Test that a merge with an empty sequence of clusters works"""
- mergeClusters([])
-
- def testMergeOneAccountNoNewEmails(self):
- """Test that merging a single email address does not affect an
- account.
- """
- person = getUtility(IPersonSet).getByEmail('test@xxxxxxxxxxxxx')
- emails = self._getEmails(person)
- self.assertTrue('test@xxxxxxxxxxxxx' in emails)
- self.assertEqual(person.merged, None)
-
- mergeClusters([set(['test@xxxxxxxxxxxxx'])])
- self.assertEqual(person.merged, None)
- self.assertEqual(self._getEmails(person), emails)
-
- def testMergeOneAccountAddEmails(self):
- """Test that merging a cluster containing new email addresses adds
- those emails.
- """
- personset = getUtility(IPersonSet)
- emailset = getUtility(IEmailAddressSet)
-
- person = personset.getByEmail('test@xxxxxxxxxxxxx')
- self.assertEqual(person.merged, None)
- # make sure newemail doesn't exist
- self.assertEqual(personset.getByEmail('newemail@xxxxxxxxxxxxx'), None)
-
- mergeClusters([set(['test@xxxxxxxxxxxxx', 'newemail@xxxxxxxxxxxxx'])])
- self.assertEqual(person.merged, None)
- emails = self._getEmails(person)
-
- # both email addresses associated with account ...
- self.assertTrue('test@xxxxxxxxxxxxx' in emails)
- self.assertTrue('newemail@xxxxxxxxxxxxx' in emails)
-
- address = emailset.getByEmail('newemail@xxxxxxxxxxxxx')
- self.assertEqual(address.email, 'newemail@xxxxxxxxxxxxx')
- self.assertEqual(address.personID, person.id)
- self.assertEqual(address.status, EmailAddressStatus.NEW)
-
- def testMergeUnvalidatedAccountWithValidated(self):
- """Test merging an unvalidated account with a validated account."""
- personset = getUtility(IPersonSet)
-
- validated_person = personset.getByEmail('test@xxxxxxxxxxxxx')
- unvalidated_person = personset.getByEmail(
- 'matsubara@xxxxxxxxxxxx')
-
- allemails = self._getEmails(validated_person)
- allemails.update(self._getEmails(unvalidated_person))
-
- self.assertNotEqual(validated_person, unvalidated_person)
-
- self.assertNotEqual(validated_person.preferredemail, None)
- self.assertEqual(unvalidated_person.preferredemail, None)
-
- self.assertEqual(validated_person.merged, None)
- self.assertEqual(unvalidated_person.merged, None)
-
- mergeClusters([set(['test@xxxxxxxxxxxxx',
- 'matsubara@xxxxxxxxxxxx'])])
-
- # unvalidated person has been merged into the validated person
- self.assertEqual(validated_person.merged, None)
- self.assertEqual(unvalidated_person.merged, validated_person)
-
- # all email addresses are now associated with the valid person
- self.assertEqual(self._getEmails(validated_person), allemails)
- self.assertEqual(self._getEmails(unvalidated_person), set())
-
- def testMergeTwoValidatedAccounts(self):
- """Test merging of two validated accounts. This should do
- nothing, since both accounts are in use.
- """
- personset = getUtility(IPersonSet)
-
- person1 = personset.getByEmail('test@xxxxxxxxxxxxx')
- person2 = personset.getByEmail('foo.bar@xxxxxxxxxxxxx')
- self.assertNotEqual(person1, person2)
-
- self.assertNotEqual(person1.preferredemail, None)
- self.assertNotEqual(person2.preferredemail, None)
-
- self.assertEqual(person1.merged, None)
- self.assertEqual(person2.merged, None)
-
- logger, collector = setupLogger()
- mergeClusters([set(['test@xxxxxxxxxxxxx', 'foo.bar@xxxxxxxxxxxxx'])],
- logger=logger)
-
- self.assertEqual(person1.merged, None)
- self.assertEqual(person2.merged, None)
-
- messages = collector.records
- self.assertNotEqual(messages, [])
- self.assertTrue(messages[0].startswith('WARNING:Multiple validated '
- 'user accounts'))
-
- def testMergeTwoUnvalidatedAccounts(self):
- """Test merging of two unvalidated accounts. This will pick
- one account and merge the others into it (since none of the
- accounts have been used, there is no need to favour one over
- the other).
- """
- personset = getUtility(IPersonSet)
-
- person1 = personset.getByEmail('matsubara@xxxxxxxxxxxx')
- person2 = personset.getByEmail('martin.pitt@xxxxxxxxxxxxx')
-
- allemails = self._getEmails(person1)
- allemails.update(self._getEmails(person2))
-
- self.assertEqual(person1.preferredemail, None)
- self.assertEqual(person2.preferredemail, None)
-
- self.assertEqual(person1.merged, None)
- self.assertEqual(person2.merged, None)
-
- mergeClusters([set(['matsubara@xxxxxxxxxxxx',
- 'martin.pitt@xxxxxxxxxxxxx'])])
-
- # since we don't know which account will be merged, swap
- # person1 and person2 if person1 was merged into person2.
- if person1.merged is not None:
- person1, person2 = person2, person1
-
- # one account is merged into the other
- self.assertEqual(person1.merged, None)
- self.assertEqual(person2.merged, person1)
-
- self.assertEqual(self._getEmails(person1), allemails)
- self.assertEqual(self._getEmails(person2), set())
-
- def testMergeUnknownEmail(self):
- """Merging a cluster of unknown emails creates an account."""
- personset = getUtility(IPersonSet)
-
- self.assertEqual(personset.getByEmail('newemail@xxxxxxxxxxxxx'), None)
-
- mergeClusters([set(['newemail@xxxxxxxxxxxxx'])])
-
- person = personset.getByEmail('newemail@xxxxxxxxxxxxx')
- self.assertNotEqual(person, None)
- self.assertEqual(person.preferredemail, None)
- self.assertTrue('newemail@xxxxxxxxxxxxx' in self._getEmails(person))
=== modified file 'lib/canonical/launchpad/utilities/ftests/test_gpghandler.py'
--- lib/canonical/launchpad/utilities/ftests/test_gpghandler.py 2011-08-16 20:35:11 +0000
+++ lib/canonical/launchpad/utilities/ftests/test_gpghandler.py 2011-08-22 01:00:28 +0000
@@ -149,54 +149,6 @@
"""Do we have the expected test keyring files"""
self.assertEqual(len(list(keys_for_tests.test_keyrings())), 1)
- def testImportKeyRing(self):
- """Import a sample keyring and check its contents are available."""
- self.testEmptyGetKeys()
- importedkeys = set()
- for ring in keys_for_tests.test_keyrings():
- keys = self.gpg_handler.importKeyringFile(ring)
- importedkeys.update(key.fingerprint for key in keys)
-
- # check that expected keys are in importedkeys set
- self.assertTrue("340CA3BB270E2716C9EE0B768E7EB7086C64A8C5"
- in importedkeys)
- self.assertTrue("A419AE861E88BC9E04B9C26FBA2B9389DFD20543"
- in importedkeys)
-
- # check that importedkeys are in key ring
- keyring = set(key.fingerprint
- for key in self.gpg_handler.localKeys())
- self.assertNotEqual(len(keyring), 0)
- self.assertTrue(importedkeys.issubset(keyring))
-
- def testSetOwnerTrust(self):
- """Import a key and set the ownertrust."""
- self.testEmptyGetKeys()
- for email in keys_for_tests.iter_test_key_emails():
- pubkey = keys_for_tests.test_pubkey_from_email(email)
- self.gpg_handler.importPublicKey(pubkey)
-
- iterator = self.gpg_handler.localKeys()
- key = iterator.next()
- self.assertEqual(key.owner_trust, gpgme.VALIDITY_UNKNOWN)
- key.setOwnerTrust(gpgme.VALIDITY_FULL)
- self.assertEqual(key.owner_trust, gpgme.VALIDITY_FULL)
- other_iterator = self.gpg_handler.localKeys()
- other_key_instance = other_iterator.next()
- self.assertEqual(key.owner_trust, other_key_instance.owner_trust)
-
- def testCheckTrustDb(self):
- """Test IGPGHandler.checkTrustDb()"""
- self.testEmptyGetKeys()
-
- # check trust DB with no keys succeeds
- self.assertEqual(self.gpg_handler.checkTrustDb(), 0)
-
- # add some keys and check trust DB again
- for ring in keys_for_tests.test_keyrings():
- self.gpg_handler.importKeyringFile(ring)
- self.assertEqual(self.gpg_handler.checkTrustDb(), 0)
-
def testHomeDirectoryJob(self):
"""Does the job to touch the home work."""
gpghandler = getUtility(IGPGHandler)
=== modified file 'lib/canonical/launchpad/utilities/gpghandler.py'
--- lib/canonical/launchpad/utilities/gpghandler.py 2011-06-01 13:32:02 +0000
+++ lib/canonical/launchpad/utilities/gpghandler.py 2011-08-22 01:00:28 +0000
@@ -344,18 +344,6 @@
return key
- def importKeyringFile(self, filepath):
- """See IGPGHandler.importKeyringFile."""
- ctx = gpgme.Context()
- data = open(filepath, 'r')
- result = ctx.import_(data)
- # if not considered -> format wasn't recognized
- # no key was imported
- if result.considered == 0:
- raise ValueError('Empty or invalid keyring')
- return [PymeKey(fingerprint)
- for (fingerprint, result, status) in result.imports]
-
def encryptContent(self, content, fingerprint):
"""See IGPGHandler."""
if isinstance(content, unicode):
@@ -541,16 +529,6 @@
url = self.getURLForKeyInServer(fingerprint, action)
return urlfetch(url)
- def checkTrustDb(self):
- """See IGPGHandler"""
- p = subprocess.Popen(['gpg', '--check-trustdb', '--batch', '--yes'],
- close_fds=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- p.communicate()
- return p.returncode
-
class PymeSignature(object):
"""See IPymeSignature."""
@@ -617,19 +595,6 @@
self.emails = [uid.email for uid in self.uids
if valid_email(uid.email) and not uid.revoked]
- def setOwnerTrust(self, value):
- """Set the ownertrust on the actual gpg key"""
- if value not in (gpgme.VALIDITY_UNDEFINED, gpgme.VALIDITY_NEVER,
- gpgme.VALIDITY_MARGINAL, gpgme.VALIDITY_FULL,
- gpgme.VALIDITY_ULTIMATE):
- raise ValueError("invalid owner trust level")
- # edit the owner trust value on the key
- ctx = gpgme.Context()
- key = ctx.get_key(self.fingerprint.encode('ascii'), False)
- gpgme_editutil.edit_trust(ctx, key, value)
- # set the cached copy of owner_trust
- self.owner_trust = value
-
@property
def displayname(self):
return '%s%s/%s' % (self.keysize, self.algorithm, self.keyid)
=== removed file 'lib/lp/registry/scripts/keyringtrustanalyser.py'
--- lib/lp/registry/scripts/keyringtrustanalyser.py 2011-02-23 20:26:53 +0000
+++ lib/lp/registry/scripts/keyringtrustanalyser.py 1970-01-01 00:00:00 +0000
@@ -1,183 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-import gpgme
-from zope.component import getUtility
-
-from canonical.database.sqlbase import flush_database_updates
-from canonical.launchpad.interfaces.emailaddress import IEmailAddressSet
-from canonical.launchpad.interfaces.gpghandler import IGPGHandler
-from lp.app.validators.email import valid_email
-from lp.registry.interfaces.person import (
- IPersonSet,
- PersonCreationRationale,
- )
-
-
-__metaclass__ = type
-
-__all__ = [
- 'addTrustedKeyring',
- 'addOtherKeyring',
- 'getValidUids',
- 'findEmailClusters',
- 'mergeClusters',
- ]
-
-def addTrustedKeyring(filename, ownertrust=gpgme.VALIDITY_MARGINAL):
- """Add a keyring of keys owned by people trusted to make good signatues.
- """
- gpg = getUtility(IGPGHandler)
- keys = gpg.importKeyringFile(filename)
- for key in keys:
- key.setOwnerTrust(ownertrust)
-
-def addOtherKeyring(filename):
- """Add a keyring of possibly suspect keys"""
- gpg = getUtility(IGPGHandler)
- gpg.importKeyringFile(filename)
-
-def getValidUids(minvalid=gpgme.VALIDITY_MARGINAL):
- """Returns an iterator yielding (fingerprint, email) pairs.
-
- Only UIDs assigned a validity of at least 'minvalid' are returned.
- """
- gpg = getUtility(IGPGHandler)
- gpg.checkTrustDb()
- for key in gpg.localKeys():
- for uid in key.uids:
- if (not uid.revoked and valid_email(uid.email) and
- uid.validity >= minvalid):
- yield key.fingerprint, uid.email
-
-def findEmailClusters(minvalid=gpgme.VALIDITY_MARGINAL):
- """Returns an iterator yielding sets of related email addresses.
-
- Two email addresses are considered to be related if they appear as
- valid user IDs on a PGP key in the keyring.
- """
- emails = {} # fingerprint -> set(emails)
- fingerprints = {} # email -> set(fingerprints)
-
- # get the valid UIDs
- for fpr, email in getValidUids(minvalid):
- fingerprints.setdefault(email, set()).add(fpr)
- emails.setdefault(fpr, set()).add(email)
-
- # find clusters of keys based on the presence of shared valid UIDs
- clusters = {} # fingerprint -> set(fingerprints)
- for fprs in fingerprints.itervalues():
- cluster = fprs.copy()
- for fpr in fprs:
- x = clusters.get(fpr)
- if x is not None:
- cluster.update(x)
- for fpr in cluster:
- clusters[fpr] = cluster
-
- # return email addresses belonging to each key cluster
- for cluster in clusters.itervalues():
- email_cluster = set()
- for fpr in cluster:
- email_cluster.update(emails[fpr])
- yield email_cluster
-
-def _mergeOrAddEmails(personset, emailset, cluster, logger):
- """Helper function for mergeClusters()
-
- The strategy for merging clusters is as follows:
- 1. Find all Person objects attached to the given email addresses.
- 2. If there is more than one Person object associated with the cluster,
- merge them into one (merge into Person with preferred email).
- 3. If there are no Person objects associated with the cluster, create
- a new person.
- 4. For each email address not associated with the person or awaiting
- validation, add it to the person in state NEW (unvalidated).
-
- This algorithm does not handle the case where two accounts have a
- preferred email. This situation would indicate that users have
- logged in as both identities, and we don't want to kill accounts
- for no reason.
- """
- # get a list of Person objects associated with this address cluster
- people = set()
- for email in cluster:
- person = personset.getByEmail(email)
- if person:
- people.add(person)
-
- if len(people) > 1:
- # more than one Person object => account merge.
-
- # Check if any of the accounts have been used.
- # If one account has been used, we want to merge the others
- # into that one.
- # If more than one account has been used, bail.
-
- validpeople = set(person for person in people
- if person.preferredemail is not None)
- if len(validpeople) > 1:
- if logger:
- logger.warning('Multiple validated user accounts found for cluster %r: %s',
- cluster,
- ', '.join(['%s (%d)' % (person.name, person.id)
- for person in validpeople]))
- return None
- elif len(validpeople) == 1:
- person = validpeople.pop()
- people.remove(person)
- else:
- # no validated accounts -- pick one at random
- person = people.pop()
-
- # assign email addresses
- from zope.security.proxy import removeSecurityProxy
- for otherperson in people:
- for email in emailset.getByPerson(otherperson):
- # EmailAddress.person is a readonly field, so we need to
- # remove the security proxy here.
- removeSecurityProxy(email).personID = person.id
-
- # merge people
- for otherperson in people:
- if logger:
- logger.info('Merging %s (%d) into %s (%d)',
- otherperson.name, otherperson.id,
- person.name, person.id)
- personset.merge(otherperson, person)
-
- elif len(people) == 1:
- # one person: use that
- person = people.pop()
- else:
- # no person? create it.
- # We should have the display name from a key here ...
- person, email = personset.createPersonAndEmail(
- cluster.pop(), PersonCreationRationale.KEYRINGTRUSTANALYZER)
-
- # We now have one person. Now add the missing addresses:
- existing = set(email.email for email in emailset.getByPerson(person))
- existing.update(person.unvalidatedemails)
- for newemail in cluster.difference(existing):
- if logger:
- logger.info('Adding email %s to %s (%d)',
- newemail, person.name, person.id)
- emailset.new(email=newemail, person=person, account=person.account)
-
- flush_database_updates()
-
- return person
-
-def mergeClusters(clusters, ztm=None, logger=None):
- """Merge accounts for clusters of addresses.
-
- The first argument is an iterator returning sets of email addresses.
- """
- personset = getUtility(IPersonSet)
- emailset = getUtility(IEmailAddressSet)
- for cluster in clusters:
- if not cluster: continue
-
- if ztm: ztm.begin()
- _mergeOrAddEmails(personset, emailset, cluster, logger)
- if ztm: ztm.commit()
=== removed file 'scripts/find-email-clusters.py'
--- scripts/find-email-clusters.py 2010-04-27 19:48:39 +0000
+++ scripts/find-email-clusters.py 1970-01-01 00:00:00 +0000
@@ -1,101 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-# pylint: disable-msg=W0403
-import _pythonpath
-
-import sys
-import logging
-import optparse
-
-import gpgme
-
-from canonical.launchpad.scripts import (
- execute_zcml_for_scripts, logger_options, logger as logger_from_options)
-from lp.registry.scripts.keyringtrustanalyser import (
- addOtherKeyring, addTrustedKeyring, findEmailClusters)
-
-
-validity_map = {
- 'UNDEFINED': gpgme.VALIDITY_UNDEFINED,
- 'NEVER': gpgme.VALIDITY_NEVER,
- 'MARGINAL': gpgme.VALIDITY_MARGINAL,
- 'FULL': gpgme.VALIDITY_FULL,
- 'ULTIMATE': gpgme.VALIDITY_ULTIMATE,
- }
-
-
-def main(argv):
- parser = optparse.OptionParser(
- usage="usage: %prog [options] keyrings ...",
- description="This script inferrs clusters of "
- "email addresses belonging to a single user "
- "from the user IDs attached to PGP keys.")
- parser.add_option('-o', '--output', metavar='FILE', action='store',
- help='Output clusters to given file',
- type='string', dest='output', default=None)
- parser.add_option('--trust', metavar='KEYRING', action='append',
- help='Trust the owners of keys on this keyring',
- type='string', dest='trust', default=[])
- parser.add_option('--owner-trust', metavar='TRUST', action='store',
- help='What level of trust to assign to trusted keys',
- type='string', dest='owner_trust', default='ULTIMATE')
- parser.add_option('--min-valid', metavar='TRUST', action='store',
- help='Minimum trust necessary for a user ID to '
- 'be considered valid',
- type='string', dest='minvalid', default='MARGINAL')
-
- logger_options(parser, logging.WARNING)
-
- options, args = parser.parse_args(argv[1:])
-
- # map validity options
- if options.owner_trust.upper() not in validity_map:
- sys.stderr.write('%s: unknown owner trust value %s'
- % (argv[0], options.owner_trust))
- return 1
- options.owner_trust = validity_map[options.owner_trust.upper()]
-
- if options.minvalid.upper() not in validity_map:
- sys.stderr.write('%s: unknown min valid value %s'
- % (argv[0], options.minvalid))
- return 1
- options.minvalid = validity_map[options.minvalid.upper()]
-
- # get logger
- logger = logger_from_options(options)
-
- if options.output is not None:
- logger.debug('openning %s', options.output)
- fp = open(options.output, 'w')
- else:
- fp = sys.stdout
-
- logger.info('Setting up utilities')
- execute_zcml_for_scripts()
-
- logger.info('Loading trusted keyrings')
- for keyring in options.trust:
- logger.info('Loading %s', keyring)
- addTrustedKeyring(keyring, options.owner_trust)
-
- logger.info('Loading other keyrings')
- for keyring in args:
- logger.info('Loading %s', keyring)
- addOtherKeyring(keyring)
-
- logger.info('Computing address clusters')
- for cluster in findEmailClusters(options.minvalid):
- for email in cluster:
- fp.write('%s\n' % email)
- fp.write('\n')
-
- logger.info('Done')
-
- return 0
-
-
-if __name__ == '__main__':
- sys.exit(main(sys.argv))
=== removed file 'scripts/merge-email-clusters.py'
--- scripts/merge-email-clusters.py 2010-04-27 19:48:39 +0000
+++ scripts/merge-email-clusters.py 1970-01-01 00:00:00 +0000
@@ -1,71 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-# pylint: disable-msg=W0403
-import _pythonpath
-
-import sys
-import logging
-import optparse
-
-from canonical.lp import initZopeless
-from canonical.launchpad.scripts import (
- execute_zcml_for_scripts, logger_options, logger as logger_from_options)
-from lp.registry.scripts.keyringtrustanalyser import mergeClusters
-
-
-def readClusters(fp):
- """Read clusters of email addresses from the file (separated by blank
- lines), and yield them as sets."""
- cluster = set()
- for line in fp:
- line = line.strip()
- if line:
- cluster.add(line)
- elif cluster:
- yield cluster
- cluster = set()
- if cluster:
- yield cluster
-
-
-def main(argv):
- parser = optparse.OptionParser(
- description="This script reads a list of email address clusters. "
- "and updates the Launchpad database to match by adding email "
- "addresses to existing accounts, merging accounts and "
- "creating new accounts")
- parser.add_option('-i', '--input', metavar='FILE', action='store',
- help='Read clusters from the given file',
- type='string', dest='input', default=None)
-
- logger_options(parser, logging.WARNING)
-
- options, args = parser.parse_args(argv[1:])
-
- # get logger
- logger = logger_from_options(options)
-
- if options.input is not None:
- logger.debug('openning %s', options.input)
- fp = open(options.input, 'r')
- else:
- fp = sys.stdin
-
- logger.info('Setting up utilities')
- execute_zcml_for_scripts()
-
- logger.info('Connecting to database')
- ztm = initZopeless()
-
- mergeClusters(readClusters(fp), ztm, logger)
-
- logger.info('Done')
-
- return 0
-
-
-if __name__ == '__main__':
- sys.exit(main(sys.argv))