launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #09710
[Merge] lp:~adeuring/lazr.jobrunner/bug1015667-2 into lp:lazr.jobrunner
Abel Deuring has proposed merging lp:~adeuring/lazr.jobrunner/bug1015667-2 into lp:lazr.jobrunner.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #1015667 in Launchpad itself: "Celery is leaving an ever increasing number of queues behind"
https://bugs.launchpad.net/launchpad/+bug/1015667
For more details, see:
https://code.launchpad.net/~adeuring/lazr.jobrunner/bug1015667-2/+merge/113962
This branch makes the lazr.jobrunner script usable with the Celery
configuration used by Launchpad.
The problem: LP's Celery configuration sets CELERY_CREATE_MISSING_QUEUES
to False to avoid the creation of arbitrary queues. With this setting,
an attempt to create an instance of app.amqp.Router in the function
drain_queues() fails with the error "queue not found in CELERY_QUEUES".
This error can be avoid by calling app.amqp.Router(create_missing=True).
Attempts to write a test for this change revealed a test isolation problem
in the existing tests of clear_queues(): The est modules simply imported
the function clear_queues() from the module bin.clear_queues, and
clear_queues() does a "late import" of lazr.jobrunner.celerytask, which
in turn imports indirectly a Celery config module.
This meant that the Celery configuration of the first test was used
in all subsequent tests.
I changed the tests so that the tests now invoke the real script in a
subprocess.
This is done via the context manager running(), which tried to call
proc.terminate(). This failed with an exception for the clear-queues
script because the script is already terminated when proc.terminate()
is called. The call of proc.terminate() is now optional in running().
Finally, I refactored to Celery test configurations a bit: We should
always parameters like BROKER_VHOST or CELERY_RESULT_BACKEND, but
overall quite simple the configuration "config1" needs an environment
variable FILE_JOB_DIR, which is unnecessary for the tests of
clear-queues, so I added a quite basic configuration "simple_config",
which is imported by config1 and by the second new configuration
config_do_not_create_missing_queues.
test: make check
no lint
--
https://code.launchpad.net/~adeuring/lazr.jobrunner/bug1015667-2/+merge/113962
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/lazr.jobrunner/bug1015667-2 into lp:lazr.jobrunner.
=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py 2012-07-03 14:09:46 +0000
+++ src/lazr/jobrunner/celerytask.py 2012-07-09 12:09:18 +0000
@@ -95,7 +95,7 @@
if callbacks is None:
callbacks = [lambda x, y: None]
bindings = []
- router = app.amqp.Router()
+ router = app.amqp.Router(create_missing=True)
for queue_name in queue_names:
destination = router.expand_destination(queue_name)
exchange = Exchange(destination['exchange'])
@@ -111,8 +111,9 @@
# This is basically copied from kombu.Queue.declare().
# We can't use this method directly because queue_declare()
# must be called with passive=True for result queues.
- # Otherwise, attempts to connect to the queue fail with
- # AMQPChannelException: (406, u"PRECONDITION_FAILED...", ...)
+ # Otherwise, attempts to connect to the queue fails with
+ # celery.exceptions.QueueNotFound: "Queue ... is not defined
+ # in CELERY_QUEUES".
for queue in consumer.queues:
if queue.exchange:
queue.exchange.declare()
=== modified file 'src/lazr/jobrunner/tests/config1.py'
--- src/lazr/jobrunner/tests/config1.py 2012-03-21 20:38:50 +0000
+++ src/lazr/jobrunner/tests/config1.py 2012-07-09 12:09:18 +0000
@@ -1,7 +1,4 @@
-BROKER_VHOST = "/"
-CELERY_RESULT_BACKEND = "amqp"
-CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
-CELERYD_CONCURRENCY = 1
+from simple_config import *
import os
import oops
CELERY_ANNOTATIONS = {
=== added file 'src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py'
--- src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py 2012-07-09 12:09:18 +0000
@@ -0,0 +1,2 @@
+from simple_config import *
+CELERY_CREATE_MISSING_QUEUES = False
=== added file 'src/lazr/jobrunner/tests/simple_config.py'
--- src/lazr/jobrunner/tests/simple_config.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/simple_config.py 2012-07-09 12:09:18 +0000
@@ -0,0 +1,4 @@
+BROKER_VHOST = "/"
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
+CELERYD_CONCURRENCY = 1
=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py 2012-07-04 16:34:25 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py 2012-07-09 12:09:18 +0000
@@ -18,7 +18,6 @@
import contextlib
-from cStringIO import StringIO
import errno
import json
import os
@@ -29,7 +28,6 @@
)
import shutil
import subprocess
-import sys
import tempfile
from time import sleep
from unittest import TestCase
@@ -39,7 +37,6 @@
from celery.exceptions import SoftTimeLimitExceeded
-from lazr.jobrunner.bin.clear_queues import clear_queues
from lazr.jobrunner.celerytask import (
drain_queues,
list_queued,
@@ -62,13 +59,15 @@
@contextlib.contextmanager
-def running(cmd_name, cmd_args, env=None, cwd=None):
+def running(cmd_name, cmd_args, env=None, cwd=None, terminate_process=True):
proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
- stderr=subprocess.PIPE, cwd=cwd)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ cwd=cwd)
try:
yield proc
finally:
- proc.terminate()
+ if terminate_process:
+ proc.terminate()
proc.wait()
@@ -481,30 +480,28 @@
self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
-class TestInspectQueues(TestCase):
+class TestClearQueues(TestCase):
"""Tests for the script inspect-queues."""
def queueName(self, task_id):
return task_id.replace('-', '')
- def runInspectQueues(self, celery_config, task_ids):
+ def runClearQueues(self, celery_config, task_ids):
"""Invoke clear_queues() and catch the data written to stdout
and stderr.
"""
+ # Simply calling the function clear_queues() from bin.clear_queues()
+ # leads to a one-time import of the celery config module; the
+ # config setting from the test that runs first would override
+ # any different configuration setting in a later test.
+ # Running the script in a subprocess avoids this problem.
queues = [self.queueName(task_id) for task_id in task_ids]
- real_stdout = sys.stdout
- real_stderr = sys.stderr
- try:
- sys.stdout = StringIO()
- sys.stderr = StringIO()
- args = ['program', '-c', celery_config] + queues
- clear_queues(args)
- fake_stdout = sys.stdout.getvalue()
- fake_stderr = sys.stderr.getvalue()
- finally:
- sys.stdout = real_stdout
- sys.stderr = real_stderr
- return fake_stdout, fake_stderr
+ args = ('-c', celery_config) + tuple(queues)
+
+ with running('bin/clear-queues', args, cwd=get_root(),
+ terminate_process=False) as script:
+ stdout, stderr = script.communicate()
+ return stdout, stderr
def invokeJob(self, celery_config, task, delay=1, job_args={}):
"""Run the given task.
@@ -537,15 +534,19 @@
but the result is not consumed, the related message can be
retrieved with clear_queues().
"""
- celery_config = 'lazr.jobrunner.tests.config1'
- task_id = self.invokeJob(celery_config, RunFileJob)
- stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+ celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
+ task_id = self.invokeJob(celery_config_jobrunner, RunFileJob)
+ # The script clear_queues does not have to use the same
+ # Celery configuration as the job runner: clear_config just
+ # needs to know how to connect to AMQP server.
+ clear_queue_config = 'lazr.jobrunner.tests.simple_config'
+ stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
self.assertEqual(self.successMessage(task_id), stdout)
self.assertEqual('', stderr)
# Reading a queue is destructive. An attempt to read again from
# a queue results in an error.
- stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+ stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
self.assertEqual('', stdout)
self.assertEqual(self.noQueueMessage(task_id), stderr)
@@ -553,11 +554,12 @@
"""More than one queue can be inspected in one call of
clear_queue().
"""
- celery_config = 'lazr.jobrunner.tests.config1'
- task_id_1 = self.invokeJob(celery_config, RunFileJob)
- task_id_2 = self.invokeJob(celery_config, RunFileJob)
- stdout, stderr = self.runInspectQueues(
- celery_config, [task_id_1, task_id_2])
+ celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
+ task_id_1 = self.invokeJob(celery_config_jobrunner, RunFileJob)
+ task_id_2 = self.invokeJob(celery_config_jobrunner, RunFileJob)
+ clear_queue_config = 'lazr.jobrunner.tests.simple_config'
+ stdout, stderr = self.runClearQueues(
+ clear_queue_config, [task_id_1, task_id_2])
expected_stdout = (
self.successMessage(task_id_1) + self.successMessage(task_id_2))
self.assertEqual(expected_stdout, stdout)
@@ -567,8 +569,28 @@
"""A Celery task which was started so that no result is returned
does not write to a task queue.
"""
- celery_config = 'lazr.jobrunner.tests.config1'
- task_id = self.invokeJob(celery_config, RunFileJobNoResult)
- stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+ celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
+ task_id = self.invokeJob(celery_config_jobrunner, RunFileJobNoResult)
+ clear_queue_config = 'lazr.jobrunner.tests.simple_config'
+ stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
+ self.assertEqual('', stdout)
+ self.assertEqual(self.noQueueMessage(task_id), stderr)
+
+ def test_clear_queues__config_create_missing_queues_false(self):
+ """If CELERY_CREATE_MISSING_QUEUES is False, drain_queues()
+ must override this setting by creating the router instance
+ with the parameter create_missing=True. Otherwise,
+ app.amqp.Router() will fail with
+ 'celery.exceptions.QueueNotFound: "Queue ... is not defined in
+ CELERY_QUEUES"'. Note that this does not mean that a non-existent
+ queue is actually created, see
+ assertEqual(self.noQueueMessage(task_id), stderr) below...
+ """
+ celery_config = (
+ 'lazr.jobrunner.tests.config_do_not_create_missing_queues')
+ # A test isolation problem: Specifying a configuration module does
+ # not have the desired effect
+ task_id = 'this-queue-does-not-exist'
+ stdout, stderr = self.runClearQueues(celery_config, [task_id])
self.assertEqual('', stdout)
self.assertEqual(self.noQueueMessage(task_id), stderr)