launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #25934
[Merge] ~cjwatson/launchpad:py3-puller into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:py3-puller into launchpad:master.
Commit message:
Fix lp.codehosting.puller for Python 3
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/395814
The netstring communication with the worker is now properly treated as bytes.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:py3-puller into launchpad:master.
diff --git a/lib/lp/codehosting/puller/scheduler.py b/lib/lp/codehosting/puller/scheduler.py
index 0102839..e2fbf88 100644
--- a/lib/lp/codehosting/puller/scheduler.py
+++ b/lib/lp/codehosting/puller/scheduler.py
@@ -12,9 +12,9 @@ __all__ = [
'PullerMonitorProtocol',
]
+import io
import os
import socket
-from StringIO import StringIO
from contrib.glock import (
GlobalLock,
@@ -123,6 +123,13 @@ class PullerWireProtocol(NetstringReceiver):
def stringReceived(self, line):
"""See `NetstringReceiver.stringReceived`."""
+ try:
+ line = line.decode('UTF-8')
+ except UnicodeDecodeError:
+ self.puller_protocol.unexpectedError(
+ failure.Failure(BadMessage(line)))
+ # Stop here: the code below expects a decoded text line.
+ return
if (self._current_command is not None
and self._expected_args is not None):
# state [2]
@@ -180,7 +187,7 @@ class PullerMonitorProtocol(ProcessMonitorProtocolWithTimeout,
self.reported_mirror_finished = False
self.listener = listener
self.wire_protocol = PullerWireProtocol(self)
- self._stderr = StringIO()
+ self._stderr = io.BytesIO()
self._deferred.addCallbacks(
self.checkReportingFinishedAndNoStderr,
self.ensureReportingFinished)
@@ -194,7 +201,7 @@ class PullerMonitorProtocol(ProcessMonitorProtocolWithTimeout,
When the process exits cleanly, we expect it to have not printed
anything to stderr and to have reported success or failure. If it has
failed to do either of these things, we should fail noisily."""
- stderr = self._stderr.getvalue()
+ stderr = self._stderr.getvalue().decode('UTF-8', 'replace')
if stderr:
fail = failure.Failure(UnexpectedStderr(stderr))
fail.error = stderr
@@ -213,7 +220,7 @@ class PullerMonitorProtocol(ProcessMonitorProtocolWithTimeout,
as a failure reason.
"""
if not self.reported_mirror_finished:
- stderr = self._stderr.getvalue()
+ stderr = self._stderr.getvalue().decode('UTF-8', 'replace')
reason.error = stderr
if stderr:
errorline = stderr.splitlines()[-1]
diff --git a/lib/lp/codehosting/puller/tests/__init__.py b/lib/lp/codehosting/puller/tests/__init__.py
index 184af86..d0e8566 100644
--- a/lib/lp/codehosting/puller/tests/__init__.py
+++ b/lib/lp/codehosting/puller/tests/__init__.py
@@ -7,10 +7,10 @@ from __future__ import absolute_import, print_function
__metaclass__ = type
+import io
import os
import shutil
import socket
-from StringIO import StringIO
from breezy.tests import TestCaseWithTransport
from breezy.tests.http_server import (
@@ -48,7 +48,7 @@ class PullerWorkerMixin:
policy=None):
"""Anonymous creation method for PullerWorker."""
if protocol is None:
- protocol = PullerWorkerProtocol(StringIO())
+ protocol = PullerWorkerProtocol(io.BytesIO())
if branch_type is None:
if policy is None:
policy = AcceptAnythingBranchMirrorerPolicy()
diff --git a/lib/lp/codehosting/puller/tests/test_acceptance.py b/lib/lp/codehosting/puller/tests/test_acceptance.py
index 3db947a..e40dc37 100644
--- a/lib/lp/codehosting/puller/tests/test_acceptance.py
+++ b/lib/lp/codehosting/puller/tests/test_acceptance.py
@@ -25,6 +25,7 @@ from breezy.urlutils import (
)
from breezy.workingtree import WorkingTree
from fixtures import TempDir
+import six
import transaction
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
@@ -82,7 +83,8 @@ class TestBranchPuller(PullerBranchTestCase, LoomTestMixin):
self.assertEqual(0, db_branch.mirror_failures)
mirrored_branch = self.openBranchAsUser(db_branch, accessing_user)
self.assertEqual(
- source_branch.last_revision(), db_branch.last_mirrored_id)
+ six.ensure_text(source_branch.last_revision()),
+ db_branch.last_mirrored_id)
self.assertEqual(
source_branch.last_revision(), mirrored_branch.last_revision())
self.assertEqual(
@@ -117,7 +119,8 @@ class TestBranchPuller(PullerBranchTestCase, LoomTestMixin):
:param command: A command and arguments given as a list.
:return: retcode, stdout, stderr
"""
- process = Popen(command, stdout=PIPE, stderr=PIPE)
+ process = Popen(
+ command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
output, error = process.communicate()
return process.returncode, output, error
diff --git a/lib/lp/codehosting/puller/tests/test_scheduler.py b/lib/lp/codehosting/puller/tests/test_scheduler.py
index f87c78f..10b0f55 100644
--- a/lib/lp/codehosting/puller/tests/test_scheduler.py
+++ b/lib/lp/codehosting/puller/tests/test_scheduler.py
@@ -151,12 +151,14 @@ class TestPullerWireProtocol(TestCase):
def convertToNetstring(self, string):
"""Encode `string` as a netstring."""
- return '%d:%s,' % (len(string), string)
+ return b'%d:%s,' % (len(string), string)
def sendToProtocol(self, *arguments):
"""Send each element of `arguments` to the protocol as a netstring."""
for argument in arguments:
- self.protocol.dataReceived(self.convertToNetstring(str(argument)))
+ if not isinstance(argument, bytes):
+ argument = six.text_type(argument).encode('UTF-8')
+ self.protocol.dataReceived(self.convertToNetstring(argument))
def assertUnexpectedErrorCalled(self, exception_type):
"""Assert that the puller protocol's unexpectedError has been called.
@@ -213,10 +215,16 @@ class TestPullerWireProtocol(TestCase):
self.sendToProtocol('foo')
self.assertUnexpectedErrorCalled(scheduler.BadMessage)
+ def test_nonUTF8Message(self):
+ # The protocol notifies the listener if it receives a line not
+ # encoded in UTF-8.
+ self.sendToProtocol(b'\x80')
+ self.assertUnexpectedErrorCalled(scheduler.BadMessage)
+
def test_invalidNetstring(self):
# The protocol terminates the session if it receives an unparsable
# netstring.
- self.protocol.dataReceived('foo')
+ self.protocol.dataReceived(b'foo')
self.assertUnexpectedErrorCalled(NetstringParseError)
@@ -346,7 +354,7 @@ class TestPullerMonitorProtocol(ProcessTestsMixin, TestCase):
self.termination_deferred.addErrback(check_failure)
- self.protocol.errReceived('error message')
+ self.protocol.errReceived(b'error message')
self.simulateProcessExit(clean=False)
return assert_fails_with(
@@ -367,7 +375,7 @@ class TestPullerMonitorProtocol(ProcessTestsMixin, TestCase):
self.termination_deferred.addErrback(check_failure)
- self.protocol.errReceived('error message')
+ self.protocol.errReceived(b'error message')
self.simulateProcessExit()
return self.termination_deferred
@@ -385,7 +393,7 @@ class TestPullerMonitorProtocol(ProcessTestsMixin, TestCase):
# If the subprocess exits before reporting success or failure, the
# puller master should record failure.
self.protocol.do_startMirroring()
- self.protocol.errReceived('traceback')
+ self.protocol.errReceived(b'traceback')
self.simulateProcessExit(clean=False)
self.assertEqual(
self.listener.calls,
@@ -410,7 +418,7 @@ class TestPullerMonitorProtocol(ProcessTestsMixin, TestCase):
self.protocol.listener = FailingMirrorFailedStubPullerListener()
self.listener = self.protocol.listener
- self.protocol.errReceived('traceback')
+ self.protocol.errReceived(b'traceback')
self.simulateProcessExit(clean=False)
self.assertEqual(
flush_logged_errors(RuntimeError), [runtime_error_failure])
@@ -441,8 +449,8 @@ class TestPullerMaster(TestCase):
self.assertEqual('error message', oops['value'])
self.assertEqual('RuntimeError', oops['type'])
self.assertEqual(
- get_canonical_url_for_branch_name(
- self.eventHandler.unique_name), oops['url'])
+ six.ensure_binary(get_canonical_url_for_branch_name(
+ self.eventHandler.unique_name)), oops['url'])
def test_startMirroring(self):
# startMirroring does not send a message to the endpoint.
@@ -548,7 +556,8 @@ parser = OptionParser()
branch_type_name, default_stacked_on_url) = arguments
from breezy import branch
branch = branch.Branch.open(destination_url)
-protocol = PullerWorkerProtocol(sys.stdout)
+stdout = getattr(sys.stdout, 'buffer', sys.stdout)
+protocol = PullerWorkerProtocol(stdout)
"""
@@ -622,7 +631,7 @@ class TestPullerMasterIntegration(PullerBranchTestCase):
default_format.repository_format.get_format_string())
self.assertEqual(
[('branchChanged', LAUNCHPAD_SERVICES, self.db_branch.id, '',
- revision_id, control_string, branch_string,
+ six.ensure_str(revision_id), control_string, branch_string,
repository_string)],
self.client.calls)
return ignored
@@ -707,7 +716,7 @@ class TestPullerMasterIntegration(PullerBranchTestCase):
protocol.mirrorFailed('a', 'b')
protocol.sendEvent(
'lock_id', branch.control_files._lock.peek().get('user'))
- sys.stdout.flush()
+ stdout.flush()
branch.unlock()
"""
@@ -791,7 +800,7 @@ class TestPullerMasterIntegration(PullerBranchTestCase):
lock_and_wait_script = """
branch.lock_write()
protocol.sendEvent('branchLocked')
- sys.stdout.flush()
+ stdout.flush()
time.sleep(3600)
"""
diff --git a/lib/lp/codehosting/puller/tests/test_worker.py b/lib/lp/codehosting/puller/tests/test_worker.py
index c9dad6e..e011e54 100644
--- a/lib/lp/codehosting/puller/tests/test_worker.py
+++ b/lib/lp/codehosting/puller/tests/test_worker.py
@@ -8,7 +8,7 @@ from __future__ import absolute_import, print_function
__metaclass__ = type
import gc
-from StringIO import StringIO
+import io
import breezy.branch
from breezy.bzr.branch import BranchReferenceFormat
@@ -30,6 +30,7 @@ from breezy.url_policy_open import (
BranchOpener,
BranchOpenPolicy,
)
+import six
from lp.code.enums import BranchType
from lp.codehosting.puller.tests import (
@@ -57,16 +58,16 @@ from lp.testing.factory import (
def get_netstrings(line):
"""Parse `line` as a sequence of netstrings.
- :return: A list of strings.
+ :return: A list of byte strings.
"""
strings = []
while len(line) > 0:
- colon_index = line.find(':')
+ colon_index = line.find(b':')
length = int(line[:colon_index])
strings.append(line[(colon_index + 1):(colon_index + 1 + length)])
- assert ',' == line[colon_index + 1 + length], (
- 'Expected %r == %r' % (',', line[colon_index + 1 + length]))
- line = line[colon_index + length + 2:]
+ line = line[colon_index + 1 + length:]
+ assert b',' == line[:1], 'Expected %r == %r' % (b',', line[:1])
+ line = line[1:]
return strings
@@ -229,15 +230,15 @@ class TestPullerWorker(TestCaseWithTransport, PullerWorkerMixin):
def getStackedOnUrlFromNetStringOutput(self, netstring_output):
netstrings = get_netstrings(netstring_output)
- branchChanged_index = netstrings.index('branchChanged')
- return netstrings[branchChanged_index + 2]
+ branchChanged_index = netstrings.index(b'branchChanged')
+ return six.ensure_text(netstrings[branchChanged_index + 2])
def testSendsStackedInfo(self):
# When the puller worker stacks a branch, it reports the stacked on
# URL to the master.
base_branch = self.make_branch('base_branch', format='1.9')
stacked_branch = self.make_branch('stacked-branch', format='1.9')
- protocol_output = StringIO()
+ protocol_output = io.BytesIO()
to_mirror = self.makePullerWorker(
stacked_branch.base, self.get_url('destdir'),
protocol=PullerWorkerProtocol(protocol_output),
@@ -251,7 +252,7 @@ class TestPullerWorker(TestCaseWithTransport, PullerWorkerMixin):
# Mirroring an unstackable branch sends '' as the stacked-on location
# to the master.
source_branch = self.make_branch('source-branch', format='pack-0.92')
- protocol_output = StringIO()
+ protocol_output = io.BytesIO()
to_mirror = self.makePullerWorker(
source_branch.base, self.get_url('destdir'),
protocol=PullerWorkerProtocol(protocol_output))
@@ -264,7 +265,7 @@ class TestPullerWorker(TestCaseWithTransport, PullerWorkerMixin):
# Mirroring a non-stacked branch sends '' as the stacked-on location
# to the master.
source_branch = self.make_branch('source-branch', format='1.9')
- protocol_output = StringIO()
+ protocol_output = io.BytesIO()
to_mirror = self.makePullerWorker(
source_branch.base, self.get_url('destdir'),
protocol=PullerWorkerProtocol(protocol_output))
@@ -296,7 +297,7 @@ class TestReferenceOpener(TestCaseWithTransport):
branch_reference_format = BranchReferenceFormat()
branch_transport = a_bzrdir.get_branch_transport(
branch_reference_format)
- branch_transport.put_bytes('location', url)
+ branch_transport.put_bytes('location', six.ensure_binary(url))
branch_transport.put_bytes(
'format', branch_reference_format.get_format_string())
return a_bzrdir.root_transport.base
@@ -431,7 +432,7 @@ class TestWorkerProtocol(TestCaseInTempDir, PullerWorkerMixin):
def setUp(self):
TestCaseInTempDir.setUp(self)
- self.output = StringIO()
+ self.output = io.BytesIO()
self.protocol = PullerWorkerProtocol(self.output)
self.factory = ObjectFactory()
@@ -443,7 +444,8 @@ class TestWorkerProtocol(TestCaseInTempDir, PullerWorkerMixin):
def resetBuffers(self):
# Empty the test output and error buffers.
self.output.truncate(0)
- self.assertEqual('', self.output.getvalue())
+ self.output.seek(0)
+ self.assertEqual(b'', self.output.getvalue())
def test_nothingSentOnConstruction(self):
# The protocol sends nothing until it receives an event.
@@ -453,15 +455,15 @@ class TestWorkerProtocol(TestCaseInTempDir, PullerWorkerMixin):
def test_startMirror(self):
# Calling startMirroring sends 'startMirroring' as a netstring.
self.protocol.startMirroring()
- self.assertSentNetstrings(['startMirroring', '0'])
+ self.assertSentNetstrings([b'startMirroring', b'0'])
def test_branchChanged(self):
# Calling 'branchChanged' sends the arguments.
- arbitrary_args = [self.factory.getUniqueString() for x in range(6)]
+ arbitrary_args = [self.factory.getUniqueBytes() for x in range(6)]
self.protocol.startMirroring()
self.resetBuffers()
self.protocol.branchChanged(*arbitrary_args)
- self.assertSentNetstrings(['branchChanged', '6'] + arbitrary_args)
+ self.assertSentNetstrings([b'branchChanged', b'6'] + arbitrary_args)
def test_mirrorFailed(self):
# Calling 'mirrorFailed' sends the error message.
@@ -469,19 +471,19 @@ class TestWorkerProtocol(TestCaseInTempDir, PullerWorkerMixin):
self.resetBuffers()
self.protocol.mirrorFailed('Error Message', 'OOPS')
self.assertSentNetstrings(
- ['mirrorFailed', '2', 'Error Message', 'OOPS'])
+ [b'mirrorFailed', b'2', b'Error Message', b'OOPS'])
def test_progressMade(self):
# Calling 'progressMade' sends an arbitrary string indicating
# progress.
self.protocol.progressMade('test')
- self.assertSentNetstrings(['progressMade', '0'])
+ self.assertSentNetstrings([b'progressMade', b'0'])
def test_log(self):
# Calling 'log' sends 'log' as a netstring and its arguments, after
# formatting as a string.
self.protocol.log('logged %s', 'message')
- self.assertSentNetstrings(['log', '1', 'logged message'])
+ self.assertSentNetstrings([b'log', b'1', b'logged message'])
class TestWorkerProgressReporting(TestCaseWithTransport):
diff --git a/lib/lp/codehosting/puller/worker.py b/lib/lp/codehosting/puller/worker.py
index c0b426f..ce7a904 100644
--- a/lib/lp/codehosting/puller/worker.py
+++ b/lib/lp/codehosting/puller/worker.py
@@ -110,13 +110,17 @@ class PullerWorkerProtocol:
self.out_stream = output
def sendNetstring(self, string):
- self.out_stream.write('%d:%s,' % (len(string), string))
+ # Encode first so the length prefix counts bytes, not characters.
+ data = six.ensure_binary(string)
+ self.out_stream.write(b'%d:%s,' % (len(data), data))
def sendEvent(self, command, *args):
self.sendNetstring(command)
self.sendNetstring(str(len(args)))
for argument in args:
- self.sendNetstring(str(argument))
+ if not isinstance(argument, bytes):
+ argument = six.text_type(argument).encode('UTF-8')
+ self.sendNetstring(argument)
def startMirroring(self):
self.sendEvent('startMirroring')
@@ -163,7 +167,7 @@ class BranchMirrorerPolicy(BranchOpenPolicy):
# Looms suck.
revision_id = None
else:
- revision_id = 'null:'
+ revision_id = b'null:'
source_branch.controldir.clone_on_transport(
dest_transport, revision_id=revision_id)
return Branch.open(destination_url)
diff --git a/scripts/mirror-branch.py b/scripts/mirror-branch.py
index 8aa9ff7..69f8b33 100755
--- a/scripts/mirror-branch.py
+++ b/scripts/mirror-branch.py
@@ -72,7 +72,8 @@ if __name__ == '__main__':
resource.setrlimit(resource.RLIMIT_AS, (1500000000, 1500000000))
- protocol = PullerWorkerProtocol(sys.stdout)
+ # The worker outputs netstrings, which are bytes.
+ protocol = PullerWorkerProtocol(getattr(sys.stdout, 'buffer', sys.stdout))
install_worker_ui_factory(protocol)
PullerWorker(
source_url, destination_url, int(branch_id), unique_name, branch_type,