← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/timeout-with-requests into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/timeout-with-requests into lp:launchpad.

Commit message:
Convert lp.services.timeout to use requests rather than urllib2.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/timeout-with-requests/+merge/252570

Convert lp.services.timeout to use requests rather than urllib2.

This is far more difficult than it ought to be: ideally none of this Cleanable* edifice would be needed in the first place, and requests/urllib3 makes us dig through several layers to get to the actual socket.  There seems to be no reasonable way to implement this without reimplementing a couple of methods from requests, so this has to be checked when we upgrade to new versions.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/timeout-with-requests into lp:launchpad.
=== modified file 'lib/lp/services/googlesearch/doc/google-searchservice.txt'
--- lib/lp/services/googlesearch/doc/google-searchservice.txt	2011-12-30 08:13:14 +0000
+++ lib/lp/services/googlesearch/doc/google-searchservice.txt	2015-03-11 12:15:57 +0000
@@ -6,6 +6,12 @@
 (cs-be) client. Given one or more terms, it will retrieve an XML
 summary of the matching launchpad.net pages.
 
+We silence logging of new HTTP connections from requests throughout.
+
+    >>> from fixtures import FakeLogger
+    >>> logger = FakeLogger()
+    >>> logger.setUp()
+
 
 GoogleSearchService
 ===================
@@ -616,3 +622,5 @@
     # Restore the configuration and the timeout state.
     >>> timeout_data = config.pop('timeout_data')
     >>> set_default_timeout_function(old_timeout_function)
+
+    >>> logger.cleanUp()

=== modified file 'lib/lp/services/tests/test_timeout.py'
--- lib/lp/services/tests/test_timeout.py	2012-06-28 16:36:49 +0000
+++ lib/lp/services/tests/test_timeout.py	2015-03-11 12:15:57 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """timeout.py tests.
@@ -6,7 +6,6 @@
 
 __metaclass__ = type
 
-from cStringIO import StringIO
 from SimpleXMLRPCServer import (
     SimpleXMLRPCRequestHandler,
     SimpleXMLRPCServer,
@@ -14,13 +13,13 @@
 import socket
 from textwrap import dedent
 import threading
-import time
-import urllib2
 import xmlrpclib
 
-from zope.interface import implements
+from requests.exceptions import (
+    ConnectionError,
+    InvalidSchema,
+    )
 
-from lp.services.log.logger import FakeLogger
 from lp.services.timeout import (
     get_default_timeout_function,
     set_default_timeout_function,
@@ -30,11 +29,11 @@
     with_timeout,
     )
 from lp.testing import TestCase
-from lp.testing.layers import BaseLayer
 
 
 @with_timeout()
-def no_default_timeout(): pass
+def no_default_timeout():
+    pass
 
 
 class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):
@@ -54,10 +53,12 @@
 class MySimpleXMLRPCServer(SimpleXMLRPCServer):
     """Create a simple XMLRPC server to listen for requests."""
     allow_reuse_address = True
+
     def serve_2_requests(self):
         for i in range(2):
             self.handle_request()
         self.server_close()
+
     def handle_error(self, request, address):
         pass
 
@@ -69,11 +70,13 @@
         finishes before the supplied timeout, it should function normally.
         """
         wait_evt = threading.Event()
+
         @with_timeout(timeout=0.5)
         def wait_100ms():
             """Function that waits for a supplied number of seconds."""
             wait_evt.wait(0.1)
             return "Succeeded."
+
         self.assertEqual("Succeeded.", wait_100ms())
 
     def test_timeout_overrun(self):
@@ -87,11 +90,13 @@
         # inform us that it is about to exit.
         wait_evt = threading.Event()
         stopping_evt = threading.Event()
+
         @with_timeout(timeout=0.5)
         def wait_for_event():
             """Function that waits for a supplied number of seconds."""
             wait_evt.wait()
             stopping_evt.set()
+
         self.assertRaises(TimeoutError, wait_for_event)
         wait_evt.set()
         stopping_evt.wait()
@@ -115,6 +120,7 @@
         socket.setdefaulttimeout(5)
         sockets = socket.socketpair()
         closed = []
+
         def close_socket():
             closed.append(True)
             sockets[0].shutdown(socket.SHUT_RDWR)
@@ -152,16 +158,17 @@
         method."""
         def do_definition():
             @with_timeout(cleanup='not_a_method', timeout=0.5)
-            def a_function(): pass
+            def a_function():
+                pass
         self.assertRaises(TypeError, do_definition)
 
     def test_timeout_uses_default(self):
-        """If the timeout parameter isn't provided, it will default to the value
-        returned by the function installed as "default_timeout_function". A
-        function is used because it's useful for the timeout value to be
-        determined dynamically. For example, if you want to limit the
-        overall processing to 30s and you already did 14s, you want that timeout
-        to be 16s.
+        """If the timeout parameter isn't provided, it will default to the
+        value returned by the function installed as
+        "default_timeout_function". A function is used because it's useful
+        for the timeout value to be determined dynamically. For example, if
+        you want to limit the overall processing to 30s and you already did
+        14s, you want that timeout to be 16s.
 
         By default, there is no default_timeout_function.
         """
@@ -177,23 +184,25 @@
             str(e))
 
     def test_set_default_timeout(self):
-        """the set_default_timeout_function() takes a function that should return
-        the number of seconds to wait.
+        """The set_default_timeout_function() takes a function that should
+        return the number of seconds to wait.
         """
         using_default = []
+
         def my_default_timeout():
             using_default.append(True)
             return 1
+
         set_default_timeout_function(my_default_timeout)
         self.addCleanup(set_default_timeout_function, None)
         no_default_timeout()
         self.assertEqual([True], using_default)
 
     def make_test_socket(self):
-        """One common use case for timing out is when making an HTTP request to
-        an external site to fetch content. To this end, the timeout module has
-        a urlfetch() function that retrieve a URL using custom urllib2 handlers
-        that will timeout using the default timeout function and clean-up the
+        """One common use case for timing out is when making an HTTP request
+        to an external site to fetch content. To this end, the timeout
+        module has a urlfetch() function that retrieves a URL in such a way
+        as to timeout using the default timeout function and clean-up the
         socket properly.
         """
         sock = socket.socket()
@@ -206,11 +215,11 @@
         http_server_url = 'http://%s:%d/' % sock.getsockname()
         return sock, http_server_url
 
-    def test_urlfetch_raises_urllib2_exceptions(self):
-        """Normal urllib2 exceptions are raised."""
+    def test_urlfetch_raises_requests_exceptions(self):
+        """Normal requests exceptions are raised."""
         sock, http_server_url = self.make_test_socket()
 
-        e = self.assertRaises(urllib2.URLError, urlfetch, http_server_url)
+        e = self.assertRaises(ConnectionError, urlfetch, http_server_url)
         self.assertIn('Connection refused', str(e))
 
     def test_urlfetch_timeout_after_listen(self):
@@ -237,6 +246,7 @@
         sock, http_server_url = self.make_test_socket()
         sock.listen(1)
         stop_event = threading.Event()
+
         def slow_reply():
             (client_sock, client_addr) = sock.accept()
             content = 'You are veeeeryyy patient!'
@@ -252,12 +262,15 @@
                 if stop_event.wait(0.05):
                     break
             client_sock.close()
+
         slow_thread = threading.Thread(target=slow_reply)
         slow_thread.start()
         saved_threads = set(threading.enumerate())
         self.assertRaises(TimeoutError, urlfetch, http_server_url)
-        # Note that the cleanup also takes care of leaving no worker thread behind.
-        remaining_threads = set(threading.enumerate()).difference(saved_threads)
+        # Note that the cleanup also takes care of leaving no worker thread
+        # behind.
+        remaining_threads = set(
+            threading.enumerate()).difference(saved_threads)
         self.assertEqual(set(), remaining_threads)
         stop_event.set()
         slow_thread.join()
@@ -266,6 +279,7 @@
         """When the request succeeds, the result content is returned."""
         sock, http_server_url = self.make_test_socket()
         sock.listen(1)
+
         def success_result():
             (client_sock, client_addr) = sock.accept()
             client_sock.sendall(dedent("""\
@@ -275,17 +289,20 @@
 
                 Success."""))
             client_sock.close()
+
         t = threading.Thread(target=success_result)
         t.start()
         self.assertEqual('Success.', urlfetch(http_server_url))
         t.join()
 
-    def test_urlfetch_only_supports_http_urls(self):
-        """urlfetch() only supports http urls:"""
+    def test_urlfetch_does_not_support_ftp_urls(self):
+        """urlfetch() does not support ftp urls."""
         set_default_timeout_function(lambda: 1)
         self.addCleanup(set_default_timeout_function, None)
-        e = self.assertRaises(AssertionError, urlfetch, 'ftp://localhost')
-        self.assertEqual('only http is supported.', str(e))
+        url = 'ftp://localhost/'
+        e = self.assertRaises(InvalidSchema, urlfetch, url)
+        self.assertEqual(
+            "No connection adapters were found for '%s'" % url, str(e))
 
     def test_xmlrpc_transport(self):
         """ Another use case for timeouts is communicating with external

=== modified file 'lib/lp/services/timeout.py'
--- lib/lp/services/timeout.py	2014-08-29 01:41:14 +0000
+++ lib/lp/services/timeout.py	2015-03-11 12:15:57 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2015 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Helpers to time out external operations."""
@@ -14,16 +14,32 @@
     "with_timeout",
     ]
 
-import httplib
 import socket
 import sys
-from threading import Thread
-import urllib2
+from threading import (
+    Lock,
+    Thread,
+    )
 from xmlrpclib import (
     SafeTransport,
     Transport,
     )
 
+from requests import Session
+from requests.adapters import (
+    DEFAULT_POOLBLOCK,
+    HTTPAdapter,
+    )
+from requests.packages.urllib3.connectionpool import (
+    HTTPConnectionPool,
+    HTTPSConnectionPool,
+    )
+from requests.packages.urllib3.exceptions import ClosedPoolError
+from requests.packages.urllib3.poolmanager import (
+    PoolManager,
+    SSL_KEYWORDS,
+    )
+
 
 default_timeout_function = None
 
@@ -145,47 +161,113 @@
         return call_with_timeout
 
 
-class CleanableHTTPHandler(urllib2.HTTPHandler):
-    """Subclass of `urllib2.HTTPHandler` that can be cleaned-up."""
-
-    def http_open(self, req):
-        """See `urllib2.HTTPHandler`."""
-        def connection_factory(*args, **kwargs):
-            """Save the created connection so that we can clean it up."""
-            self.__conn = httplib.HTTPConnection(*args, **kwargs)
-            return self.__conn
-        return self.do_open(connection_factory, req)
-
-    def reset_connection(self):
-        """Reset the underlying HTTP connection."""
-        try:
-            self.__conn.sock.shutdown(socket.SHUT_RDWR)
-        except AttributeError:
-            # It's possible that the other thread closed the socket
-            # beforehand.
-            pass
-        self.__conn.close()
+class CleanableConnectionPoolMixin:
+    """Enhance urllib3's connection pools to support forced socket cleanup."""
+
+    def __init__(self, *args, **kwargs):
+        super(CleanableConnectionPoolMixin, self).__init__(*args, **kwargs)
+        self._all_connections = []
+        self._all_connections_mutex = Lock()
+
+    def _new_conn(self):
+        self._all_connections_mutex.acquire()
+        try:
+            if self._all_connections is None:
+                raise ClosedPoolError(self, "Pool is closed.")
+            conn = super(CleanableConnectionPoolMixin, self)._new_conn()
+            self._all_connections.append(conn)
+            return conn
+        finally:
+            self._all_connections_mutex.release()
+
+    def close(self):
+        self._all_connections_mutex.acquire()
+        try:
+            if self._all_connections is None:
+                return
+            for conn in self._all_connections:
+                sock = getattr(conn, "sock", None)
+                if sock is not None:
+                    sock.shutdown(socket.SHUT_RDWR)
+                    sock.close()
+                    conn.sock = None
+            self._all_connections = None
+        finally:
+            self._all_connections_mutex.release()
+        super(CleanableConnectionPoolMixin, self).close()
+
+
+class CleanableHTTPConnectionPool(
+    CleanableConnectionPoolMixin, HTTPConnectionPool):
+    pass
+
+
+class CleanableHTTPSConnectionPool(
+    CleanableConnectionPoolMixin, HTTPSConnectionPool):
+    pass
+
+
+cleanable_pool_classes_by_scheme = {
+    "http": CleanableHTTPConnectionPool,
+    "https": CleanableHTTPSConnectionPool,
+    }
+
+
+class CleanablePoolManager(PoolManager):
+    """A version of urllib3's PoolManager supporting forced socket cleanup."""
+
+    # XXX cjwatson 2015-03-11: Reimplements PoolManager._new_pool; check
+    # this when upgrading requests.
+    def _new_pool(self, scheme, host, port):
+        if scheme not in cleanable_pool_classes_by_scheme:
+            raise ValueError("Unhandled scheme: %s" % scheme)
+        pool_cls = cleanable_pool_classes_by_scheme[scheme]
+        if scheme == 'http':
+            kwargs = self.connection_pool_kw.copy()
+            for kw in SSL_KEYWORDS:
+                kwargs.pop(kw, None)
+
+        return pool_cls(host, port, **kwargs)
+
+
+class CleanableHTTPAdapter(HTTPAdapter):
+    """Enhance HTTPAdapter to use CleanablePoolManager."""
+
+    # XXX cjwatson 2015-03-11: Reimplements HTTPAdapter.init_poolmanager;
+    # check this when upgrading requests.
+    def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
+        # save these values for pickling
+        self._pool_connections = connections
+        self._pool_maxsize = maxsize
+        self._pool_block = block
+
+        self.poolmanager = CleanablePoolManager(
+            num_pools=connections, maxsize=maxsize, block=block)
 
 
 class URLFetcher:
     """Object fetching remote URLs with a time out."""
 
     @with_timeout(cleanup='cleanup')
-    def fetch(self, url, data=None):
+    def fetch(self, url, **request_kwargs):
         """Fetch the URL using a custom HTTP handler supporting timeout."""
-        assert url.startswith('http://'), "only http is supported."
-        self.handler = CleanableHTTPHandler()
-        opener = urllib2.build_opener(self.handler)
-        return opener.open(url, data).read()
+        request_kwargs.setdefault("method", "GET")
+        self.session = Session()
+        # Don't honour things like environment proxy configuration.
+        self.session.trust_env = False
+        # Mount our custom adapters.
+        self.session.mount("https://", CleanableHTTPAdapter())
+        self.session.mount("http://", CleanableHTTPAdapter())
+        return self.session.request(url=url, **request_kwargs).content
 
     def cleanup(self):
         """Reset the connection when the operation timed out."""
-        self.handler.reset_connection()
-
-
-def urlfetch(url, data=None):
-    """Wrapper for `urllib2.urlopen()` that times out."""
-    return URLFetcher().fetch(url, data)
+        self.session.close()
+
+
+def urlfetch(url, **request_kwargs):
+    """Wrapper for `requests.get()` that times out."""
+    return URLFetcher().fetch(url, **request_kwargs)
 
 
 class TransportWithTimeout(Transport):


Follow ups