← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~rvb/maas/maas-longpoll-rabbitpublisher into lp:maas

 

Raphaël Badin has proposed merging lp:~rvb/maas/maas-longpoll-rabbitpublisher into lp:maas with lp:~rvb/maas/maas-longpoll as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~rvb/maas/maas-longpoll-rabbitpublisher/+merge/97051

This branch adds the rabbit messaging layer.  

This is composed of two things:
- src/maasserver/rabbit.py contains the classes to interaction with rabbit (It is based on Launchpad's layer but it uses 'fanout' pattern [http://www.rabbitmq.com/tutorials/tutorial-three-python.html] that is much appropriate to our use case.))
- src/maasserver/messages.py uses rabbit.py to create a messaging layer that will publish "model changed" events.
-- 
https://code.launchpad.net/~rvb/maas/maas-longpoll-rabbitpublisher/+merge/97051
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~rvb/maas/maas-longpoll-rabbitpublisher into lp:maas.
=== modified file 'src/maas/demo.py'
--- src/maas/demo.py	2012-03-12 15:50:49 +0000
+++ src/maas/demo.py	2012-03-12 16:43:38 +0000
@@ -25,6 +25,7 @@
 # This should match the setting in Makefile:pserv.pid.
 PSERV_URL = "http://localhost:8001/api";
 
+RABBITMQ_PUBLISH = True
 
 LOGGING = {
     'version': 1,

=== modified file 'src/maas/settings.py'
--- src/maas/settings.py	2012-03-12 15:50:49 +0000
+++ src/maas/settings.py	2012-03-12 16:43:38 +0000
@@ -71,6 +71,15 @@
     'maasserver.models.MaaSAuthorizationBackend',
     )
 
+# Rabbit MQ Configuration.
+RABBITMQ_HOST = 'localhost'
+RABBITMQ_USERID = 'guest'
+RABBITMQ_PASSWORD = 'guest'
+RABBITMQ_VIRTUAL_HOST = '/'
+
+RABBITMQ_PUBLISH = False
+
+
 DATABASES = {
     'default': {
         # 'postgresql_psycopg2', 'postgresql', 'mysql', 'sqlite3' etc.

=== added file 'src/maasserver/messages.py'
--- src/maasserver/messages.py	1970-01-01 00:00:00 +0000
+++ src/maasserver/messages.py	2012-03-12 16:43:38 +0000
@@ -0,0 +1,131 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Messages."""
+
+from __future__ import (
+    print_function,
+    unicode_literals,
+    )
+
+__metaclass__ = type
+__all__ = [
+    "MaaSMessenger",
+    "MessengerBase",
+    "messaging",
+    ]
+
+
+from abc import (
+    ABCMeta,
+    abstractmethod,
+    )
+from functools import partial
+
+from django.conf import settings
+from django.core.serializers.json import DjangoJSONEncoder
+from django.db.models.signals import (
+    post_delete,
+    post_save,
+    )
+from maasserver.models import Node
+from maasserver.rabbit import RabbitMessaging
+
+# This is the name of the exchange where changes to MaaS's model objects will
+# be published.
+MODEL_EXCHANGE_NAME = "MaaS Model Exchange"
+
+
+class MESSENGER_EVENT:
+    CREATED = 'created'
+    UPDATED = 'updated'
+    DELETED = 'deleted'
+
+
+class MessengerBase:
+    """Generic class that will publish events to a producer when a model
+    object is changed.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def __init__(self, klass, producer):
+        """
+        :param klass: The model class to track.
+        :type klass: django.db.models.Model
+        :param producer: The producer used to publish events.
+        :type producer: maasserer.rabbit.db.models.RabbitExchange
+        """
+        self.klass = klass
+        self.producer = producer
+
+    @abstractmethod
+    def create_msg(self, event_name, instance):
+        """Format a message from the given event_name and instance."""
+
+    def update_obj(self, sender, instance, created, **kwargs):
+        event_name = (
+            MESSENGER_EVENT.CREATED if created
+            else MESSENGER_EVENT.UPDATED)
+        message = self.create_msg(event_name, instance)
+        self.producer.publish(message)
+
+    def delete_obj(self, sender, instance, **kwargs):
+        message = self.create_msg(MESSENGER_EVENT.DELETED, instance)
+        self.producer.publish(message)
+
+    def register(self):
+        update_obj = partial(self.update_obj)
+        update_obj.__name__ = self.update_obj.__name__
+        post_save.connect(
+            receiver=update_obj, weak=False, sender=self.klass)
+        delete_obj = partial(self.delete_obj)
+        delete_obj.__name__ = self.delete_obj.__name__
+        post_delete.connect(
+            receiver=delete_obj, weak=False, sender=self.klass)
+
+
+class MaaSMessenger(MessengerBase):
+    """A messenger tailored to suit MaaS' UI (JavaScript) requirements.
+
+    The format of the event's payload will be:
+    {
+        "event_key": "$ModelClass.$MESSENGER_EVENT",
+        "instance": jsonified instance
+    }
+
+    For instance, when a Node is created, the event's payload will look like
+    this:
+    {
+        "event_key": "Node.created",
+        "instance": {
+            "hostname": "sun",
+            "system_id": "node-17ca41c2-6c39-11e1-a961-00219bd0a2de",
+            "architecture": "i386",
+            [...]
+        }
+    }
+
+    """
+
+    def create_msg(self, event_name, instance):
+        event_key = self.event_key(event_name, instance)
+        message = DjangoJSONEncoder().encode({
+            'instance':
+                {k: v for k, v in instance.__dict__.items()
+                 if not k.startswith('_')},
+            'event_key': event_key,
+
+        })
+        return message
+
+    def event_key(self, event_name, instance):
+        return "%s.%s" % (
+            instance.__class__.__name__, event_name)
+
+
+if settings.RABBITMQ_PUBLISH:
+    messaging = RabbitMessaging(MODEL_EXCHANGE_NAME)
+    MaaSMessenger(Node, messaging.getExchange()).register()
+else:
+    session = None

=== modified file 'src/maasserver/models.py'
--- src/maasserver/models.py	2012-03-12 07:25:31 +0000
+++ src/maasserver/models.py	2012-03-12 16:43:38 +0000
@@ -858,3 +858,6 @@
 from maasserver import provisioning
 # We mention 'provisioning' here to silence lint warnings.
 provisioning
+
+from maasserver import messages
+messages

=== added file 'src/maasserver/rabbit.py'
--- src/maasserver/rabbit.py	1970-01-01 00:00:00 +0000
+++ src/maasserver/rabbit.py	2012-03-12 16:43:38 +0000
@@ -0,0 +1,103 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Rabbit messaging."""
+
+from __future__ import (
+    print_function,
+    unicode_literals,
+    )
+
+__metaclass__ = type
+__all__ = [
+    "RabbitExchange",
+    "RabbitQueue",
+    "RabbitMessaging",
+    "RabbitSession",
+    ]
+
+
+from amqplib import client_0_8 as amqp
+from django.conf import settings
+
+
+def connect():
+    """Connect to AMQP."""
+    return amqp.Connection(
+        host=settings.RABBITMQ_HOST,
+        userid=settings.RABBITMQ_USERID,
+        password=settings.RABBITMQ_PASSWORD,
+        virtual_host=settings.RABBITMQ_VIRTUAL_HOST,
+        insist=False)
+
+
+class RabbitSession:
+
+    def __init__(self):
+        self._connection = None
+
+    @property
+    def connection(self):
+        if self._connection is None or self._connection.transport is None:
+            self._connection = connect()
+        return self._connection
+
+    def disconnect(self):
+        if self._connection is not None:
+            try:
+                self._connection.close()
+            finally:
+                self._connection = None
+
+
+class RabbitMessaging:
+
+    def __init__(self, exchange_name):
+        self.exchange_name = exchange_name
+        self._session = RabbitSession()
+
+    def getExchange(self):
+        return RabbitExchange(self._session, self.exchange_name)
+
+    def getQueue(self):
+        return RabbitQueue(self._session, self.exchange_name)
+
+
+class RabbitBase:
+
+    def __init__(self, session, exchange_name):
+        self.exchange_name = exchange_name
+        self._session = session
+        self._channel = None
+
+    @property
+    def channel(self):
+        if self._channel is None or not self._channel.is_open:
+            self._channel = self._session.connection.channel()
+            self._channel.exchange_declare(
+                self.exchange_name, type='fanout')
+        return self._channel
+
+
+class RabbitExchange(RabbitBase):
+
+    def publish(self, message):
+        msg = amqp.Message(message)
+        # Publish to a 'fanout' exchange: routing_key is ''.
+        self.channel.basic_publish(
+            exchange=self.exchange_name, routing_key='', msg=msg)
+
+
+class RabbitQueue(RabbitBase):
+
+    def __init__(self, session, exchange_name):
+        super(RabbitQueue, self).__init__(session, exchange_name)
+        self.queue_name = self.channel.queue_declare(
+            nowait=False, auto_delete=False,
+            arguments={"x-expires": 300000})[0]
+        self.channel.queue_bind(
+            exchange=self.exchange_name, queue=self.queue_name)
+
+    @property
+    def name(self):
+        return self.queue_name

=== modified file 'src/maasserver/tests/models.py'
--- src/maasserver/tests/models.py	2012-02-27 16:54:22 +0000
+++ src/maasserver/tests/models.py	2012-03-12 16:43:38 +0000
@@ -20,3 +20,7 @@
 class JSONFieldModel(models.Model):
     name = models.CharField(max_length=255, unique=False)
     value = JSONObjectField(null=True)
+
+
+class MessagesTestModel(models.Model):
+    name = models.CharField(max_length=255, unique=False)

=== added file 'src/maasserver/tests/test_messages.py'
--- src/maasserver/tests/test_messages.py	1970-01-01 00:00:00 +0000
+++ src/maasserver/tests/test_messages.py	2012-03-12 16:43:38 +0000
@@ -0,0 +1,117 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test maasserver messages."""
+
+from __future__ import (
+    print_function,
+    unicode_literals,
+    )
+
+__metaclass__ = type
+__all__ = []
+
+import json
+
+from maasserver.messages import (
+    MaaSMessenger,
+    MESSENGER_EVENT,
+    MessengerBase,
+    )
+from maasserver.models import Node
+from maasserver.testing import TestModelTestCase
+from maasserver.testing.factory import factory
+from maasserver.tests.models import MessagesTestModel
+
+
+class FakeProducer:
+    """A fake RabbitProducer that simply records published messages."""
+
+    def __init__(self):
+        self.messages = []
+
+    def publish(self, message):
+        self.messages.append(message)
+
+
+class TestMessenger(MessengerBase):
+
+    def create_msg(self, event_name, instance):
+        return [event_name, instance]
+
+
+class MessengerBaseTest(TestModelTestCase):
+
+    app = 'maasserver.tests'
+
+    def test_update_obj_publishes_message_if_created(self):
+        producer = FakeProducer()
+        messenger = TestMessenger(MessagesTestModel, producer)
+        instance = factory.getRandomString()
+        messenger.update_obj(MessagesTestModel, instance, True)
+        self.assertEqual(
+            [[MESSENGER_EVENT.CREATED, instance]], producer.messages)
+
+    def test_update_obj_publishes_message_if_not_created(self):
+        producer = FakeProducer()
+        messenger = TestMessenger(MessagesTestModel, producer)
+        instance = factory.getRandomString()
+        messenger.update_obj(MessagesTestModel, instance, False)
+        self.assertEqual(
+            [[MESSENGER_EVENT.UPDATED, instance]], producer.messages)
+
+    def test_delete_obj_publishes_message(self):
+        producer = FakeProducer()
+        messenger = TestMessenger(MessagesTestModel, producer)
+        instance = factory.getRandomString()
+        messenger.delete_obj(MessagesTestModel, instance)
+        self.assertEqual(
+            [[MESSENGER_EVENT.DELETED, instance]], producer.messages)
+
+    def test_register_registers_update_signal(self):
+        producer = FakeProducer()
+        messenger = TestMessenger(MessagesTestModel, producer)
+        messenger.register()
+        obj = MessagesTestModel(name=factory.getRandomString())
+        obj.save()
+        self.assertEqual(
+            [[MESSENGER_EVENT.CREATED, obj]], producer.messages)
+
+    def test_register_registers_delete_signal(self):
+        obj = MessagesTestModel(name=factory.getRandomString())
+        obj.save()
+        producer = FakeProducer()
+        messenger = TestMessenger(MessagesTestModel, producer)
+        messenger.register()
+        obj.delete()
+        self.assertEqual(
+            [[MESSENGER_EVENT.DELETED, obj]], producer.messages)
+
+
+class MaaSMessengerTest(TestModelTestCase):
+
+    app = 'maasserver.tests'
+
+    def test_event_key(self):
+        producer = FakeProducer()
+        event_name = factory.getRandomString()
+        obj = MessagesTestModel(name=factory.getRandomString())
+        messenger = MaaSMessenger(MessagesTestModel, producer)
+        self.assertEqual(
+            '%s.%s' % ('MessagesTestModel', event_name),
+            messenger.event_key(event_name, obj))
+
+    def test_create_msg(self):
+        producer = FakeProducer()
+        messenger = MaaSMessenger(Node, producer)
+        event_name = factory.getRandomString()
+        obj_name = factory.getRandomString()
+        obj = MessagesTestModel(name=obj_name)
+        obj.save()
+        msg = messenger.create_msg(event_name, obj)
+        decoded_msg = json.loads(msg)
+        self.assertItemsEqual(['instance', 'event_key'], list(decoded_msg))
+        self.assertItemsEqual(
+            ['id', 'name'], list(decoded_msg['instance']))
+        self.assertItemsEqual(
+            obj_name, decoded_msg['instance']['name'])

=== added file 'src/maasserver/tests/test_rabbit.py'
--- src/maasserver/tests/test_rabbit.py	1970-01-01 00:00:00 +0000
+++ src/maasserver/tests/test_rabbit.py	2012-03-12 16:43:38 +0000
@@ -0,0 +1,143 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Rabbit messaging tests."""
+
+from __future__ import (
+    print_function,
+    unicode_literals,
+    )
+
+__metaclass__ = type
+__all__ = []
+
+
+from amqplib import client_0_8 as amqp
+from maasserver import rabbit
+from maasserver.rabbit import (
+    RabbitBase,
+    RabbitExchange,
+    RabbitMessaging,
+    RabbitQueue,
+    RabbitSession,
+    )
+from maasserver.testing.factory import factory
+from maastesting import TestCase
+from rabbitfixture.server import RabbitServer
+
+
+class RabbitTestCase(TestCase):
+
+    def setUp(self):
+        super(RabbitTestCase, self).setUp()
+        self.rabbit_server = self.useFixture(RabbitServer())
+        self.rabbit_env = self.rabbit_server.runner.environment
+        self.old_connect = rabbit.connect
+        rabbit.connect = self.rabbit_env.get_connection
+
+    def tearDown(self):
+        super(RabbitTestCase, self).tearDown()
+        rabbit.connect = self.old_connect
+
+    def get_command_output(self, command):
+        # Returns the output of the given rabbit command.
+        return self.rabbit_env.rabbitctl(str(command))[0]
+
+
+class TestRabbitSession(RabbitTestCase):
+
+    def test_session_connection(self):
+        session = RabbitSession()
+        # Referencing the connection property causes a connection to be
+        # created.
+        connection = session.connection
+        self.assertIsNotNone(session._connection)
+        # The same connection is returned every time.
+        self.assertIs(connection, session.connection)
+
+    def test_session_disconnect(self):
+        session = RabbitSession()
+        session.disconnect()
+        self.assertIsNone(session._connection)
+
+
+class TestRabbitMessaging(RabbitTestCase):
+
+    def test_messaging_getExchange(self):
+        exchange_name = factory.getRandomString()
+        messaging = RabbitMessaging(exchange_name)
+        exchange = messaging.getExchange()
+        self.assertTrue(isinstance(exchange, RabbitExchange))
+        self.assertEqual(messaging._session, exchange._session)
+        self.assertEqual(exchange_name, exchange.exchange_name)
+
+    def test_messaging_getQueue(self):
+        exchange_name = factory.getRandomString()
+        messaging = RabbitMessaging(exchange_name)
+        queue = messaging.getQueue()
+        self.assertTrue(isinstance(queue, RabbitQueue))
+        self.assertEqual(messaging._session, queue._session)
+        self.assertEqual(exchange_name, queue.exchange_name)
+
+
+class TestRabbitBase(RabbitTestCase):
+
+    def test_rabbitbase_contains_session(self):
+        exchange_name = factory.getRandomString()
+        rabbitbase = RabbitBase(RabbitSession(), exchange_name)
+        self.assertTrue(isinstance(rabbitbase._session, RabbitSession))
+
+    def test_base_has_exchange_name(self):
+        exchange_name = factory.getRandomString()
+        rabbitbase = RabbitBase(RabbitSession(), exchange_name)
+        self.assertEqual(exchange_name, rabbitbase.exchange_name)
+
+    def test_base_channel(self):
+        rabbitbase = RabbitBase(RabbitSession(), factory.getRandomString())
+        # Referencing the channel property causes an open channel to be
+        # created.
+        channel = rabbitbase.channel
+        self.assertTrue(channel.is_open)
+        self.assertIsNotNone(rabbitbase._session._connection)
+        # The same channel is returned every time.
+        self.assertIs(channel, rabbitbase.channel)
+
+    def test_base_channel_creates_exchange(self):
+        exchange_name = factory.getRandomString()
+        rabbitbase = RabbitBase(RabbitSession(), exchange_name)
+        rabbitbase.channel
+        self.assertIn(
+            exchange_name,
+            self.get_command_output('list_exchanges'))
+
+
+class TestRabbitExchange(RabbitTestCase):
+
+    def test_exchange_publish(self):
+        exchange_name = factory.getRandomString()
+        message_content = factory.getRandomString()
+        exchange = RabbitExchange(RabbitSession(), exchange_name)
+
+        channel = RabbitBase(RabbitSession(), exchange_name).channel
+        queue_name = channel.queue_declare(auto_delete=True)[0]
+        channel.queue_bind(exchange=exchange_name, queue=queue_name)
+        exchange.publish(message_content)
+        message = channel.basic_get(queue_name)
+        self.assertEqual(message_content, message.body)
+
+
+class TestRabbitQueue(RabbitTestCase):
+
+    def test_rabbit_queue_binds_queue(self):
+        exchange_name = factory.getRandomString()
+        message_content = factory.getRandomString()
+        queue = RabbitQueue(RabbitSession(), exchange_name)
+
+        # Publish to queue.name.
+        base = RabbitBase(RabbitSession(), exchange_name)
+        channel = base.channel
+        msg = amqp.Message(message_content)
+        channel.basic_publish(
+            exchange=exchange_name, routing_key='', msg=msg)
+        message = channel.basic_get(queue.name)
+        self.assertEqual(message_content, message.body)