
launchpad-reviewers team mailing list archive

[Merge] lp:~lifeless/python-oops-twisted/bug-873030 into lp:python-oops-twisted

 

Robert Collins has proposed merging lp:~lifeless/python-oops-twisted/bug-873030 into lp:python-oops-twisted.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #873030 in python-oops-twisted: "IBodyProducer isn't supported with twisted wsgi stacks"
  https://bugs.launchpad.net/python-oops-twisted/+bug/873030

For more details, see:
https://code.launchpad.net/~lifeless/python-oops-twisted/bug-873030/+merge/81427

This adds support for IBodyProducer in WSGI app bodies. This is a Twisted-specific extension requested by U1 (see the bug for more details).

It's a lot of code because the IBodyProducer protocol is complex: it permits pausing, which makes it nontrivial to wrap (for example, we need to manage the returned Deferred ourselves).
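The reason the wrapper has to own the Deferred it returns shows up in a stdlib-only model of the error-page path. This is a hedged sketch, not the code in oops_twisted/wsgi.py: SimpleDeferred, FakeConsumer and ErrorPageBuffer are hypothetical stand-ins for a Twisted Deferred, an IConsumer and ProducerWrapper, reduced to the pause/resume bookkeeping.

```python
class SimpleDeferred:
    """Hypothetical stand-in for a Deferred that fires at most once."""

    def __init__(self):
        self.called = False
        self.result = None

    def callback(self, value):
        if self.called:
            raise RuntimeError("already fired")
        self.called = True
        self.result = value


class FakeConsumer:
    """Hypothetical consumer that records every write."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


class ErrorPageBuffer:
    """Models only the error-page path of a producer wrapper.

    The wrapper hands out its own Deferred, so a wrapped failure can be
    replaced by an error page and completion postponed while paused.
    """

    def __init__(self, consumer):
        self.consumer = consumer
        self.paused = False
        self.error_data = None
        self.result = SimpleDeferred()  # what startProducing would return

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        # Flush a buffered error page exactly once.
        if self.error_data is not None and self.result is not None:
            self._flush()

    def wrapped_failure(self, error_page):
        # The OOPS hook rendered an error page, so the original error is
        # swallowed instead of being passed on to our caller.
        self.error_data = error_page
        if not self.paused:
            self._flush()
        # When paused, the page stays buffered until resumeProducing().

    def _flush(self):
        self.consumer.write(self.error_data)
        result, self.result = self.result, None
        result.callback(None)


# Usage: the failure arrives while paused, so nothing fires until resume.
consumer = FakeConsumer()
wrapper = ErrorPageBuffer(consumer)
d = wrapper.result
wrapper.pauseProducing()
wrapper.wrapped_failure(b"error page")
assert not d.called and consumer.written == []
wrapper.resumeProducing()
assert d.called and consumer.written == [b"error page"]
```

Returning the wrapped producer's own Deferred would not work here: it has already errbacked, while the caller should see success once the error page is written.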
=== modified file '.bzrignore'
--- .bzrignore	2011-09-23 07:44:42 +0000
+++ .bzrignore	2011-11-07 03:12:24 +0000
@@ -8,3 +8,4 @@
 ./download-cache
 ./dist
 ./MANIFEST
+.testrepository

=== added file '.testr.conf'
--- .testr.conf	1970-01-01 00:00:00 +0000
+++ .testr.conf	2011-11-07 03:12:24 +0000
@@ -0,0 +1,4 @@
+[DEFAULT]
+test_command=PYTHONPATH=. bin/py -m subunit.run $LISTOPT $IDOPTION oops_twisted.tests.test_suite
+test_id_option=--load-list $IDFILE
+test_list_option=--list

=== modified file 'NEWS'
--- NEWS	2011-10-13 03:02:31 +0000
+++ NEWS	2011-11-07 03:12:24 +0000
@@ -12,6 +12,9 @@
 * Make the logic for creating an OOPS report from a Failure reusable outside of
   the log handling logic. (Robert Collins)
 
+* Support for IBodyProducer-enhanced WSGI gateways added.
+  (Robert Collins, 873030).
+
 0.0.3
 -----
 

=== modified file 'README'
--- README	2011-09-23 07:44:42 +0000
+++ README	2011-11-07 03:12:24 +0000
@@ -40,6 +40,9 @@
 Usage
 =====
 
+OOPS Configuration
+++++++++++++++++++
+
 * Setup your configuration::
 
   >>> from oops_twisted import Config
@@ -56,6 +59,9 @@
 
  A helper 'defer_publisher' is supplied to do this for your convenience.
 
+Catching log.err calls
+++++++++++++++++++++++
+
 * create an OOPS log observer::
 
  >>> from oops_twisted import OOPSObserver
@@ -81,6 +87,19 @@
 
  >>> observer = OOPSObserver(config, twisted.log.PythonLoggingObserver().emit)
 
+Extending WSGI
+++++++++++++++
+
+oops_twisted supports an extended WSGI contract: if the iterator an app
+returns for the body implements twisted.web.iweb.IBodyProducer, then the
+iterator returned by oops_twisted's WSGI wrapper also implements
+IBodyProducer. This is useful with a customised Twisted WSGI resource that
+runs IBodyProducer iterators in the IO loop rather than tying up a threadpool
+thread. To use this, pass tracker=oops_twisted.wsgi.body_producer_tracker
+when calling oops_wsgi.make_app. Because the WSGI protocol is synchronous,
+the Config given to oops_wsgi.make_app must be a plain (non-Twisted) OOPS
+Config.
+
 For more information see pydoc oops_twisted.
 
 Installation

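For plain iterables, body_producer_tracker defers to oops_wsgi's generator_tracker, and ProducerWrapper drives the same three hooks for IBodyProducer bodies. The stdlib-only sketch below illustrates that hook contract for an ordinary iterable, as described in the body_producer_tracker docstring; sketch_generator_tracker is a hypothetical name and this is not oops_wsgi's actual implementation.

```python
import sys


def sketch_generator_tracker(on_first_bytes, on_finish, on_error, app_body):
    """Hypothetical tracker for a plain iterable body (not oops_wsgi's code).

    on_first_bytes() fires before the first chunk is yielded, on_finish()
    after the body is exhausted, and on_error(exc_info) replaces a failing
    body: its return value is yielded as the final chunk, and if it raises,
    the exception propagates to the WSGI server.
    """
    started = False
    try:
        for chunk in app_body:
            if not started:
                started = True
                on_first_bytes()
            yield chunk
    except Exception:
        # GeneratorExit derives from BaseException, so an early close()
        # by the server is not treated as a handleable error here.
        yield on_error(sys.exc_info())
        return
    on_finish()


calls = []


def failing_body():
    yield b"foo"
    raise ValueError("boom")


out = list(sketch_generator_tracker(
    lambda: calls.append("on_first_bytes"),
    lambda: calls.append("on_finish"),
    lambda exc_info: b"error page",
    failing_body()))
assert out == [b"foo", b"error page"]
assert calls == ["on_first_bytes"]
```

As in the tests for the IBodyProducer path, an empty body only triggers on_finish, because no bytes are ever delivered.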
=== modified file 'oops_twisted/__init__.py'
--- oops_twisted/__init__.py	2011-10-13 03:02:31 +0000
+++ oops_twisted/__init__.py	2011-11-07 03:12:24 +0000
@@ -39,6 +39,9 @@
 Usage
 =====
 
+OOPS Configuration
+++++++++++++++++++
+
 * Setup your configuration::
 
   >>> from oops_twisted import Config
@@ -55,6 +58,9 @@
 
  A helper 'defer_publisher' is supplied to do this for your convenience.
 
+Catching log.err calls
+++++++++++++++++++++++
+
 * create an OOPS log observer::
 
  >>> from oops_twisted import OOPSObserver
@@ -92,6 +98,37 @@
  >>> from twisted.python.failure import Failure
  >>> report = config.create(dict(twisted_failure=Failure()))
  >>> config.publish(report)
+
+Extending WSGI
+++++++++++++++
+
+oops_twisted supports an extended WSGI contract: if the iterator an app
+returns for the body implements twisted.web.iweb.IBodyProducer, then the
+iterator returned by oops_twisted's WSGI wrapper also implements
+IBodyProducer. This is useful with a customised Twisted WSGI resource that
+runs IBodyProducer iterators in the IO loop rather than tying up a threadpool
+thread. To use this, pass tracker=oops_twisted.wsgi.body_producer_tracker
+when calling oops_wsgi.make_app. Because the WSGI protocol is synchronous,
+the Config given to oops_wsgi.make_app must be a plain (non-Twisted) OOPS
+Config.
+
+If you are publishing with native Twisted OOPS publishers, you may want to
+write a small synchronous publisher that pushes reports onto an internal
+queue, because you cannot use twisted.internet.threads.blockingCallFromThread
+here: the WSGI protocol permits start_response to be called quite late, which
+may happen after an IBodyProducer has been returned to the WSGI gateway, at
+which point all further code executes in the reactor thread. Specifically,
+the call to startProducing may trigger start_response before the first
+write() occurs, and the call to start_response may cause an OOPS to be
+published if:
+ - it contains an exc_info value
+ - it has a status code matching the oops-on-status code values
+ - (in future) the response takes too long to start flowing
+startProducing itself may also raise, with similar consequences: the OOPS
+config must be called, so it has to be compatible with the config used for
+start_response handling. If there is a need to address this, oops_twisted
+could take responsibility for exception handling in the IBodyProducer code
+path, at the cost of needing a second, native Twisted, OOPS config.
 """
 
 

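The docstring's suggestion of a small synchronous publisher that pushes to an internal queue could look roughly like the following stdlib-only sketch (Python 3). QueuePublisher and drain are hypothetical names, and the assumption that an OOPS publisher is a callable taking the report dict and returning an identifier should be checked against the oops version in use before wiring it into the plain Config.

```python
import itertools
import queue
import threading


class QueuePublisher:
    """Hypothetical synchronous OOPS publisher that only enqueues reports.

    Calling it never blocks on I/O, so it can be invoked from the reactor
    thread; a worker thread does the slow publishing later.
    """

    def __init__(self):
        self.reports = queue.Queue()
        self._ids = itertools.count(1)

    def __call__(self, report):
        # Assumption: the publisher receives the report dict and returns
        # an identifier for it.
        self.reports.put(report)
        return "queued-%d" % next(self._ids)


def drain(publisher, real_publish, stop):
    """Worker loop: pass queued reports to a blocking publisher."""
    while not (stop.is_set() and publisher.reports.empty()):
        try:
            report = publisher.reports.get(timeout=0.1)
        except queue.Empty:
            continue
        real_publish(report)


published = []
publisher = QueuePublisher()
stop = threading.Event()
worker = threading.Thread(
    target=drain, args=(publisher, published.append, stop))
worker.start()
assert publisher({"type": "ValueError"}) == "queued-1"
publisher({"type": "KeyError"})
stop.set()
worker.join()
assert [r["type"] for r in published] == ["ValueError", "KeyError"]
```

The reactor thread only pays for a Queue.put, and ordering is preserved because a single worker drains the queue in FIFO order.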
=== modified file 'oops_twisted/tests/__init__.py'
--- oops_twisted/tests/__init__.py	2011-10-13 03:02:31 +0000
+++ oops_twisted/tests/__init__.py	2011-11-07 03:12:24 +0000
@@ -24,6 +24,7 @@
         'config',
         'createhooks',
         'log',
+        'wsgi',
         ]
     return TestLoader().loadTestsFromNames(
         ['oops_twisted.tests.test_' + name for name in test_mod_names])

=== added file 'oops_twisted/tests/test_wsgi.py'
--- oops_twisted/tests/test_wsgi.py	1970-01-01 00:00:00 +0000
+++ oops_twisted/tests/test_wsgi.py	2011-11-07 03:12:24 +0000
@@ -0,0 +1,297 @@
+# Copyright (c) 2011, Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for the twisted wsgi body tracker."""
+
+from operator import methodcaller
+
+from oops_wsgi.middleware import generator_tracker
+from testtools import TestCase
+from testtools.deferredruntest import AsynchronousDeferredRunTest
+from testtools.matchers import MatchesException
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.web.iweb import IBodyProducer
+from zope.interface import implements
+from zope.interface.verify import verifyObject
+
+from oops_twisted.wsgi import body_producer_tracker
+
+
+class TestBodyProducer(TestCase):
+
+    run_tests_with = AsynchronousDeferredRunTest
+
+    def setUp(self):
+        super(TestBodyProducer, self).setUp()
+        self.calls = []
+
+    def on_first_bytes(self):
+        self.calls.append('on_first_bytes')
+
+    def on_finish(self):
+        self.calls.append('on_finish')
+
+    def on_exception_ok(self, exc_info):
+        self.calls.append('on exception %s' % (exc_info[1],))
+        return 'error page'
+
+    def on_exception_fail(self, exc_info):
+        self.calls.append('on exception %s' % (exc_info[1],))
+        raise ValueError('fail')
+
+    def test_does_not_wrap_non_IBodyProducer(self):
+        app_body = ['foo', 'bar']
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_ok, app_body)
+        self.assertFalse(IBodyProducer.providedBy(tracker))
+        self.assertEqual(['foo', 'bar'], list(tracker))
+
+    def test_wrapper_implements_everything(self):
+        chunks = ['foo', 'bar']
+        app_body = ListProducer(list(chunks))
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_ok, app_body)
+        self.assertTrue(verifyObject(IBodyProducer, tracker))
+        consumer = Consumer()
+        consumer.registerProducer(tracker, True)
+        d = tracker.startProducing(consumer)
+        d.addCallback(lambda _: self.assertEqual(chunks, consumer.received))
+        return d
+
+    def test_call_order_empty_body(self):
+        # With an empty body only on_finish is called: nothing is written.
+        chunks = []
+        app_body = ListProducer(list(chunks))
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_ok, app_body)
+        consumer = Consumer()
+        consumer.registerProducer(tracker, True)
+        d = tracker.startProducing(consumer)
+        d.addCallback(lambda _: self.assertEqual(chunks, consumer.received))
+        expected = ['on_finish']
+        d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+        return d
+
+    def test_call_order_two_items(self):
+        chunks = ['foo', 'bar']
+        app_body = ListProducer(list(chunks))
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_ok, app_body)
+        consumer = Consumer(self.calls)
+        consumer.registerProducer(tracker, True)
+        d = tracker.startProducing(consumer)
+        d.addCallback(lambda _: self.assertEqual(chunks, consumer.received))
+        # on_first_bytes is called before the bytes are written to ensure
+        # headers get pushed through.
+        expected = ['on_first_bytes', 'foo', 'bar', 'on_finish']
+        d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+        return d
+
+    def test_on_exception_on_error_fails(self):
+        # When an exception is raised from on_error, it indicates the oops code
+        # did not take over the response - it re-raises the current exception.
+        # In that case, we want the app environment to know - the wrapped
+        # deferred needs to fire.
+        chunks = ['foo', 'bar']
+        app_body = ListProducer(list(chunks), error_chunk=2)
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_fail, app_body)
+        consumer = Consumer(self.calls)
+        consumer.registerProducer(tracker, True)
+        d = tracker.startProducing(consumer)
+        expected_exception = MatchesException(ValueError("fail"))
+        d.addErrback(lambda failure: self.assertThat(
+            (failure.type, failure.value, None), expected_exception))
+        # The first chunk should have gotten through
+        expected = ['on_first_bytes', 'foo', 'on exception Test Error']
+        d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+        return d
+
+    def test_on_exception_on_error_succeeds_paused(self):
+        # When on_error succeeds the oops code has rendered an error page which
+        # should be written to the consumer. If the consumer is paused, this
+        # has to be buffered.
+        chunks = ['foo', 'bar']
+        app_body = ListProducer(list(chunks), error_chunk=1, delay_write=True)
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_ok, app_body)
+        consumer = Consumer(self.calls)
+        consumer.registerProducer(tracker, True)
+        d = tracker.startProducing(consumer)
+        # Pause everything.
+        tracker.pauseProducing()
+        # Trigger a failure (as if the app_body had suddenly failed).
+        app_body._finish_error()
+        # We must not have fired d yet - we fire on escaping error or when the
+        # writes are done, and with the producer paused, the error page cannot
+        # have been written yet.
+        self.assertFalse(d.called)
+        reactor.callLater(0, tracker.resumeProducing)
+        expected = ['on exception Test Error', 'error page']
+        d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+        d.addCallback(
+            lambda _: self.assertEqual(len('error page'), tracker.length))
+        return d
+
+    def test_on_exception_on_error_succeeds_notpaused(self):
+        # When on_error succeeds the oops code has rendered an error page which
+        # should be written to the consumer.
+        chunks = ['foo', 'bar']
+        app_body = ListProducer(list(chunks), error_chunk=1)
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_ok, app_body)
+        consumer = Consumer(self.calls)
+        consumer.registerProducer(tracker, True)
+        d = tracker.startProducing(consumer)
+        expected = ['on exception Test Error', 'error page']
+        d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+        d.addCallback(
+            lambda _: self.assertEqual(len('error page'), tracker.length))
+        return d
+
+    def test_pause_resume_producing_passed_through(self):
+        chunks = ['foo']
+        app_body = ListProducer(list(chunks))
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_ok, app_body)
+        consumer = Consumer()
+        consumer.registerProducer(tracker, True)
+        d = tracker.startProducing(consumer)
+        self.assertNotEqual(None, app_body.next_write)
+        tracker.pauseProducing()
+        self.assertEqual(None, app_body.next_write)
+        tracker.resumeProducing()
+        self.assertNotEqual(None, app_body.next_write)
+        return d
+
+    def test_stop_producing_passed_through(self):
+        chunks = ['foo']
+        app_body = ListProducer(list(chunks))
+        tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+            self.on_exception_ok, app_body)
+        consumer = Consumer()
+        consumer.registerProducer(tracker, True)
+        # The returned deferred should never fire; we can't prove a negative.
+        tracker.startProducing(consumer)
+        self.assertNotEqual(None, app_body.result)
+        tracker.stopProducing()
+        self.assertEqual(None, app_body.result)
+        self.assertEqual(None, tracker.result)
+
+
+class Consumer:
+    """A simple consumer for testing."""
+
+    def __init__(self, calls=None):
+        self.received = []
+        if calls is None:
+            calls = []
+        self.calls = calls
+
+    def registerProducer(self, producer, streaming):
+        self.producer = producer
+
+    def unregisterProducer(self):
+        self.producer = None
+
+    def write(self, data):
+        self.calls.append(data)
+        self.received.append(data)
+
+
+class ListProducer:
+    """A simple producer for testing.
+
+    Possibly something to put into twisted itself.
+    """
+    implements(IBodyProducer)
+
+    def __init__(self, chunks, length=None, error_chunk=None,
+            delay_write=False):
+        """Create a ListProducer.
+
+        :param chunks: A list of bytestrings to write to the consumer.
+        :param length: If not None, the length to expose. If None it is
+            auto calculated.
+        :param error_chunk: The 1-based position of the chunk at which to
+            fail instead of writing it. If None, no error will be raised.
+        :param delay_write: If True do not do a write on startProducing.
+        """
+        if length is None:
+            length = sum(map(methodcaller('__len__'), chunks))
+        self.length = length
+        self.chunks = list(chunks)
+        self.chunks.reverse()
+        self.error_chunk = error_chunk
+        self.delay_write = delay_write
+        self.consumer = None
+        self.result = None
+        self.next_write = None
+
+    def startProducing(self, consumer):
+        self.consumer = consumer
+        result = Deferred()
+        self.result = result
+        if not self.delay_write:
+            self._send_chunk()
+        return result
+
+    def _send_chunk(self):
+        self.next_write = None
+        if not self.chunks:
+            self._finish_ok()
+        else:
+            if self.error_chunk is not None:
+                self.error_chunk -= 1
+                if not self.error_chunk:
+                    self._finish_error()
+                    return
+            self.consumer.write(self.chunks.pop(-1))
+            self.resumeProducing()
+
+    def _finish_ok(self):
+        if not self.result:
+            return
+        result = self.result
+        self.result = None
+        result.callback(None)
+
+    def _finish_error(self):
+        if not self.result:
+            return
+        result = self.result
+        self.result = None
+        try:
+            raise Exception("Test Error")
+        except Exception:
+            result.errback()
+
+    def stopProducing(self):
+        # The deferred from startProducing must not fire.
+        self.result = None
+        # And we should not send any more data.
+        self.pauseProducing()
+
+    def pauseProducing(self):
+        next_write = self.next_write
+        self.next_write = None
+        if next_write:
+            next_write.cancel()
+
+    def resumeProducing(self):
+        if self.next_write is None:
+            self.next_write = reactor.callLater(0, self._send_chunk)

=== added file 'oops_twisted/wsgi.py'
--- oops_twisted/wsgi.py	1970-01-01 00:00:00 +0000
+++ oops_twisted/wsgi.py	2011-11-07 03:12:24 +0000
@@ -0,0 +1,149 @@
+# Copyright (c) 2011, Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Extended WSGI support for Twisted."""
+
+from oops_wsgi.middleware import generator_tracker
+from twisted.internet.defer import Deferred
+from twisted.web.iweb import IBodyProducer
+from zope.interface import implements
+
+
+__all__ = [
+    'body_producer_tracker',
+    ]
+
+def body_producer_tracker(on_first_bytes, on_finish, on_error, app_body):
+    """A wrapper for IBodyProducer that calls the OOPS hooks as needed.
+
+    :seealso: generator_tracker which defines the contract for this factory
+    function.
+
+    :param on_first_bytes: Called as on_first_bytes() when the first bytes from
+        the app body are available but before they are delivered.
+    :param on_finish: Called as on_finish() when the app body is fully
+        consumed.
+    :param on_error: Called as on_error(sys.exc_info()) if a handleable error
+        has occurred while consuming the generator. Errors like GeneratorExit
+        are not handleable. The return value from this is written to the
+        consumer.
+    :param app_body: The iterable body for the WSGI app. This may be a simple
+        list or a generator or an IBodyProducer. If it is not an IBodyProducer
+        then oops_wsgi.middleware.generator_tracker will be used.
+    """
+    if not IBodyProducer.providedBy(app_body):
+        return generator_tracker(
+            on_first_bytes, on_finish, on_error, app_body)
+    return ProducerWrapper(on_first_bytes, on_finish, on_error, app_body)
+
+
+class ProducerWrapper:
+    """Wrap an IBodyProducer and call callbacks at key points.
+
+    :seealso: body_producer_tracker - the main user of ProducerWrapper.
+    """
+
+    implements(IBodyProducer)
+
+    def __init__(self, on_first_bytes, on_finish, on_error, app_body):
+        self.on_first_bytes = on_first_bytes
+        self.on_finish = on_finish
+        self.on_error = on_error
+        self.app_body = app_body
+        # deferred returned from startProducing. If None the producer has
+        # finished its work (or never been started).
+        self.result = None
+        # We wrap the consumer as well so that we can track the first write.
+        self.consumer = None
+        # Error page from OOPS
+        self.error_data = None
+
+    @property
+    def length(self):
+        if self.error_data is not None:
+            return len(self.error_data)
+        return self.app_body.length
+
+    def startProducing(self, consumer):
+        self.consumer = consumer
+        self.written = False
+        # Track production state
+        self.paused = False
+        # Error page from OOPS
+        self.error_data = None
+        # The deferred we return. Because we eat errors when an OOPS error page
+        # is generated, we cannot return the underlying producer's deferred.
+        result = Deferred()
+        self.result = result
+        # Tell the wrapped producer to write to us.
+        wrapped_result = self.app_body.startProducing(self)
+        # We need a callback at the end to fire on_finish.
+        wrapped_result.addCallback(self.wrapped_finished)
+        # If an exception happens, we want to fire on_error.
+        wrapped_result.addErrback(self.wrapped_failure)
+        return result
+
+    def stopProducing(self):
+        # The deferred from startProducing must not fire.
+        self.result = None
+        self.app_body.stopProducing()
+
+    def pauseProducing(self):
+        self.paused = True
+        self.app_body.pauseProducing()
+
+    def resumeProducing(self):
+        self.paused = False
+        if self.error_data is None:
+            self.app_body.resumeProducing()
+        elif self.result is not None:
+            self.consumer.write(self.error_data)
+            result, self.result = self.result, None
+            result.callback(None)
+
+    def write(self, data):
+        if not self.written:
+            self.written = True
+            self.on_first_bytes()
+        self.consumer.write(data)
+
+    def wrapped_finished(self, ignored):
+        result = self.result
+        self.result = None
+        try:
+            self.on_finish()
+        except:
+            result.errback()
+        else:
+            result.callback(None)
+
+    def wrapped_failure(self, failure):
+        try:
+            exc_info = (
+                failure.type, failure.value, failure.getTracebackObject())
+            self.error_data = self.on_error(exc_info)
+        except:
+            result = self.result
+            self.result = None
+            result.errback()
+        else:
+            if not self.paused:
+                self.consumer.write(self.error_data)
+                result = self.result
+                self.result = None
+                result.callback(None)
+            # Otherwise the producer failed while we were paused: the OOPS
+            # error page stays buffered until resumeProducing is called.

=== modified file 'setup.py'
--- setup.py	2011-10-13 03:02:31 +0000
+++ setup.py	2011-11-07 03:12:24 +0000
@@ -41,6 +41,7 @@
           ],
       install_requires = [
           'oops',
+          'oops_wsgi',
           'pytz',
           'Twisted',
           ],

=== modified file 'versions.cfg'
--- versions.cfg	2011-09-27 06:55:43 +0000
+++ versions.cfg	2011-11-07 03:12:24 +0000
@@ -5,6 +5,7 @@
 fixtures = 0.3.6
 iso8601 = 0.1.4
 oops = 0.0.7
+oops_wsgi = 0.0.6
 paste = 1.7.2
 pytz = 2010o
 setuptools = 0.6c11
