← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/process-accepted-bugs-job into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/process-accepted-bugs-job into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #745799 in Launchpad itself: "DistroSeries:+queue Timeout accepting packages (bug structural subscriptions)"
  https://bugs.launchpad.net/launchpad/+bug/745799

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/process-accepted-bugs-job/+merge/122420

== Summary ==

Bug 745799: Accepting an upload may need to close an arbitrarily large number of bugs.  Closing bugs involves a large number of queries due to structural subscriptions.  Uploads are sometimes accepted in contexts involving a timeout (notably DistroSeries:+queue and the PackageUpload.acceptFromQueue API method).  Even if structural subscription handling were made more efficient, the unbounded nature of the work means that there's a reasonable argument that it ought to be handled asynchronously in any event.

== Proposed fix ==

I understand that notifications may be made asynchronous at some point.  I'm not exactly sure what shape that work will take as I haven't seen anything concrete.  Pending that, I think a reasonable solution is simply to create a dedicated job for this, which can be run frequently; this can always be refactored in terms of something more general later if it turns out to be appropriate.

== Pre-implementation notes ==

  http://irclogs.ubuntu.com/2012/08/04/%23launchpad-dev.html#t01:49

Also, when this is deployed, we'll need to change the production crontabs to run this job.  We could also pre-emptively change ackee-launchpad to run 'cronscripts/process-job-source-groups.py FREQUENT' rather than 'cronscripts/process-job-source.py IPlainPackageCopyJobSource', which would be my personal preference, but mthaddon had some reservations:

  https://irclogs.canonical.com/2012/08/01/%23launchpad-ops.html#t10:36

Comments welcome.

== LOC Rationale ==

+445.  This is the last blocker for https://code.launchpad.net/~cjwatson/launchpad/remove-queue-tool/+merge/114464, which will be -1956 or thereabouts, and even given the queue API extensions that preceded this the whole arc comes out several hundred lines in the black.

== Tests ==

bin/test -vvct lp.soyuz.tests.test_processaccepted -t lp.soyuz.tests.test_queue.TestQueuePageClosingBugs

== Demo and Q/A ==

Set a distroseries to FROZEN on dogfood; upload a package to it, closing some open bug on that package; accept the package from the queue using 'queue accept' (lp:ubuntu-archive-tools) or the web UI; check that the bug is not closed immediately; run 'cronscripts/process-job-source.py IProcessAcceptedBugsJobSource'; check that the bug is now closed.

== Lint ==

Pre-existing lint, not straightforwardly fixable:

./lib/lp/services/config/schema-lazr.conf
     450: Line exceeds 80 characters.
    1032: Line exceeds 80 characters.
    1039: Line exceeds 80 characters.
    1577: Line exceeds 80 characters.
-- 
https://code.launchpad.net/~cjwatson/launchpad/process-accepted-bugs-job/+merge/122420
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/process-accepted-bugs-job into lp:launchpad.
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf	2012-08-09 04:44:13 +0000
+++ lib/lp/services/config/schema-lazr.conf	2012-09-02 20:31:19 +0000
@@ -1747,6 +1747,7 @@
     IMembershipNotificationJobSource,
     IPersonMergeJobSource,
     IPlainPackageCopyJobSource,
+    IProcessAcceptedBugsJobSource,
     IQuestionEmailJobSource,
     IRemoveArtifactSubscriptionsJobSource,
     ISevenDayCommercialExpirationJobSource,
@@ -1775,6 +1776,12 @@
 dbuser: copy_packages
 crontab_group: FREQUENT
 
+[IProcessAcceptedBugsJobSource]
+# This section is used by cronscripts/process-job-source.py.
+module: lp.soyuz.interfaces.processacceptedbugsjob
+dbuser: process_accepted
+crontab_group: FREQUENT
+
 [IQuestionEmailJobSource]
 # This section is used by cronscripts/process-job-source.py.
 module: lp.answers.interfaces.questionjob

=== modified file 'lib/lp/soyuz/configure.zcml'
--- lib/lp/soyuz/configure.zcml	2012-07-11 13:27:35 +0000
+++ lib/lp/soyuz/configure.zcml	2012-09-02 20:31:19 +0000
@@ -988,19 +988,31 @@
       <allow interface="lp.soyuz.interfaces.packagerelationship.IPackageRelationshipSet"/>
     </class>
 
-   <!-- Overrides -->
-   <class class="lp.soyuz.adapters.overrides.SourceOverride">
-        <allow interface="lp.soyuz.adapters.overrides.ISourceOverride" />
-   </class>
-   <class class="lp.soyuz.adapters.overrides.BinaryOverride">
-        <allow interface="lp.soyuz.adapters.overrides.IBinaryOverride" />
-   </class>
-
-   <!-- OverridePolicy -->
-   <class class="lp.soyuz.adapters.overrides.UbuntuOverridePolicy">
-        <allow interface="lp.soyuz.adapters.overrides.IOverridePolicy" />
-   </class>
-
-   <webservice:register module="lp.soyuz.interfaces.webservice" />
+    <!-- Overrides -->
+    <class class="lp.soyuz.adapters.overrides.SourceOverride">
+      <allow interface="lp.soyuz.adapters.overrides.ISourceOverride" />
+    </class>
+    <class class="lp.soyuz.adapters.overrides.BinaryOverride">
+      <allow interface="lp.soyuz.adapters.overrides.IBinaryOverride" />
+    </class>
+
+    <!-- OverridePolicy -->
+    <class class="lp.soyuz.adapters.overrides.UbuntuOverridePolicy">
+      <allow interface="lp.soyuz.adapters.overrides.IOverridePolicy" />
+    </class>
+
+    <!-- ProcessAcceptedBugsJobSource -->
+    <securedutility
+	component=".model.processacceptedbugsjob.ProcessAcceptedBugsJob"
+	provides=".interfaces.processacceptedbugsjob.IProcessAcceptedBugsJobSource">
+      <allow interface=".interfaces.processacceptedbugsjob.IProcessAcceptedBugsJobSource" />
+    </securedutility>
+
+    <!-- ProcessAcceptedBugsJob -->
+    <class class=".model.processacceptedbugsjob.ProcessAcceptedBugsJob">
+      <allow interface=".interfaces.processacceptedbugsjob.IProcessAcceptedBugsJob" />
+    </class>
+
+    <webservice:register module="lp.soyuz.interfaces.webservice" />
 
 </configure>

=== added file 'lib/lp/soyuz/interfaces/processacceptedbugsjob.py'
--- lib/lp/soyuz/interfaces/processacceptedbugsjob.py	1970-01-01 00:00:00 +0000
+++ lib/lp/soyuz/interfaces/processacceptedbugsjob.py	2012-09-02 20:31:19 +0000
@@ -0,0 +1,57 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+__all__ = [
+    "IProcessAcceptedBugsJob",
+    "IProcessAcceptedBugsJobSource",
+    ]
+
+from lazr.restful.fields import Reference
+from zope.interface import Attribute
+
+from lp import _
+from lp.registry.interfaces.distroseries import IDistroSeries
+from lp.services.job.interfaces.job import (
+    IJob,
+    IJobSource,
+    IRunnableJob,
+    )
+from lp.soyuz.interfaces.sourcepackagerelease import ISourcePackageRelease
+
+
+class IProcessAcceptedBugsJob(IRunnableJob):
+    """An `IJob` to close bugs for accepted package uploads."""
+
+    job = Reference(
+        schema=IJob, title=_("The common Job attributes"),
+        required=True, readonly=True)
+
+    distroseries = Reference(
+        schema=IDistroSeries, title=_("Context distroseries"),
+        required=True, readonly=True)
+
+    sourcepackagerelease = Reference(
+        schema=ISourcePackageRelease, title=_("Context sourcepackagerelease"),
+        required=True, readonly=True)
+
+    metadata = Attribute(_("A dict of data about the job."))
+
+    bug_ids = Attribute(_("A list of bug IDs."))
+
+    def getOperationDescription():
+        """Return a description of the bug-closing operation."""
+
+
+class IProcessAcceptedBugsJobSource(IJobSource):
+    """A source for jobs to close bugs for accepted package uploads."""
+
+    def create(distroseries, sourcepackagerelease, bug_ids):
+        """Create a new `IProcessAcceptedBugsJob`.
+
+        :param distroseries: A `IDistroSeries` for which to close bugs.
+        :param sourcepackagerelease: An `ISourcePackageRelease` for which to
+            close bugs.
+        :param bug_ids: An iterable of bug IDs to close.
+        """

=== added file 'lib/lp/soyuz/model/processacceptedbugsjob.py'
--- lib/lp/soyuz/model/processacceptedbugsjob.py	1970-01-01 00:00:00 +0000
+++ lib/lp/soyuz/model/processacceptedbugsjob.py	2012-09-02 20:31:19 +0000
@@ -0,0 +1,164 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+__all__ = [
+    "close_bug_ids_for_sourcepackagerelease",
+    "ProcessAcceptedBugsJob",
+    ]
+
+import logging
+
+from storm.locals import (
+    And,
+    Int,
+    JSON,
+    Reference,
+    )
+from zope.component import getUtility
+from zope.interface import (
+    classProvides,
+    implements,
+    )
+
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.bugs.interfaces.bug import IBugSet
+from lp.bugs.interfaces.bugtask import BugTaskStatus
+from lp.registry.model.distroseries import DistroSeries
+from lp.services.config import config
+from lp.services.database.lpstorm import IMasterStore
+from lp.services.database.stormbase import StormBase
+from lp.services.job.model.job import Job
+from lp.services.job.runner import BaseRunnableJob
+from lp.services.webapp.interfaces import (
+    DEFAULT_FLAVOR,
+    IStoreSelector,
+    MAIN_STORE,
+    )
+from lp.soyuz.interfaces.processacceptedbugsjob import (
+    IProcessAcceptedBugsJob,
+    IProcessAcceptedBugsJobSource,
+    )
+from lp.soyuz.model.sourcepackagerelease import SourcePackageRelease
+
+
+def close_bug_ids_for_sourcepackagerelease(distroseries, spr, bug_ids):
+    bugs = list(getUtility(IBugSet).getByNumbers(bug_ids))
+    janitor = getUtility(ILaunchpadCelebrities).janitor
+    target = distroseries.getSourcePackage(spr.sourcepackagename)
+    assert spr.changelog_entry is not None, (
+        "New source uploads should have a changelog.")
+    content = (
+        "This bug was fixed in the package %s"
+        "\n\n---------------\n%s" % (spr.title, spr.changelog_entry))
+
+    for bug in bugs:
+        edited_task = bug.setStatus(
+            target=target, status=BugTaskStatus.FIXRELEASED, user=janitor)
+        if edited_task is not None:
+            bug.newMessage(
+                owner=janitor,
+                subject=bug.followup_subject(),
+                content=content)
+
+
+class ProcessAcceptedBugsJob(StormBase, BaseRunnableJob):
+    """Base class for jobs to close bugs for accepted package uploads."""
+
+    __storm_table__ = "ProcessAcceptedBugsJob"
+
+    config = config.IProcessAcceptedBugsJobSource
+
+    implements(IProcessAcceptedBugsJob)
+
+    # Oddly, BaseRunnableJob inherits from BaseRunnableJobSource so this class
+    # is both the factory for jobs (the "implements", above) and the source
+    # for runnable jobs (not the constructor of the job source, the class
+    # provides the IJobSource interface itself).
+    classProvides(IProcessAcceptedBugsJobSource)
+
+    # The Job table contains core job details.
+    job_id = Int("job", primary=True)
+    job = Reference(job_id, Job.id)
+
+    distroseries_id = Int(name="distroseries")
+    distroseries = Reference(distroseries_id, DistroSeries.id)
+
+    sourcepackagerelease_id = Int(name="sourcepackagerelease")
+    sourcepackagerelease = Reference(
+        sourcepackagerelease_id, SourcePackageRelease.id)
+
+    metadata = JSON('json_data')
+
+    def __init__(self, distroseries, sourcepackagerelease, bug_ids):
+        self.job = Job()
+        self.distroseries = distroseries
+        self.sourcepackagerelease = sourcepackagerelease
+        self.metadata = {"bug_ids": list(bug_ids)}
+        super(ProcessAcceptedBugsJob, self).__init__()
+
+    @property
+    def bug_ids(self):
+        return self.metadata["bug_ids"]
+
+    @classmethod
+    def create(cls, distroseries, sourcepackagerelease, bug_ids):
+        """See `IProcessAcceptedBugsJobSource`."""
+        assert distroseries is not None, "No distroseries specified."
+        assert sourcepackagerelease is not None, (
+            "No sourcepackagerelease specified.")
+        assert sourcepackagerelease.changelog_entry is not None, (
+            "New source uploads should have a changelog.")
+        assert bug_ids, "No bug IDs specified."
+        job = ProcessAcceptedBugsJob(
+            distroseries, sourcepackagerelease, bug_ids)
+        IMasterStore(ProcessAcceptedBugsJob).add(job)
+        job.celeryRunOnCommit()
+        return job
+
+    def getOperationDescription(self):
+        """See `IRunnableJob`."""
+        return "closing bugs for accepted package upload"
+
+    def run(self):
+        """See `IRunnableJob`."""
+        logger = logging.getLogger()
+        spr = self.sourcepackagerelease
+        logger.info(
+            "Closing bugs for %s/%s (%s)" %
+            (spr.name, spr.version, self.distroseries))
+        close_bug_ids_for_sourcepackagerelease(
+            self.distroseries, spr, self.metadata["bug_ids"])
+
+    def __repr__(self):
+        """Returns an informative representation of the job."""
+        parts = ["%s to close bugs [" % self.__class__.__name__]
+        parts.append(", ".join(str(bug_id) for bug_id in self.bug_ids))
+        spr = self.sourcepackagerelease
+        parts.append(
+            "] for %s/%s (%s)" % (spr.name, spr.version, self.distroseries))
+        return "<%s>" % "".join(parts)
+
+    @staticmethod
+    def iterReady():
+        """See `IJobSource`."""
+        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
+        return store.find((ProcessAcceptedBugsJob),
+            And(ProcessAcceptedBugsJob.job == Job.id,
+                Job.id.is_in(Job.ready_jobs)))
+
+    def makeDerived(self):
+        """Support UniversalJobSource.
+
+        (Most Job ORM classes are generic, because their database table is
+        used for several related job types.  Therefore, they have derived
+        classes to implement the specific Job.
+
+        ProcessAcceptedBugsJob implements the specific job, so its
+        makeDerived returns itself.)
+        """
+        return self
+
+    def getDBClass(self):
+        return self.__class__

=== modified file 'lib/lp/soyuz/scripts/processaccepted.py'
--- lib/lp/soyuz/scripts/processaccepted.py	2012-07-05 09:43:58 +0000
+++ lib/lp/soyuz/scripts/processaccepted.py	2012-09-02 20:31:19 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Helper functions for the process-accepted.py script."""
@@ -8,7 +8,7 @@
     'close_bugs_for_queue_item',
     'close_bugs_for_sourcepackagerelease',
     'close_bugs_for_sourcepublication',
-    'get_bugs_from_changes_file',
+    'get_bug_ids_from_changes_file',
     'ProcessAccepted',
     ]
 
@@ -17,20 +17,17 @@
 
 from debian.deb822 import Deb822Dict
 from zope.component import getUtility
-from zope.security.proxy import removeSecurityProxy
+from zope.security.management import getSecurityPolicy
 
-from lp.app.errors import NotFoundError
-from lp.app.interfaces.launchpad import ILaunchpadCelebrities
 from lp.archivepublisher.publishing import GLOBAL_PUBLISHER_LOCK
 from lp.archiveuploader.tagfiles import parse_tagfile_content
-from lp.bugs.interfaces.bug import IBugSet
-from lp.bugs.interfaces.bugtask import BugTaskStatus
 from lp.registry.interfaces.distribution import IDistributionSet
 from lp.registry.interfaces.pocket import PackagePublishingPocket
 from lp.services.scripts.base import (
     LaunchpadCronScript,
     LaunchpadScriptFailure,
     )
+from lp.services.webapp.authorization import LaunchpadPermissiveSecurityPolicy
 from lp.services.webapp.errorlog import (
     ErrorReportingUtility,
     ScriptRequest,
@@ -43,34 +40,34 @@
     re_lp_closes,
     )
 from lp.soyuz.interfaces.archive import IArchiveSet
+from lp.soyuz.interfaces.processacceptedbugsjob import (
+    IProcessAcceptedBugsJobSource,
+    )
 from lp.soyuz.interfaces.queue import IPackageUploadSet
-
-
-def get_bugs_from_changes_file(changes_file):
-    """Parse the changes file and return a list of bugs referenced by it.
+from lp.soyuz.model.processacceptedbugsjob import (
+    close_bug_ids_for_sourcepackagerelease,
+    )
+
+
+def get_bug_ids_from_changes_file(changes_file):
+    """Parse the changes file and return a list of bug IDs referenced by it.
 
     The bugs is specified in the Launchpad-bugs-fixed header, and are
     separated by a space character. Nonexistent bug ids are ignored.
     """
     tags = Deb822Dict(parse_tagfile_content(changes_file.read()))
     bugs_fixed_line = tags.get('Launchpad-bugs-fixed', '')
-    bugs = []
+    bug_ids = []
     for bug_id in bugs_fixed_line.split():
         if not bug_id.isdigit():
             continue
-        bug_id = int(bug_id)
-        try:
-            bug = getUtility(IBugSet).get(bug_id)
-        except NotFoundError:
-            continue
-        else:
-            bugs.append(bug)
-    return bugs
-
-
-def get_bugs_from_changelog_entry(sourcepackagerelease, since_version):
+        bug_ids.append(int(bug_id))
+    return bug_ids
+
+
+def get_bug_ids_from_changelog_entry(sourcepackagerelease, since_version):
     """Parse the changelog_entry in the sourcepackagerelease and return a
-    list of `IBug`s referenced by it.
+    list of bug IDs referenced by it.
     """
     changelog = sourcepackagerelease.aggregate_changelog(since_version)
     closes = []
@@ -84,17 +81,7 @@
         for match in regex:
             bug_match = re_bug_numbers.findall(match.group(0))
             closes += map(int, bug_match)
-
-    bugs = []
-    for bug_id in closes:
-        try:
-            bug = getUtility(IBugSet).get(bug_id)
-        except NotFoundError:
-            continue
-        else:
-            bugs.append(bug)
-
-    return bugs
+    return closes
 
 
 def can_close_bugs(target):
@@ -148,7 +135,8 @@
 
     for source_queue_item in queue_item.sources:
         close_bugs_for_sourcepackagerelease(
-            source_queue_item.sourcepackagerelease, changesfile_object)
+            queue_item.distroseries, source_queue_item.sourcepackagerelease,
+            changesfile_object)
 
 
 def close_bugs_for_sourcepublication(source_publication, since_version=None):
@@ -164,17 +152,18 @@
     changesfile_object = sourcepackagerelease.upload_changesfile
 
     close_bugs_for_sourcepackagerelease(
-        sourcepackagerelease, changesfile_object, since_version,
-        upload_distroseries=source_publication.distroseries)
-
-
-def close_bugs_for_sourcepackagerelease(source_release, changesfile_object,
-                                        since_version=None,
-                                        upload_distroseries=None):
+        source_publication.distroseries, sourcepackagerelease,
+        changesfile_object, since_version)
+
+
+def close_bugs_for_sourcepackagerelease(distroseries, source_release,
+                                        changesfile_object,
+                                        since_version=None):
     """Close bugs for a given source.
 
-    Given a `ISourcePackageRelease` and a corresponding changesfile object,
-    close bugs mentioned in the changesfile in the context of the source.
+    Given an `IDistroSeries`, an `ISourcePackageRelease`, and a
+    corresponding changesfile object, close bugs mentioned in the
+    changesfile in the context of the source.
 
     If changesfile_object is None and since_version is supplied,
     close all the bugs in changelog entries made after that version and up
@@ -184,45 +173,25 @@
     requirement to do so right now.
     """
     if since_version and source_release.changelog:
-        bugs_to_close = get_bugs_from_changelog_entry(
+        bug_ids_to_close = get_bug_ids_from_changelog_entry(
             source_release, since_version=since_version)
     elif changesfile_object:
-        bugs_to_close = get_bugs_from_changes_file(changesfile_object)
+        bug_ids_to_close = get_bug_ids_from_changes_file(changesfile_object)
     else:
         return
 
     # No bugs to be closed by this upload, move on.
-    if not bugs_to_close:
+    if not bug_ids_to_close:
         return
 
-    janitor = getUtility(ILaunchpadCelebrities).janitor
-    for bug in bugs_to_close:
-        # We need to remove the security proxy here because the bug
-        # might be private and if this code is called via someone using
-        # the +queue page they will get an OOPS.  Ideally, we should
-        # migrate this code to the Job system though, but that's a lot
-        # of work.  If you don't do that and you're changing stuff in
-        # here, BE CAREFUL with the unproxied bug object and look at
-        # what you're doing with it that might violate security.
-        bug = removeSecurityProxy(bug)
-        if upload_distroseries is not None:
-            target = upload_distroseries.getSourcePackage(
-                source_release.sourcepackagename)
-        else:
-            target = source_release.sourcepackage
-        edited_task = bug.setStatus(
-            target=target, status=BugTaskStatus.FIXRELEASED, user=janitor)
-        if edited_task is not None:
-            assert source_release.changelog_entry is not None, (
-                "New source uploads should have a changelog.")
-            content = (
-                "This bug was fixed in the package %s"
-                "\n\n---------------\n%s" % (
-                source_release.title, source_release.changelog_entry))
-            bug.newMessage(
-                owner=janitor,
-                subject=bug.followup_subject(),
-                content=content)
+    if getSecurityPolicy() == LaunchpadPermissiveSecurityPolicy:
+        # We're already running in a script, so we can just close the bugs
+        # directly.
+        close_bug_ids_for_sourcepackagerelease(
+            distroseries, source_release, bug_ids_to_close)
+    else:
+        job_source = getUtility(IProcessAcceptedBugsJobSource)
+        job_source.create(distroseries, source_release, bug_ids_to_close)
 
 
 class TargetPolicy:

=== modified file 'lib/lp/soyuz/scripts/tests/test_queue.py'
--- lib/lp/soyuz/scripts/tests/test_queue.py	2012-08-08 11:48:29 +0000
+++ lib/lp/soyuz/scripts/tests/test_queue.py	2012-09-02 20:31:19 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """queue tool base class tests."""
@@ -47,6 +47,9 @@
     PackageUploadStatus,
     )
 from lp.soyuz.interfaces.archive import IArchiveSet
+from lp.soyuz.interfaces.processacceptedbugsjob import (
+    IProcessAcceptedBugsJobSource,
+    )
 from lp.soyuz.interfaces.queue import IPackageUploadSet
 from lp.soyuz.model.queue import PackageUploadBuild
 from lp.soyuz.scripts.processaccepted import (
@@ -1085,8 +1088,8 @@
         # we're testing.
         spr = self.factory.makeSourcePackageRelease(changelog_entry="blah")
         archive_admin = self.factory.makePerson()
-        dsp = spr.upload_distroseries.distribution.getSourcePackage(
-            spr.sourcepackagename)
+        series = spr.upload_distroseries
+        dsp = series.distribution.getSourcePackage(spr.sourcepackagename)
         bug = self.factory.makeBug(
             target=dsp, information_type=InformationType.USERDATA)
         changes = StringIO(changes_file_template % bug.id)
@@ -1095,12 +1098,17 @@
             # The archive admin user can't normally see this bug.
             self.assertRaises(ForbiddenAttribute, bug, 'status')
             # But the bug closure should work.
-            close_bugs_for_sourcepackagerelease(spr, changes)
+            close_bugs_for_sourcepackagerelease(series, spr, changes)
 
-        # Verify it was closed.
+        # Rather than closing the bugs immediately, this creates a
+        # ProcessAcceptedBugsJob.
         with celebrity_logged_in("admin"):
-            self.assertEqual(
-                bug.default_bugtask.status, BugTaskStatus.FIXRELEASED)
+            self.assertEqual(BugTaskStatus.NEW, bug.default_bugtask.status)
+        job_source = getUtility(IProcessAcceptedBugsJobSource)
+        [job] = list(job_source.iterReady())
+        self.assertEqual(series, job.distroseries)
+        self.assertEqual(spr, job.sourcepackagerelease)
+        self.assertEqual([bug.id], job.bug_ids)
 
 
 class TestQueueToolInJail(TestQueueBase, TestCase):

=== modified file 'lib/lp/soyuz/tests/test_processaccepted.py'
--- lib/lp/soyuz/tests/test_processaccepted.py	2012-01-20 15:42:44 +0000
+++ lib/lp/soyuz/tests/test_processaccepted.py	2012-09-02 20:31:19 +0000
@@ -1,4 +1,4 @@
-# Copyright 2010-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Test process-accepted.py"""
@@ -22,7 +22,7 @@
     )
 from lp.soyuz.model.queue import PackageUpload
 from lp.soyuz.scripts.processaccepted import (
-    get_bugs_from_changes_file,
+    get_bug_ids_from_changes_file,
     ProcessAccepted,
     )
 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
@@ -221,54 +221,55 @@
             dsp.derived_series.distribution, script.findTargetDistros())
 
 
-class TestBugsFromChangesFile(TestCaseWithFactory):
-    """Test get_bugs_from_changes_file."""
+class TestBugIDsFromChangesFile(TestCaseWithFactory):
+    """Test get_bug_ids_from_changes_file."""
 
     layer = LaunchpadZopelessLayer
     dbuser = config.uploadqueue.dbuser
 
     def setUp(self):
-        super(TestBugsFromChangesFile, self).setUp()
+        super(TestBugIDsFromChangesFile, self).setUp()
         self.changes = Changes({
             'Format': '1.8',
             'Source': 'swat',
             })
 
-    def getBugs(self):
-        """Serialize self.changes and use get_bugs_from_changes_file to
-        extract bugs from it.
+    def getBugIDs(self):
+        """Serialize self.changes and use get_bug_ids_from_changes_file to
+        extract bug IDs from it.
         """
         stream = StringIO()
         self.changes.dump(stream)
         stream.seek(0)
-        return get_bugs_from_changes_file(stream)
+        return get_bug_ids_from_changes_file(stream)
 
     def test_no_bugs(self):
         # An empty list is returned if there are no bugs
         # mentioned.
-        self.assertEquals([], self.getBugs())
+        self.assertEqual([], self.getBugIDs())
 
     def test_invalid_bug_id(self):
         # Invalid bug ids (i.e. containing non-digit characters) are ignored.
         self.changes["Launchpad-Bugs-Fixed"] = "bla"
-        self.assertEquals([], self.getBugs())
+        self.assertEqual([], self.getBugIDs())
 
     def test_unknown_bug_id(self):
-        # Unknown bug ids are ignored.
+        # Unknown bug ids are passed through; they will be ignored later, by
+        # close_bug_ids_for_sourcepackagerelease.
         self.changes["Launchpad-Bugs-Fixed"] = "45120"
-        self.assertEquals([], self.getBugs())
+        self.assertEqual([45120], self.getBugIDs())
 
     def test_valid_bug(self):
         # For valid bug ids the bug object is returned.
         bug = self.factory.makeBug()
         self.changes["Launchpad-Bugs-Fixed"] = "%d" % bug.id
-        self.assertEquals([bug], self.getBugs())
+        self.assertEqual([bug.id], self.getBugIDs())
 
     def test_case_sensitivity(self):
         # The spelling of Launchpad-Bugs-Fixed is case-insensitive.
         bug = self.factory.makeBug()
         self.changes["LaUnchpad-Bugs-fixed"] = "%d" % bug.id
-        self.assertEquals([bug], self.getBugs())
+        self.assertEqual([bug.id], self.getBugIDs())
 
     def test_multiple_bugs(self):
         # Multiple bug ids can be specified, separated by spaces.
@@ -276,4 +277,4 @@
         bug2 = self.factory.makeBug()
         self.changes["Launchpad-Bugs-Fixed"] = "%d invalid %d" % (
             bug1.id, bug2.id)
-        self.assertEquals([bug1, bug2], self.getBugs())
+        self.assertEqual([bug1.id, bug2.id], self.getBugIDs())

=== added file 'lib/lp/soyuz/tests/test_processacceptedbugsjob.py'
--- lib/lp/soyuz/tests/test_processacceptedbugsjob.py	1970-01-01 00:00:00 +0000
+++ lib/lp/soyuz/tests/test_processacceptedbugsjob.py	2012-09-02 20:31:19 +0000
@@ -0,0 +1,227 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for jobs to close bugs for accepted package uploads."""
+
+from itertools import product
+
+from testtools.content import text_content
+import transaction
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.bugs.interfaces.bugtask import BugTaskStatus
+from lp.registry.enums import InformationType
+from lp.registry.interfaces.series import SeriesStatus
+from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
+from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.runner import JobRunner
+from lp.services.job.tests import block_on_job
+from lp.services.webapp.testing import verifyObject
+from lp.soyuz.interfaces.processacceptedbugsjob import (
+    IProcessAcceptedBugsJob,
+    IProcessAcceptedBugsJobSource,
+    )
+from lp.soyuz.model.processacceptedbugsjob import (
+    close_bug_ids_for_sourcepackagerelease,
+    )
+from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
+from lp.testing import (
+    run_script,
+    TestCaseWithFactory,
+    )
+from lp.testing.fakemethod import FakeMethod
+from lp.testing.layers import (
+    CeleryJobLayer,
+    LaunchpadZopelessLayer,
+    )
+
+
+class TestCloseBugIDsForSourcePackageRelease(TestCaseWithFactory):
+
+    layer = LaunchpadZopelessLayer
+    dbuser = config.IProcessAcceptedBugsJobSource.dbuser
+
+    def setUp(self):
+        super(TestCloseBugIDsForSourcePackageRelease, self).setUp()
+        # Create a distribution with two series, two source package names,
+        # and an SPR and a bug task for all combinations of those.
+        self.distro = self.factory.makeDistribution()
+        self.series = [
+            self.factory.makeDistroSeries(
+                distribution=self.distro, status=status)
+            for status in (SeriesStatus.CURRENT, SeriesStatus.DEVELOPMENT)]
+        self.spns = [self.factory.makeSourcePackageName() for _ in range(2)]
+        self.bug = self.factory.makeBug()
+        self.sprs = [
+            self.factory.makeSourcePackageRelease(
+                sourcepackagename=spn, distroseries=series,
+                changelog_entry="changelog")
+            for spn, series in product(self.spns, self.series)]
+        self.bugtasks = [
+            self.factory.makeBugTask(
+                target=spr.upload_distroseries.getSourcePackage(
+                    spr.sourcepackagename),
+                bug=self.bug)
+            for spr in self.sprs]
+
+    def test_correct_tasks_with_distroseries(self):
+        # Close the task for the correct source package name and the given
+        # series.
+        close_bug_ids_for_sourcepackagerelease(
+            self.series[0], self.sprs[0], [self.bug.id])
+        self.assertEqual(BugTaskStatus.FIXRELEASED, self.bugtasks[0].status)
+        for i in (1, 2, 3):
+            self.assertEqual(BugTaskStatus.NEW, self.bugtasks[i].status)
+
+    def test_correct_message(self):
+        # When closing a bug, a reasonable message is added.
+        close_bug_ids_for_sourcepackagerelease(
+            self.series[0], self.sprs[0], [self.bug.id])
+        self.assertEqual(2, self.bug.messages.count())
+        self.assertEqual(
+            "This bug was fixed in the package %s"
+            "\n\n---------------\nchangelog" % self.sprs[0].title,
+            self.bug.messages[1].text_contents)
+
+    def test_ignore_unknown_bug_ids(self):
+        # Unknown bug IDs are ignored, and no message is added.
+        close_bug_ids_for_sourcepackagerelease(
+            self.series[0], self.sprs[0], [self.bug.id + 1])
+        for bugtask in self.bugtasks:
+            self.assertEqual(BugTaskStatus.NEW, bugtask.status)
+        self.assertEqual(1, self.bug.messages.count())
+
+    def test_private_bug(self):
+        # Closing private bugs is not a problem.
+        self.bug.transitionToInformationType(
+            InformationType.USERDATA, self.distro.owner)
+        close_bug_ids_for_sourcepackagerelease(
+            self.series[0], self.sprs[0], [self.bug.id])
+        self.assertEqual(BugTaskStatus.FIXRELEASED, self.bugtasks[0].status)
+
+
+class TestProcessAcceptedBugsJob(TestCaseWithFactory):
+
+    layer = LaunchpadZopelessLayer
+    dbuser = config.IProcessAcceptedBugsJobSource.dbuser
+
+    def setUp(self):
+        super(TestProcessAcceptedBugsJob, self).setUp()
+        self.publisher = SoyuzTestPublisher()
+        self.publisher.prepareBreezyAutotest()
+        self.distroseries = self.publisher.breezy_autotest
+
+    def makeJob(self, distroseries=None, spr=None, bug_ids=[1]):
+        """Create a `ProcessAcceptedBugsJob`."""
+        if distroseries is None:
+            distroseries = self.distroseries
+        if spr is None:
+            spr = self.factory.makeSourcePackageRelease(
+                distroseries=distroseries, changelog_entry="changelog")
+        return getUtility(IProcessAcceptedBugsJobSource).create(
+            distroseries, spr, bug_ids)
+
+    def test_job_implements_IProcessAcceptedBugsJob(self):
+        job = self.makeJob()
+        self.assertTrue(verifyObject(IProcessAcceptedBugsJob, job))
+
+    def test_job_source_implements_IProcessAcceptedBugsJobSource(self):
+        job_source = getUtility(IProcessAcceptedBugsJobSource)
+        self.assertTrue(
+            verifyObject(IProcessAcceptedBugsJobSource, job_source))
+
+    def test_create(self):
+        # A ProcessAcceptedBugsJob can be created and stores its arguments.
+        spr = self.factory.makeSourcePackageRelease(
+            distroseries=self.distroseries, changelog_entry="changelog")
+        bug_ids = [1, 2]
+        job = self.makeJob(spr=spr, bug_ids=bug_ids)
+        self.assertProvides(job, IProcessAcceptedBugsJob)
+        self.assertEqual(self.distroseries, job.distroseries)
+        self.assertEqual(spr, job.sourcepackagerelease)
+        self.assertEqual(bug_ids, job.bug_ids)
+
+    def test_run_raises_errors(self):
+        # A job reports unexpected errors as exceptions.
+        class Boom(Exception):
+            pass
+
+        distroseries = self.factory.makeDistroSeries()
+        removeSecurityProxy(distroseries).getSourcePackage = FakeMethod(
+            failure=Boom())
+        job = self.makeJob(distroseries=distroseries)
+        self.assertRaises(Boom, job.run)
+
+    def test___repr__(self):
+        spr = self.factory.makeSourcePackageRelease(
+            distroseries=self.distroseries, changelog_entry="changelog")
+        bug_ids = [1, 2]
+        job = self.makeJob(spr=spr, bug_ids=bug_ids)
+        self.assertEqual(
+            ("<ProcessAcceptedBugsJob to close bugs [1, 2] for "
+             "{spr.name}/{spr.version} ({distroseries.distribution.name} "
+             "{distroseries.name})>").format(
+                distroseries=self.distroseries, spr=spr),
+            repr(job))
+
+    def test_run(self):
+        # A proper test run closes bugs.
+        spr = self.factory.makeSourcePackageRelease(
+            distroseries=self.distroseries, changelog_entry="changelog")
+        bug = self.factory.makeBug()
+        bugtask = self.factory.makeBugTask(target=spr.sourcepackage, bug=bug)
+        self.assertEqual(BugTaskStatus.NEW, bugtask.status)
+        job = self.makeJob(spr=spr, bug_ids=[bug.id])
+        JobRunner([job]).runAll()
+        self.assertEqual(BugTaskStatus.FIXRELEASED, bugtask.status)
+
+    def test_smoke(self):
+        spr = self.factory.makeSourcePackageRelease(
+            distroseries=self.distroseries, changelog_entry="changelog")
+        bug = self.factory.makeBug()
+        bugtask = self.factory.makeBugTask(target=spr.sourcepackage, bug=bug)
+        self.assertEqual(BugTaskStatus.NEW, bugtask.status)
+        self.makeJob(spr=spr, bug_ids=[bug.id])
+        transaction.commit()
+
+        out, err, exit_code = run_script(
+            "LP_DEBUG_SQL=1 cronscripts/process-job-source.py -vv %s" % (
+                IProcessAcceptedBugsJobSource.getName()))
+
+        self.addDetail("stdout", text_content(out))
+        self.addDetail("stderr", text_content(err))
+
+        self.assertEqual(0, exit_code)
+        self.assertEqual(BugTaskStatus.FIXRELEASED, bugtask.status)
+
+
+class TestViaCelery(TestCaseWithFactory):
+    """ProcessAcceptedBugsJob runs under Celery."""
+
+    layer = CeleryJobLayer
+
+    def test_run(self):
+        # A proper test run closes bugs.
+        self.useFixture(FeatureFixture({
+            "jobs.celery.enabled_classes": "ProcessAcceptedBugsJob",
+        }))
+
+        distroseries = self.factory.makeDistroSeries()
+        spr = self.factory.makeSourcePackageRelease(
+            distroseries=distroseries, changelog_entry="changelog")
+        bug = self.factory.makeBug()
+        bugtask = self.factory.makeBugTask(target=spr.sourcepackage, bug=bug)
+        self.assertEqual(BugTaskStatus.NEW, bugtask.status)
+        job = getUtility(IProcessAcceptedBugsJobSource).create(
+            distroseries, spr, [bug.id])
+        self.assertEqual(distroseries, job.distroseries)
+        self.assertEqual(spr, job.sourcepackagerelease)
+        self.assertEqual([bug.id], job.bug_ids)
+
+        with block_on_job(self):
+            transaction.commit()
+
+        self.assertEqual(JobStatus.COMPLETED, job.status)
+        self.assertEqual(BugTaskStatus.FIXRELEASED, bugtask.status)


Follow ups