launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #06649
[Merge] lp:~rvb/maas/maas-longpoll-rabbitpublisher into lp:maas
Raphaël Badin has proposed merging lp:~rvb/maas/maas-longpoll-rabbitpublisher into lp:maas with lp:~rvb/maas/maas-longpoll as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~rvb/maas/maas-longpoll-rabbitpublisher/+merge/97051
This branch adds the rabbit messaging layer.
This is composed of two things:
- src/maasserver/rabbit.py contains the classes to interaction with rabbit (It is based on Launchpad's layer but it uses 'fanout' pattern [http://www.rabbitmq.com/tutorials/tutorial-three-python.html] that is much appropriate to our use case.))
- src/maasserver/messages.py uses rabbit.py to create a messaging layer that will publish "model changed" events.
--
https://code.launchpad.net/~rvb/maas/maas-longpoll-rabbitpublisher/+merge/97051
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~rvb/maas/maas-longpoll-rabbitpublisher into lp:maas.
=== modified file 'src/maas/demo.py'
--- src/maas/demo.py 2012-03-12 15:50:49 +0000
+++ src/maas/demo.py 2012-03-12 16:43:38 +0000
@@ -25,6 +25,7 @@
# This should match the setting in Makefile:pserv.pid.
PSERV_URL = "http://localhost:8001/api"
+RABBITMQ_PUBLISH = True
LOGGING = {
'version': 1,
=== modified file 'src/maas/settings.py'
--- src/maas/settings.py 2012-03-12 15:50:49 +0000
+++ src/maas/settings.py 2012-03-12 16:43:38 +0000
@@ -71,6 +71,15 @@
'maasserver.models.MaaSAuthorizationBackend',
)
+# Rabbit MQ Configuration.
+RABBITMQ_HOST = 'localhost'
+RABBITMQ_USERID = 'guest'
+RABBITMQ_PASSWORD = 'guest'
+RABBITMQ_VIRTUAL_HOST = '/'
+
+RABBITMQ_PUBLISH = False
+
+
DATABASES = {
'default': {
# 'postgresql_psycopg2', 'postgresql', 'mysql', 'sqlite3' etc.
=== added file 'src/maasserver/messages.py'
--- src/maasserver/messages.py 1970-01-01 00:00:00 +0000
+++ src/maasserver/messages.py 2012-03-12 16:43:38 +0000
@@ -0,0 +1,131 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Messages."""
+
+from __future__ import (
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = [
+ "MaaSMessenger",
+ "MessengerBase",
+ "messaging",
+ ]
+
+
+from abc import (
+ ABCMeta,
+ abstractmethod,
+ )
+from functools import partial
+
+from django.conf import settings
+from django.core.serializers.json import DjangoJSONEncoder
+from django.db.models.signals import (
+ post_delete,
+ post_save,
+ )
+from maasserver.models import Node
+from maasserver.rabbit import RabbitMessaging
+
+# This is the name of the exchange where changes to MaaS's model objects will
+# be published.
+MODEL_EXCHANGE_NAME = "MaaS Model Exchange"
+
+
+class MESSENGER_EVENT:
+ CREATED = 'created'
+ UPDATED = 'updated'
+ DELETED = 'deleted'
+
+
+class MessengerBase:
+ """Generic class that will publish events to a producer when a model
+ object is changed.
+ """
+
+ __metaclass__ = ABCMeta
+
+ def __init__(self, klass, producer):
+ """
+ :param klass: The model class to track.
+ :type klass: django.db.models.Model
+ :param producer: The producer used to publish events.
+ :type producer: maasserer.rabbit.db.models.RabbitExchange
+ """
+ self.klass = klass
+ self.producer = producer
+
+ @abstractmethod
+ def create_msg(self, event_name, instance):
+ """Format a message from the given event_name and instance."""
+
+ def update_obj(self, sender, instance, created, **kwargs):
+ event_name = (
+ MESSENGER_EVENT.CREATED if created
+ else MESSENGER_EVENT.UPDATED)
+ message = self.create_msg(event_name, instance)
+ self.producer.publish(message)
+
+ def delete_obj(self, sender, instance, **kwargs):
+ message = self.create_msg(MESSENGER_EVENT.DELETED, instance)
+ self.producer.publish(message)
+
+ def register(self):
+ update_obj = partial(self.update_obj)
+ update_obj.__name__ = self.update_obj.__name__
+ post_save.connect(
+ receiver=update_obj, weak=False, sender=self.klass)
+ delete_obj = partial(self.delete_obj)
+ delete_obj.__name__ = self.delete_obj.__name__
+ post_delete.connect(
+ receiver=delete_obj, weak=False, sender=self.klass)
+
+
+class MaaSMessenger(MessengerBase):
+ """A messenger tailored to suit MaaS' UI (JavaScript) requirements.
+
+ The format of the event's payload will be:
+ {
+ "event_key": "$ModelClass.$MESSENGER_EVENT",
+ "instance": jsonified instance
+ }
+
+ For instance, when a Node is created, the event's payload will look like
+ this:
+ {
+ "event_key": "Node.created",
+ "instance": {
+ "hostname": "sun",
+ "system_id": "node-17ca41c2-6c39-11e1-a961-00219bd0a2de",
+ "architecture": "i386",
+ [...]
+ }
+ }
+
+ """
+
+ def create_msg(self, event_name, instance):
+ event_key = self.event_key(event_name, instance)
+ message = DjangoJSONEncoder().encode({
+ 'instance':
+ {k: v for k, v in instance.__dict__.items()
+ if not k.startswith('_')},
+ 'event_key': event_key,
+
+ })
+ return message
+
+ def event_key(self, event_name, instance):
+ return "%s.%s" % (
+ instance.__class__.__name__, event_name)
+
+
+if settings.RABBITMQ_PUBLISH:
+ messaging = RabbitMessaging(MODEL_EXCHANGE_NAME)
+ MaaSMessenger(Node, messaging.getExchange()).register()
+else:
+ session = None
=== modified file 'src/maasserver/models.py'
--- src/maasserver/models.py 2012-03-12 07:25:31 +0000
+++ src/maasserver/models.py 2012-03-12 16:43:38 +0000
@@ -858,3 +858,6 @@
from maasserver import provisioning
# We mention 'provisioning' here to silence lint warnings.
provisioning
+
+from maasserver import messages
+messages
=== added file 'src/maasserver/rabbit.py'
--- src/maasserver/rabbit.py 1970-01-01 00:00:00 +0000
+++ src/maasserver/rabbit.py 2012-03-12 16:43:38 +0000
@@ -0,0 +1,103 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Rabbit messaging."""
+
+from __future__ import (
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = [
+ "RabbitExchange",
+ "RabbitQueue",
+ "RabbitMessaging",
+ "RabbitSession",
+ ]
+
+
+from amqplib import client_0_8 as amqp
+from django.conf import settings
+
+
+def connect():
+ """Connect to AMQP."""
+ return amqp.Connection(
+ host=settings.RABBITMQ_HOST,
+ userid=settings.RABBITMQ_USERID,
+ password=settings.RABBITMQ_PASSWORD,
+ virtual_host=settings.RABBITMQ_VIRTUAL_HOST,
+ insist=False)
+
+
+class RabbitSession:
+
+ def __init__(self):
+ self._connection = None
+
+ @property
+ def connection(self):
+ if self._connection is None or self._connection.transport is None:
+ self._connection = connect()
+ return self._connection
+
+ def disconnect(self):
+ if self._connection is not None:
+ try:
+ self._connection.close()
+ finally:
+ self._connection = None
+
+
+class RabbitMessaging:
+
+ def __init__(self, exchange_name):
+ self.exchange_name = exchange_name
+ self._session = RabbitSession()
+
+ def getExchange(self):
+ return RabbitExchange(self._session, self.exchange_name)
+
+ def getQueue(self):
+ return RabbitQueue(self._session, self.exchange_name)
+
+
+class RabbitBase:
+
+ def __init__(self, session, exchange_name):
+ self.exchange_name = exchange_name
+ self._session = session
+ self._channel = None
+
+ @property
+ def channel(self):
+ if self._channel is None or not self._channel.is_open:
+ self._channel = self._session.connection.channel()
+ self._channel.exchange_declare(
+ self.exchange_name, type='fanout')
+ return self._channel
+
+
+class RabbitExchange(RabbitBase):
+
+ def publish(self, message):
+ msg = amqp.Message(message)
+ # Publish to a 'fanout' exchange: routing_key is ''.
+ self.channel.basic_publish(
+ exchange=self.exchange_name, routing_key='', msg=msg)
+
+
+class RabbitQueue(RabbitBase):
+
+ def __init__(self, session, exchange_name):
+ super(RabbitQueue, self).__init__(session, exchange_name)
+ self.queue_name = self.channel.queue_declare(
+ nowait=False, auto_delete=False,
+ arguments={"x-expires": 300000})[0]
+ self.channel.queue_bind(
+ exchange=self.exchange_name, queue=self.queue_name)
+
+ @property
+ def name(self):
+ return self.queue_name
=== modified file 'src/maasserver/tests/models.py'
--- src/maasserver/tests/models.py 2012-02-27 16:54:22 +0000
+++ src/maasserver/tests/models.py 2012-03-12 16:43:38 +0000
@@ -20,3 +20,7 @@
class JSONFieldModel(models.Model):
name = models.CharField(max_length=255, unique=False)
value = JSONObjectField(null=True)
+
+
+class MessagesTestModel(models.Model):
+ name = models.CharField(max_length=255, unique=False)
=== added file 'src/maasserver/tests/test_messages.py'
--- src/maasserver/tests/test_messages.py 1970-01-01 00:00:00 +0000
+++ src/maasserver/tests/test_messages.py 2012-03-12 16:43:38 +0000
@@ -0,0 +1,117 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test maasserver messages."""
+
+from __future__ import (
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = []
+
+import json
+
+from maasserver.messages import (
+ MaaSMessenger,
+ MESSENGER_EVENT,
+ MessengerBase,
+ )
+from maasserver.models import Node
+from maasserver.testing import TestModelTestCase
+from maasserver.testing.factory import factory
+from maasserver.tests.models import MessagesTestModel
+
+
+class FakeProducer:
+ """A fake RabbitProducer that simply records published messages."""
+
+ def __init__(self):
+ self.messages = []
+
+ def publish(self, message):
+ self.messages.append(message)
+
+
+class TestMessenger(MessengerBase):
+
+ def create_msg(self, event_name, instance):
+ return [event_name, instance]
+
+
+class MessengerBaseTest(TestModelTestCase):
+
+ app = 'maasserver.tests'
+
+ def test_update_obj_publishes_message_if_created(self):
+ producer = FakeProducer()
+ messenger = TestMessenger(MessagesTestModel, producer)
+ instance = factory.getRandomString()
+ messenger.update_obj(MessagesTestModel, instance, True)
+ self.assertEqual(
+ [[MESSENGER_EVENT.CREATED, instance]], producer.messages)
+
+ def test_update_obj_publishes_message_if_not_created(self):
+ producer = FakeProducer()
+ messenger = TestMessenger(MessagesTestModel, producer)
+ instance = factory.getRandomString()
+ messenger.update_obj(MessagesTestModel, instance, False)
+ self.assertEqual(
+ [[MESSENGER_EVENT.UPDATED, instance]], producer.messages)
+
+ def test_delete_obj_publishes_message(self):
+ producer = FakeProducer()
+ messenger = TestMessenger(MessagesTestModel, producer)
+ instance = factory.getRandomString()
+ messenger.delete_obj(MessagesTestModel, instance)
+ self.assertEqual(
+ [[MESSENGER_EVENT.DELETED, instance]], producer.messages)
+
+ def test_register_registers_update_signal(self):
+ producer = FakeProducer()
+ messenger = TestMessenger(MessagesTestModel, producer)
+ messenger.register()
+ obj = MessagesTestModel(name=factory.getRandomString())
+ obj.save()
+ self.assertEqual(
+ [[MESSENGER_EVENT.CREATED, obj]], producer.messages)
+
+ def test_register_registers_delete_signal(self):
+ obj = MessagesTestModel(name=factory.getRandomString())
+ obj.save()
+ producer = FakeProducer()
+ messenger = TestMessenger(MessagesTestModel, producer)
+ messenger.register()
+ obj.delete()
+ self.assertEqual(
+ [[MESSENGER_EVENT.DELETED, obj]], producer.messages)
+
+
+class MaaSMessengerTest(TestModelTestCase):
+
+ app = 'maasserver.tests'
+
+ def test_event_key(self):
+ producer = FakeProducer()
+ event_name = factory.getRandomString()
+ obj = MessagesTestModel(name=factory.getRandomString())
+ messenger = MaaSMessenger(MessagesTestModel, producer)
+ self.assertEqual(
+ '%s.%s' % ('MessagesTestModel', event_name),
+ messenger.event_key(event_name, obj))
+
+ def test_create_msg(self):
+ producer = FakeProducer()
+ messenger = MaaSMessenger(Node, producer)
+ event_name = factory.getRandomString()
+ obj_name = factory.getRandomString()
+ obj = MessagesTestModel(name=obj_name)
+ obj.save()
+ msg = messenger.create_msg(event_name, obj)
+ decoded_msg = json.loads(msg)
+ self.assertItemsEqual(['instance', 'event_key'], list(decoded_msg))
+ self.assertItemsEqual(
+ ['id', 'name'], list(decoded_msg['instance']))
+ self.assertItemsEqual(
+ obj_name, decoded_msg['instance']['name'])
=== added file 'src/maasserver/tests/test_rabbit.py'
--- src/maasserver/tests/test_rabbit.py 1970-01-01 00:00:00 +0000
+++ src/maasserver/tests/test_rabbit.py 2012-03-12 16:43:38 +0000
@@ -0,0 +1,143 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Rabbit messaging tests."""
+
+from __future__ import (
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = []
+
+
+from amqplib import client_0_8 as amqp
+from maasserver import rabbit
+from maasserver.rabbit import (
+ RabbitBase,
+ RabbitExchange,
+ RabbitMessaging,
+ RabbitQueue,
+ RabbitSession,
+ )
+from maasserver.testing.factory import factory
+from maastesting import TestCase
+from rabbitfixture.server import RabbitServer
+
+
+class RabbitTestCase(TestCase):
+
+ def setUp(self):
+ super(RabbitTestCase, self).setUp()
+ self.rabbit_server = self.useFixture(RabbitServer())
+ self.rabbit_env = self.rabbit_server.runner.environment
+ self.old_connect = rabbit.connect
+ rabbit.connect = self.rabbit_env.get_connection
+
+ def tearDown(self):
+ super(RabbitTestCase, self).tearDown()
+ rabbit.connect = self.old_connect
+
+ def get_command_output(self, command):
+ # Returns the output of the given rabbit command.
+ return self.rabbit_env.rabbitctl(str(command))[0]
+
+
+class TestRabbitSession(RabbitTestCase):
+
+ def test_session_connection(self):
+ session = RabbitSession()
+ # Referencing the connection property causes a connection to be
+ # created.
+ connection = session.connection
+ self.assertIsNotNone(session._connection)
+ # The same connection is returned every time.
+ self.assertIs(connection, session.connection)
+
+ def test_session_disconnect(self):
+ session = RabbitSession()
+ session.disconnect()
+ self.assertIsNone(session._connection)
+
+
+class TestRabbitMessaging(RabbitTestCase):
+
+ def test_messaging_getExchange(self):
+ exchange_name = factory.getRandomString()
+ messaging = RabbitMessaging(exchange_name)
+ exchange = messaging.getExchange()
+ self.assertTrue(isinstance(exchange, RabbitExchange))
+ self.assertEqual(messaging._session, exchange._session)
+ self.assertEqual(exchange_name, exchange.exchange_name)
+
+ def test_messaging_getQueue(self):
+ exchange_name = factory.getRandomString()
+ messaging = RabbitMessaging(exchange_name)
+ queue = messaging.getQueue()
+ self.assertTrue(isinstance(queue, RabbitQueue))
+ self.assertEqual(messaging._session, queue._session)
+ self.assertEqual(exchange_name, queue.exchange_name)
+
+
+class TestRabbitBase(RabbitTestCase):
+
+ def test_rabbitbase_contains_session(self):
+ exchange_name = factory.getRandomString()
+ rabbitbase = RabbitBase(RabbitSession(), exchange_name)
+ self.assertTrue(isinstance(rabbitbase._session, RabbitSession))
+
+ def test_base_has_exchange_name(self):
+ exchange_name = factory.getRandomString()
+ rabbitbase = RabbitBase(RabbitSession(), exchange_name)
+ self.assertEqual(exchange_name, rabbitbase.exchange_name)
+
+ def test_base_channel(self):
+ rabbitbase = RabbitBase(RabbitSession(), factory.getRandomString())
+ # Referencing the channel property causes an open channel to be
+ # created.
+ channel = rabbitbase.channel
+ self.assertTrue(channel.is_open)
+ self.assertIsNotNone(rabbitbase._session._connection)
+ # The same channel is returned every time.
+ self.assertIs(channel, rabbitbase.channel)
+
+ def test_base_channel_creates_exchange(self):
+ exchange_name = factory.getRandomString()
+ rabbitbase = RabbitBase(RabbitSession(), exchange_name)
+ rabbitbase.channel
+ self.assertIn(
+ exchange_name,
+ self.get_command_output('list_exchanges'))
+
+
+class TestRabbitExchange(RabbitTestCase):
+
+ def test_exchange_publish(self):
+ exchange_name = factory.getRandomString()
+ message_content = factory.getRandomString()
+ exchange = RabbitExchange(RabbitSession(), exchange_name)
+
+ channel = RabbitBase(RabbitSession(), exchange_name).channel
+ queue_name = channel.queue_declare(auto_delete=True)[0]
+ channel.queue_bind(exchange=exchange_name, queue=queue_name)
+ exchange.publish(message_content)
+ message = channel.basic_get(queue_name)
+ self.assertEqual(message_content, message.body)
+
+
+class TestRabbitQueue(RabbitTestCase):
+
+ def test_rabbit_queue_binds_queue(self):
+ exchange_name = factory.getRandomString()
+ message_content = factory.getRandomString()
+ queue = RabbitQueue(RabbitSession(), exchange_name)
+
+ # Publish to queue.name.
+ base = RabbitBase(RabbitSession(), exchange_name)
+ channel = base.channel
+ msg = amqp.Message(message_content)
+ channel.basic_publish(
+ exchange=exchange_name, routing_key='', msg=msg)
+ message = channel.basic_get(queue.name)
+ self.assertEqual(message_content, message.body)