launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #03706
[Merge] lp:~allenap/launchpad/job-start-cleanup into lp:launchpad
Gavin Panella has proposed merging lp:~allenap/launchpad/job-start-cleanup into lp:launchpad with lp:~allenap/launchpad/job-start-bunny-lint as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~allenap/launchpad/job-start-cleanup/+merge/61948
A few style clean-ups of the RabbitMQ fixture stuff I did while reading the code and trying to learn about RabbitMQ. Includes a fix for a tight loop when starting the server.
--
https://code.launchpad.net/~allenap/launchpad/job-start-cleanup/+merge/61948
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~allenap/launchpad/job-start-cleanup into lp:launchpad.
=== modified file 'lib/lp/services/rabbit/testing/server.py'
--- lib/lp/services/rabbit/testing/server.py 2011-05-23 10:09:33 +0000
+++ lib/lp/services/rabbit/testing/server.py 2011-05-23 10:09:34 +0000
@@ -29,41 +29,41 @@
RABBITBIN = "/usr/lib/rabbitmq/bin"
-def setup_exchange(conf, port):
- """ create an exchange """
- # Not ported yet.
- conn = _get_connection(conf, port)
- # see if we already have the exchange
- must_create = False
- chan = conn.channel()
- try:
- chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
- type=conf.exchange_type, passive=True)
- except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
- if e.amqp_reply_code == 404:
- must_create = True
- # amqplib kills the channel on error.... we dispose of it too
- chan.close()
- chan = conn.channel()
- else:
- raise
- # now create the exchange if needed
- if must_create:
- chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
- type=conf.exchange_type,
- durable=True, auto_delete=False,)
- print "Created new exchange %s (%s)" % (
- conf.exchange_name + BRANCH_NICK, conf.exchange_type)
- else:
- print "Exchange %s (%s) is already declared" % (
- conf.exchange_name + BRANCH_NICK, conf.exchange_type)
- chan.close()
- conn.close()
- return True
+# def setup_exchange(conf, port):
+# """ create an exchange """
+# # Not ported yet.
+# conn = _get_connection(conf, port)
+# # see if we already have the exchange
+# must_create = False
+# chan = conn.channel()
+# try:
+# chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
+# type=conf.exchange_type, passive=True)
+# except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
+# if e.amqp_reply_code == 404:
+# must_create = True
+# # amqplib kills the channel on error.... we dispose of it too
+# chan.close()
+# chan = conn.channel()
+# else:
+# raise
+# # now create the exchange if needed
+# if must_create:
+# chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
+# type=conf.exchange_type,
+# durable=True, auto_delete=False,)
+# print "Created new exchange %s (%s)" % (
+# conf.exchange_name + BRANCH_NICK, conf.exchange_type)
+# else:
+# print "Exchange %s (%s) is already declared" % (
+# conf.exchange_name + BRANCH_NICK, conf.exchange_type)
+# chan.close()
+# conn.close()
+# return True
def os_exec(*args):
- """ warpper for os.execve() that catches execution errors """
+ """Wrapper for `os.execve()` that catches execution errors."""
try:
os.execv(args[0], args)
os._exit(1)
@@ -74,7 +74,7 @@
def daemon(name, logfilename, pidfilename, *args, **kwargs):
- """Execute a double fork to start up a daemon """
+ """Execute a double fork to start up a daemon."""
# fork 1 - close fds and start new process group
pid = os.fork()
@@ -120,49 +120,47 @@
os_exec(*args)
-def status():
- """ provides status information about the RabbitMQ server """
- # Not ported yet.
- nodename = _get_nodename()
- if not _check_running():
- print "ERROR: RabbitMQ node %s is not running" % nodename
- return
- for act in ["list_exchanges", "list_queues"]:
- outstr, errstr = _rabbitctl(act, strip=True)
- if errstr:
- print >> sys.stderr, errstr
- if outstr:
- print outstr
- return
+# def status():
+# """ provides status information about the RabbitMQ server """
+# # Not ported yet.
+# nodename = _get_nodename()
+# if not _check_running():
+# print "ERROR: RabbitMQ node %s is not running" % nodename
+# return
+# for act in ["list_exchanges", "list_queues"]:
+# outstr, errstr = _rabbitctl(act, strip=True)
+# if errstr:
+# print >> sys.stderr, errstr
+# if outstr:
+# print outstr
+# return
def allocate_ports(n=1):
- """
- Allocate n unused ports
+ """Allocate `n` unused ports.
- There is a small race condition here (between the time we allocate
- the port, and the time it actually gets used), but for the purposes
- for which this function gets used it isn't a problem in practice.
+ There is a small race condition here (between the time we allocate the
+ port, and the time it actually gets used), but for the purposes for which
+ this function gets used it isn't a problem in practice.
"""
sockets = map(lambda _: socket.socket(), xrange(n))
try:
for s in sockets:
s.bind(('localhost', 0))
- ports = map(lambda s: s.getsockname()[1], sockets)
+ return map(lambda s: s.getsockname()[1], sockets)
finally:
for s in sockets:
s.close()
- return ports
class AllocateRabbitServer(Fixture):
- """Allocate the resources a rabbit server needs.
+ """Allocate the resources a RabbitMQ server needs.
- :ivar hostname: The host the rabbit is on (always localhost for
- AllocateRabbitServer).
+ :ivar hostname: The host the RabbitMQ is on (always localhost for
+ `AllocateRabbitServer`).
:ivar port: A port that was free at the time setUp() was called.
- :ivar rabbitdir: A directory to put the rabbit logs in.
- :ivar mnesiadir: A directory for the rabbit db.
+ :ivar rabbitdir: A directory to put the RabbitMQ logs in.
+ :ivar mnesiadir: A directory for the RabbitMQ db.
:ivar logfile: The logfile allocated for the server.
:ivar pidfile: The file the pid should be written to.
:ivar nodename: The name of the node.
@@ -177,8 +175,9 @@
self.pidfile = os.path.join(self.rabbitdir, 'rabbit.pid')
self.nodename = os.path.basename(self.useFixture(TempDir()).path)
+ @property
def fq_nodename(self):
- """Get the node of the rabbit that is being exported."""
+ """The node of the RabbitMQ that is being exported."""
# Note that socket.gethostname is recommended by the rabbitctl manpage
# even though we're always on localhost, its what the erlang cluster
# code wants.
@@ -188,15 +187,17 @@
class ExportRabbitServer(Fixture):
"""Export the environment variables needed to talk to a RabbitMQ instance.
- When setup this exports the key rabbit variables::
- * RABBITMQ_MNESIA_BASE
- * RABBITMQ_LOG_BASE
- * RABBITMQ_NODE_PORT
- * RABBITMQ_NODENAME
+ When setup this exports the key RabbitMQ variables:
+
+ - ``RABBITMQ_MNESIA_BASE``
+ - ``RABBITMQ_LOG_BASE``
+ - ``RABBITMQ_NODE_PORT``
+ - ``RABBITMQ_NODENAME``
+
"""
def __init__(self, config):
- """Create a ExportRabbitServer instance.
+ """Create a `ExportRabbitServer` instance.
:param config: An object exporting the variables
`AllocateRabbitServer` exports.
@@ -216,9 +217,10 @@
"RABBITMQ_NODENAME", self.config.nodename))
self._errors = []
self.addDetail('rabbit-errors',
- Content(UTF8_TEXT, self._get_errors))
+ Content(UTF8_TEXT, self._getErrors))
- def _get_errors(self):
+ def _getErrors(self):
+ """Yield all errors as UTF-8 encoded text."""
for error in self._errors:
if type(error) is unicode:
yield error.encode('utf8')
@@ -227,9 +229,9 @@
yield '\n'
def rabbitctl(self, command, strip=False):
- """ executes a rabbitctl command and returns status """
+ """Executes a ``rabbitctl`` command and returns status."""
ctlbin = os.path.join(RABBITBIN, "rabbitmqctl")
- nodename = self.config.fq_nodename()
+ nodename = self.config.fq_nodename
env = dict(os.environ)
env['HOME'] = self.config.rabbitdir
ctl = subprocess.Popen(
@@ -240,9 +242,9 @@
return outstr.strip(), errstr.strip()
return outstr, errstr
- def check_running(self):
- """ checks that the rabbitmq process is up and running """
- nodename = self.config.fq_nodename()
+ def checkRunning(self):
+ """Checks that RabbitMQ is up and running."""
+ nodename = self.config.fq_nodename
outdata, errdata = self.rabbitctl("status")
if errdata:
self._errors.append(errdata)
@@ -270,26 +272,25 @@
found_node = match.group('nodename')
return found_node == nodename
- def get_connection(self):
+ def getConnection(self):
"""Get an AMQP connection to the RabbitMQ server.
:raises socket.error: If the connection cannot be made.
"""
host_port = "%s:%s" % (self.config.hostname, self.config.port)
- conn = amqp.Connection(
+ return amqp.Connection(
host=host_port, userid="guest",
password="guest", virtual_host="/", insist=False)
- return conn
class RunRabbitServer(Fixture):
- """Run a rabbit server.
+ """Run a RabbitMQ server.
:ivar pid: The pid of the server.
"""
def __init__(self, config):
- """Create a RunRabbitServer instance.
+ """Create a `RunRabbitServer` instance.
:param config: An object exporting the variables
`AllocateRabbitServer` exports.
@@ -299,12 +300,15 @@
def setUp(self):
super(RunRabbitServer, self).setUp()
- self.rabbit = self.useFixture(ExportRabbitServer(self.config))
+ self.server = self.useFixture(ExportRabbitServer(self.config))
# Workaround fixtures not adding details from used fixtures.
self.addDetail('rabbitctl errors',
- Content(UTF8_TEXT, self.rabbit._get_errors))
+ Content(UTF8_TEXT, self.server._getErrors))
self.addDetail('rabbit log file',
content_from_file(self.config.logfile))
+ self.start()
+
+ def start(self):
cmd = os.path.join(RABBITBIN, 'rabbitmq-server')
name = "RabbitMQ server node:%s on port:%d" % (
self.config.nodename, self.config.port)
@@ -313,13 +317,13 @@
# Wait for the server to come up...
timeout = time.time() + 5
while time.time() < timeout:
- if self.rabbit.check_running():
+ if self.server.checkRunning():
break
time.sleep(0.3)
else:
raise Exception(
"Timeout waiting for RabbitMQ OTP server to start.")
- # The erlang OTP is up, but rabbit may not be usable. We need to
+ # The erlang OTP is up, but RabbitMQ may not be usable. We need to
# cleanup up the process from here on in even if the full service
# fails to get together.
self.addCleanup(self.stop)
@@ -328,7 +332,7 @@
# rabbitctl can say a node is up before it is ready to
# accept connections ... :-(
try:
- conn = self.rabbit.get_connection()
+ conn = self.getConnection()
except socket.error:
time.sleep(0.1)
else:
@@ -343,10 +347,10 @@
def stop(self):
"""Stop the running server. Normally called by cleanups."""
- if not self.rabbit.check_running():
+ if not self.server.checkRunning():
# If someone has shut it down already, we're done.
return
- outstr, errstr = self.rabbit.rabbitctl("stop", strip=True)
+ outstr, errstr = self.server.rabbitctl("stop", strip=True)
if outstr:
self.addDetail('stop-out', Content(UTF8_TEXT, lambda: [outstr]))
if errstr:
@@ -354,7 +358,7 @@
# Wait for the server to go down...
timeout = time.time() + 15
while time.time() < timeout:
- if not self.rabbit.check_running():
+ if not self.server.checkRunning():
break
time.sleep(0.3)
else:
@@ -374,21 +378,27 @@
raise Exception(
"RabbitMQ (pid=%d) did not quit." % (self.pid,))
+ def getConnection(self):
+ return self.server.getConnection()
+
class RabbitServer(Fixture):
"""A RabbitMQ server fixture.
- When setup a rabbit instance will be running and the environment variables
- needed to talk to it will be already configured.
+ When setup a RabbitMQ instance will be running and the environment
+ variables needed to talk to it will be already configured.
- :ivar config: The `AllocateRabbitServer` used to start the rabbit.
+ :ivar config: The `AllocateRabbitServer` used to start the server.
+ :ivar runner: The `RunRabbitServer` that bootstraps the server.
"""
def setUp(self):
super(RabbitServer, self).setUp()
self.config = self.useFixture(AllocateRabbitServer())
- self.server = RunRabbitServer(self.config)
- self.useFixture(self.server)
+ self.runner = self.useFixture(RunRabbitServer(self.config))
def getDetails(self):
- return self.server.getDetails()
+ return self.runner.getDetails()
+
+ def getConnection(self):
+ return self.runner.getConnection()
=== modified file 'lib/lp/services/rabbit/tests/test_fixture.py'
--- lib/lp/services/rabbit/tests/test_fixture.py 2011-05-11 00:49:23 +0000
+++ lib/lp/services/rabbit/tests/test_fixture.py 2011-05-23 10:09:34 +0000
@@ -7,11 +7,10 @@
import socket
-from amqplib import client_0_8 as amqp
from fixtures import EnvironmentVariableFixture
+from lp.services.rabbit.testing.server import RabbitServer
from lp.testing import TestCase
-from lp.services.rabbit.testing.server import RabbitServer
class TestRabbitFixture(TestCase):
@@ -28,10 +27,7 @@
self.addCleanup(self._gather_details, fixture.getDetails)
fixture.setUp()
# We can connect.
- host = 'localhost:%s' % fixture.config.port
- conn = amqp.Connection(host=host, userid="guest",
- password="guest", virtual_host="/", insist=False)
- conn.close()
+ fixture.getConnection().close()
# And get a log file
log = fixture.getDetails()['rabbit log file']
# Which shouldn't blow up on iteration.
@@ -39,5 +35,4 @@
finally:
fixture.cleanUp()
# The daemon should be closed now.
- self.assertRaises(socket.error, amqp.Connection, host=host,
- userid="guest", password="guest", virtual_host="/", insist=False)
+ self.assertRaises(socket.error, fixture.getConnection)