launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/restore-queue-test into lp:launchpad

Aaron Bentley has proposed merging lp:~abentley/launchpad/restore-queue-test into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1015667 in Launchpad itself: "Celery is leaving an ever increasing number of queues behind"
  https://bugs.launchpad.net/launchpad/+bug/1015667

For more details, see:
https://code.launchpad.net/~abentley/launchpad/restore-queue-test/+merge/116534

= Summary =
Fix bug #1015667: Celery is leaving an ever increasing number of queues behind

== Proposed fix ==
Re-apply the changes from r15656, and fix the test.

== Pre-implementation notes ==
None

== LOC Rationale ==
Reverting a rollback

== Implementation details ==
Add polling to ensure that list_queued does not race with RabbitMQ message delivery.
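
A minimal sketch of this step, assuming the Launchpad test environment (a running RabbitMQ and the test celeryconfig) and using the names from the test in the diff below; the ten-second bound matches the test and is not otherwise significant:

    from time import sleep

    from lazr.jobrunner.celerytask import list_queued

    from lp.services.job.celeryjob import RunMissingReady

    job_queue_name = 'celerybeat'
    # Queue the celerybeat task under test.
    RunMissingReady().apply_async(
        kwargs={'_no_init': True}, queue=job_queue_name)
    # Wait (up to ten seconds) for RabbitMQ to actually deliver the
    # message before asserting anything about the queue contents.
    for _ in range(10):
        if len(list_queued(RunMissingReady.app, [job_queue_name])) > 0:
            break
        sleep(1)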

Replace the existing polling for task completion with a wait on a no-op task: due to FIFO ordering, the no-op only returns once the earlier RunMissingReady task has been consumed.
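
The completion wait then looks roughly like this (celeryd and noop are the test helpers added in the diff; the 60-second timeout is the one used in the test):

    from lp.services.job.tests import celeryd
    from lp.services.job.tests.celery_helpers import noop

    job_queue_name = 'celerybeat'
    # Start a worker for the queue and wait on a no-op task.  FIFO
    # ordering means the no-op can only complete after the earlier
    # RunMissingReady message has been consumed, so no sleep/poll loop
    # is needed for completion.
    with celeryd(job_queue_name):
        noop.apply_async(queue=job_queue_name).wait(60)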

== Tests ==
bin/test -t test_run_missing_ready_does_not_return_results


== Demo and Q/A ==
Enable Celery and Celery-based branch scanning on qastaging.  Run for at least an hour.  No mysterious queues should be left behind.


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/config/schema-lazr.conf
  lib/lp/services/job/runner.py
  lib/lp/services/job/tests/celery_helpers.py
  lib/lp/services/job/celeryjob.py
  lib/lp/services/job/tests/test_celeryjob.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/tests/test_celery_configuration.py

./lib/lp/services/config/schema-lazr.conf
     445: Line exceeds 80 characters.
    1038: Line exceeds 80 characters.
    1045: Line exceeds 80 characters.
    1583: Line exceeds 80 characters.
-- 
https://code.launchpad.net/~abentley/launchpad/restore-queue-test/+merge/116534
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/restore-queue-test into lp:launchpad.
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf	2012-07-23 13:29:01 +0000
+++ lib/lp/services/config/schema-lazr.conf	2012-07-24 19:53:20 +0000
@@ -1805,7 +1805,7 @@
 
 [job_runner_queues]
 # The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow
+queues: job job_slow branch_write_job branch_write_job_slow celerybeat
 
 # The main job queue.
 [job]
@@ -1837,3 +1837,9 @@
 timeout: 86400
 fallback_queue:
 concurrency: 1
+
+# The queue used for the celerybeat task RunMissingReady
+[celerybeat]
+timeout: 86400
+fallback_queue:
+concurrency: 1

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/celeryconfig.py	2012-07-24 19:53:20 +0000
@@ -35,6 +35,7 @@
     Doing this in a function is convenient for testing.
     """
     result = {}
+    CELERY_BEAT_QUEUE = 'celerybeat'
     celery_queues = {}
     queue_names = config.job_runner_queues.queues
     queue_names = queue_names.split(' ')
@@ -85,7 +86,10 @@
     result['CELERYBEAT_SCHEDULE'] = {
         'schedule-missing': {
             'task': 'lp.services.job.celeryjob.run_missing_ready',
-            'schedule': timedelta(seconds=600)
+            'schedule': timedelta(seconds=600),
+            'options': {
+                'routing_key': CELERY_BEAT_QUEUE,
+                },
         }
     }
     # See http://ask.github.com/celery/userguide/optimizing.html:

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py	2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/celeryjob.py	2012-07-24 19:53:20 +0000
@@ -16,10 +16,11 @@
 
 from logging import info
 import os
+from uuid import uuid4
 
 
 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import task
+from celery.task import Task
 from lazr.jobrunner.celerytask import RunJob
 from storm.zope.interfaces import IZStorm
 import transaction
@@ -80,24 +81,41 @@
             queued_job_ids]
 
 
-@task
-def run_missing_ready(_no_init=False):
+class RunMissingReady(Task):
     """Task to run any jobs that are ready but not scheduled.
 
     Currently supports only BranchScanJob.
     :param _no_init: For tests.  If True, do not perform the initialization.
     """
-    if not _no_init:
-        task_init('run_missing_ready')
-    with TransactionFreeOperation():
-        count = 0
-        for job in find_missing_ready(BranchScanJob):
-            if not celery_enabled(job.__class__.__name__):
-                continue
-            job.celeryCommitHook(True)
-            count += 1
-        info('Scheduled %d missing jobs.', count)
-        transaction.commit()
+    ignore_result = True
+
+    def run(self, _no_init=False):
+        if not _no_init:
+            task_init('run_missing_ready')
+        with TransactionFreeOperation():
+            count = 0
+            for job in find_missing_ready(BranchScanJob):
+                if not celery_enabled(job.__class__.__name__):
+                    continue
+                job.celeryCommitHook(True)
+                count += 1
+            info('Scheduled %d missing jobs.', count)
+            transaction.commit()
+
+    def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
+                    connection=None, router=None, queues=None, **options):
+        """Create a task_id if none is specified.
+
+        Override the quite generic default task_id with one containing
+        the class name.
+
+        See also `celery.task.Task.apply_async()`.
+        """
+        if task_id is None:
+            task_id = '%s_%s' % (self.__class__.__name__, uuid4())
+        return super(RunMissingReady, self).apply_async(
+            args, kwargs, task_id, publisher, connection, router, queues,
+            **options)
 
 
 needs_zcml = True

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/runner.py	2012-07-24 19:53:20 +0000
@@ -217,7 +217,7 @@
             'result': SoftTimeLimitExceeded(1,),
             'task_id': 'cba7d07b-37fe-4f1d-a5f6-79ad7c30222f'}
         """
-        return '%s-%s-%s' % (
+        return '%s_%s_%s' % (
             self.__class__.__name__, self.job_id, uuid4())
 
     def runViaCelery(self, ignore_result=False):

=== modified file 'lib/lp/services/job/tests/celery_helpers.py'
--- lib/lp/services/job/tests/celery_helpers.py	2012-06-14 05:18:22 +0000
+++ lib/lp/services/job/tests/celery_helpers.py	2012-07-24 19:53:20 +0000
@@ -3,7 +3,10 @@
 
 __metaclass__ = type
 
-__all__ = ['pop_notifications']
+__all__ = [
+    'noop',
+    'pop_notifications'
+    ]
 
 # Force the correct celeryconfig to be used.
 import lp.services.job.celeryjob
@@ -18,3 +21,11 @@
 def pop_notifications():
     from lp.testing.mail_helpers import pop_notifications
     return pop_notifications()
+
+
+@task
+def noop():
+    """Task that does nothing.
+
+    Used to ensure that other tasks have completed.
+    """

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2012-07-24 19:53:20 +0000
@@ -25,7 +25,8 @@
         # Four queues are defined; the binding key for each queue is
         # just the queue name.
         queue_names = [
-            'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
+            'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
+            'job_slow']
         queues = config['CELERY_QUEUES']
         self.assertEqual(queue_names, sorted(queues))
         for name in queue_names:

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	2012-07-24 19:53:20 +0000
@@ -1,10 +1,15 @@
 # Copyright 2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
+from cStringIO import StringIO
+import sys
+from time import sleep
+from lazr.jobrunner.bin.clear_queues import clear_queues
 from lp.code.model.branchjob import BranchScanJob
 from lp.scripts.helpers import TransactionFreeOperation
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.tests import (
+    celeryd,
     drain_celery_queues,
     monitor_celery,
     )
@@ -21,10 +26,10 @@
         super(TestRunMissingJobs, self).setUp()
         from lp.services.job.celeryjob import (
             find_missing_ready,
-            run_missing_ready,
+            RunMissingReady,
         )
         self.find_missing_ready = find_missing_ready
-        self.run_missing_ready = run_missing_ready
+        self.RunMissingReady = RunMissingReady
 
     def createMissingJob(self):
         job = BranchScanJob.create(self.factory.makeBranch())
@@ -48,7 +53,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.run_missing_ready(_no_init=True)
+                    self.RunMissingReady().run(_no_init=True)
         self.assertEqual([], responses)
 
     def test_run_missing_ready(self):
@@ -59,5 +64,61 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.run_missing_ready(_no_init=True)
+                    self.RunMissingReady().run(_no_init=True)
         self.assertEqual(1, len(responses))
+
+    def test_run_missing_ready_does_not_return_results(self):
+        """The celerybeat task run_missing_ready does not create a
+        result queue."""
+        from lazr.jobrunner.celerytask import list_queued
+        from lp.services.job.tests.celery_helpers import noop
+        job_queue_name = 'celerybeat'
+        request = self.RunMissingReady().apply_async(
+            kwargs={'_no_init': True}, queue=job_queue_name)
+        self.assertTrue(request.task_id.startswith('RunMissingReady_'))
+        result_queue_name = request.task_id.replace('-', '')
+        # Paranoia check: This test intends to prove that a Celery
+        # result queue for the task created above will _not_ be created.
+        # This would also happen when "with celeryd()" would do nothing.
+        # So let's be sure that a task is queued...
+        # Give the system some time to deliver the message
+        for x in range(10):
+            if list_queued(self.RunMissingReady.app, [job_queue_name]):
+                break
+            sleep(1)
+        self.assertEqual(
+            1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        # Wait at most 60 seconds for celeryd to start and process
+        # the task.
+        with celeryd(job_queue_name):
+            # Due to FIFO ordering, this will only return after
+            # RunMissingReady has finished.
+            noop.apply_async(queue=job_queue_name).wait(60)
+        # But now the message has been consumed by celeryd.
+        self.assertEqual(
+            0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        # No result queue was created for the task.
+        try:
+            real_stdout = sys.stdout
+            real_stderr = sys.stderr
+            sys.stdout = fake_stdout = StringIO()
+            sys.stderr = fake_stderr = StringIO()
+            clear_queues(
+                ['script_name', '-c', 'lp.services.job.celeryconfig',
+                 result_queue_name])
+        finally:
+            sys.stdout = real_stdout
+            sys.stderr = real_stderr
+        fake_stdout = fake_stdout.getvalue()
+        fake_stderr = fake_stderr.getvalue()
+        self.assertEqual(
+            '', fake_stdout,
+            "Unexpected output from clear_queues:\n"
+            "stdout: %r\n"
+            "stderr: %r" % (fake_stdout, fake_stderr))
+        self.assertEqual(
+            "NOT_FOUND - no queue '%s' in vhost '/'\n" % result_queue_name,
+            fake_stderr,
+            "Unexpected output from clear_queues:\n"
+            "stdout: %r\n"
+            "stderr: %r" % (fake_stdout, fake_stderr))

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_runner.py	2012-07-24 19:53:20 +0000
@@ -384,7 +384,7 @@
         task_id = job.taskId()
         uuid_expr = (
             '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
-        mo = re.search('^NullJob-%s-%s$' % (job.job_id, uuid_expr), task_id)
+        mo = re.search('^NullJob_%s_%s$' % (job.job_id, uuid_expr), task_id)
         self.assertIsNot(None, mo)
 
 

