← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:remove-rabbitmq-sessions into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:remove-rabbitmq-sessions into launchpad:master.

Commit message:
Remove RabbitMQ session infrastructure

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/430846

This was only used by the longpoll mechanism that was removed in commit df2fe07c8766a80affa8467c5491af647f923840.

I started to try to modify this code to support multiple RabbitMQ broker URLs, then realized that would be a lot easier if I removed a big chunk of it first.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:remove-rabbitmq-sessions into launchpad:master.
diff --git a/lib/lp/services/configure.zcml b/lib/lp/services/configure.zcml
index 1461879..77dac94 100644
--- a/lib/lp/services/configure.zcml
+++ b/lib/lp/services/configure.zcml
@@ -21,7 +21,6 @@
   <include package=".mail" />
   <include package=".memcache" />
   <include package=".messages" />
-  <include package=".messaging" />
   <include package=".oauth" />
   <include package=".openid" />
   <include package=".profile" />
diff --git a/lib/lp/services/messaging/configure.zcml b/lib/lp/services/messaging/configure.zcml
deleted file mode 100644
index f8d23bd..0000000
--- a/lib/lp/services/messaging/configure.zcml
+++ /dev/null
@@ -1,18 +0,0 @@
-<!-- Copyright 2011 Canonical Ltd.  This software is licensed under the
-     GNU Affero General Public License version 3 (see the file LICENSE).
--->
-<configure
-    xmlns="http://namespaces.zope.org/zope"
-    xmlns:browser="http://namespaces.zope.org/browser"
-    xmlns:i18n="http://namespaces.zope.org/i18n"
-    i18n_domain="launchpad">
-  <utility
-      provides=".interfaces.IMessageSession"
-      component=".rabbit.unreliable_session" />
-  <subscriber
-      for="lp.services.webapp.interfaces.IFinishReadOnlyRequestEvent"
-      handler=".rabbit.session_finish_handler" />
-  <subscriber
-      for="lp.services.webapp.interfaces.IFinishReadOnlyRequestEvent"
-      handler=".rabbit.unreliable_session_finish_handler" />
-</configure>
diff --git a/lib/lp/services/messaging/interfaces.py b/lib/lp/services/messaging/interfaces.py
index c2c5dee..c73efac 100644
--- a/lib/lp/services/messaging/interfaces.py
+++ b/lib/lp/services/messaging/interfaces.py
@@ -4,96 +4,14 @@
 """Messaging interfaces."""
 
 __all__ = [
-    "IMessageConsumer",
-    "IMessageProducer",
-    "IMessageSession",
     "MessagingException",
     "MessagingUnavailable",
-    "QueueEmpty",
-    "QueueNotFound",
 ]
 
 
-from zope.interface import Interface
-from zope.schema import Bool
-
-
 class MessagingException(Exception):
     """Failure in messaging."""
 
 
 class MessagingUnavailable(MessagingException):
     """Messaging systems are not available."""
-
-
-class QueueNotFound(MessagingException):
-    """Raised if the queue was not found."""
-
-
-class QueueEmpty(MessagingException):
-    """Raised if there are no queued messages on a non-blocking read."""
-
-
-class IMessageSession(Interface):
-
-    is_connected = Bool(
-        "Whether the session is connected to the messaging system."
-    )
-
-    def connect():
-        """Connect to the messaging system.
-
-        If the session is already connected this should be a no-op.
-        """
-
-    def disconnect():
-        """Disconnect from the messaging system.
-
-        If the session is already disconnected this should be a no-op.
-        """
-
-    def flush():
-        """Run deferred tasks."""
-
-    def finish():
-        """Flush the session and reset."""
-
-    def reset():
-        """Reset the session."""
-
-    def defer(func, *args, **kwargs):
-        """Schedule something to happen when this session is finished."""
-
-    def getProducer(name):
-        """Get a `IMessageProducer` associated with this session."""
-
-    def getConsumer(name):
-        """Get a `IMessageConsumer` associated with this session."""
-
-
-class IMessageConsumer(Interface):
-    def receive(blocking=True):
-        """Receive data from the queue.
-
-        :raises EmptyQueue: If non-blocking and the queue is empty.
-        """
-
-
-class IMessageProducer(Interface):
-    def send(data):
-        """Serialize `data` into JSON and send it to the queue on commit."""
-
-    def sendNow(data):
-        """Serialize `data` into JSON and send it to the queue immediately."""
-
-    def associateConsumer(consumer):
-        """Make the consumer receive messages from this producer on commit.
-
-        :param consumer: An `IMessageConsumer`
-        """
-
-    def associateConsumerNow(consumer):
-        """Make the consumer receive messages from this producer.
-
-        :param consumer: An `IMessageConsumer`
-        """
diff --git a/lib/lp/services/messaging/rabbit.py b/lib/lp/services/messaging/rabbit.py
index 46b3b18..6c8bd34 100644
--- a/lib/lp/services/messaging/rabbit.py
+++ b/lib/lp/services/messaging/rabbit.py
@@ -6,51 +6,12 @@
 __all__ = [
     "connect",
     "is_configured",
-    "session",
-    "unreliable_session",
 ]
 
-import json
-import sys
-import threading
-import time
-from collections import deque
-from functools import partial
-
 import amqp
-import transaction
-from transaction._transaction import Status as TransactionStatus
-from zope.interface import implementer
 
 from lp.services.config import config
-from lp.services.messaging.interfaces import (
-    IMessageConsumer,
-    IMessageProducer,
-    IMessageSession,
-    MessagingUnavailable,
-    QueueEmpty,
-    QueueNotFound,
-)
-
-LAUNCHPAD_EXCHANGE = "launchpad-exchange"
-
-
-@implementer(transaction.interfaces.ISynchronizer)
-class RabbitSessionTransactionSync:
-    def __init__(self, session):
-        self.session = session
-
-    def newTransaction(self, txn):
-        pass
-
-    def beforeCompletion(self, txn):
-        pass
-
-    def afterCompletion(self, txn):
-        if txn.status == TransactionStatus.COMMITTED:
-            self.session.finish()
-        else:
-            self.session.reset()
+from lp.services.messaging.interfaces import MessagingUnavailable
 
 
 def is_configured():
@@ -78,217 +39,3 @@ def connect():
     )
     connection.connect()
     return connection
-
-
-@implementer(IMessageSession)
-class RabbitSession(threading.local):
-
-    exchange = LAUNCHPAD_EXCHANGE
-
-    def __init__(self):
-        super().__init__()
-        self._connection = None
-        self._deferred = deque()
-        # Maintain sessions according to transaction boundaries. Keep a strong
-        # reference to the sync because the transaction manager does not. We
-        # need one per thread (definining it here is enough to ensure that).
-        self._sync = RabbitSessionTransactionSync(self)
-        transaction.manager.registerSynch(self._sync)
-
-    @property
-    def is_connected(self):
-        """See `IMessageSession`."""
-        return self._connection is not None and self._connection.connected
-
-    def connect(self):
-        """See `IMessageSession`.
-
-        Open a connection for this thread if necessary. Connections cannot be
-        shared between threads.
-        """
-        if self._connection is None or not self._connection.connected:
-            self._connection = connect()
-        return self._connection
-
-    def disconnect(self):
-        """See `IMessageSession`."""
-        if self._connection is not None:
-            try:
-                self._connection.close()
-            except OSError:
-                # Socket error is fine; the connection is still closed.
-                pass
-            finally:
-                self._connection = None
-
-    def flush(self):
-        """See `IMessageSession`."""
-        tasks = self._deferred
-        while len(tasks) != 0:
-            tasks.popleft()()
-
-    def finish(self):
-        """See `IMessageSession`."""
-        try:
-            self.flush()
-        finally:
-            self.reset()
-
-    def reset(self):
-        """See `IMessageSession`."""
-        self._deferred.clear()
-        self.disconnect()
-
-    def defer(self, func, *args, **kwargs):
-        """See `IMessageSession`."""
-        self._deferred.append(partial(func, *args, **kwargs))
-
-    def getProducer(self, name):
-        """See `IMessageSession`."""
-        return RabbitRoutingKey(self, name)
-
-    def getConsumer(self, name):
-        """See `IMessageSession`."""
-        return RabbitQueue(self, name)
-
-
-# Per-thread sessions.
-session = RabbitSession()
-session_finish_handler = lambda event: session.finish()
-
-
-class RabbitUnreliableSession(RabbitSession):
-    """An "unreliable" `RabbitSession`.
-
-    Unreliable in this case means that certain errors in deferred tasks are
-    silently suppressed. This means that services can continue to function
-    even in the absence of a running and fully functional message queue.
-
-    Other types of errors are also caught because we don't want this
-    subsystem to destabilise other parts of Launchpad but we nonetheless
-    record OOPses for these.
-
-    XXX: We only suppress MessagingUnavailable for now because we want to
-    monitor this closely before we add more exceptions to the
-    suppressed_errors list. Potential candidates are `MessagingException`,
-    `IOError` or `amqp.AMQPException`.
-    """
-
-    suppressed_errors = (MessagingUnavailable,)
-
-    def finish(self):
-        """See `IMessageSession`.
-
-        Suppresses errors listed in `suppressed_errors`. Also suppresses
-        other errors but files an oops report for these.
-        """
-        try:
-            super().finish()
-        except self.suppressed_errors:
-            pass
-        except Exception:
-            from lp.services.webapp import errorlog
-
-            errorlog.globalErrorUtility.raising(sys.exc_info())
-
-
-# Per-thread "unreliable" sessions.
-unreliable_session = RabbitUnreliableSession()
-unreliable_session_finish_handler = lambda event: unreliable_session.finish()
-
-
-class RabbitMessageBase:
-    """Base class for all RabbitMQ messaging."""
-
-    def __init__(self, session):
-        self.session = IMessageSession(session)
-        self._channel = None
-
-    @property
-    def channel(self):
-        if self._channel is None or not self._channel.is_open:
-            connection = self.session.connect()
-            self._channel = connection.channel()
-            self._channel.exchange_declare(
-                self.session.exchange,
-                "direct",
-                durable=False,
-                auto_delete=False,
-                nowait=False,
-            )
-        return self._channel
-
-
-@implementer(IMessageProducer)
-class RabbitRoutingKey(RabbitMessageBase):
-    """A RabbitMQ data origination point."""
-
-    def __init__(self, session, routing_key):
-        super().__init__(session)
-        self.key = routing_key
-
-    def associateConsumer(self, consumer):
-        """Only receive messages for requested routing key."""
-        self.session.defer(self.associateConsumerNow, consumer)
-
-    def associateConsumerNow(self, consumer):
-        """Only receive messages for requested routing key."""
-        # The queue will be auto-deleted 5 minutes after its last use.
-        # http://www.rabbitmq.com/extensions.html#queue-leases
-        self.channel.queue_declare(
-            consumer.name,
-            nowait=False,
-            auto_delete=False,
-            arguments={"x-expires": 300000},
-        )  # 5 minutes.
-        self.channel.queue_bind(
-            queue=consumer.name,
-            exchange=self.session.exchange,
-            routing_key=self.key,
-            nowait=False,
-        )
-
-    def send(self, data):
-        """See `IMessageProducer`."""
-        self.session.defer(self.sendNow, data)
-
-    def sendNow(self, data):
-        """Immediately send a message to the broker."""
-        json_data = json.dumps(data)
-        msg = amqp.Message(json_data)
-        self.channel.basic_publish(
-            exchange=self.session.exchange, routing_key=self.key, msg=msg
-        )
-
-
-@implementer(IMessageConsumer)
-class RabbitQueue(RabbitMessageBase):
-    """A RabbitMQ Queue."""
-
-    def __init__(self, session, name):
-        super().__init__(session)
-        self.name = name
-
-    def receive(self, timeout=0.0):
-        """Pull a message from the queue.
-
-        :param timeout: Wait a maximum of `timeout` seconds before giving up,
-            trying at least once.
-        :raises QueueEmpty: if the timeout passes.
-        """
-        endtime = time.time() + timeout
-        while True:
-            try:
-                message = self.channel.basic_get(self.name)
-                if message is None:
-                    if time.time() > endtime:
-                        raise QueueEmpty()
-                    time.sleep(0.1)
-                else:
-                    self.channel.basic_ack(message.delivery_tag)
-                    return json.loads(message.body)
-            except amqp.ChannelError as error:
-                if error.reply_code == 404:
-                    raise QueueNotFound()
-                else:
-                    raise
diff --git a/lib/lp/services/messaging/tests/__init__.py b/lib/lp/services/messaging/tests/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/lib/lp/services/messaging/tests/__init__.py
+++ /dev/null
diff --git a/lib/lp/services/messaging/tests/test_rabbit.py b/lib/lp/services/messaging/tests/test_rabbit.py
deleted file mode 100644
index f82a0a1..0000000
--- a/lib/lp/services/messaging/tests/test_rabbit.py
+++ /dev/null
@@ -1,447 +0,0 @@
-# Copyright 2011 Canonical Ltd.  This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Messaging utility tests."""
-
-from functools import partial
-from itertools import count
-
-import transaction
-from testtools.testcase import ExpectedException
-from transaction._transaction import Status as TransactionStatus
-from zope.component import getUtility
-from zope.event import notify
-
-from lp.services.messaging.interfaces import (
-    IMessageConsumer,
-    IMessageProducer,
-    IMessageSession,
-    MessagingUnavailable,
-    QueueEmpty,
-    QueueNotFound,
-)
-from lp.services.messaging.rabbit import (
-    RabbitMessageBase,
-    RabbitQueue,
-    RabbitRoutingKey,
-    RabbitSession,
-    RabbitSessionTransactionSync,
-    RabbitUnreliableSession,
-)
-from lp.services.messaging.rabbit import session as global_session
-from lp.services.messaging.rabbit import (
-    unreliable_session as global_unreliable_session,
-)
-from lp.services.webapp.interfaces import FinishReadOnlyRequestEvent
-from lp.testing import TestCase, monkey_patch
-from lp.testing.fakemethod import FakeMethod
-from lp.testing.faketransaction import FakeTransaction
-from lp.testing.layers import LaunchpadFunctionalLayer, RabbitMQLayer
-from lp.testing.matchers import Provides
-
-# RabbitMQ is not (yet) torn down or reset between tests, so here are sources
-# of distinct names.
-queue_names = ("queue.%d" % num for num in count(1))
-key_names = ("key.%d" % num for num in count(1))
-
-
-class FakeRabbitSession:
-    def __init__(self):
-        self.log = []
-
-    def finish(self):
-        self.log.append("finish")
-
-    def reset(self):
-        self.log.append("reset")
-
-
-class TestRabbitSessionTransactionSync(TestCase):
-    def test_interface(self):
-        self.assertThat(
-            RabbitSessionTransactionSync(None),
-            Provides(transaction.interfaces.ISynchronizer),
-        )
-
-    def test_afterCompletion_COMMITTED(self):
-        txn = FakeTransaction()
-        txn.status = TransactionStatus.COMMITTED
-        fake_session = FakeRabbitSession()
-        sync = RabbitSessionTransactionSync(fake_session)
-        sync.afterCompletion(txn)
-        self.assertEqual(["finish"], fake_session.log)
-
-    def test_afterCompletion_ACTIVE(self):
-        txn = FakeTransaction()
-        txn.status = TransactionStatus.ACTIVE
-        fake_session = FakeRabbitSession()
-        sync = RabbitSessionTransactionSync(fake_session)
-        sync.afterCompletion(txn)
-        self.assertEqual(["reset"], fake_session.log)
-
-
-class RabbitTestCase(TestCase):
-
-    layer = RabbitMQLayer
-
-    def tearDown(self):
-        super().tearDown()
-        global_session.reset()
-        global_unreliable_session.reset()
-
-
-class TestRabbitSession(RabbitTestCase):
-
-    session_factory = RabbitSession
-
-    def test_interface(self):
-        session = self.session_factory()
-        self.assertThat(session, Provides(IMessageSession))
-
-    def test_connect(self):
-        session = self.session_factory()
-        self.assertFalse(session.is_connected)
-        connection = session.connect()
-        self.assertTrue(session.is_connected)
-        self.assertIs(connection, session._connection)
-
-    def test_connect_with_incomplete_configuration(self):
-        self.pushConfig("rabbitmq", host="none")
-        session = self.session_factory()
-        with ExpectedException(
-            MessagingUnavailable, "Incomplete configuration"
-        ):
-            session.connect()
-
-    def test_disconnect(self):
-        session = self.session_factory()
-        session.connect()
-        session.disconnect()
-        self.assertFalse(session.is_connected)
-
-    def test_disconnect_with_error(self):
-        session = self.session_factory()
-        session.connect()
-        old_close = session._connection.close
-
-        def new_close(*args, **kwargs):
-            old_close(*args, **kwargs)
-            raise OSError
-
-        with monkey_patch(session._connection, close=new_close):
-            session.disconnect()
-            self.assertFalse(session.is_connected)
-
-    def test_is_connected(self):
-        # is_connected is False once a connection has been closed.
-        session = self.session_factory()
-        session.connect()
-        # Close the connection without using disconnect().
-        session._connection.close()
-        self.assertFalse(session.is_connected)
-
-    def test_defer(self):
-        task = lambda foo, bar: None
-        session = self.session_factory()
-        session.defer(task, "foo", bar="baz")
-        self.assertEqual(1, len(session._deferred))
-        [deferred_task] = session._deferred
-        self.assertIsInstance(deferred_task, partial)
-        self.assertIs(task, deferred_task.func)
-        self.assertEqual(("foo",), deferred_task.args)
-        self.assertEqual({"bar": "baz"}, deferred_task.keywords)
-
-    def test_flush(self):
-        # RabbitSession.flush() runs deferred tasks.
-        log = []
-        task = lambda: log.append("task")
-        session = self.session_factory()
-        session.defer(task)
-        session.connect()
-        session.flush()
-        self.assertEqual(["task"], log)
-        self.assertEqual([], list(session._deferred))
-        self.assertTrue(session.is_connected)
-
-    def test_reset(self):
-        # RabbitSession.reset() resets session variables and does not run
-        # deferred tasks.
-        log = []
-        task = lambda: log.append("task")
-        session = self.session_factory()
-        session.defer(task)
-        session.connect()
-        session.reset()
-        self.assertEqual([], log)
-        self.assertEqual([], list(session._deferred))
-        self.assertFalse(session.is_connected)
-
-    def test_finish(self):
-        # RabbitSession.finish() resets session variables after running
-        # deferred tasks.
-        log = []
-        task = lambda: log.append("task")
-        session = self.session_factory()
-        session.defer(task)
-        session.connect()
-        session.finish()
-        self.assertEqual(["task"], log)
-        self.assertEqual([], list(session._deferred))
-        self.assertFalse(session.is_connected)
-
-    def test_getProducer(self):
-        session = self.session_factory()
-        producer = session.getProducer("foo")
-        self.assertIsInstance(producer, RabbitRoutingKey)
-        self.assertIs(session, producer.session)
-        self.assertEqual("foo", producer.key)
-
-    def test_getConsumer(self):
-        session = self.session_factory()
-        consumer = session.getConsumer("foo")
-        self.assertIsInstance(consumer, RabbitQueue)
-        self.assertIs(session, consumer.session)
-        self.assertEqual("foo", consumer.name)
-
-
-class TestRabbitUnreliableSession(TestRabbitSession):
-
-    session_factory = RabbitUnreliableSession
-    layer = RabbitMQLayer
-
-    def setUp(self):
-        super().setUp()
-        self.prev_oops = self.getOops()
-
-    def getOops(self):
-        try:
-            self.oops_capture.sync()
-            return self.oopses[-1]
-        except IndexError:
-            return None
-
-    def assertNoOops(self):
-        oops_report = self.getOops()
-        self.assertEqual(repr(self.prev_oops), repr(oops_report))
-
-    def assertOops(self, text_in_oops):
-        oops_report = self.getOops()
-        self.assertNotEqual(
-            repr(self.prev_oops), repr(oops_report), "No OOPS reported!"
-        )
-        self.assertIn(text_in_oops, str(oops_report))
-
-    def _test_finish_suppresses_exception(self, exception):
-        # Simple helper to test that the given exception is suppressed
-        # when raised by finish().
-        session = self.session_factory()
-        session.defer(FakeMethod(failure=exception))
-        session.finish()  # Look, no exceptions!
-
-    def test_finish_suppresses_MessagingUnavailable(self):
-        self._test_finish_suppresses_exception(
-            MessagingUnavailable("Messaging borked.")
-        )
-        self.assertNoOops()
-
-    def test_finish_suppresses_other_errors_with_oopses(self):
-        exception = Exception("That hent worked.")
-        self._test_finish_suppresses_exception(exception)
-        self.assertOops(str(exception))
-
-
-class TestRabbitMessageBase(RabbitTestCase):
-    def test_session(self):
-        base = RabbitMessageBase(global_session)
-        self.assertIs(global_session, base.session)
-
-    def test_channel(self):
-        # Referencing the channel property causes the session to connect.
-        base = RabbitMessageBase(global_session)
-        self.assertFalse(base.session.is_connected)
-        channel = base.channel
-        self.assertTrue(base.session.is_connected)
-        self.assertIsNot(None, channel)
-        # The same channel is returned every time.
-        self.assertIs(channel, base.channel)
-
-    def test_channel_session_closed(self):
-        # When the session is disconnected the channel is thrown away too.
-        base = RabbitMessageBase(global_session)
-        channel1 = base.channel
-        base.session.disconnect()
-        channel2 = base.channel
-        self.assertNotEqual(channel1, channel2)
-
-
-class TestRabbitRoutingKey(RabbitTestCase):
-    def test_interface(self):
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        self.assertThat(routing_key, Provides(IMessageProducer))
-
-    def test_associateConsumer(self):
-        # associateConsumer() only associates the consumer at transaction
-        # commit time. However, order is preserved.
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumer(consumer)
-        # The session is still not connected.
-        self.assertFalse(global_session.is_connected)
-        routing_key.sendNow("now")
-        routing_key.send("later")
-        # The queue is not found because the consumer has not yet been
-        # associated with the routing key and the queue declared.
-        self.assertRaises(QueueNotFound, consumer.receive, timeout=2)
-        transaction.commit()
-        # Now that the transaction has been committed, the consumer is
-        # associated, and receives the deferred message.
-        self.assertEqual("later", consumer.receive(timeout=2))
-
-    def test_associateConsumerNow(self):
-        # associateConsumerNow() associates the consumer right away.
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-        routing_key.sendNow("now")
-        routing_key.send("later")
-        # There is already something in the queue.
-        self.assertEqual("now", consumer.receive(timeout=2))
-        transaction.commit()
-        # Now that the transaction has been committed there is another item in
-        # the queue.
-        self.assertEqual("later", consumer.receive(timeout=2))
-
-    def test_send(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-
-        for data in range(90, 100):
-            routing_key.send(data)
-
-        routing_key.sendNow("sync")
-        # There is nothing in the queue except the sync we just sent.
-        self.assertEqual("sync", consumer.receive(timeout=2))
-
-        # Messages get sent on commit
-        transaction.commit()
-        for data in range(90, 100):
-            self.assertEqual(data, consumer.receive())
-
-        # There are no more messages. They have all been consumed.
-        routing_key.sendNow("sync")
-        self.assertEqual("sync", consumer.receive(timeout=2))
-
-    def test_sendNow(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-
-        for data in range(50, 60):
-            routing_key.sendNow(data)
-            received_data = consumer.receive(timeout=2)
-            self.assertEqual(data, received_data)
-
-    def test_does_not_connect_session_immediately(self):
-        # RabbitRoutingKey does not connect the session until necessary.
-        RabbitRoutingKey(global_session, next(key_names))
-        self.assertFalse(global_session.is_connected)
-
-
-class TestRabbitQueue(RabbitTestCase):
-    def test_interface(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        self.assertThat(consumer, Provides(IMessageConsumer))
-
-    def test_receive(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-
-        for data in range(55, 65):
-            routing_key.sendNow(data)
-            self.assertEqual(data, consumer.receive(timeout=2))
-
-        # All the messages received were consumed.
-        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
-        # New connections to the queue see an empty queue too.
-        consumer.session.disconnect()
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
-    def test_does_not_connect_session_immediately(self):
-        # RabbitQueue does not connect the session until necessary.
-        RabbitQueue(global_session, next(queue_names))
-        self.assertFalse(global_session.is_connected)
-
-
-class TestRabbit(RabbitTestCase):
-    """Integration-like tests for the RabbitMQ messaging abstractions."""
-
-    def get_synced_sessions(self):
-        try:
-            syncs_set = transaction.manager.manager._synchs
-        except KeyError:
-            return set()
-        else:
-            return {
-                sync.session
-                for sync in syncs_set.data.values()
-                if isinstance(sync, RabbitSessionTransactionSync)
-            }
-
-    def test_global_session(self):
-        self.assertIsInstance(global_session, RabbitSession)
-        self.assertIn(global_session, self.get_synced_sessions())
-
-    def test_global_unreliable_session(self):
-        self.assertIsInstance(
-            global_unreliable_session, RabbitUnreliableSession
-        )
-        self.assertIn(global_unreliable_session, self.get_synced_sessions())
-
-    def test_abort(self):
-        consumer = RabbitQueue(global_session, next(queue_names))
-        routing_key = RabbitRoutingKey(global_session, next(key_names))
-        routing_key.associateConsumerNow(consumer)
-
-        for data in range(90, 100):
-            routing_key.send(data)
-
-        # Messages sent using send() are forgotten on abort.
-        transaction.abort()
-        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
-
-
-class TestRabbitWithLaunchpad(RabbitTestCase):
-    """Integration-like tests for the RabbitMQ messaging abstractions."""
-
-    layer = LaunchpadFunctionalLayer
-
-    def test_utility(self):
-        # The unreliable session is registered as the default IMessageSession
-        # utility.
-        self.assertIs(global_unreliable_session, getUtility(IMessageSession))
-
-    def _test_session_finish_read_only_request(self, session):
-        # When a read-only request ends the session is also finished.
-        log = []
-        task = lambda: log.append("task")
-        session.defer(task)
-        session.connect()
-        notify(FinishReadOnlyRequestEvent(None, None))
-        self.assertEqual(["task"], log)
-        self.assertEqual([], list(session._deferred))
-        self.assertFalse(session.is_connected)
-
-    def test_global_session_finish_read_only_request(self):
-        # When a read-only request ends the global_session is finished too.
-        self._test_session_finish_read_only_request(global_session)
-
-    def test_global_unreliable_session_finish_read_only_request(self):
-        # When a read-only request ends the global_unreliable_session is
-        # finished too.
-        self._test_session_finish_read_only_request(global_unreliable_session)
diff --git a/lib/lp/services/webapp/doc/webapp-publication.rst b/lib/lp/services/webapp/doc/webapp-publication.rst
index 0fbbbe6..c478f92 100644
--- a/lib/lp/services/webapp/doc/webapp-publication.rst
+++ b/lib/lp/services/webapp/doc/webapp-publication.rst
@@ -1001,7 +1001,7 @@ afterCall().  For example, this publication subclass will simply print
 some string in its finishReadOnlyRequest().
 
     >>> class MyPublication(LaunchpadBrowserPublication):
-    ...     def finishReadOnlyRequest(self, request, ob, txn):
+    ...     def finishReadOnlyRequest(self, txn):
     ...         print("booo!")
     ...
 
diff --git a/lib/lp/services/webapp/interfaces.py b/lib/lp/services/webapp/interfaces.py
index aa53686..c6ef41a 100644
--- a/lib/lp/services/webapp/interfaces.py
+++ b/lib/lp/services/webapp/interfaces.py
@@ -818,23 +818,6 @@ class ICheckBoxWidgetLayout(IAlwaysSubmittedWidget):
     """A widget that is displayed like a check box with label to the right."""
 
 
-class IFinishReadOnlyRequestEvent(Interface):
-    """An event which gets sent when the publication is ended"""
-
-    object = Attribute("The object to which this request pertains.")
-
-    request = Attribute("The active request.")
-
-
-@implementer(IFinishReadOnlyRequestEvent)
-class FinishReadOnlyRequestEvent:
-    """An event which gets sent when the publication is ended"""
-
-    def __init__(self, ob, request):
-        self.object = ob
-        self.request = request
-
-
 class StormRangeFactoryError(Exception):
     """Raised when a Storm result set cannot be used for slicing by a
     StormRangeFactory.
diff --git a/lib/lp/services/webapp/publication.py b/lib/lp/services/webapp/publication.py
index bbe8b83..1557a46 100644
--- a/lib/lp/services/webapp/publication.py
+++ b/lib/lp/services/webapp/publication.py
@@ -54,7 +54,6 @@ from lp.services.features.flags import NullFeatureController
 from lp.services.oauth.interfaces import IOAuthSignedRequest
 from lp.services.statsd.interfaces.statsd_client import IStatsdClient
 from lp.services.webapp.interfaces import (
-    FinishReadOnlyRequestEvent,
     ILaunchpadRoot,
     IOpenLaunchBag,
     IPlacelessAuthUtility,
@@ -506,7 +505,7 @@ class LaunchpadBrowserPublication(
         # Abort the transaction on a read-only request.
         # NOTHING AFTER THIS SHOULD CAUSE A RETRY.
         if request.method in ["GET", "HEAD"]:
-            self.finishReadOnlyRequest(request, ob, txn)
+            self.finishReadOnlyRequest(txn)
         elif txn.isDoomed():
             # The following sends an abort to the database, even though the
             # transaction is still doomed.
@@ -528,13 +527,12 @@ class LaunchpadBrowserPublication(
             # calling beforeTraversal or doing proper cleanup.
             pass
 
-    def finishReadOnlyRequest(self, request, ob, txn):
+    def finishReadOnlyRequest(self, txn):
         """Hook called at the end of a read-only request.
 
         By default it abort()s the transaction, but subclasses may need to
         commit it instead, so they must overwrite this.
         """
-        notify(FinishReadOnlyRequestEvent(ob, request))
         txn.abort()
 
     def callTraversalHooks(self, request, ob):
diff --git a/lib/lp/services/webapp/tests/test_servers.py b/lib/lp/services/webapp/tests/test_servers.py
index e0fa7e6..14820e0 100644
--- a/lib/lp/services/webapp/tests/test_servers.py
+++ b/lib/lp/services/webapp/tests/test_servers.py
@@ -32,8 +32,6 @@ from lp.services.auth.enums import AccessTokenScope
 from lp.services.identity.interfaces.account import AccountStatus
 from lp.services.oauth.interfaces import TokenException
 from lp.services.webapp.interaction import get_interaction_extras
-from lp.services.webapp.interfaces import IFinishReadOnlyRequestEvent
-from lp.services.webapp.publication import LaunchpadBrowserPublication
 from lp.services.webapp.servers import (
     ApplicationServerSettingRequestFactory,
     FeedsBrowserRequest,
@@ -49,7 +47,7 @@ from lp.services.webapp.servers import (
     WebServiceTestRequest,
     web_service_request_to_browser_request,
 )
-from lp.testing import EventRecorder, TestCase, TestCaseWithFactory, logout
+from lp.testing import TestCase, TestCaseWithFactory, logout
 from lp.testing.layers import DatabaseFunctionalLayer, FunctionalLayer
 from lp.testing.publication import get_request_and_publication
 
@@ -782,55 +780,6 @@ class LoggingTransaction:
         self.log.append("ABORT")
 
 
-class TestFinishReadOnlyRequest(TestCase):
-    # Publications that have a finishReadOnlyRequest() method are obliged to
-    # fire an IFinishReadOnlyRequestEvent.
-
-    def _test_publication(self, publication, expected_transaction_log):
-        # publication.finishReadOnlyRequest() issues an
-        # IFinishReadOnlyRequestEvent and alters the transaction.
-        fake_request = object()
-        fake_object = object()
-        fake_transaction = LoggingTransaction()
-
-        with EventRecorder() as event_recorder:
-            publication.finishReadOnlyRequest(
-                fake_request, fake_object, fake_transaction
-            )
-
-        self.assertEqual(expected_transaction_log, fake_transaction.log)
-
-        finish_events = [
-            event
-            for event in event_recorder.events
-            if IFinishReadOnlyRequestEvent.providedBy(event)
-        ]
-        self.assertEqual(
-            1,
-            len(finish_events),
-            (
-                "Expected only one IFinishReadOnlyRequestEvent, but "
-                "got: %r" % finish_events
-            ),
-        )
-
-        [finish_event] = finish_events
-        self.assertIs(fake_request, finish_event.request)
-        self.assertIs(fake_object, finish_event.object)
-
-    def test_WebServicePub_fires_FinishReadOnlyRequestEvent(self):
-        # WebServicePublication.finishReadOnlyRequest() issues an
-        # IFinishReadOnlyRequestEvent and aborts the transaction.
-        publication = WebServicePublication(None)
-        self._test_publication(publication, ["ABORT"])
-
-    def test_LaunchpadBrowserPub_fires_FinishReadOnlyRequestEvent(self):
-        # LaunchpadBrowserPublication.finishReadOnlyRequest() issues an
-        # IFinishReadOnlyRequestEvent and aborts the transaction.
-        publication = LaunchpadBrowserPublication(None)
-        self._test_publication(publication, ["ABORT"])
-
-
 class TestWebServiceAccessTokens(TestCaseWithFactory):
     """Test personal access tokens for the webservice.