← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:code-import-decouple-tests into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:code-import-decouple-tests into launchpad:master with ~cjwatson/launchpad:remove-old-svn-code-import-tests as a prerequisite.

Commit message:
Decouple code import tests from database

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/394167

We're aiming to split out lp.codehosting.codeimport into a separate codebase, after which point its tests will no longer be able to use the Launchpad database.  Prepare for this by rearranging the worker monitor tests to prepare worker argument lists and make assertions about XML-RPC requests, rather than creating code import database objects and making assertions about code import results.  This does make them slightly less exhaustive as integration tests, but allows them to run without Launchpad's test database infrastructure.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:code-import-decouple-tests into launchpad:master.
diff --git a/lib/lp/codehosting/codeimport/tests/test_worker.py b/lib/lp/codehosting/codeimport/tests/test_worker.py
index b6c2614..6a464b3 100644
--- a/lib/lp/codehosting/codeimport/tests/test_worker.py
+++ b/lib/lp/codehosting/codeimport/tests/test_worker.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the code import worker."""
@@ -771,7 +771,7 @@ class TestGitImportWorker(WorkerTest):
             new_branch.repository._transport.get('git/git-cache').read())
 
 
-def clean_up_default_stores_for_import(source_details):
+def clean_up_default_stores_for_import(target_id):
     """Clean up the default branch and foreign tree stores for an import.
 
     This checks for an existing branch and/or other import data corresponding
@@ -783,13 +783,13 @@ def clean_up_default_stores_for_import(source_details):
     :source_details: A `CodeImportSourceDetails` describing the import.
     """
     tree_transport = get_transport(config.codeimport.foreign_tree_store)
-    prefix = '%08x' % source_details.target_id
+    prefix = '%08x' % target_id
     if tree_transport.has('.'):
         for filename in tree_transport.list_dir('.'):
             if filename.startswith(prefix):
                 tree_transport.delete(filename)
     branchstore = get_default_bazaar_branch_store()
-    branch_name = '%08x' % source_details.target_id
+    branch_name = '%08x' % target_id
     if branchstore.transport.has(branch_name):
         branchstore.transport.delete_tree(branch_name)
 
@@ -898,7 +898,7 @@ class TestActualImportMixin:
             'trunk', [('README', b'Original contents')])
         source_details = CodeImportSourceDetails.fromArguments(arguments)
 
-        clean_up_default_stores_for_import(source_details)
+        clean_up_default_stores_for_import(source_details.target_id)
 
         script_path = os.path.join(
             config.root, 'scripts', 'code-import-worker.py')
@@ -916,7 +916,8 @@ class TestActualImportMixin:
         self.assertPositive(output.tell())
 
         self.addCleanup(
-            lambda: clean_up_default_stores_for_import(source_details))
+            lambda: clean_up_default_stores_for_import(
+                source_details.target_id))
 
         tree_path = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(tree_path))
@@ -936,7 +937,7 @@ class TestActualImportMixin:
             'trunk', [('README', b'Original contents')])
         source_details = CodeImportSourceDetails.fromArguments(arguments)
 
-        clean_up_default_stores_for_import(source_details)
+        clean_up_default_stores_for_import(source_details.target_id)
 
         script_path = os.path.join(
             config.root, 'scripts', 'code-import-worker.py')
diff --git a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
index 734c259..f60d19a 100644
--- a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
+++ b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
@@ -6,21 +6,27 @@
 from __future__ import absolute_import, print_function
 
 __metaclass__ = type
-__all__ = [
-    'nuke_codeimport_sample_data',
-    ]
 
 import io
 import os
 import shutil
 import subprocess
 import tempfile
+from textwrap import dedent
 
 from bzrlib.branch import Branch
 from bzrlib.tests import TestCaseInTempDir
 from dulwich.repo import Repo as GitRepo
+from fixtures import MockPatchObject
 import oops_twisted
+from pymacaroons import Macaroon
 from six.moves import xmlrpc_client
+from testtools.matchers import (
+    AnyMatch,
+    Equals,
+    IsInstance,
+    MatchesListwise,
+    )
 from testtools.twistedsupport import (
     assert_fails_with,
     AsynchronousDeferredRunTest,
@@ -31,21 +37,12 @@ from twisted.internet import (
     protocol,
     reactor,
     )
-from twisted.web import xmlrpc
-from zope.component import getUtility
-from zope.security.proxy import removeSecurityProxy
-
-from lp.code.enums import (
-    CodeImportResultStatus,
-    CodeImportReviewStatus,
-    RevisionControlSystems,
-    TargetRevisionControlSystems,
+from twisted.web import (
+    server,
+    xmlrpc,
     )
-from lp.code.interfaces.branch import IBranch
-from lp.code.interfaces.codeimport import ICodeImportSet
-from lp.code.interfaces.codeimportjob import ICodeImportJobSet
-from lp.code.model.codeimport import CodeImport
-from lp.code.model.codeimportjob import CodeImportJob
+
+from lp.code.enums import CodeImportResultStatus
 from lp.code.tests.helpers import GitHostingFixture
 from lp.codehosting.codeimport.tests.servers import (
     BzrServer,
@@ -57,7 +54,6 @@ from lp.codehosting.codeimport.tests.test_worker import (
     clean_up_default_stores_for_import,
     )
 from lp.codehosting.codeimport.worker import (
-    CodeImportSourceDetails,
     CodeImportWorkerExitCode,
     get_default_bazaar_branch_store,
     )
@@ -71,7 +67,6 @@ from lp.services.config.fixture import (
     ConfigFixture,
     ConfigUseFixture,
     )
-from lp.services.database.interfaces import IStore
 from lp.services.log.logger import BufferLogger
 from lp.services.twistedsupport import suppress_stderr
 from lp.services.twistedsupport.tests.test_processmonitor import (
@@ -79,16 +74,7 @@ from lp.services.twistedsupport.tests.test_processmonitor import (
     ProcessTestsMixin,
     )
 from lp.services.webapp import errorlog
-from lp.testing import (
-    login,
-    logout,
-    TestCase,
-    )
-from lp.testing.factory import LaunchpadObjectFactory
-from lp.testing.layers import (
-    LaunchpadZopelessLayer,
-    ZopelessAppServerLayer,
-    )
+from lp.testing import TestCase
 from lp.xmlrpc.faults import NoSuchCodeImportJob
 
 
@@ -168,47 +154,54 @@ class TestWorkerMonitorProtocol(ProcessTestsMixin, TestCase):
             self.protocol._tail, 'line 3\nline 4\nline 5\nline 6\n')
 
 
-class FakeCodeImportScheduleEndpointProxy:
-    """A fake implementation of a proxy to `ICodeImportScheduler`.
+class FakeCodeImportScheduler(xmlrpc.XMLRPC, object):
+    """A fake implementation of `ICodeImportScheduler`.
 
     The constructor takes a dictionary mapping job ids to information that
-    should be returned by getImportDataForJobID and the exception to raise if
+    should be returned by getImportDataForJobID and the fault to return if
     getImportDataForJobID is called with a job id not in the passed-in
     dictionary, defaulting to a fault with the same code as
-    NoSuchCodeImportJob (because the class of the exception is lost when you
-    go through XML-RPC serialization).
+    NoSuchCodeImportJob (because the class of the fault is lost when you go
+    through XML-RPC serialization).
     """
 
-    def __init__(self, jobs_dict, no_such_job_exception=None):
+    def __init__(self, jobs_dict, no_such_job_fault=None):
+        super(FakeCodeImportScheduler, self).__init__(allowNone=True)
         self.calls = []
         self.jobs_dict = jobs_dict
-        if no_such_job_exception is None:
-            no_such_job_exception = xmlrpc.Fault(
+        if no_such_job_fault is None:
+            no_such_job_fault = xmlrpc.Fault(
                 faultCode=NoSuchCodeImportJob.error_code, faultString='')
-        self.no_such_job_exception = no_such_job_exception
+        self.no_such_job_fault = no_such_job_fault
+
+    def xmlrpc_getImportDataForJobID(self, job_id):
+        self.calls.append(('getImportDataForJobID', job_id))
+        if job_id in self.jobs_dict:
+            return self.jobs_dict[job_id]
+        else:
+            return self.no_such_job_fault
 
-    def callRemote(self, method_name, *args):
-        method = getattr(self, '_remote_%s' % method_name, self._default)
-        deferred = defer.maybeDeferred(method, *args)
+    def xmlrpc_updateHeartbeat(self, job_id, log_tail):
+        self.calls.append(('updateHeartbeat', job_id, log_tail))
+        return 0
 
-        def append_to_log(pass_through):
-            self.calls.append((method_name,) + tuple(args))
-            return pass_through
+    def xmlrpc_finishJobID(self, job_id, status_name, log_file):
+        self.calls.append(('finishJobID', job_id, status_name, log_file))
 
-        deferred.addCallback(append_to_log)
-        return deferred
 
-    def _default(self, *args):
-        return None
+class FakeCodeImportSchedulerMixin:
 
-    def _remote_getImportDataForJobID(self, job_id):
-        if job_id in self.jobs_dict:
-            return self.jobs_dict[job_id]
-        else:
-            raise self.no_such_job_exception
+    def makeFakeCodeImportScheduler(self, jobs_dict, no_such_job_fault=None):
+        """Start a `FakeCodeImportScheduler` and return its URL."""
+        scheduler = FakeCodeImportScheduler(
+            jobs_dict, no_such_job_fault=no_such_job_fault)
+        scheduler_listener = reactor.listenTCP(0, server.Site(scheduler))
+        self.addCleanup(scheduler_listener.stopListening)
+        scheduler_port = scheduler_listener.getHost().port
+        return scheduler, 'http://localhost:%d/' % scheduler_port
 
 
-class TestWorkerMonitorUnit(TestCase):
+class TestWorkerMonitorUnit(FakeCodeImportSchedulerMixin, TestCase):
     """Unit tests for most of the `CodeImportWorkerMonitor` class.
 
     We have to pay attention to the fact that several of the methods of the
@@ -217,20 +210,19 @@ class TestWorkerMonitorUnit(TestCase):
     they did.
     """
 
-    layer = LaunchpadZopelessLayer
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
 
     def makeWorkerMonitorWithJob(self, job_id=1, job_data={}):
+        self.scheduler, scheduler_url = self.makeFakeCodeImportScheduler(
+            {job_id: job_data})
         return CodeImportWorkerMonitor(
-            job_id, BufferLogger(),
-            FakeCodeImportScheduleEndpointProxy({job_id: job_data}),
-            "anything")
+            job_id, BufferLogger(), xmlrpc.Proxy(scheduler_url), "anything")
 
-    def makeWorkerMonitorWithoutJob(self, exception=None):
+    def makeWorkerMonitorWithoutJob(self, fault=None):
+        self.scheduler, scheduler_url = self.makeFakeCodeImportScheduler(
+            {}, fault)
         return CodeImportWorkerMonitor(
-            1, BufferLogger(),
-            FakeCodeImportScheduleEndpointProxy({}, exception),
-            None)
+            1, BufferLogger(), xmlrpc.Proxy(scheduler_url), None)
 
     def test_getWorkerArguments(self):
         # getWorkerArguments returns a deferred that fires with the
@@ -253,8 +245,9 @@ class TestWorkerMonitorUnit(TestCase):
     def test_getWorkerArguments_endpoint_failure_raises(self):
         # When getImportDataForJobID raises an arbitrary exception, it is not
         # handled in any special way by getWorkerArguments.
-        worker_monitor = self.makeWorkerMonitorWithoutJob(
-            exception=ZeroDivisionError())
+        self.useFixture(MockPatchObject(
+            xmlrpc_client, 'loads', side_effect=ZeroDivisionError()))
+        worker_monitor = self.makeWorkerMonitorWithoutJob()
         return assert_fails_with(
             worker_monitor.getWorkerArguments(), ZeroDivisionError)
 
@@ -262,7 +255,7 @@ class TestWorkerMonitorUnit(TestCase):
         # When getImportDataForJobID signals an arbitrary fault, it is not
         # handled in any special way by getWorkerArguments.
         worker_monitor = self.makeWorkerMonitorWithoutJob(
-            exception=xmlrpc.Fault(1, ''))
+            fault=xmlrpc.Fault(1, ''))
         return assert_fails_with(
             worker_monitor.getWorkerArguments(), xmlrpc.Fault)
 
@@ -275,7 +268,7 @@ class TestWorkerMonitorUnit(TestCase):
         def check_updated_details(result):
             self.assertEqual(
                 [('updateHeartbeat', job_id, log_tail)],
-                worker_monitor.codeimport_endpoint.calls)
+                self.scheduler.calls)
 
         return worker_monitor.updateHeartbeat(log_tail).addCallback(
             check_updated_details)
@@ -291,7 +284,7 @@ class TestWorkerMonitorUnit(TestCase):
             self.assertEqual(
                 [('finishJobID', job_id, 'SUCCESS',
                   xmlrpc_client.Binary(b''))],
-                worker_monitor.codeimport_endpoint.calls)
+                self.scheduler.calls)
 
         return worker_monitor.finishJob(
             CodeImportResultStatus.SUCCESS).addCallback(
@@ -309,7 +302,7 @@ class TestWorkerMonitorUnit(TestCase):
             self.assertEqual(
                 [('finishJobID', job_id, 'SUCCESS',
                   xmlrpc_client.Binary(log_bytes))],
-                worker_monitor.codeimport_endpoint.calls)
+                self.scheduler.calls)
 
         return worker_monitor.finishJob(
             CodeImportResultStatus.SUCCESS).addCallback(
@@ -449,7 +442,6 @@ class TestWorkerMonitorUnit(TestCase):
     def test_callFinishJobLogsTracebackOnFailure(self):
         # When callFinishJob is called with a failure, it dumps the traceback
         # of the failure into the log file.
-        self.layer.force_dirty_database()
         worker_monitor = self.makeWorkerMonitorWithJob()
         ret = worker_monitor.callFinishJob(makeFailure(RuntimeError))
 
@@ -471,7 +463,7 @@ class TestWorkerMonitorUnit(TestCase):
         self.assertEqual(calls, [])
 
 
-class TestWorkerMonitorRunNoProcess(TestCase):
+class TestWorkerMonitorRunNoProcess(FakeCodeImportSchedulerMixin, TestCase):
     """Tests for `CodeImportWorkerMonitor.run` that don't launch a subprocess.
     """
 
@@ -486,15 +478,10 @@ class TestWorkerMonitorRunNoProcess(TestCase):
         bit is tested above).
         """
 
-        def __init__(self, process_deferred, has_job=True):
-            if has_job:
-                job_data = {1: {'arguments': []}}
-            else:
-                job_data = {}
+        def __init__(self, job_id, logger, codeimport_endpoint, access_policy,
+                     process_deferred):
             CodeImportWorkerMonitor.__init__(
-                self, 1, BufferLogger(),
-                FakeCodeImportScheduleEndpointProxy(job_data),
-                "anything")
+                self, job_id, logger, codeimport_endpoint, access_policy)
             self.result_status = None
             self.process_deferred = process_deferred
 
@@ -506,6 +493,16 @@ class TestWorkerMonitorRunNoProcess(TestCase):
             self.result_status = status
             return defer.succeed(None)
 
+    def makeWorkerMonitor(self, process_deferred, has_job=True):
+        if has_job:
+            job_data = {1: {'arguments': []}}
+        else:
+            job_data = {}
+        _, scheduler_url = self.makeFakeCodeImportScheduler(job_data)
+        return self.WorkerMonitor(
+            1, BufferLogger(), xmlrpc.Proxy(scheduler_url), "anything",
+            process_deferred)
+
     def assertFinishJobCalledWithStatus(self, ignored, worker_monitor,
                                         status):
         """Assert that finishJob was called with the given status."""
@@ -518,7 +515,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
     def test_success(self):
         # In the successful case, finishJob is called with
         # CodeImportResultStatus.SUCCESS.
-        worker_monitor = self.WorkerMonitor(defer.succeed(None))
+        worker_monitor = self.makeWorkerMonitor(defer.succeed(None))
         return worker_monitor.run().addCallback(
             self.assertFinishJobCalledWithStatus, worker_monitor,
             CodeImportResultStatus.SUCCESS)
@@ -534,7 +531,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
             publisher_adapter=oops_twisted.defer_publisher,
             publisher_helpers=oops_twisted.publishers)
         self.addCleanup(errorlog.globalErrorUtility.configure)
-        worker_monitor = self.WorkerMonitor(defer.fail(RuntimeError()))
+        worker_monitor = self.makeWorkerMonitor(defer.fail(RuntimeError()))
         return worker_monitor.run().addCallback(
             self.assertFinishJobCalledWithStatus, worker_monitor,
             CodeImportResultStatus.FAILURE)
@@ -542,7 +539,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
     def test_quiet_exit(self):
         # If the process deferred fails with ExitQuietly, the call to run()
         # succeeds, and finishJob is not called at all.
-        worker_monitor = self.WorkerMonitor(
+        worker_monitor = self.makeWorkerMonitor(
             defer.succeed(None), has_job=False)
         return worker_monitor.run().addCallback(
             self.assertFinishJobNotCalled, worker_monitor)
@@ -550,7 +547,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
     def test_quiet_exit_from_finishJob(self):
         # If finishJob fails with ExitQuietly, the call to run() still
         # succeeds.
-        worker_monitor = self.WorkerMonitor(defer.succeed(None))
+        worker_monitor = self.makeWorkerMonitor(defer.succeed(None))
 
         def finishJob(reason):
             raise ExitQuietly
@@ -565,7 +562,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
             publisher_helpers=oops_twisted.publishers)
         self.addCleanup(errorlog.globalErrorUtility.configure)
         failure_msg = b"test_callFinishJob_logs_failure expected failure"
-        worker_monitor = self.WorkerMonitor(
+        worker_monitor = self.makeWorkerMonitor(
             defer.fail(RuntimeError(failure_msg)))
         d = worker_monitor.run()
 
@@ -580,14 +577,6 @@ class TestWorkerMonitorRunNoProcess(TestCase):
         return d
 
 
-def nuke_codeimport_sample_data():
-    """Delete all the sample data that might interfere with tests."""
-    for job in IStore(CodeImportJob).find(CodeImportJob):
-        job.destroySelf()
-    for code_import in IStore(CodeImport).find(CodeImport):
-        code_import.destroySelf()
-
-
 class CIWorkerMonitorProtocolForTesting(CodeImportWorkerMonitorProtocol):
     """A `CodeImportWorkerMonitorProtocol` that counts `resetTimeout` calls.
     """
@@ -619,27 +608,20 @@ class CIWorkerMonitorForTesting(CodeImportWorkerMonitor):
         return protocol
 
 
-class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
+class TestWorkerMonitorIntegration(FakeCodeImportSchedulerMixin,
+                                   TestCaseInTempDir, TestCase):
 
-    layer = ZopelessAppServerLayer
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=60)
 
     def setUp(self):
         super(TestWorkerMonitorIntegration, self).setUp()
-        login('no-priv@xxxxxxxxxxxxx')
-        self.factory = LaunchpadObjectFactory()
-        nuke_codeimport_sample_data()
         self.repo_path = tempfile.mkdtemp()
         self.disable_directory_isolation()
         self.addCleanup(shutil.rmtree, self.repo_path)
         self.foreign_commit_count = 0
 
-    def tearDown(self):
-        super(TestWorkerMonitorIntegration, self).tearDown()
-        logout()
-
     def makeCVSCodeImport(self):
-        """Make a `CodeImport` that points to a real CVS repository."""
+        """Make arguments that point to a real CVS repository."""
         cvs_server = CVSServer(self.repo_path)
         cvs_server.start_server()
         self.addCleanup(cvs_server.stop_server)
@@ -647,11 +629,13 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
         cvs_server.makeModule('trunk', [('README', 'original\n')])
         self.foreign_commit_count = 2
 
-        return self.factory.makeCodeImport(
-            cvs_root=cvs_server.getRoot(), cvs_module=u'trunk')
+        return [
+            str(self.factory.getUniqueInteger()), 'cvs', 'bzr',
+            cvs_server.getRoot(), '--cvs-module', 'trunk',
+            ]
 
     def makeBzrSvnCodeImport(self):
-        """Make a `CodeImport` that points to a real Subversion repository."""
+        """Make arguments that point to a real Subversion repository."""
         self.subversion_server = SubversionServer(
             self.repo_path, use_svn_serve=True)
         self.subversion_server.start_server()
@@ -660,11 +644,13 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
             'trunk', [('README', b'contents')])
         self.foreign_commit_count = 2
 
-        return self.factory.makeCodeImport(
-            svn_branch_url=url, rcs_type=RevisionControlSystems.BZR_SVN)
+        return [
+            str(self.factory.getUniqueInteger()), 'bzr-svn', 'bzr',
+            url,
+            ]
 
-    def makeGitCodeImport(self, target_rcs_type=None):
-        """Make a `CodeImport` that points to a real Git repository."""
+    def makeGitCodeImport(self, target_rcs_type='bzr'):
+        """Make arguments that point to a real Git repository."""
         self.git_server = GitServer(self.repo_path, use_server=False)
         self.git_server.start_server()
         self.addCleanup(self.git_server.stop_server)
@@ -672,40 +658,43 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
         self.git_server.makeRepo('source', [('README', 'contents')])
         self.foreign_commit_count = 1
 
-        return self.factory.makeCodeImport(
-            git_repo_url=self.git_server.get_url('source'),
-            target_rcs_type=target_rcs_type)
+        target_id = (
+            str(self.factory.getUniqueInteger()) if target_rcs_type == 'bzr'
+            else self.factory.getUniqueUnicode())
+        arguments = [
+            target_id, 'git', target_rcs_type,
+            self.git_server.get_url('source'),
+            ]
+        if target_rcs_type == 'git':
+            arguments.extend(['--macaroon', Macaroon().serialize()])
+        return arguments
 
     def makeBzrCodeImport(self):
-        """Make a `CodeImport` that points to a real Bazaar branch."""
+        """Make arguments that point to a real Bazaar branch."""
         self.bzr_server = BzrServer(self.repo_path)
         self.bzr_server.start_server()
         self.addCleanup(self.bzr_server.stop_server)
 
         self.bzr_server.makeRepo([('README', 'contents')])
         self.foreign_commit_count = 1
-        return self.factory.makeCodeImport(
-            bzr_branch_url=self.bzr_server.get_url())
 
-    def getStartedJobForImport(self, code_import):
+        return [
+            str(self.factory.getUniqueInteger()), 'bzr', 'bzr',
+            self.bzr_server.get_url(),
+            ]
+
+    def getStartedJobForImport(self, arguments):
         """Get a started `CodeImportJob` for `code_import`.
 
-        This method approves the import, creates a job, marks it started and
-        returns the job.  It also makes sure there are no branches or foreign
+        This method returns a job ID and job data, imitating an approved job
+        on Launchpad.  It also makes sure there are no branches or foreign
         trees in the default stores to interfere with processing this job.
         """
-        if code_import.review_status != CodeImportReviewStatus.REVIEWED:
-            code_import.updateFromData(
-                {'review_status': CodeImportReviewStatus.REVIEWED},
-                self.factory.makePerson())
-        job = getUtility(ICodeImportJobSet).getJobForMachine(u'machine', 10)
-        self.assertEqual(code_import, job.code_import)
-        source_details = CodeImportSourceDetails.fromArguments(
-            removeSecurityProxy(job.makeWorkerArguments()))
-        if IBranch.providedBy(code_import.target):
-            clean_up_default_stores_for_import(source_details)
-            self.addCleanup(clean_up_default_stores_for_import, source_details)
-        return job
+        if arguments[2] == 'bzr':
+            target_id = int(arguments[0])
+            clean_up_default_stores_for_import(target_id)
+            self.addCleanup(clean_up_default_stores_for_import, target_id)
+        return (1, {'arguments': arguments})
 
     def makeTargetGitServer(self):
         """Set up a target Git server that can receive imports."""
@@ -714,9 +703,9 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
         self.target_git_server = GitServer(self.target_store, use_server=True)
         self.target_git_server.start_server()
         self.addCleanup(self.target_git_server.stop_server)
-        config_name = self.getUniqueString()
+        config_name = self.factory.getUniqueUnicode()
         config_fixture = self.useFixture(ConfigFixture(
-            config_name, self.layer.config_fixture.instance_name))
+            config_name, os.environ['LPCONFIG']))
         setting_lines = [
             "[codehosting]",
             "git_browse_root: %s" % self.target_git_server.get_url(""),
@@ -728,100 +717,85 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
         self.useFixture(ConfigUseFixture(config_name))
         self.useFixture(GitHostingFixture())
 
-    def assertCodeImportResultCreated(self, code_import):
-        """Assert that a `CodeImportResult` was created for `code_import`."""
-        self.assertEqual(len(list(code_import.results)), 1)
-        result = list(code_import.results)[0]
-        self.assertEqual(result.status, CodeImportResultStatus.SUCCESS)
-
-    def assertBranchImportedOKForCodeImport(self, code_import):
+    def assertBranchImportedOKForCodeImport(self, target_id):
         """Assert that a branch was pushed into the default branch store."""
-        if IBranch.providedBy(code_import.target):
+        if target_id.isdigit():
             url = get_default_bazaar_branch_store()._getMirrorURL(
-                code_import.branch.id)
+                int(target_id))
             branch = Branch.open(url)
             commit_count = branch.revno()
         else:
-            repo_path = os.path.join(
-                self.target_store, code_import.target.unique_name)
+            repo_path = os.path.join(self.target_store, target_id)
             commit_count = int(subprocess.check_output(
                 ["git", "rev-list", "--count", "HEAD"],
                 cwd=repo_path, universal_newlines=True))
         self.assertEqual(self.foreign_commit_count, commit_count)
 
-    def assertImported(self, code_import_id):
-        """Assert that the `CodeImport` of the given id was imported."""
+    def assertImported(self, job_id, job_data):
+        """Assert that the code import with the given job id was imported.
+
+        Since we don't have a full Launchpad appserver instance here, we
+        just check that the code import worker has made the correct XML-RPC
+        calls via the given worker monitor.
+        """
         # In the in-memory tests, check that resetTimeout on the
         # CodeImportWorkerMonitorProtocol was called at least once.
         if self._protocol is not None:
             self.assertPositive(self._protocol.reset_calls)
-        code_import = getUtility(ICodeImportSet).get(code_import_id)
-        self.assertCodeImportResultCreated(code_import)
-        self.assertBranchImportedOKForCodeImport(code_import)
+        self.assertThat(self.scheduler.calls, AnyMatch(
+            MatchesListwise([
+                Equals('finishJobID'),
+                Equals(job_id),
+                Equals('SUCCESS'),
+                IsInstance(xmlrpc_client.Binary),
+                ])))
+        self.assertBranchImportedOKForCodeImport(job_data['arguments'][0])
 
-    def performImport(self, job_id):
-        """Perform the import job with ID job_id.
+    @defer.inlineCallbacks
+    def performImport(self, job_id, job_data):
+        """Perform the import job with ID job_id and data job_data.
 
-        Return a Deferred that fires when it the job is done.
+        Return a Deferred that fires when the job is done.
 
         This implementation does it in-process.
         """
         logger = BufferLogger()
-        monitor = CIWorkerMonitorForTesting(
-            job_id, logger,
-            xmlrpc.Proxy(
-                config.codeimportdispatcher.codeimportscheduler_url.encode(
-                    'UTF-8')),
-            "anything")
-        deferred = monitor.run()
-
-        def save_protocol_object(result):
-            """Save the process protocol object.
-
-            We do this in an addBoth so that it's called after the process
-            protocol is actually constructed but before we drop the last
-            reference to the monitor object.
-            """
-            self._protocol = monitor._protocol
-            return result
-
-        return deferred.addBoth(save_protocol_object)
+        self.scheduler, scheduler_url = self.makeFakeCodeImportScheduler(
+            {job_id: job_data})
+        worker_monitor = CIWorkerMonitorForTesting(
+            job_id, logger, xmlrpc.Proxy(scheduler_url), "anything")
+        result = yield worker_monitor.run()
+        self._protocol = worker_monitor._protocol
+        defer.returnValue(result)
 
     @defer.inlineCallbacks
     def test_import_cvs(self):
         # Create a CVS CodeImport and import it.
-        job = self.getStartedJobForImport(self.makeCVSCodeImport())
-        code_import_id = job.code_import.id
-        job_id = job.id
-        self.layer.txn.commit()
-        yield self.performImport(job_id)
-        self.assertImported(code_import_id)
+        job_id, job_data = self.getStartedJobForImport(
+            self.makeCVSCodeImport())
+        yield self.performImport(job_id, job_data)
+        self.assertImported(job_id, job_data)
 
     @defer.inlineCallbacks
     def test_import_git(self):
         # Create a Git CodeImport and import it.
-        job = self.getStartedJobForImport(self.makeGitCodeImport())
-        code_import_id = job.code_import.id
-        job_id = job.id
-        self.layer.txn.commit()
-        yield self.performImport(job_id)
-        self.assertImported(code_import_id)
+        job_id, job_data = self.getStartedJobForImport(
+            self.makeGitCodeImport())
+        yield self.performImport(job_id, job_data)
+        self.assertImported(job_id, job_data)
 
     @defer.inlineCallbacks
     def test_import_git_to_git(self):
         # Create a Git-to-Git CodeImport and import it.
         self.makeTargetGitServer()
-        job = self.getStartedJobForImport(self.makeGitCodeImport(
-            target_rcs_type=TargetRevisionControlSystems.GIT))
-        code_import_id = job.code_import.id
-        job_id = job.id
-        self.layer.txn.commit()
+        job_id, job_data = self.getStartedJobForImport(
+            self.makeGitCodeImport(target_rcs_type='git'))
         target_repo_path = os.path.join(
-            self.target_store, job.code_import.target.unique_name)
+            self.target_store, job_data['arguments'][0])
         os.makedirs(target_repo_path)
         self.target_git_server.createRepository(target_repo_path, bare=True)
-        yield self.performImport(job_id)
-        self.assertImported(code_import_id)
+        yield self.performImport(job_id, job_data)
+        self.assertImported(job_id, job_data)
         target_repo = GitRepo(target_repo_path)
         self.assertContentEqual(
             ["heads/master"], target_repo.refs.keys(base="refs"))
@@ -833,11 +807,8 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
         # Create a Git-to-Git CodeImport and import it incrementally with
         # ref and HEAD changes.
         self.makeTargetGitServer()
-        job = self.getStartedJobForImport(self.makeGitCodeImport(
-            target_rcs_type=TargetRevisionControlSystems.GIT))
-        code_import_id = job.code_import.id
-        job_id = job.id
-        self.layer.txn.commit()
+        job_id, job_data = self.getStartedJobForImport(
+            self.makeGitCodeImport(target_rcs_type='git'))
         source_repo = GitRepo(os.path.join(self.repo_path, "source"))
         commit = source_repo.refs["refs/heads/master"]
         source_repo.refs["refs/heads/one"] = commit
@@ -845,11 +816,11 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
         source_repo.refs.set_symbolic_ref("HEAD", "refs/heads/one")
         del source_repo.refs["refs/heads/master"]
         target_repo_path = os.path.join(
-            self.target_store, job.code_import.target.unique_name)
+            self.target_store, job_data['arguments'][0])
         self.target_git_server.makeRepo(
-            job.code_import.target.unique_name, [("NEWS", "contents")])
-        yield self.performImport(job_id)
-        self.assertImported(code_import_id)
+            job_data['arguments'][0], [("NEWS", "contents")])
+        yield self.performImport(job_id, job_data)
+        self.assertImported(job_id, job_data)
         target_repo = GitRepo(target_repo_path)
         self.assertContentEqual(
             ["heads/one", "heads/two"], target_repo.refs.keys(base="refs"))
@@ -860,22 +831,18 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
     @defer.inlineCallbacks
     def test_import_bzr(self):
         # Create a Bazaar CodeImport and import it.
-        job = self.getStartedJobForImport(self.makeBzrCodeImport())
-        code_import_id = job.code_import.id
-        job_id = job.id
-        self.layer.txn.commit()
-        yield self.performImport(job_id)
-        self.assertImported(code_import_id)
+        job_id, job_data = self.getStartedJobForImport(
+            self.makeBzrCodeImport())
+        yield self.performImport(job_id, job_data)
+        self.assertImported(job_id, job_data)
 
     @defer.inlineCallbacks
     def test_import_bzrsvn(self):
         # Create a Subversion-via-bzr-svn CodeImport and import it.
-        job = self.getStartedJobForImport(self.makeBzrSvnCodeImport())
-        code_import_id = job.code_import.id
-        job_id = job.id
-        self.layer.txn.commit()
-        yield self.performImport(job_id)
-        self.assertImported(code_import_id)
+        job_id, job_data = self.getStartedJobForImport(
+            self.makeBzrSvnCodeImport())
+        yield self.performImport(job_id, job_data)
+        self.assertImported(job_id, job_data)
 
 
 class DeferredOnExit(protocol.ProcessProtocol):
@@ -894,16 +861,26 @@ class TestWorkerMonitorIntegrationScript(TestWorkerMonitorIntegration):
     """Tests for CodeImportWorkerMonitor that execute a child process."""
 
     def setUp(self):
-        TestWorkerMonitorIntegration.setUp(self)
+        super(TestWorkerMonitorIntegrationScript, self).setUp()
         self._protocol = None
 
-    def performImport(self, job_id):
-        """Perform the import job with ID job_id.
+    def performImport(self, job_id, job_data):
+        """Perform the import job with ID job_id and data job_data.
 
-        Return a Deferred that fires when it the job is done.
+        Return a Deferred that fires when the job is done.
 
         This implementation does it in a child process.
         """
+        self.scheduler, scheduler_url = self.makeFakeCodeImportScheduler(
+            {job_id: job_data})
+        config_name = self.factory.getUniqueUnicode()
+        config_fixture = self.useFixture(
+            ConfigFixture(config_name, os.environ['LPCONFIG']))
+        config_fixture.add_section(dedent("""
+            [codeimportdispatcher]
+            codeimportscheduler_url: %s
+            """) % scheduler_url)
+        self.useFixture(ConfigUseFixture(config_name))
         script_path = os.path.join(
             config.root, 'scripts', 'code-import-worker-monitor.py')
         process_end_deferred = defer.Deferred()