launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #05435
[Merge] lp:~lifeless/python-oops-twisted/bug-873030 into lp:python-oops-twisted
Robert Collins has proposed merging lp:~lifeless/python-oops-twisted/bug-873030 into lp:python-oops-twisted.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #873030 in python-oops-twisted: "IBodyProducer isn't supported with twisted wsgi stacks"
https://bugs.launchpad.net/python-oops-twisted/+bug/873030
For more details, see:
https://code.launchpad.net/~lifeless/python-oops-twisted/bug-873030/+merge/81427
This adds support for IBodyProducer in WSGI app_body's. This is a Twisted specific extension requested by U1 (see the bug for some more details).
Its a lot of code, because the IBodyProducer protocol is complex - it permits pausing, and this makes it a little nontrivial to wrap (e.g. we need to manage the returned deferred ourselves).
--
https://code.launchpad.net/~lifeless/python-oops-twisted/bug-873030/+merge/81427
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~lifeless/python-oops-twisted/bug-873030 into lp:python-oops-twisted.
=== modified file '.bzrignore'
--- .bzrignore 2011-09-23 07:44:42 +0000
+++ .bzrignore 2011-11-07 03:12:24 +0000
@@ -8,3 +8,4 @@
./download-cache
./dist
./MANIFEST
+.testrepository
=== added file '.testr.conf'
--- .testr.conf 1970-01-01 00:00:00 +0000
+++ .testr.conf 2011-11-07 03:12:24 +0000
@@ -0,0 +1,4 @@
+[DEFAULT]
+test_command=PYTHONPATH=. bin/py -m subunit.run $LISTOPT $IDOPTION oops_twisted.tests.test_suite
+test_id_option=--load-list $IDFILE
+test_list_option=--list
=== modified file 'NEWS'
--- NEWS 2011-10-13 03:02:31 +0000
+++ NEWS 2011-11-07 03:12:24 +0000
@@ -12,6 +12,9 @@
* Make the logic for creating an OOPS report from a Failure reusable outside of
the log handling logic. (Robert Collins)
+* Support for IBodyProducer enhanced WSGI gateways added.
+ (Robert Collins, 873030).
+
0.0.3
-----
=== modified file 'README'
--- README 2011-09-23 07:44:42 +0000
+++ README 2011-11-07 03:12:24 +0000
@@ -40,6 +40,9 @@
Usage
=====
+OOPS Configuration
+++++++++++++++++++
+
* Setup your configuration::
>>> from oops_twisted import Config
@@ -56,6 +59,9 @@
A helper 'defer_publisher' is supplied to do this for your convenience.
+Catching log.err calls
+++++++++++++++++++++++
+
* create an OOPS log observer::
>>> from oops_twisted import OOPSObserver
@@ -81,6 +87,19 @@
>>> observer = OOPSObserver(config, twisted.log.PythonLoggingObserver().emit)
+Extending WSGI
+++++++++++++++
+
+oops_twisted supports an extended WSGI contract where if the returned iterator
+for the body implements t.w.i.IBodyProducer, then the iterator that
+oops_twisted's WSGI wrapper returns will also implement IBodyProducer. This is
+useful with a customised Twisted WSGI resource that runs IBodyProducer
+iterators in the IO loop, rather than using up a threadpool thread. To use this
+pass tracker=oops_twisted.wsgi.body_producer_tracker when calling
+oops_wsgi.make_app. Note that a non-twisted OOPS Config is assumed because
+the WSGI protocol is synchronous: be sure to provide the oops_wsgi make_app
+with a non-twisted OOPS Config.
+
For more information see pydoc oops_twisted.
Installation
=== modified file 'oops_twisted/__init__.py'
--- oops_twisted/__init__.py 2011-10-13 03:02:31 +0000
+++ oops_twisted/__init__.py 2011-11-07 03:12:24 +0000
@@ -39,6 +39,9 @@
Usage
=====
+OOPS Configuration
+++++++++++++++++++
+
* Setup your configuration::
>>> from oops_twisted import Config
@@ -55,6 +58,9 @@
A helper 'defer_publisher' is supplied to do this for your convenience.
+Catching log.err calls
+++++++++++++++++++++++
+
* create an OOPS log observer::
>>> from oops_twisted import OOPSObserver
@@ -92,6 +98,37 @@
>>> from twisted.python.failure import Failure
>>> report = config.create(dict(twisted_failure=Failure()))
>>> config.publish(report)
+
+Extending WSGI
+++++++++++++++
+
+oops_twisted supports an extended WSGI contract where if the returned iterator
+for the body implements t.w.i.IBodyProducer, then the iterator that
+oops_twisted's WSGI wrapper returns will also implement IBodyProducer. This is
+useful with a customised Twisted WSGI resource that runs IBodyProducer
+iterators in the IO loop, rather than using up a threadpool thread. To use this
+pass tracker=oops_twisted.wsgi.body_producer_tracker when calling
+oops_wsgi.make_app. Note that a non-twisted OOPS Config is assumed because
+the WSGI protocol is synchronous: be sure to provide the oops_wsgi make_app
+with a non-twisted OOPS Config.
+
+If you are publishing with native OOPS publishers you may want to write a small
+synchronous publish-to-an-internal queue as you cannot use
+t.i.t.blockingCallFromThread: the WSGI protocol permits start_response to be
+called quite late, which may happen after an IBodyProducer has been returned to
+the WSGI gateway and all further code will be executing in the reactor thread.
+Specifically the call to startProducing may trigger start_response being called
+before the first write() occurs; and the call to start_response may trigger an
+OOPS being published if:
+ - it contains an exc_info value
+ - it has a status code matching the oops-on-status code values
+ - (in future) the response takes too long to start flowing
+Another route to exceptions is in startProducing itself, which may error, with
+similar consequences in that the oops config must be called into, and it has to
+be compatible with the config for start_response handling. If there is a need
+to address this, oops_twisted could take responsibility for exception handling
+in the IBodyProducer code path, with the cost of needing a second OOPS config
+- a native Twisted one.
"""
=== modified file 'oops_twisted/tests/__init__.py'
--- oops_twisted/tests/__init__.py 2011-10-13 03:02:31 +0000
+++ oops_twisted/tests/__init__.py 2011-11-07 03:12:24 +0000
@@ -24,6 +24,7 @@
'config',
'createhooks',
'log',
+ 'wsgi',
]
return TestLoader().loadTestsFromNames(
['oops_twisted.tests.test_' + name for name in test_mod_names])
=== added file 'oops_twisted/tests/test_wsgi.py'
--- oops_twisted/tests/test_wsgi.py 1970-01-01 00:00:00 +0000
+++ oops_twisted/tests/test_wsgi.py 2011-11-07 03:12:24 +0000
@@ -0,0 +1,297 @@
+# Copyright (c) 2011, Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for the twisted wsgi body tracker."""
+
+from operator import methodcaller
+
+from oops_wsgi.middleware import generator_tracker
+from testtools import TestCase
+from testtools.deferredruntest import AsynchronousDeferredRunTest
+from testtools.matchers import MatchesException
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.web.iweb import IBodyProducer
+from zope.interface import implements
+from zope.interface.verify import verifyObject
+
+from oops_twisted.wsgi import body_producer_tracker
+
+
+class TestBodyProducer(TestCase):
+
+ run_tests_with = AsynchronousDeferredRunTest
+
+ def setUp(self):
+ super(TestBodyProducer, self).setUp()
+ self.calls = []
+
+ def on_first_bytes(self):
+ self.calls.append('on_first_bytes')
+
+ def on_finish(self):
+ self.calls.append('on_finish')
+
+ def on_exception_ok(self, exc_info):
+ self.calls.append('on exception %s' % (exc_info[1],))
+ return 'error page'
+
+ def on_exception_fail(self, exc_info):
+ self.calls.append('on exception %s' % (exc_info[1],))
+ raise ValueError('fail')
+
+ def test_does_not_wrap_non_IBodyProducer(self):
+ app_body = ['foo', 'bar']
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_ok, app_body)
+ self.assertFalse(IBodyProducer.providedBy(tracker))
+ self.assertEqual(['foo', 'bar'], list(tracker))
+
+ def test_wrapper_implements_everything(self):
+ chunks = ['foo', 'bar']
+ app_body = ListProducer(list(chunks))
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_ok, app_body)
+ self.assertTrue(verifyObject(IBodyProducer, tracker))
+ consumer = Consumer()
+ consumer.registerProducer(tracker, True)
+ d = tracker.startProducing(consumer)
+ d.addCallback(lambda _: self.assertEqual(chunks, consumer.received))
+ return d
+
+ def test_call_order_empty_body(self):
+ # With an empty body the two callbacks are called.
+ chunks = []
+ app_body = ListProducer(list(chunks))
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_ok, app_body)
+ consumer = Consumer()
+ consumer.registerProducer(tracker, True)
+ d = tracker.startProducing(consumer)
+ d.addCallback(lambda _: self.assertEqual(chunks, consumer.received))
+ expected = ['on_finish']
+ d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+ return d
+
+ def test_call_order_two_items(self):
+ chunks = ['foo', 'bar']
+ app_body = ListProducer(list(chunks))
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_ok, app_body)
+ consumer = Consumer(self.calls)
+ consumer.registerProducer(tracker, True)
+ d = tracker.startProducing(consumer)
+ d.addCallback(lambda _: self.assertEqual(chunks, consumer.received))
+ # On_first_bytes is called before the bytes are written to ensure
+ # headers get pushed through.
+ expected = ['on_first_bytes', 'foo', 'bar', 'on_finish']
+ d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+ return d
+
+ def test_on_exception_on_error_fails(self):
+ # When an exception is raised from on_error, it indicates the oops code
+ # did not take over the response - it re-raises the current exception.
+ # In that case, we want the app environment to know - the wrapped
+ # deferred needs to fire.
+ chunks = ['foo', 'bar']
+ app_body = ListProducer(list(chunks), error_chunk=2)
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_fail, app_body)
+ consumer = Consumer(self.calls)
+ consumer.registerProducer(tracker, True)
+ d = tracker.startProducing(consumer)
+ expectedException = MatchesException(Exception("fail"))
+ d.addErrback(lambda failure:self.assertThat(
+ (failure.type, failure.value, ''), expectedException))
+ # The first chunk should have gotten through
+ expected = ['on_first_bytes', 'foo', 'on exception Test Error']
+ d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+ return d
+
+ def test_on_exception_on_error_succeeds_paused(self):
+ # When on_error succeeds the oops code has rendered an error page which
+ # should be written to the consumer. If the consumer is paused, this
+ # has to be buffered.
+ chunks = ['foo', 'bar']
+ app_body = ListProducer(list(chunks), error_chunk=1, delay_write=True)
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_ok, app_body)
+ consumer = Consumer(self.calls)
+ consumer.registerProducer(tracker, True)
+ d = tracker.startProducing(consumer)
+ # Pause everything.
+ tracker.pauseProducing()
+ # Trigger a failure (as if the app_body had suddenly failed).
+ app_body._finish_error()
+ # We must not have fired d yet - we fire on escaping error or when the
+ # writes are done, and with the producer paused, the error page cannot
+ # have been written yet.
+ self.assertFalse(d.called)
+ reactor.callLater(0, tracker.resumeProducing)
+ expected = ['on exception Test Error', 'error page']
+ d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+ d.addCallback(
+ lambda _: self.assertEqual(len('error page'), tracker.length))
+ return d
+
+ def test_on_exception_on_error_succeeds_notpaused(self):
+ # When on_error succeeds the oops code has rendered an error page which
+ # should be written to the consumer.
+ chunks = ['foo', 'bar']
+ app_body = ListProducer(list(chunks), error_chunk=1)
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_ok, app_body)
+ consumer = Consumer(self.calls)
+ consumer.registerProducer(tracker, True)
+ d = tracker.startProducing(consumer)
+ expected = ['on exception Test Error', 'error page']
+ d.addCallback(lambda _: self.assertEqual(expected, self.calls))
+ d.addCallback(
+ lambda _: self.assertEqual(len('error page'), tracker.length))
+ return d
+
+ def test_pause_resume_producing_passed_through(self):
+ chunks = ['foo']
+ app_body = ListProducer(list(chunks))
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_ok, app_body)
+ consumer = Consumer()
+ consumer.registerProducer(tracker, True)
+ d = tracker.startProducing(consumer)
+ self.assertNotEqual(None, app_body.next_write)
+ tracker.pauseProducing()
+ self.assertEqual(None, app_body.next_write)
+ tracker.resumeProducing()
+ self.assertNotEqual(None, app_body.next_write)
+ return d
+
+ def test_stop_producing_passed_through(self):
+ chunks = ['foo']
+ app_body = ListProducer(list(chunks))
+ tracker = body_producer_tracker(self.on_first_bytes, self.on_finish,
+ self.on_exception_ok, app_body)
+ consumer = Consumer()
+ consumer.registerProducer(tracker, True)
+ # d will never fire, we hope. Can't prove a negative though :).
+ tracker.startProducing(consumer)
+ self.assertNotEqual(None, app_body.result)
+ tracker.stopProducing()
+ self.assertEqual(None, app_body.result)
+ self.assertEqual(None, tracker.result)
+
+
+class Consumer:
+ """A simple consumer for testing."""
+
+ def __init__(self, calls=None):
+ self.received = []
+ if calls is None:
+ calls = []
+ self.calls = calls
+
+ def registerProducer(self, producer, streaming):
+ self.producer = producer
+
+ def unregisterProducer(self, producer):
+ self.producer = None
+
+ def write(self, data):
+ self.calls.append(data)
+ self.received.append(data)
+
+
+class ListProducer:
+ """A simple producer for testing.
+
+ Possibly something to put into twisted itself.
+ """
+ implements(IBodyProducer)
+
+ def __init__(self, chunks, length=None, error_chunk=None,
+ delay_write=False):
+ """Create a ListProducer.
+
+ :param chunks: A list of bytestrings to write to the consumer.
+ :param length: If not None, the length to expose. If None it is
+ auto calculated.
+ :param error_chunk: The index of the chunk on which to raise an
+ error. If None, no error will be raised.
+ :param delay_write: If True do not do a write on startProducing.
+ """
+ if length is None:
+ length = sum(map(methodcaller('__len__'), chunks))
+ self.length = length
+ self.chunks = list(chunks)
+ self.chunks.reverse()
+ self.error_chunk = error_chunk
+ self.delay_write = delay_write
+ self.consumer = None
+ self.result = None
+ self.next_write = None
+
+ def startProducing(self, consumer):
+ self.consumer = consumer
+ result = Deferred()
+ self.result = result
+ if not self.delay_write:
+ self._send_chunk()
+ return result
+
+ def _send_chunk(self):
+ self.next_write = None
+ if not self.chunks:
+ self._finish_ok()
+ else:
+ if self.error_chunk is not None:
+ self.error_chunk -= 1
+ if not self.error_chunk:
+ self._finish_error()
+ return
+ self.consumer.write(self.chunks.pop(-1))
+ self.resumeProducing()
+
+ def _finish_ok(self):
+ if self.result:
+ result = self.result
+ self.result = None
+ if result:
+ result.callback(None)
+
+ def _finish_error(self):
+ if not self.result:
+ return
+ result = self.result
+ self.result = None
+ try:
+ raise Exception("Test Error")
+ except Exception:
+ result.errback()
+
+ def stopProducing(self):
+ # The deferred from startProducing must not fire.
+ self.result = None
+ # And we should not send any more data.
+ self.pauseProducing()
+
+ def pauseProducing(self):
+ next_write = self.next_write
+ self.next_write = None
+ if next_write:
+ next_write.cancel()
+
+ def resumeProducing(self):
+ if self.next_write is None:
+ self.next_write = reactor.callLater(0, self._send_chunk)
=== added file 'oops_twisted/wsgi.py'
--- oops_twisted/wsgi.py 1970-01-01 00:00:00 +0000
+++ oops_twisted/wsgi.py 2011-11-07 03:12:24 +0000
@@ -0,0 +1,149 @@
+# Copyright (c) 2011, Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Extended WSGI support for Twisted."""
+
+from oops_wsgi.middleware import generator_tracker
+from twisted.internet.defer import Deferred
+from twisted.web.iweb import IBodyProducer
+from zope.interface import implements
+
+
+__all__ = [
+ 'body_producer_tracker',
+ ]
+
+def body_producer_tracker(on_first_bytes, on_finish, on_error, app_body):
+ """A wrapper for IBodyProducer that calls the OOPS hooks as needed.
+
+ :seealso: generator_tracker which defines the contract for this factory
+ function.
+
+ :param on_first_bytes: Called as on_first_bytes() when the first bytes from
+ the app body are available but before they are delivered.
+ :param on_finish: Called as on_finish() when the app body is fully
+ consumed.
+ :param on_error: Called as on_error(sys.exc_info()) if a handleable error
+ has occured while consuming the generator. Errors like GeneratorExit
+ are not handleable. The return value from this is written to the
+ consumer.
+ :param app_body: The iterable body for the WSGI app. This may be a simple
+ list or a generator or an IBodyProducer. If it is not an IBodyProducer
+ then oops_wsgi.middleware.generator_tracker will be used.
+ """
+ if not IBodyProducer.providedBy(app_body):
+ return generator_tracker(
+ on_first_bytes, on_finish, on_error, app_body)
+ return ProducerWrapper(on_first_bytes, on_finish, on_error, app_body)
+
+
+class ProducerWrapper:
+ """Wrap an IBodyProducer and call callbacks at key points.
+
+ :seealso: body_producer_tracker - the main user of ProducerWrapper.
+ """
+
+ implements(IBodyProducer)
+
+ def __init__(self, on_first_bytes, on_finish, on_error, app_body):
+ self.on_first_bytes = on_first_bytes
+ self.on_finish = on_finish
+ self.on_error = on_error
+ self.app_body = app_body
+ # deferred returned from startProducing. If None the producer has
+ # finished its work (or never been started).
+ self.result = None
+ # We wrap the consumer as well so that we can track the first write.
+ self.consumer = None
+ # Error page from OOPS
+ self.error_data = None
+
+ @property
+ def length(self):
+ if self.error_data is not None:
+ return len(self.error_data)
+ return self.app_body.length
+
+ def startProducing(self, consumer):
+ self.consumer = consumer
+ self.written = False
+ # Track production state
+ self.paused = False
+ # Error page from OOPS
+ self.error_data = None
+ # The deferred we return. Because we eat errors when an OOPS error page
+ # is generated, we cannot return the underlying producers deferred.
+ result = Deferred()
+ self.result = result
+ # Tell the wrapped producer to write to us.
+ wrapped_result = self.app_body.startProducing(self)
+ # We need a callback at the end to fire on_finish.
+ wrapped_result.addCallback(self.wrapped_finished)
+ # If an exception happens, we want to fire on_error.
+ wrapped_result.addErrback(self.wrapped_failure)
+ return result
+
+ def stopProducing(self):
+ # The deferred from startProducing must not fire.
+ self.result = None
+ self.app_body.stopProducing()
+
+ def pauseProducing(self):
+ self.paused = True
+ self.app_body.pauseProducing()
+
+ def resumeProducing(self):
+ if self.error_data is not None:
+ self.consumer.write(self.error_data)
+ result = self.result
+ self.result = None
+ result.callback(None)
+ else:
+ self.app_body.resumeProducing()
+
+ def write(self, data):
+ if not self.written:
+ self.written = True
+ self.on_first_bytes()
+ self.consumer.write(data)
+
+ def wrapped_finished(self, ignored):
+ result = self.result
+ self.result = None
+ try:
+ self.on_finish()
+ except:
+ result.errback()
+ else:
+ result.callback(None)
+
+ def wrapped_failure(self, failure):
+ try:
+ exc_info = (
+ failure.type, failure.value, failure.getTracebackObject())
+ self.error_data = self.on_error(exc_info)
+ except:
+ result = self.result
+ self.result = None
+ result.errback()
+ else:
+ if not self.paused:
+ self.consumer.write(self.error_data)
+ result = self.result
+ self.result = None
+ result.callback(None)
+ # Received an error from the producer while it was paused, and OOPS
+ # generated an error page. This will be buffered until we are unpaused.
=== modified file 'setup.py'
--- setup.py 2011-10-13 03:02:31 +0000
+++ setup.py 2011-11-07 03:12:24 +0000
@@ -41,6 +41,7 @@
],
install_requires = [
'oops',
+ 'oops_wsgi',
'pytz',
'Twisted',
],
=== modified file 'versions.cfg'
--- versions.cfg 2011-09-27 06:55:43 +0000
+++ versions.cfg 2011-11-07 03:12:24 +0000
@@ -5,6 +5,7 @@
fixtures = 0.3.6
iso8601 = 0.1.4
oops = 0.0.7
+oops_wsgi = 0.0.6
paste = 1.7.2
pytz = 2010o
setuptools = 0.6c11
Follow ups