
launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/lazr.jobrunner/bug1015667-2 into lp:lazr.jobrunner

 

Abel Deuring has proposed merging lp:~adeuring/lazr.jobrunner/bug1015667-2 into lp:lazr.jobrunner.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1015667 in Launchpad itself: "Celery is leaving an ever increasing number of queues behind"
  https://bugs.launchpad.net/launchpad/+bug/1015667

For more details, see:
https://code.launchpad.net/~adeuring/lazr.jobrunner/bug1015667-2/+merge/113962

This branch makes the lazr.jobrunner script usable with the Celery
configuration used by Launchpad.

The problem: LP's Celery configuration sets CELERY_CREATE_MISSING_QUEUES
to False to avoid the creation of arbitrary queues. With this setting,
an attempt to create an instance of app.amqp.Router in the function
drain_queues() fails with the error "queue not found in CELERY_QUEUES".

This error can be avoided by calling app.amqp.Router(create_missing=True).
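To illustrate why passing create_missing=True helps, here is a toy model of a router's lookup behaviour. It is a hypothetical sketch, not Celery's implementation. The class ToyRouter, the stand-in QueueNotFound exception and the layout of the returned dict are all assumptions made for illustration, and the toy performs its check in expand_destination().

```python
class QueueNotFound(Exception):
    """Hypothetical stand-in for celery.exceptions.QueueNotFound."""


class ToyRouter(object):
    """Toy model of a router that maps queue names to destinations.

    Hypothetical and heavily simplified; not Celery's Router class.
    """

    def __init__(self, queues, create_missing=False):
        # queues: mapping of declared queue names to destination dicts,
        # playing the role of CELERY_QUEUES.
        self.queues = dict(queues)
        self.create_missing = create_missing

    def expand_destination(self, queue_name):
        """Return the destination for queue_name, creating it if allowed."""
        if queue_name not in self.queues:
            if not self.create_missing:
                raise QueueNotFound(
                    "Queue %r is not defined in CELERY_QUEUES" % queue_name)
            # Assumption: an undeclared queue gets an exchange and a
            # routing key named after the queue itself.
            self.queues[queue_name] = {
                'queue': queue_name,
                'exchange': queue_name,
                'routing_key': queue_name,
            }
        return self.queues[queue_name]


# Like CELERY_CREATE_MISSING_QUEUES = False: an undeclared queue fails.
strict = ToyRouter({}, create_missing=False)
try:
    strict.expand_destination('some_result_queue')
except QueueNotFound as exc:
    print(exc)

# With create_missing=True the same lookup succeeds.
lenient = ToyRouter({}, create_missing=True)
print(lenient.expand_destination('some_result_queue')['exchange'])
# prints: some_result_queue
```

In the toy, create_missing=True only makes the name resolvable. As the new test's docstring notes for the real code, it does not mean that a non-existent queue is actually created on the broker.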

Attempts to write a test for this change revealed a test isolation problem
in the existing tests of clear_queues(): the test module simply imported
the function clear_queues() from the module bin.clear_queues, and
clear_queues() does a "late import" of lazr.jobrunner.celerytask, which
in turn indirectly imports a Celery config module.

Because Python executes a module's body only on its first import in a
process, this meant that the Celery configuration of the first test was
used in all subsequent tests.
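The effect can be reproduced with plain Python: a module body runs only on its first import, and later imports return the cached object from sys.modules. The sketch below is hypothetical. It assumes, as a simplification, that the imported module reads its configuration name from the CELERY_CONFIG_MODULE environment variable at import time. fake_celerytask and this clear_queues() are stand-ins, not the real lazr.jobrunner code.

```python
import importlib
import os
import sys
import tempfile

# Hypothetical stand-in for lazr.jobrunner.celerytask: its module body
# reads the configuration name once, when the module is first imported.
MODULE_SOURCE = (
    "import os\n"
    "CONFIG_MODULE = os.environ['CELERY_CONFIG_MODULE']\n"
)

tmpdir = tempfile.mkdtemp()
with open(os.path.join(tmpdir, 'fake_celerytask.py'), 'w') as f:
    f.write(MODULE_SOURCE)
sys.path.insert(0, tmpdir)


def clear_queues(config_module):
    """Hypothetical stand-in that mimics a "late import"."""
    os.environ['CELERY_CONFIG_MODULE'] = config_module
    # Only the first call executes the module body; later calls get the
    # cached module object from sys.modules.
    task_module = importlib.import_module('fake_celerytask')
    return task_module.CONFIG_MODULE


first = clear_queues('lazr.jobrunner.tests.config1')
second = clear_queues('lazr.jobrunner.tests.simple_config')
print(first)   # lazr.jobrunner.tests.config1
print(second)  # lazr.jobrunner.tests.config1 (the stale configuration)
```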

I changed the tests so that they now invoke the real script in a
subprocess.
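Running the code in a fresh interpreter avoids the stale import, because each subprocess starts with an empty sys.modules. A minimal sketch, assuming a hypothetical stand-in module, fake_celerytask, that reads its configuration name from the CELERY_CONFIG_MODULE environment variable at import time; run_in_subprocess() is likewise illustrative, not part of lazr.jobrunner.

```python
import os
import subprocess
import sys
import tempfile

# Hypothetical stand-in module that reads its configuration name from
# the environment when it is imported.
MODULE_SOURCE = (
    "import os\n"
    "CONFIG_MODULE = os.environ['CELERY_CONFIG_MODULE']\n"
)

tmpdir = tempfile.mkdtemp()
with open(os.path.join(tmpdir, 'fake_celerytask.py'), 'w') as f:
    f.write(MODULE_SOURCE)


def run_in_subprocess(config_module):
    """Import the stand-in module in a fresh interpreter and report
    which configuration it picked up."""
    env = dict(os.environ, CELERY_CONFIG_MODULE=config_module,
               PYTHONPATH=tmpdir)
    proc = subprocess.Popen(
        [sys.executable, '-c',
         'import fake_celerytask; print(fake_celerytask.CONFIG_MODULE)'],
        env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(stderr.decode())
    return stdout.decode().strip()


# Each call sees its own configuration, in any order.
print(run_in_subprocess('lazr.jobrunner.tests.config1'))
print(run_in_subprocess('lazr.jobrunner.tests.simple_config'))
```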

This is done via the context manager running(), which always called
proc.terminate() on exit. That failed with an exception for the
clear-queues script, because the script has already terminated by the
time proc.terminate() is called. Calling proc.terminate() is now
optional in running(), controlled by the new parameter terminate_process.
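A Python 3 sketch of the revised context manager with both kinds of use. The body of running() follows the version in this branch; the two sample commands are assumptions, chosen so the example runs wherever a Python interpreter is available.

```python
import contextlib
import subprocess
import sys


@contextlib.contextmanager
def running(cmd_name, cmd_args, env=None, cwd=None, terminate_process=True):
    """Run a command in a subprocess; optionally terminate it on exit."""
    proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            cwd=cwd)
    try:
        yield proc
    finally:
        if terminate_process:
            proc.terminate()
        proc.wait()


# A short-lived command (like clear-queues): it exits on its own, so we
# only collect its output.
with running(sys.executable, ('-c', 'print("done")'),
             terminate_process=False) as script:
    stdout, stderr = script.communicate()

# A long-running command (like a worker): the context manager stops it.
with running(sys.executable, ('-c', 'import time; time.sleep(60)')) as worker:
    pass
print(worker.returncode != 0)  # True
```

communicate() waits for the process to exit, so with terminate_process=False the finally clause only calls wait(), which returns immediately. With the Python 2 subprocess module of the time, terminate() on a process already reaped this way could raise OSError; recent Python 3 versions skip the signal instead, but the flag still makes the intent explicit.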

Finally, I refactored the Celery test configurations a bit. We should
always set parameters like BROKER_VHOST or CELERY_RESULT_BACKEND, but
the otherwise quite simple configuration "config1" needs an environment
variable FILE_JOB_DIR, which is unnecessary for the tests of
clear-queues. So I added a very basic configuration "simple_config",
which is imported by config1 and by the second new configuration,
config_do_not_create_missing_queues.
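The layering relies on plain star imports: each derived config module pulls in every public name from simple_config and then adds or overrides settings. A minimal sketch that writes the two modules to a temporary directory and imports the derived one. Here they are top-level modules on sys.path, whereas in the branch they live in lazr.jobrunner.tests and `from simple_config import *` relies on Python 2 implicit relative imports.

```python
import importlib
import os
import sys
import tempfile

# Module sources mirroring the test configurations in this branch.
CONFIGS = {
    'simple_config': (
        'BROKER_VHOST = "/"\n'
        'CELERY_RESULT_BACKEND = "amqp"\n'
        'CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )\n'
        'CELERYD_CONCURRENCY = 1\n'
    ),
    'config_do_not_create_missing_queues': (
        'from simple_config import *\n'
        'CELERY_CREATE_MISSING_QUEUES = False\n'
    ),
}

tmpdir = tempfile.mkdtemp()
for name, source in CONFIGS.items():
    with open(os.path.join(tmpdir, name + '.py'), 'w') as f:
        f.write(source)
sys.path.insert(0, tmpdir)

# The derived module exposes both the shared and its own settings.
cfg = importlib.import_module('config_do_not_create_missing_queues')
print(cfg.BROKER_VHOST, cfg.CELERY_RESULT_BACKEND)
print(cfg.CELERY_CREATE_MISSING_QUEUES)
```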

test: make check

no lint

=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py	2012-07-03 14:09:46 +0000
+++ src/lazr/jobrunner/celerytask.py	2012-07-09 12:09:18 +0000
@@ -95,7 +95,7 @@
     if callbacks is None:
         callbacks = [lambda x, y: None]
     bindings = []
-    router = app.amqp.Router()
+    router = app.amqp.Router(create_missing=True)
     for queue_name in queue_names:
         destination = router.expand_destination(queue_name)
         exchange = Exchange(destination['exchange'])
@@ -111,8 +111,9 @@
             # This is basically copied from kombu.Queue.declare().
             # We can't use this method directly because queue_declare()
             # must be called with passive=True for result queues.
-            # Otherwise, attempts to connect to the queue fail with
-            # AMQPChannelException: (406, u"PRECONDITION_FAILED...", ...)
+            # Otherwise, attempts to connect to the queue fail with
+            # celery.exceptions.QueueNotFound: "Queue ... is not defined
+            # in CELERY_QUEUES".
             for queue in consumer.queues:
                 if queue.exchange:
                     queue.exchange.declare()

=== modified file 'src/lazr/jobrunner/tests/config1.py'
--- src/lazr/jobrunner/tests/config1.py	2012-03-21 20:38:50 +0000
+++ src/lazr/jobrunner/tests/config1.py	2012-07-09 12:09:18 +0000
@@ -1,7 +1,4 @@
-BROKER_VHOST = "/"
-CELERY_RESULT_BACKEND = "amqp"
-CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
-CELERYD_CONCURRENCY = 1
+from simple_config import *
 import os
 import oops
 CELERY_ANNOTATIONS = {

=== added file 'src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py'
--- src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py	1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/config_do_not_create_missing_queues.py	2012-07-09 12:09:18 +0000
@@ -0,0 +1,2 @@
+from simple_config import *
+CELERY_CREATE_MISSING_QUEUES = False

=== added file 'src/lazr/jobrunner/tests/simple_config.py'
--- src/lazr/jobrunner/tests/simple_config.py	1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/simple_config.py	2012-07-09 12:09:18 +0000
@@ -0,0 +1,4 @@
+BROKER_VHOST = "/"
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
+CELERYD_CONCURRENCY = 1

=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py	2012-07-04 16:34:25 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py	2012-07-09 12:09:18 +0000
@@ -18,7 +18,6 @@
 
 
 import contextlib
-from cStringIO import StringIO
 import errno
 import json
 import os
@@ -29,7 +28,6 @@
     )
 import shutil
 import subprocess
-import sys
 import tempfile
 from time import sleep
 from unittest import TestCase
@@ -39,7 +37,6 @@
 
 from celery.exceptions import SoftTimeLimitExceeded
 
-from lazr.jobrunner.bin.clear_queues import clear_queues
 from lazr.jobrunner.celerytask import (
     drain_queues,
     list_queued,
@@ -62,13 +59,15 @@
 
 
 @contextlib.contextmanager
-def running(cmd_name, cmd_args, env=None, cwd=None):
+def running(cmd_name, cmd_args, env=None, cwd=None, terminate_process=True):
     proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
-                            stderr=subprocess.PIPE, cwd=cwd)
+                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                            cwd=cwd)
     try:
         yield proc
     finally:
-        proc.terminate()
+        if terminate_process:
+            proc.terminate()
         proc.wait()
 
 
@@ -481,30 +480,28 @@
         self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
 
 
-class TestInspectQueues(TestCase):
+class TestClearQueues(TestCase):
     """Tests for the script inspect-queues."""
 
     def queueName(self, task_id):
         return task_id.replace('-', '')
 
-    def runInspectQueues(self, celery_config, task_ids):
+    def runClearQueues(self, celery_config, task_ids):
         """Invoke clear_queues() and catch the data written to stdout
         and stderr.
         """
+        # Simply calling the function clear_queues() from bin.clear_queues
+        # leads to a one-time import of the celery config module; the
+        # config setting from the test that runs first would override
+        # any different configuration setting in a later test.
+        # Running the script in a subprocess avoids this problem.
         queues = [self.queueName(task_id) for task_id in task_ids]
-        real_stdout = sys.stdout
-        real_stderr = sys.stderr
-        try:
-            sys.stdout = StringIO()
-            sys.stderr = StringIO()
-            args = ['program', '-c', celery_config] + queues
-            clear_queues(args)
-            fake_stdout = sys.stdout.getvalue()
-            fake_stderr = sys.stderr.getvalue()
-        finally:
-            sys.stdout = real_stdout
-            sys.stderr = real_stderr
-        return fake_stdout, fake_stderr
+        args = ('-c', celery_config) + tuple(queues)
+
+        with running('bin/clear-queues', args, cwd=get_root(),
+                     terminate_process=False) as script:
+            stdout, stderr = script.communicate()
+        return stdout, stderr
 
     def invokeJob(self, celery_config, task, delay=1, job_args={}):
         """Run the given task.
@@ -537,15 +534,19 @@
         but the result is not consumed, the related message can be
         retrieved with clear_queues().
         """
-        celery_config = 'lazr.jobrunner.tests.config1'
-        task_id = self.invokeJob(celery_config, RunFileJob)
-        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
+        task_id = self.invokeJob(celery_config_jobrunner, RunFileJob)
+        # The script clear_queues does not have to use the same
+        # Celery configuration as the job runner: clear_queues just
+        # needs to know how to connect to the AMQP server.
+        clear_queue_config = 'lazr.jobrunner.tests.simple_config'
+        stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
         self.assertEqual(self.successMessage(task_id), stdout)
         self.assertEqual('', stderr)
 
         # Reading a queue is destructive. An attempt to read again from
         # a queue results in an error.
-        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
         self.assertEqual('', stdout)
         self.assertEqual(self.noQueueMessage(task_id), stderr)
 
@@ -553,11 +554,12 @@
         """More than one queue can be inspected in one call of
         clear_queue().
         """
-        celery_config = 'lazr.jobrunner.tests.config1'
-        task_id_1 = self.invokeJob(celery_config, RunFileJob)
-        task_id_2 = self.invokeJob(celery_config, RunFileJob)
-        stdout, stderr = self.runInspectQueues(
-            celery_config, [task_id_1, task_id_2])
+        celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
+        task_id_1 = self.invokeJob(celery_config_jobrunner, RunFileJob)
+        task_id_2 = self.invokeJob(celery_config_jobrunner, RunFileJob)
+        clear_queue_config = 'lazr.jobrunner.tests.simple_config'
+        stdout, stderr = self.runClearQueues(
+            clear_queue_config, [task_id_1, task_id_2])
         expected_stdout = (
             self.successMessage(task_id_1) + self.successMessage(task_id_2))
         self.assertEqual(expected_stdout, stdout)
@@ -567,8 +569,28 @@
         """A Celery task which was started so that no result is returned
         does not write to a task queue.
         """
-        celery_config = 'lazr.jobrunner.tests.config1'
-        task_id = self.invokeJob(celery_config, RunFileJobNoResult)
-        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        celery_config_jobrunner = 'lazr.jobrunner.tests.config1'
+        task_id = self.invokeJob(celery_config_jobrunner, RunFileJobNoResult)
+        clear_queue_config = 'lazr.jobrunner.tests.simple_config'
+        stdout, stderr = self.runClearQueues(clear_queue_config, [task_id])
+        self.assertEqual('', stdout)
+        self.assertEqual(self.noQueueMessage(task_id), stderr)
+
+    def test_clear_queues__config_create_missing_queues_false(self):
+        """If CELERY_CREATE_MISSING_QUEUES is False, drain_queues()
+        must override this setting by creating the router instance
+        with the parameter create_missing=True. Otherwise,
+        app.amqp.Router() will fail with
+        'celery.exceptions.QueueNotFound: "Queue ... is not defined in
+        CELERY_QUEUES"'. Note that this does not mean that a non-existent
+        queue is actually created, see
+        assertEqual(self.noQueueMessage(task_id), stderr) below...
+        """
+        celery_config = (
+            'lazr.jobrunner.tests.config_do_not_create_missing_queues')
+        # A test isolation problem: Specifying a configuration module does
+        # not have the desired effect
+        task_id = 'this-queue-does-not-exist'
+        stdout, stderr = self.runClearQueues(celery_config, [task_id])
         self.assertEqual('', stdout)
         self.assertEqual(self.noQueueMessage(task_id), stderr)