[Merge] lp:~cjwatson/launchpad/timeout-with-requests into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/timeout-with-requests into lp:launchpad.
Commit message:
Convert lp.services.timeout to use requests rather than urllib2.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/timeout-with-requests/+merge/252570
Convert lp.services.timeout to use requests rather than urllib2.
This is far more difficult than it ought to be: ideally none of this Cleanable* edifice would be needed in the first place, and requests/urllib3 makes us dig through several layers to get to the actual socket. There seems to be no reasonable way to implement this without reimplementing a couple of methods from requests, so those reimplementations will need to be re-checked whenever we upgrade to a new version.
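(As an aid to review, and not part of the branch: the sketch below shows the layering that stands between a requests call and the socket that the timeout watchdog needs to shut down. The URL is illustrative, and the sketch deliberately pokes at private urllib3 attributes to show where the socket lives.)

    from requests import Session

    session = Session()
    # A Session dispatches each request to a transport adapter chosen by
    # URL prefix; by default this is a requests.adapters.HTTPAdapter.
    adapter = session.get_adapter("http://example.com/")
    # The adapter owns a urllib3 PoolManager, which hands out per-host
    # connection pools...
    pool = adapter.poolmanager.connection_from_url("http://example.com/")
    # ...and only the pool's private connection machinery ever touches the
    # httplib-level connection object that will own the socket.
    conn = pool._get_conn()
    print(conn.sock)  # None until a request is actually in flight

The pool's _new_conn() is the one chokepoint that sees every connection as it is created, which is why the cleanup hook below subclasses the connection pools and the pool manager rather than anything at the Session level.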
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/timeout-with-requests into lp:launchpad.
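(Also as an aid to review: a usage sketch of the reworked urlfetch(); the URL and the five-second budget are made up. The one-call shape is unchanged, but extra keyword arguments now pass straight through to Session.request().)

    from lp.services.timeout import (
        set_default_timeout_function,
        urlfetch,
        )

    set_default_timeout_function(lambda: 5.0)  # overall budget in seconds

    # A plain GET, as before; returns the response body.
    body = urlfetch("http://example.org/status")

    # Other methods and payloads no longer need a separate code path.
    body = urlfetch(
        "http://example.org/submit", method="POST", data={"key": "value"})

Since fetch() only calls setdefault("method", "GET"), existing callers that pass a bare URL keep their old behaviour.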
=== modified file 'lib/lp/services/googlesearch/doc/google-searchservice.txt'
--- lib/lp/services/googlesearch/doc/google-searchservice.txt 2011-12-30 08:13:14 +0000
+++ lib/lp/services/googlesearch/doc/google-searchservice.txt 2015-03-11 12:15:57 +0000
@@ -6,6 +6,12 @@
 (cs-be) client. Given one or more terms, it will retrieve an XML
 summary of the matching launchpad.net pages.
 
+We silence logging of new HTTP connections from requests throughout.
+
+    >>> from fixtures import FakeLogger
+    >>> logger = FakeLogger()
+    >>> logger.setUp()
+
 GoogleSearchService
 ===================
@@ -616,3 +622,5 @@
     # Restore the configuration and the timeout state.
     >>> timeout_data = config.pop('timeout_data')
     >>> set_default_timeout_function(old_timeout_function)
+
+    >>> logger.cleanUp()
=== modified file 'lib/lp/services/tests/test_timeout.py'
--- lib/lp/services/tests/test_timeout.py 2012-06-28 16:36:49 +0000
+++ lib/lp/services/tests/test_timeout.py 2015-03-11 12:15:57 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """timeout.py tests.
@@ -6,7 +6,6 @@
 
 __metaclass__ = type
 
-from cStringIO import StringIO
 from SimpleXMLRPCServer import (
     SimpleXMLRPCRequestHandler,
     SimpleXMLRPCServer,
@@ -14,13 +13,13 @@
 import socket
 from textwrap import dedent
 import threading
-import time
-import urllib2
 import xmlrpclib
 
-from zope.interface import implements
+from requests.exceptions import (
+    ConnectionError,
+    InvalidSchema,
+    )
 
-from lp.services.log.logger import FakeLogger
 from lp.services.timeout import (
     get_default_timeout_function,
     set_default_timeout_function,
@@ -30,11 +29,11 @@
     with_timeout,
     )
 from lp.testing import TestCase
-from lp.testing.layers import BaseLayer
 
 
 @with_timeout()
-def no_default_timeout(): pass
+def no_default_timeout():
+    pass
 
 
 class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):
@@ -54,10 +53,12 @@
 class MySimpleXMLRPCServer(SimpleXMLRPCServer):
     """Create a simple XMLRPC server to listen for requests."""
     allow_reuse_address = True
+
     def serve_2_requests(self):
         for i in range(2):
             self.handle_request()
         self.server_close()
+
     def handle_error(self, request, address):
         pass
@@ -69,11 +70,13 @@
         finishes before the supplied timeout, it should function normally.
         """
         wait_evt = threading.Event()
+
         @with_timeout(timeout=0.5)
         def wait_100ms():
             """Function that waits for a supplied number of seconds."""
             wait_evt.wait(0.1)
             return "Succeeded."
+
         self.assertEqual("Succeeded.", wait_100ms())
 
     def test_timeout_overrun(self):
@@ -87,11 +90,13 @@
         # inform us that it is about to exit.
         wait_evt = threading.Event()
         stopping_evt = threading.Event()
+
        @with_timeout(timeout=0.5)
        def wait_for_event():
            """Function that waits for a supplied number of seconds."""
            wait_evt.wait()
            stopping_evt.set()
+
        self.assertRaises(TimeoutError, wait_for_event)
        wait_evt.set()
        stopping_evt.wait()
@@ -115,6 +120,7 @@
         socket.setdefaulttimeout(5)
         sockets = socket.socketpair()
         closed = []
+
         def close_socket():
             closed.append(True)
             sockets[0].shutdown(socket.SHUT_RDWR)
@@ -152,16 +158,17 @@
         method."""
         def do_definition():
             @with_timeout(cleanup='not_a_method', timeout=0.5)
-            def a_function(): pass
+            def a_function():
+                pass
         self.assertRaises(TypeError, do_definition)
 
     def test_timeout_uses_default(self):
-        """If the timeout parameter isn't provided, it will default to the value
-        returned by the function installed as "default_timeout_function". A
-        function is used because it's useful for the timeout value to be
-        determined dynamically. For example, if you want to limit the
-        overall processing to 30s and you already did 14s, you want that timeout
-        to be 16s.
+        """If the timeout parameter isn't provided, it will default to the
+        value returned by the function installed as
+        "default_timeout_function". A function is used because it's useful
+        for the timeout value to be determined dynamically. For example, if
+        you want to limit the overall processing to 30s and you already did
+        14s, you want that timeout to be 16s.
 
         By default, there is no default_timeout_function.
         """
@@ -177,23 +184,25 @@
             str(e))
 
     def test_set_default_timeout(self):
-        """the set_default_timeout_function() takes a function that should return
-        the number of seconds to wait.
+        """The set_default_timeout_function() takes a function that should
+        return the number of seconds to wait.
         """
         using_default = []
+
         def my_default_timeout():
             using_default.append(True)
             return 1
+
         set_default_timeout_function(my_default_timeout)
         self.addCleanup(set_default_timeout_function, None)
         no_default_timeout()
         self.assertEqual([True], using_default)
 
     def make_test_socket(self):
-        """One common use case for timing out is when making an HTTP request to
-        an external site to fetch content. To this end, the timeout module has
-        a urlfetch() function that retrieve a URL using custom urllib2 handlers
-        that will timeout using the default timeout function and clean-up the
+        """One common use case for timing out is when making an HTTP request
+        to an external site to fetch content. To this end, the timeout
+        module has a urlfetch() function that retrieves a URL in such a way
+        as to timeout using the default timeout function and clean-up the
         socket properly.
         """
         sock = socket.socket()
@@ -206,11 +215,11 @@
         http_server_url = 'http://%s:%d/' % sock.getsockname()
         return sock, http_server_url
 
-    def test_urlfetch_raises_urllib2_exceptions(self):
-        """Normal urllib2 exceptions are raised."""
+    def test_urlfetch_raises_requests_exceptions(self):
+        """Normal requests exceptions are raised."""
         sock, http_server_url = self.make_test_socket()
 
-        e = self.assertRaises(urllib2.URLError, urlfetch, http_server_url)
+        e = self.assertRaises(ConnectionError, urlfetch, http_server_url)
         self.assertIn('Connection refused', str(e))
 
     def test_urlfetch_timeout_after_listen(self):
@@ -237,6 +246,7 @@
         sock, http_server_url = self.make_test_socket()
         sock.listen(1)
         stop_event = threading.Event()
+
         def slow_reply():
             (client_sock, client_addr) = sock.accept()
             content = 'You are veeeeryyy patient!'
@@ -252,12 +262,15 @@
                 if stop_event.wait(0.05):
                     break
             client_sock.close()
+
         slow_thread = threading.Thread(target=slow_reply)
         slow_thread.start()
         saved_threads = set(threading.enumerate())
         self.assertRaises(TimeoutError, urlfetch, http_server_url)
-        # Note that the cleanup also takes care of leaving no worker thread behind.
-        remaining_threads = set(threading.enumerate()).difference(saved_threads)
+        # Note that the cleanup also takes care of leaving no worker thread
+        # behind.
+        remaining_threads = set(
+            threading.enumerate()).difference(saved_threads)
         self.assertEqual(set(), remaining_threads)
         stop_event.set()
         slow_thread.join()
@@ -266,6 +279,7 @@
         """When the request succeeds, the result content is returned."""
         sock, http_server_url = self.make_test_socket()
         sock.listen(1)
+
         def success_result():
             (client_sock, client_addr) = sock.accept()
             client_sock.sendall(dedent("""\
@@ -275,17 +289,20 @@
 
                 Success."""))
             client_sock.close()
+
         t = threading.Thread(target=success_result)
         t.start()
         self.assertEqual('Success.', urlfetch(http_server_url))
         t.join()
 
-    def test_urlfetch_only_supports_http_urls(self):
-        """urlfetch() only supports http urls:"""
+    def test_urlfetch_does_not_support_ftp_urls(self):
+        """urlfetch() does not support ftp urls."""
         set_default_timeout_function(lambda: 1)
         self.addCleanup(set_default_timeout_function, None)
-        e = self.assertRaises(AssertionError, urlfetch, 'ftp://localhost')
-        self.assertEqual('only http is supported.', str(e))
+        url = 'ftp://localhost/'
+        e = self.assertRaises(InvalidSchema, urlfetch, url)
+        self.assertEqual(
+            "No connection adapters were found for '%s'" % url, str(e))
 
     def test_xmlrpc_transport(self):
         """ Another use case for timeouts is communicating with external
=== modified file 'lib/lp/services/timeout.py'
--- lib/lp/services/timeout.py 2014-08-29 01:41:14 +0000
+++ lib/lp/services/timeout.py 2015-03-11 12:15:57 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2015 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Helpers to time out external operations."""
@@ -14,16 +14,32 @@
     "with_timeout",
     ]
 
-import httplib
 import socket
 import sys
-from threading import Thread
-import urllib2
+from threading import (
+    Lock,
+    Thread,
+    )
 from xmlrpclib import (
     SafeTransport,
     Transport,
     )
 
+from requests import Session
+from requests.adapters import (
+    DEFAULT_POOLBLOCK,
+    HTTPAdapter,
+    )
+from requests.packages.urllib3.connectionpool import (
+    HTTPConnectionPool,
+    HTTPSConnectionPool,
+    )
+from requests.packages.urllib3.exceptions import ClosedPoolError
+from requests.packages.urllib3.poolmanager import (
+    PoolManager,
+    SSL_KEYWORDS,
+    )
+
 
 default_timeout_function = None
@@ -145,47 +161,113 @@
     return call_with_timeout
 
 
-class CleanableHTTPHandler(urllib2.HTTPHandler):
-    """Subclass of `urllib2.HTTPHandler` that can be cleaned-up."""
-
-    def http_open(self, req):
-        """See `urllib2.HTTPHandler`."""
-        def connection_factory(*args, **kwargs):
-            """Save the created connection so that we can clean it up."""
-            self.__conn = httplib.HTTPConnection(*args, **kwargs)
-            return self.__conn
-        return self.do_open(connection_factory, req)
-
-    def reset_connection(self):
-        """Reset the underlying HTTP connection."""
-        try:
-            self.__conn.sock.shutdown(socket.SHUT_RDWR)
-        except AttributeError:
-            # It's possible that the other thread closed the socket
-            # beforehand.
-            pass
-        self.__conn.close()
+class CleanableConnectionPoolMixin:
+    """Enhance urllib3's connection pools to support forced socket cleanup."""
+
+    def __init__(self, *args, **kwargs):
+        super(CleanableConnectionPoolMixin, self).__init__(*args, **kwargs)
+        self._all_connections = []
+        self._all_connections_mutex = Lock()
+
+    def _new_conn(self):
+        self._all_connections_mutex.acquire()
+        try:
+            if self._all_connections is None:
+                raise ClosedPoolError(self, "Pool is closed.")
+            conn = super(CleanableConnectionPoolMixin, self)._new_conn()
+            self._all_connections.append(conn)
+            return conn
+        finally:
+            self._all_connections_mutex.release()
+
+    def close(self):
+        self._all_connections_mutex.acquire()
+        try:
+            if self._all_connections is None:
+                return
+            for conn in self._all_connections:
+                sock = getattr(conn, "sock", None)
+                if sock is not None:
+                    sock.shutdown(socket.SHUT_RDWR)
+                    sock.close()
+                    conn.sock = None
+            self._all_connections = None
+        finally:
+            self._all_connections_mutex.release()
+        super(CleanableConnectionPoolMixin, self).close()
+
+
+class CleanableHTTPConnectionPool(
+    CleanableConnectionPoolMixin, HTTPConnectionPool):
+    pass
+
+
+class CleanableHTTPSConnectionPool(
+    CleanableConnectionPoolMixin, HTTPSConnectionPool):
+    pass
+
+
+cleanable_pool_classes_by_scheme = {
+    "http": CleanableHTTPConnectionPool,
+    "https": CleanableHTTPSConnectionPool,
+    }
+
+
+class CleanablePoolManager(PoolManager):
+    """A version of urllib3's PoolManager supporting forced socket cleanup."""
+
+    # XXX cjwatson 2015-03-11: Reimplements PoolManager._new_pool; check
+    # this when upgrading requests.
+    def _new_pool(self, scheme, host, port):
+        if scheme not in cleanable_pool_classes_by_scheme:
+            raise ValueError("Unhandled scheme: %s" % scheme)
+        pool_cls = cleanable_pool_classes_by_scheme[scheme]
+        kwargs = self.connection_pool_kw
+        if scheme == 'http':
+            kwargs = self.connection_pool_kw.copy()
+            for kw in SSL_KEYWORDS:
+                kwargs.pop(kw, None)
+
+        return pool_cls(host, port, **kwargs)
+
+
+class CleanableHTTPAdapter(HTTPAdapter):
+    """Enhance HTTPAdapter to use CleanablePoolManager."""
+
+    # XXX cjwatson 2015-03-11: Reimplements HTTPAdapter.init_poolmanager;
+    # check this when upgrading requests.
+    def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
+        # save these values for pickling
+        self._pool_connections = connections
+        self._pool_maxsize = maxsize
+        self._pool_block = block
+
+        self.poolmanager = CleanablePoolManager(
+            num_pools=connections, maxsize=maxsize, block=block)
 
 
 class URLFetcher:
     """Object fetching remote URLs with a time out."""
 
     @with_timeout(cleanup='cleanup')
-    def fetch(self, url, data=None):
+    def fetch(self, url, **request_kwargs):
         """Fetch the URL using a custom HTTP handler supporting timeout."""
-        assert url.startswith('http://'), "only http is supported."
-        self.handler = CleanableHTTPHandler()
-        opener = urllib2.build_opener(self.handler)
-        return opener.open(url, data).read()
+        request_kwargs.setdefault("method", "GET")
+        self.session = Session()
+        # Don't honour things like environment proxy configuration.
+        self.session.trust_env = False
+        # Mount our custom adapters.
+        self.session.mount("https://", CleanableHTTPAdapter())
+        self.session.mount("http://", CleanableHTTPAdapter())
+        return self.session.request(url=url, **request_kwargs).content
 
     def cleanup(self):
         """Reset the connection when the operation timed out."""
-        self.handler.reset_connection()
-
-
-def urlfetch(url, data=None):
-    """Wrapper for `urllib2.urlopen()` that times out."""
-    return URLFetcher().fetch(url, data)
+        self.session.close()
+
+
+def urlfetch(url, **request_kwargs):
+    """Wrapper for `requests.get()` that times out."""
+    return URLFetcher().fetch(url, **request_kwargs)
 
 
 class TransportWithTimeout(Transport):