launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #17138
[Merge] lp:~cjwatson/launchpad/optimise-publish-a into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/optimise-publish-a into lp:launchpad.
Commit message:
Optimise phase A of the publisher by searching for pending source and binary publications in one query each, rather than one per active distro(arch)series and pocket.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/optimise-publish-a/+merge/226312
Optimise phase A of the publisher by searching for pending source and binary publications in one query each, rather than one per active distro(arch)series and pocket.
Logs will come out in a slightly different order. I tried to order/group things so that it's not too much different, but this now publishes all sources followed by all binaries. Hopefully that's OK. I also made the logging a bit more compact and readable along the way.
I have not tested this with a real publisher. We'll want to give it a spin on dogfood.
--
https://code.launchpad.net/~cjwatson/launchpad/optimise-publish-a/+merge/226312
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/optimise-publish-a into lp:launchpad.
=== modified file 'lib/lp/archivepublisher/publishing.py'
--- lib/lp/archivepublisher/publishing.py 2014-07-09 12:19:54 +0000
+++ lib/lp/archivepublisher/publishing.py 2014-07-10 14:58:38 +0000
@@ -13,7 +13,9 @@
from datetime import datetime
import errno
import hashlib
+from itertools import groupby
import logging
+from operator import attrgetter
import os
import shutil
@@ -21,8 +23,11 @@
_multivalued,
Release,
)
-from storm.expr import Desc
-from storm.store import EmptyResultSet
+from storm.expr import (
+ Desc,
+ Join,
+ Or,
+ )
from zope.component import getUtility
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
@@ -48,6 +53,10 @@
pocketsuffix,
)
from lp.registry.interfaces.series import SeriesStatus
+from lp.registry.model.distroseries import (
+ ACTIVE_UNRELEASED_STATUSES,
+ DistroSeries,
+ )
from lp.services.database.constants import UTC_NOW
from lp.services.database.interfaces import IStore
from lp.services.librarian.client import LibrarianClient
@@ -295,44 +304,19 @@
# provide custom builds for users who haven't upgraded yet.
return self.distro.series
- def checkLegalPocket(self, distroseries, publication, is_careful):
+ def checkLegalPocket(self, distroseries, pocket, is_careful):
"""Check if the publication can happen in the archive."""
# 'careful' mode re-publishes everything:
- if is_careful:
- return True
-
- if not publication.archive.canModifySuite(
- distroseries, publication.pocket):
- self.log.error(
- "Tried to publish %s (%s) into the %s pocket on series %s "
- "(%s), skipping" % (
- publication.displayname, publication.id,
- publication.pocket, distroseries.displayname,
- distroseries.status.name))
- return False
-
- return True
-
- def getPendingSourcePublications(self, distroseries, pocket, is_careful):
+ return is_careful or self.archive.canModifySuite(distroseries, pocket)
+
+ def getPendingSourcePublications(self, is_careful):
"""Return the specific group of source records to be published.
- If the distroseries is already released, it automatically refuses
- to publish records to the RELEASE pocket.
+ Exclude publications in the RELEASE pocket unless the distroseries
+ is unreleased or the archive allows updates to the RELEASE pocket
+ for stable distroseries (e.g. PPAs or partner).
"""
- # Exclude RELEASE pocket if the distroseries was already released,
- # since it should not change for main archive.
- # We allow RELEASE publishing for PPAs.
- # We also allow RELEASE publishing for partner.
- if (pocket == PackagePublishingPocket.RELEASE and
- not distroseries.isUnstable() and
- not self.archive.allowUpdatesToReleasePocket()):
- return EmptyResultSet()
-
- conditions = [
- SourcePackagePublishingHistory.distroseries == distroseries,
- SourcePackagePublishingHistory.pocket == pocket,
- SourcePackagePublishingHistory.archive == self.archive,
- ]
+ conditions = [SourcePackagePublishingHistory.archive == self.archive]
# Careful publishing should include all PUBLISHED rows, normal run
# only includes PENDING ones.
@@ -342,49 +326,70 @@
conditions.append(
SourcePackagePublishingHistory.status.is_in(statuses))
+ if not self.archive.allowUpdatesToReleasePocket():
+ conditions.extend([
+ SourcePackagePublishingHistory.distroseriesID ==
+ DistroSeries.id,
+ Or(
+ SourcePackagePublishingHistory.pocket !=
+ PackagePublishingPocket.RELEASE,
+ DistroSeries.status.is_in(ACTIVE_UNRELEASED_STATUSES)),
+ ])
+
publications = IStore(SourcePackagePublishingHistory).find(
SourcePackagePublishingHistory, *conditions)
- return publications.order_by(Desc(SourcePackagePublishingHistory.id))
-
- def publishSources(self, distroseries, pocket, is_careful=False):
- """Publish sources for a given distroseries and pocket.
+ return publications.order_by(
+ SourcePackagePublishingHistory.distroseriesID,
+ SourcePackagePublishingHistory.pocket,
+ Desc(SourcePackagePublishingHistory.id))
+
+ def publishSources(self, distroseries, pocket, spphs):
+ """Publish sources for a given distroseries and pocket."""
+ self.log.debug(
+ "Publishing pending sources for %s" %
+ distroseries.getSuite(pocket))
+ for spph in spphs:
+ spph.publish(self._diskpool, self.log)
+
+ def findAndPublishSources(self, is_careful=False):
+ """Search for and publish all pending sources.
:param is_careful: If True, republish all published records (system
will DTRT checking the hash of all published files).
Consider records returned by getPendingSourcePublications.
"""
- self.log.debug("Publishing %s-%s" % (distroseries.title, pocket.name))
- self.log.debug("Attempting to publish pending sources.")
-
dirty_pockets = set()
- for spph in self.getPendingSourcePublications(
- distroseries, pocket, is_careful):
- if not self.checkLegalPocket(distroseries, spph, is_careful):
- continue
- spph.publish(self._diskpool, self.log)
- dirty_pockets.add((distroseries.name, spph.pocket))
+ all_spphs = self.getPendingSourcePublications(is_careful)
+ for (distroseries, pocket), spphs in groupby(
+ all_spphs, attrgetter("distroseries", "pocket")):
+ if not self.isAllowed(distroseries, pocket):
+ self.log.debug("* Skipping %s", distroseries.getSuite(pocket))
+ elif not self.checkLegalPocket(distroseries, pocket, is_careful):
+ for spph in spphs:
+ self.log.error(
+ "Tried to publish %s (%s) into the %s pocket on "
+ "series %s (%s), skipping" % (
+ spph.displayname, spph.id, pocket,
+ distroseries.displayname,
+ distroseries.status.name))
+ else:
+ self.publishSources(distroseries, pocket, spphs)
+ dirty_pockets.add((distroseries.name, pocket))
return dirty_pockets
- def getPendingBinaryPublications(self, distroarchseries, pocket,
- is_careful):
+ def getPendingBinaryPublications(self, is_careful):
"""Return the specific group of binary records to be published.
- If the distroseries is already released, it automatically refuses
- to publish records to the RELEASE pocket.
+ Exclude publications in the RELEASE pocket unless the distroseries
+ is unreleased or the archive allows updates to the RELEASE pocket
+ for stable distroseries (e.g. PPAs or partner).
"""
- # Exclude RELEASE pocket if the distroseries was already released,
- # since it should not change, unless the archive allows it.
- if (pocket == PackagePublishingPocket.RELEASE and
- not distroarchseries.distroseries.isUnstable() and
- not self.archive.allowUpdatesToReleasePocket()):
- return EmptyResultSet()
-
conditions = [
- BinaryPackagePublishingHistory.distroarchseries ==
- distroarchseries,
- BinaryPackagePublishingHistory.pocket == pocket,
BinaryPackagePublishingHistory.archive == self.archive,
+ BinaryPackagePublishingHistory.distroarchseriesID ==
+ DistroArchSeries.id,
+ DistroArchSeries.distroseriesID == DistroSeries.id,
]
statuses = [PackagePublishingStatus.PENDING]
@@ -393,29 +398,55 @@
conditions.append(
BinaryPackagePublishingHistory.status.is_in(statuses))
+ if not self.archive.allowUpdatesToReleasePocket():
+ conditions.append(Or(
+ BinaryPackagePublishingHistory.pocket !=
+ PackagePublishingPocket.RELEASE,
+ DistroSeries.status.is_in(ACTIVE_UNRELEASED_STATUSES)))
+
publications = IStore(BinaryPackagePublishingHistory).find(
BinaryPackagePublishingHistory, *conditions)
- return publications.order_by(Desc(BinaryPackagePublishingHistory.id))
-
- def publishBinaries(self, distroarchseries, pocket, is_careful=False):
- """Publish binaries for a given distroseries and pocket.
+ return publications.order_by(
+ DistroSeries.id,
+ BinaryPackagePublishingHistory.pocket,
+ DistroArchSeries.architecturetag,
+ Desc(BinaryPackagePublishingHistory.id))
+
+ def publishBinaries(self, distroarchseries, pocket, bpphs):
+ """Publish binaries for a given distroarchseries and pocket."""
+ self.log.debug(
+ "Publishing pending binaries for %s/%s" % (
+ distroarchseries.distroseries.getSuite(pocket),
+ distroarchseries.architecturetag))
+ for bpph in bpphs:
+ bpph.publish(self._diskpool, self.log)
+
+ def findAndPublishBinaries(self, is_careful=False):
+ """Search for and publish all pending binaries.
:param is_careful: If True, republish all published records (system
will DTRT checking the hash of all published files).
Consider records returned by getPendingBinaryPublications.
"""
- self.log.debug("Attempting to publish pending binaries for %s"
- % distroarchseries.architecturetag)
-
- distroseries = distroarchseries.distroseries
dirty_pockets = set()
- for bpph in self.getPendingBinaryPublications(
- distroarchseries, pocket, is_careful):
- if not self.checkLegalPocket(distroseries, bpph, is_careful):
- continue
- bpph.publish(self._diskpool, self.log)
- dirty_pockets.add((distroseries.name, bpph.pocket))
+ all_bpphs = self.getPendingBinaryPublications(is_careful)
+ for (distroarchseries, pocket), bpphs in groupby(
+ all_bpphs, attrgetter("distroarchseries", "pocket")):
+ distroseries = distroarchseries.distroseries
+ if not self.isAllowed(distroseries, pocket):
+ pass # Already logged by publishSources.
+ elif not self.checkLegalPocket(distroseries, pocket, is_careful):
+ for bpph in bpphs:
+ self.log.error(
+ "Tried to publish %s (%s) into the %s pocket on "
+ "series %s (%s), skipping" % (
+ bpph.displayname, bpph.id, pocket,
+ distroseries.displayname,
+ distroseries.status.name))
+ else:
+ self.publishBinaries(distroarchseries, pocket, bpphs)
+ dirty_pockets.add((distroseries.name, pocket))
return dirty_pockets
def A_publish(self, force_publishing):
@@ -428,18 +459,10 @@
"""
self.log.debug("* Step A: Publishing packages")
- for distroseries in self.consider_series:
- for pocket in self.archive.getPockets():
- if self.isAllowed(distroseries, pocket):
- self.dirty_pockets.update(self.publishSources(
- distroseries, pocket, is_careful=force_publishing))
- for distroarchseries in distroseries.architectures:
- self.dirty_pockets.update(self.publishBinaries(
- distroarchseries, pocket,
- is_careful=force_publishing))
- else:
- self.log.debug(
- "* Skipping %s/%s", distroseries.name, pocket.name)
+ self.dirty_pockets.update(
+ self.findAndPublishSources(is_careful=force_publishing))
+ self.dirty_pockets.update(
+ self.findAndPublishBinaries(is_careful=force_publishing))
def A2_markPocketsWithDeletionsDirty(self):
"""An intermediate step in publishing to detect deleted packages.
=== modified file 'lib/lp/archivepublisher/tests/test_publisher.py'
--- lib/lp/archivepublisher/tests/test_publisher.py 2014-07-09 01:18:27 +0000
+++ lib/lp/archivepublisher/tests/test_publisher.py 2014-07-10 14:58:38 +0000
@@ -171,12 +171,10 @@
def checkLegalPocket(self, status, pocket):
distroseries = self.factory.makeDistroSeries(
distribution=self.ubuntutest, status=status)
- spph = self.factory.makeSourcePackagePublishingHistory(
- distroseries=distroseries, pocket=pocket)
publisher = Publisher(
self.logger, self.config, self.disk_pool,
distroseries.main_archive)
- return publisher.checkLegalPocket(distroseries, spph, False)
+ return publisher.checkLegalPocket(distroseries, pocket, False)
def test_checkLegalPocket_allows_unstable_release(self):
"""Publishing to RELEASE in a DEVELOPMENT series is allowed."""
@@ -218,11 +216,8 @@
def _publish(self, pocket, is_careful=False):
"""Publish the test IDistroSeries and its IDistroArchSeries."""
self._ensurePublisher()
- self.publisher.publishSources(
- self.breezy_autotest, pocket, is_careful=is_careful)
- for distroarchseries in self.breezy_autotest.architectures:
- self.publisher.publishBinaries(
- distroarchseries, pocket, is_careful=is_careful)
+ self.publisher.findAndPublishSources(is_careful=is_careful)
+ self.publisher.findAndPublishBinaries(is_careful=is_careful)
self.layer.txn.commit()
def checkPublicationsAreConsidered(self, pocket):
@@ -266,9 +261,13 @@
"""Check the results of an IDistroSeries publishing lookup."""
self._ensurePublisher()
pub_records = self.publisher.getPendingSourcePublications(
- self.breezy_autotest, pocket, is_careful=is_careful)
+ is_careful=is_careful)
+ pub_records = [
+ pub for pub in pub_records
+ if pub.distroseries == self.breezy_autotest and
+ pub.pocket == pocket]
- self.assertEqual(pub_records.count(), len(expected_result))
+ self.assertEqual(len(expected_result), len(pub_records))
self.assertEqual(
[item.id for item in expected_result],
[pub.id for pub in pub_records])
@@ -278,9 +277,13 @@
"""Check the results of an IDistroArchSeries publishing lookup."""
self._ensurePublisher()
pub_records = self.publisher.getPendingBinaryPublications(
- self.breezy_autotest_i386, pocket, is_careful=is_careful)
+ is_careful=is_careful)
+ pub_records = [
+ pub for pub in pub_records
+ if pub.distroarchseries == self.breezy_autotest_i386 and
+ pub.pocket == pocket]
- self.assertEqual(pub_records.count(), len(expected_result))
+ self.assertEqual(len(expected_result), len(pub_records))
self.assertEqual(
[item.id for item in expected_result],
[pub.id for pub in pub_records])
@@ -1031,7 +1034,9 @@
# Remove security proxy so that the publisher can call our fake
# method.
publisher.distro = removeSecurityProxy(publisher.distro)
- publisher.distro['hoary-test'].status = SeriesStatus.OBSOLETE
+ pub_source = self.getPubSource(distroseries=self.breezy_autotest)
+ self.getPubBinaries(
+ distroseries=self.breezy_autotest, pub_source=pub_source)
for status in (SeriesStatus.OBSOLETE, SeriesStatus.FUTURE):
naked_breezy_autotest = publisher.distro['breezy-autotest']
@@ -1055,6 +1060,11 @@
# Remove security proxy so that the publisher can call our fake
# method.
publisher.distro = removeSecurityProxy(publisher.distro)
+ pub_source = self.getPubSource(
+ distroseries=self.breezy_autotest, archive=test_archive)
+ self.getPubBinaries(
+ distroseries=self.breezy_autotest, archive=test_archive,
+ pub_source=pub_source)
for status in (SeriesStatus.OBSOLETE, SeriesStatus.FUTURE):
naked_breezy_autotest = publisher.distro['breezy-autotest']
@@ -1064,12 +1074,13 @@
publisher.A_publish(False)
- self.assertIn(
- (naked_breezy_autotest, RELEASE),
- publisher.publishSources.extract_args())
- self.assertIn(
- (naked_breezy_autotest.architectures[0], RELEASE),
- publisher.publishBinaries.extract_args())
+ source_args = [
+ args[:2] for args in publisher.publishSources.extract_args()]
+ self.assertIn((naked_breezy_autotest, RELEASE), source_args)
+ binary_args = [
+ args[:2] for args in publisher.publishBinaries.extract_args()]
+ self.assertIn(
+ (naked_breezy_autotest.architectures[0], RELEASE), binary_args)
def testPublisherBuilderFunctions(self):
"""Publisher can be initialized via provided helper function.
=== modified file 'lib/lp/registry/model/distroseries.py'
--- lib/lp/registry/model/distroseries.py 2014-07-08 22:31:49 +0000
+++ lib/lp/registry/model/distroseries.py 2014-07-10 14:58:38 +0000
@@ -6,6 +6,8 @@
__metaclass__ = type
__all__ = [
+ 'ACTIVE_RELEASED_STATUSES',
+ 'ACTIVE_UNRELEASED_STATUSES',
'DistroSeries',
'DistroSeriesSet',
]
@@ -191,6 +193,19 @@
)
+ACTIVE_RELEASED_STATUSES = [
+ SeriesStatus.CURRENT,
+ SeriesStatus.SUPPORTED,
+ ]
+
+
+ACTIVE_UNRELEASED_STATUSES = [
+ SeriesStatus.EXPERIMENTAL,
+ SeriesStatus.DEVELOPMENT,
+ SeriesStatus.FROZEN,
+ ]
+
+
class DistroSeries(SQLBase, BugTargetBase, HasSpecificationsMixin,
HasTranslationImportsMixin, HasTranslationTemplatesMixin,
HasMilestonesMixin, SeriesMixin,
@@ -945,11 +960,7 @@
def isUnstable(self):
"""See `IDistroSeries`."""
- return self.status in [
- SeriesStatus.FROZEN,
- SeriesStatus.DEVELOPMENT,
- SeriesStatus.EXPERIMENTAL,
- ]
+ return self.status in ACTIVE_UNRELEASED_STATUSES
def _getAllSources(self):
"""Get all sources ever published in this series' main archives."""
@@ -1535,14 +1546,11 @@
if isreleased:
# The query is filtered on released releases.
where_clause += "releasestatus in (%s, %s)" % sqlvalues(
- SeriesStatus.CURRENT,
- SeriesStatus.SUPPORTED)
+ *ACTIVE_RELEASED_STATUSES)
else:
# The query is filtered on unreleased releases.
where_clause += "releasestatus in (%s, %s, %s)" % sqlvalues(
- SeriesStatus.EXPERIMENTAL,
- SeriesStatus.DEVELOPMENT,
- SeriesStatus.FROZEN)
+ *ACTIVE_UNRELEASED_STATUSES)
if orderBy is not None:
return DistroSeries.select(where_clause, orderBy=orderBy)
else:
Follow ups