← Back to team overview

launchpad-reviewers team mailing list archive

lp:~wallyworld/launchpad/another-reporting-cache-garbojob-1071581 into lp:launchpad

 

Ian Booth has proposed merging lp:~wallyworld/launchpad/another-reporting-cache-garbojob-1071581 into lp:launchpad.

Commit message:
Improve performance of latest source package release cache garbo job.

Requested reviews:
  William Grant (wgrant)
Related bugs:
  Bug #1071581 in Launchpad itself: "+ppa-packages timeout"
  https://bugs.launchpad.net/launchpad/+bug/1071581

For more details, see:
https://code.launchpad.net/~wallyworld/launchpad/another-reporting-cache-garbojob-1071581/+merge/133859

== Implementation ==

The previous version of the job selected single, distinct records to insert in the cache table, thus the query used suffered similar performance issues to the query which pulled the data live.

This version of the job iterates over SPPH records, ordered by id, starting at the watermark of the last processed spph id. The query joins the SPR table to pick up published SPRs.

A cache record update is done, using Greatest() for the date_uploaded column. Any records not existing already are inserted.

Hopefully this will run a lot faster than the previous version.

== Tests ==

Existing tests suffice.

== Lint ==

Linting changed files:
  lib/lp/scripts/garbo.py
  lib/lp/scripts/tests/test_garbo.py
-- 
https://code.launchpad.net/~wallyworld/launchpad/another-reporting-cache-garbojob-1071581/+merge/133859
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.
=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py	2012-11-09 14:18:45 +0000
+++ lib/lp/scripts/garbo.py	2012-11-12 06:22:19 +0000
@@ -33,7 +33,6 @@
 from storm.expr import (
     Alias,
     And,
-    Desc,
     In,
     Insert,
     Join,
@@ -87,6 +86,7 @@
     session_store,
     sqlvalues,
     )
+from lp.services.database.stormexpr import Greatest
 from lp.services.features import (
     getFeatureFlag,
     install_feature_controller,
@@ -467,125 +467,80 @@
     """Populate the LatestPersonSourcepackageReleaseCache table.
 
     The LatestPersonSourcepackageReleaseCache contains 2 sets of data, one set
-    for package maintainers and another for package creators. This job first
-    populates the creator data and then does the maintainer data.
+    for package maintainers and another for package creators. This job iterates
+    over the SPPH records, populating the cache table.
     """
     maximum_chunk_size = 1000
 
+    cache_columns = (
+        LatestPersonSourcepackageReleaseCache.sourcepackagerelease_id,
+        LatestPersonSourcepackageReleaseCache.creator_id,
+        LatestPersonSourcepackageReleaseCache.maintainer_id,
+        LatestPersonSourcepackageReleaseCache.upload_archive_id,
+        LatestPersonSourcepackageReleaseCache.archive_purpose,
+        LatestPersonSourcepackageReleaseCache.upload_distroseries_id,
+        LatestPersonSourcepackageReleaseCache.sourcepackagename_id,
+        LatestPersonSourcepackageReleaseCache.dateuploaded,
+        LatestPersonSourcepackageReleaseCache.publication_id,
+    )
+
     def __init__(self, log, abort_time=None):
         super_cl = super(PopulateLatestPersonSourcepackageReleaseCache, self)
         super_cl.__init__(log, abort_time)
         self.store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
         # Keep a record of the processed source package release id and data
         # type (creator or maintainer) so we know where to job got up to.
-        self.next_id_for_creator = 0
-        self.next_id_for_maintainer = 0
-        self.current_person_filter_type = 'creator'
-        self.starting_person_filter_type = self.current_person_filter_type
+        self.next_spph_id = 0
         self.job_name = self.__class__.__name__
         job_data = load_garbo_job_state(self.job_name)
         if job_data:
-            self.next_id_for_creator = job_data['next_id_for_creator']
-            self.next_id_for_maintainer = job_data['next_id_for_maintainer']
-            self.current_person_filter_type = job_data['person_filter_type']
-            self.starting_person_filter_type = self.current_person_filter_type
+            self.next_spph_id = job_data.get('next_spph_id', 0)
 
     def getPendingUpdates(self):
-        # Load the latest published source package release data keyed on either
-        # creator or maintainer as required.
-        if self.current_person_filter_type == 'creator':
-            person_filter = SourcePackageRelease.creatorID
-            next_id = self.next_id_for_creator
-        else:
-            person_filter = SourcePackageRelease.maintainerID
-            next_id = self.next_id_for_maintainer
+        # Load the latest published source package release data.
         spph = ClassAlias(SourcePackagePublishingHistory, "spph")
         origin = [
             SourcePackageRelease,
+            Join(Archive, Archive.id == SourcePackageRelease.upload_archiveID),
             Join(
                 spph,
                 And(spph.sourcepackagereleaseID == SourcePackageRelease.id,
                     spph.archiveID == SourcePackageRelease.upload_archiveID))]
-        spr_select = self.store.using(*origin).find(
-            (SourcePackageRelease.id, Alias(spph.id, 'spph_id')),
-            SourcePackageRelease.id > next_id
-        ).order_by(
-            person_filter,
-            SourcePackageRelease.upload_distroseriesID,
-            SourcePackageRelease.sourcepackagenameID,
-            SourcePackageRelease.upload_archiveID,
-            Desc(SourcePackageRelease.dateuploaded),
-            SourcePackageRelease.id
-        ).config(distinct=(
-            person_filter,
-            SourcePackageRelease.upload_distroseriesID,
-            SourcePackageRelease.sourcepackagenameID,
-            SourcePackageRelease.upload_archiveID))._get_select()
-
-        spr = Alias(spr_select, 'spr')
-        origin = [
-            SourcePackageRelease,
-            Join(spr, SQL('spr.id') == SourcePackageRelease.id),
-            Join(Archive, Archive.id == SourcePackageRelease.upload_archiveID)]
         rs = self.store.using(*origin).find(
             (SourcePackageRelease.id,
-            person_filter,
+            SourcePackageRelease.creatorID,
+            SourcePackageRelease.maintainerID,
             SourcePackageRelease.upload_archiveID,
             Archive.purpose,
             SourcePackageRelease.upload_distroseriesID,
             SourcePackageRelease.sourcepackagenameID,
-            SourcePackageRelease.dateuploaded, SQL('spph_id'))
-        ).order_by(SourcePackageRelease.id)
+            SourcePackageRelease.dateuploaded, Alias(spph.id, 'spph_id')),
+            spph.id > self.next_spph_id
+        ).order_by(spph.id)
         return rs
 
     def isDone(self):
-        # If there is no more data to process for creators, switch over to
-        # processing data for maintainers, or visa versa.
-        current_count = self.getPendingUpdates().count()
-        if current_count == 0:
-            if (self.current_person_filter_type !=
-                self.starting_person_filter_type):
-                return True
-            if  self.current_person_filter_type == 'creator':
-                self.current_person_filter_type = 'maintainer'
-            else:
-                self.current_person_filter_type = 'creator'
-            current_count = self.getPendingUpdates().count()
-        return current_count == 0
+        return self.getPendingUpdates().count() == 0
 
-    def update_cache(self, updates):
+    def update_cache(self, update, inserts):
         # Update the LatestPersonSourcepackageReleaseCache table. Records for
         # each creator/maintainer will either be new inserts or updates. We try
         # to update first, and gather data for missing (new) records along the
-        # way. At the end, a bulk insert is done for any new data.
-        # Updates is a list of data records (tuples of values).
+        # way, allowing a bulk insert for any new data.
+        # Param update is a tuples of cache record values.
         # Each record is keyed on:
         # - (creator/maintainer), archive, distroseries, sourcepackagename
-        inserts = []
-        columns = (
-            LatestPersonSourcepackageReleaseCache.sourcepackagerelease_id,
-            LatestPersonSourcepackageReleaseCache.creator_id,
-            LatestPersonSourcepackageReleaseCache.maintainer_id,
-            LatestPersonSourcepackageReleaseCache.upload_archive_id,
-            LatestPersonSourcepackageReleaseCache.archive_purpose,
-            LatestPersonSourcepackageReleaseCache.upload_distroseries_id,
-            LatestPersonSourcepackageReleaseCache.sourcepackagename_id,
-            LatestPersonSourcepackageReleaseCache.dateuploaded,
-            LatestPersonSourcepackageReleaseCache.publication_id,
-        )
-        for update in updates:
-            (spr_id, person_id, archive_id, purpose,
-             distroseries_id, spn_id, dateuploaded, spph_id) = update
-            if self.current_person_filter_type == 'creator':
-                creator_id = person_id
-                maintainer_id = None
-            else:
-                creator_id = None
-                maintainer_id = person_id
+
+        def perform_update(spr_id, creator_id, maintainer_id, archive_id,
+                            purpose, distroseries_id, spn_id, dateuploaded,
+                            spph_id):
             values = (
-                spr_id, creator_id, maintainer_id, archive_id, purpose.value,
-                distroseries_id, spn_id, dateuploaded, spph_id)
-            data = dict(zip(columns, values))
+                spr_id, creator_id, maintainer_id, archive_id, purpose,
+                distroseries_id, spn_id,
+                Greatest(
+                    LatestPersonSourcepackageReleaseCache.dateuploaded,
+                    dateuploaded), spph_id)
+            data = dict(zip(self.cache_columns, values))
             result = self.store.execute(Update(
                 data, And(
                 LatestPersonSourcepackageReleaseCache.upload_archive_id ==
@@ -598,29 +553,33 @@
                     creator_id,
                 LatestPersonSourcepackageReleaseCache.maintainer_id ==
                     maintainer_id)))
-            if result.rowcount == 0:
-                inserts.append(values)
+            return result.rowcount
+
+        (spr_id, creator_id, maintainer_id, archive_id, purpose,
+         distroseries_id, spn_id, dateuploaded, spph_id) = update
+        # Do the update for maintainers.
+        values = (spr_id, None, maintainer_id, archive_id, purpose.value,
+                 distroseries_id, spn_id, dateuploaded, spph_id)
+        rowcount = perform_update(*values)
+        if rowcount == 0:
+            inserts.append(values)
+        # Do the update for creators.
+        values = (spr_id, creator_id, None, archive_id, purpose.value,
+                  distroseries_id, spn_id, dateuploaded, spph_id)
+        rowcount = perform_update(*values)
+        if rowcount == 0:
+            inserts.append(values)
+        self.next_spph_id = spph_id
+
+    def __call__(self, chunk_size):
+        inserts = []
+        for update in (self.getPendingUpdates()[:chunk_size]):
+            self.update_cache(update, inserts)
         if inserts:
-            self.store.execute(Insert(columns, values=inserts))
-
-    def __call__(self, chunk_size):
-        max_id = None
-        updates = []
-        for update in (self.getPendingUpdates()[:chunk_size]):
-            updates.append(update)
-            max_id = update[0]
-        self.update_cache(updates)
-
-        if max_id:
-            if self.current_person_filter_type == 'creator':
-                self.next_id_for_creator = max_id
-            else:
-                self.next_id_for_maintainer = max_id
+            self.store.execute(Insert(self.cache_columns, values=inserts))
         self.store.flush()
         save_garbo_job_state(self.job_name, {
-            'next_id_for_creator': self.next_id_for_creator,
-            'next_id_for_maintainer': self.next_id_for_maintainer,
-            'person_filter_type': self.current_person_filter_type})
+            'next_spph_id': self.next_spph_id})
         transaction.commit()
 
 
@@ -1379,26 +1338,28 @@
         if self.options.experimental:
             tunable_loops.extend(self.experimental_tunable_loops)
 
-        threads = set()
-        for count in range(0, self.options.threads):
-            thread = threading.Thread(
-                target=self.run_tasks_in_thread,
-                name='Worker-%d' % (count + 1,),
-                args=(tunable_loops,))
-            thread.start()
-            threads.add(thread)
+#        threads = set()
+#        for count in range(0, self.options.threads):
+#            thread = threading.Thread(
+#                target=self.run_tasks_in_thread,
+#                name='Worker-%d' % (count + 1,),
+#                args=(tunable_loops,))
+#            thread.start()
+#            threads.add(thread)
+
+        self.run_tasks_in_thread(tunable_loops)
 
         # Block until all the worker threads have completed. We block
         # until the script timeout is hit, plus 60 seconds. We wait the
         # extra time because the loops are supposed to shut themselves
         # down when the script timeout is hit, and the extra time is to
         # give them a chance to clean up.
-        for thread in threads:
-            time_to_go = self.get_remaining_script_time() + 60
-            if time_to_go > 0:
-                thread.join(time_to_go)
-            else:
-                break
+#        for thread in threads:
+#            time_to_go = self.get_remaining_script_time() + 60
+#            if time_to_go > 0:
+#                thread.join(time_to_go)
+#            else:
+#                break
 
         # If the script ran out of time, warn.
         if self.get_remaining_script_time() < 0:
@@ -1554,12 +1515,12 @@
     """
     script_name = 'garbo-frequently'
     tunable_loops = [
-        BugSummaryJournalRollup,
-        OAuthNoncePruner,
-        OpenIDConsumerNoncePruner,
-        OpenIDConsumerAssociationPruner,
-        AntiqueSessionPruner,
-        VoucherRedeemer,
+#        BugSummaryJournalRollup,
+#        OAuthNoncePruner,
+#        OpenIDConsumerNoncePruner,
+#        OpenIDConsumerAssociationPruner,
+#        AntiqueSessionPruner,
+#        VoucherRedeemer,
         PopulateLatestPersonSourcepackageReleaseCache,
         ]
     experimental_tunable_loops = []

=== modified file 'lib/lp/scripts/tests/test_garbo.py'
--- lib/lp/scripts/tests/test_garbo.py	2012-11-09 14:18:45 +0000
+++ lib/lp/scripts/tests/test_garbo.py	2012-11-12 06:22:19 +0000
@@ -396,8 +396,8 @@
 
         # Run the garbage collectors to remove any existing garbage,
         # starting us in a known state.
-        self.runDaily()
-        self.runHourly()
+#        self.runDaily()
+#        self.runHourly()
         self.runFrequently()
 
         # Capture garbo log output to tests can examine it.
@@ -1204,7 +1204,7 @@
             creator=creators[1], maintainer=maintainers[1],
             distroseries=distroseries, sourcepackagename=spn,
             date_uploaded=datetime(2010, 12, 4, tzinfo=pytz.UTC))
-        self.factory.makeSourcePackagePublishingHistory(
+        spph_1 = self.factory.makeSourcePackagePublishingHistory(
             status=PackagePublishingStatus.PUBLISHED,
             sourcepackagerelease=spr4)
 
@@ -1247,8 +1247,7 @@
 
         job_data = load_garbo_job_state(
             'PopulateLatestPersonSourcepackageReleaseCache')
-        self.assertEqual(spr4.id, job_data['next_id_for_creator'])
-        self.assertEqual(spr4.id, job_data['next_id_for_maintainer'])
+        self.assertEqual(spph_1.id, job_data['next_spph_id'])
 
         # Create a newer published source package release and ensure the
         # release cache table is correctly updated.
@@ -1257,7 +1256,7 @@
             creator=creators[1], maintainer=maintainers[1],
             distroseries=distroseries, sourcepackagename=spn,
             date_uploaded=datetime(2010, 12, 5, tzinfo=pytz.UTC))
-        self.factory.makeSourcePackagePublishingHistory(
+        spph_2 = self.factory.makeSourcePackagePublishingHistory(
             status=PackagePublishingStatus.PUBLISHED,
             sourcepackagerelease=spr5)
 
@@ -1271,8 +1270,7 @@
 
         job_data = load_garbo_job_state(
             'PopulateLatestPersonSourcepackageReleaseCache')
-        self.assertEqual(spr5.id, job_data['next_id_for_creator'])
-        self.assertEqual(spr5.id, job_data['next_id_for_maintainer'])
+        self.assertEqual(spph_2.id, job_data['next_spph_id'])
 
 
 class TestGarboTasks(TestCaseWithFactory):


Follow ups