← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/optimise-publish-a into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/optimise-publish-a into lp:launchpad.

Commit message:
Optimise phase A of the publisher by searching for pending source and binary publications in one query each, rather than one per active distro(arch)series and pocket.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/optimise-publish-a/+merge/226312

Optimise phase A of the publisher by searching for pending source and binary publications in one query each, rather than one query per active distroseries and pocket (for sources) and one per active distroarchseries and pocket (for binaries).

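Log output will appear in a slightly different order. Previously, sources and then binaries were published for each series and pocket in turn; now all sources are published first, followed by all binaries. I have tried to order and group the work so that the log does not change too much, and hopefully the new order is acceptable. I also made the log messages somewhat more compact and readable along the way.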

I have not tested this with a real publisher.  We'll want to give it a spin on dogfood.
-- 
https://code.launchpad.net/~cjwatson/launchpad/optimise-publish-a/+merge/226312
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/optimise-publish-a into lp:launchpad.
=== modified file 'lib/lp/archivepublisher/publishing.py'
--- lib/lp/archivepublisher/publishing.py	2014-07-09 12:19:54 +0000
+++ lib/lp/archivepublisher/publishing.py	2014-07-10 14:58:38 +0000
@@ -13,7 +13,9 @@
 from datetime import datetime
 import errno
 import hashlib
+from itertools import groupby
 import logging
+from operator import attrgetter
 import os
 import shutil
 
@@ -21,8 +23,11 @@
     _multivalued,
     Release,
     )
-from storm.expr import Desc
-from storm.store import EmptyResultSet
+from storm.expr import (
+    Desc,
+    Join,
+    Or,
+    )
 from zope.component import getUtility
 
 from lp.app.interfaces.launchpad import ILaunchpadCelebrities
@@ -48,6 +53,10 @@
     pocketsuffix,
     )
 from lp.registry.interfaces.series import SeriesStatus
+from lp.registry.model.distroseries import (
+    ACTIVE_UNRELEASED_STATUSES,
+    DistroSeries,
+    )
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.interfaces import IStore
 from lp.services.librarian.client import LibrarianClient
@@ -295,44 +304,19 @@
             # provide custom builds for users who haven't upgraded yet.
             return self.distro.series
 
-    def checkLegalPocket(self, distroseries, publication, is_careful):
+    def checkLegalPocket(self, distroseries, pocket, is_careful):
         """Check if the publication can happen in the archive."""
         # 'careful' mode re-publishes everything:
-        if is_careful:
-            return True
-
-        if not publication.archive.canModifySuite(
-                distroseries, publication.pocket):
-            self.log.error(
-                "Tried to publish %s (%s) into the %s pocket on series %s "
-                "(%s), skipping" % (
-                    publication.displayname, publication.id,
-                    publication.pocket, distroseries.displayname,
-                    distroseries.status.name))
-            return False
-
-        return True
-
-    def getPendingSourcePublications(self, distroseries, pocket, is_careful):
+        return is_careful or self.archive.canModifySuite(distroseries, pocket)
+
+    def getPendingSourcePublications(self, is_careful):
         """Return the specific group of source records to be published.
 
-        If the distroseries is already released, it automatically refuses
-        to publish records to the RELEASE pocket.
+        Exclude publications in the RELEASE pocket unless the distroseries
+        is unreleased or the archive allows updates to the RELEASE pocket
+        for stable distroseries (e.g. PPAs or partner).
         """
-        # Exclude RELEASE pocket if the distroseries was already released,
-        # since it should not change for main archive.
-        # We allow RELEASE publishing for PPAs.
-        # We also allow RELEASE publishing for partner.
-        if (pocket == PackagePublishingPocket.RELEASE and
-            not distroseries.isUnstable() and
-            not self.archive.allowUpdatesToReleasePocket()):
-            return EmptyResultSet()
-
-        conditions = [
-            SourcePackagePublishingHistory.distroseries == distroseries,
-            SourcePackagePublishingHistory.pocket == pocket,
-            SourcePackagePublishingHistory.archive == self.archive,
-            ]
+        conditions = [SourcePackagePublishingHistory.archive == self.archive]
 
         # Careful publishing should include all PUBLISHED rows, normal run
         # only includes PENDING ones.
@@ -342,49 +326,70 @@
         conditions.append(
             SourcePackagePublishingHistory.status.is_in(statuses))
 
+        if not self.archive.allowUpdatesToReleasePocket():
+            conditions.extend([
+                SourcePackagePublishingHistory.distroseriesID ==
+                    DistroSeries.id,
+                Or(
+                    SourcePackagePublishingHistory.pocket !=
+                        PackagePublishingPocket.RELEASE,
+                    DistroSeries.status.is_in(ACTIVE_UNRELEASED_STATUSES)),
+                ])
+
         publications = IStore(SourcePackagePublishingHistory).find(
             SourcePackagePublishingHistory, *conditions)
-        return publications.order_by(Desc(SourcePackagePublishingHistory.id))
-
-    def publishSources(self, distroseries, pocket, is_careful=False):
-        """Publish sources for a given distroseries and pocket.
+        return publications.order_by(
+            SourcePackagePublishingHistory.distroseriesID,
+            SourcePackagePublishingHistory.pocket,
+            Desc(SourcePackagePublishingHistory.id))
+
+    def publishSources(self, distroseries, pocket, spphs):
+        """Publish sources for a given distroseries and pocket."""
+        self.log.debug(
+            "Publishing pending sources for %s" %
+            distroseries.getSuite(pocket))
+        for spph in spphs:
+            spph.publish(self._diskpool, self.log)
+
+    def findAndPublishSources(self, is_careful=False):
+        """Search for and publish all pending sources.
 
         :param is_careful: If True, republish all published records (system
             will DTRT checking the hash of all published files).
 
         Consider records returned by getPendingSourcePublications.
         """
-        self.log.debug("Publishing %s-%s" % (distroseries.title, pocket.name))
-        self.log.debug("Attempting to publish pending sources.")
-
         dirty_pockets = set()
-        for spph in self.getPendingSourcePublications(
-                distroseries, pocket, is_careful):
-            if not self.checkLegalPocket(distroseries, spph, is_careful):
-                continue
-            spph.publish(self._diskpool, self.log)
-            dirty_pockets.add((distroseries.name, spph.pocket))
+        all_spphs = self.getPendingSourcePublications(is_careful)
+        for (distroseries, pocket), spphs in groupby(
+                all_spphs, attrgetter("distroseries", "pocket")):
+            if not self.isAllowed(distroseries, pocket):
+                self.log.debug("* Skipping %s", distroseries.getSuite(pocket))
+            elif not self.checkLegalPocket(distroseries, pocket, is_careful):
+                for spph in spphs:
+                    self.log.error(
+                        "Tried to publish %s (%s) into the %s pocket on "
+                        "series %s (%s), skipping" % (
+                            spph.displayname, spph.id, pocket,
+                            distroseries.displayname,
+                            distroseries.status.name))
+            else:
+                self.publishSources(distroseries, pocket, spphs)
+                dirty_pockets.add((distroseries.name, pocket))
         return dirty_pockets
 
-    def getPendingBinaryPublications(self, distroarchseries, pocket,
-                                     is_careful):
+    def getPendingBinaryPublications(self, is_careful):
         """Return the specific group of binary records to be published.
 
-        If the distroseries is already released, it automatically refuses
-        to publish records to the RELEASE pocket.
+        Exclude publications in the RELEASE pocket unless the distroseries
+        is unreleased or the archive allows updates to the RELEASE pocket
+        for stable distroseries (e.g. PPAs or partner).
         """
-        # Exclude RELEASE pocket if the distroseries was already released,
-        # since it should not change, unless the archive allows it.
-        if (pocket == PackagePublishingPocket.RELEASE and
-            not distroarchseries.distroseries.isUnstable() and
-            not self.archive.allowUpdatesToReleasePocket()):
-            return EmptyResultSet()
-
         conditions = [
-            BinaryPackagePublishingHistory.distroarchseries ==
-                distroarchseries,
-            BinaryPackagePublishingHistory.pocket == pocket,
             BinaryPackagePublishingHistory.archive == self.archive,
+            BinaryPackagePublishingHistory.distroarchseriesID ==
+                DistroArchSeries.id,
+            DistroArchSeries.distroseriesID == DistroSeries.id,
             ]
 
         statuses = [PackagePublishingStatus.PENDING]
@@ -393,29 +398,55 @@
         conditions.append(
             BinaryPackagePublishingHistory.status.is_in(statuses))
 
+        if not self.archive.allowUpdatesToReleasePocket():
+            conditions.append(Or(
+                BinaryPackagePublishingHistory.pocket !=
+                    PackagePublishingPocket.RELEASE,
+                DistroSeries.status.is_in(ACTIVE_UNRELEASED_STATUSES)))
+
         publications = IStore(BinaryPackagePublishingHistory).find(
             BinaryPackagePublishingHistory, *conditions)
-        return publications.order_by(Desc(BinaryPackagePublishingHistory.id))
-
-    def publishBinaries(self, distroarchseries, pocket, is_careful=False):
-        """Publish binaries for a given distroseries and pocket.
+        return publications.order_by(
+            DistroSeries.id,
+            BinaryPackagePublishingHistory.pocket,
+            DistroArchSeries.architecturetag,
+            Desc(BinaryPackagePublishingHistory.id))
+
+    def publishBinaries(self, distroarchseries, pocket, bpphs):
+        """Publish binaries for a given distroarchseries and pocket."""
+        self.log.debug(
+            "Publishing pending binaries for %s/%s" % (
+                distroarchseries.distroseries.getSuite(pocket),
+                distroarchseries.architecturetag))
+        for bpph in bpphs:
+            bpph.publish(self._diskpool, self.log)
+
+    def findAndPublishBinaries(self, is_careful=False):
+        """Search for and publish all pending binaries.
 
         :param is_careful: If True, republish all published records (system
             will DTRT checking the hash of all published files).
 
         Consider records returned by getPendingBinaryPublications.
         """
-        self.log.debug("Attempting to publish pending binaries for %s"
-              % distroarchseries.architecturetag)
-
-        distroseries = distroarchseries.distroseries
         dirty_pockets = set()
-        for bpph in self.getPendingBinaryPublications(
-                distroarchseries, pocket, is_careful):
-            if not self.checkLegalPocket(distroseries, bpph, is_careful):
-                continue
-            bpph.publish(self._diskpool, self.log)
-            dirty_pockets.add((distroseries.name, bpph.pocket))
+        all_bpphs = self.getPendingBinaryPublications(is_careful)
+        for (distroarchseries, pocket), bpphs in groupby(
+                all_bpphs, attrgetter("distroarchseries", "pocket")):
+            distroseries = distroarchseries.distroseries
+            if not self.isAllowed(distroseries, pocket):
+                pass  # Already logged by publishSources.
+            elif not self.checkLegalPocket(distroseries, pocket, is_careful):
+                for bpph in bpphs:
+                    self.log.error(
+                        "Tried to publish %s (%s) into the %s pocket on "
+                        "series %s (%s), skipping" % (
+                            bpph.displayname, bpph.id, pocket,
+                            distroseries.displayname,
+                            distroseries.status.name))
+            else:
+                self.publishBinaries(distroarchseries, pocket, bpphs)
+                dirty_pockets.add((distroseries.name, pocket))
         return dirty_pockets
 
     def A_publish(self, force_publishing):
@@ -428,18 +459,10 @@
         """
         self.log.debug("* Step A: Publishing packages")
 
-        for distroseries in self.consider_series:
-            for pocket in self.archive.getPockets():
-                if self.isAllowed(distroseries, pocket):
-                    self.dirty_pockets.update(self.publishSources(
-                        distroseries, pocket, is_careful=force_publishing))
-                    for distroarchseries in distroseries.architectures:
-                        self.dirty_pockets.update(self.publishBinaries(
-                            distroarchseries, pocket,
-                            is_careful=force_publishing))
-                else:
-                    self.log.debug(
-                        "* Skipping %s/%s", distroseries.name, pocket.name)
+        self.dirty_pockets.update(
+            self.findAndPublishSources(is_careful=force_publishing))
+        self.dirty_pockets.update(
+            self.findAndPublishBinaries(is_careful=force_publishing))
 
     def A2_markPocketsWithDeletionsDirty(self):
         """An intermediate step in publishing to detect deleted packages.

=== modified file 'lib/lp/archivepublisher/tests/test_publisher.py'
--- lib/lp/archivepublisher/tests/test_publisher.py	2014-07-09 01:18:27 +0000
+++ lib/lp/archivepublisher/tests/test_publisher.py	2014-07-10 14:58:38 +0000
@@ -171,12 +171,10 @@
     def checkLegalPocket(self, status, pocket):
         distroseries = self.factory.makeDistroSeries(
             distribution=self.ubuntutest, status=status)
-        spph = self.factory.makeSourcePackagePublishingHistory(
-            distroseries=distroseries, pocket=pocket)
         publisher = Publisher(
             self.logger, self.config, self.disk_pool,
             distroseries.main_archive)
-        return publisher.checkLegalPocket(distroseries, spph, False)
+        return publisher.checkLegalPocket(distroseries, pocket, False)
 
     def test_checkLegalPocket_allows_unstable_release(self):
         """Publishing to RELEASE in a DEVELOPMENT series is allowed."""
@@ -218,11 +216,8 @@
     def _publish(self, pocket, is_careful=False):
         """Publish the test IDistroSeries and its IDistroArchSeries."""
         self._ensurePublisher()
-        self.publisher.publishSources(
-            self.breezy_autotest, pocket, is_careful=is_careful)
-        for distroarchseries in self.breezy_autotest.architectures:
-            self.publisher.publishBinaries(
-                distroarchseries, pocket, is_careful=is_careful)
+        self.publisher.findAndPublishSources(is_careful=is_careful)
+        self.publisher.findAndPublishBinaries(is_careful=is_careful)
         self.layer.txn.commit()
 
     def checkPublicationsAreConsidered(self, pocket):
@@ -266,9 +261,13 @@
         """Check the results of an IDistroSeries publishing lookup."""
         self._ensurePublisher()
         pub_records = self.publisher.getPendingSourcePublications(
-            self.breezy_autotest, pocket, is_careful=is_careful)
+            is_careful=is_careful)
+        pub_records = [
+            pub for pub in pub_records
+                if pub.distroseries == self.breezy_autotest and
+                   pub.pocket == pocket]
 
-        self.assertEqual(pub_records.count(), len(expected_result))
+        self.assertEqual(len(expected_result), len(pub_records))
         self.assertEqual(
             [item.id for item in expected_result],
             [pub.id for pub in pub_records])
@@ -278,9 +277,13 @@
         """Check the results of an IDistroArchSeries publishing lookup."""
         self._ensurePublisher()
         pub_records = self.publisher.getPendingBinaryPublications(
-            self.breezy_autotest_i386, pocket, is_careful=is_careful)
+            is_careful=is_careful)
+        pub_records = [
+            pub for pub in pub_records
+                if pub.distroarchseries == self.breezy_autotest_i386 and
+                   pub.pocket == pocket]
 
-        self.assertEqual(pub_records.count(), len(expected_result))
+        self.assertEqual(len(expected_result), len(pub_records))
         self.assertEqual(
             [item.id for item in expected_result],
             [pub.id for pub in pub_records])
@@ -1031,7 +1034,9 @@
         # Remove security proxy so that the publisher can call our fake
         # method.
         publisher.distro = removeSecurityProxy(publisher.distro)
-        publisher.distro['hoary-test'].status = SeriesStatus.OBSOLETE
+        pub_source = self.getPubSource(distroseries=self.breezy_autotest)
+        self.getPubBinaries(
+            distroseries=self.breezy_autotest, pub_source=pub_source)
 
         for status in (SeriesStatus.OBSOLETE, SeriesStatus.FUTURE):
             naked_breezy_autotest = publisher.distro['breezy-autotest']
@@ -1055,6 +1060,11 @@
         # Remove security proxy so that the publisher can call our fake
         # method.
         publisher.distro = removeSecurityProxy(publisher.distro)
+        pub_source = self.getPubSource(
+            distroseries=self.breezy_autotest, archive=test_archive)
+        self.getPubBinaries(
+            distroseries=self.breezy_autotest, archive=test_archive,
+            pub_source=pub_source)
 
         for status in (SeriesStatus.OBSOLETE, SeriesStatus.FUTURE):
             naked_breezy_autotest = publisher.distro['breezy-autotest']
@@ -1064,12 +1074,13 @@
 
             publisher.A_publish(False)
 
-            self.assertIn(
-                (naked_breezy_autotest, RELEASE),
-                publisher.publishSources.extract_args())
-            self.assertIn(
-                (naked_breezy_autotest.architectures[0], RELEASE),
-                publisher.publishBinaries.extract_args())
+            source_args = [
+                args[:2] for args in publisher.publishSources.extract_args()]
+            self.assertIn((naked_breezy_autotest, RELEASE), source_args)
+            binary_args = [
+                args[:2] for args in publisher.publishBinaries.extract_args()]
+            self.assertIn(
+                (naked_breezy_autotest.architectures[0], RELEASE), binary_args)
 
     def testPublisherBuilderFunctions(self):
         """Publisher can be initialized via provided helper function.

=== modified file 'lib/lp/registry/model/distroseries.py'
--- lib/lp/registry/model/distroseries.py	2014-07-08 22:31:49 +0000
+++ lib/lp/registry/model/distroseries.py	2014-07-10 14:58:38 +0000
@@ -6,6 +6,8 @@
 __metaclass__ = type
 
 __all__ = [
+    'ACTIVE_RELEASED_STATUSES',
+    'ACTIVE_UNRELEASED_STATUSES',
     'DistroSeries',
     'DistroSeriesSet',
     ]
@@ -191,6 +193,19 @@
     )
 
 
+ACTIVE_RELEASED_STATUSES = [
+    SeriesStatus.CURRENT,
+    SeriesStatus.SUPPORTED,
+    ]
+
+
+ACTIVE_UNRELEASED_STATUSES = [
+    SeriesStatus.EXPERIMENTAL,
+    SeriesStatus.DEVELOPMENT,
+    SeriesStatus.FROZEN,
+    ]
+
+
 class DistroSeries(SQLBase, BugTargetBase, HasSpecificationsMixin,
                    HasTranslationImportsMixin, HasTranslationTemplatesMixin,
                    HasMilestonesMixin, SeriesMixin,
@@ -945,11 +960,7 @@
 
     def isUnstable(self):
         """See `IDistroSeries`."""
-        return self.status in [
-            SeriesStatus.FROZEN,
-            SeriesStatus.DEVELOPMENT,
-            SeriesStatus.EXPERIMENTAL,
-        ]
+        return self.status in ACTIVE_UNRELEASED_STATUSES
 
     def _getAllSources(self):
         """Get all sources ever published in this series' main archives."""
@@ -1535,14 +1546,11 @@
             if isreleased:
                 # The query is filtered on released releases.
                 where_clause += "releasestatus in (%s, %s)" % sqlvalues(
-                    SeriesStatus.CURRENT,
-                    SeriesStatus.SUPPORTED)
+                    *ACTIVE_RELEASED_STATUSES)
             else:
                 # The query is filtered on unreleased releases.
                 where_clause += "releasestatus in (%s, %s, %s)" % sqlvalues(
-                    SeriesStatus.EXPERIMENTAL,
-                    SeriesStatus.DEVELOPMENT,
-                    SeriesStatus.FROZEN)
+                    *ACTIVE_UNRELEASED_STATUSES)
         if orderBy is not None:
             return DistroSeries.select(where_clause, orderBy=orderBy)
         else:


Follow ups