← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:process-accepted-batching into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:process-accepted-batching into launchpad:master.

Commit message:
Batch queue items in process-accepted

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/416091

`PackageUploadSet.getAll` (and hence `DistroSeries.getPackageUploads`) returns a `DecoratedResultSet`.  Decorated result sets are usually used in contexts where the result is batched by a view or by the webservice machinery, and they have a property that's slightly surprising outside those contexts: if you iterate over a decorated result set without first slicing it, then its `pre_iter_hook` is called for *all* rows in the result set at once.  This means that `process-accepted` can run out of memory if the number of accepted queue items for a given archive and series is very large, as happened recently for a test rebuild archive.

To fix this, process queue items in batches.  We need to take a bit of care here because the act of processing queue items changes the result of the query, but explicitly ordering by ID and adding a threshold condition (only fetching items whose ID is greater than that of the last item processed) deals with that.  The particular implementation I chose here processes items in ascending order by ID rather than the descending order that was implicitly used previously, but I think that's slightly better anyway.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:process-accepted-batching into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/processaccepted.py b/lib/lp/archivepublisher/scripts/processaccepted.py
index 3968e30..242fff0 100644
--- a/lib/lp/archivepublisher/scripts/processaccepted.py
+++ b/lib/lp/archivepublisher/scripts/processaccepted.py
@@ -11,6 +11,7 @@ from optparse import OptionValueError
 import sys
 
 from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
 
 from lp.archivepublisher.publishing import GLOBAL_PUBLISHER_LOCK
 from lp.archivepublisher.scripts.base import PublisherScript
@@ -29,6 +30,7 @@ from lp.soyuz.enums import (
     )
 from lp.soyuz.interfaces.archive import IArchiveSet
 from lp.soyuz.model.processacceptedbugsjob import close_bugs_for_queue_item
+from lp.soyuz.model.queue import PackageUpload
 
 
 class ProcessAccepted(PublisherScript):
@@ -39,6 +41,8 @@ class ProcessAccepted(PublisherScript):
     them for publishing as appropriate.
     """
 
+    batch_size = 100
+
     @property
     def lockfilename(self):
         """See `LaunchpadScript`."""
@@ -121,16 +125,34 @@ class ProcessAccepted(PublisherScript):
                         archive.reference, distroseries.name))
 
                     queue_items = distroseries.getPackageUploads(
-                        status=PackageUploadStatus.ACCEPTED, archive=archive)
-                    for queue_item in queue_items:
-                        if self.processQueueItem(queue_item):
-                            processed_queue_ids.append(queue_item.id)
-                        # Commit even on error; we may have altered the
-                        # on-disk archive, so the partial state must
-                        # make it to the DB.
-                        self.txn.commit()
-                        close_bugs_for_queue_item(queue_item)
-                        self.txn.commit()
+                        status=PackageUploadStatus.ACCEPTED,
+                        archive=archive).order_by(PackageUpload.id)
+                    start = 0
+
+                    # DistroSeries.getPackageUploads returns a
+                    # DecoratedResultSet, so we must slice it in order to
+                    # iterate over it efficiently; if we don't, then the
+                    # pre_iter_hook will be called for all rows in the set
+                    # at once, which may consume a very large amount of
+                    # memory if the queue is large.  Processing queue items
+                    # changes the result of the query, so we need to
+                    # explicitly order by ID and keep track of how far we've
+                    # got.
+                    while True:
+                        batch = list(removeSecurityProxy(queue_items).find(
+                            PackageUpload.id > start)[:self.batch_size])
+                        for queue_item in batch:
+                            start = queue_item.id
+                            if self.processQueueItem(queue_item):
+                                processed_queue_ids.append(queue_item.id)
+                            # Commit even on error; we may have altered the
+                            # on-disk archive, so the partial state must
+                            # make it to the DB.
+                            self.txn.commit()
+                            close_bugs_for_queue_item(queue_item)
+                            self.txn.commit()
+                        if len(batch) < self.batch_size:
+                            break
             finally:
                 clear_request_started()
         return processed_queue_ids
diff --git a/lib/lp/archivepublisher/tests/test_processaccepted.py b/lib/lp/archivepublisher/tests/test_processaccepted.py
index 565d82b..155c1f1 100644
--- a/lib/lp/archivepublisher/tests/test_processaccepted.py
+++ b/lib/lp/archivepublisher/tests/test_processaccepted.py
@@ -5,7 +5,11 @@
 
 from optparse import OptionValueError
 
-from testtools.matchers import LessThan
+from testtools.matchers import (
+    EndsWith,
+    LessThan,
+    MatchesListwise,
+    )
 import transaction
 
 from lp.archivepublisher.scripts.processaccepted import ProcessAccepted
@@ -20,7 +24,10 @@ from lp.soyuz.enums import (
     )
 from lp.soyuz.model.queue import PackageUpload
 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
-from lp.testing import TestCaseWithFactory
+from lp.testing import (
+    StormStatementRecorder,
+    TestCaseWithFactory,
+    )
 from lp.testing.dbuser import switch_dbuser
 from lp.testing.layers import LaunchpadZopelessLayer
 
@@ -205,3 +212,29 @@ class TestProcessAccepted(TestCaseWithFactory):
         script = ProcessAccepted(
             test_args=['--all-derived', '-d', distro.name])
         self.assertRaises(OptionValueError, script.validateArguments)
+
+    def test_processes_in_batches(self):
+        distroseries = self.factory.makeDistroSeries(distribution=self.distro)
+        uploads = [
+            self.createWaitingAcceptancePackage(
+                distroseries=distroseries, sourcename='source%d' % i)
+            for i in range(5)]
+
+        script = self.getScript([])
+        script.batch_size = 2
+        switch_dbuser(self.dbuser)
+        with StormStatementRecorder() as recorder:
+            script.main()
+
+        for upload in uploads:
+            published = self.distro.main_archive.getPublishedSources(
+                name=upload.name).one()
+            self.assertEqual(PackagePublishingStatus.PENDING, published.status)
+        # The script made three queries for PackageUploads, each of which
+        # was limited to the batch size.
+        self.assertThat(
+            [stmt for stmt in recorder.statements
+             if 'SELECT DISTINCT PackageUpload' in stmt],
+            MatchesListwise([
+                EndsWith('LIMIT 2'), EndsWith('LIMIT 2'), EndsWith('LIMIT 2'),
+                ]))