← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~allenap/launchpad/longpoll-associate-consumer-not-now into lp:launchpad

 

Gavin Panella has proposed merging lp:~allenap/launchpad/longpoll-associate-consumer-not-now into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~allenap/launchpad/longpoll-associate-consumer-not-now/+merge/77894

This branch does two things:

- Ensures that merely creating a RabbitRoutingKey or RabbitQueue
  object does not cause the session to connect.

- Adds is_connected, a boolean property, to IMessageSession, and
  removes connection. The connection should not really be exposed
  anyway, and most uses of it were to check it was None or not -
  i.e. to see if the session was connected or not.

-- 
https://code.launchpad.net/~allenap/launchpad/longpoll-associate-consumer-not-now/+merge/77894
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~allenap/launchpad/longpoll-associate-consumer-not-now into lp:launchpad.
=== modified file 'lib/lp/services/messaging/interfaces.py'
--- lib/lp/services/messaging/interfaces.py	2011-09-26 13:47:26 +0000
+++ lib/lp/services/messaging/interfaces.py	2011-10-03 10:45:28 +0000
@@ -14,10 +14,8 @@
     ]
 
 
-from zope.interface import (
-    Attribute,
-    Interface,
-    )
+from zope.interface import Interface
+from zope.schema import Bool
 
 
 class MessagingException(Exception):
@@ -34,7 +32,8 @@
 
 class IMessageSession(Interface):
 
-    connection = Attribute("A connection to the messaging system.")
+    is_connected = Bool(
+        u"Whether the session is connected to the messaging system.")
 
     def connect():
         """Connect to the messaging system.

=== modified file 'lib/lp/services/messaging/rabbit.py'
--- lib/lp/services/messaging/rabbit.py	2011-09-29 14:50:32 +0000
+++ lib/lp/services/messaging/rabbit.py	2011-10-03 10:45:28 +0000
@@ -71,17 +71,11 @@
         transaction.manager.registerSynch(self._sync)
 
     @property
-    def connection(self):
-        """See `IMessageSession`.
-
-        Don't return closed connection.
-        """
-        if self._connection is None:
-            return None
-        elif self._connection.transport is None:
-            return None
-        else:
-            return self._connection
+    def is_connected(self):
+        """See `IMessageSession`."""
+        return (
+            self._connection is not None and
+            self._connection.transport is not None)
 
     def connect(self):
         """See `IMessageSession`.
@@ -207,6 +201,7 @@
 
     def associateConsumerNow(self, consumer):
         """Only receive messages for requested routing key."""
+        consumer._declare()
         self.channel.queue_bind(
             queue=consumer.name, exchange=self.session.exchange,
             routing_key=self.key, nowait=False)
@@ -232,8 +227,9 @@
     def __init__(self, session, name):
         super(RabbitQueue, self).__init__(session)
         self.name = name
-        # The queue will be auto-deleted 5 minutes after the last
-        # use of the queue.
+
+    def _declare(self):
+        # The queue will be auto-deleted 5 minutes after its last use.
         # http://www.rabbitmq.com/extensions.html#queue-leases
         self.channel.queue_declare(
             self.name, nowait=False, auto_delete=False,
@@ -246,6 +242,7 @@
             trying at least once.
         :raises EmptyQueue: if the timeout passes.
         """
+        self._declare()
         starttime = time.time()
         while True:
             message = self.channel.basic_get(self.name)

=== modified file 'lib/lp/services/messaging/tests/test_rabbit.py'
--- lib/lp/services/messaging/tests/test_rabbit.py	2011-09-26 20:24:37 +0000
+++ lib/lp/services/messaging/tests/test_rabbit.py	2011-10-03 10:45:28 +0000
@@ -101,10 +101,10 @@
 
     def test_connect(self):
         session = RabbitSession()
-        self.assertIs(None, session.connection)
+        self.assertFalse(session.is_connected)
         connection = session.connect()
-        self.assertIsNot(None, session.connection)
-        self.assertIs(connection, session.connection)
+        self.assertTrue(session.is_connected)
+        self.assertIs(connection, session._connection)
 
     def test_connect_with_incomplete_configuration(self):
         self.pushConfig("rabbitmq", host="none")
@@ -117,15 +117,15 @@
         session = RabbitSession()
         session.connect()
         session.disconnect()
-        self.assertIs(None, session.connection)
+        self.assertFalse(session.is_connected)
 
-    def test_connection(self):
-        # The connection property is None once a connection has been closed.
+    def test_is_connected(self):
+        # is_connected is False once a connection has been closed.
         session = RabbitSession()
         session.connect()
         # Close the connection without using disconnect().
-        session.connection.close()
-        self.assertIs(None, session.connection)
+        session._connection.close()
+        self.assertFalse(session.is_connected)
 
     def test_defer(self):
         task = lambda foo, bar: None
@@ -148,7 +148,7 @@
         session.flush()
         self.assertEqual(["task"], log)
         self.assertEqual([], list(session._deferred))
-        self.assertIsNot(None, session.connection)
+        self.assertTrue(session.is_connected)
 
     def test_reset(self):
         # RabbitSession.reset() resets session variables and does not run
@@ -161,7 +161,7 @@
         session.reset()
         self.assertEqual([], log)
         self.assertEqual([], list(session._deferred))
-        self.assertIs(None, session.connection)
+        self.assertFalse(session.is_connected)
 
     def test_finish(self):
         # RabbitSession.finish() resets session variables after running
@@ -174,7 +174,7 @@
         session.finish()
         self.assertEqual(["task"], log)
         self.assertEqual([], list(session._deferred))
-        self.assertIs(None, session.connection)
+        self.assertFalse(session.is_connected)
 
     def test_getProducer(self):
         session = RabbitSession()
@@ -238,9 +238,9 @@
     def test_channel(self):
         # Referencing the channel property causes the session to connect.
         base = RabbitMessageBase(global_session)
-        self.assertIs(None, base.session.connection)
+        self.assertFalse(base.session.is_connected)
         channel = base.channel
-        self.assertIsNot(None, base.session.connection)
+        self.assertTrue(base.session.is_connected)
         self.assertIsNot(None, channel)
         # The same channel is returned every time.
         self.assertIs(channel, base.channel)
@@ -266,6 +266,8 @@
         consumer = RabbitQueue(global_session, next(queue_names))
         routing_key = RabbitRoutingKey(global_session, next(key_names))
         routing_key.associateConsumer(consumer)
+        # The session is still not connected.
+        self.assertFalse(global_session.is_connected)
         routing_key.sendNow('now')
         routing_key.send('later')
         # There is nothing in the queue because the consumer has not yet been
@@ -321,6 +323,11 @@
             received_data = consumer.receive(timeout=2)
             self.assertEqual(data, received_data)
 
+    def test_does_not_connect_session_immediately(self):
+        # RabbitRoutingKey does not connect the session until necessary.
+        RabbitRoutingKey(global_session, next(key_names))
+        self.assertFalse(global_session.is_connected)
+
 
 class TestRabbitQueue(RabbitTestCase):
 
@@ -347,6 +354,11 @@
         routing_key.associateConsumerNow(consumer)
         self.assertRaises(EmptyQueue, consumer.receive, timeout=2)
 
+    def test_does_not_connect_session_immediately(self):
+        # RabbitQueue does not connect the session until necessary.
+        RabbitQueue(global_session, next(queue_names))
+        self.assertFalse(global_session.is_connected)
+
 
 class TestRabbit(RabbitTestCase):
     """Integration-like tests for the RabbitMQ messaging abstractions."""