launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #29263
[Merge] ~cjwatson/launchpad:remove-rabbitmq-sessions into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:remove-rabbitmq-sessions into launchpad:master.
Commit message:
Remove RabbitMQ session infrastructure
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/430846
The RabbitMQ session infrastructure was only used by the longpoll mechanism that was removed in commit df2fe07c8766a80affa8467c5491af647f923840.
I started to try to modify this code to support multiple RabbitMQ broker URLs, then realized that would be a lot easier if I removed a big chunk of it first.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:remove-rabbitmq-sessions into launchpad:master.
diff --git a/lib/lp/services/configure.zcml b/lib/lp/services/configure.zcml
index 1461879..77dac94 100644
--- a/lib/lp/services/configure.zcml
+++ b/lib/lp/services/configure.zcml
@@ -21,7 +21,6 @@
<include package=".mail" />
<include package=".memcache" />
<include package=".messages" />
- <include package=".messaging" />
<include package=".oauth" />
<include package=".openid" />
<include package=".profile" />
diff --git a/lib/lp/services/messaging/configure.zcml b/lib/lp/services/messaging/configure.zcml
deleted file mode 100644
index f8d23bd..0000000
--- a/lib/lp/services/messaging/configure.zcml
+++ /dev/null
@@ -1,18 +0,0 @@
-<!-- Copyright 2011 Canonical Ltd. This software is licensed under the
- GNU Affero General Public License version 3 (see the file LICENSE).
--->
-<configure
- xmlns="http://namespaces.zope.org/zope"
- xmlns:browser="http://namespaces.zope.org/browser"
- xmlns:i18n="http://namespaces.zope.org/i18n"
- i18n_domain="launchpad">
- <utility
- provides=".interfaces.IMessageSession"
- component=".rabbit.unreliable_session" />
- <subscriber
- for="lp.services.webapp.interfaces.IFinishReadOnlyRequestEvent"
- handler=".rabbit.session_finish_handler" />
- <subscriber
- for="lp.services.webapp.interfaces.IFinishReadOnlyRequestEvent"
- handler=".rabbit.unreliable_session_finish_handler" />
-</configure>
diff --git a/lib/lp/services/messaging/interfaces.py b/lib/lp/services/messaging/interfaces.py
index c2c5dee..c73efac 100644
--- a/lib/lp/services/messaging/interfaces.py
+++ b/lib/lp/services/messaging/interfaces.py
@@ -4,96 +4,14 @@
"""Messaging interfaces."""
__all__ = [
- "IMessageConsumer",
- "IMessageProducer",
- "IMessageSession",
"MessagingException",
"MessagingUnavailable",
- "QueueEmpty",
- "QueueNotFound",
]
-from zope.interface import Interface
-from zope.schema import Bool
-
-
class MessagingException(Exception):
"""Failure in messaging."""
class MessagingUnavailable(MessagingException):
"""Messaging systems are not available."""
-
-
-class QueueNotFound(MessagingException):
- """Raised if the queue was not found."""
-
-
-class QueueEmpty(MessagingException):
- """Raised if there are no queued messages on a non-blocking read."""
-
-
-class IMessageSession(Interface):
-
- is_connected = Bool(
- "Whether the session is connected to the messaging system."
- )
-
- def connect():
- """Connect to the messaging system.
-
- If the session is already connected this should be a no-op.
- """
-
- def disconnect():
- """Disconnect from the messaging system.
-
- If the session is already disconnected this should be a no-op.
- """
-
- def flush():
- """Run deferred tasks."""
-
- def finish():
- """Flush the session and reset."""
-
- def reset():
- """Reset the session."""
-
- def defer(func, *args, **kwargs):
- """Schedule something to happen when this session is finished."""
-
- def getProducer(name):
- """Get a `IMessageProducer` associated with this session."""
-
- def getConsumer(name):
- """Get a `IMessageConsumer` associated with this session."""
-
-
-class IMessageConsumer(Interface):
- def receive(blocking=True):
- """Receive data from the queue.
-
- :raises EmptyQueue: If non-blocking and the queue is empty.
- """
-
-
-class IMessageProducer(Interface):
- def send(data):
- """Serialize `data` into JSON and send it to the queue on commit."""
-
- def sendNow(data):
- """Serialize `data` into JSON and send it to the queue immediately."""
-
- def associateConsumer(consumer):
- """Make the consumer receive messages from this producer on commit.
-
- :param consumer: An `IMessageConsumer`
- """
-
- def associateConsumerNow(consumer):
- """Make the consumer receive messages from this producer.
-
- :param consumer: An `IMessageConsumer`
- """
diff --git a/lib/lp/services/messaging/rabbit.py b/lib/lp/services/messaging/rabbit.py
index 46b3b18..6c8bd34 100644
--- a/lib/lp/services/messaging/rabbit.py
+++ b/lib/lp/services/messaging/rabbit.py
@@ -6,51 +6,12 @@
__all__ = [
"connect",
"is_configured",
- "session",
- "unreliable_session",
]
-import json
-import sys
-import threading
-import time
-from collections import deque
-from functools import partial
-
import amqp
-import transaction
-from transaction._transaction import Status as TransactionStatus
-from zope.interface import implementer
from lp.services.config import config
-from lp.services.messaging.interfaces import (
- IMessageConsumer,
- IMessageProducer,
- IMessageSession,
- MessagingUnavailable,
- QueueEmpty,
- QueueNotFound,
-)
-
-LAUNCHPAD_EXCHANGE = "launchpad-exchange"
-
-
-@implementer(transaction.interfaces.ISynchronizer)
-class RabbitSessionTransactionSync:
- def __init__(self, session):
- self.session = session
-
- def newTransaction(self, txn):
- pass
-
- def beforeCompletion(self, txn):
- pass
-
- def afterCompletion(self, txn):
- if txn.status == TransactionStatus.COMMITTED:
- self.session.finish()
- else:
- self.session.reset()
+from lp.services.messaging.interfaces import MessagingUnavailable
def is_configured():
@@ -78,217 +39,3 @@ def connect():
)
connection.connect()
return connection
-
-
-@implementer(IMessageSession)
-class RabbitSession(threading.local):
-
- exchange = LAUNCHPAD_EXCHANGE
-
- def __init__(self):
- super().__init__()
- self._connection = None
- self._deferred = deque()
- # Maintain sessions according to transaction boundaries. Keep a strong
- # reference to the sync because the transaction manager does not. We
- # need one per thread (definining it here is enough to ensure that).
- self._sync = RabbitSessionTransactionSync(self)
- transaction.manager.registerSynch(self._sync)
-
- @property
- def is_connected(self):
- """See `IMessageSession`."""
- return self._connection is not None and self._connection.connected
-
- def connect(self):
- """See `IMessageSession`.
-
- Open a connection for this thread if necessary. Connections cannot be
- shared between threads.
- """
- if self._connection is None or not self._connection.connected:
- self._connection = connect()
- return self._connection
-
- def disconnect(self):
- """See `IMessageSession`."""
- if self._connection is not None:
- try:
- self._connection.close()
- except OSError:
- # Socket error is fine; the connection is still closed.
- pass
- finally:
- self._connection = None
-
- def flush(self):
- """See `IMessageSession`."""
- tasks = self._deferred
- while len(tasks) != 0:
- tasks.popleft()()
-
- def finish(self):
- """See `IMessageSession`."""
- try:
- self.flush()
- finally:
- self.reset()
-
- def reset(self):
- """See `IMessageSession`."""
- self._deferred.clear()
- self.disconnect()
-
- def defer(self, func, *args, **kwargs):
- """See `IMessageSession`."""
- self._deferred.append(partial(func, *args, **kwargs))
-
- def getProducer(self, name):
- """See `IMessageSession`."""
- return RabbitRoutingKey(self, name)
-
- def getConsumer(self, name):
- """See `IMessageSession`."""
- return RabbitQueue(self, name)
-
-
-# Per-thread sessions.
-session = RabbitSession()
-session_finish_handler = lambda event: session.finish()
-
-
-class RabbitUnreliableSession(RabbitSession):
- """An "unreliable" `RabbitSession`.
-
- Unreliable in this case means that certain errors in deferred tasks are
- silently suppressed. This means that services can continue to function
- even in the absence of a running and fully functional message queue.
-
- Other types of errors are also caught because we don't want this
- subsystem to destabilise other parts of Launchpad but we nonetheless
- record OOPses for these.
-
- XXX: We only suppress MessagingUnavailable for now because we want to
- monitor this closely before we add more exceptions to the
- suppressed_errors list. Potential candidates are `MessagingException`,
- `IOError` or `amqp.AMQPException`.
- """
-
- suppressed_errors = (MessagingUnavailable,)
-
- def finish(self):
- """See `IMessageSession`.
-
- Suppresses errors listed in `suppressed_errors`. Also suppresses
- other errors but files an oops report for these.
- """
- try:
- super().finish()
- except self.suppressed_errors:
- pass
- except Exception:
- from lp.services.webapp import errorlog
-
- errorlog.globalErrorUtility.raising(sys.exc_info())
-
-
-# Per-thread "unreliable" sessions.
-unreliable_session = RabbitUnreliableSession()
-unreliable_session_finish_handler = lambda event: unreliable_session.finish()
-
-
-class RabbitMessageBase:
- """Base class for all RabbitMQ messaging."""
-
- def __init__(self, session):
- self.session = IMessageSession(session)
- self._channel = None
-
- @property
- def channel(self):
- if self._channel is None or not self._channel.is_open:
- connection = self.session.connect()
- self._channel = connection.channel()
- self._channel.exchange_declare(
- self.session.exchange,
- "direct",
- durable=False,
- auto_delete=False,
- nowait=False,
- )
- return self._channel
-
-
-@implementer(IMessageProducer)
-class RabbitRoutingKey(RabbitMessageBase):
- """A RabbitMQ data origination point."""
-
- def __init__(self, session, routing_key):
- super().__init__(session)
- self.key = routing_key
-
- def associateConsumer(self, consumer):
- """Only receive messages for requested routing key."""
- self.session.defer(self.associateConsumerNow, consumer)
-
- def associateConsumerNow(self, consumer):
- """Only receive messages for requested routing key."""
- # The queue will be auto-deleted 5 minutes after its last use.
- # http://www.rabbitmq.com/extensions.html#queue-leases
- self.channel.queue_declare(
- consumer.name,
- nowait=False,
- auto_delete=False,
- arguments={"x-expires": 300000},
- ) # 5 minutes.
- self.channel.queue_bind(
- queue=consumer.name,
- exchange=self.session.exchange,
- routing_key=self.key,
- nowait=False,
- )
-
- def send(self, data):
- """See `IMessageProducer`."""
- self.session.defer(self.sendNow, data)
-
- def sendNow(self, data):
- """Immediately send a message to the broker."""
- json_data = json.dumps(data)
- msg = amqp.Message(json_data)
- self.channel.basic_publish(
- exchange=self.session.exchange, routing_key=self.key, msg=msg
- )
-
-
-@implementer(IMessageConsumer)
-class RabbitQueue(RabbitMessageBase):
- """A RabbitMQ Queue."""
-
- def __init__(self, session, name):
- super().__init__(session)
- self.name = name
-
- def receive(self, timeout=0.0):
- """Pull a message from the queue.
-
- :param timeout: Wait a maximum of `timeout` seconds before giving up,
- trying at least once.
- :raises QueueEmpty: if the timeout passes.
- """
- endtime = time.time() + timeout
- while True:
- try:
- message = self.channel.basic_get(self.name)
- if message is None:
- if time.time() > endtime:
- raise QueueEmpty()
- time.sleep(0.1)
- else:
- self.channel.basic_ack(message.delivery_tag)
- return json.loads(message.body)
- except amqp.ChannelError as error:
- if error.reply_code == 404:
- raise QueueNotFound()
- else:
- raise
diff --git a/lib/lp/services/messaging/tests/__init__.py b/lib/lp/services/messaging/tests/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/lib/lp/services/messaging/tests/__init__.py
+++ /dev/null
diff --git a/lib/lp/services/messaging/tests/test_rabbit.py b/lib/lp/services/messaging/tests/test_rabbit.py
deleted file mode 100644
index f82a0a1..0000000
--- a/lib/lp/services/messaging/tests/test_rabbit.py
+++ /dev/null
@@ -1,447 +0,0 @@
-# Copyright 2011 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Messaging utility tests."""
-
-from functools import partial
-from itertools import count
-
-import transaction
-from testtools.testcase import ExpectedException
-from transaction._transaction import Status as TransactionStatus
-from zope.component import getUtility
-from zope.event import notify
-
-from lp.services.messaging.interfaces import (
- IMessageConsumer,
- IMessageProducer,
- IMessageSession,
- MessagingUnavailable,
- QueueEmpty,
- QueueNotFound,
-)
-from lp.services.messaging.rabbit import (
- RabbitMessageBase,
- RabbitQueue,
- RabbitRoutingKey,
- RabbitSession,
- RabbitSessionTransactionSync,
- RabbitUnreliableSession,
-)
-from lp.services.messaging.rabbit import session as global_session
-from lp.services.messaging.rabbit import (
- unreliable_session as global_unreliable_session,
-)
-from lp.services.webapp.interfaces import FinishReadOnlyRequestEvent
-from lp.testing import TestCase, monkey_patch
-from lp.testing.fakemethod import FakeMethod
-from lp.testing.faketransaction import FakeTransaction
-from lp.testing.layers import LaunchpadFunctionalLayer, RabbitMQLayer
-from lp.testing.matchers import Provides
-
-# RabbitMQ is not (yet) torn down or reset between tests, so here are sources
-# of distinct names.
-queue_names = ("queue.%d" % num for num in count(1))
-key_names = ("key.%d" % num for num in count(1))
-
-
-class FakeRabbitSession:
- def __init__(self):
- self.log = []
-
- def finish(self):
- self.log.append("finish")
-
- def reset(self):
- self.log.append("reset")
-
-
-class TestRabbitSessionTransactionSync(TestCase):
- def test_interface(self):
- self.assertThat(
- RabbitSessionTransactionSync(None),
- Provides(transaction.interfaces.ISynchronizer),
- )
-
- def test_afterCompletion_COMMITTED(self):
- txn = FakeTransaction()
- txn.status = TransactionStatus.COMMITTED
- fake_session = FakeRabbitSession()
- sync = RabbitSessionTransactionSync(fake_session)
- sync.afterCompletion(txn)
- self.assertEqual(["finish"], fake_session.log)
-
- def test_afterCompletion_ACTIVE(self):
- txn = FakeTransaction()
- txn.status = TransactionStatus.ACTIVE
- fake_session = FakeRabbitSession()
- sync = RabbitSessionTransactionSync(fake_session)
- sync.afterCompletion(txn)
- self.assertEqual(["reset"], fake_session.log)
-
-
-class RabbitTestCase(TestCase):
-
- layer = RabbitMQLayer
-
- def tearDown(self):
- super().tearDown()
- global_session.reset()
- global_unreliable_session.reset()
-
-
-class TestRabbitSession(RabbitTestCase):
-
- session_factory = RabbitSession
-
- def test_interface(self):
- session = self.session_factory()
- self.assertThat(session, Provides(IMessageSession))
-
- def test_connect(self):
- session = self.session_factory()
- self.assertFalse(session.is_connected)
- connection = session.connect()
- self.assertTrue(session.is_connected)
- self.assertIs(connection, session._connection)
-
- def test_connect_with_incomplete_configuration(self):
- self.pushConfig("rabbitmq", host="none")
- session = self.session_factory()
- with ExpectedException(
- MessagingUnavailable, "Incomplete configuration"
- ):
- session.connect()
-
- def test_disconnect(self):
- session = self.session_factory()
- session.connect()
- session.disconnect()
- self.assertFalse(session.is_connected)
-
- def test_disconnect_with_error(self):
- session = self.session_factory()
- session.connect()
- old_close = session._connection.close
-
- def new_close(*args, **kwargs):
- old_close(*args, **kwargs)
- raise OSError
-
- with monkey_patch(session._connection, close=new_close):
- session.disconnect()
- self.assertFalse(session.is_connected)
-
- def test_is_connected(self):
- # is_connected is False once a connection has been closed.
- session = self.session_factory()
- session.connect()
- # Close the connection without using disconnect().
- session._connection.close()
- self.assertFalse(session.is_connected)
-
- def test_defer(self):
- task = lambda foo, bar: None
- session = self.session_factory()
- session.defer(task, "foo", bar="baz")
- self.assertEqual(1, len(session._deferred))
- [deferred_task] = session._deferred
- self.assertIsInstance(deferred_task, partial)
- self.assertIs(task, deferred_task.func)
- self.assertEqual(("foo",), deferred_task.args)
- self.assertEqual({"bar": "baz"}, deferred_task.keywords)
-
- def test_flush(self):
- # RabbitSession.flush() runs deferred tasks.
- log = []
- task = lambda: log.append("task")
- session = self.session_factory()
- session.defer(task)
- session.connect()
- session.flush()
- self.assertEqual(["task"], log)
- self.assertEqual([], list(session._deferred))
- self.assertTrue(session.is_connected)
-
- def test_reset(self):
- # RabbitSession.reset() resets session variables and does not run
- # deferred tasks.
- log = []
- task = lambda: log.append("task")
- session = self.session_factory()
- session.defer(task)
- session.connect()
- session.reset()
- self.assertEqual([], log)
- self.assertEqual([], list(session._deferred))
- self.assertFalse(session.is_connected)
-
- def test_finish(self):
- # RabbitSession.finish() resets session variables after running
- # deferred tasks.
- log = []
- task = lambda: log.append("task")
- session = self.session_factory()
- session.defer(task)
- session.connect()
- session.finish()
- self.assertEqual(["task"], log)
- self.assertEqual([], list(session._deferred))
- self.assertFalse(session.is_connected)
-
- def test_getProducer(self):
- session = self.session_factory()
- producer = session.getProducer("foo")
- self.assertIsInstance(producer, RabbitRoutingKey)
- self.assertIs(session, producer.session)
- self.assertEqual("foo", producer.key)
-
- def test_getConsumer(self):
- session = self.session_factory()
- consumer = session.getConsumer("foo")
- self.assertIsInstance(consumer, RabbitQueue)
- self.assertIs(session, consumer.session)
- self.assertEqual("foo", consumer.name)
-
-
-class TestRabbitUnreliableSession(TestRabbitSession):
-
- session_factory = RabbitUnreliableSession
- layer = RabbitMQLayer
-
- def setUp(self):
- super().setUp()
- self.prev_oops = self.getOops()
-
- def getOops(self):
- try:
- self.oops_capture.sync()
- return self.oopses[-1]
- except IndexError:
- return None
-
- def assertNoOops(self):
- oops_report = self.getOops()
- self.assertEqual(repr(self.prev_oops), repr(oops_report))
-
- def assertOops(self, text_in_oops):
- oops_report = self.getOops()
- self.assertNotEqual(
- repr(self.prev_oops), repr(oops_report), "No OOPS reported!"
- )
- self.assertIn(text_in_oops, str(oops_report))
-
- def _test_finish_suppresses_exception(self, exception):
- # Simple helper to test that the given exception is suppressed
- # when raised by finish().
- session = self.session_factory()
- session.defer(FakeMethod(failure=exception))
- session.finish() # Look, no exceptions!
-
- def test_finish_suppresses_MessagingUnavailable(self):
- self._test_finish_suppresses_exception(
- MessagingUnavailable("Messaging borked.")
- )
- self.assertNoOops()
-
- def test_finish_suppresses_other_errors_with_oopses(self):
- exception = Exception("That hent worked.")
- self._test_finish_suppresses_exception(exception)
- self.assertOops(str(exception))
-
-
-class TestRabbitMessageBase(RabbitTestCase):
- def test_session(self):
- base = RabbitMessageBase(global_session)
- self.assertIs(global_session, base.session)
-
- def test_channel(self):
- # Referencing the channel property causes the session to connect.
- base = RabbitMessageBase(global_session)
- self.assertFalse(base.session.is_connected)
- channel = base.channel
- self.assertTrue(base.session.is_connected)
- self.assertIsNot(None, channel)
- # The same channel is returned every time.
- self.assertIs(channel, base.channel)
-
- def test_channel_session_closed(self):
- # When the session is disconnected the channel is thrown away too.
- base = RabbitMessageBase(global_session)
- channel1 = base.channel
- base.session.disconnect()
- channel2 = base.channel
- self.assertNotEqual(channel1, channel2)
-
-
-class TestRabbitRoutingKey(RabbitTestCase):
- def test_interface(self):
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- self.assertThat(routing_key, Provides(IMessageProducer))
-
- def test_associateConsumer(self):
- # associateConsumer() only associates the consumer at transaction
- # commit time. However, order is preserved.
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumer(consumer)
- # The session is still not connected.
- self.assertFalse(global_session.is_connected)
- routing_key.sendNow("now")
- routing_key.send("later")
- # The queue is not found because the consumer has not yet been
- # associated with the routing key and the queue declared.
- self.assertRaises(QueueNotFound, consumer.receive, timeout=2)
- transaction.commit()
- # Now that the transaction has been committed, the consumer is
- # associated, and receives the deferred message.
- self.assertEqual("later", consumer.receive(timeout=2))
-
- def test_associateConsumerNow(self):
- # associateConsumerNow() associates the consumer right away.
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
- routing_key.sendNow("now")
- routing_key.send("later")
- # There is already something in the queue.
- self.assertEqual("now", consumer.receive(timeout=2))
- transaction.commit()
- # Now that the transaction has been committed there is another item in
- # the queue.
- self.assertEqual("later", consumer.receive(timeout=2))
-
- def test_send(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
-
- for data in range(90, 100):
- routing_key.send(data)
-
- routing_key.sendNow("sync")
- # There is nothing in the queue except the sync we just sent.
- self.assertEqual("sync", consumer.receive(timeout=2))
-
- # Messages get sent on commit
- transaction.commit()
- for data in range(90, 100):
- self.assertEqual(data, consumer.receive())
-
- # There are no more messages. They have all been consumed.
- routing_key.sendNow("sync")
- self.assertEqual("sync", consumer.receive(timeout=2))
-
- def test_sendNow(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
-
- for data in range(50, 60):
- routing_key.sendNow(data)
- received_data = consumer.receive(timeout=2)
- self.assertEqual(data, received_data)
-
- def test_does_not_connect_session_immediately(self):
- # RabbitRoutingKey does not connect the session until necessary.
- RabbitRoutingKey(global_session, next(key_names))
- self.assertFalse(global_session.is_connected)
-
-
-class TestRabbitQueue(RabbitTestCase):
- def test_interface(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- self.assertThat(consumer, Provides(IMessageConsumer))
-
- def test_receive(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
-
- for data in range(55, 65):
- routing_key.sendNow(data)
- self.assertEqual(data, consumer.receive(timeout=2))
-
- # All the messages received were consumed.
- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
- # New connections to the queue see an empty queue too.
- consumer.session.disconnect()
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
- def test_does_not_connect_session_immediately(self):
- # RabbitQueue does not connect the session until necessary.
- RabbitQueue(global_session, next(queue_names))
- self.assertFalse(global_session.is_connected)
-
-
-class TestRabbit(RabbitTestCase):
- """Integration-like tests for the RabbitMQ messaging abstractions."""
-
- def get_synced_sessions(self):
- try:
- syncs_set = transaction.manager.manager._synchs
- except KeyError:
- return set()
- else:
- return {
- sync.session
- for sync in syncs_set.data.values()
- if isinstance(sync, RabbitSessionTransactionSync)
- }
-
- def test_global_session(self):
- self.assertIsInstance(global_session, RabbitSession)
- self.assertIn(global_session, self.get_synced_sessions())
-
- def test_global_unreliable_session(self):
- self.assertIsInstance(
- global_unreliable_session, RabbitUnreliableSession
- )
- self.assertIn(global_unreliable_session, self.get_synced_sessions())
-
- def test_abort(self):
- consumer = RabbitQueue(global_session, next(queue_names))
- routing_key = RabbitRoutingKey(global_session, next(key_names))
- routing_key.associateConsumerNow(consumer)
-
- for data in range(90, 100):
- routing_key.send(data)
-
- # Messages sent using send() are forgotten on abort.
- transaction.abort()
- self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
-
-class TestRabbitWithLaunchpad(RabbitTestCase):
- """Integration-like tests for the RabbitMQ messaging abstractions."""
-
- layer = LaunchpadFunctionalLayer
-
- def test_utility(self):
- # The unreliable session is registered as the default IMessageSession
- # utility.
- self.assertIs(global_unreliable_session, getUtility(IMessageSession))
-
- def _test_session_finish_read_only_request(self, session):
- # When a read-only request ends the session is also finished.
- log = []
- task = lambda: log.append("task")
- session.defer(task)
- session.connect()
- notify(FinishReadOnlyRequestEvent(None, None))
- self.assertEqual(["task"], log)
- self.assertEqual([], list(session._deferred))
- self.assertFalse(session.is_connected)
-
- def test_global_session_finish_read_only_request(self):
- # When a read-only request ends the global_session is finished too.
- self._test_session_finish_read_only_request(global_session)
-
- def test_global_unreliable_session_finish_read_only_request(self):
- # When a read-only request ends the global_unreliable_session is
- # finished too.
- self._test_session_finish_read_only_request(global_unreliable_session)
diff --git a/lib/lp/services/webapp/doc/webapp-publication.rst b/lib/lp/services/webapp/doc/webapp-publication.rst
index 0fbbbe6..c478f92 100644
--- a/lib/lp/services/webapp/doc/webapp-publication.rst
+++ b/lib/lp/services/webapp/doc/webapp-publication.rst
@@ -1001,7 +1001,7 @@ afterCall(). For example, this publication subclass will simply print
some string in its finishReadOnlyRequest().
>>> class MyPublication(LaunchpadBrowserPublication):
- ... def finishReadOnlyRequest(self, request, ob, txn):
+ ... def finishReadOnlyRequest(self, txn):
... print("booo!")
...
diff --git a/lib/lp/services/webapp/interfaces.py b/lib/lp/services/webapp/interfaces.py
index aa53686..c6ef41a 100644
--- a/lib/lp/services/webapp/interfaces.py
+++ b/lib/lp/services/webapp/interfaces.py
@@ -818,23 +818,6 @@ class ICheckBoxWidgetLayout(IAlwaysSubmittedWidget):
"""A widget that is displayed like a check box with label to the right."""
-class IFinishReadOnlyRequestEvent(Interface):
- """An event which gets sent when the publication is ended"""
-
- object = Attribute("The object to which this request pertains.")
-
- request = Attribute("The active request.")
-
-
-@implementer(IFinishReadOnlyRequestEvent)
-class FinishReadOnlyRequestEvent:
- """An event which gets sent when the publication is ended"""
-
- def __init__(self, ob, request):
- self.object = ob
- self.request = request
-
-
class StormRangeFactoryError(Exception):
"""Raised when a Storm result set cannot be used for slicing by a
StormRangeFactory.
diff --git a/lib/lp/services/webapp/publication.py b/lib/lp/services/webapp/publication.py
index bbe8b83..1557a46 100644
--- a/lib/lp/services/webapp/publication.py
+++ b/lib/lp/services/webapp/publication.py
@@ -54,7 +54,6 @@ from lp.services.features.flags import NullFeatureController
from lp.services.oauth.interfaces import IOAuthSignedRequest
from lp.services.statsd.interfaces.statsd_client import IStatsdClient
from lp.services.webapp.interfaces import (
- FinishReadOnlyRequestEvent,
ILaunchpadRoot,
IOpenLaunchBag,
IPlacelessAuthUtility,
@@ -506,7 +505,7 @@ class LaunchpadBrowserPublication(
# Abort the transaction on a read-only request.
# NOTHING AFTER THIS SHOULD CAUSE A RETRY.
if request.method in ["GET", "HEAD"]:
- self.finishReadOnlyRequest(request, ob, txn)
+ self.finishReadOnlyRequest(txn)
elif txn.isDoomed():
# The following sends an abort to the database, even though the
# transaction is still doomed.
@@ -528,13 +527,12 @@ class LaunchpadBrowserPublication(
# calling beforeTraversal or doing proper cleanup.
pass
- def finishReadOnlyRequest(self, request, ob, txn):
+ def finishReadOnlyRequest(self, txn):
"""Hook called at the end of a read-only request.
By default it abort()s the transaction, but subclasses may need to
commit it instead, so they must overwrite this.
"""
- notify(FinishReadOnlyRequestEvent(ob, request))
txn.abort()
def callTraversalHooks(self, request, ob):
diff --git a/lib/lp/services/webapp/tests/test_servers.py b/lib/lp/services/webapp/tests/test_servers.py
index e0fa7e6..14820e0 100644
--- a/lib/lp/services/webapp/tests/test_servers.py
+++ b/lib/lp/services/webapp/tests/test_servers.py
@@ -32,8 +32,6 @@ from lp.services.auth.enums import AccessTokenScope
from lp.services.identity.interfaces.account import AccountStatus
from lp.services.oauth.interfaces import TokenException
from lp.services.webapp.interaction import get_interaction_extras
-from lp.services.webapp.interfaces import IFinishReadOnlyRequestEvent
-from lp.services.webapp.publication import LaunchpadBrowserPublication
from lp.services.webapp.servers import (
ApplicationServerSettingRequestFactory,
FeedsBrowserRequest,
@@ -49,7 +47,7 @@ from lp.services.webapp.servers import (
WebServiceTestRequest,
web_service_request_to_browser_request,
)
-from lp.testing import EventRecorder, TestCase, TestCaseWithFactory, logout
+from lp.testing import TestCase, TestCaseWithFactory, logout
from lp.testing.layers import DatabaseFunctionalLayer, FunctionalLayer
from lp.testing.publication import get_request_and_publication
@@ -782,55 +780,6 @@ class LoggingTransaction:
self.log.append("ABORT")
-class TestFinishReadOnlyRequest(TestCase):
- # Publications that have a finishReadOnlyRequest() method are obliged to
- # fire an IFinishReadOnlyRequestEvent.
-
- def _test_publication(self, publication, expected_transaction_log):
- # publication.finishReadOnlyRequest() issues an
- # IFinishReadOnlyRequestEvent and alters the transaction.
- fake_request = object()
- fake_object = object()
- fake_transaction = LoggingTransaction()
-
- with EventRecorder() as event_recorder:
- publication.finishReadOnlyRequest(
- fake_request, fake_object, fake_transaction
- )
-
- self.assertEqual(expected_transaction_log, fake_transaction.log)
-
- finish_events = [
- event
- for event in event_recorder.events
- if IFinishReadOnlyRequestEvent.providedBy(event)
- ]
- self.assertEqual(
- 1,
- len(finish_events),
- (
- "Expected only one IFinishReadOnlyRequestEvent, but "
- "got: %r" % finish_events
- ),
- )
-
- [finish_event] = finish_events
- self.assertIs(fake_request, finish_event.request)
- self.assertIs(fake_object, finish_event.object)
-
- def test_WebServicePub_fires_FinishReadOnlyRequestEvent(self):
- # WebServicePublication.finishReadOnlyRequest() issues an
- # IFinishReadOnlyRequestEvent and aborts the transaction.
- publication = WebServicePublication(None)
- self._test_publication(publication, ["ABORT"])
-
- def test_LaunchpadBrowserPub_fires_FinishReadOnlyRequestEvent(self):
- # LaunchpadBrowserPublication.finishReadOnlyRequest() issues an
- # IFinishReadOnlyRequestEvent and aborts the transaction.
- publication = LaunchpadBrowserPublication(None)
- self._test_publication(publication, ["ABORT"])
-
-
class TestWebServiceAccessTokens(TestCaseWithFactory):
"""Test personal access tokens for the webservice.