← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:code-import-worker-librarian into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:code-import-worker-librarian into launchpad:master.

Commit message:
Send code import log file data over XML-RPC

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/392369

This will make it easier to split out the code import worker from the main Launchpad codebase, as it will no longer need the remote librarian upload client which currently expects to have direct Launchpad database access.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:code-import-worker-librarian into launchpad:master.
diff --git a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
index 2810f86..df206b6 100644
--- a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
+++ b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
@@ -20,20 +20,17 @@ from bzrlib.branch import Branch
 from bzrlib.tests import TestCaseInTempDir
 from dulwich.repo import Repo as GitRepo
 import oops_twisted
-from six.moves.urllib.request import urlopen
+from six.moves import xmlrpc_client
 from testtools.twistedsupport import (
     assert_fails_with,
     AsynchronousDeferredRunTest,
-    flush_logged_errors,
     )
-import transaction
 from twisted.internet import (
     defer,
     error,
     protocol,
     reactor,
     )
-from twisted.python import log
 from twisted.web import xmlrpc
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
@@ -88,7 +85,6 @@ from lp.testing import (
     TestCase,
     )
 from lp.testing.factory import LaunchpadObjectFactory
-from lp.testing.fakemethod import FakeMethod
 from lp.testing.layers import (
     LaunchpadZopelessLayer,
     ZopelessAppServerLayer,
@@ -224,20 +220,14 @@ class TestWorkerMonitorUnit(TestCase):
     layer = LaunchpadZopelessLayer
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=20)
 
-    class WorkerMonitor(CodeImportWorkerMonitor):
-        """A subclass of CodeImportWorkerMonitor that stubs logging OOPSes."""
-
-        def _logOopsFromFailure(self, failure):
-            log.err(failure)
-
-    def makeWorkerMonitorWithJob(self, job_id=1, job_data=()):
-        return self.WorkerMonitor(
+    def makeWorkerMonitorWithJob(self, job_id=1, job_data={}):
+        return CodeImportWorkerMonitor(
             job_id, BufferLogger(),
             FakeCodeImportScheduleEndpointProxy({job_id: job_data}),
             "anything")
 
     def makeWorkerMonitorWithoutJob(self, exception=None):
-        return self.WorkerMonitor(
+        return CodeImportWorkerMonitor(
             1, BufferLogger(),
             FakeCodeImportScheduleEndpointProxy({}, exception),
             None)
@@ -247,64 +237,11 @@ class TestWorkerMonitorUnit(TestCase):
         # 'arguments' part of what getImportDataForJobID returns.
         args = [self.factory.getUniqueString(),
                 self.factory.getUniqueString()]
-        worker_monitor = self.makeWorkerMonitorWithJob(1, (args, 1, 2))
-        return worker_monitor.getWorkerArguments().addCallback(
-            self.assertEqual, args)
-
-    def test_getWorkerArguments_dict(self):
-        # getWorkerArguments returns a deferred that fires with the
-        # 'arguments' part of what getImportDataForJobID returns.
-        # (New protocol: data passed as a dict.)
-        args = [self.factory.getUniqueString(),
-                self.factory.getUniqueString()]
-        data = {'arguments': args, 'target_url': 1, 'log_file_name': 2}
+        data = {'arguments': args}
         worker_monitor = self.makeWorkerMonitorWithJob(1, data)
         return worker_monitor.getWorkerArguments().addCallback(
             self.assertEqual, args)
 
-    def test_getWorkerArguments_sets_target_url_and_logfilename(self):
-        # getWorkerArguments sets the _target_url (for use in oops reports)
-        # and _log_file_name (for upload to the librarian) attributes on the
-        # WorkerMonitor from the data returned by getImportDataForJobID.
-        target_url = self.factory.getUniqueString()
-        log_file_name = self.factory.getUniqueString()
-        worker_monitor = self.makeWorkerMonitorWithJob(
-            1, (['a'], target_url, log_file_name))
-
-        def check_branch_log(ignored):
-            # Looking at the _ attributes here is in slightly poor taste, but
-            # much much easier than them by logging and parsing an oops, etc.
-            self.assertEqual(
-                (target_url, log_file_name),
-                (worker_monitor._target_url, worker_monitor._log_file_name))
-
-        return worker_monitor.getWorkerArguments().addCallback(
-            check_branch_log)
-
-    def test_getWorkerArguments_sets_target_url_and_logfilename_dict(self):
-        # getWorkerArguments sets the _target_url (for use in oops reports)
-        # and _log_file_name (for upload to the librarian) attributes on the
-        # WorkerMonitor from the data returned by getImportDataForJobID.
-        # (New protocol: data passed as a dict.)
-        target_url = self.factory.getUniqueString()
-        log_file_name = self.factory.getUniqueString()
-        data = {
-            'arguments': ['a'],
-            'target_url': target_url,
-            'log_file_name': log_file_name,
-            }
-        worker_monitor = self.makeWorkerMonitorWithJob(1, data)
-
-        def check_branch_log(ignored):
-            # Looking at the _ attributes here is in slightly poor taste, but
-            # much much easier than them by logging and parsing an oops, etc.
-            self.assertEqual(
-                (target_url, log_file_name),
-                (worker_monitor._target_url, worker_monitor._log_file_name))
-
-        return worker_monitor.getWorkerArguments().addCallback(
-            check_branch_log)
-
     def test_getWorkerArguments_job_not_found_raises_exit_quietly(self):
         # When getImportDataForJobID signals a fault indicating that
         # getWorkerArguments didn't find the supplied job, getWorkerArguments
@@ -345,61 +282,34 @@ class TestWorkerMonitorUnit(TestCase):
 
     def test_finishJob_calls_finishJobID_empty_log_file(self):
         # When the log file is empty, finishJob calls finishJobID with the
-        # name of the status enum and an empty string to indicate that no log
-        # file was uplaoded to the librarian.
+        # name of the status enum and an empty binary string.
         job_id = self.factory.getUniqueInteger()
         worker_monitor = self.makeWorkerMonitorWithJob(job_id)
         self.assertEqual(worker_monitor._log_file.tell(), 0)
 
         def check_finishJob_called(result):
             self.assertEqual(
-                [('finishJobID', job_id, 'SUCCESS', '')],
+                [('finishJobID', job_id, 'SUCCESS',
+                  xmlrpc_client.Binary(b''))],
                 worker_monitor.codeimport_endpoint.calls)
 
         return worker_monitor.finishJob(
             CodeImportResultStatus.SUCCESS).addCallback(
             check_finishJob_called)
 
-    def test_finishJob_uploads_nonempty_file_to_librarian(self):
-        # finishJob method uploads the log file to the librarian and calls the
-        # finishJobID XML-RPC method with the url of that file.
-        self.layer.force_dirty_database()
-        log_bytes = self.factory.getUniqueBytes()
-        worker_monitor = self.makeWorkerMonitorWithJob()
-        worker_monitor._log_file.write(log_bytes)
-
-        def check_file_uploaded(result):
-            transaction.abort()
-            url = worker_monitor.codeimport_endpoint.calls[0][3]
-            got_log_bytes = urlopen(url).read()
-            self.assertEqual(log_bytes, got_log_bytes)
-
-        return worker_monitor.finishJob(
-            CodeImportResultStatus.SUCCESS).addCallback(
-            check_file_uploaded)
-
-    @suppress_stderr
-    def test_finishJob_still_calls_finishJobID_if_upload_fails(self):
-        # If the upload to the librarian fails for any reason, the worker
-        # monitor still calls the finishJobID XML-RPC method, but logs an
-        # error to indicate there was a problem.
-        class Fail(Exception):
-            """Some arbitrary failure."""
-
-        # Write some text so that we try to upload the log.
+    def test_finishJob_sends_nonempty_file_to_scheduler(self):
+        # finishJob method calls finishJobID with the contents of the log
+        # file.
         job_id = self.factory.getUniqueInteger()
+        log_bytes = self.factory.getUniqueBytes()
         worker_monitor = self.makeWorkerMonitorWithJob(job_id)
-        worker_monitor._log_file.write(b'some text')
-
-        # Make _createLibrarianFileAlias fail in a distinctive way.
-        worker_monitor._createLibrarianFileAlias = FakeMethod(failure=Fail())
+        worker_monitor._log_file.write(log_bytes)
 
         def check_finishJob_called(result):
             self.assertEqual(
-                [('finishJobID', job_id, 'SUCCESS', '')],
+                [('finishJobID', job_id, 'SUCCESS',
+                  xmlrpc_client.Binary(log_bytes))],
                 worker_monitor.codeimport_endpoint.calls)
-            errors = flush_logged_errors(Fail)
-            self.assertEqual(1, len(errors))
 
         return worker_monitor.finishJob(
             CodeImportResultStatus.SUCCESS).addCallback(
@@ -578,7 +488,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
 
         def __init__(self, process_deferred, has_job=True):
             if has_job:
-                job_data = {1: ([], '', '')}
+                job_data = {1: {'arguments': []}}
             else:
                 job_data = {}
             CodeImportWorkerMonitor.__init__(
diff --git a/lib/lp/codehosting/codeimport/workermonitor.py b/lib/lp/codehosting/codeimport/workermonitor.py
index 1fc9ec2..6fa304b 100644
--- a/lib/lp/codehosting/codeimport/workermonitor.py
+++ b/lib/lp/codehosting/codeimport/workermonitor.py
@@ -13,6 +13,7 @@ import os
 import tempfile
 
 import six
+from six.moves import xmlrpc_client
 from twisted.internet import (
     defer,
     error,
@@ -21,16 +22,13 @@ from twisted.internet import (
     )
 from twisted.python import failure
 from twisted.web import xmlrpc
-from zope.component import getUtility
 
 from lp.code.enums import CodeImportResultStatus
 from lp.codehosting.codeimport.worker import CodeImportWorkerExitCode
 from lp.services.config import config
-from lp.services.librarian.interfaces.client import IFileUploadClient
 from lp.services.twistedsupport.processmonitor import (
     ProcessMonitorProtocolWithTimeout,
     )
-from lp.services.webapp import errorlog
 from lp.xmlrpc.faults import NoSuchCodeImportJob
 
 
@@ -137,30 +135,8 @@ class CodeImportWorkerMonitor:
         self.codeimport_endpoint = codeimport_endpoint
         self._call_finish_job = True
         self._log_file = tempfile.TemporaryFile()
-        self._target_url = None
-        self._log_file_name = 'no-name-set.txt'
         self._access_policy = access_policy
 
-    def _logOopsFromFailure(self, failure):
-        config = errorlog.globalErrorUtility._oops_config
-        context = {
-            'twisted_failure': failure,
-            'http_request': errorlog.ScriptRequest(
-                [('code_import_job_id', self._job_id)], self._target_url),
-            }
-        report = config.create(context)
-
-        def log_oops_if_published(ids):
-            if ids:
-                self._logger.info(
-                    "Logged OOPS id %s: %s: %s",
-                    report['id'], report.get('type', 'No exception type'),
-                    report.get('value', 'No exception value'))
-
-        d = config.publish(report)
-        d.addCallback(log_oops_if_published)
-        return d
-
     def _trap_nosuchcodeimportjob(self, failure):
         failure.trap(xmlrpc.Fault)
         if failure.value.faultCode == NoSuchCodeImportJob.error_code:
@@ -170,25 +146,12 @@ class CodeImportWorkerMonitor:
             raise failure.value
 
     def getWorkerArguments(self):
-        """Get arguments for the worker for the import we are working on.
-
-        This also sets the _target_url and _log_file_name attributes for use
-        in the _logOopsFromFailure and finishJob methods respectively.
-        """
+        """Get arguments for the worker for the import we are working on."""
         deferred = self.codeimport_endpoint.callRemote(
             'getImportDataForJobID', self._job_id)
 
         def _processResult(result):
-            if isinstance(result, dict):
-                code_import_arguments = result['arguments']
-                target_url = result['target_url']
-                log_file_name = result['log_file_name']
-            else:
-                # XXX cjwatson 2018-03-15: Remove once the scheduler always
-                # sends a dict.
-                code_import_arguments, target_url, log_file_name = result
-            self._target_url = target_url
-            self._log_file_name = log_file_name
+            code_import_arguments = result['arguments']
             self._logger.info(
                 'Found source details: %s', code_import_arguments)
             return code_import_arguments
@@ -206,40 +169,12 @@ class CodeImportWorkerMonitor:
             six.ensure_text(tail, errors='replace'))
         return deferred.addErrback(self._trap_nosuchcodeimportjob)
 
-    def _createLibrarianFileAlias(self, name, size, file, contentType):
-        """Call `IFileUploadClient.remoteAddFile` with the given parameters.
-
-        This is a separate method that exists only to be patched in tests.
-        """
-        # This blocks, but never mind: nothing else is going on in the process
-        # by this point.  We could dispatch to a thread if we felt like it, or
-        # even come up with an asynchronous implementation of the librarian
-        # protocol (it's not very complicated).
-        return getUtility(IFileUploadClient).remoteAddFile(
-            name, size, file, contentType)
-
     def finishJob(self, status):
-        """Call the finishJobID method for the job we are working on.
-
-        This method uploads the log file to the librarian first.
-        """
-        log_file_size = self._log_file.tell()
-        if log_file_size > 0:
-            self._log_file.seek(0)
-            try:
-                log_file_alias_url = self._createLibrarianFileAlias(
-                    self._log_file_name, log_file_size, self._log_file,
-                    'text/plain')
-                self._logger.info(
-                    "Uploaded logs to librarian %s.", log_file_alias_url)
-            except:
-                self._logger.error("Upload to librarian failed.")
-                self._logOopsFromFailure(failure.Failure())
-                log_file_alias_url = ''
-        else:
-            log_file_alias_url = ''
+        """Call the finishJobID method for the job we are working on."""
+        self._log_file.seek(0)
         return self.codeimport_endpoint.callRemote(
-            'finishJobID', self._job_id, status.name, log_file_alias_url
+            'finishJobID', self._job_id, status.name,
+            xmlrpc_client.Binary(self._log_file.read())
             ).addErrback(self._trap_nosuchcodeimportjob)
 
     def _makeProcessProtocol(self, deferred):