launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #05156
[Merge] lp:~allenap/launchpad/longpoll-associate-consumer-not-now into lp:launchpad
Gavin Panella has proposed merging lp:~allenap/launchpad/longpoll-associate-consumer-not-now into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~allenap/launchpad/longpoll-associate-consumer-not-now/+merge/77894
This branch does two things:
- Ensures that merely creating a RabbitRoutingKey or RabbitQueue
object does not cause the session to connect.
- Adds is_connected, a boolean property, to IMessageSession, and
removes connection. The connection should not really be exposed
anyway, and most uses of it only checked whether it was None -
i.e. whether or not the session was connected.
--
https://code.launchpad.net/~allenap/launchpad/longpoll-associate-consumer-not-now/+merge/77894
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~allenap/launchpad/longpoll-associate-consumer-not-now into lp:launchpad.
=== modified file 'lib/lp/services/messaging/interfaces.py'
--- lib/lp/services/messaging/interfaces.py 2011-09-26 13:47:26 +0000
+++ lib/lp/services/messaging/interfaces.py 2011-10-03 10:45:28 +0000
@@ -14,10 +14,8 @@
]
-from zope.interface import (
- Attribute,
- Interface,
- )
+from zope.interface import Interface
+from zope.schema import Bool
class MessagingException(Exception):
@@ -34,7 +32,8 @@
class IMessageSession(Interface):
- connection = Attribute("A connection to the messaging system.")
+ is_connected = Bool(
+ u"Whether the session is connected to the messaging system.")
def connect():
"""Connect to the messaging system.
=== modified file 'lib/lp/services/messaging/rabbit.py'
--- lib/lp/services/messaging/rabbit.py 2011-09-29 14:50:32 +0000
+++ lib/lp/services/messaging/rabbit.py 2011-10-03 10:45:28 +0000
@@ -71,17 +71,11 @@
transaction.manager.registerSynch(self._sync)
@property
- def connection(self):
- """See `IMessageSession`.
-
- Don't return closed connection.
- """
- if self._connection is None:
- return None
- elif self._connection.transport is None:
- return None
- else:
- return self._connection
+ def is_connected(self):
+ """See `IMessageSession`."""
+ return (
+ self._connection is not None and
+ self._connection.transport is not None)
def connect(self):
"""See `IMessageSession`.
@@ -207,6 +201,7 @@
def associateConsumerNow(self, consumer):
"""Only receive messages for requested routing key."""
+ consumer._declare()
self.channel.queue_bind(
queue=consumer.name, exchange=self.session.exchange,
routing_key=self.key, nowait=False)
@@ -232,8 +227,9 @@
def __init__(self, session, name):
super(RabbitQueue, self).__init__(session)
self.name = name
- # The queue will be auto-deleted 5 minutes after the last
- # use of the queue.
+
+ def _declare(self):
+ # The queue will be auto-deleted 5 minutes after its last use.
# http://www.rabbitmq.com/extensions.html#queue-leases
self.channel.queue_declare(
self.name, nowait=False, auto_delete=False,
@@ -246,6 +242,7 @@
trying at least once.
:raises EmptyQueue: if the timeout passes.
"""
+ self._declare()
starttime = time.time()
while True:
message = self.channel.basic_get(self.name)
=== modified file 'lib/lp/services/messaging/tests/test_rabbit.py'
--- lib/lp/services/messaging/tests/test_rabbit.py 2011-09-26 20:24:37 +0000
+++ lib/lp/services/messaging/tests/test_rabbit.py 2011-10-03 10:45:28 +0000
@@ -101,10 +101,10 @@
def test_connect(self):
session = RabbitSession()
- self.assertIs(None, session.connection)
+ self.assertFalse(session.is_connected)
connection = session.connect()
- self.assertIsNot(None, session.connection)
- self.assertIs(connection, session.connection)
+ self.assertTrue(session.is_connected)
+ self.assertIs(connection, session._connection)
def test_connect_with_incomplete_configuration(self):
self.pushConfig("rabbitmq", host="none")
@@ -117,15 +117,15 @@
session = RabbitSession()
session.connect()
session.disconnect()
- self.assertIs(None, session.connection)
+ self.assertFalse(session.is_connected)
- def test_connection(self):
- # The connection property is None once a connection has been closed.
+ def test_is_connected(self):
+ # is_connected is False once a connection has been closed.
session = RabbitSession()
session.connect()
# Close the connection without using disconnect().
- session.connection.close()
- self.assertIs(None, session.connection)
+ session._connection.close()
+ self.assertFalse(session.is_connected)
def test_defer(self):
task = lambda foo, bar: None
@@ -148,7 +148,7 @@
session.flush()
self.assertEqual(["task"], log)
self.assertEqual([], list(session._deferred))
- self.assertIsNot(None, session.connection)
+ self.assertTrue(session.is_connected)
def test_reset(self):
# RabbitSession.reset() resets session variables and does not run
@@ -161,7 +161,7 @@
session.reset()
self.assertEqual([], log)
self.assertEqual([], list(session._deferred))
- self.assertIs(None, session.connection)
+ self.assertFalse(session.is_connected)
def test_finish(self):
# RabbitSession.finish() resets session variables after running
@@ -174,7 +174,7 @@
session.finish()
self.assertEqual(["task"], log)
self.assertEqual([], list(session._deferred))
- self.assertIs(None, session.connection)
+ self.assertFalse(session.is_connected)
def test_getProducer(self):
session = RabbitSession()
@@ -238,9 +238,9 @@
def test_channel(self):
# Referencing the channel property causes the session to connect.
base = RabbitMessageBase(global_session)
- self.assertIs(None, base.session.connection)
+ self.assertFalse(base.session.is_connected)
channel = base.channel
- self.assertIsNot(None, base.session.connection)
+ self.assertTrue(base.session.is_connected)
self.assertIsNot(None, channel)
# The same channel is returned every time.
self.assertIs(channel, base.channel)
@@ -266,6 +266,8 @@
consumer = RabbitQueue(global_session, next(queue_names))
routing_key = RabbitRoutingKey(global_session, next(key_names))
routing_key.associateConsumer(consumer)
+ # The session is still not connected.
+ self.assertFalse(global_session.is_connected)
routing_key.sendNow('now')
routing_key.send('later')
# There is nothing in the queue because the consumer has not yet been
@@ -321,6 +323,11 @@
received_data = consumer.receive(timeout=2)
self.assertEqual(data, received_data)
+ def test_does_not_connect_session_immediately(self):
+ # RabbitRoutingKey does not connect the session until necessary.
+ RabbitRoutingKey(global_session, next(key_names))
+ self.assertFalse(global_session.is_connected)
+
class TestRabbitQueue(RabbitTestCase):
@@ -347,6 +354,11 @@
routing_key.associateConsumerNow(consumer)
self.assertRaises(EmptyQueue, consumer.receive, timeout=2)
+ def test_does_not_connect_session_immediately(self):
+ # RabbitQueue does not connect the session until necessary.
+ RabbitQueue(global_session, next(queue_names))
+ self.assertFalse(global_session.is_connected)
+
class TestRabbit(RabbitTestCase):
"""Integration-like tests for the RabbitMQ messaging abstractions."""