launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #25101
[Merge] ~pappacena/launchpad:poll-for-librarian-client into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:poll-for-librarian-client into launchpad:master.
Commit message:
Using select.epoll instead of select.select on librarian client
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/388315
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:poll-for-librarian-client into launchpad:master.
diff --git a/lib/lp/services/librarian/client.py b/lib/lp/services/librarian/client.py
index 4007016..877e51c 100644
--- a/lib/lp/services/librarian/client.py
+++ b/lib/lp/services/librarian/client.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
__metaclass__ = type
@@ -14,7 +14,7 @@ __all__ = [
import hashlib
-from select import select
+import select
import socket
from socket import (
SOCK_STREAM,
@@ -98,20 +98,31 @@ class FileUploadClient:
self.state.s = socket.socket(AF_INET, SOCK_STREAM)
self.state.s.connect((self.upload_host, self.upload_port))
self.state.f = self.state.s.makefile('rwb', 0)
+
+ # Register epoll for the socket.
+ self.state.s_poll = select.epoll()
+ self.state.s_poll.register(self.state.s.fileno(), select.EPOLLIN)
except socket.error as x:
raise UploadFailed(
'[%s:%s]: %s' % (self.upload_host, self.upload_port, x))
def _close(self):
"""Close connection"""
+ self.state.s_poll.unregister(self.state.s.fileno())
+ self.state.s_poll.close()
+ del self.state.s_poll
del self.state.s
del self.state.f
def _checkError(self):
- if select([self.state.s], [], [], 0)[0]:
+ poll_result = self.state.s_poll.poll(0)
+ if poll_result:
+ fileno, event = poll_result[0]
+ assert fileno == self.state.s.fileno()
+ assert event == select.EPOLLIN
response = six.ensure_str(
self.state.f.readline().strip(), errors='replace')
- raise UploadFailed('Server said: ' + response)
+ raise UploadFailed('Server said early: ' + response)
def _sendLine(self, line, check_for_error_responses=True):
self.state.f.write(six.ensure_binary(line + '\r\n'))
diff --git a/lib/lp/services/librarian/tests/test_client.py b/lib/lp/services/librarian/tests/test_client.py
index 15990b5..9b5c7e2 100644
--- a/lib/lp/services/librarian/tests/test_client.py
+++ b/lib/lp/services/librarian/tests/test_client.py
@@ -1,11 +1,13 @@
-# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from cStringIO import StringIO
import hashlib
import os
import re
+import socket
import textwrap
+import threading
import unittest
from fixtures import (
@@ -164,7 +166,40 @@ class LibrarianFileWrapperTestCase(TestCase):
self.assertRaises(http_client.IncompleteRead, file.read, chunksize=4)
-class LibrarianClientTestCase(unittest.TestCase):
+class EchoServer(threading.Thread):
+ """Fake TCP server that only replies back the data sent to it.
+
+ This is used to test librarian server early replies with error messages
+ during the upload process.
+ """
+ def __init__(self):
+ super(EchoServer, self).__init__()
+ self.should_stop = False
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.settimeout(1)
+ self.socket.bind(('localhost', 0))
+ self.socket.listen(1)
+
+ def join(self, *args, **kwargs):
+ self.should_stop = True
+ super(EchoServer, self).join(*args, **kwargs)
+
+ def run(self):
+ while not self.should_stop:
+ try:
+ conn, addr = self.socket.accept()
+ data = conn.recv(1024)
+ conn.sendall(data)
+ except socket.timeout:
+ # We use the timeout to control how much time we will wait
+ # to check again if self.should_stop was set, and the thread
+ # will join.
+ pass
+ conn.close()
+ self.socket.close()
+
+
+class LibrarianClientTestCase(TestCase):
layer = LaunchpadFunctionalLayer
def test_addFileSendsDatabaseName(self):
@@ -203,6 +238,21 @@ class LibrarianClientTestCase(unittest.TestCase):
else:
self.fail("UploadFailed not raised")
+ def test_addFile_fails_when_server_returns_error_msg_on_socket(self):
+ server = EchoServer()
+ server.start()
+ self.addCleanup(server.join)
+
+ upload_host, upload_port = server.socket.getsockname()
+ self.pushConfig('librarian', upload_host=upload_host)
+ self.pushConfig('librarian', upload_port=upload_port)
+
+ client = LibrarianClient()
+
+ self.assertRaisesRegex(
+ UploadFailed, 'Server said early: STORE 7 sample.txt',
+ client.addFile, 'sample.txt', 7, StringIO('sample'), 'text/plain')
+
def test_addFile_uses_master(self):
# addFile is a write operation, so it should always use the
# master store, even if the slave is the default. Close the