launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #04467
[Merge] lp:~stevenk/launchpad/kill-cruft into lp:launchpad
Steve Kowalik has proposed merging lp:~stevenk/launchpad/kill-cruft into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~stevenk/launchpad/kill-cruft/+merge/70238
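This branch started as the removal of code that makes use of IDistribution.guessPublishedSourcePackageName(). It snowballed from there into deleting the scripts, modules and doctests that depended on it: the debbugs watch cron scripts, the debsync module, and the Bugzilla import script along with its doctest.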
--
https://code.launchpad.net/~stevenk/launchpad/kill-cruft/+merge/70238
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stevenk/launchpad/kill-cruft into lp:launchpad.
=== removed file 'cronscripts/create-debwatches.py'
--- cronscripts/create-debwatches.py 2011-05-29 01:36:05 +0000
+++ cronscripts/create-debwatches.py 1970-01-01 00:00:00 +0000
@@ -1,102 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-# pylint: disable-msg=C0103,W0403
-
-# This script aims to ensure that there is a Malone watch on Debian bugs
-# that meet certain criteria. The Malone watch will be linked to a BugTask
-# on Debian for that bug. The business of syncing is handled separately.
-
-__metaclass__ = type
-
-import _pythonpath
-import os
-import logging
-
-# zope bits
-from zope.component import getUtility
-
-# canonical launchpad modules
-from lp.services.scripts.base import (
- LaunchpadCronScript, LaunchpadScriptFailure)
-from canonical.launchpad.scripts.debsync import do_import
-from lp.app.interfaces.launchpad import ILaunchpadCelebrities
-
-
-# setup core values and defaults
-debbugs_location_default = '/srv/bugs-mirror.debian.org/'
-debbugs_pl = '../lib/canonical/launchpad/scripts/debbugs-log.pl'
-
-# the minimum age, in days, of a debbugs bug before we will import it
-MIN_AGE = 7
-
-
-class CreateDebWatches(LaunchpadCronScript):
- description = """
- This script syncs debbugs from http://bugs.debian.org/ into Malone.
- It selects interesting bugs in debian and makes sure that there is a
- Malone bug for each of them. See debwatchsync for a tool that
- syncronises the bugs in Malone and debbugs, too.
- """
- loglevel = logging.WARNING
- def add_my_options(self):
- self.parser.set_defaults(max=None, debbugs=debbugs_location_default)
- self.parser.add_option('--debbugs', action='store', type='string',
- dest='debbugs',
- help="The location of your debbugs database.")
- self.parser.add_option(
- '--max', action='store', type='int', dest='max',
- help="The maximum number of bugs to create.")
- self.parser.add_option('--package', action='append', type='string',
- help="A list of packages for which we should import bugs.",
- dest="packages", default=[])
-
- def main(self):
- index_db_path = os.path.join(self.options.debbugs, 'index/index.db')
- if not os.path.exists(index_db_path):
- # make sure the debbugs location looks sane
- raise LaunchpadScriptFailure('%s is not a debbugs db.'
- % self.options.debbugs)
-
- # Make sure we import any Debian bugs specified on the command line
- target_bugs = set()
- for arg in self.args:
- try:
- target_bug = int(arg)
- except ValueError:
- self.logger.error(
- '%s is not a valid debian bug number.' % arg)
- target_bugs.add(target_bug)
-
- target_package_set = set()
- previousimportset = set()
-
- self.logger.info('Calculating target package set...')
-
- # first find all the published ubuntu packages
- ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
- for p in ubuntu.currentrelease.getAllPublishedBinaries():
- target_package_set.add(
- p.binarypackagerelease.binarypackagename.name)
- # then add packages passed on the command line
- for package in self.options.packages:
- target_package_set.add(package)
- self.logger.info(
- '%d binary packages targeted.' % len(target_package_set))
-
- self.txn.abort()
- self.txn.begin()
- do_import(self.logger, self.options.max, self.options.debbugs,
- target_bugs, target_package_set, previousimportset, MIN_AGE,
- debbugs_pl)
- self.txn.commit()
-
- self.logger.info('Done!')
-
-
-if __name__ == '__main__':
- script = CreateDebWatches("debbugs-mkwatch")
- script.lock_and_run()
-
=== removed file 'cronscripts/update-debwatches.py'
--- cronscripts/update-debwatches.py 2011-06-15 15:11:43 +0000
+++ cronscripts/update-debwatches.py 1970-01-01 00:00:00 +0000
@@ -1,240 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-# This script runs through the set of Debbugs watches, and tries to
-# syncronise each of those to the malone bug which is watching it.
-
-import _pythonpath
-import os
-import sys
-import email
-import logging
-
-# zope bits
-from zope.component import getUtility
-
-from canonical.database.constants import UTC_NOW
-from lp.app.errors import NotFoundError
-from lp.app.interfaces.launchpad import ILaunchpadCelebrities
-from lp.bugs.interfaces.bug import IBugSet
-from lp.bugs.interfaces.bugtask import (
- BugTaskSearchParams,
- IBugTaskSet,
- )
-from lp.bugs.interfaces.bugwatch import IBugWatchSet
-from lp.bugs.interfaces.cve import ICveSet
-from lp.bugs.scripts import debbugs
-from lp.services.scripts.base import (LaunchpadCronScript,
- LaunchpadScriptFailure)
-from lp.services.messages.interfaces.message import (
- InvalidEmailMessage,
- IMessageSet,
- )
-
-
-# setup core values and defaults
-debbugs_location_default = '/srv/bugs-mirror.debian.org/'
-
-
-class DebWatchUpdater(LaunchpadCronScript):
- loglevel = logging.WARNING
-
- def add_my_options(self):
- self.parser.add_option(
- '--max', action='store', type='int', dest='max',
- default=None, help="The maximum number of bugs to synchronise.")
- self.parser.add_option('--debbugs', action='store', type='string',
- dest='debbugs',
- default=debbugs_location_default,
- help="The location of your debbugs database.")
-
- def main(self):
- if not os.path.exists(
- os.path.join(self.options.debbugs, 'index/index.db')):
- raise LaunchpadScriptFailure('%s is not a debbugs db.'
- % self.options.debbugs)
-
- self.txn.begin()
- self.debbugs_db = debbugs.Database(self.options.debbugs)
- self.sync()
- self.txn.commit()
-
- self.logger.info('Done!')
-
- def sync(self):
- changedcounter = 0
-
- self.logger.info('Finding existing debbugs watches...')
- debbugs_tracker = getUtility(ILaunchpadCelebrities).debbugs
- debwatches = debbugs_tracker.watches
-
- previousimportset = set([b.remotebug for b in debwatches])
- self.logger.info(
- '%d debbugs previously imported.' % len(previousimportset))
-
- target_watches = [watch for watch in debwatches if watch.needscheck]
- self.logger.info(
- '%d debbugs watches to syncronise.' % len(target_watches))
-
- self.logger.info('Sorting bug watches...')
- target_watches.sort(key=lambda a: a.remotebug)
-
- self.logger.info('Syncing bug watches...')
- for watch in target_watches:
- if self.sync_watch(watch):
- changedcounter += 1
- self.txn.commit()
- if self.options.max:
- if changedcounter >= self.options.max:
- self.logger.info('Synchronised %d bugs!' % changedcounter)
- return
-
- def sync_watch(self, watch):
- # keep track of whether or not something changed
- waschanged = False
- # find the bug in malone
- malone_bug = watch.bug
- # find the bug in debbugs
- debian_bug = self.debbugs_db[int(watch.remotebug)]
- bugset = getUtility(IBugSet)
- bugtaskset = getUtility(IBugTaskSet)
- bugwatchset = getUtility(IBugWatchSet)
- messageset = getUtility(IMessageSet)
- debian = getUtility(ILaunchpadCelebrities).debian
- debbugs_tracker = getUtility(ILaunchpadCelebrities).debbugs
-
- # make sure we have tasks for all the debian package linkages, and
- # also make sure we have updated their status and severity
- # appropriately.
- for packagename in debian_bug.packagelist():
- try:
- srcpkgname = debian.guessPublishedSourcePackageName(
- packagename)
- except NotFoundError:
- self.logger.error(sys.exc_value)
- continue
- search_params = BugTaskSearchParams(user=None, bug=malone_bug,
- sourcepackagename=srcpkgname)
- search_params.setDistribution(debian)
- bugtasks = bugtaskset.search(search_params)
- if len(bugtasks) == 0:
- # we need a new task to link the bug to the debian package
- self.logger.info('Linking %d and debian %s' % (
- malone_bug.id, srcpkgname.name))
- # XXX: kiko 2007-02-03:
- # This code is completely untested and broken.
- bugtask = malone_bug.addTask(
- owner=malone_bug.owner, distribution=debian,
- sourcepackagename=srcpkgname)
- bugtask.bugwatch = watch
- waschanged = True
- else:
- assert len(bugtasks) == 1, 'Should only find a single task'
- bugtask = bugtasks[0]
- status = bugtask.status
- if status != bugtask.setStatusFromDebbugs(debian_bug.status):
- waschanged = True
- severity = bugtask.severity
- if severity != bugtask.setSeverityFromDebbugs(
- debian_bug.severity):
- waschanged = True
-
- known_msg_ids = set([msg.rfc822msgid for msg in malone_bug.messages])
-
- for raw_msg in debian_bug.comments:
-
- # parse it so we can extract the message id easily
- message = email.message_from_string(raw_msg)
-
- # see if we already have imported a message with this id for this
- # bug
- message_id = message['message-id']
- if message_id in known_msg_ids:
- # Skipping msg that is already imported
- continue
-
- # make sure this message is in the db
- msg = None
- try:
- msg = messageset.fromEmail(raw_msg, parsed_message=message,
- create_missing_persons=True)
- except InvalidEmailMessage:
- self.logger.error('Invalid email: %s' % sys.exc_value)
- if msg is None:
- continue
-
- # Create the link between the bug and this message.
- malone_bug.linkMessage(msg)
-
- # ok, this is a new message for this bug, so in effect something
- # has changed
- waschanged = True
-
- # now we need to analyse the message for useful data
- watches = bugwatchset.fromMessage(msg, malone_bug)
- for watch in watches:
- self.logger.info(
- 'New watch for #%s on %s' % (watch.bug.id, watch.url))
- waschanged = True
-
- # and also for CVE ref clues
- prior_cves = set(malone_bug.cves)
- cveset = getUtility(ICveSet)
- cves = cveset.inMessage(msg)
- for cve in cves:
- malone_bug.linkCVE(cve)
- if cve not in prior_cves:
- self.logger.info('CVE-%s (%s) found for Malone #%s' % (
- cve.sequence, cve.status.name, malone_bug.id))
-
- # now we know about this message for this bug
- known_msg_ids.add(message_id)
-
- # and best we commit, so that we can see the email that the
- # librarian has created in the db
- self.txn.commit()
-
- # Mark all merged bugs as duplicates of the lowest-numbered bug
- if (len(debian_bug.mergedwith) > 0 and
- min(debian_bug.mergedwith) > debian_bug.id):
- for merged_id in debian_bug.mergedwith:
- merged_bug = bugset.queryByRemoteBug(
- debbugs_tracker, merged_id)
- if merged_bug is not None:
- # Bug has been imported already
- if merged_bug.duplicateof == malone_bug:
- # we already know about this
- continue
- elif merged_bug.duplicateof is not None:
- # Interesting, we think it's a dup of something else
- self.logger.warning(
- 'Debbugs thinks #%d is a dup of #%d' % (
- merged_bug.id, merged_bug.duplicateof))
- continue
- # Go ahead and merge it
- self.logger.info(
- "Malone #%d is a duplicate of Malone #%d" % (
- merged_bug.id, malone_bug.id))
- merged_bug.duplicateof = malone_bug.id
-
- # the dup status has changed
- waschanged = True
-
- # make a note of the remote watch status, if it has changed
- if watch.remotestatus != debian_bug.status:
- watch.remotestatus = debian_bug.status
- waschanged = True
-
- # update the watch date details
- watch.lastchecked = UTC_NOW
- if waschanged:
- watch.lastchanged = UTC_NOW
- self.logger.info('Watch on Malone #%d changed.' % watch.bug.id)
- return waschanged
-
-
-if __name__ == '__main__':
- script = DebWatchUpdater('launchpad-debbugs-sync')
- script.lock_and_run()
=== removed file 'lib/canonical/launchpad/scripts/debsync.py'
--- lib/canonical/launchpad/scripts/debsync.py 2011-06-15 15:11:43 +0000
+++ lib/canonical/launchpad/scripts/debsync.py 1970-01-01 00:00:00 +0000
@@ -1,201 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Functions related to the import of Debbugs bugs into Malone."""
-
-__all__ = [
- 'bug_filter',
- 'do_import',
- 'import_bug',
- ]
-
-__metaclass__ = type
-
-import datetime
-import sys
-
-from zope.component import getUtility
-
-from canonical.database.sqlbase import flush_database_updates
-from lp.app.errors import NotFoundError
-from lp.app.interfaces.launchpad import ILaunchpadCelebrities
-from lp.bugs.interfaces.bug import (
- CreateBugParams,
- IBugSet,
- )
-from lp.bugs.interfaces.bugwatch import IBugWatchSet
-from lp.bugs.interfaces.cve import ICveSet
-from lp.bugs.scripts import debbugs
-from lp.services.encoding import guess as ensure_unicode
-from lp.services.messages.interfaces.message import (
- IMessageSet,
- InvalidEmailMessage,
- UnknownSender,
- )
-
-
-def bug_filter(bug, previous_import_set, target_bugs, target_package_set,
- minimum_age):
- """Function to choose which debian bugs will get processed by the sync
- script.
- """
- # don't re-import one that exists already
- if str(bug.id) in previous_import_set:
- return False
- # if we've been given a list, import only those
- if target_bugs:
- if bug.id in target_bugs:
- return True
- return False
- # we only want bugs in Sid
- if not bug.affects_unstable():
- return False
- # and we only want RC bugs
- #if not bug.is_release_critical():
- # return False
- # and we only want bugs that affect the packages we care about:
- if not bug.affects_package(target_package_set):
- return False
- # we will not import any dup bugs (any reason to?)
- if len(bug.mergedwith) > 0:
- return False
- # and we won't import any bug that is newer than one week, to give
- # debian some time to find dups
- if bug.date > datetime.datetime.now() - datetime.timedelta(minimum_age):
- return False
- return True
-
-
-def do_import(logger, max_imports, debbugs_location, target_bugs,
- target_package_set, previous_import_set, minimum_age, debbugs_pl):
-
- # figure out which bugs have been imported previously
- debbugs_tracker = getUtility(ILaunchpadCelebrities).debbugs
- for w in debbugs_tracker.watches:
- previous_import_set.add(w.remotebug)
- logger.info('%d debian bugs previously imported.' %
- len(previous_import_set))
-
- # find the new bugs to import
- logger.info('Selecting new debian bugs...')
- debbugs_db = debbugs.Database(debbugs_location, debbugs_pl)
- debian_bugs = []
- for debian_bug in debbugs_db:
- if bug_filter(debian_bug, previous_import_set, target_bugs,
- target_package_set, minimum_age):
- debian_bugs.append(debian_bug)
- logger.info('%d debian bugs ready to import.' % len(debian_bugs))
-
- # put them in ascending order
- logger.info('Sorting bugs...')
- debian_bugs.sort(lambda a, b: cmp(a.id, b.id))
-
- logger.info('Importing bugs...')
- newbugs = 0
- for debian_bug in debian_bugs:
- newbug = import_bug(debian_bug, logger)
- if newbug is True:
- newbugs += 1
- if max_imports:
- if newbugs >= max_imports:
- logger.info('Imported %d new bugs!' % newbugs)
- break
-
-
-def import_bug(debian_bug, logger):
- """Consider importing a debian bug, return True if you did."""
- bugset = getUtility(IBugSet)
- debbugs_tracker = getUtility(ILaunchpadCelebrities).debbugs
- malone_bug = bugset.queryByRemoteBug(debbugs_tracker, debian_bug.id)
- if malone_bug is not None:
- logger.error('Debbugs #%d was previously imported.' % debian_bug.id)
- return False
- # get the email which started it all
- try:
- email_txt = debian_bug.comments[0]
- except IndexError:
- logger.error('No initial mail for debian #%d' % debian_bug.id)
- return False
- except debbugs.LogParseFailed, e:
- logger.warning(e)
- return False
- msg = None
- messageset = getUtility(IMessageSet)
- debian = getUtility(ILaunchpadCelebrities).debian
- ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
- try:
- msg = messageset.fromEmail(email_txt, distribution=debian,
- create_missing_persons=True)
- except UnknownSender:
- logger.error('Cannot create person for %s' % sys.exc_value)
- except InvalidEmailMessage:
- logger.error('Invalid email: %s' % sys.exc_value)
- if msg is None:
- logger.error('Failed to import debian #%d' % debian_bug.id)
- return False
-
- # get the bug details
- title = debian_bug.subject
- if not title:
- title = 'Debbugs #%d with no title' % debian_bug.id
- title = ensure_unicode(title)
- # debian_bug.package may have ,-separated package names, but
- # debian_bug.packagelist[0] is going to be a single package name for
- # sure. we work through the package list, try to find one we can
- # work with, otherwise give up
- srcpkg = pkgname = None
- for pkgname in debian_bug.packagelist():
- try:
- srcpkg = ubuntu.guessPublishedSourcePackageName(pkgname)
- except NotFoundError:
- logger.error(sys.exc_value)
- if srcpkg is None:
- # none of the package names gave us a source package we can use
- # XXX sabdfl 2005-09-16: Maybe this should just be connected to the
- # distro, and allowed to wait for re-assignment to a specific package?
- logger.error('Unable to find package details for %s' % (
- debian_bug.package))
- return False
- # sometimes debbugs has initial emails that contain the package name, we
- # can remove that
- if title.startswith(pkgname + ':'):
- title = title[len(pkgname) + 2:].strip()
- params = CreateBugParams(
- title=title, msg=msg, owner=msg.owner,
- datecreated=msg.datecreated)
- params.setBugTarget(distribution=debian, sourcepackagename=srcpkg)
- malone_bug = bugset.createBug(params)
- # create a debwatch for this bug
- thewatch = malone_bug.addWatch(debbugs_tracker, str(debian_bug.id),
- malone_bug.owner)
- thewatch.remotestatus = debian_bug.status
-
- # link the relevant task to this watch
- assert len(malone_bug.bugtasks) == 1, 'New bug should have only one task'
- task = malone_bug.bugtasks[0]
- task.bugwatch = thewatch
- task.setStatusFromDebbugs(debian_bug.status)
- task.setSeverityFromDebbugs(debian_bug.severity)
-
- # Let the world know about it!
- logger.info('%d/%s: %s: %s' % (
- debian_bug.id, malone_bug.id, debian_bug.package, title))
-
- # now we need to analyse the message for bugwatch clues
- bugwatchset = getUtility(IBugWatchSet)
- watches = bugwatchset.fromMessage(msg, malone_bug)
- for watch in watches:
- logger.info('New watch for %s on %s' % (watch.bug.id, watch.url))
-
- # and also for CVE ref clues
- cveset = getUtility(ICveSet)
- cves = cveset.inMessage(msg)
- prior_cves = malone_bug.cves
- for cve in cves:
- if cve not in prior_cves:
- malone_bug.linkCVE(cve)
- logger.info('CVE-%s (%s) found for Malone #%s' % (
- cve.sequence, cve.status.name, malone_bug.id))
-
- flush_database_updates()
- return True
=== removed file 'lib/lp/bugs/doc/bugzilla-import.txt'
--- lib/lp/bugs/doc/bugzilla-import.txt 2011-06-14 20:35:20 +0000
+++ lib/lp/bugs/doc/bugzilla-import.txt 1970-01-01 00:00:00 +0000
@@ -1,562 +0,0 @@
-Bugzilla Import
-===============
-
-The bugzilla import process makes use of a direct connection to the
-database. In order to aid in testing, all the database accesses are
-performed through a single class that can be replaced.
-
-We will start by defining a fake backend and some fake information for
-it to return:
-
- >>> from datetime import datetime
- >>> import pytz
- >>> UTC = pytz.timezone('UTC')
-
- >>> users = [
- ... ('test@xxxxxxxxxxxxx', 'Sample User'),
- ... ('foo.bar@xxxxxxxxxxxxx', 'Foo Bar'),
- ... ('new.user@xxxxxxxxxxxxx', 'New User') # <- not in Launchpad
- ... ]
-
- >>> buginfo = [
- ... (1, # bug_id
- ... 1, # assigned_to
- ... '', # bug_file_loc
- ... 'normal', # bug_severity
- ... 'NEW', # status
- ... datetime(2005, 4, 1, tzinfo=UTC), # creation
- ... 'Test bug 1', # short_desc,
- ... 'Linux', # op_sys
- ... 'P2', # priority
- ... 'Ubuntu', # product
- ... 'AMD64', # rep_platform
- ... 1, # reporter
- ... '---', # version
- ... 'mozilla-firefox', # component
- ... '', # resolution
- ... 'Ubuntu 5.10', # milestone
- ... 0, # qa_contact
- ... 'status', # status_whiteboard
- ... '', # keywords
- ... ''), # alias
- ... # A WONTFIX bug on a non-existant distro package
- ... (2, 1, 'http://www.ubuntu.com', 'enhancement', 'RESOLVED',
- ... datetime(2005, 4, 2, tzinfo=UTC), 'Test bug 2',
- ... 'Linux', 'P1', 'Ubuntu', 'i386', 2, '---', 'unknown',
- ... 'WONTFIX', '---', 0, '', '', ''),
- ... # An accepted bug:
- ... (3, 2, 'http://www.ubuntu.com', 'blocker', 'ASSIGNED',
- ... datetime(2005, 4, 3, tzinfo=UTC), 'Test bug 3',
- ... 'Linux', 'P1', 'Ubuntu', 'i386', 1, '---', 'netapplet',
- ... '', '---', 0, '', '', 'xyz'),
- ... # A fixed bug
- ... (4, 1, 'http://www.ubuntu.com', 'blocker', 'CLOSED',
- ... datetime(2005, 4, 4, tzinfo=UTC), 'Test bug 4',
- ... 'Linux', 'P1', 'Ubuntu', 'i386', 1, '---', 'mozilla-firefox',
- ... 'FIXED', '---', 0, '', '', 'FooBar'),
- ... # An UPSTREAM bug
- ... (5, 1,
- ... 'http://bugzilla.gnome.org/bugs/show_bug.cgi?id=273041',
- ... 'blocker', 'UPSTREAM',
- ... datetime(2005, 4, 4, tzinfo=UTC), 'Test bug 5',
- ... 'Linux', 'P1', 'Ubuntu', 'i386', 1, '---', 'evolution',
- ... '', '---', 0, '', '', 'deb1234'),
- ... ]
-
- >>> ccs = [[], [3], [], [], []]
-
- >>> comments = [
- ... [(1, datetime(2005, 4, 1, tzinfo=UTC), 'First comment'),
- ... (2, datetime(2005, 4, 1, 1, tzinfo=UTC), 'Second comment')],
- ... [(1, datetime(2005, 4, 2, tzinfo=UTC), 'First comment'),
- ... (2, datetime(2005, 4, 2, 1, tzinfo=UTC), 'Second comment')],
- ... [(2, datetime(2005, 4, 3, tzinfo=UTC), 'First comment'),
- ... (1, datetime(2005, 4, 3, 1, tzinfo=UTC),
- ... 'This is related to CVE-2005-1234'),
- ... (2, datetime(2005, 4, 3, 2, tzinfo=UTC),
- ... 'Created an attachment (id=1)')],
- ... [(1, datetime(2005, 4, 4, tzinfo=UTC), 'First comment')],
- ... [(1, datetime(2005, 4, 5, tzinfo=UTC), 'First comment')],
- ... ]
-
- >>> attachments = [
- ... [], [],
- ... [(1, datetime(2005, 4, 3, 2, tzinfo=UTC), 'An attachment',
- ... 'text/x-patch', True, 'foo.patch', 'the data', 2)],
- ... [], []
- ... ]
-
- >>> duplicates = [
- ... (1, 2),
- ... (3, 4),
- ... ]
-
- >>> class FakeBackend:
- ... def lookupUser(self, user_id):
- ... return users[user_id - 1]
- ... def getBugInfo(self, bug_id):
- ... return buginfo[bug_id - 1]
- ... def getBugCcs(self, bug_id):
- ... return ccs[bug_id - 1]
- ... def getBugComments(self, bug_id):
- ... return comments[bug_id - 1]
- ... def getBugAttachments(self, bug_id):
- ... return attachments[bug_id - 1]
- ... def getDuplicates(self):
- ... return duplicates
-
- >>> from itertools import chain
- >>> from zope.component import getUtility
- >>> from canonical.launchpad.ftests import login
- >>> from lp.app.interfaces.launchpad import ILaunchpadCelebrities
- >>> from lp.bugs.interfaces.bug import IBugSet
- >>> from lp.bugs.scripts import bugzilla
- >>> from lp.registry.interfaces.person import IPersonSet
-
-Get a reference to the Ubuntu bug tracker, and log in:
-
- >>> login('bug-importer@xxxxxxxxxxxxx')
- >>> bugtracker = getUtility(ILaunchpadCelebrities).ubuntu_bugzilla
-
-Now we create a bugzilla.Bugzilla instance to handle the import, using
-our fake backend data:
-
- >>> bz = bugzilla.Bugzilla(None)
- >>> bz.backend = FakeBackend()
-
-In order to verify that things get imported correctly, the following
-function will be used:
-
- >>> def bugInfo(bug):
- ... print 'Title:', bug.title
- ... print 'Reporter:', bug.owner.displayname
- ... print 'Created:', bug.datecreated
- ... if bug.name:
- ... print 'Nick: %s' % bug.name
- ... print 'Subscribers:'
- ... subscriber_names = sorted(
- ... p.displayname for p in chain(
- ... bug.getDirectSubscribers(),
- ... bug.getIndirectSubscribers()))
- ... for subscriber_name in subscriber_names:
- ... print ' %s' % subscriber_name
- ... for task in bug.bugtasks:
- ... print 'Task:', task.bugtargetdisplayname
- ... print ' Status:', task.status.name
- ... if task.product:
- ... print ' Product:', task.product.name
- ... if task.distribution:
- ... print ' Distro:', task.distribution.name
- ... if task.sourcepackagename:
- ... print ' Source package:', task.sourcepackagename.name
- ... if task.assignee:
- ... print ' Assignee:', task.assignee.displayname
- ... if task.importance:
- ... print ' Importance:', task.importance.name
- ... if task.statusexplanation:
- ... print ' Explanation:', task.statusexplanation
- ... if task.milestone:
- ... print ' Milestone:', task.milestone.name
- ... if task.bugwatch:
- ... print ' Watch:', task.bugwatch.url
- ... if bug.cves:
- ... print 'CVEs:'
- ... for cve in bug.cves:
- ... print ' %s' % cve.displayname
- ... print 'Messages:'
- ... for message in bug.messages:
- ... print ' Author:', message.owner.displayname
- ... print ' Date:', message.datecreated
- ... print ' Subject:', message.subject
- ... print ' %s' % message.text_contents
- ... print
- ... if bug.attachments.any():
- ... print 'Attachments:'
- ... for attachment in bug.attachments:
- ... print ' Title:', attachment.title
- ... print ' Type:', attachment.type.name
- ... print ' Name:', attachment.libraryfile.filename
- ... print ' Mime type:', attachment.libraryfile.mimetype
-
-
-Now we import bug #1 and check the results:
-
- >>> bug = bz.handleBug(1)
- >>> bugInfo(bug)
- Title: Test bug 1
- Reporter: Sample Person
- Created: 2005-04-01 00:00:00+00:00
- Subscribers:
- Foo Bar
- Sample Person
- Ubuntu Team
- Task: mozilla-firefox (Ubuntu)
- Status: NEW
- Distro: ubuntu
- Source package: mozilla-firefox
- Assignee: Sample Person
- Importance: MEDIUM
- Explanation: status (Bugzilla status=NEW, product=Ubuntu,
- component=mozilla-firefox)
- Milestone: ubuntu-5.10
- Messages:
- Author: Sample Person
- Date: 2005-04-01 00:00:00+00:00
- Subject: Test bug 1
- First comment
- <BLANKLINE>
- Author: Foo Bar
- Date: 2005-04-01 01:00:00+00:00
- Subject: Re: Test bug 1
- Second comment
- <BLANKLINE>
-
-As well as importing the bug, a bug watch is created, linking the new
-Launchpad bug to the original Bugzilla bug:
-
- >>> linked_bug = getUtility(IBugSet).queryByRemoteBug(bugtracker, 1)
- >>> linked_bug == bug
- True
-
-This bug watch link is used to prevent multiple imports of the same
-bug.
-
- >>> second_import = bz.handleBug(1)
- >>> bug == second_import
- True
-
-
-Next we try bug #2, which is assigned to a non-existant source
-package, so gets filed directly against the distribution. Some things
-to notice:
-
- * A Launchpad account is created for new.user@xxxxxxxxxxxxx as a side
- effect of the import, because they are subscribed to the bug.
- * The "RESOLVED WONTFIX" status is converted to a status of INVALID.
- * The fact that the "unknown" package does not exist in Ubuntu has
- been logged, along with the exception raised by
- guessPublishedSourcePackageName().
-
- >>> print getUtility(IPersonSet).getByEmail('new.user@xxxxxxxxxxxxx')
- None
- >>> bug = bz.handleBug(2)
- WARNING:lp.bugs.scripts.bugzilla:could not find package name for
- "unknown": 'Unknown package: unknown'
- >>> import transaction
- >>> transaction.commit()
-
- >>> bugInfo(bug)
- Title: Test bug 2
- Reporter: Foo Bar
- Created: 2005-04-02 00:00:00+00:00
- Subscribers:
- Foo Bar
- New User
- Sample Person
- Ubuntu Team
- Task: Ubuntu
- Status: INVALID
- Distro: ubuntu
- Assignee: Sample Person
- Importance: WISHLIST
- Explanation: Bugzilla status=RESOLVED WONTFIX, product=Ubuntu,
- component=unknown
- Messages:
- Author: Sample Person
- Date: 2005-04-02 00:00:00+00:00
- Subject: Test bug 2
- First comment
- <BLANKLINE>
- http://www.ubuntu.com
- <BLANKLINE>
- Author: Foo Bar
- Date: 2005-04-02 01:00:00+00:00
- Subject: Re: Test bug 2
- Second comment
- <BLANKLINE>
- >>> getUtility(IPersonSet).getByEmail('new.user@xxxxxxxxxxxxx')
- <Person at ...>
-
-
-Now import an ASSIGNED bug. Things to note about this import:
-
- * the second comment mentions a CVE, causing a link between the bug
- and CVE to be established.
- * The attachment on this bug is imported
-
- >>> bug = bz.handleBug(3)
- >>> bugInfo(bug)
- Title: Test bug 3
- Reporter: Sample Person
- Created: 2005-04-03 00:00:00+00:00
- Nick: xyz
- Subscribers:
- Foo Bar
- Sample Person
- Ubuntu Team
- Task: netapplet (Ubuntu)
- Status: CONFIRMED
- Distro: ubuntu
- Source package: netapplet
- Assignee: Foo Bar
- Importance: CRITICAL
- Explanation: Bugzilla status=ASSIGNED, product=Ubuntu,
- component=netapplet
- CVEs:
- CVE-2005-1234
- Messages:
- Author: Foo Bar
- Date: 2005-04-03 00:00:00+00:00
- Subject: Test bug 3
- First comment
- <BLANKLINE>
- http://www.ubuntu.com
- <BLANKLINE>
- Author: Sample Person
- Date: 2005-04-03 01:00:00+00:00
- Subject: Re: Test bug 3
- This is related to CVE-2005-1234
- <BLANKLINE>
- Author: Foo Bar
- Date: 2005-04-03 02:00:00+00:00
- Subject: Re: Test bug 3
- Created an attachment (id=1)
- <BLANKLINE>
- Attachments:
- Title: An attachment
- Type: PATCH
- Name: foo.patch
- Mime type: text/plain
-
-
-Next we import a fixed bug:
-
- >>> bug = bz.handleBug(4)
- >>> bugInfo(bug)
- Title: Test bug 4
- Reporter: Sample Person
- Created: 2005-04-04 00:00:00+00:00
- Nick: foobar
- Subscribers:
- Foo Bar
- Sample Person
- Ubuntu Team
- Task: mozilla-firefox (Ubuntu)
- Status: FIXRELEASED
- Distro: ubuntu
- Source package: mozilla-firefox
- Assignee: Sample Person
- Importance: CRITICAL
- Explanation: Bugzilla status=CLOSED FIXED, product=Ubuntu,
- component=mozilla-firefox
- Messages:
- Author: Sample Person
- Date: 2005-04-04 00:00:00+00:00
- Subject: Test bug 4
- First comment
- <BLANKLINE>
- http://www.ubuntu.com
- <BLANKLINE>
-
-
-The Ubuntu bugzilla uses the UPSTREAM state to categorise bugs that
-have been forwarded on to the upstream developers. Usually the
-upstream bug tracker URL is included in the URL field of the bug.
-
-The Malone equivalent of this is to create a second task on the bug,
-and attach a watch to the upstream bug tracker:
-
- # Make sane data to play this test.
- >>> from zope.component import getUtility
- >>> from lp.registry.interfaces.distribution import IDistributionSet
- >>> debian = getUtility(IDistributionSet).getByName('debian')
- >>> evolution_dsp = debian.getSourcePackage('evolution')
- >>> ignore = factory.makeSourcePackagePublishingHistory(
- ... distroseries=debian.currentseries,
- ... sourcepackagename=evolution_dsp.sourcepackagename)
- >>> transaction.commit()
-
- >>> bug = bz.handleBug(5)
- >>> bugInfo(bug)
- Title: Test bug 5
- Reporter: Sample Person
- Created: 2005-04-04 00:00:00+00:00
- Subscribers:
- Sample Person
- Ubuntu Team
- Task: Evolution
- Status: NEW
- Product: evolution
- Importance: UNDECIDED
- Watch: http://bugzilla.gnome.org/bugs/show_bug.cgi?id=273041
- Task: evolution (Ubuntu)
- Status: NEW
- Distro: ubuntu
- Source package: evolution
- Assignee: Sample Person
- Importance: CRITICAL
- Explanation: Bugzilla status=UPSTREAM, product=Ubuntu,
- component=evolution
- Task: evolution (Debian)
- Status: NEW
- Distro: debian
- Source package: evolution
- Importance: UNDECIDED
- Watch: http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=1234
- Messages:
- Author: Sample Person
- Date: 2005-04-05 00:00:00+00:00
- Subject: Test bug 5
- First comment
- <BLANKLINE>
- http://bugzilla.gnome.org/bugs/show_bug.cgi?id=273041
- <BLANKLINE>
-
-XXX mpt 20060404: In sampledata Evolution uses Malone officially, so adding
-a watch to its external bug tracker is a bad example.
-
-
-Severity Mapping
-----------------
-
-Bugzilla severities are mapped to the equivalent Launchpad importance values:
-
- >>> bug = bugzilla.Bug(bz.backend, 1)
- >>> class FakeBugTask:
- ... def transitionToStatus(self, status, user):
- ... self.status = status
- ... def transitionToImportance(self, importance, user):
- ... self.importance = importance
- >>> bugtask = FakeBugTask()
- >>> for severity in ['blocker', 'critical', 'major', 'normal',
- ... 'minor', 'trivial', 'enhancement']:
- ... bug.bug_severity = severity
- ... bug.mapSeverity(bugtask)
- ... print '%-11s %s' % (severity, bugtask.importance.name)
- blocker CRITICAL
- critical CRITICAL
- major HIGH
- normal MEDIUM
- minor LOW
- trivial LOW
- enhancement WISHLIST
-
-
-Status Mapping
---------------
-
- >>> for status in ['UNCONFIRMED', 'NEW', 'ASSIGNED', 'REOPENED',
- ... 'NEEDINFO', 'UPSTREAM', 'PENDINGUPLOAD',
- ... 'RESOLVED', 'VERIFIED', 'CLOSED']:
- ... bug.bug_status = status
- ... bugtask.statusexplanation = ''
- ... bug.mapStatus(bugtask)
- ... print '%-13s %s' % (status, bugtask.status.name)
- UNCONFIRMED NEW
- NEW NEW
- ASSIGNED CONFIRMED
- REOPENED NEW
- NEEDINFO INCOMPLETE
- UPSTREAM NEW
- PENDINGUPLOAD FIXCOMMITTED
- RESOLVED INVALID
- VERIFIED INVALID
- CLOSED INVALID
-
-(note that RESOLVED, VERIFIED and CLOSED have been mapped to INVALID
-here because the Bugzilla resolution is set to WONTFIX).
-
-
-If the bug has been resolved, the resolution will affect the status:
-
- >>> bug.priority = 'P2'
- >>> bug.bug_status = 'RESOLVED'
- >>> for resolution in ['FIXED', 'INVALID', 'WONTFIX', 'NOTABUG',
- ... 'NOTWARTY', 'UNIVERSE', 'LATER', 'REMIND',
- ... 'DUPLICATE', 'WORKSFORME', 'MOVED']:
- ... bug.resolution = resolution
- ... bugtask.statusexplanation = ''
- ... bug.mapStatus(bugtask)
- ... print '%-10s %s' % (resolution, bugtask.status.name)
- FIXED FIXRELEASED
- INVALID INVALID
- WONTFIX INVALID
- NOTABUG INVALID
- NOTWARTY INVALID
- UNIVERSE INVALID
- LATER INVALID
- REMIND INVALID
- DUPLICATE INVALID
- WORKSFORME INVALID
- MOVED INVALID
-
-
-Bug Target Mapping
-------------------
-
-The Bugzilla.getLaunchpadTarget() method is used to map bugzilla bugs
-to Launchpad bug targets. This is not general purpose logic: it only
-applies to the Ubuntu bugzilla.
-
-The current mapping only handles bugs filed under the "Ubuntu"
-product. If the component the bug is filed under is a known package
-name, the bug is targeted at that package in ubuntu. If it isn't,
-then the bug is filed directly against the distribution.
-
- >>> def showMapping(product, component):
- ... bug.product = product
- ... bug.component = component
- ... target = bz.getLaunchpadBugTarget(bug)
- ... distribution = target.get('distribution')
- ... if distribution:
- ... print 'Distribution:', distribution.name
- ... spn = target.get('sourcepackagename')
- ... if spn:
- ... print 'Source package:', spn.name
- ... product = target.get('product')
- ... if product:
- ... print 'Product:', product.name
-
- >>> showMapping('Ubuntu', 'mozilla-firefox')
- Distribution: ubuntu
- Source package: mozilla-firefox
-
- >>> showMapping('Ubuntu', 'netapplet')
- Distribution: ubuntu
- Source package: netapplet
-
- >>> showMapping('Ubuntu', 'unknown-package-name')
- WARNING:lp.bugs.scripts.bugzilla:could not find package name for
- "unknown-package-name": 'Unknown package: unknown-package-name'
- Distribution: ubuntu
-
- >>> showMapping('not-Ubuntu', 'general')
- Traceback (most recent call last):
- ...
- AssertionError: product must be Ubuntu
-
-
-Duplicate Bug Handling
-----------------------
-
-The Bugzilla duplicate bugs table can be used to mark the
-corresponding Launchpad bugs as duplicates too:
-
- >>> from lp.testing.faketransaction import FakeTransaction
- >>> bz.processDuplicates(FakeTransaction())
-
-Now check that the bugs have been marked duplicate:
-
- >>> bug1 = getUtility(IBugSet).queryByRemoteBug(bugtracker, 1)
- >>> bug2 = getUtility(IBugSet).queryByRemoteBug(bugtracker, 2)
- >>> bug3 = getUtility(IBugSet).queryByRemoteBug(bugtracker, 3)
- >>> bug4 = getUtility(IBugSet).queryByRemoteBug(bugtracker, 4)
-
- >>> print bug1.duplicateof
- None
- >>> bug2.duplicateof == bug1
- True
- >>> bug3.duplicateof == None
- True
- >>> bug4.duplicateof == bug3
- True
=== removed file 'lib/lp/bugs/scripts/bugzilla.py'
--- lib/lp/bugs/scripts/bugzilla.py 2011-08-02 01:17:15 +0000
+++ lib/lp/bugs/scripts/bugzilla.py 1970-01-01 00:00:00 +0000
@@ -1,687 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Bugzilla to Launchpad import logic"""
-
-
-# Bugzilla schema:
-# http://lxr.mozilla.org/mozilla/source/webtools/bugzilla/Bugzilla/DB/Schema.pm
-
-# XXX: jamesh 2005-10-18
-# Currently unhandled bug info:
-# * Operating system and platform
-# * version (not really used in Ubuntu bugzilla though)
-# * keywords
-# * private bugs (none of the canonical-only bugs seem sensitive though)
-# * bug dependencies
-# * "bug XYZ" references inside comment text (at the moment we just
-# insert the full URL to the bug afterwards).
-#
-# Not all of these are necessary though
-
-__metaclass__ = type
-
-from cStringIO import StringIO
-import datetime
-import logging
-import re
-
-import pytz
-from storm.store import Store
-from zope.component import getUtility
-
-from canonical.launchpad.interfaces.emailaddress import IEmailAddressSet
-from canonical.launchpad.interfaces.librarian import ILibraryFileAliasSet
-from canonical.launchpad.webapp import canonical_url
-from lp.app.errors import NotFoundError
-from lp.app.interfaces.launchpad import ILaunchpadCelebrities
-from lp.bugs.interfaces.bug import (
- CreateBugParams,
- IBugSet,
- )
-from lp.bugs.interfaces.bugattachment import (
- BugAttachmentType,
- IBugAttachmentSet,
- )
-from lp.bugs.interfaces.bugtask import (
- BugTaskImportance,
- BugTaskStatus,
- IBugTaskSet,
- )
-from lp.bugs.interfaces.bugwatch import IBugWatchSet
-from lp.bugs.interfaces.cve import ICveSet
-from lp.registry.interfaces.person import (
- IPersonSet,
- PersonCreationRationale,
- )
-from lp.services.messages.interfaces.message import IMessageSet
-
-
-logger = logging.getLogger('lp.bugs.scripts.bugzilla')
-
-
-def _add_tz(dt):
- """Convert a naiive datetime value to a UTC datetime value."""
- assert dt.tzinfo is None, 'add_tz() only accepts naiive datetime values'
- return datetime.datetime(dt.year, dt.month, dt.day,
- dt.hour, dt.minute, dt.second,
- dt.microsecond, tzinfo=pytz.timezone('UTC'))
-
-
-class BugzillaBackend:
- """A wrapper for all the MySQL database access.
-
- The main purpose of this is to make it possible to test the rest
- of the import code without access to a MySQL database.
- """
- def __init__(self, conn, charset='UTF-8'):
- self.conn = conn
- self.cursor = conn.cursor()
- self.charset = charset
-
- def _decode(self, s):
- if s is not None:
- value = s.decode(self.charset, 'replace')
- # Postgres doesn't like values outside of the basic multilingual
- # plane (U+0000 - U+FFFF), so replace them (and surrogates) with
- # U+FFFD (replacement character).
- # Existance of these characters generally indicate an encoding
- # problem in the existing Bugzilla data.
- return re.sub(u'[^\u0000-\ud7ff\ue000-\uffff]', u'\ufffd', value)
- else:
- return None
-
- def lookupUser(self, user_id):
- """Look up information about a particular Bugzilla user ID"""
- self.cursor.execute('SELECT login_name, realname '
- ' FROM profiles '
- ' WHERE userid = %d' % user_id)
- if self.cursor.rowcount != 1:
- raise NotFoundError('could not look up user %d' % user_id)
- (login_name, realname) = self.cursor.fetchone()
- realname = self._decode(realname)
- return (login_name, realname)
-
- def getBugInfo(self, bug_id):
- """Retrieve information about a bug."""
- self.cursor.execute(
- 'SELECT bug_id, assigned_to, bug_file_loc, bug_severity, '
- ' bug_status, creation_ts, short_desc, op_sys, priority, '
- ' products.name, rep_platform, reporter, version, '
- ' components.name, resolution, target_milestone, qa_contact, '
- ' status_whiteboard, keywords, alias '
- ' FROM bugs '
- ' INNER JOIN products ON bugs.product_id = products.id '
- ' INNER JOIN components ON bugs.component_id = components.id '
- ' WHERE bug_id = %d' % bug_id)
- if self.cursor.rowcount != 1:
- raise NotFoundError('could not look up bug %d' % bug_id)
- (bug_id, assigned_to, bug_file_loc, bug_severity, bug_status,
- creation_ts, short_desc, op_sys, priority, product,
- rep_platform, reporter, version, component, resolution,
- target_milestone, qa_contact, status_whiteboard, keywords,
- alias) = self.cursor.fetchone()
-
- bug_file_loc = self._decode(bug_file_loc)
- creation_ts = _add_tz(creation_ts)
- product = self._decode(product)
- version = self._decode(version)
- component = self._decode(component)
- status_whiteboard = self._decode(status_whiteboard)
- keywords = self._decode(keywords)
- alias = self._decode(alias)
-
- return (bug_id, assigned_to, bug_file_loc, bug_severity,
- bug_status, creation_ts, short_desc, op_sys, priority,
- product, rep_platform, reporter, version, component,
- resolution, target_milestone, qa_contact,
- status_whiteboard, keywords, alias)
-
- def getBugCcs(self, bug_id):
- """Get the IDs of the people CC'd to the bug."""
- self.cursor.execute('SELECT who FROM cc WHERE bug_id = %d'
- % bug_id)
- return [row[0] for row in self.cursor.fetchall()]
-
- def getBugComments(self, bug_id):
- """Get the comments for the bug."""
- self.cursor.execute('SELECT who, bug_when, thetext '
- ' FROM longdescs '
- ' WHERE bug_id = %d '
- ' ORDER BY bug_when' % bug_id)
- # XXX: jamesh 2005-12-07:
- # Due to a bug in Debzilla, Ubuntu bugzilla bug 248 has > 7800
- # duplicate comments,consisting of someone's signature.
- # For the import, just ignore those comments.
- return [(who, _add_tz(when), self._decode(thetext))
- for (who, when, thetext) in self.cursor.fetchall()
- if thetext != '\n--=20\n Jacobo Tarr=EDo | '
- 'http://jacobo.tarrio.org/\n\n\n']
-
- def getBugAttachments(self, bug_id):
- """Get the attachments for the bug."""
- self.cursor.execute('SELECT attach_id, creation_ts, description, '
- ' mimetype, ispatch, filename, thedata, '
- ' submitter_id '
- ' FROM attachments '
- ' WHERE bug_id = %d '
- ' ORDER BY attach_id' % bug_id)
- return [(attach_id, _add_tz(creation_ts),
- self._decode(description), mimetype,
- ispatch, self._decode(filename), thedata, submitter_id)
- for (attach_id, creation_ts, description,
- mimetype, ispatch, filename, thedata,
- submitter_id) in self.cursor.fetchall()]
-
- def findBugs(self, product=None, component=None, status=None):
- """Returns the requested bug IDs as a list"""
- if product is None:
- product = []
- if component is None:
- component = []
- if status is None:
- status = []
- joins = []
- conditions = []
- if product:
- joins.append(
- 'INNER JOIN products ON bugs.product_id = products.id')
- conditions.append('products.name IN (%s)' %
- ', '.join([self.conn.escape(p) for p in product]))
- if component:
- joins.append(
- 'INNER JOIN components ON bugs.component_id = components.id')
- conditions.append('components.name IN (%s)' %
- ', '.join([self.conn.escape(c) for c in component]))
- if status:
- conditions.append('bugs.bug_status IN (%s)' %
- ', '.join([self.conn.escape(s) for s in status]))
- if conditions:
- conditions = 'WHERE %s' % ' AND '.join(conditions)
- else:
- conditions = ''
- self.cursor.execute('SELECT bug_id FROM bugs %s %s ORDER BY bug_id' %
- (' '.join(joins), conditions))
- return [bug_id for (bug_id,) in self.cursor.fetchall()]
-
- def getDuplicates(self):
- """Returns a list of (dupe_of, dupe) relations."""
- self.cursor.execute('SELECT dupe_of, dupe FROM duplicates '
- 'ORDER BY dupe, dupe_of')
- return [(dupe_of, dupe) for (dupe_of, dupe) in self.cursor.fetchall()]
-
-
-class Bug:
- """Representation of a Bugzilla Bug"""
- def __init__(self, backend, bug_id):
- self.backend = backend
- (self.bug_id, self.assigned_to, self.bug_file_loc, self.bug_severity,
- self.bug_status, self.creation_ts, self.short_desc, self.op_sys,
- self.priority, self.product, self.rep_platform, self.reporter,
- self.version, self.component, self.resolution,
- self.target_milestone, self.qa_contact, self.status_whiteboard,
- self.keywords, self.alias) = backend.getBugInfo(bug_id)
-
- self._ccs = None
- self._comments = None
- self._attachments = None
-
- @property
- def ccs(self):
- """Return the IDs of people CC'd to this bug"""
- if self._ccs is not None:
- return self._ccs
- self._ccs = self.backend.getBugCcs(self.bug_id)
- return self._ccs
-
- @property
- def comments(self):
- """Return the comments attached to this bug"""
- if self._comments is not None:
- return self._comments
- self._comments = self.backend.getBugComments(self.bug_id)
- return self._comments
-
- @property
- def attachments(self):
- """Return the attachments for this bug"""
- if self._attachments is not None:
- return self._attachments
- self._attachments = self.backend.getBugAttachments(self.bug_id)
- return self._attachments
-
- def mapSeverity(self, bugtask):
- """Set a Launchpad bug task's importance based on this bug's severity.
- """
- bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
- importance_map = {
- 'blocker': BugTaskImportance.CRITICAL,
- 'critical': BugTaskImportance.CRITICAL,
- 'major': BugTaskImportance.HIGH,
- 'normal': BugTaskImportance.MEDIUM,
- 'minor': BugTaskImportance.LOW,
- 'trivial': BugTaskImportance.LOW,
- 'enhancement': BugTaskImportance.WISHLIST
- }
- importance = importance_map.get(
- self.bug_severity, BugTaskImportance.UNKNOWN)
- bugtask.transitionToImportance(importance, bug_importer)
-
- def mapStatus(self, bugtask):
- """Set a Launchpad bug task's status based on this bug's status.
-
- If the bug is in the RESOLVED, VERIFIED or CLOSED states, the
- bug resolution is also taken into account when mapping the
- status.
-
- Additional information about the bugzilla status is appended
- to the bug task's status explanation.
- """
- bug_importer = getUtility(ILaunchpadCelebrities).bug_importer
-
- if self.bug_status == 'ASSIGNED':
- bugtask.transitionToStatus(
- BugTaskStatus.CONFIRMED, bug_importer)
- elif self.bug_status == 'NEEDINFO':
- bugtask.transitionToStatus(
- BugTaskStatus.INCOMPLETE, bug_importer)
- elif self.bug_status == 'PENDINGUPLOAD':
- bugtask.transitionToStatus(
- BugTaskStatus.FIXCOMMITTED, bug_importer)
- elif self.bug_status in ['RESOLVED', 'VERIFIED', 'CLOSED']:
- # depends on the resolution:
- if self.resolution == 'FIXED':
- bugtask.transitionToStatus(
- BugTaskStatus.FIXRELEASED, bug_importer)
- else:
- bugtask.transitionToStatus(
- BugTaskStatus.INVALID, bug_importer)
- else:
- bugtask.transitionToStatus(
- BugTaskStatus.NEW, bug_importer)
-
- # add the status to the notes section, to account for any lost
- # information
- bugzilla_status = 'Bugzilla status=%s' % self.bug_status
- if self.resolution:
- bugzilla_status += ' %s' % self.resolution
- bugzilla_status += ', product=%s' % self.product
- bugzilla_status += ', component=%s' % self.component
-
- if bugtask.statusexplanation:
- bugtask.statusexplanation = '%s (%s)' % (
- bugtask.statusexplanation, bugzilla_status)
- else:
- bugtask.statusexplanation = bugzilla_status
-
-
-class Bugzilla:
- """Representation of a bugzilla instance"""
-
- def __init__(self, conn):
- if conn is not None:
- self.backend = BugzillaBackend(conn)
- else:
- self.backend = None
- self.ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
- self.debian = getUtility(ILaunchpadCelebrities).debian
- self.bugtracker = getUtility(ILaunchpadCelebrities).ubuntu_bugzilla
- self.debbugs = getUtility(ILaunchpadCelebrities).debbugs
- self.bugset = getUtility(IBugSet)
- self.bugtaskset = getUtility(IBugTaskSet)
- self.bugwatchset = getUtility(IBugWatchSet)
- self.cveset = getUtility(ICveSet)
- self.personset = getUtility(IPersonSet)
- self.emailset = getUtility(IEmailAddressSet)
- self.person_mapping = {}
-
- def person(self, bugzilla_id):
- """Get the Launchpad person corresponding to the given Bugzilla ID"""
- # Bugzilla treats a user ID of 0 as a NULL
- if bugzilla_id == 0:
- return None
-
- # Try and get the person using a cache of the mapping. We
- # check to make sure the person still exists and has not been
- # merged.
- person = None
- launchpad_id = self.person_mapping.get(bugzilla_id)
- if launchpad_id is not None:
- person = self.personset.get(launchpad_id)
- if person is not None and person.merged is not None:
- person = None
-
- # look up the person
- if person is None:
- email, displayname = self.backend.lookupUser(bugzilla_id)
-
- person = self.personset.ensurePerson(
- email, displayname, PersonCreationRationale.BUGIMPORT,
- comment=('when importing bugs from %s'
- % self.bugtracker.baseurl))
-
- # Bugzilla performs similar address checks to Launchpad, so
- # if the Launchpad account has no preferred email, use the
- # Bugzilla one.
- emailaddr = self.emailset.getByEmail(email)
- assert emailaddr is not None
- if person.preferredemail != emailaddr:
- person.validateAndEnsurePreferredEmail(emailaddr)
-
- self.person_mapping[bugzilla_id] = person.id
-
- return person
-
- def _getPackageName(self, bug):
- """Returns the source package name for the given bug."""
- # we currently only support mapping Ubuntu bugs ...
- if bug.product != 'Ubuntu':
- raise AssertionError('product must be Ubuntu')
-
- # kernel bugs are currently filed against the "linux"
- # component, which is not a source or binary package. The
- # following mapping was provided by BenC:
- if bug.component == 'linux':
- cutoffdate = datetime.datetime(2004, 12, 1,
- tzinfo=pytz.timezone('UTC'))
- if bug.bug_status == 'NEEDINFO' and bug.creation_ts < cutoffdate:
- pkgname = 'linux-source-2.6.12'
- else:
- pkgname = 'linux-source-2.6.15'
- else:
- pkgname = bug.component.encode('ASCII')
-
- try:
- return self.ubuntu.guessPublishedSourcePackageName(pkgname)
- except NotFoundError, e:
- logger.warning('could not find package name for "%s": %s',
- pkgname, str(e))
- return None
-
- def getLaunchpadBugTarget(self, bug):
- """Returns a dictionary of arguments to createBug() that correspond
- to the given bugzilla bug.
- """
- srcpkg = self._getPackageName(bug)
- return {
- 'distribution': self.ubuntu,
- 'sourcepackagename': srcpkg,
- }
-
- def getLaunchpadMilestone(self, bug):
- """Return the Launchpad milestone for a Bugzilla bug.
-
- If the milestone does not exist, then it is created.
- """
- if bug.product != 'Ubuntu':
- raise AssertionError('product must be Ubuntu')
-
- # Bugzilla uses a value of "---" to represent "no selected Milestone"
- # Launchpad represents this by setting the milestone column to NULL.
- if bug.target_milestone is None or bug.target_milestone == '---':
- return None
-
- # generate a Launchpad name from the Milestone name:
- name = re.sub(r'[^a-z0-9\+\.\-]', '-', bug.target_milestone.lower())
-
- milestone = self.ubuntu.getMilestone(name)
- if milestone is None:
- milestone = self.ubuntu.currentseries.newMilestone(name)
- Store.of(milestone).flush()
- return milestone
-
- def getLaunchpadUpstreamProduct(self, bug):
- """Find the upstream product for the given Bugzilla bug.
-
- This function relies on the package -> product linkage having been
- entered in advance.
- """
- srcpkgname = self._getPackageName(bug)
- # find a product series
- series = None
- for series in self.ubuntu.series:
- srcpkg = series.getSourcePackage(srcpkgname)
- if srcpkg:
- series = srcpkg.productseries
- if series:
- return series.product
- else:
- logger.warning('could not find upstream product for '
- 'source package "%s"', srcpkgname.name)
- return None
-
- _bug_re = re.compile('bug\s*#?\s*(?P<id>\d+)', re.IGNORECASE)
-
- def replaceBugRef(self, match):
- # XXX: jamesh 2005-10-24:
- # this is where bug number rewriting would be plugged in
- bug_id = int(match.group('id'))
- url = '%s/%d' % (canonical_url(self.bugtracker), bug_id)
- return '%s [%s]' % (match.group(0), url)
-
- def handleBug(self, bug_id):
- """Maybe import a single bug.
-
- If the bug has already been imported (detected by checking for
- a bug watch), it is skipped.
- """
- logger.info('Handling Bugzilla bug %d', bug_id)
-
- # is there a bug watch on the bug?
- lp_bug = self.bugset.queryByRemoteBug(self.bugtracker, bug_id)
-
- # if we already have an associated bug, don't add a new one.
- if lp_bug is not None:
- logger.info('Bugzilla bug %d is already being watched by '
- 'Launchpad bug %d', bug_id, lp_bug.id)
- return lp_bug
-
- bug = Bug(self.backend, bug_id)
-
- comments = bug.comments[:]
-
- # create a message for the initial comment:
- msgset = getUtility(IMessageSet)
- who, when, text = comments.pop(0)
- text = self._bug_re.sub(self.replaceBugRef, text)
- # If a URL is associated with the bug, add it to the description:
- if bug.bug_file_loc:
- text = text + '\n\n' + bug.bug_file_loc
- # the initial comment can't be empty:
- if not text.strip():
- text = '<empty comment>'
- msg = msgset.fromText(bug.short_desc, text, self.person(who), when)
-
- # create the bug
- target = self.getLaunchpadBugTarget(bug)
- params = CreateBugParams(
- msg=msg, datecreated=bug.creation_ts, title=bug.short_desc,
- owner=self.person(bug.reporter))
- params.setBugTarget(**target)
- lp_bug = self.bugset.createBug(params)
-
- # add the bug watch:
- lp_bug.addWatch(self.bugtracker, str(bug.bug_id), lp_bug.owner)
-
- # add remaining comments, and add CVEs found in all text
- lp_bug.findCvesInText(text, lp_bug.owner)
- for (who, when, text) in comments:
- text = self._bug_re.sub(self.replaceBugRef, text)
- msg = msgset.fromText(msg.followup_title, text,
- self.person(who), when)
- lp_bug.linkMessage(msg)
-
- # subscribe QA contact and CC's
- if bug.qa_contact:
- lp_bug.subscribe(
- self.person(bug.qa_contact), self.person(bug.reporter))
- for cc in bug.ccs:
- lp_bug.subscribe(self.person(cc), self.person(bug.reporter))
-
- # translate bugzilla status and severity to LP equivalents
- task = lp_bug.bugtasks[0]
- task.datecreated = bug.creation_ts
- task.transitionToAssignee(self.person(bug.assigned_to))
- task.statusexplanation = bug.status_whiteboard
- bug.mapSeverity(task)
- bug.mapStatus(task)
-
- # bugs with an alias of the form "deb1234" have been imported
- # from the Debian bug tracker by the "debzilla" program. For
- # these bugs, generate a task and watch on the corresponding
- # bugs.debian.org bug.
- if bug.alias:
- if re.match(r'^deb\d+$', bug.alias):
- watch = self.bugwatchset.createBugWatch(
- lp_bug, lp_bug.owner, self.debbugs, bug.alias[3:])
- debtarget = self.debian
- if target['sourcepackagename']:
- debtarget = debtarget.getSourcePackage(
- target['sourcepackagename'])
- debtask = self.bugtaskset.createTask(
- lp_bug, lp_bug.owner, debtarget)
- debtask.datecreated = bug.creation_ts
- debtask.bugwatch = watch
- else:
- # generate a Launchpad name from the alias:
- name = re.sub(r'[^a-z0-9\+\.\-]', '-', bug.alias.lower())
- lp_bug.name = name
-
- # for UPSTREAM bugs, try to find whether the URL field contains
- # a bug reference.
- if bug.bug_status == 'UPSTREAM':
- # see if the URL field contains a bug tracker reference
- watches = self.bugwatchset.fromText(bug.bug_file_loc,
- lp_bug, lp_bug.owner)
- # find the upstream product for this bug
- product = self.getLaunchpadUpstreamProduct(bug)
-
- # if we created a watch, and there is an upstream product,
- # create a new task and link it to the watch.
- if len(watches) > 0:
- if product:
- upstreamtask = self.bugtaskset.createTask(
- lp_bug, lp_bug.owner, product)
- upstreamtask.datecreated = bug.creation_ts
- upstreamtask.bugwatch = watches[0]
- else:
- logger.warning('Could not find upstream product to link '
- 'bug %d to', lp_bug.id)
-
- # translate milestone linkage
- task.milestone = self.getLaunchpadMilestone(bug)
-
- # import attachments
- for (attach_id, creation_ts, description, mimetype, ispatch,
- filename, thedata, submitter_id) in bug.attachments:
- # if the filename is missing for some reason, use a generic one.
- if filename is None or filename.strip() == '':
- filename = 'untitled'
- logger.debug('Creating attachment %s for bug %d',
- filename, bug.bug_id)
- if ispatch:
- attach_type = BugAttachmentType.PATCH
- mimetype = 'text/plain'
- else:
- attach_type = BugAttachmentType.UNSPECIFIED
-
- # look for a message starting with "Created an attachment (id=NN)"
- for msg in lp_bug.messages:
- if msg.text_contents.startswith(
- 'Created an attachment (id=%d)' % attach_id):
- break
- else:
- # could not find the add message, so create one:
- msg = msgset.fromText(description,
- 'Created attachment %s' % filename,
- self.person(submitter_id),
- creation_ts)
- lp_bug.linkMessage(msg)
-
- filealias = getUtility(ILibraryFileAliasSet).create(
- name=filename,
- size=len(thedata),
- file=StringIO(thedata),
- contentType=mimetype)
-
- getUtility(IBugAttachmentSet).create(
- bug=lp_bug, filealias=filealias, attach_type=attach_type,
- title=description, message=msg)
-
- return lp_bug
-
- def processDuplicates(self, trans):
- """Mark Launchpad bugs as duplicates based on Bugzilla duplicates.
-
- Launchpad bug A will be marked as a duplicate of bug B if:
- * bug A watches bugzilla bug A'
- * bug B watches bugzilla bug B'
- * bug A' is a duplicate of bug B'
- * bug A is not currently a duplicate of any other bug.
- """
-
- logger.info('Processing duplicate bugs')
- bugmap = {}
-
- def getlpbug(bugid):
- """Get the Launchpad bug corresponding to the given remote ID
-
- This function makes use of a cache dictionary to reduce the
- number of lookups.
- """
- lpbugid = bugmap.get(bugid)
- if lpbugid is not None:
- if lpbugid != 0:
- lpbug = self.bugset.get(lpbugid)
- else:
- lpbug = None
- else:
- lpbug = self.bugset.queryByRemoteBug(self.bugtracker, bugid)
- if lpbug is not None:
- bugmap[bugid] = lpbug.id
- else:
- bugmap[bugid] = 0
- return lpbug
-
- for (dupe_of, dupe) in self.backend.getDuplicates():
- # get the Launchpad bugs corresponding to the two Bugzilla bugs:
- trans.begin()
- lpdupe_of = getlpbug(dupe_of)
- lpdupe = getlpbug(dupe)
- # if both bugs exist in Launchpad, and lpdupe is not already
- # a duplicate, mark it as a duplicate of lpdupe_of.
- if (lpdupe_of is not None and lpdupe is not None and
- lpdupe.duplicateof is None):
- logger.info('Marking %d as a duplicate of %d',
- lpdupe.id, lpdupe_of.id)
- lpdupe.markAsDuplicate(lpdupe_of)
- trans.commit()
-
- def importBugs(self, trans, product=None, component=None, status=None):
- """Import Bugzilla bugs matching the given constraints.
-
- Each of product, component and status gives a list of
- products, components or statuses to limit the import to. An
- empty list matches all products, components or statuses.
- """
- if product is None:
- product = []
- if component is None:
- component = []
- if status is None:
- status = []
-
- bugs = self.backend.findBugs(product=product,
- component=component,
- status=status)
- for bug_id in bugs:
- trans.begin()
- try:
- self.handleBug(bug_id)
- except (SystemExit, KeyboardInterrupt):
- raise
- except:
- logger.exception('Could not import Bugzilla bug #%d', bug_id)
- trans.abort()
- else:
- trans.commit()
=== modified file 'lib/lp/services/scripts/tests/__init__.py'
--- lib/lp/services/scripts/tests/__init__.py 2010-11-16 12:56:01 +0000
+++ lib/lp/services/scripts/tests/__init__.py 2011-08-03 01:06:36 +0000
@@ -24,7 +24,6 @@
KNOWN_BROKEN = [
# Needs mysqldb module
- 'scripts/bugzilla-import.py',
'scripts/migrate-bugzilla-initialcontacts.py',
# circular import from hell (IHasOwner).
'scripts/clean-sourceforge-project-entries.py',
=== removed file 'scripts/bugzilla-import.py'
--- scripts/bugzilla-import.py 2010-04-27 19:48:39 +0000
+++ scripts/bugzilla-import.py 1970-01-01 00:00:00 +0000
@@ -1,97 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-import sys
-import logging
-import optparse
-import MySQLdb
-
-# pylint: disable-msg=W0403
-import _pythonpath
-
-from canonical.config import config
-from canonical.lp import initZopeless
-from canonical.launchpad.scripts import (
- execute_zcml_for_scripts, logger_options, logger)
-from canonical.launchpad.webapp.interaction import setupInteractionByEmail
-
-from canonical.launchpad.scripts import bugzilla
-
-
-def make_connection(options):
- kws = {}
- if options.db_name is not None:
- kws['db'] = options.db_name
- if options.db_user is not None:
- kws['user'] = options.db_user
- if options.db_password is not None:
- kws['passwd'] = options.db_passwd
- if options.db_host is not None:
- kws['host'] = options.db_host
-
- return MySQLdb.connect(**kws)
-
-def main(argv):
- parser = optparse.OptionParser(
- description=("This script imports bugs from a Bugzilla "
- "into Launchpad."))
-
- parser.add_option('--component', metavar='COMPONENT', action='append',
- help='Limit to this bugzilla component',
- type='string', dest='component', default=[])
- parser.add_option('--status', metavar='STATUS,...', action='store',
- help='Only import bugs with the given status',
- type='string', dest='status',
- default=None)
-
- # MySQL connection details
- parser.add_option('-d', '--dbname', metavar='DB', action='store',
- help='The MySQL database name',
- type='string', dest='db_name', default='bugs_warty')
- parser.add_option('-U', '--username', metavar='USER', action='store',
- help='The MySQL user name',
- type='string', dest='db_user', default=None)
- parser.add_option('-p', '--password', metavar='PASSWORD', action='store',
- help='The MySQL password',
- type='string', dest='db_password', default=None)
- parser.add_option('-H', '--host', metavar='HOST', action='store',
- help='The MySQL database host',
- type='string', dest='db_host', default=None)
-
- # logging options
- logger_options(parser, logging.INFO)
-
- options, args = parser.parse_args(argv[1:])
- if options.status is not None:
- options.status = options.status.split(',')
- else:
- options.status = []
-
- logger(options, 'canonical.launchpad.scripts.bugzilla')
-
- # don't send email
- send_email_data = """
- [zopeless]
- send_email: False
- """
- config.push('send_email_data', send_email_data)
-
- execute_zcml_for_scripts()
- ztm = initZopeless()
- setupInteractionByEmail('bug-importer@xxxxxxxxxxxxx')
-
- db = make_connection(options)
- bz = bugzilla.Bugzilla(db)
-
- bz.importBugs(ztm,
- product=['Ubuntu'],
- component=options.component,
- status=options.status)
-
- bz.processDuplicates(ztm)
- config.pop('send_email_data')
-
-if __name__ == '__main__':
- sys.exit(main(sys.argv))
=== removed file 'scripts/migrate-bugzilla-initialcontacts.py'
--- scripts/migrate-bugzilla-initialcontacts.py 2011-05-29 01:38:41 +0000
+++ scripts/migrate-bugzilla-initialcontacts.py 1970-01-01 00:00:00 +0000
@@ -1,91 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-import logging
-import MySQLdb
-
-import _pythonpath
-
-from zope.component import getUtility
-
-from canonical.lp import initZopeless
-from canonical.launchpad.scripts import execute_zcml_for_scripts
-from canonical.launchpad.interfaces.emailaddress import IEmailAddressSet
-from lp.app.interfaces.launchpad import ILaunchpadCelebrities
-from lp.app.errors import NotFoundError
-from lp.registry.interfaces.person import IPersonSet
-
-
-execute_zcml_for_scripts()
-ztm = initZopeless()
-logging.basicConfig(level=logging.INFO)
-
-ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
-techboard = getUtility(IPersonSet).getByName('techboard')
-
-def getPerson(email, realname):
- # The debzilla user acts as a placeholder for "no specific maintainer".
- # We don't create a bug contact record for it.
- if email is None or email == 'debzilla@xxxxxxxxxx':
- return None
-
- personset = getUtility(IPersonSet)
- person = personset.getByEmail(email)
- if person:
- return person
-
- # we mark the bugzilla email as preferred email, since it has been
- # validated there.
- if email.endswith('@lists.ubuntu.com'):
- logging.info('creating team for %s (%s)', email, realname)
- person = personset.newTeam(techboard, email[:-17], realname)
- email = getUtility(IEmailAddressSet).new(email, person.id)
- person.setPreferredEmail(email)
- else:
- logging.info('creating person for %s (%s)', email, realname)
- person, email = personset.createPersonAndEmail(email,
- displayname=realname)
- person.setPreferredEmail(email)
-
- return person
-
-
-conn = MySQLdb.connect(db='bugs_warty')
-cursor = conn.cursor()
-
-# big arse query that gets all the default assignees and QA contacts:
-cursor.execute(
- "SELECT components.name, owner.login_name, owner.realname, "
- " qa.login_name, qa.realname "
- " FROM components "
- " JOIN products ON components.product_id = products.id "
- " LEFT JOIN profiles AS owner ON components.initialowner = owner.userid"
- " LEFT JOIN profiles AS qa ON components.initialqacontact = qa.userid "
- " WHERE products.name = 'Ubuntu'")
-
-for (component, owneremail, ownername, qaemail, qaname) in cursor.fetchall():
- logging.info('Processing %s', component)
- try:
- srcpkgname, binpkgname = ubuntu.getPackageNames(component)
- except NotFoundError, e:
- logging.warning('could not find package name for "%s": %s', component,
- str(e))
- continue
-
- srcpkg = ubuntu.getSourcePackage(srcpkgname)
-
- # default assignee => maintainer
- person = getPerson(owneremail, ownername)
- if person:
- if not srcpkg.isBugContact(person):
- srcpkg.addBugContact(person)
-
- # QA contact => maintainer
- person = getPerson(qaemail, qaname)
- if person:
- if not srcpkg.isBugContact(person):
- srcpkg.addBugContact(person)
-
-ztm.commit()