← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/launchpad:poll-for-librarian-client into launchpad:master

 

Thiago F. Pappacena has proposed merging ~pappacena/launchpad:poll-for-librarian-client into launchpad:master.

Commit message:
Using select.epoll instead of select.select on librarian client

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/388315
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:poll-for-librarian-client into launchpad:master.
diff --git a/lib/lp/services/librarian/client.py b/lib/lp/services/librarian/client.py
index 4007016..877e51c 100644
--- a/lib/lp/services/librarian/client.py
+++ b/lib/lp/services/librarian/client.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -14,7 +14,7 @@ __all__ = [
 
 
 import hashlib
-from select import select
+import select
 import socket
 from socket import (
     SOCK_STREAM,
@@ -98,20 +98,31 @@ class FileUploadClient:
             self.state.s = socket.socket(AF_INET, SOCK_STREAM)
             self.state.s.connect((self.upload_host, self.upload_port))
             self.state.f = self.state.s.makefile('rwb', 0)
+
+            # Register epoll for the socket.
+            self.state.s_poll = select.epoll()
+            self.state.s_poll.register(self.state.s.fileno(), select.EPOLLIN)
         except socket.error as x:
             raise UploadFailed(
                 '[%s:%s]: %s' % (self.upload_host, self.upload_port, x))
 
     def _close(self):
         """Close connection"""
+        self.state.s_poll.unregister(self.state.s.fileno())
+        self.state.s_poll.close()
+        del self.state.s_poll
         del self.state.s
         del self.state.f
 
     def _checkError(self):
-        if select([self.state.s], [], [], 0)[0]:
+        poll_result = self.state.s_poll.poll(0)
+        if poll_result:
+            fileno, event = poll_result[0]
+            assert fileno == self.state.s.fileno()
+            assert event == select.EPOLLIN
             response = six.ensure_str(
                 self.state.f.readline().strip(), errors='replace')
-            raise UploadFailed('Server said: ' + response)
+            raise UploadFailed('Server said early: ' + response)
 
     def _sendLine(self, line, check_for_error_responses=True):
         self.state.f.write(six.ensure_binary(line + '\r\n'))
diff --git a/lib/lp/services/librarian/tests/test_client.py b/lib/lp/services/librarian/tests/test_client.py
index 15990b5..9b5c7e2 100644
--- a/lib/lp/services/librarian/tests/test_client.py
+++ b/lib/lp/services/librarian/tests/test_client.py
@@ -1,11 +1,13 @@
-# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from cStringIO import StringIO
 import hashlib
 import os
 import re
+import socket
 import textwrap
+import threading
 import unittest
 
 from fixtures import (
@@ -164,7 +166,40 @@ class LibrarianFileWrapperTestCase(TestCase):
         self.assertRaises(http_client.IncompleteRead, file.read, chunksize=4)
 
 
-class LibrarianClientTestCase(unittest.TestCase):
+class EchoServer(threading.Thread):
+    """Fake TCP server that only replies back the data sent to it.
+
+    This is used to test librarian server early replies with error messages
+    during the upload process.
+    """
+    def __init__(self):
+        super(EchoServer, self).__init__()
+        self.should_stop = False
+        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.socket.settimeout(1)
+        self.socket.bind(('localhost', 0))
+        self.socket.listen(1)
+
+    def join(self, *args, **kwargs):
+        self.should_stop = True
+        super(EchoServer, self).join(*args, **kwargs)
+
+    def run(self):
+        while not self.should_stop:
+            try:
+                conn, addr = self.socket.accept()
+                data = conn.recv(1024)
+                conn.sendall(data)
+            except socket.timeout:
+                # We use the timeout to control how much time we will wait
+                # to check again if self.should_stop was set, and the thread
+                # will join.
+                pass
+        conn.close()
+        self.socket.close()
+
+
+class LibrarianClientTestCase(TestCase):
     layer = LaunchpadFunctionalLayer
 
     def test_addFileSendsDatabaseName(self):
@@ -203,6 +238,21 @@ class LibrarianClientTestCase(unittest.TestCase):
         else:
             self.fail("UploadFailed not raised")
 
+    def test_addFile_fails_when_server_returns_error_msg_on_socket(self):
+        server = EchoServer()
+        server.start()
+        self.addCleanup(server.join)
+
+        upload_host, upload_port = server.socket.getsockname()
+        self.pushConfig('librarian', upload_host=upload_host)
+        self.pushConfig('librarian', upload_port=upload_port)
+
+        client = LibrarianClient()
+
+        self.assertRaisesRegex(
+            UploadFailed, 'Server said early: STORE 7 sample.txt',
+            client.addFile, 'sample.txt', 7, StringIO('sample'), 'text/plain')
+
     def test_addFile_uses_master(self):
         # addFile is a write operation, so it should always use the
         # master store, even if the slave is the default. Close the