launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #21555
[Merge] lp:~cjwatson/launchpad/codeimport-git-progress into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/codeimport-git-progress into lp:launchpad.
Commit message:
Enable throttled progress output from git-to-git import workers.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/codeimport-git-progress/+merge/323902
This may help with bug 1642699, though I'm not sure yet.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/codeimport-git-progress into lp:launchpad.
=== modified file 'lib/lp/codehosting/codeimport/tests/test_worker.py'
--- lib/lp/codehosting/codeimport/tests/test_worker.py 2017-01-12 18:01:56 +0000
+++ lib/lp/codehosting/codeimport/tests/test_worker.py 2017-05-11 12:10:44 +0000
@@ -50,7 +50,9 @@
import subvertpy.client
import subvertpy.ra
from testtools.matchers import (
+ ContainsAll,
Equals,
+ LessThan,
Matcher,
MatchesListwise,
Mismatch,
@@ -87,6 +89,7 @@
ForeignTreeStore,
get_default_bazaar_branch_store,
GitImportWorker,
+ GitToGitImportWorker,
ImportDataStore,
ToBzrImportWorker,
)
@@ -1304,6 +1307,53 @@
branch.repository.get_revision(branch.last_revision()).committer)
+class TestGitToGitImportWorker(TestCase):
+
+ def test_throttleProgress(self):
+ source_details = self.factory.makeCodeImportSourceDetails(
+ rcstype="git", target_rcstype="git")
+ logger = BufferLogger()
+ worker = GitToGitImportWorker(
+ source_details, logger, AcceptAnythingPolicy())
+ read_fd, write_fd = os.pipe()
+ pid = os.fork()
+ if pid == 0: # child
+ os.close(read_fd)
+ with os.fdopen(write_fd, "wb") as write:
+ write.write(b"Starting\n")
+ for i in range(50):
+ time.sleep(0.1)
+ write.write(("%d ...\r" % i).encode("UTF-8"))
+ if (i % 10) == 9:
+ write.write(
+ ("Interval %d\n" % (i // 10)).encode("UTF-8"))
+ write.write(b"Finishing\n")
+ os._exit(0)
+ else: # parent
+ os.close(write_fd)
+ with os.fdopen(read_fd, "rb") as read:
+ lines = list(worker._throttleProgress(read, timeout=0.5))
+ os.waitpid(pid, 0)
+ # Matching the exact sequence of lines would be too brittle, but
+ # we require some things to be true:
+ # All the non-progress lines must be present, in the right
+ # order.
+ self.assertEqual(
+ [u"Starting\n", u"Interval 0\n", u"Interval 1\n",
+ u"Interval 2\n", u"Interval 3\n", u"Interval 4\n",
+ u"Finishing\n"],
+ [line for line in lines if not line.endswith(u"\r")])
+ # No more than 15 progress lines may be present (allowing some
+ # slack for the child process being slow).
+ progress_lines = [line for line in lines if line.endswith(u"\r")]
+ self.assertThat(len(progress_lines), LessThan(16))
+ # All the progress lines immediately before interval markers
+ # must be present.
+ self.assertThat(
+ progress_lines,
+ ContainsAll([u"%d ...\r" % i for i in (9, 19, 29, 39, 49)]))
+
+
class CodeImportBranchOpenPolicyTests(TestCase):
def setUp(self):
=== modified file 'lib/lp/codehosting/codeimport/worker.py'
--- lib/lp/codehosting/codeimport/worker.py 2016-11-14 18:29:17 +0000
+++ lib/lp/codehosting/codeimport/worker.py 2017-05-11 12:10:44 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""The code import worker. This imports code from foreign repositories."""
@@ -19,10 +19,10 @@
'get_default_bazaar_branch_store',
]
-
import io
import os
import shutil
+import signal
import subprocess
from urlparse import (
urlsplit,
@@ -73,6 +73,7 @@
)
from pymacaroons import Macaroon
import SCM
+import six
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
@@ -987,14 +988,94 @@
class GitToGitImportWorker(ImportWorker):
"""An import worker for imports from Git to Git."""
+ def _throttleProgress(self, file_obj, timeout=15.0):
+ """Throttle progress messages from a file object.
+
+ git can produce progress output on stderr, but it produces rather a
+ lot of it and we don't want it all to end up in logs. Throttle this
+ so that we only produce output every `timeout` seconds, or when we
+ see a line terminated with a newline rather than a carriage return.
+
+ :param file_obj: a file-like object opened in binary mode.
+ :param timeout: emit progress output only after this many seconds
+ have elapsed.
+ :return: an iterator of interesting text lines read from the file.
+ """
+ # newline="" requests universal newlines mode, but without
+ # translation.
+ if six.PY2 and isinstance(file_obj, file):
+ # A Python 2 file object can't be used directly to construct an
+ # io.TextIOWrapper.
+ class _ReadableFileWrapper:
+ def __init__(self, raw):
+ self._raw = raw
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ pass
+
+ def readable(self):
+ return True
+
+ def writable(self):
+ return False
+
+ def seekable(self):
+ return True
+
+ def __getattr__(self, name):
+ return getattr(self._raw, name)
+
+ with _ReadableFileWrapper(file_obj) as readable:
+ with io.BufferedReader(readable) as buffered:
+ for line in self._throttleProgress(
+ buffered, timeout=timeout):
+ yield line
+ return
+
+ class ReceivedAlarm(Exception):
+ pass
+
+ def alarm_handler(signum, frame):
+ raise ReceivedAlarm()
+
+ old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
+ try:
+ progress_buffer = None
+ with io.TextIOWrapper(
+ file_obj, encoding="UTF-8", errors="replace",
+ newline="") as wrapped_file:
+ while True:
+ try:
+ signal.setitimer(signal.ITIMER_REAL, timeout)
+ line = next(wrapped_file)
+ signal.setitimer(signal.ITIMER_REAL, 0)
+ if line.endswith(u"\r"):
+ progress_buffer = line
+ else:
+ if progress_buffer is not None:
+ yield progress_buffer
+ progress_buffer = None
+ yield line
+ except ReceivedAlarm:
+ if progress_buffer is not None:
+ yield progress_buffer
+ progress_buffer = None
+ except StopIteration:
+ break
+ finally:
+ signal.setitimer(signal.ITIMER_REAL, 0)
+ signal.signal(signal.SIGALRM, old_alarm)
+
def _runGit(self, *args, **kwargs):
"""Run git with arguments, sending output to the logger."""
cmd = ["git"] + list(args)
git_process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs)
- for line in git_process.stdout:
- line = line.decode("UTF-8", "replace").rstrip("\n")
- self._logger.info(sanitise_urls(line))
+ for line in self._throttleProgress(git_process.stdout):
+ self._logger.info(sanitise_urls(line.rstrip("\r\n")))
retcode = git_process.wait()
if retcode:
raise subprocess.CalledProcessError(retcode, cmd)
@@ -1127,13 +1208,14 @@
# Push the target of HEAD first to ensure that it is always
# available.
self._runGit(
- "push", target_url, "+%s:%s" % (new_head, new_head),
- cwd="repository")
+ "push", "--progress", target_url,
+ "+%s:%s" % (new_head, new_head), cwd="repository")
try:
self._setHead(target_url, new_head)
except GitProtocolError as e:
self._logger.info("Unable to set default branch: %s" % e)
- self._runGit("push", "--mirror", target_url, cwd="repository")
+ self._runGit(
+ "push", "--progress", "--mirror", target_url, cwd="repository")
except subprocess.CalledProcessError as e:
self._logger.info(
"Unable to push to hosting service: git push exited %s" %
Follow-ups