launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/dspc-update-garbo into lp:launchpad

Colin Watson has proposed merging lp:~cjwatson/launchpad/dspc-update-garbo into lp:launchpad.

Commit message:
Populate DistributionSourcePackageCache for new source publications from garbo-frequently, to avoid copy races.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/dspc-update-garbo/+merge/297333

Populate DistributionSourcePackageCache for new source publications from garbo-frequently, to avoid copy races.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/dspc-update-garbo into lp:launchpad.
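
Context for readers skimming the diff below: previously, newSourcePublication() updated DistributionSourcePackageCache inline for every new source publication, and those inline updates are presumably the "copy races" the commit message refers to. This branch removes the inline update and instead adds a PopulateDistributionSourcePackageCache tunable loop to garbo-frequently, which catches the cache up in batches. The following is a minimal, self-contained sketch of the batching pattern the new loop uses; plain Python data structures stand in for the real tables and the GarboJobState watermark, so it is an illustration rather than code from the branch:

from typing import Dict, List, Tuple

# (archive_id, distribution_id, sourcepackagename_id) - the key that the
# real DistributionSourcePackageCache rows are matched on in this branch.
Key = Tuple[int, int, int]

def populate_cache_chunk(
    publications: List[dict],   # stand-in for SourcePackagePublishingHistory
    cache: Dict[Key, str],      # stand-in for DistributionSourcePackageCache
    last_spph_id: int,          # watermark loaded from garbo job state
    chunk_size: int = 1000,
) -> int:
    """Process one chunk of publications newer than the watermark."""
    # Only PENDING/PUBLISHED publications above the watermark, in id order.
    pending = sorted(
        (p for p in publications
         if p["status"] in ("Pending", "Published")
         and p["id"] > last_spph_id),
        key=lambda p: p["id"])[:chunk_size]

    new_records: Dict[Key, str] = {}
    for pub in pending:
        key = (pub["archive_id"], pub["distribution_id"], pub["spn_id"])
        new_records[key] = pub["spn_name"]
        last_spph_id = pub["id"]

    # Bulk-create only the cache rows that do not exist yet; rows already
    # present (e.g. from an earlier publication of the same package) are
    # left alone.
    for key in set(new_records) - set(cache):
        cache[key] = new_records[key]

    # The real loop persists the watermark with save_garbo_job_state() and
    # commits here.
    return last_spph_id

garbo-frequently then repeats this in chunks until isDone() reports that no PENDING or PUBLISHED publication remains above the watermark.
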
=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg	2016-06-04 00:56:27 +0000
+++ database/schema/security.cfg	2016-06-14 12:53:41 +0000
@@ -2339,6 +2339,7 @@
 public.codeimportresult                 = SELECT, DELETE
 public.commercialsubscription           = SELECT, UPDATE
 public.diff                             = SELECT, DELETE
+public.distributionsourcepackagecache   = SELECT, INSERT
 public.distroseries                     = SELECT, UPDATE
 public.emailaddress                     = SELECT, UPDATE, DELETE
 public.garbojobstate                    = SELECT, INSERT, UPDATE, DELETE

=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py	2016-05-19 20:58:16 +0000
+++ lib/lp/scripts/garbo.py	2016-06-14 12:53:41 +0000
@@ -72,6 +72,7 @@
 from lp.registry.model.commercialsubscription import CommercialSubscription
 from lp.registry.model.person import Person
 from lp.registry.model.product import Product
+from lp.registry.model.sourcepackagename import SourcePackageName
 from lp.registry.model.teammembership import TeamMembership
 from lp.services.config import config
 from lp.services.database import postgresql
@@ -118,7 +119,11 @@
 from lp.services.verification.model.logintoken import LoginToken
 from lp.services.webhooks.interfaces import IWebhookJobSource
 from lp.services.webhooks.model import WebhookJob
+from lp.soyuz.enums import PackagePublishingStatus
 from lp.soyuz.model.archive import Archive
+from lp.soyuz.model.distributionsourcepackagecache import (
+    DistributionSourcePackageCache,
+    )
 from lp.soyuz.model.livefsbuild import LiveFSFile
 from lp.soyuz.model.publishing import SourcePackagePublishingHistory
 from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache
@@ -480,6 +485,105 @@
         transaction.commit()
 
 
+class PopulateDistributionSourcePackageCache(TunableLoop):
+    """Populate the DistributionSourcePackageCache table.
+
+    Ensure that new source publications have a row in
+    DistributionSourcePackageCache.
+    """
+    maximum_chunk_size = 1000
+
+    def __init__(self, log, abort_time=None):
+        super(PopulateDistributionSourcePackageCache, self).__init__(
+            log, abort_time)
+        self.store = IMasterStore(DistributionSourcePackageCache)
+        # Keep a record of the processed source publication ID so we know
+        # where the job got up to.
+        self.last_spph_id = 0
+        self.job_name = self.__class__.__name__
+        job_data = load_garbo_job_state(self.job_name)
+        if job_data:
+            self.last_spph_id = job_data.get('last_spph_id', 0)
+
+    def getPendingUpdates(self):
+        # Load the latest published source publication data.
+        origin = [
+            SourcePackagePublishingHistory,
+            Join(
+                SourcePackageRelease,
+                SourcePackageRelease.id ==
+                    SourcePackagePublishingHistory.sourcepackagereleaseID),
+            Join(
+                SourcePackageName,
+                SourcePackageName.id ==
+                    SourcePackageRelease.sourcepackagenameID),
+            Join(
+                Archive,
+                Archive.id == SourcePackagePublishingHistory.archiveID),
+            ]
+        rows = self.store.using(*origin).find(
+            (SourcePackagePublishingHistory.id,
+             Archive.id,
+             Archive.distributionID,
+             SourcePackageName.id,
+             SourcePackageName.name),
+            SourcePackagePublishingHistory.status.is_in((
+                PackagePublishingStatus.PENDING,
+                PackagePublishingStatus.PUBLISHED)),
+            SourcePackagePublishingHistory.id > self.last_spph_id)
+        return rows.order_by(SourcePackagePublishingHistory.id)
+
+    def isDone(self):
+        return self.getPendingUpdates().is_empty()
+
+    def __call__(self, chunk_size):
+        # Create a map of new source publications, keyed on (archive,
+        # distribution, SPN).
+        cache_filter_data = []
+        new_records = {}
+        for new_publication in self.getPendingUpdates()[:chunk_size]:
+            (spph_id, archive_id, distribution_id,
+             spn_id, spn_name) = new_publication
+            cache_filter_data.append((archive_id, distribution_id, spn_id))
+            new_records[(archive_id, distribution_id, spn_id)] = spn_name
+            self.last_spph_id = spph_id
+
+        # Gather all the current cached records corresponding to the data in
+        # the current batch.
+        existing_records = set()
+        rows = self.store.find(
+            DistributionSourcePackageCache,
+            In(
+                Row(
+                    DistributionSourcePackageCache.archiveID,
+                    DistributionSourcePackageCache.distributionID,
+                    DistributionSourcePackageCache.sourcepackagenameID),
+                map(Row, cache_filter_data)))
+        for dspc in rows:
+            existing_records.add(
+                (dspc.archiveID, dspc.distributionID,
+                 dspc.sourcepackagenameID))
+
+        # Bulk-create missing cache rows.
+        inserts = []
+        for data in set(new_records) - existing_records:
+            archive_id, distribution_id, spn_id = data
+            inserts.append(
+                (archive_id, distribution_id, spn_id, new_records[data]))
+        if inserts:
+            create(
+                (DistributionSourcePackageCache.archiveID,
+                 DistributionSourcePackageCache.distributionID,
+                 DistributionSourcePackageCache.sourcepackagenameID,
+                 DistributionSourcePackageCache.name),
+                inserts)
+
+        self.store.flush()
+        save_garbo_job_state(self.job_name, {
+            'last_spph_id': self.last_spph_id})
+        transaction.commit()
+
+
 class PopulateLatestPersonSourcePackageReleaseCache(TunableLoop):
     """Populate the LatestPersonSourcePackageReleaseCache table.
 
@@ -1683,6 +1787,7 @@
         BugSummaryJournalRollup,
         OpenIDConsumerAssociationPruner,
         OpenIDConsumerNoncePruner,
+        PopulateDistributionSourcePackageCache,
         PopulateLatestPersonSourcePackageReleaseCache,
         VoucherRedeemer,
         ]
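
One consequence of keying the loop's progress on the publication id is that the job is resumable: the watermark saved via save_garbo_job_state() above is read back by the next garbo-frequently run, so already-processed publications are never rescanned. A rough, self-contained sketch of that resume behaviour, with a plain dict standing in for the GarboJobState storage (illustration only, not code from this branch):

import json

_job_state = {}  # stand-in for the GarboJobState table

def load_job_state(job_name):
    raw = _job_state.get(job_name)
    return json.loads(raw) if raw is not None else None

def save_job_state(job_name, data):
    _job_state[job_name] = json.dumps(data)

def run_once(job_name, publication_ids):
    """Return only the publication ids above the saved watermark."""
    state = load_job_state(job_name) or {}
    last_id = state.get('last_spph_id', 0)
    todo = [pid for pid in sorted(publication_ids) if pid > last_id]
    if todo:
        last_id = todo[-1]
    save_job_state(job_name, {'last_spph_id': last_id})
    return todo

# First run processes everything; rerunning with the same data is a no-op,
# and a later run only sees publications created since the last watermark.
assert run_once('PopulateDistributionSourcePackageCache', [1, 2, 3]) == [1, 2, 3]
assert run_once('PopulateDistributionSourcePackageCache', [1, 2, 3]) == []
assert run_once('PopulateDistributionSourcePackageCache', [1, 2, 3, 4]) == [4]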

=== modified file 'lib/lp/scripts/tests/test_garbo.py'
--- lib/lp/scripts/tests/test_garbo.py	2016-05-19 22:38:26 +0000
+++ lib/lp/scripts/tests/test_garbo.py	2016-06-14 12:53:41 +0000
@@ -120,6 +120,9 @@
     )
 from lp.soyuz.enums import PackagePublishingStatus
 from lp.soyuz.interfaces.livefs import LIVEFS_FEATURE_FLAG
+from lp.soyuz.model.distributionsourcepackagecache import (
+    DistributionSourcePackageCache,
+    )
 from lp.soyuz.model.livefsbuild import LiveFSFile
 from lp.soyuz.model.reporting import LatestPersonSourcePackageReleaseCache
 from lp.testing import (
@@ -1286,6 +1289,71 @@
 
         self.assertEqual(VCSType.GIT, product.vcs)
 
+    def test_PopulateDistributionSourcePackageCache(self):
+        switch_dbuser('testadmin')
+        # Make some test data.  We create source publications for different
+        # distributions and archives.
+        distributions = []
+        for _ in range(2):
+            distribution = self.factory.makeDistribution()
+            self.factory.makeDistroSeries(distribution=distribution)
+            distributions.append(distribution)
+        archives = []
+        for distribution in distributions:
+            archives.append(distribution.main_archive)
+            archives.append(
+                self.factory.makeArchive(distribution=distribution))
+        spns = [self.factory.makeSourcePackageName() for _ in range(2)]
+        spphs = []
+        for spn in spns:
+            for archive in archives:
+                spphs.append(self.factory.makeSourcePackagePublishingHistory(
+                    distroseries=archive.distribution.currentseries,
+                    archive=archive, status=PackagePublishingStatus.PUBLISHED,
+                    sourcepackagename=spn))
+        transaction.commit()
+
+        self.runFrequently()
+
+        def _assert_cached_names(spns, archive):
+            dspcs = DistributionSourcePackageCache._find(
+                archive.distribution, archive)
+            self.assertContentEqual(
+                spns, [dspc.sourcepackagename for dspc in dspcs])
+            if archive.is_main:
+                for spn in spns:
+                    self.assertEqual(
+                        1,
+                        archive.distribution.searchSourcePackages(
+                            spn.name).count())
+
+        def _assert_last_spph_id(spph_id):
+            job_data = load_garbo_job_state(
+                'PopulateDistributionSourcePackageCache')
+            self.assertEqual(spph_id, job_data['last_spph_id'])
+
+        for archive in archives:
+            _assert_cached_names(spns, archive)
+        _assert_last_spph_id(spphs[-1].id)
+
+        # Create some more publications.  Those with PENDING or PUBLISHED
+        # status are inserted into the cache, while others are ignored.
+        switch_dbuser('testadmin')
+        spphs.append(self.factory.makeSourcePackagePublishingHistory(
+            distroseries=archives[0].distribution.currentseries,
+            archive=archives[0], status=PackagePublishingStatus.PENDING))
+        spphs.append(self.factory.makeSourcePackagePublishingHistory(
+            distroseries=archives[0].distribution.currentseries,
+            archive=archives[0], status=PackagePublishingStatus.SUPERSEDED))
+        transaction.commit()
+
+        self.runFrequently()
+
+        _assert_cached_names(spns + [spphs[-2].sourcepackagename], archives[0])
+        for archive in archives[1:]:
+            _assert_cached_names(spns, archive)
+        _assert_last_spph_id(spphs[-2].id)
+
     def test_PopulateLatestPersonSourcePackageReleaseCache(self):
         switch_dbuser('testadmin')
         # Make some same test data - we create published source package

=== modified file 'lib/lp/soyuz/model/publishing.py'
--- lib/lp/soyuz/model/publishing.py	2016-06-01 01:42:52 +0000
+++ lib/lp/soyuz/model/publishing.py	2016-06-14 12:53:41 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -1264,13 +1264,10 @@
                              ancestor=None, create_dsd_job=True,
                              creator=None, sponsor=None, packageupload=None):
         """See `IPublishingSet`."""
-        # Circular imports.
+        # Circular import.
         from lp.registry.model.distributionsourcepackage import (
             DistributionSourcePackage,
             )
-        from lp.soyuz.model.distributionsourcepackagecache import (
-            DistributionSourcePackageCache,
-            )
 
         if distroseries.distribution != archive.distribution:
             raise AssertionError(
@@ -1300,11 +1297,6 @@
         Store.of(sourcepackagerelease).flush()
         del get_property_cache(sourcepackagerelease).published_archives
 
-        DistributionSourcePackageCache.update(
-            distroseries.distribution,
-            [sourcepackagerelease.sourcepackagename], archive,
-            with_binaries=False)
-
         return pub
 
     def getBuildsForSourceIds(self, source_publication_ids, archive=None,

=== modified file 'lib/lp/soyuz/tests/test_publishing.py'
--- lib/lp/soyuz/tests/test_publishing.py	2016-05-23 16:00:10 +0000
+++ lib/lp/soyuz/tests/test_publishing.py	2016-06-14 12:53:41 +0000
@@ -906,19 +906,6 @@
         super(TestPublishingSetLite, self).setUp()
         self.person = self.factory.makePerson()
 
-    def test_newSourcePublication_updates_cache(self):
-        ubuntutest = getUtility(IDistributionSet)["ubuntutest"]
-        breezy_autotest = ubuntutest["breezy-autotest"]
-        spn = self.factory.makeSourcePackageName()
-        self.assertEqual(0, ubuntutest.searchSourcePackages(spn.name).count())
-        spr = self.factory.makeSourcePackageRelease(
-            archive=ubuntutest.main_archive, sourcepackagename=spn,
-            distroseries=breezy_autotest, creator=self.person)
-        getUtility(IPublishingSet).newSourcePublication(
-            ubuntutest.main_archive, spr, breezy_autotest, spr.component,
-            spr.section, self.factory.getAnyPocket())
-        self.assertEqual(1, ubuntutest.searchSourcePackages(spn.name).count())
-
     def test_requestDeletion_marks_SPPHs_deleted(self):
         spph = self.factory.makeSourcePackagePublishingHistory()
         getUtility(IPublishingSet).requestDeletion([spph], self.person)

=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py	2016-06-02 04:54:37 +0000
+++ lib/lp/testing/factory.py	2016-06-14 12:53:41 +0000
@@ -4198,13 +4198,12 @@
                 sourcepackagename=dsp.sourcepackagename,
                 archive=archive)
         with dbuser('statistician'):
-            cache = IStore(DistributionSourcePackageCache).find(
-                DistributionSourcePackageCache,
-                DistributionSourcePackageCache.distribution == distribution,
-                DistributionSourcePackageCache.archive == archive,
-                DistributionSourcePackageCache.sourcepackagename ==
-                    dsp.sourcepackagename).one()
-            cache.binpkgnames = binary_names
+            DistributionSourcePackageCache(
+                distribution=dsp.distribution,
+                sourcepackagename=dsp.sourcepackagename,
+                archive=archive,
+                name=package_name,
+                binpkgnames=binary_names)
         return distribution, dsp
 
     def makeEmailMessage(self, body=None, sender=None, to=None,

