launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24015
[Merge] ~cjwatson/launchpad:codeimport-git-progress into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:codeimport-git-progress into launchpad:master.
Commit message:
Enable throttled progress output from git-to-git import workers
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/373732
This may help with bug 1642699, though I'm not sure yet.
Getting things to work with Python 2's disconnect between file objects and the io module is gratuitously inconvenient.
This is essentially the same as https://code.launchpad.net/~cjwatson/launchpad/codeimport-git-progress/+merge/323902, converted to git and rebased on master.
--
Your team, Launchpad code reviewers, is requested to review the proposed merge of ~cjwatson/launchpad:codeimport-git-progress into launchpad:master.
diff --git a/lib/lp/codehosting/codeimport/tests/test_worker.py b/lib/lp/codehosting/codeimport/tests/test_worker.py
index 8bb5b7f..1ae4ee7 100644
--- a/lib/lp/codehosting/codeimport/tests/test_worker.py
+++ b/lib/lp/codehosting/codeimport/tests/test_worker.py
@@ -57,6 +57,10 @@ import scandir
import subvertpy
import subvertpy.client
import subvertpy.ra
+from testtools.matchers import (
+ ContainsAll,
+ LessThan,
+ )
import lp.codehosting
from lp.codehosting.codeimport.tarball import (
@@ -80,6 +84,7 @@ from lp.codehosting.codeimport.worker import (
ForeignTreeStore,
get_default_bazaar_branch_store,
GitImportWorker,
+ GitToGitImportWorker,
ImportDataStore,
ToBzrImportWorker,
)
@@ -1356,6 +1361,53 @@ class TestBzrImport(WorkerTest, TestActualImportMixin,
branch.repository.get_revision(branch.last_revision()).committer)
+class TestGitToGitImportWorker(TestCase):
+
+ def test_throttleProgress(self):
+ source_details = self.factory.makeCodeImportSourceDetails(
+ rcstype="git", target_rcstype="git")
+ logger = BufferLogger()
+ worker = GitToGitImportWorker(
+ source_details, logger, AcceptAnythingPolicy())
+ read_fd, write_fd = os.pipe()
+ pid = os.fork()
+ if pid == 0: # child
+ os.close(read_fd)
+ with os.fdopen(write_fd, "wb") as write:
+ write.write(b"Starting\n")
+ for i in range(50):
+ time.sleep(0.1)
+ write.write(("%d ...\r" % i).encode("UTF-8"))
+ if (i % 10) == 9:
+ write.write(
+ ("Interval %d\n" % (i // 10)).encode("UTF-8"))
+ write.write(b"Finishing\n")
+ os._exit(0)
+ else: # parent
+ os.close(write_fd)
+ with os.fdopen(read_fd, "rb") as read:
+ lines = list(worker._throttleProgress(read, timeout=0.5))
+ os.waitpid(pid, 0)
+ # Matching the exact sequence of lines would be too brittle, but
+ # we require some things to be true:
+ # All the non-progress lines must be present, in the right
+ # order.
+ self.assertEqual(
+ [u"Starting\n", u"Interval 0\n", u"Interval 1\n",
+ u"Interval 2\n", u"Interval 3\n", u"Interval 4\n",
+ u"Finishing\n"],
+ [line for line in lines if not line.endswith(u"\r")])
+ # No more than 15 progress lines may be present (allowing some
+ # slack for the child process being slow).
+ progress_lines = [line for line in lines if line.endswith(u"\r")]
+ self.assertThat(len(progress_lines), LessThan(16))
+ # All the progress lines immediately before interval markers
+ # must be present.
+ self.assertThat(
+ progress_lines,
+ ContainsAll([u"%d ...\r" % i for i in (9, 19, 29, 39, 49)]))
+
+
class CodeImportBranchOpenPolicyTests(TestCase):
def setUp(self):
diff --git a/lib/lp/codehosting/codeimport/worker.py b/lib/lp/codehosting/codeimport/worker.py
index 8916e8d..371c34d 100644
--- a/lib/lp/codehosting/codeimport/worker.py
+++ b/lib/lp/codehosting/codeimport/worker.py
@@ -19,10 +19,10 @@ __all__ = [
'get_default_bazaar_branch_store',
]
-
import io
import os
import shutil
+import signal
import subprocess
from urlparse import (
urlsplit,
@@ -78,6 +78,7 @@ from lazr.uri import (
)
from pymacaroons import Macaroon
import SCM
+import six
from lp.code.interfaces.branch import get_blacklisted_hostnames
from lp.codehosting.codeimport.foreigntree import CVSWorkingTree
@@ -960,14 +961,94 @@ class BzrImportWorker(PullingImportWorker):
class GitToGitImportWorker(ImportWorker):
"""An import worker for imports from Git to Git."""
+ def _throttleProgress(self, file_obj, timeout=15.0):
+ """Throttle progress messages from a file object.
+
+ git can produce progress output on stderr, but it produces rather a
+ lot of it and we don't want it all to end up in logs. Throttle this
+ so that we only produce output every `timeout` seconds, or when we
+ see a line terminated with a newline rather than a carriage return.
+
+ :param file_obj: a file-like object opened in binary mode.
+ :param timeout: emit progress output only after this many seconds
+ have elapsed.
+ :return: an iterator of interesting text lines read from the file.
+ """
+ # newline="" requests universal newlines mode, but without
+ # translation.
+ if six.PY2 and isinstance(file_obj, file):
+ # A Python 2 file object can't be used directly to construct an
+ # io.TextIOWrapper.
+ class _ReadableFileWrapper:
+ def __init__(self, raw):
+ self._raw = raw
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ pass
+
+ def readable(self):
+ return True
+
+ def writable(self):
+ return False
+
+ def seekable(self):
+ return True
+
+ def __getattr__(self, name):
+ return getattr(self._raw, name)
+
+ with _ReadableFileWrapper(file_obj) as readable:
+ with io.BufferedReader(readable) as buffered:
+ for line in self._throttleProgress(
+ buffered, timeout=timeout):
+ yield line
+ return
+
+ class ReceivedAlarm(Exception):
+ pass
+
+ def alarm_handler(signum, frame):
+ raise ReceivedAlarm()
+
+ old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
+ try:
+ progress_buffer = None
+ with io.TextIOWrapper(
+ file_obj, encoding="UTF-8", errors="replace",
+ newline="") as wrapped_file:
+ while True:
+ try:
+ signal.setitimer(signal.ITIMER_REAL, timeout)
+ line = next(wrapped_file)
+ signal.setitimer(signal.ITIMER_REAL, 0)
+ if line.endswith(u"\r"):
+ progress_buffer = line
+ else:
+ if progress_buffer is not None:
+ yield progress_buffer
+ progress_buffer = None
+ yield line
+ except ReceivedAlarm:
+ if progress_buffer is not None:
+ yield progress_buffer
+ progress_buffer = None
+ except StopIteration:
+ break
+ finally:
+ signal.setitimer(signal.ITIMER_REAL, 0)
+ signal.signal(signal.SIGALRM, old_alarm)
+
def _runGit(self, *args, **kwargs):
"""Run git with arguments, sending output to the logger."""
cmd = ["git"] + list(args)
git_process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs)
- for line in git_process.stdout:
- line = line.decode("UTF-8", "replace").rstrip("\n")
- self._logger.info(sanitise_urls(line))
+ # stderr is merged into stdout above, so this stream also carries git's
+ # "\r"-terminated progress updates; throttle those rather than logging
+ # every one.  Lines arrive with their terminators, so strip them here.
+ for line in self._throttleProgress(git_process.stdout):
+ self._logger.info(sanitise_urls(line.rstrip("\r\n")))
retcode = git_process.wait()
if retcode:
raise subprocess.CalledProcessError(retcode, cmd)
@@ -1102,13 +1183,14 @@ class GitToGitImportWorker(ImportWorker):
# Push the target of HEAD first to ensure that it is always
# available.
self._runGit(
- "push", target_url, "+%s:%s" % (new_head, new_head),
- cwd="repository")
+ "push", "--progress", target_url,
+ "+%s:%s" % (new_head, new_head), cwd="repository")
try:
self._setHead(target_url, new_head)
except GitProtocolError as e:
self._logger.info("Unable to set default branch: %s" % e)
- self._runGit("push", "--mirror", target_url, cwd="repository")
+ self._runGit(
+ "push", "--progress", "--mirror", target_url, cwd="repository")
except subprocess.CalledProcessError as e:
self._logger.info(
"Unable to push to hosting service: git push exited %s" %