launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/lazr.jobrunner/bug-1015667 into lp:lazr.jobrunner

 

Abel Deuring has proposed merging lp:~adeuring/lazr.jobrunner/bug-1015667 into lp:lazr.jobrunner.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1015667 in Launchpad itself: "Celery is leaving an ever increasing number of queues behind"
  https://bugs.launchpad.net/launchpad/+bug/1015667

For more details, see:
https://code.launchpad.net/~adeuring/lazr.jobrunner/bug-1015667/+merge/113228

This branch adds a script "inspect-queues" to lazr.jobrunner.

As described in bug 1015667, some Celery jobs seem to leave
messages behind in result queues. The new script lets us see
what the messages in these queues contain. I hope that looking
at these messages gives us some clue about which jobs created
them. (I also have a branch for the main LP code ready which
changes the task ID so that it includes the job class and the
job ID, together with a UUID. Since the result queue name is
derived from the task ID, we should be able to trace the
"offending jobs" a bit better.)
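
A task ID of that kind could look roughly like the sketch below.
The helper names make_task_id() and result_queue_name(), the
underscore separator and the field order are assumptions made
for illustration; the LP branch may well do this differently.
The queue name rule (task ID with the dashes removed) is the one
the tests in this branch rely on.

```python
import uuid


def make_task_id(job_class_name, job_id):
    """Build a task ID that names the job it belongs to.

    Hypothetical format: <job class>_<job id>_<uuid4>.
    """
    return '%s_%s_%s' % (job_class_name, job_id, uuid.uuid4())


def result_queue_name(task_id):
    """Mirror the rule used by the tests in this branch: drop dashes."""
    return task_id.replace('-', '')


task_id = make_task_id('FileJob', 11)
# The queue name now starts with the job class and the job ID.
print(result_queue_name(task_id))
```

With such an ID, a leftover queue name in the output of
"rabbitmqctl list_queues" would already hint at the job that
created it.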

As a side effect, the messages are also consumed. This might
not be desired in every case -- but I could not figure out
how to call drain_queues() so that the messages stay in the
queues. The drain_queues() parameter "retain" seems to have
no effect...

In other words, the script must be used carefully. But if we
store the output of "rabbitmqctl list_queues" and run the
script, let's say, 24 hours later on queues from that stored
listing, we can be sure that these queues are sufficiently
stale, so that no results that are still needed are
inadvertently deleted.
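
The comparison of two listings could be sketched as follows.
This is not part of the branch; parse_queue_listing() and
stale_queues() are hypothetical helpers, and the parser assumes
the default two-column output of "rabbitmqctl list_queues"
(queue name, tab, message count).

```python
def parse_queue_listing(text):
    """Return {queue name: message count} from list_queues output.

    Lines that are not "name<TAB>count" (for example banner lines)
    are skipped.
    """
    queues = {}
    for line in text.splitlines():
        fields = line.split('\t')
        if len(fields) == 2 and fields[1].strip().isdigit():
            queues[fields[0]] = int(fields[1])
    return queues


def stale_queues(old_listing, new_listing):
    """Sorted names of queues that are in both listings and not empty."""
    old = parse_queue_listing(old_listing)
    new = parse_queue_listing(new_listing)
    return sorted(
        name for name, count in new.items()
        if name in old and count > 0)


# Made-up sample data in the assumed output format.
yesterday = "Listing queues ...\ncelery\t0\nabc123\t1\n...done.\n"
today = "Listing queues ...\ncelery\t0\nabc123\t1\ndef456\t1\n...done.\n"
print(stale_queues(yesterday, today))
```

Regular job queues also show up in both listings when they hold
messages, so the resulting names still need a manual check
before they are passed to inspect-queues.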

I also changed the function drain_queues(). The old
implementation delegated the setup of the queues completely to
kombu.Consumer.__init__(). Doing this for the result queues
fails with the error "PRECONDITION_FAILED - parameters for queue
'...' in vhost '/' not equivalent". Calling
queue.queue_declare(passive=True) avoids this error, but
Consumer.__init__() does not allow us to do this, so the queues
are now explicitly declared by drain_queues() when it is called
with passive_queues=True, as inspect_queues() does.
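
The difference between a normal and a passive declare can be
illustrated with a toy model. The code below is neither kombu
nor RabbitMQ code; ToyBroker and BrokerError are hypothetical
names, and the queue parameters are made up. It only mimics the
rule that a passive declare checks for existence, while a normal
declare also compares the parameters.

```python
class BrokerError(Exception):
    """Carries an AMQP-style reply code and reply text."""

    def __init__(self, code, text):
        Exception.__init__(self, code, text)
        self.code = code
        self.text = text


class ToyBroker:
    """In-memory stand-in for the broker side of queue.declare."""

    def __init__(self):
        self.queues = {}

    def queue_declare(self, name, passive=False, **params):
        if passive:
            # Passive: only check existence, never create or compare.
            if name not in self.queues:
                raise BrokerError(
                    404, "NOT_FOUND - no queue '%s' in vhost '/'" % name)
            return name
        existing = self.queues.get(name)
        if existing is not None and existing != params:
            raise BrokerError(
                406,
                "PRECONDITION_FAILED - parameters for queue '%s' "
                "in vhost '/' not equivalent" % name)
        self.queues[name] = params
        return name


broker = ToyBroker()
# Somebody else created the queue with parameters we do not know.
broker.queue_declare('abc123', durable=False, auto_delete=True)
# A passive declare succeeds without knowing those parameters.
broker.queue_declare('abc123', passive=True)
```

A non-passive declare of 'abc123' with different parameters
raises the 406 error in this model, and a passive declare of an
unknown queue raises 404, which is the case inspect_queues()
reports on stderr.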

no lint

=== modified file 'setup.py'
--- setup.py	2012-05-14 16:42:07 +0000
+++ setup.py	2012-07-03 14:52:21 +0000
@@ -32,7 +32,8 @@
 ]
 
 
-setup(name='lazr.jobrunner',
+setup(
+    name='lazr.jobrunner',
     version=version,
     description="A Celery based job runner",
     long_description=README + '\n\n' + NEWS,
@@ -51,8 +52,10 @@
     zip_safe=False,
     install_requires=install_requires,
     entry_points={
-        'console_scripts':
-            ['jobrunnerctl=lazr.jobrunner.bin.jobrunnerctl:main']
+        'console_scripts': [
+            'jobrunnerctl=lazr.jobrunner.bin.jobrunnerctl:main',
+            'inspect-queues=lazr.jobrunner.bin.inspect_queues:main'
+            ]
     },
     test_suite="lazr.jobrunner",
 )

=== added file 'src/lazr/jobrunner/bin/inspect_queues.py'
--- src/lazr/jobrunner/bin/inspect_queues.py	1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/bin/inspect_queues.py	2012-07-03 14:52:21 +0000
@@ -0,0 +1,64 @@
+# Copyright 2012 Canonical Ltd.  All rights reserved.
+#
+# This file is part of lazr.jobrunner
+#
+# lazr.jobrunner is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, version 3 of the License.
+#
+# lazr.jobrunner is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""Inspect Celery result queues."""
+
+__metaclass__ = type
+
+from argparse import ArgumentParser
+import os
+import sys
+from amqplib.client_0_8.exceptions import AMQPChannelException
+
+
+def show_queue_data(body, message):
+    print '%s: %r' % (message.delivery_info['routing_key'], body)
+
+
+def inspect_queues(args):
+    parser = ArgumentParser(description=__doc__, prog=args[0])
+    parser.add_argument(
+        '-c', '--config', dest='config', required=True,
+        help='The name of the Celery config module')
+    parser.add_argument(
+        'queues', nargs='+', metavar='queue',
+        help='The names of RabbitMQ queues that hold results of Celery tasks')
+    args = parser.parse_args(args[1:])
+    os.environ["CELERY_CONFIG_MODULE"] = args.config
+    # Late import because Celery modules are imported by celerytask,
+    # and these modules need to know where to find the configuration.
+    from lazr.jobrunner.celerytask import drain_queues, RunJob
+
+    # In theory, drain_queues() can be called with more than one queue
+    # name in the second argument. But the callback is only called for
+    # the first queue...
+    for queue in args.queues:
+        try:
+            drain_queues(
+                RunJob.app, [queue], callbacks=[show_queue_data],
+                retain=True, passive_queues=True)
+        except AMQPChannelException as exc:
+            if exc.amqp_reply_code == 404:
+                # Unknown queue name specified; amqp_reply_text is
+                # self-explanatory.
+                print >>sys.stderr, exc.amqp_reply_text
+            else:
+                raise
+
+
+def main():
+    inspect_queues(sys.argv)

=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py	2012-05-18 13:35:46 +0000
+++ src/lazr/jobrunner/celerytask.py	2012-07-03 14:52:21 +0000
@@ -81,7 +81,8 @@
     return listings
 
 
-def drain_queues(app, queue_names, callbacks=None, retain=False):
+def drain_queues(app, queue_names, callbacks=None, retain=False,
+                 passive_queues=False):
     """Drain the messages from queues.
 
     :param app: The app to list queues for (affects backend, Queue type,
@@ -104,11 +105,22 @@
         # The no_ack flag is misleadingly named.
         # See: https://github.com/ask/kombu/issues/130
         consumer = Consumer(
-            connection, bindings, callbacks=callbacks, no_ack=not retain)
+            connection, bindings, callbacks=callbacks, no_ack=not retain,
+            auto_declare=not passive_queues)
+        if passive_queues:
+            # This is basically copied from kombu.Queue.declare().
+            # We can't use this method directly because queue_declare()
+            # must be called with passive=True for result queues.
+            # Otherwise, attempts to connect to the queue fail with
+            # AMQPChannelException: (406, u"PRECONDITION_FAILED...", ...)
+            for queue in consumer.queues:
+                if queue.exchange:
+                    queue.exchange.declare()
+                queue.queue_declare(passive=True)
         with consumer:
             try:
                 # Timeout of 0 causes error: [Errno 11] Resource temporarily
-                # unavailable
+                # unavailable.
                 connection.drain_events(timeout=0.1 ** 100)
             except timeout:
                 pass

=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py	2012-05-14 15:42:44 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py	2012-07-03 14:52:21 +0000
@@ -18,6 +18,7 @@
 
 
 import contextlib
+from cStringIO import StringIO
 import errno
 import json
 import os
@@ -28,6 +29,7 @@
     )
 import shutil
 import subprocess
+import sys
 import tempfile
 from time import sleep
 from unittest import TestCase
@@ -37,6 +39,7 @@
 
 from celery.exceptions import SoftTimeLimitExceeded
 
+from lazr.jobrunner.bin.inspect_queues import inspect_queues
 from lazr.jobrunner.celerytask import (
     drain_queues,
     list_queued,
@@ -476,3 +479,96 @@
     def test_no_tasks(self):
         """When no jobs are listed, the queue is shown as empty."""
         self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
+
+
+class TestInspectQueues(TestCase):
+    """Tests for the script inspect-queues."""
+
+    def queueName(self, task_id):
+        return task_id.replace('-', '')
+
+    def runInspectQueues(self, celery_config, task_ids):
+        """Invoke inspect_queues() and catch the data written to stdout
+        and stderr.
+        """
+        queues = [self.queueName(task_id) for task_id in task_ids]
+        real_stdout = sys.stdout
+        real_stderr = sys.stderr
+        try:
+            sys.stdout = StringIO()
+            sys.stderr = StringIO()
+            args = ['program', '-c', celery_config] + queues
+            inspect_queues(args)
+            fake_stdout = sys.stdout.getvalue()
+            fake_stderr = sys.stderr.getvalue()
+        finally:
+            sys.stdout = real_stdout
+            sys.stderr = real_stderr
+        return fake_stdout, fake_stderr
+
+    def invokeJob(self, celery_config, task, delay=1, job_args={}):
+        """Run the given task.
+
+        :return: The name of the result queue.
+        """
+        with tempdir() as temp_dir:
+            js = FileJobSource(temp_dir)
+            job = FileJob(js, 11, **job_args)
+            job.save()
+            task_info = task.apply_async(args=(11, ))
+            with celeryd(celery_config, temp_dir):
+                # Wait just long enough so that celeryd can start and
+                # process the job.
+                sleep(delay)
+            return task_info.task_id
+
+    def successMessage(self, task_id):
+        return (
+            "%s: {'status': 'SUCCESS', 'traceback': None, 'result': None, "
+            "'task_id': '%s'}\n" % (self.queueName(task_id), task_id))
+
+    def noQueueMessage(self, task_id):
+        return (
+            "NOT_FOUND - no queue '%s' in vhost '/'\n"
+            % self.queueName(task_id))
+
+    def test_inspect_queues__result_not_consumed(self):
+        """When a Celery task is started so that a result is returned
+        but the result is not consumed, the related message can be
+        retrieved with inspect_queues().
+        """
+        celery_config = 'lazr.jobrunner.tests.config1'
+        task_id = self.invokeJob(celery_config, RunFileJob)
+        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        self.assertEqual(self.successMessage(task_id), stdout)
+        self.assertEqual('', stderr)
+
+        # Reading a queue is destructive. An attempt to read again from
+        # a queue results in an error.
+        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        self.assertEqual('', stdout)
+        self.assertEqual(self.noQueueMessage(task_id), stderr)
+
+    def test_inspect_queues__two_queues(self):
+        """More than one queue can be inspected in one call of
+        inspect_queues().
+        """
+        celery_config = 'lazr.jobrunner.tests.config1'
+        task_id_1 = self.invokeJob(celery_config, RunFileJob)
+        task_id_2 = self.invokeJob(celery_config, RunFileJob)
+        stdout, stderr = self.runInspectQueues(
+            celery_config, [task_id_1, task_id_2])
+        expected_stdout = (
+            self.successMessage(task_id_1) + self.successMessage(task_id_2))
+        self.assertEqual(expected_stdout, stdout)
+        self.assertEqual('', stderr)
+
+    def test_inspect_queues__task_without_result(self):
+        """A Celery task which was started so that no result is returned
+        does not write to a result queue.
+        """
+        celery_config = 'lazr.jobrunner.tests.config1'
+        task_id = self.invokeJob(celery_config, RunFileJobNoResult)
+        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        self.assertEqual('', stdout)
+        self.assertEqual(self.noQueueMessage(task_id), stderr)