← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:bulk-logtail-update into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:bulk-logtail-update into launchpad:master.

Commit message:
Perform build queue logtail updates in bulk

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/384035

For better or worse, buildd-manager currently does its database work on the reactor thread.  Batching up the logtail updates reduces the number of non-trivial commits it needs to make in the common case where lots of builders are in the building status but relatively few are changing status.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:bulk-logtail-update into launchpad:master.
diff --git a/lib/lp/buildmaster/interactor.py b/lib/lp/buildmaster/interactor.py
index 4f809c4..d02249b 100644
--- a/lib/lp/buildmaster/interactor.py
+++ b/lib/lp/buildmaster/interactor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -539,10 +539,37 @@ class BuilderInteractor(object):
             "Malformed status string: '%s'" % status_string)
         return status_string[len(lead_string):]
 
+    @staticmethod
+    def extractLogTail(slave_status):
+        """Extract the log tail from a builder status response.
+
+        :param slave_status: build status dict from BuilderSlave.status.
+        :return: a text string representing the tail of the build log, or
+            None if the log tail is unavailable and should be left
+            unchanged.
+        """
+        builder_status = slave_status["builder_status"]
+        if builder_status == "BuilderStatus.ABORTING":
+            logtail = "Waiting for slave process to be terminated"
+        elif slave_status.get("logtail") is not None:
+            # slave_status["logtail"] is normally an xmlrpc_client.Binary
+            # instance, and the contents might include invalid UTF-8 due to
+            # being a fixed number of bytes from the tail of the log.  Turn
+            # it into Unicode as best we can.
+            logtail = bytes(
+                slave_status.get("logtail")).decode("UTF-8", errors="replace")
+            # PostgreSQL text columns can't contain \0 characters, and since
+            # we only use this for web UI display purposes there's no point
+            # in going through contortions to store them.
+            logtail = logtail.replace("\0", "")
+        else:
+            logtail = None
+        return logtail
+
     @classmethod
     @defer.inlineCallbacks
     def updateBuild(cls, vitals, slave, slave_status, builder_factory,
-                    behaviour_factory):
+                    behaviour_factory, log_tail_updater):
         """Verify the current build job status.
 
         Perform the required actions for each state.
@@ -556,7 +583,10 @@ class BuilderInteractor(object):
         builder_status = slave_status['builder_status']
         if builder_status in (
                 'BuilderStatus.BUILDING', 'BuilderStatus.ABORTING'):
-            vitals.build_queue.collectStatus(slave_status)
+            logtail = cls.extractLogTail(slave_status)
+            if logtail is not None:
+                log_tail_updater.pending_updates[vitals.build_queue.id] = (
+                    logtail)
             vitals.build_queue.specific_build.updateStatus(
                 vitals.build_queue.specific_build.status,
                 slave_status=slave_status)
diff --git a/lib/lp/buildmaster/interfaces/buildqueue.py b/lib/lp/buildmaster/interfaces/buildqueue.py
index 4fb47f6..9bcf346 100644
--- a/lib/lp/buildmaster/interfaces/buildqueue.py
+++ b/lib/lp/buildmaster/interfaces/buildqueue.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Build interfaces."""
@@ -86,9 +86,6 @@ class IBuildQueue(Interface):
     def markAsBuilding(builder):
         """Set this queue item to a 'building' state."""
 
-    def collectStatus(slave_status):
-        """Collect status information from the builder."""
-
     def suspend():
         """Suspend this waiting job, removing it from the active queue."""
 
diff --git a/lib/lp/buildmaster/manager.py b/lib/lp/buildmaster/manager.py
index 3c80622..8d9ffca 100644
--- a/lib/lp/buildmaster/manager.py
+++ b/lib/lp/buildmaster/manager.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Soyuz buildd slave manager logic."""
@@ -16,7 +16,11 @@ import functools
 import logging
 
 import six
-from storm.expr import LeftJoin
+from storm.expr import (
+    Column,
+    LeftJoin,
+    Table,
+    )
 import transaction
 from twisted.application import service
 from twisted.internet import (
@@ -47,7 +51,12 @@ from lp.buildmaster.interfaces.builder import (
     )
 from lp.buildmaster.model.builder import Builder
 from lp.buildmaster.model.buildqueue import BuildQueue
+from lp.services.database.bulk import dbify_value
 from lp.services.database.interfaces import IStore
+from lp.services.database.stormexpr import (
+    BulkUpdate,
+    Values,
+    )
 from lp.services.propertycache import get_property_cache
 
 
@@ -278,12 +287,13 @@ class SlaveScanner:
     # greater than abort_timeout in launchpad-buildd's slave BuildManager.
     CANCEL_TIMEOUT = 180
 
-    def __init__(self, builder_name, builder_factory, logger, clock=None,
-                 interactor_factory=BuilderInteractor,
+    def __init__(self, builder_name, builder_factory, log_tail_updater, logger,
+                 clock=None, interactor_factory=BuilderInteractor,
                  slave_factory=BuilderInteractor.makeSlaveFromVitals,
                  behaviour_factory=BuilderInteractor.getBuildBehaviour):
         self.builder_name = builder_name
         self.builder_factory = builder_factory
+        self.log_tail_updater = log_tail_updater
         self.logger = logger
         self.interactor_factory = interactor_factory
         self.slave_factory = slave_factory
@@ -484,7 +494,7 @@ class SlaveScanner:
             assert slave_status is not None
             yield interactor.updateBuild(
                 vitals, slave, slave_status, self.builder_factory,
-                self.behaviour_factory)
+                self.behaviour_factory, self.log_tail_updater)
         else:
             if not vitals.builderok:
                 return
@@ -580,6 +590,63 @@ class NewBuildersScanner:
         return list(extra_builders)
 
 
+class LogTailUpdater:
+    """Perform build queue logtail updates in bulk."""
+
+    # How often to update, in seconds.
+    UPDATE_INTERVAL = 15
+
+    def __init__(self, manager, clock=None):
+        self.manager = manager
+        # Use the clock if provided, so that tests can advance it.
+        # Use the reactor by default.
+        if clock is None:
+            clock = reactor
+        self._clock = clock
+        self.pending_updates = {}
+
+    def stop(self):
+        """Terminate the LoopingCall."""
+        self.loop.stop()
+
+    def scheduleUpdate(self):
+        """Start a LoopingCall running update every UPDATE_INTERVAL seconds."""
+        self.loop = LoopingCall(self.update)
+        self.loop.clock = self._clock
+        self.stopping_deferred = self.loop.start(self.UPDATE_INTERVAL)
+        return self.stopping_deferred
+
+    def update(self):
+        """Check for any pending updates and write them to the database."""
+        self.manager.logger.debug("Writing logtail updates.")
+        try:
+            pending_updates = self.pending_updates
+            self.pending_updates = {}
+            self.writeUpdates(pending_updates)
+        except Exception:
+            self.manager.logger.exception(
+                "Failure while writing logtail updates:\n")
+            transaction.abort()
+        self.manager.logger.debug("Logtail update complete.")
+
+    def writeUpdates(self, pending_updates):
+        if not pending_updates:
+            return
+        new_logtails = Table("new_logtails")
+        new_logtails_expr = Values(
+            new_logtails.name,
+            [("buildqueue", "integer"), ("logtail", "text")],
+            [[dbify_value(BuildQueue.id, buildqueue_id),
+              dbify_value(BuildQueue.logtail, logtail)]
+             for buildqueue_id, logtail in pending_updates.items()])
+        store = IStore(BuildQueue)
+        store.execute(BulkUpdate(
+            {BuildQueue.logtail: Column("logtail", new_logtails)},
+            table=BuildQueue, values=new_logtails_expr,
+            where=(BuildQueue.id == Column("buildqueue", new_logtails))))
+        transaction.commit()
+
+
 class BuilddManager(service.Service):
     """Main Buildd Manager service class."""
 
@@ -589,6 +656,7 @@ class BuilddManager(service.Service):
         self.logger = self._setupLogger()
         self.new_builders_scanner = NewBuildersScanner(
             manager=self, clock=clock)
+        self.log_tail_updater = LogTailUpdater(manager=self, clock=clock)
 
     def _setupLogger(self):
         """Set up a 'slave-scanner' logger that redirects to twisted.
@@ -613,6 +681,9 @@ class BuilddManager(service.Service):
         # Ask the NewBuildersScanner to add and start SlaveScanners for
         # each current builder, and any added in the future.
         self.new_builders_scanner.scheduleScan()
+        # Ask the LogTailUpdater to schedule bulk updates for build queue
+        # logtails.
+        self.log_tail_updater.scheduleUpdate()
 
     def stopService(self):
         """Callback for when we need to shut down."""
@@ -620,7 +691,9 @@ class BuilddManager(service.Service):
         # All the SlaveScanner objects need to be halted gracefully.
         deferreds = [slave.stopping_deferred for slave in self.builder_slaves]
         deferreds.append(self.new_builders_scanner.stopping_deferred)
+        deferreds.append(self.log_tail_updater.stopping_deferred)
 
+        self.log_tail_updater.stop()
         self.new_builders_scanner.stop()
         for slave in self.builder_slaves:
             slave.stopCycle()
@@ -635,7 +708,8 @@ class BuilddManager(service.Service):
         """Set up scanner objects for the builders specified."""
         for builder in builders:
             slave_scanner = SlaveScanner(
-                builder, self.builder_factory, self.logger)
+                builder, self.builder_factory, self.log_tail_updater,
+                self.logger)
             self.builder_slaves.append(slave_scanner)
             slave_scanner.startCycle()
 
diff --git a/lib/lp/buildmaster/model/buildqueue.py b/lib/lp/buildmaster/model/buildqueue.py
index 2af380a..6173a48 100644
--- a/lib/lp/buildmaster/model/buildqueue.py
+++ b/lib/lp/buildmaster/model/buildqueue.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -180,23 +180,6 @@ class BuildQueue(SQLBase):
         if builder is not None:
             del get_property_cache(builder).currentjob
 
-    def collectStatus(self, slave_status):
-        """See `IBuildQueue`."""
-        builder_status = slave_status["builder_status"]
-        if builder_status == "BuilderStatus.ABORTING":
-            self.logtail = "Waiting for slave process to be terminated"
-        elif slave_status.get("logtail") is not None:
-            # slave_status["logtail"] is normally an xmlrpc_client.Binary
-            # instance, and the contents might include invalid UTF-8 due to
-            # being a fixed number of bytes from the tail of the log.  Turn
-            # it into Unicode as best we can.
-            self.logtail = str(
-                slave_status.get("logtail")).decode("UTF-8", errors="replace")
-            # PostgreSQL text columns can't contain \0 characters, and since
-            # we only use this for web UI display purposes there's no point
-            # in going through contortions to store them.
-            self.logtail = self.logtail.replace("\0", "")
-
     def suspend(self):
         """See `IBuildQueue`."""
         if self.status != BuildQueueStatus.WAITING:
diff --git a/lib/lp/buildmaster/tests/test_manager.py b/lib/lp/buildmaster/tests/test_manager.py
index 491d2d5..9257ac8 100644
--- a/lib/lp/buildmaster/tests/test_manager.py
+++ b/lib/lp/buildmaster/tests/test_manager.py
@@ -2,7 +2,7 @@
 # NOTE: The first line above must stay first; do not move the copyright
 # notice to the top.  See http://www.python.org/dev/peps/pep-0263/.
 #
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the renovated slave scanner aka BuilddManager."""
@@ -16,10 +16,7 @@ import time
 from six.moves import xmlrpc_client
 from testtools.matchers import Equals
 from testtools.testcase import ExpectedException
-from testtools.twistedsupport import (
-    assert_fails_with,
-    AsynchronousDeferredRunTest,
-    )
+from testtools.twistedsupport import AsynchronousDeferredRunTest
 import transaction
 from twisted.internet import (
     defer,
@@ -53,6 +50,7 @@ from lp.buildmaster.manager import (
     BuilderFactory,
     JOB_RESET_THRESHOLD,
     judge_failure,
+    LogTailUpdater,
     NewBuildersScanner,
     PrefetchedBuilderFactory,
     recover_failure,
@@ -99,6 +97,12 @@ from lp.testing.matchers import HasQueryCount
 from lp.testing.sampledata import BOB_THE_BUILDER_NAME
 
 
+class FakeLogTailUpdater:
+    """A minimal fake version of `LogTailUpdater`."""
+
+    pending_updates = {}
+
+
 class TestSlaveScannerScan(TestCaseWithFactory):
     """Tests `SlaveScanner.scan` method.
 
@@ -153,8 +157,11 @@ class TestSlaveScannerScan(TestCaseWithFactory):
             builder_name = BOB_THE_BUILDER_NAME
         if builder_factory is None:
             builder_factory = BuilderFactory()
+        manager = BuilddManager(builder_factory=builder_factory, clock=clock)
+        manager.logger = BufferLogger()
         scanner = SlaveScanner(
-            builder_name, builder_factory, BufferLogger(), clock=clock)
+            builder_name, builder_factory, manager.log_tail_updater,
+            BufferLogger(), clock=clock)
         scanner.logger.name = 'slave-scanner'
 
         return scanner
@@ -179,28 +186,18 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         self.assertEqual(0, builder.failure_count)
         self.assertTrue(builder.currentjob is not None)
 
-    def _checkNoDispatch(self, slave, builder):
-        """Assert that no dispatch has occurred.
-
-        'slave' is None, so no interations would be passed
-        to the asynchonous dispatcher and the builder remained active
-        and IDLE.
-        """
-        self.assertTrue(slave is None, "Unexpected slave.")
-
+    def _checkNoDispatch(self, builder):
+        """Assert that no dispatch has occurred."""
         builder = getUtility(IBuilderSet).get(builder.id)
         self.assertTrue(builder.builderok)
         self.assertTrue(builder.currentjob is None)
 
-    def _checkJobRescued(self, slave, builder, job):
+    def _checkJobRescued(self, builder, job):
         """`SlaveScanner.scan` rescued the job.
 
         Nothing gets dispatched,  the 'broken' builder remained disabled
         and the 'rescued' job is ready to be dispatched.
         """
-        self.assertTrue(
-            slave is None, "Unexpected slave.")
-
         builder = getUtility(IBuilderSet).get(builder.id)
         self.assertFalse(builder.builderok)
 
@@ -231,25 +228,23 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         transaction.commit()
 
         # Run 'scan' and check its result.
-        slave = yield scanner.scan()
+        yield scanner.scan()
         self.assertIs(None, builder.currentjob)
-        self._checkJobRescued(slave, builder, job)
+        self._checkJobRescued(builder, job)
 
-    def _checkJobUpdated(self, slave, builder, job,
-                         logtail='This is a build log: 0'):
+    def _checkJobUpdated(self, builder, job, logtail='This is a build log: 0'):
         """`SlaveScanner.scan` updates legitimate jobs.
 
         Job is kept assigned to the active builder and its 'logtail' is
         updated.
         """
-        self.assertTrue(slave is None, "Unexpected slave.")
-
         builder = getUtility(IBuilderSet).get(builder.id)
         self.assertTrue(builder.builderok)
 
         job = getUtility(IBuildQueueSet).get(job.id)
         self.assertBuildingJob(job, builder, logtail=logtail)
 
+    @defer.inlineCallbacks
     def testScanUpdatesBuildingJobs(self):
         # Enable sampledata builder attached to an appropriate testing
         # slave. It will respond as if it was building the sampledata job.
@@ -268,10 +263,11 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         # Run 'scan' and check its result.
         switch_dbuser(config.builddmaster.dbuser)
         scanner = self._getScanner()
-        d = defer.maybeDeferred(scanner.scan)
-        d.addCallback(self._checkJobUpdated, builder, job)
-        return d
+        yield scanner.scan()
+        scanner.log_tail_updater.update()
+        self._checkJobUpdated(builder, job)
 
+    @defer.inlineCallbacks
     def test_scan_with_nothing_to_dispatch(self):
         factory = LaunchpadObjectFactory()
         builder = factory.makeBuilder()
@@ -279,9 +275,10 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         self.patch(BuilderSlave, 'makeBuilderSlave', FakeMethod(OkSlave()))
         transaction.commit()
         scanner = self._getScanner(builder_name=builder.name)
-        d = scanner.scan()
-        return d.addCallback(self._checkNoDispatch, builder)
+        yield scanner.scan()
+        self._checkNoDispatch(builder)
 
+    @defer.inlineCallbacks
     def test_scan_with_manual_builder(self):
         # Reset sampledata builder.
         builder = getUtility(IBuilderSet)[BOB_THE_BUILDER_NAME]
@@ -290,9 +287,8 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         builder.manual = True
         transaction.commit()
         scanner = self._getScanner()
-        d = scanner.scan()
-        d.addCallback(self._checkNoDispatch, builder)
-        return d
+        yield scanner.scan()
+        self._checkNoDispatch(builder)
 
     @defer.inlineCallbacks
     def test_scan_with_not_ok_builder(self):
@@ -307,6 +303,7 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         # Because the builder is not ok, we can't use _checkNoDispatch.
         self.assertIsNone(builder.currentjob)
 
+    @defer.inlineCallbacks
     def test_scan_of_broken_slave(self):
         builder = getUtility(IBuilderSet)[BOB_THE_BUILDER_NAME]
         self._resetBuilder(builder)
@@ -315,9 +312,10 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         builder.failure_count = 0
         transaction.commit()
         scanner = self._getScanner(builder_name=builder.name)
-        d = scanner.scan()
-        return assert_fails_with(d, xmlrpc_client.Fault)
+        with ExpectedException(xmlrpc_client.Fault):
+            yield scanner.scan()
 
+    @defer.inlineCallbacks
     def test_scan_of_partial_utf8_logtail(self):
         # The builder returns a fixed number of bytes from the tail of the
         # log, so the first character can easily end up being invalid UTF-8.
@@ -343,10 +341,11 @@ class TestSlaveScannerScan(TestCaseWithFactory):
 
         switch_dbuser(config.builddmaster.dbuser)
         scanner = self._getScanner()
-        d = defer.maybeDeferred(scanner.scan)
-        d.addCallback(
-            self._checkJobUpdated, builder, job, logtail=u"\uFFFD\uFFFD──")
+        yield scanner.scan()
+        scanner.log_tail_updater.update()
+        self._checkJobUpdated(builder, job, logtail=u"\uFFFD\uFFFD──")
 
+    @defer.inlineCallbacks
     def test_scan_of_logtail_containing_nul(self):
         # PostgreSQL text columns can't store ASCII NUL (\0) characters, so
         # we make sure to filter those out of the logtail.
@@ -371,9 +370,9 @@ class TestSlaveScannerScan(TestCaseWithFactory):
 
         switch_dbuser(config.builddmaster.dbuser)
         scanner = self._getScanner()
-        d = defer.maybeDeferred(scanner.scan)
-        d.addCallback(
-            self._checkJobUpdated, builder, job, logtail=u"foobarbaz")
+        yield scanner.scan()
+        scanner.log_tail_updater.update()
+        self._checkJobUpdated(builder, job, logtail=u"foobarbaz")
 
     @defer.inlineCallbacks
     def test_scan_calls_builder_factory_prescanUpdate(self):
@@ -449,11 +448,13 @@ class TestSlaveScannerScan(TestCaseWithFactory):
         transaction.commit()
         scanner = self._getScanner(builder_name=builder.name)
         yield scanner.scan()
+        yield scanner.log_tail_updater.update()
         self.assertBuildingJob(job, builder, logtail="This is a build log: 0")
         self.assertIsNone(build.revision_id)
         slave.revision_id = "dummy"
         scanner = self._getScanner(builder_name=builder.name)
         yield scanner.scan()
+        yield scanner.log_tail_updater.update()
         self.assertBuildingJob(job, builder, logtail="This is a build log: 1")
         self.assertEqual("dummy", build.revision_id)
 
@@ -495,21 +496,21 @@ class TestSlaveScannerScan(TestCaseWithFactory):
     def test_scan_first_fail(self):
         # The first failure of a job should result in the failure_count
         # on the job and the builder both being incremented.
-        self._assertFailureCounting(
+        return self._assertFailureCounting(
             builder_count=0, job_count=0, expected_builder_count=1,
             expected_job_count=1)
 
     def test_scan_second_builder_fail(self):
         # The first failure of a job should result in the failure_count
         # on the job and the builder both being incremented.
-        self._assertFailureCounting(
+        return self._assertFailureCounting(
             builder_count=1, job_count=0, expected_builder_count=2,
             expected_job_count=1)
 
     def test_scan_second_job_fail(self):
         # The first failure of a job should result in the failure_count
         # on the job and the builder both being incremented.
-        self._assertFailureCounting(
+        return self._assertFailureCounting(
             builder_count=0, job_count=1, expected_builder_count=1,
             expected_job_count=2)
 
@@ -685,9 +686,11 @@ class TestSlaveScannerWithLibrarian(TestCaseWithFactory):
         # control.
         get_slave = FakeMethod(OkSlave())
         clock = task.Clock()
+        manager = BuilddManager(clock=clock)
+        manager.logger = BufferLogger()
         scanner = SlaveScanner(
-            builder.name, BuilderFactory(), BufferLogger(),
-            slave_factory=get_slave, clock=clock)
+            builder.name, BuilderFactory(), manager.log_tail_updater,
+            BufferLogger(), slave_factory=get_slave, clock=clock)
 
         # The slave is idle and dirty, so the first scan will clean it
         # with a reset.
@@ -714,10 +717,13 @@ class TestSlaveScannerWithLibrarian(TestCaseWithFactory):
         # updated.
         get_slave.result = BuildingSlave(build.build_cookie)
         yield scanner.scan()
+        yield scanner.log_tail_updater.update()
         self.assertEqual("This is a build log: 0", bq.logtail)
         yield scanner.scan()
+        yield scanner.log_tail_updater.update()
         self.assertEqual("This is a build log: 1", bq.logtail)
         yield scanner.scan()
+        yield scanner.log_tail_updater.update()
         self.assertEqual("This is a build log: 2", bq.logtail)
         self.assertEqual(
             ['status', 'status', 'status'], get_slave.result.method_log)
@@ -849,7 +855,7 @@ class TestSlaveScannerWithoutDB(TestCase):
         if behaviour is None:
             behaviour = TrivialBehaviour()
         return SlaveScanner(
-            'mock', builder_factory, BufferLogger(),
+            'mock', builder_factory, FakeLogTailUpdater(), BufferLogger(),
             interactor_factory=FakeMethod(interactor),
             slave_factory=FakeMethod(slave),
             behaviour_factory=FakeMethod(behaviour))
@@ -939,8 +945,11 @@ class TestSlaveScannerWithoutDB(TestCase):
     def test_getExpectedCookie_caches(self):
         bq = FakeBuildQueue('trivial')
         bf = MockBuilderFactory(MockBuilder(), bq)
+        manager = BuilddManager()
+        manager.logger = BufferLogger()
         scanner = SlaveScanner(
-            'mock', bf, BufferLogger(), interactor_factory=FakeMethod(None),
+            'mock', bf, manager.log_tail_updater, BufferLogger(),
+            interactor_factory=FakeMethod(None),
             slave_factory=FakeMethod(None),
             behaviour_factory=FakeMethod(TrivialBehaviour()))
 
@@ -1034,8 +1043,11 @@ class TestCancellationChecking(TestCaseWithFactory):
         return extract_vitals_from_db(self.builder)
 
     def _getScanner(self, clock=None):
+        manager = BuilddManager(clock=clock)
+        manager.logger = BufferLogger()
         scanner = SlaveScanner(
-            None, BuilderFactory(), BufferLogger(), clock=clock)
+            None, BuilderFactory(), manager.log_tail_updater, BufferLogger(),
+            clock=clock)
         scanner.logger.name = 'slave-scanner'
         return scanner
 
@@ -1126,6 +1138,21 @@ class TestBuilddManager(TestCase):
         clock.advance(advance)
         self.assertNotEqual(0, manager.new_builders_scanner.scan.call_count)
 
+    def test_startService_adds_LogTailUpdater(self):
+        # When startService is called, the manager will start up a
+        # LogTailUpdater object.
+        self._stub_out_scheduleNextScanCycle()
+        clock = task.Clock()
+        manager = BuilddManager(clock=clock)
+
+        # Replace update() with FakeMethod so we can see if it was called.
+        manager.log_tail_updater.update = FakeMethod()
+
+        manager.startService()
+        advance = LogTailUpdater.UPDATE_INTERVAL + 1
+        clock.advance(advance)
+        self.assertNotEqual(0, manager.log_tail_updater.update.call_count)
+
 
 class TestFailureAssessments(TestCaseWithFactory):
 
@@ -1356,6 +1383,66 @@ class TestNewBuilders(TestCase):
         self.assertEqual(3, scanner.checkForNewBuilders.call_count)
 
 
+class TestLogTailUpdater(TestCaseWithFactory):
+    """Test updating of log tails."""
+
+    layer = LaunchpadZopelessLayer
+
+    def _getUpdater(self, clock=None):
+        return LogTailUpdater(
+            manager=BuilddManager(builder_factory=BuilderFactory()),
+            clock=clock)
+
+    def test_scheduleUpdate(self):
+        # scheduleUpdate calls the "update" method.
+        clock = task.Clock()
+        log_tail_updater = self._getUpdater(clock=clock)
+        log_tail_updater.update = FakeMethod()
+        log_tail_updater.scheduleUpdate()
+
+        clock.advance(LogTailUpdater.UPDATE_INTERVAL + 1)
+        self.assertNotEqual(
+            0, log_tail_updater.update.call_count,
+            "scheduleUpdate did not schedule anything")
+
+    def test_update(self):
+        # update writes pending updates to the database.
+        log_tail_updater = self._getUpdater()
+        bqs = [
+            self.factory.makeBinaryPackageBuild().queueBuild()
+            for _ in range(3)]
+        log_tail_updater.pending_updates[bqs[0].id] = "A log tail"
+        log_tail_updater.pending_updates[bqs[1].id] = "Another log tail"
+        transaction.commit()
+
+        log_tail_updater.update()
+        self.assertEqual("A log tail", bqs[0].logtail)
+        self.assertEqual("Another log tail", bqs[1].logtail)
+        self.assertIsNone(bqs[2].logtail)
+
+        self.assertEqual({}, log_tail_updater.pending_updates)
+        log_tail_updater.update()
+        self.assertEqual("A log tail", bqs[0].logtail)
+        self.assertEqual("Another log tail", bqs[1].logtail)
+        self.assertIsNone(bqs[2].logtail)
+
+    def test_update_swallows_exceptions(self):
+        # update swallows exceptions so the LoopingCall always retries.
+        clock = task.Clock()
+        log_tail_updater = self._getUpdater(clock=clock)
+        log_tail_updater.writeUpdates = FakeMethod(failure=Exception("Boom"))
+        log_tail_updater.scheduleUpdate()
+        self.assertEqual(1, log_tail_updater.writeUpdates.call_count)
+
+        # Even though the previous writeUpdates raised an exception, further
+        # updates will happen as normal.
+        advance = LogTailUpdater.UPDATE_INTERVAL + 1
+        clock.advance(advance)
+        self.assertEqual(2, log_tail_updater.writeUpdates.call_count)
+        clock.advance(advance)
+        self.assertEqual(3, log_tail_updater.writeUpdates.call_count)
+
+
 def is_file_growing(filepath, poll_interval=1, poll_repeat=10):
     """Poll the file size to see if it grows.