[Merge] lp:~cjwatson/launchpad/dspc-update-garbo into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/dspc-update-garbo into lp:launchpad.
Commit message:
Populate DistributionSourcePackageCache for new source publications from garbo-frequently, to avoid copy races.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/dspc-update-garbo/+merge/297333
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/dspc-update-garbo into lp:launchpad.
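[Editor's note: for context on the mechanism, garbo tasks are TunableLoops. The garbo runner keeps calling __call__(chunk_size) until isDone() returns true, and this branch additionally persists a resume point (last_spph_id) in garbo job state so a later run continues where the previous one stopped. Below is a minimal, self-contained sketch of that pattern; the in-memory table and the names used are illustrative stand-ins, not Launchpad APIs.]

class PopulateCacheSketch:
    # Stand-in for SourcePackagePublishingHistory rows: (spph_id, name).
    PENDING_SPPHS = [(1, 'pkg-a'), (2, 'pkg-b'), (3, 'pkg-c')]

    maximum_chunk_size = 1000

    def __init__(self, resume_id=0):
        self.cache = set()
        # In garbo this resume point is loaded from saved job state.
        self.last_spph_id = resume_id

    def _pending(self):
        return [r for r in self.PENDING_SPPHS if r[0] > self.last_spph_id]

    def isDone(self):
        return not self._pending()

    def __call__(self, chunk_size):
        batch = self._pending()[:int(chunk_size)]
        # Cache the batch, then advance the resume point; garbo saves the
        # job state and commits once per batch.
        self.cache.update(name for _, name in batch)
        self.last_spph_id = batch[-1][0]

loop = PopulateCacheSketch()
while not loop.isDone():
    loop(2)
assert loop.cache == {'pkg-a', 'pkg-b', 'pkg-c'}
assert loop.last_spph_id == 3

[The real loop applies the same shape to SourcePackagePublishingHistory rows in PENDING or PUBLISHED status, bulk-creating only the (archive, distribution, sourcepackagename) cache rows that do not yet exist.]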
=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg 2016-06-04 00:56:27 +0000
+++ database/schema/security.cfg 2016-06-14 12:53:41 +0000
@@ -2339,6 +2339,7 @@
public.codeimportresult = SELECT, DELETE
public.commercialsubscription = SELECT, UPDATE
public.diff = SELECT, DELETE
+public.distributionsourcepackagecache = SELECT, INSERT
public.distroseries = SELECT, UPDATE
public.emailaddress = SELECT, UPDATE, DELETE
public.garbojobstate = SELECT, INSERT, UPDATE, DELETE
=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py 2016-05-19 20:58:16 +0000
+++ lib/lp/scripts/garbo.py 2016-06-14 12:53:41 +0000
@@ -72,6 +72,7 @@
from lp.registry.model.commercialsubscription import CommercialSubscription
from lp.registry.model.person import Person
from lp.registry.model.product import Product
+from lp.registry.model.sourcepackagename import SourcePackageName
from lp.registry.model.teammembership import TeamMembership
from lp.services.config import config
from lp.services.database import postgresql
@@ -118,7 +119,11 @@
from lp.services.verification.model.logintoken import LoginToken
from lp.services.webhooks.interfaces import IWebhookJobSource
from lp.services.webhooks.model import WebhookJob
+from lp.soyuz.enums import PackagePublishingStatus
from lp.soyuz.model.archive import Archive
+from lp.soyuz.model.distributionsourcepackagecache import (
+    DistributionSourcePackageCache,
+    )
from lp.soyuz.model.livefsbuild import LiveFSFile
from lp.soyuz.model.publishing import SourcePackagePublishingHistory
from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache
@@ -480,6 +485,105 @@
         transaction.commit()


+class PopulateDistributionSourcePackageCache(TunableLoop):
+    """Populate the DistributionSourcePackageCache table.
+
+    Ensure that new source publications have a row in
+    DistributionSourcePackageCache.
+    """
+
+    maximum_chunk_size = 1000
+
+    def __init__(self, log, abort_time=None):
+        super(PopulateDistributionSourcePackageCache, self).__init__(
+            log, abort_time)
+        self.store = IMasterStore(DistributionSourcePackageCache)
+        # Keep a record of the processed source publication ID so we know
+        # where the job got up to.
+        self.last_spph_id = 0
+        self.job_name = self.__class__.__name__
+        job_data = load_garbo_job_state(self.job_name)
+        if job_data:
+            self.last_spph_id = job_data.get('last_spph_id', 0)
+
+    def getPendingUpdates(self):
+        # Load the latest published source publication data.
+        origin = [
+            SourcePackagePublishingHistory,
+            Join(
+                SourcePackageRelease,
+                SourcePackageRelease.id ==
+                    SourcePackagePublishingHistory.sourcepackagereleaseID),
+            Join(
+                SourcePackageName,
+                SourcePackageName.id ==
+                    SourcePackageRelease.sourcepackagenameID),
+            Join(
+                Archive,
+                Archive.id == SourcePackagePublishingHistory.archiveID),
+            ]
+        rows = self.store.using(*origin).find(
+            (SourcePackagePublishingHistory.id,
+             Archive.id,
+             Archive.distributionID,
+             SourcePackageName.id,
+             SourcePackageName.name),
+            SourcePackagePublishingHistory.status.is_in((
+                PackagePublishingStatus.PENDING,
+                PackagePublishingStatus.PUBLISHED)),
+            SourcePackagePublishingHistory.id > self.last_spph_id)
+        return rows.order_by(SourcePackagePublishingHistory.id)
+
+    def isDone(self):
+        return self.getPendingUpdates().is_empty()
+
+    def __call__(self, chunk_size):
+        # Create a map of new source publications, keyed on (archive,
+        # distribution, SPN).
+        cache_filter_data = []
+        new_records = {}
+        for new_publication in self.getPendingUpdates()[:chunk_size]:
+            (spph_id, archive_id, distribution_id,
+             spn_id, spn_name) = new_publication
+            cache_filter_data.append((archive_id, distribution_id, spn_id))
+            new_records[(archive_id, distribution_id, spn_id)] = spn_name
+            self.last_spph_id = spph_id
+
+        # Gather all the current cached records corresponding to the data in
+        # the current batch.
+        existing_records = set()
+        rows = self.store.find(
+            DistributionSourcePackageCache,
+            In(
+                Row(
+                    DistributionSourcePackageCache.archiveID,
+                    DistributionSourcePackageCache.distributionID,
+                    DistributionSourcePackageCache.sourcepackagenameID),
+                map(Row, cache_filter_data)))
+        for dspc in rows:
+            existing_records.add(
+                (dspc.archiveID, dspc.distributionID,
+                 dspc.sourcepackagenameID))
+
+        # Bulk-create missing cache rows.
+        inserts = []
+        for data in set(new_records) - existing_records:
+            archive_id, distribution_id, spn_id = data
+            inserts.append(
+                (archive_id, distribution_id, spn_id, new_records[data]))
+        if inserts:
+            create(
+                (DistributionSourcePackageCache.archiveID,
+                 DistributionSourcePackageCache.distributionID,
+                 DistributionSourcePackageCache.sourcepackagenameID,
+                 DistributionSourcePackageCache.name),
+                inserts)
+
+        self.store.flush()
+        save_garbo_job_state(self.job_name, {
+            'last_spph_id': self.last_spph_id})
+        transaction.commit()
+
+
 class PopulateLatestPersonSourcePackageReleaseCache(TunableLoop):
     """Populate the LatestPersonSourcePackageReleaseCache table.
@@ -1683,6 +1787,7 @@
         BugSummaryJournalRollup,
         OpenIDConsumerAssociationPruner,
         OpenIDConsumerNoncePruner,
+        PopulateDistributionSourcePackageCache,
         PopulateLatestPersonSourcePackageReleaseCache,
         VoucherRedeemer,
         ]
=== modified file 'lib/lp/scripts/tests/test_garbo.py'
--- lib/lp/scripts/tests/test_garbo.py 2016-05-19 22:38:26 +0000
+++ lib/lp/scripts/tests/test_garbo.py 2016-06-14 12:53:41 +0000
@@ -120,6 +120,9 @@
)
from lp.soyuz.enums import PackagePublishingStatus
from lp.soyuz.interfaces.livefs import LIVEFS_FEATURE_FLAG
+from lp.soyuz.model.distributionsourcepackagecache import (
+    DistributionSourcePackageCache,
+    )
from lp.soyuz.model.livefsbuild import LiveFSFile
from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache
from lp.testing import (
@@ -1286,6 +1289,71 @@
         self.assertEqual(VCSType.GIT, product.vcs)

+    def test_PopulateDistributionSourcePackageCache(self):
+        switch_dbuser('testadmin')
+        # Make some test data. We create source publications for different
+        # distributions and archives.
+        distributions = []
+        for _ in range(2):
+            distribution = self.factory.makeDistribution()
+            self.factory.makeDistroSeries(distribution=distribution)
+            distributions.append(distribution)
+        archives = []
+        for distribution in distributions:
+            archives.append(distribution.main_archive)
+            archives.append(
+                self.factory.makeArchive(distribution=distribution))
+        spns = [self.factory.makeSourcePackageName() for _ in range(2)]
+        spphs = []
+        for spn in spns:
+            for archive in archives:
+                spphs.append(self.factory.makeSourcePackagePublishingHistory(
+                    distroseries=archive.distribution.currentseries,
+                    archive=archive, status=PackagePublishingStatus.PUBLISHED,
+                    sourcepackagename=spn))
+        transaction.commit()
+
+        self.runFrequently()
+
+        def _assert_cached_names(spns, archive):
+            dspcs = DistributionSourcePackageCache._find(
+                archive.distribution, archive)
+            self.assertContentEqual(
+                spns, [dspc.sourcepackagename for dspc in dspcs])
+            if archive.is_main:
+                for spn in spns:
+                    self.assertEqual(
+                        1,
+                        archive.distribution.searchSourcePackages(
+                            spn.name).count())
+
+        def _assert_last_spph_id(spph_id):
+            job_data = load_garbo_job_state(
+                'PopulateDistributionSourcePackageCache')
+            self.assertEqual(spph_id, job_data['last_spph_id'])
+
+        for archive in archives:
+            _assert_cached_names(spns, archive)
+        _assert_last_spph_id(spphs[-1].id)
+
+        # Create some more publications. Those with PENDING or PUBLISHED
+        # status are inserted into the cache, while others are ignored.
+        switch_dbuser('testadmin')
+        spphs.append(self.factory.makeSourcePackagePublishingHistory(
+            distroseries=archives[0].distribution.currentseries,
+            archive=archives[0], status=PackagePublishingStatus.PENDING))
+        spphs.append(self.factory.makeSourcePackagePublishingHistory(
+            distroseries=archives[0].distribution.currentseries,
+            archive=archives[0], status=PackagePublishingStatus.SUPERSEDED))
+        transaction.commit()
+
+        self.runFrequently()
+
+        _assert_cached_names(spns + [spphs[-2].sourcepackagename], archives[0])
+        for archive in archives[1:]:
+            _assert_cached_names(spns, archive)
+        _assert_last_spph_id(spphs[-2].id)
+
     def test_PopulateLatestPersonSourcePackageReleaseCache(self):
         switch_dbuser('testadmin')
         # Make some test data - we create published source package
=== modified file 'lib/lp/soyuz/model/publishing.py'
--- lib/lp/soyuz/model/publishing.py 2016-06-01 01:42:52 +0000
+++ lib/lp/soyuz/model/publishing.py 2016-06-14 12:53:41 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2015 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
__metaclass__ = type
@@ -1264,13 +1264,10 @@
                              ancestor=None, create_dsd_job=True,
                              creator=None, sponsor=None, packageupload=None):
         """See `IPublishingSet`."""
-        # Circular imports.
+        # Circular import.
         from lp.registry.model.distributionsourcepackage import (
             DistributionSourcePackage,
             )
-        from lp.soyuz.model.distributionsourcepackagecache import (
-            DistributionSourcePackageCache,
-            )

         if distroseries.distribution != archive.distribution:
             raise AssertionError(
@@ -1300,11 +1297,6 @@
         Store.of(sourcepackagerelease).flush()
         del get_property_cache(sourcepackagerelease).published_archives

-        DistributionSourcePackageCache.update(
-            distroseries.distribution,
-            [sourcepackagerelease.sourcepackagename], archive,
-            with_binaries=False)
-
         return pub

     def getBuildsForSourceIds(self, source_publication_ids, archive=None,
=== modified file 'lib/lp/soyuz/tests/test_publishing.py'
--- lib/lp/soyuz/tests/test_publishing.py 2016-05-23 16:00:10 +0000
+++ lib/lp/soyuz/tests/test_publishing.py 2016-06-14 12:53:41 +0000
@@ -906,19 +906,6 @@
         super(TestPublishingSetLite, self).setUp()
         self.person = self.factory.makePerson()

-    def test_newSourcePublication_updates_cache(self):
-        ubuntutest = getUtility(IDistributionSet)["ubuntutest"]
-        breezy_autotest = ubuntutest["breezy-autotest"]
-        spn = self.factory.makeSourcePackageName()
-        self.assertEqual(0, ubuntutest.searchSourcePackages(spn.name).count())
-        spr = self.factory.makeSourcePackageRelease(
-            archive=ubuntutest.main_archive, sourcepackagename=spn,
-            distroseries=breezy_autotest, creator=self.person)
-        getUtility(IPublishingSet).newSourcePublication(
-            ubuntutest.main_archive, spr, breezy_autotest, spr.component,
-            spr.section, self.factory.getAnyPocket())
-        self.assertEqual(1, ubuntutest.searchSourcePackages(spn.name).count())
-
     def test_requestDeletion_marks_SPPHs_deleted(self):
         spph = self.factory.makeSourcePackagePublishingHistory()
         getUtility(IPublishingSet).requestDeletion([spph], self.person)
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2016-06-02 04:54:37 +0000
+++ lib/lp/testing/factory.py 2016-06-14 12:53:41 +0000
@@ -4198,13 +4198,12 @@
             sourcepackagename=dsp.sourcepackagename,
             archive=archive)
         with dbuser('statistician'):
-            cache = IStore(DistributionSourcePackageCache).find(
-                DistributionSourcePackageCache,
-                DistributionSourcePackageCache.distribution == distribution,
-                DistributionSourcePackageCache.archive == archive,
-                DistributionSourcePackageCache.sourcepackagename ==
-                dsp.sourcepackagename).one()
-            cache.binpkgnames = binary_names
+            DistributionSourcePackageCache(
+                distribution=dsp.distribution,
+                sourcepackagename=dsp.sourcepackagename,
+                archive=archive,
+                name=package_name,
+                binpkgnames=binary_names)
         return distribution, dsp
def makeEmailMessage(self, body=None, sender=None, to=None,
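[Editor's note: a side observation on the existing-rows check in the new garbo loop. Storm's In(Row(...), map(Row, ...)) compiles to a SQL row-value IN comparison, so each batch is filtered against the cache in a single query. A rough stand-alone sketch of that SQL shape using sqlite3 follows; row values need SQLite 3.15 or later, the table and values are made up for the demo, and Launchpad itself runs this on PostgreSQL.]

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE cache (archive INT, distribution INT, spn INT)')
conn.executemany(
    'INSERT INTO cache VALUES (?, ?, ?)',
    [(1, 1, 10), (1, 1, 11), (2, 1, 10)])
# Composite keys from the current batch, like cache_filter_data above.
wanted = [(1, 1, 10), (2, 1, 12)]
placeholders = ', '.join(['(?, ?, ?)'] * len(wanted))
existing = conn.execute(
    'SELECT archive, distribution, spn FROM cache '
    'WHERE (archive, distribution, spn) IN (VALUES %s)' % placeholders,
    [value for key in wanted for value in key]).fetchall()
assert existing == [(1, 1, 10)]  # only the already-cached key matches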