← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/codeimport-git-progress into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/codeimport-git-progress into lp:launchpad.

Commit message:
Enable throttled progress output from git-to-git import workers.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/codeimport-git-progress/+merge/323902

This may help with bug 1642699, though I'm not sure yet.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/codeimport-git-progress into lp:launchpad.
=== modified file 'lib/lp/codehosting/codeimport/tests/test_worker.py'
--- lib/lp/codehosting/codeimport/tests/test_worker.py	2017-01-12 18:01:56 +0000
+++ lib/lp/codehosting/codeimport/tests/test_worker.py	2017-05-11 12:10:44 +0000
@@ -50,7 +50,9 @@
 import subvertpy.client
 import subvertpy.ra
 from testtools.matchers import (
+    ContainsAll,
     Equals,
+    LessThan,
     Matcher,
     MatchesListwise,
     Mismatch,
@@ -87,6 +89,7 @@
     ForeignTreeStore,
     get_default_bazaar_branch_store,
     GitImportWorker,
+    GitToGitImportWorker,
     ImportDataStore,
     ToBzrImportWorker,
     )
@@ -1304,6 +1307,64 @@
             branch.repository.get_revision(branch.last_revision()).committer)


+class TestGitToGitImportWorker(TestCase):
+
+    def test_throttleProgress(self):
+        source_details = self.factory.makeCodeImportSourceDetails(
+            rcstype="git", target_rcstype="git")
+        logger = BufferLogger()
+        worker = GitToGitImportWorker(
+            source_details, logger, AcceptAnythingPolicy())
+        read_fd, write_fd = os.pipe()
+        pid = os.fork()
+        if pid == 0:  # child
+            # Always leave via os._exit, even on error: otherwise the
+            # forked child would go on to run the rest of the test suite.
+            status = 1
+            try:
+                os.close(read_fd)
+                # Unbuffered, so that each write reaches the pipe at once
+                # and the parent sees output arrive over time rather than
+                # in a single burst when the file is closed.
+                with os.fdopen(write_fd, "wb", 0) as write:
+                    write.write(b"Starting\n")
+                    for i in range(50):
+                        time.sleep(0.1)
+                        write.write(("%d ...\r" % i).encode("UTF-8"))
+                        if (i % 10) == 9:
+                            write.write(
+                                ("Interval %d\n" % (i // 10)).encode("UTF-8"))
+                    write.write(b"Finishing\n")
+                status = 0
+            finally:
+                os._exit(status)
+        else:  # parent
+            os.close(write_fd)
+            with os.fdopen(read_fd, "rb") as read:
+                lines = list(worker._throttleProgress(read, timeout=0.5))
+            _, status = os.waitpid(pid, 0)
+            # The child must have written all of its output successfully.
+            self.assertEqual(0, status)
+            # Matching the exact sequence of lines would be too brittle, but
+            # we require some things to be true:
+            # All the non-progress lines must be present, in the right
+            # order.
+            self.assertEqual(
+                [u"Starting\n", u"Interval 0\n", u"Interval 1\n",
+                 u"Interval 2\n", u"Interval 3\n", u"Interval 4\n",
+                 u"Finishing\n"],
+                [line for line in lines if not line.endswith(u"\r")])
+            # No more than 15 progress lines may be present (allowing some
+            # slack for the child process being slow).
+            progress_lines = [line for line in lines if line.endswith(u"\r")]
+            self.assertThat(len(progress_lines), LessThan(16))
+            # All the progress lines immediately before interval markers
+            # must be present.
+            self.assertThat(
+                progress_lines,
+                ContainsAll([u"%d ...\r" % i for i in (9, 19, 29, 39, 49)]))
+
+
 class CodeImportBranchOpenPolicyTests(TestCase):

     def setUp(self):

=== modified file 'lib/lp/codehosting/codeimport/worker.py'
--- lib/lp/codehosting/codeimport/worker.py	2016-11-14 18:29:17 +0000
+++ lib/lp/codehosting/codeimport/worker.py	2017-05-11 12:10:44 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """The code import worker. This imports code from foreign repositories."""
@@ -19,10 +19,10 @@
     'get_default_bazaar_branch_store',
     ]
 
-
 import io
 import os
 import shutil
+import signal
 import subprocess
 from urlparse import (
     urlsplit,
@@ -73,6 +73,7 @@
     )
 from pymacaroons import Macaroon
 import SCM
+import six
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
@@ -987,14 +988,124 @@
 class GitToGitImportWorker(ImportWorker):
     """An import worker for imports from Git to Git."""

+    def _throttleProgress(self, file_obj, timeout=15.0):
+        """Throttle progress messages from a file object.
+
+        git can produce progress output on stderr, but it produces rather a
+        lot of it and we don't want it all to end up in logs.  Throttle this
+        so that a progress line (one terminated with a carriage return
+        rather than a newline) is only emitted if `timeout` seconds have
+        passed since progress was last emitted, even while git keeps
+        producing them.  The latest progress line is also emitted just
+        before any newline-terminated line, when no output arrives for
+        `timeout` seconds, and at the end of the file.
+
+        :param file_obj: a file-like object opened in binary mode.
+        :param timeout: emit progress output only after this many seconds
+            have elapsed.
+        :return: an iterator of interesting text lines read from the file.
+        """
+        # newline="" requests universal newlines mode, but without
+        # translation.
+        if six.PY2 and isinstance(file_obj, file):
+            # A Python 2 file object can't be used directly to construct an
+            # io.TextIOWrapper.
+            class _ReadableFileWrapper:
+                def __init__(self, raw):
+                    self._raw = raw
+
+                def __enter__(self):
+                    return self
+
+                def __exit__(self, exc_type, exc_value, exc_tb):
+                    pass
+
+                def readable(self):
+                    return True
+
+                def writable(self):
+                    return False
+
+                def seekable(self):
+                    return True
+
+                def __getattr__(self, name):
+                    return getattr(self._raw, name)
+
+            with _ReadableFileWrapper(file_obj) as readable:
+                with io.BufferedReader(readable) as buffered:
+                    for line in self._throttleProgress(
+                            buffered, timeout=timeout):
+                        yield line
+                    return
+
+        import time
+
+        class ReceivedAlarm(Exception):
+            pass
+
+        def alarm_handler(signum, frame):
+            raise ReceivedAlarm()
+
+        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
+        try:
+            progress_buffer = None
+            # The earliest time at which the next progress line may be
+            # emitted.  The alarm below is re-armed for every read, so it
+            # only fires when git goes quiet; git normally reports progress
+            # much more often than `timeout`, so without this check nothing
+            # would be emitted until the next newline-terminated line.
+            next_progress = time.time() + timeout
+            with io.TextIOWrapper(
+                    file_obj, encoding="UTF-8", errors="replace",
+                    newline="") as wrapped_file:
+                while True:
+                    try:
+                        # Only arm the timer while blocked in next(), so
+                        # that ReceivedAlarm is never raised while our
+                        # caller is handling a line we have yielded.
+                        signal.setitimer(signal.ITIMER_REAL, timeout)
+                        line = next(wrapped_file)
+                        signal.setitimer(signal.ITIMER_REAL, 0)
+                        if line.endswith(u"\r"):
+                            progress_buffer = line
+                            now = time.time()
+                            if now >= next_progress:
+                                next_progress = now + timeout
+                                yield progress_buffer
+                                progress_buffer = None
+                        else:
+                            if progress_buffer is not None:
+                                next_progress = time.time() + timeout
+                                yield progress_buffer
+                                progress_buffer = None
+                            yield line
+                    except ReceivedAlarm:
+                        # Nothing arrived for `timeout` seconds.  The timer
+                        # is one-shot, so it is no longer armed here.
+                        if progress_buffer is not None:
+                            next_progress = time.time() + timeout
+                            yield progress_buffer
+                            progress_buffer = None
+                    except StopIteration:
+                        # next() raised before the timer was disarmed.
+                        signal.setitimer(signal.ITIMER_REAL, 0)
+                        # Keep the last progress line even if the process
+                        # stopped in the middle of reporting progress.
+                        if progress_buffer is not None:
+                            yield progress_buffer
+                        break
+        finally:
+            signal.setitimer(signal.ITIMER_REAL, 0)
+            signal.signal(signal.SIGALRM, old_alarm)
+
     def _runGit(self, *args, **kwargs):
         """Run git with arguments, sending output to the logger."""
         cmd = ["git"] + list(args)
         git_process = subprocess.Popen(
             cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs)
-        for line in git_process.stdout:
-            line = line.decode("UTF-8", "replace").rstrip("\n")
-            self._logger.info(sanitise_urls(line))
+        for line in self._throttleProgress(git_process.stdout):
+            self._logger.info(sanitise_urls(line.rstrip("\r\n")))
         retcode = git_process.wait()
         if retcode:
             raise subprocess.CalledProcessError(retcode, cmd)
@@ -1127,13 +1238,14 @@
                 # Push the target of HEAD first to ensure that it is always
                 # available.
                 self._runGit(
-                    "push", target_url, "+%s:%s" % (new_head, new_head),
-                    cwd="repository")
+                    "push", "--progress", target_url,
+                    "+%s:%s" % (new_head, new_head), cwd="repository")
                 try:
                     self._setHead(target_url, new_head)
                 except GitProtocolError as e:
                     self._logger.info("Unable to set default branch: %s" % e)
-            self._runGit("push", "--mirror", target_url, cwd="repository")
+            self._runGit(
+                "push", "--progress", "--mirror", target_url, cwd="repository")
         except subprocess.CalledProcessError as e:
             self._logger.info(
                 "Unable to push to hosting service: git push exited %s" %


Follow ups