← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~wgrant/launchpad/ss-bulk-fetch into lp:launchpad

 

William Grant has proposed merging lp:~wgrant/launchpad/ss-bulk-fetch into lp:launchpad.

Commit message:
Reduce buildd-manager's transaction usage significantly. Scans of disabled or idle manual builders no longer hit the DB at all; instead, their data is fetched from the DB in a single bulk query that is refreshed periodically.

Requested reviews:
  William Grant (wgrant): code

For more details, see:
https://code.launchpad.net/~wgrant/launchpad/ss-bulk-fetch/+merge/189527
-- 
https://code.launchpad.net/~wgrant/launchpad/ss-bulk-fetch/+merge/189527
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.
=== modified file 'lib/lp/buildmaster/interactor.py'
--- lib/lp/buildmaster/interactor.py	2013-10-03 05:35:29 +0000
+++ lib/lp/buildmaster/interactor.py	2013-10-07 05:10:45 +0000
@@ -216,12 +216,15 @@
     ('name', 'url', 'virtualized', 'vm_host', 'builderok', 'manual',
      'build_queue'))
 
-
-def extract_vitals_from_db(builder, build_queue=None):
+_BQ_UNSPECIFIED = object()
+
+
+def extract_vitals_from_db(builder, build_queue=_BQ_UNSPECIFIED):
+    if build_queue is _BQ_UNSPECIFIED:
+        build_queue = builder.currentjob
     return BuilderVitals(
         builder.name, builder.url, builder.virtualized, builder.vm_host,
-        builder.builderok, builder.manual,
-        build_queue or builder.currentjob)
+        builder.builderok, builder.manual, build_queue)
 
 
 class BuilderInteractor(object):

=== modified file 'lib/lp/buildmaster/manager.py'
--- lib/lp/buildmaster/manager.py	2013-10-03 05:35:29 +0000
+++ lib/lp/buildmaster/manager.py	2013-10-07 05:10:45 +0000
@@ -10,8 +10,10 @@
     'BUILDD_MANAGER_LOG_NAME',
     ]
 
+import datetime
 import logging
 
+from storm.expr import LeftJoin
 import transaction
 from twisted.application import service
 from twisted.internet import (
@@ -36,6 +38,8 @@
     IBuilderSet,
     )
 from lp.buildmaster.model.builder import Builder
+from lp.buildmaster.model.buildqueue import BuildQueue
+from lp.services.database.interfaces import IStore
 from lp.services.propertycache import get_property_cache
 
 
@@ -43,19 +47,85 @@
 
 
 class BuilderFactory:
+    """A dumb builder factory that just talks to the DB."""
+
+    def update(self):
+        """Update the factory's view of the world.
+
+        For the basic BuilderFactory this is a no-op, but others might do
+        something.
+        """
+        return
+
+    def prescanUpdate(self):
+        """Update the factory's view of the world before each scan.
+
+        For the basic BuilderFactory this means ending the transaction
+        to ensure that data retrieved is up to date.
+        """
+        transaction.abort()
+
+    @property
+    def date_updated(self):
+        return datetime.datetime.utcnow()
 
     def __getitem__(self, name):
+        """Get the named `Builder` Storm object."""
         return getUtility(IBuilderSet).getByName(name)
 
     def getVitals(self, name):
+        """Get the named `BuilderVitals` object."""
         return extract_vitals_from_db(self[name])
 
     def iterVitals(self):
+        """Iterate over all `BuilderVitals` objects."""
         return (
             extract_vitals_from_db(b)
             for b in getUtility(IBuilderSet).__iter__())
 
 
+class PrefetchedBuilderFactory:
+    """A smart builder factory that does efficient bulk queries.
+
+    `getVitals` and `iterVitals` don't touch the DB directly. They work
+    from cached data updated by `update`.
+    """
+
+    date_updated = None
+
+    def update(self):
+        """See `BuilderFactory`."""
+        transaction.abort()
+        builders_and_bqs = IStore(Builder).using(
+            Builder, LeftJoin(BuildQueue, BuildQueue.builderID == Builder.id)
+            ).find((Builder, BuildQueue))
+        self.vitals_map = dict(
+            (b.name, extract_vitals_from_db(b, bq))
+            for b, bq in builders_and_bqs)
+        transaction.abort()
+        self.date_updated = datetime.datetime.utcnow()
+
+    def prescanUpdate(self):
+        """See `BuilderFactory`.
+
+        This is a no-op, as the data was already brought sufficiently up
+        to date by update().
+        """
+        return
+
+    def __getitem__(self, name):
+        """See `BuilderFactory`."""
+        return getUtility(IBuilderSet).getByName(name)
+
+    def getVitals(self, name):
+        """See `BuilderFactory`."""
+        return self.vitals_map[name]
+
+    def iterVitals(self):
+        """See `BuilderFactory`."""
+        return (b for n, b in sorted(self.vitals_map.iteritems()))
+
+
 @defer.inlineCallbacks
 def assessFailureCounts(logger, vitals, builder, slave, interactor, exception):
     """View builder/job failure_count and work out which needs to die.
@@ -158,6 +228,7 @@
             clock = reactor
         self._clock = clock
         self.date_cancel = None
+        self.date_scanned = None
 
         # We cache the build cookie, keyed on the BuildQueue, to avoid
         # hitting the DB on every scan.
@@ -176,12 +247,26 @@
         self.loop.stop()
 
     def singleCycle(self):
-        self.logger.debug("Scanning builder: %s" % self.builder_name)
+        # Inhibit scanning if the BuilderFactory hasn't updated since
+        # the last run. This doesn't matter for the base BuilderFactory,
+        # as it's always up to date, but PrefetchedBuilderFactory caches
+        # heavily, and we don't want to eg. forget that we dispatched a
+        # build in the previous cycle.
+        if (self.date_scanned is not None
+            and self.date_scanned > self.builder_factory.date_updated):
+            self.logger.debug(
+                "Skipping builder %s (cache out of date)" % self.builder_name)
+            return defer.succeed(None)
+
+        self.logger.debug("Scanning builder %s" % self.builder_name)
         d = self.scan()
-
         d.addErrback(self._scanFailed)
+        d.addBoth(self._updateDateScanned)
         return d
 
+    def _updateDateScanned(self, ignored):
+        self.date_scanned = datetime.datetime.utcnow()
+
     @defer.inlineCallbacks
     def _scanFailed(self, failure):
         """Deal with failures encountered during the scan cycle.
@@ -237,7 +322,7 @@
             by resuming a slave host, so that there is no need to update its
             status.
         """
-        if not vitals.build_queue:
+        if vitals.build_queue is None:
             self.date_cancel = None
             defer.returnValue(False)
         build = vitals.build_queue.specific_job.build
@@ -299,9 +384,7 @@
         :return: A Deferred that fires when the scan is complete.
         """
         self.logger.debug("Scanning %s." % self.builder_name)
-        # Commit and refetch the Builder object to ensure we have the
-        # latest data from the DB.
-        transaction.commit()
+        self.builder_factory.prescanUpdate()
         vitals = self.builder_factory.getVitals(self.builder_name)
         interactor = self.interactor_factory()
         slave = self.slave_factory(vitals)
@@ -360,7 +443,7 @@
     """If new builders appear, create a scanner for them."""
 
     # How often to check for new builders, in seconds.
-    SCAN_INTERVAL = 300
+    SCAN_INTERVAL = 15
 
     def __init__(self, manager, clock=None):
         self.manager = manager
@@ -369,9 +452,7 @@
         if clock is None:
             clock = reactor
         self._clock = clock
-        self.current_builders = [
-            vitals.name for vitals in
-            self.manager.builder_factory.iterVitals()]
+        self.current_builders = []
 
     def stop(self):
         """Terminate the LoopingCall."""
@@ -386,6 +467,7 @@
 
     def scan(self):
         """If a new builder appears, create a SlaveScanner for it."""
+        self.manager.builder_factory.update()
         new_builders = self.checkForNewBuilders()
         self.manager.addScanForBuilders(new_builders)
 
@@ -403,9 +485,9 @@
 class BuilddManager(service.Service):
     """Main Buildd Manager service class."""
 
-    def __init__(self, clock=None):
+    def __init__(self, clock=None, builder_factory=None):
         self.builder_slaves = []
-        self.builder_factory = BuilderFactory()
+        self.builder_factory = builder_factory or PrefetchedBuilderFactory()
         self.logger = self._setupLogger()
         self.new_builders_scanner = NewBuildersScanner(
             manager=self, clock=clock)
@@ -429,17 +511,10 @@
 
     def startService(self):
         """Service entry point, called when the application starts."""
-
-        # Get a list of builders and set up scanners on each one.
-
-        builders = [
-            vitals.name for vitals in self.builder_factory.iterVitals()]
-        self.addScanForBuilders(builders)
+        # Ask the NewBuildersScanner to add and start SlaveScanners for
+        # each current builder, and any added in the future.
         self.new_builders_scanner.scheduleScan()
 
-        # Events will now fire in the SlaveScanner objects to scan each
-        # builder.
-
     def stopService(self):
         """Callback for when we need to shut down."""
         # XXX: lacks unit tests

=== modified file 'lib/lp/buildmaster/tests/test_manager.py'
--- lib/lp/buildmaster/tests/test_manager.py	2013-10-03 05:35:29 +0000
+++ lib/lp/buildmaster/tests/test_manager.py	2013-10-07 05:10:45 +0000
@@ -12,6 +12,7 @@
     assert_fails_with,
     AsynchronousDeferredRunTest,
     )
+from testtools.matchers import Equals
 import transaction
 from twisted.internet import (
     defer,
@@ -39,6 +40,7 @@
     BuilddManager,
     BuilderFactory,
     NewBuildersScanner,
+    PrefetchedBuilderFactory,
     SlaveScanner,
     )
 from lp.buildmaster.model.builder import Builder
@@ -59,6 +61,7 @@
 from lp.testing import (
     ANONYMOUS,
     login,
+    StormStatementRecorder,
     TestCase,
     TestCaseWithFactory,
     )
@@ -70,6 +73,7 @@
     LaunchpadZopelessLayer,
     ZopelessDatabaseLayer,
     )
+from lp.testing.matchers import HasQueryCount
 from lp.testing.sampledata import BOB_THE_BUILDER_NAME
 
 
@@ -78,7 +82,7 @@
 
     This method uses the old framework for scanning and dispatching builds.
     """
-    layer = LaunchpadZopelessLayer
+    layer = ZopelessDatabaseLayer
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
 
     def setUp(self):
@@ -93,7 +97,7 @@
         ubuntu = getUtility(IDistributionSet).getByName('ubuntu')
         hoary = ubuntu.getSeries('hoary')
         test_publisher.setUpDefaultDistroSeries(hoary)
-        test_publisher.addFakeChroots()
+        test_publisher.addFakeChroots(db_only=True)
 
     def _resetBuilder(self, builder):
         """Reset the given builder and its job."""
@@ -119,15 +123,17 @@
         self.assertEqual(build.status, BuildStatus.BUILDING)
         self.assertEqual(job.logtail, logtail)
 
-    def _getScanner(self, builder_name=None, clock=None):
+    def _getScanner(self, builder_name=None, clock=None, builder_factory=None):
         """Instantiate a SlaveScanner object.
 
         Replace its default logging handler by a testing version.
         """
         if builder_name is None:
             builder_name = BOB_THE_BUILDER_NAME
+        if builder_factory is None:
+            builder_factory = BuilderFactory()
         scanner = SlaveScanner(
-            builder_name, BuilderFactory(), BufferLogger(), clock=clock)
+            builder_name, builder_factory, BufferLogger(), clock=clock)
         scanner.logger.name = 'slave-scanner'
 
         return scanner
@@ -247,6 +253,7 @@
         factory = LaunchpadObjectFactory()
         builder = factory.makeBuilder()
         self.patch(BuilderSlave, 'makeBuilderSlave', FakeMethod(OkSlave()))
+        transaction.commit()
         scanner = self._getScanner(builder_name=builder.name)
         d = scanner.scan()
         return d.addCallback(self._checkNoDispatch, builder)
@@ -257,6 +264,7 @@
         self._resetBuilder(builder)
         self.patch(BuilderSlave, 'makeBuilderSlave', FakeMethod(OkSlave()))
         builder.manual = True
+        transaction.commit()
         scanner = self._getScanner()
         d = scanner.scan()
         d.addCallback(self._checkNoDispatch, builder)
@@ -269,6 +277,7 @@
         self._resetBuilder(builder)
         self.patch(BuilderSlave, 'makeBuilderSlave', FakeMethod(OkSlave()))
         builder.builderok = False
+        transaction.commit()
         scanner = self._getScanner()
         yield scanner.scan()
         # Because the builder is not ok, we can't use _checkNoDispatch.
@@ -280,11 +289,62 @@
         self.patch(
             BuilderSlave, 'makeBuilderSlave', FakeMethod(BrokenSlave()))
         builder.failure_count = 0
+        transaction.commit()
         scanner = self._getScanner(builder_name=builder.name)
         d = scanner.scan()
         return assert_fails_with(d, xmlrpclib.Fault)
 
     @defer.inlineCallbacks
+    def test_scan_calls_builder_factory_prescanUpdate(self):
+        # SlaveScanner.scan() starts by calling
+        # BuilderFactory.prescanUpdate() to eg. perform necessary
+        # transaction management.
+        bf = BuilderFactory()
+        bf.prescanUpdate = FakeMethod()
+        scanner = self._getScanner(builder_factory=bf)
+
+        # Disable the builder so we don't try to use the slave. It's not
+        # relevant for this test.
+        builder = getUtility(IBuilderSet)[BOB_THE_BUILDER_NAME]
+        builder.builderok = False
+        transaction.commit()
+
+        yield scanner.scan()
+
+        self.assertEqual(1, bf.prescanUpdate.call_count)
+
+    @defer.inlineCallbacks
+    def test_scan_skipped_if_builderfactory_stale(self):
+        # singleCycle does nothing if the BuilderFactory's update
+        # timestamp is older than the end of the previous scan. This
+        # prevents eg. a scan after a dispatch from failing to notice
+        # that a build has been dispatched.
+        pbf = PrefetchedBuilderFactory()
+        pbf.update()
+        scanner = self._getScanner(builder_factory=pbf)
+        fake_scan = FakeMethod()
+
+        def _fake_scan():
+            fake_scan()
+            return defer.succeed(None)
+        scanner.scan = _fake_scan
+        self.assertEqual(0, fake_scan.call_count)
+
+        # An initial cycle triggers a scan.
+        yield scanner.singleCycle()
+        self.assertEqual(1, fake_scan.call_count)
+
+        # But a subsequent cycle without updating BuilderFactory's data
+        # is a no-op.
+        yield scanner.singleCycle()
+        self.assertEqual(1, fake_scan.call_count)
+
+        # Updating the BuilderFactory causes scans to resume.
+        pbf.update()
+        yield scanner.singleCycle()
+        self.assertEqual(2, fake_scan.call_count)
+
+    @defer.inlineCallbacks
     def _assertFailureCounting(self, builder_count, job_count,
                                expected_builder_count, expected_job_count):
         # If scan() fails with an exception, failure_counts should be
@@ -304,7 +364,7 @@
         naked_job.build.failure_count = job_count
         # The _scanFailed() calls abort, so make sure our existing
         # failure counts are persisted.
-        self.layer.txn.commit()
+        transaction.commit()
 
         # singleCycle() calls scan() which is our fake one that throws an
         # exception.
@@ -350,7 +410,7 @@
         builder.failure_count = (
             Builder.RESET_THRESHOLD * Builder.RESET_FAILURE_THRESHOLD)
         builder.currentjob.reset()
-        self.layer.txn.commit()
+        transaction.commit()
 
         yield scanner.singleCycle()
         self.assertFalse(builder.builderok)
@@ -379,6 +439,7 @@
         job = removeSecurityProxy(builder._findBuildCandidate())
         job.virtualized = True
         builder.virtualized = True
+        transaction.commit()
         yield scanner.singleCycle()
 
         # The failure_count will have been incremented on the builder, we
@@ -443,6 +504,94 @@
         self.assertEqual(BuildStatus.CANCELLED, build.status)
 
 
+class TestPrefetchedBuilderFactory(TestCaseWithFactory):
+
+    layer = ZopelessDatabaseLayer
+
+    def test_get(self):
+        # PrefetchedBuilderFactory.__getitem__ is unoptimised, just
+        # querying and returning the named builder.
+        builder = self.factory.makeBuilder()
+        pbf = PrefetchedBuilderFactory()
+        self.assertEqual(builder, pbf[builder.name])
+
+    def test_update(self):
+        # update grabs all of the Builders and their BuildQueues in a
+        # single query.
+        builders = [self.factory.makeBuilder() for i in range(5)]
+        for i in range(3):
+            bq = self.factory.makeBinaryPackageBuild().queueBuild()
+            bq.markAsBuilding(builders[i])
+        pbf = PrefetchedBuilderFactory()
+        transaction.commit()
+        pbf.update()
+        with StormStatementRecorder() as recorder:
+            pbf.update()
+        self.assertThat(recorder, HasQueryCount(Equals(1)))
+
+    def test_getVitals(self):
+        # PrefetchedBuilderFactory.getVitals looks up the BuilderVitals
+        # in a local cached map, without hitting the DB.
+        builder = self.factory.makeBuilder()
+        bq = self.factory.makeBinaryPackageBuild().queueBuild()
+        bq.markAsBuilding(builder)
+        transaction.commit()
+        name = builder.name
+        pbf = PrefetchedBuilderFactory()
+        pbf.update()
+
+        def assertQuerylessVitals(comparator):
+            expected_vitals = extract_vitals_from_db(builder)
+            transaction.commit()
+            with StormStatementRecorder() as recorder:
+                got_vitals = pbf.getVitals(name)
+                comparator(expected_vitals, got_vitals)
+                comparator(expected_vitals.build_queue, got_vitals.build_queue)
+            self.assertThat(recorder, HasQueryCount(Equals(0)))
+            return got_vitals
+
+        # We can get the vitals of a builder from the factory without
+        # any DB queries.
+        vitals = assertQuerylessVitals(self.assertEqual)
+        self.assertIsNot(None, vitals.build_queue)
+
+        # If we cancel the BuildQueue to unassign it, the factory
+        # doesn't notice immediately.
+        bq.cancel()
+        vitals = assertQuerylessVitals(self.assertNotEqual)
+        self.assertIsNot(None, vitals.build_queue)
+
+        # But the vitals will show the builder as idle if we ask the
+        # factory to refetch.
+        pbf.update()
+        vitals = assertQuerylessVitals(self.assertEqual)
+        self.assertIs(None, vitals.build_queue)
+
+    def test_iterVitals(self):
+        # PrefetchedBuilderFactory.iterVitals looks up the details from
+        # the local cached map, without hitting the DB.
+
+        # Construct 5 new builders, 3 with builds. This is in addition
+        # to the 2 in sampledata, 1 with a build.
+        builders = [self.factory.makeBuilder() for i in range(5)]
+        for i in range(3):
+            bq = self.factory.makeBinaryPackageBuild().queueBuild()
+            bq.markAsBuilding(builders[i])
+        transaction.commit()
+        pbf = PrefetchedBuilderFactory()
+        pbf.update()
+
+        with StormStatementRecorder() as recorder:
+            all_vitals = list(pbf.iterVitals())
+        self.assertThat(recorder, HasQueryCount(Equals(0)))
+        # Compare the counts with what we expect, and the full result
+        # with the non-prefetching BuilderFactory.
+        self.assertEqual(7, len(all_vitals))
+        self.assertEqual(
+            4, len([v for v in all_vitals if v.build_queue is not None]))
+        self.assertContentEqual(BuilderFactory().iterVitals(), all_vitals)
+
+
 class FakeBuildQueue:
 
     def __init__(self):
@@ -451,12 +600,19 @@
 
 
 class MockBuilderFactory:
+    """A mock builder factory which uses a preset Builder and BuildQueue."""
 
     def __init__(self, builder, build_queue):
         self.updateTestData(builder, build_queue)
         self.get_call_count = 0
         self.getVitals_call_count = 0
 
+    def update(self):
+        return
+
+    def prescanUpdate(self):
+        return
+
     def updateTestData(self, builder, build_queue):
         self._builder = builder
         self._build_queue = build_queue
@@ -809,14 +965,11 @@
     layer = LaunchpadZopelessLayer
 
     def _getScanner(self, clock=None):
-        return NewBuildersScanner(manager=BuilddManager(), clock=clock)
-
-    def test_init_stores_existing_builders(self):
-        # Make sure that NewBuildersScanner initializes itself properly
-        # by storing a list of existing builders.
-        all_builders = [builder.name for builder in getUtility(IBuilderSet)]
-        builder_scanner = self._getScanner()
-        self.assertEqual(all_builders, builder_scanner.current_builders)
+        nbs = NewBuildersScanner(
+            manager=BuilddManager(builder_factory=BuilderFactory()),
+            clock=clock)
+        nbs.checkForNewBuilders()
+        return nbs
 
     def test_scheduleScan(self):
         # Test that scheduleScan calls the "scan" method.
@@ -849,6 +1002,7 @@
     def test_checkForNewBuilders_detects_builder_only_once(self):
         # checkForNewBuilders() only detects a new builder once.
         builder_scanner = self._getScanner()
+        self.assertEqual([], builder_scanner.checkForNewBuilders())
         LaunchpadObjectFactory().makeBuilder(name="sammy")
         self.assertEqual(["sammy"], builder_scanner.checkForNewBuilders())
         self.assertEqual([], builder_scanner.checkForNewBuilders())

=== modified file 'lib/lp/soyuz/tests/test_publishing.py'
--- lib/lp/soyuz/tests/test_publishing.py	2013-09-13 06:20:49 +0000
+++ lib/lp/soyuz/tests/test_publishing.py	2013-10-07 05:10:45 +0000
@@ -131,11 +131,15 @@
         self.breezy_autotest_i386.addOrUpdateChroot(fake_chroot)
         self.breezy_autotest_hppa.addOrUpdateChroot(fake_chroot)
 
-    def addFakeChroots(self, distroseries=None):
+    def addFakeChroots(self, distroseries=None, db_only=False):
         """Add fake chroots for all the architectures in distroseries."""
         if distroseries is None:
             distroseries = self.distroseries
-        fake_chroot = self.addMockFile('fake_chroot.tar.gz')
+        if db_only:
+            fake_chroot = self.factory.makeLibraryFileAlias(
+                filename='fake_chroot.tar.gz', db_only=True)
+        else:
+            fake_chroot = self.addMockFile('fake_chroot.tar.gz')
         for arch in distroseries.architectures:
             arch.addOrUpdateChroot(fake_chroot)
 


References