[Merge] lp:~jameinel/launchpad/py27-timeout-1018905 into lp:launchpad


John A Meinel has proposed merging lp:~jameinel/launchpad/py27-timeout-1018905 into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1018905 in Launchpad itself: "lp/services/doc/timeout.txt hangs in Python 2.7"

For more details, see:

= Summary =

This provides some python-2.7 compatibility, and moves a bunch of doc
tests into unit tests.

== Proposed fix ==

The doc tests were hanging under python 2.7 and precise, but it was
unclear where. So we moved them all to be unit-tests.
There is also one fix for integration with XMLRPC, where the connection
class is different in 2.6 vs 2.7.

== LOC Rationale ==

2 files changed, 313 insertions(+), 329 deletions(-)

So this is a net 16 line win. Some compatibility code can be removed when
we upgrade to 2.7, and it gets rid of some doctests.

== Tests ==

Old: bin/test -vv --layer lp.testing.layers.DatabaseFunctionalLayer -t

New: bin/test -m lp.services.tests.test_timeout

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/launchpad/py27-timeout-1018905 into lp:launchpad.
=== renamed file 'lib/lp/services/doc/timeout.txt' => 'lib/lp/services/tests/test_timeout.py'
--- lib/lp/services/doc/timeout.txt	2011-12-30 08:13:14 +0000
+++ lib/lp/services/tests/test_timeout.py	2012-06-28 16:09:27 +0000
@@ -1,326 +1,304 @@
-= Timing-out on external resources =
-When making a request to an external resource (web service, external
-process), we want to make sure that this request is interrupted if it
-takes too long to complete.
-The lp.services.timeout module provides a @with_timeout decorator
-that makes implementing that kind of behaviour easy.
-The time to wait can be passed using the timeout parameter to the
-    >>> from lp.services.timeout import with_timeout
-    >>> from select import select
-    >>> @with_timeout(timeout=0.5)
-    ... def wait_for(time):
-    ...     """Function that wait for a number of seconds."""
-    ...     select((), (), (), time)
-    ...     return "Succeeded."
-    >>> wait_for(0.1)
-    'Succeeded.'
-If the operation cannot be completed in the allotted time, a TimeoutError
-is raised.
-    >>> wait_for(1)
-    Traceback (most recent call last):
-    ...
-    TimeoutError: timeout exceeded.
-Other exceptions are reported correctly to the caller though:
-    >>> @with_timeout(timeout=0.5)
-    ... def call_with_error():
-    ...     raise Exception("This exception will be raised in the caller.")
-    >>> call_with_error()
-    Traceback (most recent call last):
-      ...
-    Exception: This exception will be raised in the caller.
-== Cleaning up timed out operation ==
-Since we want to time out operations involving an external resource
-(subprocess, remote site), we need a way to clean-up these resources
-once they time out. To this end, the with_timeout decorator accepts a
-callable parameter (named 'cleanup') that will be invoked if the
-operation times out.
-    >>> import socket
-    # Make sure that potential failures on this test don't cause it to
-    # hang forever.
-    >>> old_timeout = socket.getdefaulttimeout()
-    >>> socket.setdefaulttimeout(5)
-    >>> sockets = socket.socketpair()
-    >>> closed = False
-    >>> def close_socket():
-    ...     global closed
-    ...     closed = True
-    ...     sockets[0].shutdown(socket.SHUT_RDWR)
-    >>> @with_timeout(cleanup=close_socket, timeout=0.5)
-    ... def block():
-    ...     """This will block indefinitely."""
-    ...     sockets[0].recv(1024)
-    >>> block()
-    Traceback (most recent call last):
-      ...
-    TimeoutError: timeout exceeded.
-    >>> print closed
-    True
-The cleanup parameter can also be a string in which case it will be
-interpreted as the name of an instance method.
-    >>> class expirable_socket(object):
-    ...     def __init__(self):
-    ...         self.closed = False
-    ...         self.sockets = socket.socketpair()
-    ...
-    ...     @with_timeout(cleanup="shutdown", timeout=0.5)
-    ...     def block(self):
-    ...         self.sockets[0].recv(1024)
-    ...
-    ...     def shutdown(self):
-    ...         self.closed = True
-    ...         self.sockets[0].shutdown(socket.SHUT_RDWR)
-    >>> a_socket = expirable_socket()
-    >>> a_socket.block()
-    Traceback (most recent call last):
-      ...
-    TimeoutError: timeout exceeded.
-    >>> a_socket.closed
-    True
-It's an error to use a string cleanup when the function isn't a method.
-    >>> @with_timeout(cleanup='not_a_method', timeout=0.5)
-    ... def a_function(): pass
-    Traceback (most recent call last):
-      ...
-    TypeError: when not wrapping a method, cleanup must be a callable.
-== Default time out ==
-If the timeout parameter isn't provided, it will default to the value
-returned by the function installed as "default_timeout_function". A
-function is used because it's useful for the timeout value to be
-determined dynamically. For example, if you want to limit the
-overall processing to 30s and you already did 14s, you want that timeout
-to be 16s.
-By default, there is no default_timeout_function.
-    >>> from lp.services.timeout import (get_default_timeout_function,
-    ...     set_default_timeout_function)
-    >>> print get_default_timeout_function()
-    None
-When there is no default timeout function, it's an error not to provide
-a default timeout argument.
-    >>> @with_timeout()
-    ... def no_default_timeout(): pass
-    >>> no_default_timeout()
-    Traceback (most recent call last):
-      ...
-    AssertionError: no timeout set and there is no default timeout
-    function.
-The set_default_timeout_function() takes a function that should return
-the number of seconds to wait.
-    >>> def my_default_timeout():
-    ...     print "Will use default timeout."
-    ...     return 1
-    >>> set_default_timeout_function(my_default_timeout)
-    >>> no_default_timeout()
-    Will use default timeout.
-=== urlfetch() ===
-One common use case for timing out is when making an HTTP request to an
-external site to fetch content. To this end, the timeout module has a
-urlfetch() function that retrieve a URL using custom urllib2 handlers
-that will timeout using the default timeout function and clean-up the
-socket properly.
-    # Create a socket bound to a random port.
-    >>> sock = socket.socket()
-    >>> sock.settimeout(2)
-    >>> sock.bind(('', 0))
-    >>> from lp.services.timeout import urlfetch
-    # Use 1s as default timeout.
-    >>> set_default_timeout_function(lambda: 1)
-Normal urllib2 exceptions are raised:
-    >>> http_server_url = 'http://%s:%d/' % sock.getsockname()
-    >>> urlfetch(http_server_url)
-    Traceback (most recent call last):
-      ...
-    URLError: ...Connection refused...
-After the listen() is called, connections will hang until accept() is
-called, so a TimeoutError will be raised.
-    >>> sock.listen(1)
-    >>> urlfetch(http_server_url)
-    Traceback (most recent call last):
-      ...
-    TimeoutError: timeout exceeded.
-The client socket was closed properly, as we can see by calling recv()
-twice on the connected socket. The first recv() returns the request data
-sent by the client, the second one will block until the client closes
-its end of the connection. If the client closes its socket, '' is
-received, otherwise a socket timeout will occur.
-    >>> client_sock, client_addr = sock.accept()
-    >>> print client_sock.recv(1024)
-    GET / HTTP/1.1...
-    >>> client_sock.recv(1024)
-    ''
-The function also times out if the server replies very slowly.
-(Do the server part in a separate thread.)
-    >>> import threading
-    >>> import time
-    >>> from textwrap import dedent
-    >>> def slow_reply():
-    ...     (client_sock, client_addr) = sock.accept()
-    ...     content = 'You are veeeeryyy patient!'
-    ...     client_sock.sendall(dedent('''\
-    ...         HTTP/1.0 200 Ok
-    ...         Content-Type: text/plain
-    ...         Content-Length: %d\n\n''' % len(content)))
-    ...
-    ...     # Send the body of the reply very slowly, so that
-    ...     # it times out in read() and not urlopen.
-    ...     for c in content:
-    ...         client_sock.send(c)
-    ...         time.sleep(0.05)
-    ...     client_sock.close()
-    >>> slow_thread = threading.Thread(target=slow_reply)
-    >>> slow_thread.start()
-    >>> saved_threads = set(threading.enumerate())
-    >>> print urlfetch(http_server_url)
-    Traceback (most recent call last):
-      ...
-    TimeoutError: timeout exceeded.
-Note that the cleanup also takes care of leaving no worker thread behind.
-    >>> set(threading.enumerate()).difference(saved_threads)
-    set([])
-When the request succeeds, the result content is returned.
-    >>> def success_result():
-    ...     (client_sock, client_addr) = sock.accept()
-    ...     client_sock.sendall(dedent('''\
-    ...         HTTP/1.0 200 Ok
-    ...         Content-Type: text/plain
-    ...         Content-Length: 8
-    ...
-    ...         Success.'''))
-    ...     client_sock.close()
-    >>> threading.Thread(target=success_result).start()
-    >>> print urlfetch(http_server_url)
-    Success.
-urlfetch() only supports http urls:
-    >>> urlfetch('ftp://localhost')
-    Traceback (most recent call last):
-      ...
-    AssertionError: only http is supported.
-== TransportWithTimeouts ==
-Another use case for timeouts is communicating with external systems
-using XMLRPC.  In order to allow timeouts using XMLRPC we provide a
-transport that is timeout-aware.  The Transport is used for XMLRPC
-over HTTP.
-    # Create a socket bound to a random port, just to obtain a free port.
-    >>> sock = socket.socket()
-    >>> sock.bind(('', 0))
-    >>> addr, port = sock.getsockname()
-    >>> sock.close()
-Create a simple XMLRPC server to listen for requests.  The request
-handler will respond to 'echo' requests normally but will hang
-indefinitely for all other requests.  This allows us to show a
-successful request followed by one that times out.
-    >>> from SimpleXMLRPCServer import (
-    ...     SimpleXMLRPCRequestHandler, SimpleXMLRPCServer)
-    >>> class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):
-    ...     def _dispatch(self, method, params):
-    ...         if method == 'echo':
-    ...             return params[0]
-    ...         else:
-    ...             # Will hang until the client closes its end of the socket.
-    ...             self.connection.settimeout(None)
-    ...             self.connection.recv(1024)
-    >>> class MySimpleXMLRPCServer(SimpleXMLRPCServer):
-    ...     allow_reuse_address = True
-    ...     def serve_2_requests(self):
-    ...         for i in range(2):
-    ...             self.handle_request()
-    ...         self.server_close()
-    ...     def handle_error(self, request, address):
-    ...         pass
-    >>> server = MySimpleXMLRPCServer(
-    ...              ("localhost", port),
-    ...              requestHandler=EchoOrWaitXMLRPCReqHandler,
-    ...              logRequests=False)
-    >>> server_thread = threading.Thread(target=server.serve_2_requests)
-    >>> server_thread.start()
-    >>> from lp.services.timeout import TransportWithTimeout
-    >>> from xmlrpclib import ServerProxy
-    >>> proxy = ServerProxy('https://localhost:%d' % port,
-    ...                     transport=TransportWithTimeout())
-    >>> print proxy.echo("Successful test message.")
-    Successful test message.
-    >>> print proxy.no_response("Unsuccessful test message.")
-    Traceback (most recent call last):
-      ...
-    TimeoutError: timeout exceeded.
-    >>> server_thread.join()
-== Cleanup ==
-    >>> slow_thread.join()
-    # Reset default socket timeout.
-    >>> socket.setdefaulttimeout(old_timeout)
-    # Reset the default timeout function.
-    >>> set_default_timeout_function(None)
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+"""timeout.py tests.
+__metaclass__ = type
+from cStringIO import StringIO
+import select
+from SimpleXMLRPCServer import (
+    SimpleXMLRPCRequestHandler,
+    SimpleXMLRPCServer,
+    )
+import socket
+from textwrap import dedent
+import threading
+import time
+import urllib2
+import xmlrpclib
+from zope.interface import implements
+from lp.services.log.logger import FakeLogger
+from lp.services.timeout import (
+    get_default_timeout_function,
+    set_default_timeout_function,
+    TimeoutError,
+    TransportWithTimeout,
+    urlfetch,
+    with_timeout,
+    )
+from lp.testing import TestCase
+from lp.testing.layers import BaseLayer
+def wait_for(time):
+    """Function that waits for a supplied number of seconds."""
+    select.select((), (), (), time)
+    return "Succeeded."
+def no_default_timeout(): pass
+class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):
+    """The request handler will respond to 'echo' requests normally but will
+    hang indefinitely for all other requests.  This allows us to show a
+    successful request followed by one that times out.
+    """
+    def _dispatch(self, method, params):
+        if method == 'echo':
+            return params[0]
+        else:
+            # Will hang until the client closes its end of the socket.
+            self.connection.settimeout(None)
+            self.connection.recv(1024)
+class MySimpleXMLRPCServer(SimpleXMLRPCServer):
+    """Create a simple XMLRPC server to listen for requests."""
+    allow_reuse_address = True
+    def serve_2_requests(self):
+        for i in range(2):
+            self.handle_request()
+        self.server_close()
+    def handle_error(self, request, address):
+        pass
+class TestTimeout(TestCase):
+    def test_timeout_succeeds(self):
+        """After decorating a function 'with_timeout', as long as that function
+        finishes before the supplied timeout, it should function normally.
+        """
+        self.assertEqual("Succeeded.", wait_for(0.1))
+    def test_timeout_overrun(self):
+        """If the operation cannot be completed in the allotted time, a
+        TimeoutError is raised.
+        """
+        self.assertRaises(TimeoutError, wait_for, 1.0)
+        # Note: If with_timeout fails without a cleanup function, it leaves the
+        #       thread running, though the caller has gotten the TimeoutError
+        #       exception.
+    def test_timeout_with_failing_function(self):
+        """Other exceptions are reported correctly to the caller."""
+        @with_timeout(timeout=0.5)
+        def call_with_error():
+            raise RuntimeError("This exception will be raised in the caller.")
+        self.assertRaises(RuntimeError, call_with_error)
+    def test_timeout_with_cleanup(self):
+        """Since we want to time out operations involving an external resource
+        (subprocess, remote site), we need a way to clean-up these resources
+        once they time out. To this end, the with_timeout decorator accepts a
+        callable parameter (named 'cleanup') that will be invoked if the
+        operation times out.
+        """
+        old_timeout = socket.getdefaulttimeout()
+        self.addCleanup(socket.setdefaulttimeout, old_timeout)
+        socket.setdefaulttimeout(5)
+        sockets = socket.socketpair()
+        closed = []
+        def close_socket():
+            closed.append(True)
+            sockets[0].shutdown(socket.SHUT_RDWR)
+        @with_timeout(cleanup=close_socket, timeout=0.5)
+        def block():
+            """This will block indefinitely."""
+            sockets[0].recv(1024)
+        self.assertRaises(TimeoutError, block)
+        self.assertEqual([True], closed)
+    def test_timeout_with_string_cleanup(self):
+        """The cleanup parameter can also be a string in which case it will be
+        interpreted as the name of an instance method.
+        """
+        class expirable_socket(object):
+            def __init__(self):
+                self.closed = False
+                self.sockets = socket.socketpair()
+            @with_timeout(cleanup="shutdown", timeout=0.5)
+            def block(self):
+                self.sockets[0].recv(1024)
+            def shutdown(self):
+                self.closed = True
+                self.sockets[0].shutdown(socket.SHUT_RDWR)
+        a_socket = expirable_socket()
+        self.assertRaises(TimeoutError, a_socket.block)
+        self.assertIs(True, a_socket.closed)
+    def test_invalid_string_without_method(self):
+        """It's an error to use a string cleanup when the function isn't a
+        method."""
+        def do_definition():
+            @with_timeout(cleanup='not_a_method', timeout=0.5)
+            def a_function(): pass
+        self.assertRaises(TypeError, do_definition)
+    def test_timeout_uses_default(self):
+        """If the timeout parameter isn't provided, it will default to the value
+        returned by the function installed as "default_timeout_function". A
+        function is used because it's useful for the timeout value to be
+        determined dynamically. For example, if you want to limit the
+        overall processing to 30s and you already did 14s, you want that timeout
+        to be 16s.
+        By default, there is no default_timeout_function.
+        """
+        self.assertIs(None, get_default_timeout_function())
+    def test_timeout_requires_value_when_no_default(self):
+        """When there is no default timeout function, it's an error not to
+        provide a default timeout argument.
+        """
+        e = self.assertRaises(AssertionError, no_default_timeout)
+        self.assertEqual(
+            "no timeout set and there is no default timeout function.",
+            str(e))
+    def test_set_default_timeout(self):
+        """the set_default_timeout_function() takes a function that should return
+        the number of seconds to wait.
+        """
+        using_default = []
+        def my_default_timeout():
+            using_default.append(True)
+            return 1
+        set_default_timeout_function(my_default_timeout)
+        self.addCleanup(set_default_timeout_function, None)
+        no_default_timeout()
+        self.assertEqual([True], using_default)
+    def make_test_socket(self):
+        """One common use case for timing out is when making an HTTP request to
+        an external site to fetch content. To this end, the timeout module has
+        a urlfetch() function that retrieve a URL using custom urllib2 handlers
+        that will timeout using the default timeout function and clean-up the
+        socket properly.
+        """
+        sock = socket.socket()
+        sock.settimeout(2)
+        sock.bind(('', 0))
+        # Use 1s as default timeout.
+        set_default_timeout_function(lambda: 1)
+        self.addCleanup(set_default_timeout_function, None)
+        http_server_url = 'http://%s:%d/' % sock.getsockname()
+        return sock, http_server_url
+    def test_urlfetch_raises_urllib2_exceptions(self):
+        """Normal urllib2 exceptions are raised."""
+        sock, http_server_url = self.make_test_socket()
+        e = self.assertRaises(urllib2.URLError, urlfetch, http_server_url)
+        self.assertIn('Connection refused', str(e))
+    def test_urlfetch_timeout_after_listen(self):
+        """After the listen() is called, connections will hang until accept()
+        is called, so a TimeoutError will be raised.
+        """
+        sock, http_server_url = self.make_test_socket()
+        sock.listen(1)
+        self.assertRaises(TimeoutError, urlfetch, http_server_url)
+        # The client socket was closed properly, as we can see by calling
+        # recv() twice on the connected socket. The first recv() returns the
+        # request data sent by the client, the second one will block until the
+        # client closes its end of the connection. If the client closes its
+        # socket, '' is received, otherwise a socket timeout will occur.
+        client_sock, client_addr = sock.accept()
+        self.assertStartsWith(client_sock.recv(1024), "GET / HTTP/1.1")
+        self.assertEqual('', client_sock.recv(1024))
+    def test_urlfetch_slow_server(self):
+        """The function also times out if the server replies very slowly.
+        (Do the server part in a separate thread.)
+        """
+        sock, http_server_url = self.make_test_socket()
+        sock.listen(1)
+        stop_event = threading.Event()
+        def slow_reply():
+            (client_sock, client_addr) = sock.accept()
+            content = 'You are veeeeryyy patient!'
+            client_sock.sendall(dedent("""\
+                HTTP/1.0 200 Ok
+                Content-Type: text/plain
+                Content-Length: %d\n\n""" % len(content)))
+            # Send the body of the reply very slowly, so that
+            # it times out in read() and not urlopen.
+            for c in content:
+                client_sock.send(c)
+                if stop_event.wait(0.05):
+                    break
+            client_sock.close()
+        slow_thread = threading.Thread(target=slow_reply)
+        slow_thread.start()
+        saved_threads = set(threading.enumerate())
+        self.assertRaises(TimeoutError, urlfetch, http_server_url)
+        # Note that the cleanup also takes care of leaving no worker thread behind.
+        remaining_threads = set(threading.enumerate()).difference(saved_threads)
+        self.assertEqual(set(), remaining_threads)
+        stop_event.set()
+        slow_thread.join()
+    def test_urlfetch_returns_the_content(self):
+        """When the request succeeds, the result content is returned."""
+        sock, http_server_url = self.make_test_socket()
+        sock.listen(1)
+        def success_result():
+            (client_sock, client_addr) = sock.accept()
+            client_sock.sendall(dedent("""\
+                HTTP/1.0 200 Ok
+                Content-Type: text/plain
+                Content-Length: 8
+                Success."""))
+            client_sock.close()
+        t = threading.Thread(target=success_result)
+        t.start()
+        self.assertEqual('Success.', urlfetch(http_server_url))
+        t.join()
+    def test_urlfetch_only_supports_http_urls(self):
+        """urlfetch() only supports http urls:"""
+        set_default_timeout_function(lambda: 1)
+        self.addCleanup(set_default_timeout_function, None)
+        e = self.assertRaises(AssertionError, urlfetch, 'ftp://localhost')
+        self.assertEqual('only http is supported.', str(e))
+    def test_xmlrpc_transport(self):
+        """ Another use case for timeouts is communicating with external
+        systems using XMLRPC.  In order to allow timeouts using XMLRPC we
+        provide a transport that is timeout-aware.  The Transport is used for
+        XMLRPC over HTTP.
+        """
+        # Create a socket bound to a random port, just to obtain a free port.
+        set_default_timeout_function(lambda: 1)
+        self.addCleanup(set_default_timeout_function, None)
+        sock, http_server_url = self.make_test_socket()
+        addr, port = sock.getsockname()
+        sock.close()
+        server = MySimpleXMLRPCServer(('', port),
+            requestHandler=EchoOrWaitXMLRPCReqHandler,
+            logRequests=False)
+        server_thread = threading.Thread(target=server.serve_2_requests)
+        server_thread.start()
+        proxy = xmlrpclib.ServerProxy(http_server_url,
+                                      transport=TransportWithTimeout())
+        self.assertEqual('Successful test message.',
+                         proxy.echo('Successful test message.'))
+        self.assertRaises(TimeoutError,
+                          proxy.no_response, 'Unsuccessful test message.')
+        server_thread.join()

=== modified file 'lib/lp/services/timeout.py'
--- lib/lp/services/timeout.py	2012-01-01 02:58:52 +0000
+++ lib/lp/services/timeout.py	2012-06-28 16:09:27 +0000
@@ -115,9 +115,11 @@
     def __call__(self, f):
         """Wraps the method."""
         def call_with_timeout(*args, **kwargs):
+            # Ensure that we have a timeout before we start the thread
+            timeout = self.timeout
             t = ThreadCapturingResult(f, args, kwargs)
-            t.join(self.timeout)
+            t.join(timeout)
             if t.isAlive():
                 if self.cleanup is not None:
                     if isinstance(self.cleanup, basestring):
@@ -197,13 +199,17 @@
     def cleanup(self):
         """In the event of a timeout cleanup by closing the connection."""
+        # py2.6 compatibility
+        # In Python 2.6, xmlrpclib.Transport wraps its HTTPConnection in an
+        # HTTP compatibility class. In 2.7 it just uses the conn directly.
+        http_conn = getattr(self.conn, '_conn', self.conn)
-            self.conn._conn.sock.shutdown(socket.SHUT_RDWR)
+            http_conn.sock.shutdown(socket.SHUT_RDWR)
         except AttributeError:
             # It's possible that the other thread closed the socket
             # beforehand.
-        self.conn._conn.close()
+        http_conn.close()
 class SafeTransportWithTimeout(SafeTransport):

