launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #09532
[Merge] lp:~adeuring/lazr.jobrunner/bug-1015667 into lp:lazr.jobrunner
Abel Deuring has proposed merging lp:~adeuring/lazr.jobrunner/bug-1015667 into lp:lazr.jobrunner.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #1015667 in Launchpad itself: "Celery is leaving an ever increasing number of queues behind"
https://bugs.launchpad.net/launchpad/+bug/1015667
For more details, see:
https://code.launchpad.net/~adeuring/lazr.jobrunner/bug-1015667/+merge/113228
This branch adds a script "inspect-queues" to lazr.jobrunner.
As described in bug 1015667, some Celery jobs seem to leave
messages in result queues behind. The new script allows us to
see what the messages in these queues contain. I hope that we
can get some clue which jobs created these messages by looking
at these messages. (I have also a branch for the main LP code
ready which changes the task ID so that it includes the job
class and the job ID, together with a UUID. Since the result
queue name is derived from the task ID, we should be able to
trace the "offending jobs" a bit better.)
As a side effect, the messages are also consumed. This might
not be desired in every case -- but I could not figure out
how to call drain_queues() so that the messages stay in the
queues. The drain_queues() parameter "retain" seems to have
no effect...
In other words, the script must be used carefully. But if we
store the result of running "rabbitmqctl list_queues" and run
the script, let's say, 24 hours later, we can be sure that the
queues are sufficiently stale so that no results are
inadvertently deleted.
I also changed the function drain_queues(). The old
implementation delegated the setup of the queues completely to
kombu.Consumer.__init__(). Doing this for the result queues
fails with the error "PRECONDITION_FAILED - parameters for queue
'...' in vhost '/' not equivalent". Calling
queue.declare(passive=True) avoids this error, but
Consumer.__init__() does not allow this, so the queues
are now explicitly declared by drain_queues, when called from
inspect_queues().
no lint
--
https://code.launchpad.net/~adeuring/lazr.jobrunner/bug-1015667/+merge/113228
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/lazr.jobrunner/bug-1015667 into lp:lazr.jobrunner.
=== modified file 'setup.py'
--- setup.py 2012-05-14 16:42:07 +0000
+++ setup.py 2012-07-03 14:52:21 +0000
@@ -32,7 +32,8 @@
]
-setup(name='lazr.jobrunner',
+setup(
+ name='lazr.jobrunner',
version=version,
description="A Celery based job runner",
long_description=README + '\n\n' + NEWS,
@@ -51,8 +52,10 @@
zip_safe=False,
install_requires=install_requires,
entry_points={
- 'console_scripts':
- ['jobrunnerctl=lazr.jobrunner.bin.jobrunnerctl:main']
+ 'console_scripts': [
+ 'jobrunnerctl=lazr.jobrunner.bin.jobrunnerctl:main',
+ 'inspect-queues=lazr.jobrunner.bin.inspect_queues:main'
+ ]
},
test_suite="lazr.jobrunner",
)
=== added file 'src/lazr/jobrunner/bin/inspect_queues.py'
--- src/lazr/jobrunner/bin/inspect_queues.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/bin/inspect_queues.py 2012-07-03 14:52:21 +0000
@@ -0,0 +1,64 @@
+# Copyright 2012 Canonical Ltd. All rights reserved.
+#
+# This file is part of lazr.jobrunner
+#
+# lazr.jobrunner is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, version 3 of the License.
+#
+# lazr.jobrunner is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""Inspect Celery result queues."""
+
+__metaclass__ = type
+
+from argparse import ArgumentParser
+import os
+import sys
+from amqplib.client_0_8.exceptions import AMQPChannelException
+
+
+def show_queue_data(body, message):
+ print '%s: %r' % (message.delivery_info['routing_key'], body)
+
+
+def inspect_queues(args):
+ parser = ArgumentParser(description=__doc__, prog=args[0])
+ parser.add_argument(
+ '-c', '--config', dest='config', required=True,
+ help='The name of the Celery config module')
+ parser.add_argument(
+ 'queues', nargs='+', metavar='queue',
+ help='The names of RabbitMQ queues that hold results of Celery tasks')
+ args = parser.parse_args(args[1:])
+ os.environ["CELERY_CONFIG_MODULE"] = args.config
+ # Late import because Celery modules are imported by celerytask,
+ # and these modules need to know where to find the configuration.
+ from lazr.jobrunner.celerytask import drain_queues, RunJob
+
+ # In theory, drain_queues() can be called with more than one queue
+ # name in the second argument. But the callback is only called for
+ # the first queue...
+ for queue in args.queues:
+ try:
+ drain_queues(
+ RunJob.app, [queue], callbacks=[show_queue_data],
+ retain=True, passive_queues=True)
+ except AMQPChannelException as exc:
+ if exc.amqp_reply_code == 404:
+ # Unknown queue name specified; amqp_reply_text is
+ # self-explaining.
+ print >>sys.stderr, exc.amqp_reply_text
+ else:
+ raise
+
+
+def main():
+ inspect_queues(sys.argv)
=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py 2012-05-18 13:35:46 +0000
+++ src/lazr/jobrunner/celerytask.py 2012-07-03 14:52:21 +0000
@@ -81,7 +81,8 @@
return listings
-def drain_queues(app, queue_names, callbacks=None, retain=False):
+def drain_queues(app, queue_names, callbacks=None, retain=False,
+ passive_queues=False):
"""Drain the messages from queues.
:param app: The app to list queues for (affects backend, Queue type,
@@ -104,11 +105,22 @@
# The no_ack flag is misleadingly named.
# See: https://github.com/ask/kombu/issues/130
consumer = Consumer(
- connection, bindings, callbacks=callbacks, no_ack=not retain)
+ connection, bindings, callbacks=callbacks, no_ack=not retain,
+ auto_declare=not passive_queues)
+ if passive_queues:
+ # This is basically copied from kombu.Queue.declare().
+ # We can't use this method directly because queue_declare()
+ # must be called with passive=True for result queues.
+ # Otherwise, attempts to connect to the queue fail with
+ # AMQPChannelException: (406, u"PRECONDITION_FAILED...", ...)
+ for queue in consumer.queues:
+ if queue.exchange:
+ queue.exchange.declare()
+ queue.queue_declare(passive=True)
with consumer:
try:
# Timeout of 0 causes error: [Errno 11] Resource temporarily
- # unavailable
+ # unavailable.
connection.drain_events(timeout=0.1 ** 100)
except timeout:
pass
=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py 2012-05-14 15:42:44 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py 2012-07-03 14:52:21 +0000
@@ -18,6 +18,7 @@
import contextlib
+from cStringIO import StringIO
import errno
import json
import os
@@ -28,6 +29,7 @@
)
import shutil
import subprocess
+import sys
import tempfile
from time import sleep
from unittest import TestCase
@@ -37,6 +39,7 @@
from celery.exceptions import SoftTimeLimitExceeded
+from lazr.jobrunner.bin.inspect_queues import inspect_queues
from lazr.jobrunner.celerytask import (
drain_queues,
list_queued,
@@ -476,3 +479,96 @@
def test_no_tasks(self):
"""When no jobs are listed, the queue is shown as empty."""
self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
+
+
+class TestInspectQueues(TestCase):
+ """Tests for the script inspect-queues."""
+
+ def queueName(self, task_id):
+ return task_id.replace('-', '')
+
+ def runInspectQueues(self, celery_config, task_ids):
+ """Invoke inspect_queues() and catch the data written to stdout
+ and stderr.
+ """
+ queues = [self.queueName(task_id) for task_id in task_ids]
+ real_stdout = sys.stdout
+ real_stderr = sys.stderr
+ try:
+ sys.stdout = StringIO()
+ sys.stderr = StringIO()
+ args = ['program', '-c', celery_config] + queues
+ inspect_queues(args)
+ fake_stdout = sys.stdout.getvalue()
+ fake_stderr = sys.stderr.getvalue()
+ finally:
+ sys.stdout = real_stdout
+ sys.stderr = real_stderr
+ return fake_stdout, fake_stderr
+
+ def invokeJob(self, celery_config, task, delay=1, job_args={}):
+ """Run the given task.
+
+ :return: The name of the result queue.
+ """
+ with tempdir() as temp_dir:
+ js = FileJobSource(temp_dir)
+ job = FileJob(js, 11, **job_args)
+ job.save()
+ task_info = task.apply_async(args=(11, ))
+ with celeryd(celery_config, temp_dir):
+ # Wait just long enough so that celeryd can start and
+ # process the job.
+ sleep(delay)
+ return task_info.task_id
+
+ def successMessage(self, task_id):
+ return (
+ "%s: {'status': 'SUCCESS', 'traceback': None, 'result': None, "
+ "'task_id': '%s'}\n" % (self.queueName(task_id), task_id))
+
+ def noQueueMessage(self, task_id):
+ return (
+ "NOT_FOUND - no queue '%s' in vhost '/'\n"
+ % self.queueName(task_id))
+
+ def test_inspect_queues__result_not_consumed(self):
+ """When a Celery task is started so that a result is returned
+ but the result is not consumed, the related message can be
+ retrieved with inspect_queues().
+ """
+ celery_config = 'lazr.jobrunner.tests.config1'
+ task_id = self.invokeJob(celery_config, RunFileJob)
+ stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+ self.assertEqual(self.successMessage(task_id), stdout)
+ self.assertEqual('', stderr)
+
+ # Reading a queue is destructive. An attempt to read again from
+ # a queue results in an error.
+ stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+ self.assertEqual('', stdout)
+ self.assertEqual(self.noQueueMessage(task_id), stderr)
+
+ def test_inspect_queues__two_queues(self):
+ """More than one queue can be inspected in one call of
+ inspect_queue().
+ """
+ celery_config = 'lazr.jobrunner.tests.config1'
+ task_id_1 = self.invokeJob(celery_config, RunFileJob)
+ task_id_2 = self.invokeJob(celery_config, RunFileJob)
+ stdout, stderr = self.runInspectQueues(
+ celery_config, [task_id_1, task_id_2])
+ expected_stdout = (
+ self.successMessage(task_id_1) + self.successMessage(task_id_2))
+ self.assertEqual(expected_stdout, stdout)
+ self.assertEqual('', stderr)
+
+ def test_inspect_queues__task_without_result(self):
+ """A Celery task which was started so that no result is returned
+ does not write to a task queue.
+ """
+ celery_config = 'lazr.jobrunner.tests.config1'
+ task_id = self.invokeJob(celery_config, RunFileJobNoResult)
+ stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+ self.assertEqual('', stdout)
+ self.assertEqual(self.noQueueMessage(task_id), stderr)