← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:py3-puller into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:py3-puller into launchpad:master.

Commit message:
Fix lp.codehosting.puller for Python 3

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/395814

The netstring communication with the worker is now properly treated as bytes.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:py3-puller into launchpad:master.
diff --git a/lib/lp/codehosting/puller/scheduler.py b/lib/lp/codehosting/puller/scheduler.py
index 0102839..e2fbf88 100644
--- a/lib/lp/codehosting/puller/scheduler.py
+++ b/lib/lp/codehosting/puller/scheduler.py
@@ -12,9 +12,9 @@ __all__ = [
     'PullerMonitorProtocol',
     ]
 
+import io
 import os
 import socket
-from StringIO import StringIO
 
 from contrib.glock import (
     GlobalLock,
@@ -123,6 +123,11 @@ class PullerWireProtocol(NetstringReceiver):
 
     def stringReceived(self, line):
         """See `NetstringReceiver.stringReceived`."""
+        try:
+            line = line.decode('UTF-8')
+        except UnicodeDecodeError:
+            self.puller_protocol.unexpectedError(
+                failure.Failure(BadMessage(line)))
         if (self._current_command is not None
             and self._expected_args is not None):
             # state [2]
@@ -180,7 +185,7 @@ class PullerMonitorProtocol(ProcessMonitorProtocolWithTimeout,
         self.reported_mirror_finished = False
         self.listener = listener
         self.wire_protocol = PullerWireProtocol(self)
-        self._stderr = StringIO()
+        self._stderr = io.BytesIO()
         self._deferred.addCallbacks(
             self.checkReportingFinishedAndNoStderr,
             self.ensureReportingFinished)
@@ -194,7 +199,7 @@ class PullerMonitorProtocol(ProcessMonitorProtocolWithTimeout,
         When the process exits cleanly, we expect it to have not printed
         anything to stderr and to have reported success or failure.  If it has
         failed to do either of these things, we should fail noisily."""
-        stderr = self._stderr.getvalue()
+        stderr = self._stderr.getvalue().decode('UTF-8', 'replace')
         if stderr:
             fail = failure.Failure(UnexpectedStderr(stderr))
             fail.error = stderr
@@ -213,7 +218,7 @@ class PullerMonitorProtocol(ProcessMonitorProtocolWithTimeout,
         as a failure reason.
         """
         if not self.reported_mirror_finished:
-            stderr = self._stderr.getvalue()
+            stderr = self._stderr.getvalue().decode('UTF-8', 'replace')
             reason.error = stderr
             if stderr:
                 errorline = stderr.splitlines()[-1]
diff --git a/lib/lp/codehosting/puller/tests/__init__.py b/lib/lp/codehosting/puller/tests/__init__.py
index 184af86..d0e8566 100644
--- a/lib/lp/codehosting/puller/tests/__init__.py
+++ b/lib/lp/codehosting/puller/tests/__init__.py
@@ -7,10 +7,10 @@ from __future__ import absolute_import, print_function
 
 __metaclass__ = type
 
+import io
 import os
 import shutil
 import socket
-from StringIO import StringIO
 
 from breezy.tests import TestCaseWithTransport
 from breezy.tests.http_server import (
@@ -48,7 +48,7 @@ class PullerWorkerMixin:
                          policy=None):
         """Anonymous creation method for PullerWorker."""
         if protocol is None:
-            protocol = PullerWorkerProtocol(StringIO())
+            protocol = PullerWorkerProtocol(io.BytesIO())
         if branch_type is None:
             if policy is None:
                 policy = AcceptAnythingBranchMirrorerPolicy()
diff --git a/lib/lp/codehosting/puller/tests/test_acceptance.py b/lib/lp/codehosting/puller/tests/test_acceptance.py
index 3db947a..e40dc37 100644
--- a/lib/lp/codehosting/puller/tests/test_acceptance.py
+++ b/lib/lp/codehosting/puller/tests/test_acceptance.py
@@ -25,6 +25,7 @@ from breezy.urlutils import (
     )
 from breezy.workingtree import WorkingTree
 from fixtures import TempDir
+import six
 import transaction
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
@@ -82,7 +83,8 @@ class TestBranchPuller(PullerBranchTestCase, LoomTestMixin):
         self.assertEqual(0, db_branch.mirror_failures)
         mirrored_branch = self.openBranchAsUser(db_branch, accessing_user)
         self.assertEqual(
-            source_branch.last_revision(), db_branch.last_mirrored_id)
+            six.ensure_text(source_branch.last_revision()),
+            db_branch.last_mirrored_id)
         self.assertEqual(
             source_branch.last_revision(), mirrored_branch.last_revision())
         self.assertEqual(
@@ -117,7 +119,8 @@ class TestBranchPuller(PullerBranchTestCase, LoomTestMixin):
         :param command: A command and arguments given as a list.
         :return: retcode, stdout, stderr
         """
-        process = Popen(command, stdout=PIPE, stderr=PIPE)
+        process = Popen(
+            command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
         output, error = process.communicate()
         return process.returncode, output, error
 
diff --git a/lib/lp/codehosting/puller/tests/test_scheduler.py b/lib/lp/codehosting/puller/tests/test_scheduler.py
index f87c78f..10b0f55 100644
--- a/lib/lp/codehosting/puller/tests/test_scheduler.py
+++ b/lib/lp/codehosting/puller/tests/test_scheduler.py
@@ -151,12 +151,14 @@ class TestPullerWireProtocol(TestCase):
 
     def convertToNetstring(self, string):
         """Encode `string` as a netstring."""
-        return '%d:%s,' % (len(string), string)
+        return b'%d:%s,' % (len(string), string)
 
     def sendToProtocol(self, *arguments):
         """Send each element of `arguments` to the protocol as a netstring."""
         for argument in arguments:
-            self.protocol.dataReceived(self.convertToNetstring(str(argument)))
+            if not isinstance(argument, bytes):
+                argument = six.text_type(argument).encode('UTF-8')
+            self.protocol.dataReceived(self.convertToNetstring(argument))
 
     def assertUnexpectedErrorCalled(self, exception_type):
         """Assert that the puller protocol's unexpectedError has been called.
@@ -213,10 +215,16 @@ class TestPullerWireProtocol(TestCase):
         self.sendToProtocol('foo')
         self.assertUnexpectedErrorCalled(scheduler.BadMessage)
 
+    def test_nonUTF8Message(self):
+        # The protocol notifies the listener if it receives a line not
+        # encoded in UTF-8.
+        self.sendToProtocol(b'\x80')
+        self.assertUnexpectedErrorCalled(scheduler.BadMessage)
+
     def test_invalidNetstring(self):
         # The protocol terminates the session if it receives an unparsable
         # netstring.
-        self.protocol.dataReceived('foo')
+        self.protocol.dataReceived(b'foo')
         self.assertUnexpectedErrorCalled(NetstringParseError)
 
 
@@ -346,7 +354,7 @@ class TestPullerMonitorProtocol(ProcessTestsMixin, TestCase):
 
         self.termination_deferred.addErrback(check_failure)
 
-        self.protocol.errReceived('error message')
+        self.protocol.errReceived(b'error message')
         self.simulateProcessExit(clean=False)
 
         return assert_fails_with(
@@ -367,7 +375,7 @@ class TestPullerMonitorProtocol(ProcessTestsMixin, TestCase):
 
         self.termination_deferred.addErrback(check_failure)
 
-        self.protocol.errReceived('error message')
+        self.protocol.errReceived(b'error message')
         self.simulateProcessExit()
 
         return self.termination_deferred
@@ -385,7 +393,7 @@ class TestPullerMonitorProtocol(ProcessTestsMixin, TestCase):
         # If the subprocess exits before reporting success or failure, the
         # puller master should record failure.
         self.protocol.do_startMirroring()
-        self.protocol.errReceived('traceback')
+        self.protocol.errReceived(b'traceback')
         self.simulateProcessExit(clean=False)
         self.assertEqual(
             self.listener.calls,
@@ -410,7 +418,7 @@ class TestPullerMonitorProtocol(ProcessTestsMixin, TestCase):
 
         self.protocol.listener = FailingMirrorFailedStubPullerListener()
         self.listener = self.protocol.listener
-        self.protocol.errReceived('traceback')
+        self.protocol.errReceived(b'traceback')
         self.simulateProcessExit(clean=False)
         self.assertEqual(
             flush_logged_errors(RuntimeError), [runtime_error_failure])
@@ -441,8 +449,8 @@ class TestPullerMaster(TestCase):
         self.assertEqual('error message', oops['value'])
         self.assertEqual('RuntimeError', oops['type'])
         self.assertEqual(
-            get_canonical_url_for_branch_name(
-                self.eventHandler.unique_name), oops['url'])
+            six.ensure_binary(get_canonical_url_for_branch_name(
+                self.eventHandler.unique_name)), oops['url'])
 
     def test_startMirroring(self):
         # startMirroring does not send a message to the endpoint.
@@ -548,7 +556,8 @@ parser = OptionParser()
  branch_type_name, default_stacked_on_url) = arguments
 from breezy import branch
 branch = branch.Branch.open(destination_url)
-protocol = PullerWorkerProtocol(sys.stdout)
+stdout = getattr(sys.stdout, 'buffer', sys.stdout)
+protocol = PullerWorkerProtocol(stdout)
 """
 
 
@@ -622,7 +631,7 @@ class TestPullerMasterIntegration(PullerBranchTestCase):
                 default_format.repository_format.get_format_string())
             self.assertEqual(
                 [('branchChanged', LAUNCHPAD_SERVICES, self.db_branch.id, '',
-                  revision_id, control_string, branch_string,
+                  six.ensure_str(revision_id), control_string, branch_string,
                   repository_string)],
                 self.client.calls)
             return ignored
@@ -707,7 +716,7 @@ class TestPullerMasterIntegration(PullerBranchTestCase):
         protocol.mirrorFailed('a', 'b')
         protocol.sendEvent(
             'lock_id', branch.control_files._lock.peek().get('user'))
-        sys.stdout.flush()
+        stdout.flush()
         branch.unlock()
         """
 
@@ -791,7 +800,7 @@ class TestPullerMasterIntegration(PullerBranchTestCase):
         lock_and_wait_script = """
         branch.lock_write()
         protocol.sendEvent('branchLocked')
-        sys.stdout.flush()
+        stdout.flush()
         time.sleep(3600)
         """
 
diff --git a/lib/lp/codehosting/puller/tests/test_worker.py b/lib/lp/codehosting/puller/tests/test_worker.py
index c9dad6e..e011e54 100644
--- a/lib/lp/codehosting/puller/tests/test_worker.py
+++ b/lib/lp/codehosting/puller/tests/test_worker.py
@@ -8,7 +8,7 @@ from __future__ import absolute_import, print_function
 __metaclass__ = type
 
 import gc
-from StringIO import StringIO
+import io
 
 import breezy.branch
 from breezy.bzr.branch import BranchReferenceFormat
@@ -30,6 +30,7 @@ from breezy.url_policy_open import (
     BranchOpener,
     BranchOpenPolicy,
     )
+import six
 
 from lp.code.enums import BranchType
 from lp.codehosting.puller.tests import (
@@ -57,16 +58,16 @@ from lp.testing.factory import (
 def get_netstrings(line):
     """Parse `line` as a sequence of netstrings.
 
-    :return: A list of strings.
+    :return: A list of byte strings.
     """
     strings = []
     while len(line) > 0:
-        colon_index = line.find(':')
+        colon_index = line.find(b':')
         length = int(line[:colon_index])
         strings.append(line[(colon_index + 1):(colon_index + 1 + length)])
-        assert ',' == line[colon_index + 1 + length], (
-            'Expected %r == %r' % (',', line[colon_index + 1 + length]))
-        line = line[colon_index + length + 2:]
+        line = line[colon_index + 1 + length:]
+        assert b',' == line[:1], 'Expected %r == %r' % (b',', line[:1])
+        line = line[1:]
     return strings
 
 
@@ -229,15 +230,15 @@ class TestPullerWorker(TestCaseWithTransport, PullerWorkerMixin):
 
     def getStackedOnUrlFromNetStringOutput(self, netstring_output):
         netstrings = get_netstrings(netstring_output)
-        branchChanged_index = netstrings.index('branchChanged')
-        return netstrings[branchChanged_index + 2]
+        branchChanged_index = netstrings.index(b'branchChanged')
+        return six.ensure_text(netstrings[branchChanged_index + 2])
 
     def testSendsStackedInfo(self):
         # When the puller worker stacks a branch, it reports the stacked on
         # URL to the master.
         base_branch = self.make_branch('base_branch', format='1.9')
         stacked_branch = self.make_branch('stacked-branch', format='1.9')
-        protocol_output = StringIO()
+        protocol_output = io.BytesIO()
         to_mirror = self.makePullerWorker(
             stacked_branch.base, self.get_url('destdir'),
             protocol=PullerWorkerProtocol(protocol_output),
@@ -251,7 +252,7 @@ class TestPullerWorker(TestCaseWithTransport, PullerWorkerMixin):
         # Mirroring an unstackable branch sends '' as the stacked-on location
         # to the master.
         source_branch = self.make_branch('source-branch', format='pack-0.92')
-        protocol_output = StringIO()
+        protocol_output = io.BytesIO()
         to_mirror = self.makePullerWorker(
             source_branch.base, self.get_url('destdir'),
             protocol=PullerWorkerProtocol(protocol_output))
@@ -264,7 +265,7 @@ class TestPullerWorker(TestCaseWithTransport, PullerWorkerMixin):
         # Mirroring a non-stacked branch sends '' as the stacked-on location
         # to the master.
         source_branch = self.make_branch('source-branch', format='1.9')
-        protocol_output = StringIO()
+        protocol_output = io.BytesIO()
         to_mirror = self.makePullerWorker(
             source_branch.base, self.get_url('destdir'),
             protocol=PullerWorkerProtocol(protocol_output))
@@ -296,7 +297,7 @@ class TestReferenceOpener(TestCaseWithTransport):
         branch_reference_format = BranchReferenceFormat()
         branch_transport = a_bzrdir.get_branch_transport(
             branch_reference_format)
-        branch_transport.put_bytes('location', url)
+        branch_transport.put_bytes('location', six.ensure_binary(url))
         branch_transport.put_bytes(
             'format', branch_reference_format.get_format_string())
         return a_bzrdir.root_transport.base
@@ -431,7 +432,7 @@ class TestWorkerProtocol(TestCaseInTempDir, PullerWorkerMixin):
 
     def setUp(self):
         TestCaseInTempDir.setUp(self)
-        self.output = StringIO()
+        self.output = io.BytesIO()
         self.protocol = PullerWorkerProtocol(self.output)
         self.factory = ObjectFactory()
 
@@ -443,7 +444,8 @@ class TestWorkerProtocol(TestCaseInTempDir, PullerWorkerMixin):
     def resetBuffers(self):
         # Empty the test output and error buffers.
         self.output.truncate(0)
-        self.assertEqual('', self.output.getvalue())
+        self.output.seek(0)
+        self.assertEqual(b'', self.output.getvalue())
 
     def test_nothingSentOnConstruction(self):
         # The protocol sends nothing until it receives an event.
@@ -453,15 +455,15 @@ class TestWorkerProtocol(TestCaseInTempDir, PullerWorkerMixin):
     def test_startMirror(self):
         # Calling startMirroring sends 'startMirroring' as a netstring.
         self.protocol.startMirroring()
-        self.assertSentNetstrings(['startMirroring', '0'])
+        self.assertSentNetstrings([b'startMirroring', b'0'])
 
     def test_branchChanged(self):
         # Calling 'branchChanged' sends the arguments.
-        arbitrary_args = [self.factory.getUniqueString() for x in range(6)]
+        arbitrary_args = [self.factory.getUniqueBytes() for x in range(6)]
         self.protocol.startMirroring()
         self.resetBuffers()
         self.protocol.branchChanged(*arbitrary_args)
-        self.assertSentNetstrings(['branchChanged', '6'] + arbitrary_args)
+        self.assertSentNetstrings([b'branchChanged', b'6'] + arbitrary_args)
 
     def test_mirrorFailed(self):
         # Calling 'mirrorFailed' sends the error message.
@@ -469,19 +471,19 @@ class TestWorkerProtocol(TestCaseInTempDir, PullerWorkerMixin):
         self.resetBuffers()
         self.protocol.mirrorFailed('Error Message', 'OOPS')
         self.assertSentNetstrings(
-            ['mirrorFailed', '2', 'Error Message', 'OOPS'])
+            [b'mirrorFailed', b'2', b'Error Message', b'OOPS'])
 
     def test_progressMade(self):
         # Calling 'progressMade' sends an arbitrary string indicating
         # progress.
         self.protocol.progressMade('test')
-        self.assertSentNetstrings(['progressMade', '0'])
+        self.assertSentNetstrings([b'progressMade', b'0'])
 
     def test_log(self):
         # Calling 'log' sends 'log' as a netstring and its arguments, after
         # formatting as a string.
         self.protocol.log('logged %s', 'message')
-        self.assertSentNetstrings(['log', '1', 'logged message'])
+        self.assertSentNetstrings([b'log', b'1', b'logged message'])
 
 
 class TestWorkerProgressReporting(TestCaseWithTransport):
diff --git a/lib/lp/codehosting/puller/worker.py b/lib/lp/codehosting/puller/worker.py
index c0b426f..ce7a904 100644
--- a/lib/lp/codehosting/puller/worker.py
+++ b/lib/lp/codehosting/puller/worker.py
@@ -110,13 +110,16 @@ class PullerWorkerProtocol:
         self.out_stream = output
 
     def sendNetstring(self, string):
-        self.out_stream.write('%d:%s,' % (len(string), string))
+        string = six.ensure_binary(string)  # prefix must count bytes
+        self.out_stream.write(b'%d:%s,' % (len(string), string))
 
     def sendEvent(self, command, *args):
         self.sendNetstring(command)
         self.sendNetstring(str(len(args)))
         for argument in args:
-            self.sendNetstring(str(argument))
+            if not isinstance(argument, bytes):
+                argument = six.text_type(argument).encode('UTF-8')
+            self.sendNetstring(argument)
 
     def startMirroring(self):
         self.sendEvent('startMirroring')
@@ -163,7 +166,7 @@ class BranchMirrorerPolicy(BranchOpenPolicy):
             # Looms suck.
             revision_id = None
         else:
-            revision_id = 'null:'
+            revision_id = b'null:'
         source_branch.controldir.clone_on_transport(
             dest_transport, revision_id=revision_id)
         return Branch.open(destination_url)
diff --git a/scripts/mirror-branch.py b/scripts/mirror-branch.py
index 8aa9ff7..69f8b33 100755
--- a/scripts/mirror-branch.py
+++ b/scripts/mirror-branch.py
@@ -72,7 +72,8 @@ if __name__ == '__main__':
 
     resource.setrlimit(resource.RLIMIT_AS, (1500000000, 1500000000))
 
-    protocol = PullerWorkerProtocol(sys.stdout)
+    # The worker outputs netstrings, which are bytes.
+    protocol = PullerWorkerProtocol(getattr(sys.stdout, 'buffer', sys.stdout))
     install_worker_ui_factory(protocol)
     PullerWorker(
         source_url, destination_url, int(branch_id), unique_name, branch_type,