← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~allenap/launchpad/job-start-cleanup into lp:launchpad

 

Gavin Panella has proposed merging lp:~allenap/launchpad/job-start-cleanup into lp:launchpad with lp:~allenap/launchpad/job-start-bunny-lint as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~allenap/launchpad/job-start-cleanup/+merge/61948

A few style clean-ups of the RabbitMQ fixture stuff I did while reading the code and trying to learn about RabbitMQ. Includes a fix for a tight loop when starting the server.
-- 
https://code.launchpad.net/~allenap/launchpad/job-start-cleanup/+merge/61948
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~allenap/launchpad/job-start-cleanup into lp:launchpad.
=== modified file 'lib/lp/services/rabbit/testing/server.py'
--- lib/lp/services/rabbit/testing/server.py	2011-05-23 10:09:33 +0000
+++ lib/lp/services/rabbit/testing/server.py	2011-05-23 10:09:34 +0000
@@ -29,41 +29,41 @@
 RABBITBIN = "/usr/lib/rabbitmq/bin"
 
 
-def setup_exchange(conf, port):
-    """ create an exchange """
-    # Not ported yet.
-    conn = _get_connection(conf, port)
-    # see if we already have the exchange
-    must_create = False
-    chan = conn.channel()
-    try:
-        chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
-                              type=conf.exchange_type, passive=True)
-    except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
-        if e.amqp_reply_code == 404:
-            must_create = True
-            # amqplib kills the channel on error.... we dispose of it too
-            chan.close()
-            chan = conn.channel()
-        else:
-            raise
-    # now create the exchange if needed
-    if must_create:
-        chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
-                              type=conf.exchange_type,
-                              durable=True, auto_delete=False,)
-        print "Created new exchange %s (%s)" % (
-            conf.exchange_name + BRANCH_NICK, conf.exchange_type)
-    else:
-        print "Exchange %s (%s) is already declared" % (
-            conf.exchange_name + BRANCH_NICK, conf.exchange_type)
-    chan.close()
-    conn.close()
-    return True
+# def setup_exchange(conf, port):
+#     """ create an exchange """
+#     # Not ported yet.
+#     conn = _get_connection(conf, port)
+#     # see if we already have the exchange
+#     must_create = False
+#     chan = conn.channel()
+#     try:
+#         chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
+#                               type=conf.exchange_type, passive=True)
+#     except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
+#         if e.amqp_reply_code == 404:
+#             must_create = True
+#             # amqplib kills the channel on error.... we dispose of it too
+#             chan.close()
+#             chan = conn.channel()
+#         else:
+#             raise
+#     # now create the exchange if needed
+#     if must_create:
+#         chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
+#                               type=conf.exchange_type,
+#                               durable=True, auto_delete=False,)
+#         print "Created new exchange %s (%s)" % (
+#             conf.exchange_name + BRANCH_NICK, conf.exchange_type)
+#     else:
+#         print "Exchange %s (%s) is already declared" % (
+#             conf.exchange_name + BRANCH_NICK, conf.exchange_type)
+#     chan.close()
+#     conn.close()
+#     return True
 
 
 def os_exec(*args):
-    """ warpper for os.execve() that catches execution errors """
+    """Wrapper for `os.execve()` that catches execution errors."""
     try:
         os.execv(args[0], args)
         os._exit(1)
@@ -74,7 +74,7 @@
 
 
 def daemon(name, logfilename, pidfilename, *args, **kwargs):
-    """Execute a double fork to start up a daemon """
+    """Execute a double fork to start up a daemon."""
 
     # fork 1 - close fds and start new process group
     pid = os.fork()
@@ -120,49 +120,47 @@
     os_exec(*args)
 
 
-def status():
-    """ provides status information about the RabbitMQ server """
-    # Not ported yet.
-    nodename = _get_nodename()
-    if not _check_running():
-        print "ERROR: RabbitMQ node %s is not running" % nodename
-        return
-    for act in ["list_exchanges", "list_queues"]:
-        outstr, errstr = _rabbitctl(act, strip=True)
-        if errstr:
-            print >> sys.stderr, errstr
-        if outstr:
-            print outstr
-    return
+# def status():
+#     """ provides status information about the RabbitMQ server """
+#     # Not ported yet.
+#     nodename = _get_nodename()
+#     if not _check_running():
+#         print "ERROR: RabbitMQ node %s is not running" % nodename
+#         return
+#     for act in ["list_exchanges", "list_queues"]:
+#         outstr, errstr = _rabbitctl(act, strip=True)
+#         if errstr:
+#             print >> sys.stderr, errstr
+#         if outstr:
+#             print outstr
+#     return
 
 
 def allocate_ports(n=1):
-    """
-    Allocate n unused ports
+    """Allocate `n` unused ports.
 
-    There is a small race condition here (between the time we allocate
-    the port, and the time it actually gets used), but for the purposes
-    for which this function gets used it isn't a problem in practice.
+    There is a small race condition here (between the time we allocate the
+    port, and the time it actually gets used), but for the purposes for which
+    this function gets used it isn't a problem in practice.
     """
     sockets = map(lambda _: socket.socket(), xrange(n))
     try:
         for s in sockets:
             s.bind(('localhost', 0))
-        ports = map(lambda s: s.getsockname()[1], sockets)
+        return map(lambda s: s.getsockname()[1], sockets)
     finally:
         for s in sockets:
             s.close()
-    return ports
 
 
 class AllocateRabbitServer(Fixture):
-    """Allocate the resources a rabbit server needs.
+    """Allocate the resources a RabbitMQ server needs.
 
-    :ivar hostname: The host the rabbit is on (always localhost for
-        AllocateRabbitServer).
+    :ivar hostname: The host the RabbitMQ is on (always localhost for
+        `AllocateRabbitServer`).
     :ivar port: A port that was free at the time setUp() was called.
-    :ivar rabbitdir: A directory to put the rabbit logs in.
-    :ivar mnesiadir: A directory for the rabbit db.
+    :ivar rabbitdir: A directory to put the RabbitMQ logs in.
+    :ivar mnesiadir: A directory for the RabbitMQ db.
     :ivar logfile: The logfile allocated for the server.
     :ivar pidfile: The file the pid should be written to.
     :ivar nodename: The name of the node.
@@ -177,8 +175,9 @@
         self.pidfile = os.path.join(self.rabbitdir, 'rabbit.pid')
         self.nodename = os.path.basename(self.useFixture(TempDir()).path)
 
+    @property
     def fq_nodename(self):
-        """Get the node of the rabbit that is being exported."""
+        """The node of the RabbitMQ that is being exported."""
         # Note that socket.gethostname is recommended by the rabbitctl manpage
         # even though we're always on localhost, its what the erlang cluster
         # code wants.
@@ -188,15 +187,17 @@
 class ExportRabbitServer(Fixture):
     """Export the environment variables needed to talk to a RabbitMQ instance.
 
-    When setup this exports the key rabbit variables::
-     * RABBITMQ_MNESIA_BASE
-     * RABBITMQ_LOG_BASE
-     * RABBITMQ_NODE_PORT
-     * RABBITMQ_NODENAME
+    When setup this exports the key RabbitMQ variables:
+
+    - ``RABBITMQ_MNESIA_BASE``
+    - ``RABBITMQ_LOG_BASE``
+    - ``RABBITMQ_NODE_PORT``
+    - ``RABBITMQ_NODENAME``
+
     """
 
     def __init__(self, config):
-        """Create a ExportRabbitServer instance.
+        """Create a `ExportRabbitServer` instance.
 
         :param config: An object exporting the variables
             `AllocateRabbitServer` exports.
@@ -216,9 +217,10 @@
             "RABBITMQ_NODENAME", self.config.nodename))
         self._errors = []
         self.addDetail('rabbit-errors',
-            Content(UTF8_TEXT, self._get_errors))
+            Content(UTF8_TEXT, self._getErrors))
 
-    def _get_errors(self):
+    def _getErrors(self):
+        """Yield all errors as UTF-8 encoded text."""
         for error in self._errors:
             if type(error) is unicode:
                 yield error.encode('utf8')
@@ -227,9 +229,9 @@
             yield '\n'
 
     def rabbitctl(self, command, strip=False):
-        """ executes a rabbitctl command and returns status """
+        """Executes a ``rabbitctl`` command and returns status."""
         ctlbin = os.path.join(RABBITBIN, "rabbitmqctl")
-        nodename = self.config.fq_nodename()
+        nodename = self.config.fq_nodename
         env = dict(os.environ)
         env['HOME'] = self.config.rabbitdir
         ctl = subprocess.Popen(
@@ -240,9 +242,9 @@
             return outstr.strip(), errstr.strip()
         return outstr, errstr
 
-    def check_running(self):
-        """ checks that the rabbitmq process is up and running """
-        nodename = self.config.fq_nodename()
+    def checkRunning(self):
+        """Checks that RabbitMQ is up and running."""
+        nodename = self.config.fq_nodename
         outdata, errdata = self.rabbitctl("status")
         if errdata:
             self._errors.append(errdata)
@@ -270,26 +272,25 @@
         found_node = match.group('nodename')
         return found_node == nodename
 
-    def get_connection(self):
+    def getConnection(self):
         """Get an AMQP connection to the RabbitMQ server.
 
         :raises socket.error: If the connection cannot be made.
         """
         host_port = "%s:%s" % (self.config.hostname, self.config.port)
-        conn = amqp.Connection(
+        return amqp.Connection(
             host=host_port, userid="guest",
             password="guest", virtual_host="/", insist=False)
-        return conn
 
 
 class RunRabbitServer(Fixture):
-    """Run a rabbit server.
+    """Run a RabbitMQ server.
 
     :ivar pid: The pid of the server.
     """
 
     def __init__(self, config):
-        """Create a RunRabbitServer instance.
+        """Create a `RunRabbitServer` instance.
 
         :param config: An object exporting the variables
             `AllocateRabbitServer` exports.
@@ -299,12 +300,15 @@
 
     def setUp(self):
         super(RunRabbitServer, self).setUp()
-        self.rabbit = self.useFixture(ExportRabbitServer(self.config))
+        self.server = self.useFixture(ExportRabbitServer(self.config))
         # Workaround fixtures not adding details from used fixtures.
         self.addDetail('rabbitctl errors',
-            Content(UTF8_TEXT, self.rabbit._get_errors))
+            Content(UTF8_TEXT, self.server._getErrors))
         self.addDetail('rabbit log file',
             content_from_file(self.config.logfile))
+        self.start()
+
+    def start(self):
         cmd = os.path.join(RABBITBIN, 'rabbitmq-server')
         name = "RabbitMQ server node:%s on port:%d" % (
             self.config.nodename, self.config.port)
@@ -313,13 +317,13 @@
         # Wait for the server to come up...
         timeout = time.time() + 5
         while time.time() < timeout:
-            if self.rabbit.check_running():
+            if self.server.checkRunning():
                 break
             time.sleep(0.3)
         else:
             raise Exception(
                 "Timeout waiting for RabbitMQ OTP server to start.")
-        # The erlang OTP is up, but rabbit may not be usable. We need to
+        # The erlang OTP is up, but RabbitMQ may not be usable. We need to
         # cleanup up the process from here on in even if the full service
         # fails to get together.
         self.addCleanup(self.stop)
@@ -328,7 +332,7 @@
             # rabbitctl can say a node is up before it is ready to
             # accept connections ... :-(
             try:
-                conn = self.rabbit.get_connection()
+                conn = self.getConnection()
             except socket.error:
                 time.sleep(0.1)
             else:
@@ -343,10 +347,10 @@
 
     def stop(self):
         """Stop the running server. Normally called by cleanups."""
-        if not self.rabbit.check_running():
+        if not self.server.checkRunning():
             # If someone has shut it down already, we're done.
             return
-        outstr, errstr = self.rabbit.rabbitctl("stop", strip=True)
+        outstr, errstr = self.server.rabbitctl("stop", strip=True)
         if outstr:
             self.addDetail('stop-out', Content(UTF8_TEXT, lambda: [outstr]))
         if errstr:
@@ -354,7 +358,7 @@
         # Wait for the server to go down...
         timeout = time.time() + 15
         while time.time() < timeout:
-            if not self.rabbit.check_running():
+            if not self.server.checkRunning():
                 break
             time.sleep(0.3)
         else:
@@ -374,21 +378,27 @@
             raise Exception(
                 "RabbitMQ (pid=%d) did not quit." % (self.pid,))
 
+    def getConnection(self):
+        return self.server.getConnection()
+
 
 class RabbitServer(Fixture):
     """A RabbitMQ server fixture.
 
-    When setup a rabbit instance will be running and the environment variables
-    needed to talk to it will be already configured.
+    When setup a RabbitMQ instance will be running and the environment
+    variables needed to talk to it will be already configured.
 
-    :ivar config: The `AllocateRabbitServer` used to start the rabbit.
+    :ivar config: The `AllocateRabbitServer` used to start the server.
+    :ivar runner: The `RunRabbitServer` that bootstraps the server.
     """
 
     def setUp(self):
         super(RabbitServer, self).setUp()
         self.config = self.useFixture(AllocateRabbitServer())
-        self.server = RunRabbitServer(self.config)
-        self.useFixture(self.server)
+        self.runner = self.useFixture(RunRabbitServer(self.config))
 
     def getDetails(self):
-        return self.server.getDetails()
+        return self.runner.getDetails()
+
+    def getConnection(self):
+        return self.runner.getConnection()

=== modified file 'lib/lp/services/rabbit/tests/test_fixture.py'
--- lib/lp/services/rabbit/tests/test_fixture.py	2011-05-11 00:49:23 +0000
+++ lib/lp/services/rabbit/tests/test_fixture.py	2011-05-23 10:09:34 +0000
@@ -7,11 +7,10 @@
 
 import socket
 
-from amqplib import client_0_8 as amqp
 from fixtures import EnvironmentVariableFixture
 
+from lp.services.rabbit.testing.server import RabbitServer
 from lp.testing import TestCase
-from lp.services.rabbit.testing.server import RabbitServer
 
 
 class TestRabbitFixture(TestCase):
@@ -28,10 +27,7 @@
             self.addCleanup(self._gather_details, fixture.getDetails)
             fixture.setUp()
             # We can connect.
-            host = 'localhost:%s' % fixture.config.port
-            conn = amqp.Connection(host=host, userid="guest",
-                password="guest", virtual_host="/", insist=False)
-            conn.close()
+            fixture.getConnection().close()
             # And get a log file
             log = fixture.getDetails()['rabbit log file']
             # Which shouldn't blow up on iteration.
@@ -39,5 +35,4 @@
         finally:
             fixture.cleanUp()
         # The daemon should be closed now.
-        self.assertRaises(socket.error, amqp.Connection, host=host,
-            userid="guest", password="guest", virtual_host="/", insist=False)
+        self.assertRaises(socket.error, fixture.getConnection)