← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/launchpad/bug-1015667-3 into lp:launchpad

 

Abel Deuring has proposed merging lp:~adeuring/launchpad/bug-1015667-3 into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1015667 in Launchpad itself: "Celery is leaving an ever increasing number of queues behind"
  https://bugs.launchpad.net/launchpad/+bug/1015667

For more details, see:
https://code.launchpad.net/~adeuring/launchpad/bug-1015667-3/+merge/115110

This branch fixes bug 1015667: Celery is leaving an ever increasing number
of queues behind. Well, at least I hope that the bug will be fixed --
the result queues created by Celery tasks generally do not give any hint
which task created them: The queue names are the task IDs which are by
default strings generated by uuid4(), and the content of the queued
messages is nothing more than a status message and the data returned by
Task.run(). (OK, this is not completely true: Messages for failed tasks
also contain a traceback -- but that does not help much when we have
tasks that tend to run without failures.)

The culprit was the task run_missing_jobs which simply did not had
the flag ignore_result set.

I changed the task from a decorated function into a class derived
from celery.task.Task and defined the class attribute ignore_result.

This also allows also to override the default task ID. If no ID is
specified as a parameter for celery.task.Task.apply_async(), a default
value as mentioned above is created in Task.apply() or
celery.app.amqp.TaskPublisher.delay_task().

The test that no result messsage queue is created is somewhat paranoid:
Testing that an unwanted side effect of some other functionality does
not occur must ensure that main event actually happens. Otherwise,
the test might simply check: "If nothing happens, there is no result
queue."

test_run_missing_ready_does_not_return_results() does this by calling
list_queued() -- but since this function uses another function
drain_queues() which can also consume queued messages I wanted to be
sure that this does not happen, hence list_queues() is called twice
before celeryd is started.

    Side note, for extra fun: have a look at the implementation of
    list_queued() and drain_queues() in lazr.jobrunner. drain_queues()
    has the parameter "retain"; if it is True, messages are consumed;
    list_queued() calls drain_queued(retain=True, ...), so that looks
    sane. But the usage of this parameter in drain_queues() scares me:

            consumer = Consumer(
                connection, bindings, callbacks=callbacks, no_ack=not retain,
                auto_declare=not passive_queues)

    So no_ack must be False in order to kepp the message in the queue.
    And if you want to remove the message, you should set the "don't
    acknowledge" flag to False...

This test (and several others for the lazr.jobrunner package and for
lp.servcices.job) could be more simple and more robust if there would
be a way to easily check which queues exist on a rabbitmq instance
and how many messages they contain. There is "rabbitmqctl list_queues"
but this requires root privileges. Also, rabbitmq can provide a
webservice API, but last time I checked it was not available on precise.

And a final rant: After quickly glancing through the AMQP specs I believe
that the protocol does not provide a mecahnism to answer the question
"which queues exist on a given exchange". Scary...

Anyway, I also noticed a minor flaw in BaseRunnableJob.taskId(). This
method creates task IDs for regular job runner tasks, similar to
RUnMissingReady.appy_async().  BaseRunnableJob.taskId() also adds the
DB ID of a job and separated the different parts (class name, DB ID,
UUID) with '-'. But the '-' are removed from the queue name, so it is
a bit hard to separate the DB ID from the UUID in the queue name.
So I replaced the "separator symbol" '-' with '_'.

tests:

./bin/test services.job -vvt lp.services.job.tests.test_celeryjob
./bin/test services.job -vvt lp.services.job.tests.test_runner

no lint

-- 
https://code.launchpad.net/~adeuring/launchpad/bug-1015667-3/+merge/115110
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/launchpad/bug-1015667-3 into lp:launchpad.
=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py	2012-06-14 05:18:22 +0000
+++ lib/lp/services/job/celeryjob.py	2012-07-16 11:07:22 +0000
@@ -16,10 +16,11 @@
 
 from logging import info
 import os
+from uuid import uuid4
 
 
 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import task
+from celery.task import Task
 from lazr.jobrunner.celerytask import RunJob
 from storm.zope.interfaces import IZStorm
 import transaction
@@ -80,24 +81,41 @@
             queued_job_ids]
 
 
-@task
-def run_missing_ready(_no_init=False):
+class RunMissingReady(Task):
     """Task to run any jobs that are ready but not scheduled.
 
     Currently supports only BranchScanJob.
     :param _no_init: For tests.  If True, do not perform the initialization.
     """
-    if not _no_init:
-        task_init('run_missing_ready')
-    with TransactionFreeOperation():
-        count = 0
-        for job in find_missing_ready(BranchScanJob):
-            if not celery_enabled(job.__class__.__name__):
-                continue
-            job.celeryCommitHook(True)
-            count += 1
-        info('Scheduled %d missing jobs.', count)
-        transaction.commit()
+    ignore_result = True
+
+    def run(self, _no_init=False):
+        if not _no_init:
+            task_init('run_missing_ready')
+        with TransactionFreeOperation():
+            count = 0
+            for job in find_missing_ready(BranchScanJob):
+                if not celery_enabled(job.__class__.__name__):
+                    continue
+                job.celeryCommitHook(True)
+                count += 1
+            info('Scheduled %d missing jobs.', count)
+            transaction.commit()
+
+    def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
+                    connection=None, router=None, queues=None, **options):
+        """Create a task_id if none is specified.
+
+        Override the quite generic default task_id with one containing
+        the class name.
+
+        See also `celery.task.Task.apply_async()`.
+        """
+        if task_id is None:
+            task_id = '%s_%s' % (self.__class__.__name__, uuid4())
+        return super(RunMissingReady, self).apply_async(
+            args, kwargs, task_id, publisher, connection, router, queues,
+            **options)
 
 
 needs_zcml = True

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-07-03 15:36:00 +0000
+++ lib/lp/services/job/runner.py	2012-07-16 11:07:22 +0000
@@ -217,7 +217,7 @@
             'result': SoftTimeLimitExceeded(1,),
             'task_id': 'cba7d07b-37fe-4f1d-a5f6-79ad7c30222f'}
         """
-        return '%s-%s-%s' % (
+        return '%s_%s_%s' % (
             self.__class__.__name__, self.job_id, uuid4())
 
     def runViaCelery(self, ignore_result=False):

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	2012-06-27 03:25:41 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	2012-07-16 11:07:22 +0000
@@ -1,10 +1,15 @@
 # Copyright 2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
+from cStringIO import StringIO
+import sys
+from time import sleep
+from lazr.jobrunner.bin.clear_queues import clear_queues
 from lp.code.model.branchjob import BranchScanJob
 from lp.scripts.helpers import TransactionFreeOperation
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.tests import (
+    celeryd,
     drain_celery_queues,
     monitor_celery,
     )
@@ -21,10 +26,10 @@
         super(TestRunMissingJobs, self).setUp()
         from lp.services.job.celeryjob import (
             find_missing_ready,
-            run_missing_ready,
+            RunMissingReady,
         )
         self.find_missing_ready = find_missing_ready
-        self.run_missing_ready = run_missing_ready
+        self.RunMissingReady = RunMissingReady
 
     def createMissingJob(self):
         job = BranchScanJob.create(self.factory.makeBranch())
@@ -48,7 +53,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.run_missing_ready(_no_init=True)
+                    self.RunMissingReady().run(_no_init=True)
         self.assertEqual([], responses)
 
     def test_run_missing_ready(self):
@@ -59,5 +64,54 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.run_missing_ready(_no_init=True)
+                    self.RunMissingReady().run(_no_init=True)
         self.assertEqual(1, len(responses))
+
+    def test_run_missing_ready_does_not_return_results(self):
+        """The celerybeat task run_missing_ready does not create a
+        result queue."""
+        from lazr.jobrunner.celerytask import list_queued
+        job_queue_name = 'job'
+        request = self.RunMissingReady().apply_async(
+            kwargs={'_no_init': True}, queue=job_queue_name)
+        self.assertTrue(request.task_id.startswith('RunMissingReady_'))
+        result_queue_name = request.task_id.replace('-', '')
+        # Paranoia check: This test intends to prove that a Celery
+        # result queue fot the task created above will _not_ be created.
+        # This would also happen when "with celeryd()" would do nothing.
+        # So let's be sure that right now a task is queued...
+        self.assertEqual(
+            1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        # ...and that list_queued() calls do not consume messages.
+        self.assertEqual(
+            1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        with celeryd(job_queue_name):
+            sleep(2)
+        # But now the message has been consumed by celeryd.
+        self.assertEqual(
+            0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        # No result queue was created for the task.
+        try:
+            real_stdout = sys.stdout
+            real_stderr = sys.stderr
+            sys.stdout = fake_stdout = StringIO()
+            sys.stderr = fake_stderr = StringIO()
+            clear_queues(
+                ['script_name', '-c', 'lp.services.job.celeryconfig',
+                 result_queue_name])
+        finally:
+            sys.stdout = real_stdout
+            sys.stderr = real_stderr
+        fake_stdout = fake_stdout.getvalue()
+        fake_stderr = fake_stderr.getvalue()
+        self.assertEqual(
+            '', fake_stdout,
+            "Unexpected output from clear_queues:\n"
+            "stdout: %r\n"
+            "stderr: %r" % (fake_stdout, fake_stderr))
+        self.assertEqual(
+            "NOT_FOUND - no queue '%s' in vhost '/'\n" % result_queue_name,
+            fake_stderr,
+            "Unexpected output from clear_queues:\n"
+            "stdout: %r\n"
+            "stderr: %r" % (fake_stdout, fake_stderr))

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2012-07-03 15:36:00 +0000
+++ lib/lp/services/job/tests/test_runner.py	2012-07-16 11:07:22 +0000
@@ -384,7 +384,7 @@
         task_id = job.taskId()
         uuid_expr = (
             '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
-        mo = re.search('^NullJob-%s-%s$' % (job.job_id, uuid_expr), task_id)
+        mo = re.search('^NullJob_%s_%s$' % (job.job_id, uuid_expr), task_id)
         self.assertIsNot(None, mo)
 
 


Follow ups