← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:publish-distro-reset-store into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:publish-distro-reset-store into launchpad:master.

Commit message:
publish-distro: Reset Storm store between each archive

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1910422 in Launchpad itself: "PPA publisher becomes very slow for several runs then recovers"
  https://bugs.launchpad.net/launchpad/+bug/1910422

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/396031

When publishing large archives, a large number of alive objects can accumulate in the Storm store. As a result, each commit can take a long time, because it has to iterate over every alive object to invalidate it. This has been particularly noticeable on production since the Storm cache size for LPCONFIG=ppa-publish was raised from the default value of 10000 to 500000.
That change fixed preloading, but at the expense of slowing down commits: after publishing some large archives, each commit can take on the order of ten seconds, and since there are typically four commits per dirty PPA, this adds up very quickly.

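To avoid this, reset the store between archives in roughly the same way we do between web requests. This means that tests have to do some extra work to reload objects from the database after publish-distro has processed an archive, since objects loaded before the reset are no longer attached to the store.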
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:publish-distro-reset-store into launchpad:master.
diff --git a/lib/lp/archivepublisher/scripts/publish_ftpmaster.py b/lib/lp/archivepublisher/scripts/publish_ftpmaster.py
index 2610531..27427cf 100644
--- a/lib/lp/archivepublisher/scripts/publish_ftpmaster.py
+++ b/lib/lp/archivepublisher/scripts/publish_ftpmaster.py
@@ -1,4 +1,4 @@
-# Copyright 2011-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2021 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Master distro publishing script."""
@@ -353,7 +353,7 @@ class PublishFTPMaster(LaunchpadCronScript):
             test_args=arguments, logger=self.logger, ignore_cron_control=True)
         publish_distro.logger = self.logger
         publish_distro.txn = self.txn
-        publish_distro.main()
+        publish_distro.main(reset_store_between_archives=False)
 
     def publishDistroArchive(self, distribution, archive,
                              security_suites=None):
diff --git a/lib/lp/archivepublisher/scripts/publishdistro.py b/lib/lp/archivepublisher/scripts/publishdistro.py
index b114e8e..dabcb8c 100644
--- a/lib/lp/archivepublisher/scripts/publishdistro.py
+++ b/lib/lp/archivepublisher/scripts/publishdistro.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2021 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Publisher script class."""
@@ -10,6 +10,7 @@ __all__ = [
 from optparse import OptionValueError
 
 from six.moves import filter as ifilter
+from storm.store import Store
 from zope.component import getUtility
 
 from lp.app.errors import NotFoundError
@@ -351,33 +352,53 @@ class PublishDistro(PublisherScript):
         if self.options.enable_apt:
             publisher.createSeriesAliases()
 
-    def main(self):
+    def processArchive(self, archive_id, reset_store=True):
+        set_request_started(
+            request_statements=LimitedList(10000),
+            txn=self.txn, enable_timeout=False)
+        try:
+            archive = getUtility(IArchiveSet).get(archive_id)
+            distribution = archive.distribution
+            allowed_suites = self.findAllowedSuites(distribution)
+            if archive.status == ArchiveStatus.DELETING:
+                publisher = self.getPublisher(
+                    distribution, archive, allowed_suites)
+                work_done = self.deleteArchive(archive, publisher)
+            elif archive.can_be_published:
+                publisher = self.getPublisher(
+                    distribution, archive, allowed_suites)
+                self.publishArchive(archive, publisher)
+                work_done = True
+            else:
+                work_done = False
+        finally:
+            clear_request_started()
+
+        if work_done:
+            self.txn.commit()
+            if reset_store:
+                # Reset the store after processing each dirty archive, as
+                # otherwise the process of publishing large archives can
+                # accumulate a large number of alive objects in the Storm
+                # store and cause performance problems.
+                Store.of(archive).reset()
+
+    def main(self, reset_store_between_archives=True):
         """See `LaunchpadScript`."""
         self.validateOptions()
         self.logOptions()
 
+        archive_ids = []
         for distribution in self.findDistros():
-            allowed_suites = self.findAllowedSuites(distribution)
             for archive in self.getTargetArchives(distribution):
-                set_request_started(
-                    request_statements=LimitedList(10000),
-                    txn=self.txn, enable_timeout=False)
-                try:
-                    if archive.status == ArchiveStatus.DELETING:
-                        publisher = self.getPublisher(
-                            distribution, archive, allowed_suites)
-                        work_done = self.deleteArchive(archive, publisher)
-                    elif archive.can_be_published:
-                        publisher = self.getPublisher(
-                            distribution, archive, allowed_suites)
-                        self.publishArchive(archive, publisher)
-                        work_done = True
-                    else:
-                        work_done = False
-                finally:
-                    clear_request_started()
-
-                if work_done:
-                    self.txn.commit()
+                if archive.distribution != distribution:
+                    raise AssertionError(
+                        "Archive %s does not match distribution %r" %
+                        (archive.reference, distribution))
+                archive_ids.append(archive.id)
+
+        for archive_id in archive_ids:
+            self.processArchive(
+                archive_id, reset_store=reset_store_between_archives)
 
         self.logger.debug("Ciao")
diff --git a/lib/lp/archivepublisher/tests/test_publishdistro.py b/lib/lp/archivepublisher/tests/test_publishdistro.py
index 66bab77..a260c71 100644
--- a/lib/lp/archivepublisher/tests/test_publishdistro.py
+++ b/lib/lp/archivepublisher/tests/test_publishdistro.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2021 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Functional tests for publish-distro.py script."""
@@ -14,6 +14,7 @@ import subprocess
 import sys
 
 import six
+from storm.store import Store
 from testtools.matchers import (
     Not,
     PathExists,
@@ -21,7 +22,10 @@ from testtools.matchers import (
 from testtools.twistedsupport import AsynchronousDeferredRunTest
 from twisted.internet import defer
 from zope.component import getUtility
-from zope.security.proxy import removeSecurityProxy
+from zope.security.proxy import (
+    ProxyFactory,
+    removeSecurityProxy,
+    )
 
 from lp.app.interfaces.launchpad import ILaunchpadCelebrities
 from lp.archivepublisher.config import getPubConfig
@@ -35,6 +39,7 @@ from lp.registry.interfaces.distribution import IDistributionSet
 from lp.registry.interfaces.person import IPersonSet
 from lp.registry.interfaces.pocket import PackagePublishingPocket
 from lp.services.config import config
+from lp.services.database.interfaces import IStore
 from lp.services.log.logger import (
     BufferLogger,
     DevNullLogger,
@@ -46,6 +51,7 @@ from lp.soyuz.enums import (
     PackagePublishingStatus,
     )
 from lp.soyuz.interfaces.archive import IArchiveSet
+from lp.soyuz.model.publishing import SourcePackagePublishingHistory
 from lp.soyuz.tests.test_publishing import TestNativePublishingBase
 from lp.testing import TestCaseWithFactory
 from lp.testing.dbuser import switch_dbuser
@@ -86,6 +92,16 @@ class TestPublishDistro(TestNativePublishingBase):
         stdout, stderr = process.communicate()
         return (process.returncode, stdout, stderr)
 
+    def loadPubSource(self, spph_id):
+        """Load a source package publishing history row from the DB.
+
+        `PublishDistro.processArchive` resets the store for performance
+        reasons, so tests need to reload database objects from scratch after
+        calling it.
+        """
+        return ProxyFactory(IStore(SourcePackagePublishingHistory).get(
+            SourcePackagePublishingHistory, spph_id))
+
     def testPublishDistroRun(self):
         """Try a simple publish-distro run.
 
@@ -94,12 +110,12 @@ class TestPublishDistro(TestNativePublishingBase):
 
         This method also ensures the publish-distro.py script is runnable.
         """
-        pub_source = self.getPubSource(filecontent=b'foo')
+        pub_source_id = self.getPubSource(filecontent=b'foo').id
         self.layer.txn.commit()
 
         rc, out, err = self.runPublishDistroScript()
 
-        pub_source.sync()
+        pub_source = self.loadPubSource(pub_source_id)
         self.assertEqual(0, rc, "Publisher failed with:\n%s\n%s" % (out, err))
         self.assertEqual(pub_source.status, PackagePublishingStatus.PUBLISHED)
 
@@ -112,10 +128,10 @@ class TestPublishDistro(TestNativePublishingBase):
         Make a DELETED source to see if the dirty pocket processing
         works for deletions.
         """
-        pub_source = self.getPubSource(filecontent=b'foo')
+        pub_source_id = self.getPubSource(filecontent=b'foo').id
         self.layer.txn.commit()
         self.runPublishDistro()
-        pub_source.sync()
+        pub_source = self.loadPubSource(pub_source_id)
 
         random_person = getUtility(IPersonSet).getByName('name16')
         pub_source.requestDeletion(random_person)
@@ -123,7 +139,7 @@ class TestPublishDistro(TestNativePublishingBase):
         self.assertTrue(pub_source.scheduleddeletiondate is None,
             "pub_source.scheduleddeletiondate should not be set, and it is.")
         self.runPublishDistro()
-        pub_source.sync()
+        pub_source = self.loadPubSource(pub_source_id)
         self.assertTrue(pub_source.scheduleddeletiondate is not None,
             "pub_source.scheduleddeletiondate should be set, and it's not.")
 
@@ -142,16 +158,16 @@ class TestPublishDistro(TestNativePublishingBase):
         targeted to the specified suite, other records should be untouched
         and not present in disk.
         """
-        pub_source = self.getPubSource(filecontent=b'foo')
-        pub_source2 = self.getPubSource(
+        pub_source_id = self.getPubSource(filecontent=b'foo').id
+        pub_source2_id = self.getPubSource(
             sourcename='baz', filecontent=b'baz',
-            distroseries=self.ubuntutest['hoary-test'])
+            distroseries=self.ubuntutest['hoary-test']).id
         self.layer.txn.commit()
 
         self.runPublishDistro(['-s', 'hoary-test'])
 
-        pub_source.sync()
-        pub_source2.sync()
+        pub_source = self.loadPubSource(pub_source_id)
+        pub_source2 = self.loadPubSource(pub_source2_id)
         self.assertEqual(pub_source.status, PackagePublishingStatus.PENDING)
         self.assertEqual(
             pub_source2.status, PackagePublishingStatus.PUBLISHED)
@@ -222,11 +238,11 @@ class TestPublishDistro(TestNativePublishingBase):
         """publish-distro skips PPAs that do not yet have a signing key."""
         self.setUpRequireSigningKeys()
         cprov = getUtility(IPersonSet).getByName('cprov')
-        pub_source = self.getPubSource(archive=cprov.archive)
+        pub_source_id = self.getPubSource(archive=cprov.archive).id
         removeSecurityProxy(cprov.archive).distribution = self.ubuntutest
         self.layer.txn.commit()
         self.runPublishDistro(['--ppa'])
-        pub_source.sync()
+        pub_source = self.loadPubSource(pub_source_id)
         self.assertEqual(PackagePublishingStatus.PENDING, pub_source.status)
 
     @defer.inlineCallbacks
@@ -235,18 +251,18 @@ class TestPublishDistro(TestNativePublishingBase):
 
         It should deal only with PPA publications.
         """
-        pub_source = self.getPubSource(filecontent=b'foo')
+        pub_source_id = self.getPubSource(filecontent=b'foo').id
 
         cprov = getUtility(IPersonSet).getByName('cprov')
-        pub_source2 = self.getPubSource(
-            sourcename='baz', filecontent=b'baz', archive=cprov.archive)
+        pub_source2_id = self.getPubSource(
+            sourcename='baz', filecontent=b'baz', archive=cprov.archive).id
 
         ubuntutest = getUtility(IDistributionSet)['ubuntutest']
         name16 = getUtility(IPersonSet).getByName('name16')
         getUtility(IArchiveSet).new(purpose=ArchivePurpose.PPA, owner=name16,
             distribution=ubuntutest)
-        pub_source3 = self.getPubSource(
-            sourcename='bar', filecontent=b'bar', archive=name16.archive)
+        pub_source3_id = self.getPubSource(
+            sourcename='bar', filecontent=b'bar', archive=name16.archive).id
 
         # Override PPAs distributions
         naked_archive = removeSecurityProxy(cprov.archive)
@@ -267,9 +283,9 @@ class TestPublishDistro(TestNativePublishingBase):
 
         self.runPublishDistro(['--ppa'])
 
-        pub_source.sync()
-        pub_source2.sync()
-        pub_source3.sync()
+        pub_source = self.loadPubSource(pub_source_id)
+        pub_source2 = self.loadPubSource(pub_source2_id)
+        pub_source3 = self.loadPubSource(pub_source3_id)
         self.assertEqual(pub_source.status, PackagePublishingStatus.PENDING)
         self.assertEqual(
             pub_source2.status, PackagePublishingStatus.PUBLISHED)
@@ -280,12 +296,12 @@ class TestPublishDistro(TestNativePublishingBase):
         self.assertEqual(False, os.path.exists(foo_path))
 
         baz_path = os.path.join(
-            config.personalpackagearchive.root, cprov.name,
+            config.personalpackagearchive.root, 'cprov',
             'ppa/ubuntutest/pool/main/b/baz/baz_666.dsc')
         self.assertEqual('baz', open(baz_path).read().strip())
 
         bar_path = os.path.join(
-            config.personalpackagearchive.root, name16.name,
+            config.personalpackagearchive.root, 'name16',
             'ppa/ubuntutest/pool/main/b/bar/bar_666.dsc')
         self.assertEqual('bar', open(bar_path).read().strip())
 
@@ -302,8 +318,8 @@ class TestPublishDistro(TestNativePublishingBase):
             private=True, distribution=ubuntutest)
 
         # Publish something to the private PPA:
-        pub_source = self.getPubSource(
-            sourcename='baz', filecontent=b'baz', archive=private_ppa)
+        pub_source_id = self.getPubSource(
+            sourcename='baz', filecontent=b'baz', archive=private_ppa).id
         self.layer.txn.commit()
 
         self.setUpRequireSigningKeys()
@@ -315,14 +331,14 @@ class TestPublishDistro(TestNativePublishingBase):
         # Try a plain PPA run, to ensure the private one is NOT published.
         self.runPublishDistro(['--ppa'])
 
-        pub_source.sync()
+        pub_source = self.loadPubSource(pub_source_id)
         self.assertEqual(pub_source.status, PackagePublishingStatus.PENDING)
 
         # Now publish the private PPAs and make sure they are really
         # published.
         self.runPublishDistro(['--private-ppa'])
 
-        pub_source.sync()
+        pub_source = self.loadPubSource(pub_source_id)
         self.assertEqual(pub_source.status, PackagePublishingStatus.PUBLISHED)
 
     def testPublishCopyArchive(self):
@@ -351,18 +367,20 @@ class TestPublishDistro(TestNativePublishingBase):
         removeSecurityProxy(copy_archive).publish = True
 
         # Publish something.
-        pub_source = self.getPubSource(
-            sourcename='baz', filecontent=b'baz', archive=copy_archive)
+        pub_source_id = self.getPubSource(
+            sourcename='baz', filecontent=b'baz', archive=copy_archive).id
 
         # Try a plain PPA run, to ensure the copy archive is not published.
         self.runPublishDistro(['--ppa'])
 
+        pub_source = self.loadPubSource(pub_source_id)
         self.assertEqual(pub_source.status, PackagePublishingStatus.PENDING)
 
         # Now publish the copy archives and make sure they are really
         # published.
         self.runPublishDistro(['--copy-archive'])
 
+        pub_source = self.loadPubSource(pub_source_id)
         self.assertEqual(pub_source.status, PackagePublishingStatus.PUBLISHED)
 
         # Make sure that the files were published in the right place.
@@ -409,7 +427,9 @@ class TestPublishDistro(TestNativePublishingBase):
     def testCarefulRelease(self):
         """publish-distro can be asked to just rewrite Release files."""
         archive = self.factory.makeArchive(distribution=self.ubuntutest)
-        pub_source = self.getPubSource(filecontent=b'foo', archive=archive)
+        archive_id = archive.id
+        pub_source_id = self.getPubSource(
+            filecontent=b'foo', archive=archive).id
 
         self.setUpRequireSigningKeys()
         yield self.useFixture(InProcessKeyServerFixture()).start()
@@ -421,9 +441,10 @@ class TestPublishDistro(TestNativePublishingBase):
 
         self.runPublishDistro(['--ppa'])
 
-        pub_source.sync()
+        pub_source = self.loadPubSource(pub_source_id)
         self.assertEqual(PackagePublishingStatus.PUBLISHED, pub_source.status)
 
+        archive = getUtility(IArchiveSet).get(archive_id)
         dists_path = getPubConfig(archive).distsroot
         hoary_inrelease_path = os.path.join(
             dists_path, 'hoary-test', 'InRelease')
@@ -447,11 +468,12 @@ class TestPublishDistro(TestNativePublishingBase):
 
     def testDirtySuites(self):
         """publish-distro can be told to publish specific suites."""
-        archive = self.factory.makeArchive(distribution=self.ubuntutest)
+        archive_id = self.factory.makeArchive(distribution=self.ubuntutest).id
         self.layer.txn.commit()
 
         # publish-distro has nothing to publish.
         self.runPublishDistro(['--ppa'])
+        archive = getUtility(IArchiveSet).get(archive_id)
         breezy_release_path = os.path.join(
             getPubConfig(archive).distsroot, 'breezy-autotest', 'Release')
         self.assertThat(breezy_release_path, Not(PathExists()))
@@ -466,9 +488,10 @@ class TestPublishDistro(TestNativePublishingBase):
 
 class FakeArchive:
     """A very simple fake `Archive`."""
-    def __init__(self, purpose=ArchivePurpose.PRIMARY):
+    def __init__(self, distribution, purpose=ArchivePurpose.PRIMARY):
         self.publish = True
         self.can_be_published = True
+        self.distribution = distribution
         self.purpose = purpose
         self.status = ArchiveStatus.ACTIVE
         self.dirty_suites = []
@@ -902,7 +925,7 @@ class TestPublishDistroMethods(TestCaseWithFactory):
         script = self.makeScript(distro)
         script.txn = FakeTransaction()
         publisher = FakePublisher()
-        script.publishArchive(FakeArchive(), publisher)
+        script.publishArchive(FakeArchive(distro), publisher)
         self.assertEqual(1, publisher.A_publish.call_count)
         self.assertEqual(
             1, publisher.A2_markPocketsWithDeletionsDirty.call_count)
@@ -925,7 +948,7 @@ class TestPublishDistroMethods(TestCaseWithFactory):
             script = self.makeScript(distro, args=[option])
             script.txn = FakeTransaction()
             publisher = FakePublisher()
-            script.publishArchive(FakeArchive(), publisher)
+            script.publishArchive(FakeArchive(distro), publisher)
             for check_option, steps in possible_options.items():
                 for step in steps:
                     publisher_step = getattr(publisher, step)
@@ -942,7 +965,7 @@ class TestPublishDistroMethods(TestCaseWithFactory):
         script = self.makeScript(distro)
         script.txn = FakeTransaction()
         publisher = FakePublisher()
-        script.publishArchive(FakeArchive(), publisher)
+        script.publishArchive(FakeArchive(distro), publisher)
         self.assertEqual(1, publisher.C_doFTPArchive.call_count)
         self.assertEqual(0, publisher.C_writeIndexes.call_count)
 
@@ -954,10 +977,30 @@ class TestPublishDistroMethods(TestCaseWithFactory):
         script = self.makeScript(distro)
         script.txn = FakeTransaction()
         publisher = FakePublisher()
-        script.publishArchive(FakeArchive(ArchivePurpose.PPA), publisher)
+        script.publishArchive(
+            FakeArchive(distro, ArchivePurpose.PPA), publisher)
         self.assertEqual(0, publisher.C_doFTPArchive.call_count)
         self.assertEqual(1, publisher.C_writeIndexes.call_count)
 
+    def test_processArchive_resets_store(self):
+        # The store is reset after processing each archive, as otherwise a
+        # large number of alive objects in the store can cause performance
+        # problems.
+        distro = self.makeDistro()
+        script = self.makeScript(distro)
+        script.txn = FakeTransaction()
+        archive = self.factory.makeArchive(distribution=distro)
+        archive_id = archive.id
+        publisher = FakePublisher()
+        script.getPublisher = FakeMethod(publisher)
+        script.publishArchive = FakeMethod()
+        store = Store.of(archive)
+        self.assertNotEqual({}, store._alive)
+        script.processArchive(archive_id)
+        self.assertEqual({}, store._alive)
+        [((published_archive, _), _)] = script.publishArchive.calls
+        self.assertEqual(archive, published_archive)
+
     def test_publishes_only_selected_archives(self):
         # The script publishes only the archives returned by
         # getTargetArchives, for the distributions returned by
@@ -969,7 +1012,7 @@ class TestPublishDistroMethods(TestCaseWithFactory):
         script = self.makeScript()
         script.txn = FakeTransaction()
         script.findDistros = FakeMethod([distro])
-        archive = FakeArchive()
+        archive = self.factory.makeArchive(distribution=distro)
         script.getTargetArchives = FakeMethod([archive])
         publisher = FakePublisher()
         script.getPublisher = FakeMethod(publisher)