← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~allenap/launchpad/rabbit-fixture-cookie-file into lp:launchpad

 

Gavin Panella has proposed merging lp:~allenap/launchpad/rabbit-fixture-cookie-file into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #788557 in Launchpad itself: "RabbitServer fixture not starting in buildbot: 'Failed to create cookie file'"
  https://bugs.launchpad.net/launchpad/+bug/788557

For more details, see:
https://code.launchpad.net/~allenap/launchpad/rabbit-fixture-cookie-file/+merge/62709

This branch attempts to solve the problem of Erlang being unable to
create its cookie file. From reading the Erlang source, it appears to
simply create .erlang.cookie in the home directory, and I really don't
know why that could fail, so I'm attempting two things:

- When starting the rabbitmq-server process, also set its current
  working directory to the newly configured HOME directory.

- I realised there's (probably) no reason to daemonize the RabbitMQ
  server, so I've ripped out all of that complexity, in which bugs may
  lie, and replaced it with a plain subprocess.Popen() call.

-- 
https://code.launchpad.net/~allenap/launchpad/rabbit-fixture-cookie-file/+merge/62709
Your team, Launchpad code reviewers, is requested to review the proposed merge of lp:~allenap/launchpad/rabbit-fixture-cookie-file into lp:launchpad.
=== modified file 'lib/lp/services/rabbit/testing/server.py'
--- lib/lp/services/rabbit/testing/server.py	2011-06-14 10:52:34 +0000
+++ lib/lp/services/rabbit/testing/server.py	2011-06-16 19:55:46 +0000
@@ -9,13 +9,11 @@
     "RabbitServer",
     ]
 
-import errno
 import os
 import re
 import signal
 import socket
 import subprocess
-import sys
 import time
 
 from amqplib import client_0_8 as amqp
@@ -69,83 +67,11 @@
 #     return True
 
 
-def os_exec(*args):
-    """Wrapper for `os.execve()` that catches execution errors."""
-    try:
-        os.execv(args[0], args)
-        os._exit(1)
-    except OSError:
-        sys.stderr.write("\nERROR:\nCould not exec: %s\n" % (args,))
-    # if we reach here, it's an error anyway
-    os._exit(-1)
-
-
-def daemon(name, logfilename, pidfilename, *args, **kwargs):
-    """Execute a double fork to start up a daemon."""
-
-    # fork 1 - close fds and start new process group
-    pid = os.fork()
-    if pid:
-        # parent process - we collect the first child to avoid ghosts.
-        os.waitpid(pid, 0)
-        return
-    # start a new process group and detach ttys
-    # print '## Starting', name, '##'
-    os.setsid()
-
+def preexec_fn():
     # Revert Python's handling of SIGPIPE. See
     # http://bugs.python.org/issue1652 for more info.
     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
 
-    # fork 2 - now detach once more free and clear
-    pid = os.fork()
-    if pid:
-        # this is the first fork - its job is done
-        os._exit(0)
-    # make attempts to read from stdin fail.
-    fnullr = os.open(os.devnull, os.O_RDONLY)
-    os.dup2(fnullr, 0)
-    if fnullr:
-        os.close(fnullr)
-    # open up the logfile and start up the process
-    f = os.open(logfilename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
-    os.dup2(f, 1)
-    os.dup2(f, 2)
-    if f > 2:
-        os.close(f)
-    # With output setup to log we can start running code again.
-    if 'command' in kwargs:
-        args = (kwargs['command'],) + args
-    else:
-        args = ('/usr/bin/env', 'python', '-u',) + args
-    if 'homedir' in kwargs:
-        os.environ['HOME'] = kwargs['homedir']
-    print os.environ['HOME']
-    print os.stat(os.environ['HOME'])
-    # this should get logged
-    print '## Starting %s as %s' % (name, args)
-    # write the pidfile file
-    with open(pidfilename, "w") as pidfile:
-        pidfile.write("%d" % os.getpid())
-        pidfile.flush()
-    os_exec(*args)
-
-
-# def status():
-#     """ provides status information about the RabbitMQ server """
-#     # Not ported yet.
-#     nodename = _get_nodename()
-#     if not _check_running():
-#         print "ERROR: RabbitMQ node %s is not running" % nodename
-#         return
-#     for act in ["list_exchanges", "list_queues"]:
-#         outstr, errstr = _rabbitctl(act, strip=True)
-#         if errstr:
-#             print >> sys.stderr, errstr
-#         if outstr:
-#             print outstr
-#     return
-
 
 def allocate_ports(n=1):
     """Allocate `n` unused ports.
@@ -170,20 +96,18 @@
     :ivar hostname: The host the RabbitMQ is on (always localhost for
         `RabbitServerResources`).
     :ivar port: A port that was free at the time setUp() was called.
-    :ivar rabbitdir: A directory to put the RabbitMQ logs in.
+    :ivar homedir: A directory to put the RabbitMQ logs in.
     :ivar mnesiadir: A directory for the RabbitMQ db.
     :ivar logfile: The logfile allocated for the server.
-    :ivar pidfile: The file the pid should be written to.
     :ivar nodename: The name of the node.
     """
     def setUp(self):
         super(RabbitServerResources, self).setUp()
         self.hostname = 'localhost'
         self.port = allocate_ports()[0]
-        self.rabbitdir = self.useFixture(TempDir()).path
+        self.homedir = self.useFixture(TempDir()).path
         self.mnesiadir = self.useFixture(TempDir()).path
-        self.logfile = os.path.join(self.rabbitdir, 'rabbit.log')
-        self.pidfile = os.path.join(self.rabbitdir, 'rabbit.pid')
+        self.logfile = os.path.join(self.homedir, 'server.log')
         self.nodename = os.path.basename(self.useFixture(TempDir()).path)
 
     @property
@@ -221,7 +145,7 @@
         self.useFixture(EnvironmentVariableFixture(
             "RABBITMQ_MNESIA_BASE", self.config.mnesiadir))
         self.useFixture(EnvironmentVariableFixture(
-            "RABBITMQ_LOG_BASE", self.config.rabbitdir))
+            "RABBITMQ_LOG_BASE", self.config.homedir))
         self.useFixture(EnvironmentVariableFixture(
             "RABBITMQ_NODE_PORT", str(self.config.port)))
         self.useFixture(EnvironmentVariableFixture(
@@ -243,18 +167,18 @@
         """Executes a ``rabbitctl`` command and returns status."""
         ctlbin = os.path.join(RABBITBIN, "rabbitmqctl")
         nodename = self.config.fq_nodename
-        env = dict(os.environ)
-        env['HOME'] = self.config.rabbitdir
+        env = dict(os.environ, HOME=self.config.homedir)
         ctl = subprocess.Popen(
             (ctlbin, "-n", nodename, command), env=env,
-            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+            preexec_fn=preexec_fn)
         outstr, errstr = ctl.communicate()
         if strip:
             return outstr.strip(), errstr.strip()
         return outstr, errstr
 
-    def check_running(self):
-        """Checks that RabbitMQ is up and running."""
+    def is_node_running(self):
+        """Checks that our RabbitMQ node is up and running."""
         nodename = self.config.fq_nodename
         outdata, errdata = self.rabbitctl("status")
         if errdata:
@@ -295,10 +219,7 @@
 
 
 class RabbitServerRunner(Fixture):
-    """Run a RabbitMQ server.
-
-    :ivar pid: The pid of the server.
-    """
+    """Run a RabbitMQ server."""
 
     def __init__(self, config):
         """Create a `RabbitServerRunner` instance.
@@ -308,6 +229,7 @@
         """
         super(RabbitServerRunner, self).__init__()
         self.config = config
+        self.process = None
 
     def setUp(self):
         super(RabbitServerRunner, self).setUp()
@@ -315,78 +237,108 @@
             RabbitServerEnvironment(self.config))
         self._start()
 
-    def _start(self):
+    def is_running(self):
+        """Is the RabbitMQ server process still running?"""
+        if self.process is None:
+            return False
+        else:
+            return self.process.poll() is None
+
+    def check_running(self):
+        """Checks that the RabbitMQ server process is still running.
+
+        :raises Exception: If it not running.
+        :return: True if it is running.
+        """
+        if self.is_running():
+            return True
+        else:
+            raise Exception("RabbitMQ server is not running.")
+
+    def _spawn(self):
+        """Spawn the RabbitMQ server process."""
         cmd = os.path.join(RABBITBIN, 'rabbitmq-server')
-        name = "RabbitMQ server node:%s on port:%d" % (
-            self.config.nodename, self.config.port)
-        daemon(name, self.config.logfile, self.config.pidfile, command=cmd,
-            homedir=self.config.rabbitdir)
+        env = dict(os.environ, HOME=self.config.homedir)
+        with open(self.config.logfile, "wb") as logfile:
+            with open(os.devnull, "rb") as devnull:
+                self.process = subprocess.Popen(
+                    [cmd], stdin=devnull, stdout=logfile, stderr=logfile,
+                    close_fds=True, cwd=self.config.homedir, env=env,
+                    preexec_fn=preexec_fn)
         self.addDetail(
             os.path.basename(self.config.logfile),
             content_from_file(self.config.logfile))
+
+    def _start(self):
+        """Start the RabbitMQ server."""
+        # Check if Rabbit is already running. In truth this is really to avoid
+        # a race condition around creating $HOME/.erlang.cookie: let rabbitctl
+        # create it now, before spawning the daemon.
+        if self.environment.is_node_running():
+            raise AssertionError(
+                "RabbitMQ OTP already running even though it "
+                "hasn't been started it yet!")
+        self._spawn()
         # Wait for the server to come up...
         timeout = time.time() + 15
-        while time.time() < timeout:
-            if self.environment.check_running():
+        while time.time() < timeout and self.check_running():
+            if self.environment.is_node_running():
                 break
             time.sleep(0.3)
         else:
             raise Exception(
-                "Timeout waiting for RabbitMQ OTP server to start.")
-        # The erlang OTP is up, but RabbitMQ may not be usable. We need to
+                "Timeout waiting for RabbitMQ server to start.")
+        # The Erlang OTP is up, but RabbitMQ may not be usable. We need to
         # cleanup up the process from here on in even if the full service
         # fails to get together.
         self.addCleanup(self._stop)
-        # Wait until the server is ready...
-        while time.time() < timeout:
-            # rabbitctl can say a node is up before it is ready to
-            # accept connections ... :-(
+        # `rabbitctl status` can say a node is up before it is ready to accept
+        # connections. Wait at least 5 more seconds for the node to come up...
+        timeout = max(timeout, time.time() + 5)
+        while time.time() < timeout and self.check_running():
             try:
-                conn = self.environment.get_connection()
+                self.environment.get_connection().close()
             except socket.error:
                 time.sleep(0.1)
             else:
-                conn.close()
                 break
         else:
             raise Exception(
-                "Timeout waiting for RabbitMQ to start listening.")
-        # All should be well here.
-        with open(self.config.pidfile, "r") as f:
-            self.pid = int(f.read().strip())
+                "Timeout waiting for RabbitMQ to node to come up.")
 
-    def _stop(self):
-        """Stop the running server. Normally called by cleanups."""
-        if not self.environment.check_running():
-            # If someone has shut it down already, we're done.
-            return
+    def _request_stop(self):
         outstr, errstr = self.environment.rabbitctl("stop", strip=True)
         if outstr:
             self.addDetail('stop-out', Content(UTF8_TEXT, lambda: [outstr]))
         if errstr:
             self.addDetail('stop-err', Content(UTF8_TEXT, lambda: [errstr]))
-        # Wait for the server to go down...
+
+    def _stop(self):
+        """Stop the running server. Normally called by cleanups."""
+        self._request_stop()
+        # Wait for the node to go down...
         timeout = time.time() + 15
         while time.time() < timeout:
-            if not self.environment.check_running():
+            if not self.environment.is_node_running():
                 break
             time.sleep(0.3)
         else:
             raise Exception(
-                "Timeout waiting for RabbitMQ shutdown.")
-        # Wait for the process to end...
+                "Timeout waiting for RabbitMQ node to go down.")
+        # Wait at least 5 more seconds for the process to end...
+        timeout = max(timeout, time.time() + 5)
         while time.time() < timeout:
-            try:
-                os.kill(self.pid, 0)
-            except OSError, e:
-                if e.errno == errno.ESRCH:
-                    break
-                raise
-            else:
-                time.sleep(0.1)
+            if not self.is_running():
+                break
+            self.process.terminate()
+            time.sleep(0.1)
         else:
-            raise Exception(
-                "RabbitMQ (pid=%d) did not quit." % (self.pid,))
+            # Die!!!
+            if self.is_running():
+                self.process.kill()
+                time.sleep(0.5)
+            if self.is_running():
+                raise Exception("RabbitMQ server just won't die.")
 
 
 class RabbitServer(Fixture):

=== modified file 'lib/lp/services/rabbit/tests/test_fixture.py'
--- lib/lp/services/rabbit/tests/test_fixture.py	2011-06-15 10:25:53 +0000
+++ lib/lp/services/rabbit/tests/test_fixture.py	2011-06-16 19:55:46 +0000
@@ -58,10 +58,6 @@
 class TestRabbitFixture(TestCase):
 
     def test_start_check_shutdown(self):
-        # XXX: GavinPanella 2011-05-26 bug=788557 : Disabled due to spurious
-        # failures (cannot create cookie file).
-        self.skip("Disabled (bug 788557)")
-
         # Rabbit needs to fully isolate itself: an existing per user
         # .erlange.cookie has to be ignored, and ditto bogus HOME if other
         # tests fail to cleanup.
@@ -79,7 +75,7 @@
                     }
                 amqp.Connection(**connect_arguments).close()
                 # And get a log file.
-                log = fixture.runner.getDetails()["rabbit.log"]
+                log = fixture.runner.getDetails()["server.log"]
                 # Which shouldn't blow up on iteration.
                 list(log.iter_text())
         except:
@@ -93,3 +89,9 @@
 
         # The daemon should be closed now.
         self.assertRaises(socket.error, amqp.Connection, **connect_arguments)
+
+
+for num in xrange(1000):
+    setattr(
+        TestRabbitFixture, "test_start_check_shutdown_%04d" % num,
+        TestRabbitFixture.test_start_check_shutdown)


References