launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #28166
[Merge] ~cjwatson/launchpad:process-accepted-batching into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:process-accepted-batching into launchpad:master.
Commit message:
Batch queue items in process-accepted
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/416091
`PackageUploadSet.getAll` (and hence `DistroSeries.getPackageUploads`) returns a `DecoratedResultSet`. These are usually used in contexts where the result is batched by a view or by the webservice machinery, and they have a property that's slightly surprising outside those contexts: if you iterate over a decorated result set without first slicing it, then its `pre_iter_hook` is called for *all* rows in the result set. This means that `process-accepted` can run out of memory if the number of accepted queue items for a given archive and series is very large, as happened recently for a test rebuild archive.
To fix this, process queue items in batches. We need to take a bit of care here because the act of processing queue items changes the result of the query, but explicitly ordering by ID and adding a threshold condition deal with that. The particular implementation I chose here processes items in ascending order by ID rather than descending order as was implicitly used previously, but I think that's slightly better anyway.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:process-accepted-batching into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/processaccepted.py b/lib/lp/archivepublisher/scripts/processaccepted.py
index 3968e30..242fff0 100644
--- a/lib/lp/archivepublisher/scripts/processaccepted.py
+++ b/lib/lp/archivepublisher/scripts/processaccepted.py
@@ -11,6 +11,7 @@ from optparse import OptionValueError
import sys
from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
from lp.archivepublisher.publishing import GLOBAL_PUBLISHER_LOCK
from lp.archivepublisher.scripts.base import PublisherScript
@@ -29,6 +30,7 @@ from lp.soyuz.enums import (
)
from lp.soyuz.interfaces.archive import IArchiveSet
from lp.soyuz.model.processacceptedbugsjob import close_bugs_for_queue_item
+from lp.soyuz.model.queue import PackageUpload
class ProcessAccepted(PublisherScript):
@@ -39,6 +41,8 @@ class ProcessAccepted(PublisherScript):
them for publishing as appropriate.
"""
+ batch_size = 100
+
@property
def lockfilename(self):
"""See `LaunchpadScript`."""
@@ -121,16 +125,34 @@ class ProcessAccepted(PublisherScript):
archive.reference, distroseries.name))
queue_items = distroseries.getPackageUploads(
- status=PackageUploadStatus.ACCEPTED, archive=archive)
- for queue_item in queue_items:
- if self.processQueueItem(queue_item):
- processed_queue_ids.append(queue_item.id)
- # Commit even on error; we may have altered the
- # on-disk archive, so the partial state must
- # make it to the DB.
- self.txn.commit()
- close_bugs_for_queue_item(queue_item)
- self.txn.commit()
+ status=PackageUploadStatus.ACCEPTED,
+ archive=archive).order_by(PackageUpload.id)
+ start = 0
+
+ # DistroSeries.getPackageUploads returns a
+ # DecoratedResultSet, so we must slice it in order to
+ # iterate over it efficiently; if we don't, then the
+ # pre_iter_hook will be called for all rows in the set
+ # at once, which may consume a very large amount of
+ # memory if the queue is large. Processing queue items
+ # changes the result of the query, so we need to
+ # explicitly order by ID and keep track of how far we've
+ # got.
+ while True:
+ batch = list(removeSecurityProxy(queue_items).find(
+ PackageUpload.id > start)[:self.batch_size])
+ for queue_item in batch:
+ start = queue_item.id
+ if self.processQueueItem(queue_item):
+ processed_queue_ids.append(queue_item.id)
+ # Commit even on error; we may have altered the
+ # on-disk archive, so the partial state must
+ # make it to the DB.
+ self.txn.commit()
+ close_bugs_for_queue_item(queue_item)
+ self.txn.commit()
+ if len(batch) < self.batch_size:
+ break
finally:
clear_request_started()
return processed_queue_ids
diff --git a/lib/lp/archivepublisher/tests/test_processaccepted.py b/lib/lp/archivepublisher/tests/test_processaccepted.py
index 565d82b..155c1f1 100644
--- a/lib/lp/archivepublisher/tests/test_processaccepted.py
+++ b/lib/lp/archivepublisher/tests/test_processaccepted.py
@@ -5,7 +5,11 @@
from optparse import OptionValueError
-from testtools.matchers import LessThan
+from testtools.matchers import (
+ EndsWith,
+ LessThan,
+ MatchesListwise,
+ )
import transaction
from lp.archivepublisher.scripts.processaccepted import ProcessAccepted
@@ -20,7 +24,10 @@ from lp.soyuz.enums import (
)
from lp.soyuz.model.queue import PackageUpload
from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
-from lp.testing import TestCaseWithFactory
+from lp.testing import (
+ StormStatementRecorder,
+ TestCaseWithFactory,
+ )
from lp.testing.dbuser import switch_dbuser
from lp.testing.layers import LaunchpadZopelessLayer
@@ -205,3 +212,29 @@ class TestProcessAccepted(TestCaseWithFactory):
script = ProcessAccepted(
test_args=['--all-derived', '-d', distro.name])
self.assertRaises(OptionValueError, script.validateArguments)
+
+ def test_processes_in_batches(self):
+ distroseries = self.factory.makeDistroSeries(distribution=self.distro)
+ uploads = [
+ self.createWaitingAcceptancePackage(
+ distroseries=distroseries, sourcename='source%d' % i)
+ for i in range(5)]
+
+ script = self.getScript([])
+ script.batch_size = 2
+ switch_dbuser(self.dbuser)
+ with StormStatementRecorder() as recorder:
+ script.main()
+
+ for upload in uploads:
+ published = self.distro.main_archive.getPublishedSources(
+ name=upload.name).one()
+ self.assertEqual(PackagePublishingStatus.PENDING, published.status)
+ # The script made three queries for PackageUploads, each of which
+ # was limited to the batch size.
+ self.assertThat(
+ [stmt for stmt in recorder.statements
+ if 'SELECT DISTINCT PackageUpload' in stmt],
+ MatchesListwise([
+ EndsWith('LIMIT 2'), EndsWith('LIMIT 2'), EndsWith('LIMIT 2'),
+ ]))