launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #16054
[Merge] lp:~wgrant/launchpad/ss-bulk-fetch into lp:launchpad
William Grant has proposed merging lp:~wgrant/launchpad/ss-bulk-fetch into lp:launchpad.
Commit message:
Reduce buildd-manager's transaction usage significantly. Scans of disabled or idle+manual builders no longer query the DB individually; their data is instead pulled from the DB in a single bulk request.
Requested reviews:
William Grant (wgrant): code
For more details, see:
https://code.launchpad.net/~wgrant/launchpad/ss-bulk-fetch/+merge/189527
--
https://code.launchpad.net/~wgrant/launchpad/ss-bulk-fetch/+merge/189527
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.
=== modified file 'lib/lp/buildmaster/interactor.py'
--- lib/lp/buildmaster/interactor.py 2013-10-03 05:35:29 +0000
+++ lib/lp/buildmaster/interactor.py 2013-10-07 05:10:45 +0000
@@ -216,12 +216,15 @@
('name', 'url', 'virtualized', 'vm_host', 'builderok', 'manual',
'build_queue'))
-
-def extract_vitals_from_db(builder, build_queue=None):
+_BQ_UNSPECIFIED = object()
+
+
+def extract_vitals_from_db(builder, build_queue=_BQ_UNSPECIFIED):
+ if build_queue == _BQ_UNSPECIFIED:
+ build_queue = builder.currentjob
return BuilderVitals(
builder.name, builder.url, builder.virtualized, builder.vm_host,
- builder.builderok, builder.manual,
- build_queue or builder.currentjob)
+ builder.builderok, builder.manual, build_queue)
class BuilderInteractor(object):
=== modified file 'lib/lp/buildmaster/manager.py'
--- lib/lp/buildmaster/manager.py 2013-10-03 05:35:29 +0000
+++ lib/lp/buildmaster/manager.py 2013-10-07 05:10:45 +0000
@@ -10,8 +10,10 @@
'BUILDD_MANAGER_LOG_NAME',
]
+import datetime
import logging
+from storm.expr import LeftJoin
import transaction
from twisted.application import service
from twisted.internet import (
@@ -36,6 +38,8 @@
IBuilderSet,
)
from lp.buildmaster.model.builder import Builder
+from lp.buildmaster.model.buildqueue import BuildQueue
+from lp.services.database.interfaces import IStore
from lp.services.propertycache import get_property_cache
@@ -43,19 +47,85 @@
class BuilderFactory:
+ """A dumb builder factory that just talks to the DB."""
+
+ def update(self):
+ """Update the factory's view of the world.
+
+ For the basic BuilderFactory this is a no-op, but others might do
+ something.
+ """
+ return
+
+ def prescanUpdate(self):
+ """Update the factory's view of the world before each scan.
+
+ For the basic BuilderFactory this means ending the transaction
+ to ensure that data retrieved is up to date.
+ """
+ transaction.abort()
+
+ @property
+ def date_updated(self):
+ return datetime.datetime.utcnow()
def __getitem__(self, name):
+ """Get the named `Builder` Storm object."""
return getUtility(IBuilderSet).getByName(name)
def getVitals(self, name):
+ """Get the named `BuilderVitals` object."""
return extract_vitals_from_db(self[name])
def iterVitals(self):
+ """Iterate over all `BuilderVitals` objects."""
return (
extract_vitals_from_db(b)
for b in getUtility(IBuilderSet).__iter__())
+class PrefetchedBuilderFactory:
+ """A smart builder factory that does efficient bulk queries.
+
+ `getVitals` and `iterVitals` don't touch the DB directly. They work
+ from cached data updated by `update`.
+ """
+
+ date_updated = None
+
+ def update(self):
+ """See `BuilderFactory`."""
+ transaction.abort()
+ builders_and_bqs = IStore(Builder).using(
+ Builder, LeftJoin(BuildQueue, BuildQueue.builderID == Builder.id)
+ ).find((Builder, BuildQueue))
+ self.vitals_map = dict(
+ (b.name, extract_vitals_from_db(b, bq))
+ for b, bq in builders_and_bqs)
+ transaction.abort()
+ self.date_updated = datetime.datetime.utcnow()
+
+ def prescanUpdate(self):
+ """See `BuilderFactory`.
+
+ This is a no-op, as the data was already brought sufficiently up
+ to date by update().
+ """
+ return
+
+ def __getitem__(self, name):
+ """See `BuilderFactory`."""
+ return getUtility(IBuilderSet).getByName(name)
+
+ def getVitals(self, name):
+ """See `BuilderFactory`."""
+ return self.vitals_map[name]
+
+ def iterVitals(self):
+ """See `BuilderFactory`."""
+ return (b for n, b in sorted(self.vitals_map.iteritems()))
+
+
@defer.inlineCallbacks
def assessFailureCounts(logger, vitals, builder, slave, interactor, exception):
"""View builder/job failure_count and work out which needs to die.
@@ -158,6 +228,7 @@
clock = reactor
self._clock = clock
self.date_cancel = None
+ self.date_scanned = None
# We cache the build cookie, keyed on the BuildQueue, to avoid
# hitting the DB on every scan.
@@ -176,12 +247,26 @@
self.loop.stop()
def singleCycle(self):
- self.logger.debug("Scanning builder: %s" % self.builder_name)
+ # Inhibit scanning if the BuilderFactory hasn't updated since
+ # the last run. This doesn't matter for the base BuilderFactory,
+ # as it's always up to date, but PrefetchedBuilderFactory caches
+ # heavily, and we don't want to eg. forget that we dispatched a
+ # build in the previous cycle.
+ if (self.date_scanned is not None
+ and self.date_scanned > self.builder_factory.date_updated):
+ self.logger.debug(
+ "Skipping builder %s (cache out of date)" % self.builder_name)
+ return defer.succeed(None)
+
+ self.logger.debug("Scanning builder %s" % self.builder_name)
d = self.scan()
-
d.addErrback(self._scanFailed)
+ d.addBoth(self._updateDateScanned)
return d
+ def _updateDateScanned(self, ignored):
+ self.date_scanned = datetime.datetime.utcnow()
+
@defer.inlineCallbacks
def _scanFailed(self, failure):
"""Deal with failures encountered during the scan cycle.
@@ -237,7 +322,7 @@
by resuming a slave host, so that there is no need to update its
status.
"""
- if not vitals.build_queue:
+ if vitals.build_queue is None:
self.date_cancel = None
defer.returnValue(False)
build = vitals.build_queue.specific_job.build
@@ -299,9 +384,7 @@
:return: A Deferred that fires when the scan is complete.
"""
self.logger.debug("Scanning %s." % self.builder_name)
- # Commit and refetch the Builder object to ensure we have the
- # latest data from the DB.
- transaction.commit()
+ self.builder_factory.prescanUpdate()
vitals = self.builder_factory.getVitals(self.builder_name)
interactor = self.interactor_factory()
slave = self.slave_factory(vitals)
@@ -360,7 +443,7 @@
"""If new builders appear, create a scanner for them."""
# How often to check for new builders, in seconds.
- SCAN_INTERVAL = 300
+ SCAN_INTERVAL = 15
def __init__(self, manager, clock=None):
self.manager = manager
@@ -369,9 +452,7 @@
if clock is None:
clock = reactor
self._clock = clock
- self.current_builders = [
- vitals.name for vitals in
- self.manager.builder_factory.iterVitals()]
+ self.current_builders = []
def stop(self):
"""Terminate the LoopingCall."""
@@ -386,6 +467,7 @@
def scan(self):
"""If a new builder appears, create a SlaveScanner for it."""
+ self.manager.builder_factory.update()
new_builders = self.checkForNewBuilders()
self.manager.addScanForBuilders(new_builders)
@@ -403,9 +485,9 @@
class BuilddManager(service.Service):
"""Main Buildd Manager service class."""
- def __init__(self, clock=None):
+ def __init__(self, clock=None, builder_factory=None):
self.builder_slaves = []
- self.builder_factory = BuilderFactory()
+ self.builder_factory = builder_factory or PrefetchedBuilderFactory()
self.logger = self._setupLogger()
self.new_builders_scanner = NewBuildersScanner(
manager=self, clock=clock)
@@ -429,17 +511,10 @@
def startService(self):
"""Service entry point, called when the application starts."""
-
- # Get a list of builders and set up scanners on each one.
-
- builders = [
- vitals.name for vitals in self.builder_factory.iterVitals()]
- self.addScanForBuilders(builders)
+ # Ask the NewBuildersScanner to add and start SlaveScanners for
+ # each current builder, and any added in the future.
self.new_builders_scanner.scheduleScan()
- # Events will now fire in the SlaveScanner objects to scan each
- # builder.
-
def stopService(self):
"""Callback for when we need to shut down."""
# XXX: lacks unit tests
=== modified file 'lib/lp/buildmaster/tests/test_manager.py'
--- lib/lp/buildmaster/tests/test_manager.py 2013-10-03 05:35:29 +0000
+++ lib/lp/buildmaster/tests/test_manager.py 2013-10-07 05:10:45 +0000
@@ -12,6 +12,7 @@
assert_fails_with,
AsynchronousDeferredRunTest,
)
+from testtools.matchers import Equals
import transaction
from twisted.internet import (
defer,
@@ -39,6 +40,7 @@
BuilddManager,
BuilderFactory,
NewBuildersScanner,
+ PrefetchedBuilderFactory,
SlaveScanner,
)
from lp.buildmaster.model.builder import Builder
@@ -59,6 +61,7 @@
from lp.testing import (
ANONYMOUS,
login,
+ StormStatementRecorder,
TestCase,
TestCaseWithFactory,
)
@@ -70,6 +73,7 @@
LaunchpadZopelessLayer,
ZopelessDatabaseLayer,
)
+from lp.testing.matchers import HasQueryCount
from lp.testing.sampledata import BOB_THE_BUILDER_NAME
@@ -78,7 +82,7 @@
This method uses the old framework for scanning and dispatching builds.
"""
- layer = LaunchpadZopelessLayer
+ layer = ZopelessDatabaseLayer
run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
def setUp(self):
@@ -93,7 +97,7 @@
ubuntu = getUtility(IDistributionSet).getByName('ubuntu')
hoary = ubuntu.getSeries('hoary')
test_publisher.setUpDefaultDistroSeries(hoary)
- test_publisher.addFakeChroots()
+ test_publisher.addFakeChroots(db_only=True)
def _resetBuilder(self, builder):
"""Reset the given builder and its job."""
@@ -119,15 +123,17 @@
self.assertEqual(build.status, BuildStatus.BUILDING)
self.assertEqual(job.logtail, logtail)
- def _getScanner(self, builder_name=None, clock=None):
+ def _getScanner(self, builder_name=None, clock=None, builder_factory=None):
"""Instantiate a SlaveScanner object.
Replace its default logging handler by a testing version.
"""
if builder_name is None:
builder_name = BOB_THE_BUILDER_NAME
+ if builder_factory is None:
+ builder_factory = BuilderFactory()
scanner = SlaveScanner(
- builder_name, BuilderFactory(), BufferLogger(), clock=clock)
+ builder_name, builder_factory, BufferLogger(), clock=clock)
scanner.logger.name = 'slave-scanner'
return scanner
@@ -247,6 +253,7 @@
factory = LaunchpadObjectFactory()
builder = factory.makeBuilder()
self.patch(BuilderSlave, 'makeBuilderSlave', FakeMethod(OkSlave()))
+ transaction.commit()
scanner = self._getScanner(builder_name=builder.name)
d = scanner.scan()
return d.addCallback(self._checkNoDispatch, builder)
@@ -257,6 +264,7 @@
self._resetBuilder(builder)
self.patch(BuilderSlave, 'makeBuilderSlave', FakeMethod(OkSlave()))
builder.manual = True
+ transaction.commit()
scanner = self._getScanner()
d = scanner.scan()
d.addCallback(self._checkNoDispatch, builder)
@@ -269,6 +277,7 @@
self._resetBuilder(builder)
self.patch(BuilderSlave, 'makeBuilderSlave', FakeMethod(OkSlave()))
builder.builderok = False
+ transaction.commit()
scanner = self._getScanner()
yield scanner.scan()
# Because the builder is not ok, we can't use _checkNoDispatch.
@@ -280,11 +289,62 @@
self.patch(
BuilderSlave, 'makeBuilderSlave', FakeMethod(BrokenSlave()))
builder.failure_count = 0
+ transaction.commit()
scanner = self._getScanner(builder_name=builder.name)
d = scanner.scan()
return assert_fails_with(d, xmlrpclib.Fault)
@defer.inlineCallbacks
+ def test_scan_calls_builder_factory_prescanUpdate(self):
+ # SlaveScanner.scan() starts by calling
+ # BuilderFactory.prescanUpdate() to eg. perform necessary
+ # transaction management.
+ bf = BuilderFactory()
+ bf.prescanUpdate = FakeMethod()
+ scanner = self._getScanner(builder_factory=bf)
+
+ # Disable the builder so we don't try to use the slave. It's not
+ # relevant for this test.
+ builder = getUtility(IBuilderSet)[BOB_THE_BUILDER_NAME]
+ builder.builderok = False
+ transaction.commit()
+
+ yield scanner.scan()
+
+ self.assertEqual(1, bf.prescanUpdate.call_count)
+
+ @defer.inlineCallbacks
+ def test_scan_skipped_if_builderfactory_stale(self):
+ # singleCycle does nothing if the BuilderFactory's update
+ # timestamp is older than the end of the previous scan. This
+ # prevents eg. a scan after a dispatch from failing to notice
+ # that a build has been dispatched.
+ pbf = PrefetchedBuilderFactory()
+ pbf.update()
+ scanner = self._getScanner(builder_factory=pbf)
+ fake_scan = FakeMethod()
+
+ def _fake_scan():
+ fake_scan()
+ return defer.succeed(None)
+ scanner.scan = _fake_scan
+ self.assertEqual(0, fake_scan.call_count)
+
+ # An initial cycle triggers a scan.
+ yield scanner.singleCycle()
+ self.assertEqual(1, fake_scan.call_count)
+
+ # But a subsequent cycle without updating BuilderFactory's data
+ # is a no-op.
+ yield scanner.singleCycle()
+ self.assertEqual(1, fake_scan.call_count)
+
+ # Updating the BuilderFactory causes scans to resume.
+ pbf.update()
+ yield scanner.singleCycle()
+ self.assertEqual(2, fake_scan.call_count)
+
+ @defer.inlineCallbacks
def _assertFailureCounting(self, builder_count, job_count,
expected_builder_count, expected_job_count):
# If scan() fails with an exception, failure_counts should be
@@ -304,7 +364,7 @@
naked_job.build.failure_count = job_count
# The _scanFailed() calls abort, so make sure our existing
# failure counts are persisted.
- self.layer.txn.commit()
+ transaction.commit()
# singleCycle() calls scan() which is our fake one that throws an
# exception.
@@ -350,7 +410,7 @@
builder.failure_count = (
Builder.RESET_THRESHOLD * Builder.RESET_FAILURE_THRESHOLD)
builder.currentjob.reset()
- self.layer.txn.commit()
+ transaction.commit()
yield scanner.singleCycle()
self.assertFalse(builder.builderok)
@@ -379,6 +439,7 @@
job = removeSecurityProxy(builder._findBuildCandidate())
job.virtualized = True
builder.virtualized = True
+ transaction.commit()
yield scanner.singleCycle()
# The failure_count will have been incremented on the builder, we
@@ -443,6 +504,94 @@
self.assertEqual(BuildStatus.CANCELLED, build.status)
+class TestPrefetchedBuilderFactory(TestCaseWithFactory):
+
+ layer = ZopelessDatabaseLayer
+
+ def test_get(self):
+ # PrefetchedBuilderFactory.__getitem__ is unoptimised, just
+ # querying and returning the named builder.
+ builder = self.factory.makeBuilder()
+ pbf = PrefetchedBuilderFactory()
+ self.assertEqual(builder, pbf[builder.name])
+
+ def test_update(self):
+ # update grabs all of the Builders and their BuildQueues in a
+ # single query.
+ builders = [self.factory.makeBuilder() for i in range(5)]
+ for i in range(3):
+ bq = self.factory.makeBinaryPackageBuild().queueBuild()
+ bq.markAsBuilding(builders[i])
+ pbf = PrefetchedBuilderFactory()
+ transaction.commit()
+ pbf.update()
+ with StormStatementRecorder() as recorder:
+ pbf.update()
+ self.assertThat(recorder, HasQueryCount(Equals(1)))
+
+ def test_getVitals(self):
+ # PrefetchedBuilderFactory.getVitals looks up the BuilderVitals
+ # in a local cached map, without hitting the DB.
+ builder = self.factory.makeBuilder()
+ bq = self.factory.makeBinaryPackageBuild().queueBuild()
+ bq.markAsBuilding(builder)
+ transaction.commit()
+ name = builder.name
+ pbf = PrefetchedBuilderFactory()
+ pbf.update()
+
+ def assertQuerylessVitals(comparator):
+ expected_vitals = extract_vitals_from_db(builder)
+ transaction.commit()
+ with StormStatementRecorder() as recorder:
+ got_vitals = pbf.getVitals(name)
+ comparator(expected_vitals, got_vitals)
+ comparator(expected_vitals.build_queue, got_vitals.build_queue)
+ self.assertThat(recorder, HasQueryCount(Equals(0)))
+ return got_vitals
+
+ # We can get the vitals of a builder from the factory without
+ # any DB queries.
+ vitals = assertQuerylessVitals(self.assertEqual)
+ self.assertIsNot(None, vitals.build_queue)
+
+ # If we cancel the BuildQueue to unassign it, the factory
+ # doesn't notice immediately.
+ bq.cancel()
+ vitals = assertQuerylessVitals(self.assertNotEqual)
+ self.assertIsNot(None, vitals.build_queue)
+
+ # But the vitals will show the builder as idle if we ask the
+ # factory to refetch.
+ pbf.update()
+ vitals = assertQuerylessVitals(self.assertEqual)
+ self.assertIs(None, vitals.build_queue)
+
+ def test_iterVitals(self):
+ # PrefetchedBuilderFactory.iterVitals looks up the details from
+ # the local cached map, without hitting the DB.
+
+ # Construct 5 new builders, 3 with builds. This is in addition
+ # to the 2 in sampledata, 1 with a build.
+ builders = [self.factory.makeBuilder() for i in range(5)]
+ for i in range(3):
+ bq = self.factory.makeBinaryPackageBuild().queueBuild()
+ bq.markAsBuilding(builders[i])
+ transaction.commit()
+ pbf = PrefetchedBuilderFactory()
+ pbf.update()
+
+ with StormStatementRecorder() as recorder:
+ all_vitals = list(pbf.iterVitals())
+ self.assertThat(recorder, HasQueryCount(Equals(0)))
+ # Compare the counts with what we expect, and the full result
+ # with the non-prefetching BuilderFactory.
+ self.assertEqual(7, len(all_vitals))
+ self.assertEqual(
+ 4, len([v for v in all_vitals if v.build_queue is not None]))
+ self.assertContentEqual(BuilderFactory().iterVitals(), all_vitals)
+
+
class FakeBuildQueue:
def __init__(self):
@@ -451,12 +600,19 @@
class MockBuilderFactory:
+ """A mock builder factory which uses a preset Builder and BuildQueue."""
def __init__(self, builder, build_queue):
self.updateTestData(builder, build_queue)
self.get_call_count = 0
self.getVitals_call_count = 0
+ def update(self):
+ return
+
+ def prescanUpdate(self):
+ return
+
def updateTestData(self, builder, build_queue):
self._builder = builder
self._build_queue = build_queue
@@ -809,14 +965,11 @@
layer = LaunchpadZopelessLayer
def _getScanner(self, clock=None):
- return NewBuildersScanner(manager=BuilddManager(), clock=clock)
-
- def test_init_stores_existing_builders(self):
- # Make sure that NewBuildersScanner initializes itself properly
- # by storing a list of existing builders.
- all_builders = [builder.name for builder in getUtility(IBuilderSet)]
- builder_scanner = self._getScanner()
- self.assertEqual(all_builders, builder_scanner.current_builders)
+ nbs = NewBuildersScanner(
+ manager=BuilddManager(builder_factory=BuilderFactory()),
+ clock=clock)
+ nbs.checkForNewBuilders()
+ return nbs
def test_scheduleScan(self):
# Test that scheduleScan calls the "scan" method.
@@ -849,6 +1002,7 @@
def test_checkForNewBuilders_detects_builder_only_once(self):
# checkForNewBuilders() only detects a new builder once.
builder_scanner = self._getScanner()
+ self.assertEqual([], builder_scanner.checkForNewBuilders())
LaunchpadObjectFactory().makeBuilder(name="sammy")
self.assertEqual(["sammy"], builder_scanner.checkForNewBuilders())
self.assertEqual([], builder_scanner.checkForNewBuilders())
=== modified file 'lib/lp/soyuz/tests/test_publishing.py'
--- lib/lp/soyuz/tests/test_publishing.py 2013-09-13 06:20:49 +0000
+++ lib/lp/soyuz/tests/test_publishing.py 2013-10-07 05:10:45 +0000
@@ -131,11 +131,15 @@
self.breezy_autotest_i386.addOrUpdateChroot(fake_chroot)
self.breezy_autotest_hppa.addOrUpdateChroot(fake_chroot)
- def addFakeChroots(self, distroseries=None):
+ def addFakeChroots(self, distroseries=None, db_only=False):
"""Add fake chroots for all the architectures in distroseries."""
if distroseries is None:
distroseries = self.distroseries
- fake_chroot = self.addMockFile('fake_chroot.tar.gz')
+ if db_only:
+ fake_chroot = self.factory.makeLibraryFileAlias(
+ filename='fake_chroot.tar.gz', db_only=True)
+ else:
+ fake_chroot = self.addMockFile('fake_chroot.tar.gz')
for arch in distroseries.architectures:
arch.addOrUpdateChroot(fake_chroot)
References