← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:stormify-codeimportjob into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:stormify-codeimportjob into launchpad:master.

Commit message:
Convert CodeImportJob to Storm

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/380981

We need to take a little care around the XML-RPC scheduler: despite XML-RPC strings being Unicode, Python 2's xmlrpclib likes to unmarshal them as plain str when they're within the ASCII subset, so we need to turn them back into Unicode.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:stormify-codeimportjob into launchpad:master.
diff --git a/lib/lp/code/model/codeimport.py b/lib/lp/code/model/codeimport.py
index a92ee7d..4162632 100644
--- a/lib/lp/code/model/codeimport.py
+++ b/lib/lp/code/model/codeimport.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Database classes including and related to CodeImport."""
@@ -34,6 +34,7 @@ from storm.references import Reference
 from zope.component import getUtility
 from zope.event import notify
 from zope.interface import implementer
+from zope.security.proxy import removeSecurityProxy
 
 from lp.app.errors import NotFoundError
 from lp.code.enums import (
@@ -158,7 +159,7 @@ class CodeImport(SQLBase):
         seconds = default_interval_dict.get(self.rcs_type, 21600)
         return timedelta(seconds=seconds)
 
-    import_job = Reference("<primary key>", "CodeImportJob.code_importID",
+    import_job = Reference("<primary key>", "CodeImportJob.code_import_id",
                            on_remote=True)
 
     def getImportDetailsForDisplay(self):
@@ -347,9 +348,8 @@ class CodeImportSet:
 
     def delete(self, code_import):
         """See `ICodeImportSet`."""
-        from lp.code.model.codeimportjob import CodeImportJob
         if code_import.import_job is not None:
-            CodeImportJob.delete(code_import.import_job.id)
+            removeSecurityProxy(code_import.import_job).destroySelf()
         CodeImport.delete(code_import.id)
 
     def get(self, id):
diff --git a/lib/lp/code/model/codeimportjob.py b/lib/lp/code/model/codeimportjob.py
index 702c93c..a758adc 100644
--- a/lib/lp/code/model/codeimportjob.py
+++ b/lib/lp/code/model/codeimportjob.py
@@ -1,8 +1,10 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Database classes for the CodeImportJob table."""
 
+from __future__ import absolute_import, print_function, unicode_literals
+
 __metaclass__ = type
 __all__ = [
     'CodeImportJob',
@@ -10,13 +12,17 @@ __all__ = [
     'CodeImportJobWorkflow',
     ]
 
-import datetime
-
-from sqlobject import (
-    ForeignKey,
-    IntCol,
-    SQLObjectNotFound,
-    StringCol,
+from datetime import timedelta
+
+import pytz
+from storm.expr import Cast
+from storm.locals import (
+    DateTime,
+    Desc,
+    Int,
+    Reference,
+    Store,
+    Unicode,
     )
 from zope.component import getUtility
 from zope.interface import implementer
@@ -50,13 +56,10 @@ from lp.code.model.codeimportresult import CodeImportResult
 from lp.registry.interfaces.person import validate_public_person
 from lp.services.config import config
 from lp.services.database.constants import UTC_NOW
-from lp.services.database.datetimecol import UtcDateTimeCol
-from lp.services.database.enumcol import EnumCol
+from lp.services.database.enumcol import DBEnum
 from lp.services.database.interfaces import IStore
-from lp.services.database.sqlbase import (
-    SQLBase,
-    sqlvalues,
-    )
+from lp.services.database.sqlbase import get_transaction_timestamp
+from lp.services.database.stormbase import StormBase
 from lp.services.macaroons.interfaces import (
     BadMacaroonContext,
     IMacaroonIssuer,
@@ -66,53 +69,48 @@ from lp.services.macaroons.model import MacaroonIssuerBase
 
 
 @implementer(ICodeImportJob)
-class CodeImportJob(SQLBase):
+class CodeImportJob(StormBase):
     """See `ICodeImportJob`."""
 
-    date_created = UtcDateTimeCol(notNull=True, default=UTC_NOW)
+    __storm_table__ = 'CodeImportJob'
 
-    code_import = ForeignKey(
-        dbName='code_import', foreignKey='CodeImport', notNull=True)
+    id = Int(primary=True)
 
-    machine = ForeignKey(
-        dbName='machine', foreignKey='CodeImportMachine',
-        notNull=False, default=None)
+    date_created = DateTime(tzinfo=pytz.UTC, allow_none=False, default=UTC_NOW)
 
-    date_due = UtcDateTimeCol(notNull=True)
+    code_import_id = Int(name='code_import', allow_none=False)
+    code_import = Reference(code_import_id, 'CodeImport.id')
 
-    state = EnumCol(
-        enum=CodeImportJobState, notNull=True,
+    machine_id = Int(name='machine', allow_none=True, default=None)
+    machine = Reference(machine_id, 'CodeImportMachine.id')
+
+    date_due = DateTime(tzinfo=pytz.UTC, allow_none=False)
+
+    state = DBEnum(
+        enum=CodeImportJobState, allow_none=False,
         default=CodeImportJobState.PENDING)
 
-    requesting_user = ForeignKey(
-        dbName='requesting_user', foreignKey='Person',
-        storm_validator=validate_public_person,
-        notNull=False, default=None)
+    requesting_user_id = Int(
+        name='requesting_user', allow_none=True,
+        validator=validate_public_person, default=None)
+    requesting_user = Reference(requesting_user_id, 'Person.id')
+
+    ordering = Int(allow_none=True, default=None)
 
-    ordering = IntCol(notNull=False, default=None)
+    heartbeat = DateTime(tzinfo=pytz.UTC, allow_none=True, default=None)
 
-    heartbeat = UtcDateTimeCol(notNull=False, default=None)
+    logtail = Unicode(allow_none=True, default=None)
 
-    logtail = StringCol(notNull=False, default=None)
+    date_started = DateTime(tzinfo=pytz.UTC, allow_none=True, default=None)
 
-    date_started = UtcDateTimeCol(notNull=False, default=None)
+    def __init__(self, code_import, date_due):
+        super(CodeImportJob, self).__init__()
+        self.code_import = code_import
+        self.date_due = date_due
 
     def isOverdue(self):
         """See `ICodeImportJob`."""
-        # SQLObject offers no easy way to compare a timestamp to UTC_NOW, so
-        # we must use trickery here.
-
-        # First we flush any pending update to self to ensure that the
-        # following database query will give the correct result even if
-        # date_due was modified in this transaction.
-        self.syncUpdate()
-
-        # Then, we try to find a CodeImportJob object with the id of self, and
-        # a date_due of now or past. If we find one, this means self is
-        # overdue.
-        import_job = CodeImportJob.selectOne(
-            "id = %s AND date_due <= %s" % sqlvalues(self.id, UTC_NOW))
-        return import_job is not None
+        return self.date_due <= get_transaction_timestamp(Store.of(self))
 
     def makeWorkerArguments(self):
         """See `ICodeImportJob`."""
@@ -169,6 +167,9 @@ class CodeImportJob(SQLBase):
             result.append(macaroon.serialize())
         return result
 
+    def destroySelf(self):
+        Store.of(self).remove(self)
+
 
 @implementer(ICodeImportJobSet, ICodeImportJobSetPublic)
 class CodeImportJobSet(object):
@@ -179,10 +180,7 @@ class CodeImportJobSet(object):
 
     def getById(self, id):
         """See `ICodeImportJobSet`."""
-        try:
-            return CodeImportJob.get(id)
-        except SQLObjectNotFound:
-            return None
+        return IStore(CodeImportJob).get(CodeImportJob, id)
 
     def getJobForMachine(self, hostname, worker_limit):
         """See `ICodeImportJobSet`."""
@@ -195,12 +193,12 @@ class CodeImportJobSet(object):
                 hostname, CodeImportMachineState.ONLINE)
         elif not machine.shouldLookForJob(worker_limit):
             return None
-        job = CodeImportJob.selectOne(
-            """id IN (SELECT id FROM CodeImportJob
-               WHERE date_due <= %s AND state = %s
-               ORDER BY requesting_user IS NULL, date_due
-               LIMIT 1)"""
-            % sqlvalues(UTC_NOW, CodeImportJobState.PENDING))
+        job = IStore(CodeImportJob).find(
+            CodeImportJob,
+            CodeImportJob.date_due <= UTC_NOW,
+            CodeImportJob.state == CodeImportJobState.PENDING).order_by(
+                CodeImportJob.requesting_user == None,
+                CodeImportJob.date_due).first()
         if job is not None:
             job_workflow.startJob(job, machine)
             return job
@@ -209,11 +207,12 @@ class CodeImportJobSet(object):
 
     def getReclaimableJobs(self):
         """See `ICodeImportJobSet`."""
+        interval = config.codeimportworker.maximum_heartbeat_interval
         return IStore(CodeImportJob).find(
             CodeImportJob,
-            "state = %s and heartbeat < %s + '-%s seconds'"
-            % sqlvalues(CodeImportJobState.RUNNING, UTC_NOW,
-                        config.codeimportworker.maximum_heartbeat_interval))
+            CodeImportJob.state == CodeImportJobState.RUNNING,
+            CodeImportJob.heartbeat < (
+                UTC_NOW - Cast(timedelta(seconds=interval), 'interval')))
 
 
 @implementer(ICodeImportJobWorkflow)
@@ -233,16 +232,18 @@ class CodeImportJobWorkflow:
             interval = code_import.effective_update_interval
 
         job = CodeImportJob(code_import=code_import, date_due=UTC_NOW)
+        IStore(CodeImportJob).add(job)
 
         # Find the most recent CodeImportResult for this CodeImport. We
         # sort by date_created because we do not have an index on
         # date_job_started in the database, and that should give the same
         # sort order.
-        most_recent_result_list = list(CodeImportResult.selectBy(
-            code_import=code_import).orderBy(['-date_created']).limit(1))
+        most_recent_result = IStore(CodeImportResult).find(
+            CodeImportResult,
+            CodeImportResult.code_import == code_import).order_by(
+                Desc(CodeImportResult.date_created)).first()
 
-        if len(most_recent_result_list) != 0:
-            [most_recent_result] = most_recent_result_list
+        if most_recent_result is not None:
             date_due = most_recent_result.date_job_started + interval
             job.date_due = max(job.date_due, date_due)
 
@@ -301,7 +302,7 @@ class CodeImportJobWorkflow:
         naked_job = removeSecurityProxy(import_job)
         naked_job.date_started = UTC_NOW
         naked_job.heartbeat = UTC_NOW
-        naked_job.logtail = u''
+        naked_job.logtail = ''
         naked_job.machine = machine
         naked_job.state = CodeImportJobState.RUNNING
         getUtility(ICodeImportEventSet).newStart(
@@ -365,7 +366,7 @@ class CodeImportJobWorkflow:
             code_import.updateFromData(
                 dict(review_status=CodeImportReviewStatus.FAILING), None)
         elif status == CodeImportResultStatus.SUCCESS_PARTIAL:
-            interval = datetime.timedelta(0)
+            interval = timedelta(0)
         elif failure_count > 0:
             interval = (code_import.effective_update_interval *
                         (2 ** (failure_count - 1)))
@@ -406,7 +407,7 @@ class CodeImportJobWorkflow:
             import_job, CodeImportResultStatus.RECLAIMED, None)
         # 3)
         if code_import.review_status == CodeImportReviewStatus.REVIEWED:
-            self.newJob(code_import, datetime.timedelta(0))
+            self.newJob(code_import, timedelta(0))
         # 4)
         getUtility(ICodeImportEventSet).newReclaim(
             code_import, machine, job_id)
diff --git a/lib/lp/code/model/codeimportmachine.py b/lib/lp/code/model/codeimportmachine.py
index 4dfaeb7..1983adc 100644
--- a/lib/lp/code/model/codeimportmachine.py
+++ b/lib/lp/code/model/codeimportmachine.py
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Database classes including and related to CodeImportMachine."""
@@ -14,6 +14,7 @@ from sqlobject import (
     SQLMultipleJoin,
     StringCol,
     )
+from storm.locals import ReferenceSet
 from zope.component import getUtility
 from zope.interface import implementer
 
@@ -48,9 +49,9 @@ class CodeImportMachine(SQLBase):
         default=CodeImportMachineState.OFFLINE)
     heartbeat = UtcDateTimeCol(notNull=False)
 
-    current_jobs = SQLMultipleJoin(
-        'CodeImportJob', joinColumn='machine',
-        orderBy=['date_started', 'id'])
+    current_jobs = ReferenceSet(
+        '<primary key>', 'CodeImportJob.machine_id',
+        order_by=('CodeImportJob.date_started', 'CodeImportJob.id'))
 
     events = SQLMultipleJoin(
         'CodeImportEvent', joinColumn='machine',
diff --git a/lib/lp/code/model/tests/test_codeimport.py b/lib/lp/code/model/tests/test_codeimport.py
index a57f79e..7b5166a 100644
--- a/lib/lp/code/model/tests/test_codeimport.py
+++ b/lib/lp/code/model/tests/test_codeimport.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Unit tests for methods of CodeImport and CodeImportSet."""
@@ -47,6 +47,7 @@ from lp.code.model.codeimportresult import CodeImportResult
 from lp.code.tests.codeimporthelpers import make_running_import
 from lp.code.tests.helpers import GitHostingFixture
 from lp.registry.interfaces.person import IPersonSet
+from lp.services.database.interfaces import IStore
 from lp.testing import (
     login,
     login_person,
@@ -377,7 +378,7 @@ class TestCodeImportStatusUpdate(TestCodeImportBase):
         self.import_operator = getUtility(IPersonSet).getByEmail(
             'david.allouche@xxxxxxxxxxxxx')
         # Remove existing jobs.
-        for job in CodeImportJob.select():
+        for job in IStore(CodeImportJob).find(CodeImportJob):
             job.destroySelf()
 
     def makeApprovedImportWithPendingJob(self):
diff --git a/lib/lp/code/model/tests/test_codeimportjob.py b/lib/lp/code/model/tests/test_codeimportjob.py
index 68499dd..c099944 100644
--- a/lib/lp/code/model/tests/test_codeimportjob.py
+++ b/lib/lp/code/model/tests/test_codeimportjob.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Unit tests for CodeImportJob and CodeImportJobWorkflow."""
@@ -237,7 +237,7 @@ class TestCodeImportJobSetGetJobForMachine(TestCaseWithFactory):
         # the sample data and set up some objects.
         super(TestCodeImportJobSetGetJobForMachine, self).setUp()
         login_for_code_imports()
-        for job in CodeImportJob.select():
+        for job in IStore(CodeImportJob).find(CodeImportJob):
             job.destroySelf()
         self.machine = self.factory.makeCodeImportMachine(set_online=True)
 
@@ -351,7 +351,7 @@ class ReclaimableJobTests(TestCaseWithFactory):
     def setUp(self):
         super(ReclaimableJobTests, self).setUp()
         login_for_code_imports()
-        for job in CodeImportJob.select():
+        for job in IStore(CodeImportJob).find(CodeImportJob):
             job.destroySelf()
 
     def makeJobWithHeartbeatInPast(self, seconds_in_past):
diff --git a/lib/lp/code/xmlrpc/codeimportscheduler.py b/lib/lp/code/xmlrpc/codeimportscheduler.py
index 1cf05cb..35ba346 100644
--- a/lib/lp/code/xmlrpc/codeimportscheduler.py
+++ b/lib/lp/code/xmlrpc/codeimportscheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """The code import scheduler XML-RPC API."""
@@ -8,6 +8,7 @@ __all__ = [
     'CodeImportSchedulerAPI',
     ]
 
+import six
 from zope.component import getUtility
 from zope.interface import implementer
 from zope.security.proxy import removeSecurityProxy
@@ -35,7 +36,7 @@ class CodeImportSchedulerAPI(LaunchpadXMLRPCView):
     def getJobForMachine(self, hostname, worker_limit):
         """See `ICodeImportScheduler`."""
         job = getUtility(ICodeImportJobSet).getJobForMachine(
-            hostname, worker_limit)
+            six.ensure_text(hostname), worker_limit)
         if job is not None:
             return job.id
         else:
@@ -58,11 +59,13 @@ class CodeImportSchedulerAPI(LaunchpadXMLRPCView):
 
     def updateHeartbeat(self, job_id, log_tail):
         """See `ICodeImportScheduler`."""
-        return self._updateHeartbeat(job_id, log_tail)
+        return self._updateHeartbeat(job_id, six.ensure_text(log_tail))
 
     def finishJobID(self, job_id, status_name, log_file_alias_url):
         """See `ICodeImportScheduler`."""
-        return self._finishJobID(job_id, status_name, log_file_alias_url)
+        return self._finishJobID(
+            job_id, six.ensure_text(status_name),
+            six.ensure_text(log_file_alias_url))
 
     @return_fault
     def _getImportDataForJobID(self, job_id):
diff --git a/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py b/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py
index 60ad9ce..91ceae3 100644
--- a/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py
+++ b/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Test for the methods of `ICodeImportScheduler`."""
@@ -15,6 +15,7 @@ from lp.code.model.codeimportjob import CodeImportJob
 from lp.code.tests.codeimporthelpers import make_running_import
 from lp.code.xmlrpc.codeimportscheduler import CodeImportSchedulerAPI
 from lp.services.database.constants import UTC_NOW
+from lp.services.database.interfaces import IStore
 from lp.services.webapp import canonical_url
 from lp.testing import (
     run_with_login,
@@ -32,7 +33,7 @@ class TestCodeImportSchedulerAPI(TestCaseWithFactory):
         TestCaseWithFactory.setUp(self)
         self.api = CodeImportSchedulerAPI(None, None)
         self.machine = self.factory.makeCodeImportMachine(set_online=True)
-        for job in CodeImportJob.select():
+        for job in IStore(CodeImportJob).find(CodeImportJob):
             job.destroySelf()
 
     def makeCodeImportJob(self, running):
@@ -84,7 +85,7 @@ class TestCodeImportSchedulerAPI(TestCaseWithFactory):
     def test_updateHeartbeat(self):
         # updateHeartbeat calls the updateHeartbeat job workflow method.
         code_import_job = self.makeCodeImportJob(running=True)
-        log_tail = self.factory.getUniqueString()
+        log_tail = self.factory.getUniqueUnicode()
         self.api.updateHeartbeat(code_import_job.id, log_tail)
         self.assertSqlAttributeEqualsDate(
             code_import_job, 'heartbeat', UTC_NOW)
diff --git a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
index 047cbc9..d2422cb 100644
--- a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
+++ b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the CodeImportWorkerMonitor and related classes."""
@@ -72,6 +72,7 @@ from lp.services.config.fixture import (
     ConfigFixture,
     ConfigUseFixture,
     )
+from lp.services.database.interfaces import IStore
 from lp.services.log.logger import BufferLogger
 from lp.services.twistedsupport import suppress_stderr
 from lp.services.twistedsupport.tests.test_processmonitor import (
@@ -669,7 +670,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
 
 def nuke_codeimport_sample_data():
     """Delete all the sample data that might interfere with tests."""
-    for job in CodeImportJob.select():
+    for job in IStore(CodeImportJob).find(CodeImportJob):
         job.destroySelf()
     for code_import in CodeImport.select():
         code_import.destroySelf()
@@ -796,7 +797,7 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
             code_import.updateFromData(
                 {'review_status': CodeImportReviewStatus.REVIEWED},
                 self.factory.makePerson())
-        job = getUtility(ICodeImportJobSet).getJobForMachine('machine', 10)
+        job = getUtility(ICodeImportJobSet).getJobForMachine(u'machine', 10)
         self.assertEqual(code_import, job.code_import)
         source_details = CodeImportSourceDetails.fromArguments(
             removeSecurityProxy(job.makeWorkerArguments()))
diff --git a/lib/lp/codehosting/codeimport/workermonitor.py b/lib/lp/codehosting/codeimport/workermonitor.py
index 767f0e3..2a2035a 100644
--- a/lib/lp/codehosting/codeimport/workermonitor.py
+++ b/lib/lp/codehosting/codeimport/workermonitor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Code to talk to the database about what the worker script is doing."""
@@ -10,6 +10,7 @@ __all__ = []
 import os
 import tempfile
 
+import six
 from twisted.internet import (
     defer,
     error,
@@ -57,7 +58,7 @@ class CodeImportWorkerMonitorProtocol(ProcessMonitorProtocolWithTimeout):
             self, deferred, clock=clock,
             timeout=config.codeimport.worker_inactivity_timeout)
         self.worker_monitor = worker_monitor
-        self._tail = ''
+        self._tail = b''
         self._log_file = log_file
         self._looping_call = task.LoopingCall(self._updateHeartbeat)
         self._looping_call.clock = self._clock
@@ -91,7 +92,7 @@ class CodeImportWorkerMonitorProtocol(ProcessMonitorProtocolWithTimeout):
         """
         self.resetTimeout()
         self._log_file.write(data)
-        self._tail = '\n'.join((self._tail + data).split('\n')[-5:])
+        self._tail = b'\n'.join((self._tail + data).split(b'\n')[-5:])
 
     errReceived = outReceived
 
@@ -195,8 +196,12 @@ class CodeImportWorkerMonitor:
     def updateHeartbeat(self, tail):
         """Call the updateHeartbeat method for the job we are working on."""
         self._logger.debug("Updating heartbeat.")
+        # The log tail is really bytes, but it's stored in the database as a
+        # text column, so it's easiest to convert it to text now; passing
+        # text over XML-RPC requires less boilerplate than bytes anyway.
         deferred = self.codeimport_endpoint.callRemote(
-            'updateHeartbeat', self._job_id, tail)
+            'updateHeartbeat', self._job_id,
+            six.ensure_text(tail, errors='replace'))
         return deferred.addErrback(self._trap_nosuchcodeimportjob)
 
     def _createLibrarianFileAlias(self, name, size, file, contentType):