launchpad-reviewers team mailing list archive
[Merge] lp:~abentley/launchpad/restore-queue-test into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/restore-queue-test into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #1015667 in Launchpad itself: "Celery is leaving an ever increasing number of queues behind"
https://bugs.launchpad.net/launchpad/+bug/1015667
For more details, see:
https://code.launchpad.net/~abentley/launchpad/restore-queue-test/+merge/116534
= Summary =
Fix bug #1015667: Celery is leaving an ever increasing number of queues behind
== Proposed fix ==
Re-apply the changes from r15656, and fix the test.
== Pre-implementation notes ==
None
== LOC Rationale ==
Reverting a rollback
== Implementation details ==
Add polling to ensure that list_queued does not race with RabbitMQ message delivery.
Replace the existing polling for task completion with a wait on a no-op task.
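In rough outline, the synchronization pattern used by the new test looks like this (a sketch only; `app` and `queue_name` stand in for the real Celery app and queue name used in the test):

from time import sleep

from lazr.jobrunner.celerytask import list_queued
from lp.services.job.tests.celery_helpers import noop


def wait_until_queued(app, queue_name, attempts=10):
    # Poll until RabbitMQ has actually delivered the message, so that
    # list_queued() does not race with message delivery.
    for _ in range(attempts):
        if len(list_queued(app, [queue_name])) > 0:
            return
        sleep(1)

# Instead of polling for consumption, queue a trivial no-op task on the
# same queue and wait for it; FIFO ordering means the earlier task must
# already have been processed once the no-op completes:
# noop.apply_async(queue=queue_name).wait(60)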
== Tests ==
bin/test -t test_run_missing_ready_does_not_return_results
== Demo and Q/A ==
Enable Celery and celery-based branch scanning on qastaging. Run for at least an hour. No mysterious queues should be left behind.
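One possible way to check for leftover queues during Q/A (an illustrative sketch only, assuming rabbitmqctl is available on the host; this branch does not prescribe a particular check):

import subprocess

# Result queues created for RunMissingReady are named after the task id
# with the dashes removed, so stray ones show up with a
# 'RunMissingReady_' prefix.
queue_names = subprocess.check_output(
    ['rabbitmqctl', 'list_queues', 'name']).splitlines()
leftover = [name for name in queue_names
            if name.startswith('RunMissingReady')]
print(leftover)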
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/tests/test_runner.py
lib/lp/services/config/schema-lazr.conf
lib/lp/services/job/runner.py
lib/lp/services/job/tests/celery_helpers.py
lib/lp/services/job/celeryjob.py
lib/lp/services/job/tests/test_celeryjob.py
lib/lp/services/job/celeryconfig.py
lib/lp/services/job/tests/test_celery_configuration.py
./lib/lp/services/config/schema-lazr.conf
445: Line exceeds 80 characters.
1038: Line exceeds 80 characters.
1045: Line exceeds 80 characters.
1583: Line exceeds 80 characters.
--
https://code.launchpad.net/~abentley/launchpad/restore-queue-test/+merge/116534
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/restore-queue-test into lp:launchpad.
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2012-07-23 13:29:01 +0000
+++ lib/lp/services/config/schema-lazr.conf 2012-07-24 19:53:20 +0000
@@ -1805,7 +1805,7 @@
[job_runner_queues]
# The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow
+queues: job job_slow branch_write_job branch_write_job_slow celerybeat
# The main job queue.
[job]
@@ -1837,3 +1837,9 @@
timeout: 86400
fallback_queue:
concurrency: 1
+
+# The queue used for the celerybeat task RunMissingReady
+[celerybeat]
+timeout: 86400
+fallback_queue:
+concurrency: 1
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-07-24 19:53:20 +0000
@@ -35,6 +35,7 @@
Doing this in a function is convenient for testing.
"""
result = {}
+ CELERY_BEAT_QUEUE = 'celerybeat'
celery_queues = {}
queue_names = config.job_runner_queues.queues
queue_names = queue_names.split(' ')
@@ -85,7 +86,10 @@
result['CELERYBEAT_SCHEDULE'] = {
'schedule-missing': {
'task': 'lp.services.job.celeryjob.run_missing_ready',
- 'schedule': timedelta(seconds=600)
+ 'schedule': timedelta(seconds=600),
+ 'options': {
+ 'routing_key': CELERY_BEAT_QUEUE,
+ },
}
}
# See http://ask.github.com/celery/userguide/optimizing.html:
=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/celeryjob.py 2012-07-24 19:53:20 +0000
@@ -16,10 +16,11 @@
from logging import info
import os
+from uuid import uuid4
os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import task
+from celery.task import Task
from lazr.jobrunner.celerytask import RunJob
from storm.zope.interfaces import IZStorm
import transaction
@@ -80,24 +81,41 @@
queued_job_ids]
-@task
-def run_missing_ready(_no_init=False):
+class RunMissingReady(Task):
"""Task to run any jobs that are ready but not scheduled.
Currently supports only BranchScanJob.
:param _no_init: For tests. If True, do not perform the initialization.
"""
- if not _no_init:
- task_init('run_missing_ready')
- with TransactionFreeOperation():
- count = 0
- for job in find_missing_ready(BranchScanJob):
- if not celery_enabled(job.__class__.__name__):
- continue
- job.celeryCommitHook(True)
- count += 1
- info('Scheduled %d missing jobs.', count)
- transaction.commit()
+ ignore_result = True
+
+ def run(self, _no_init=False):
+ if not _no_init:
+ task_init('run_missing_ready')
+ with TransactionFreeOperation():
+ count = 0
+ for job in find_missing_ready(BranchScanJob):
+ if not celery_enabled(job.__class__.__name__):
+ continue
+ job.celeryCommitHook(True)
+ count += 1
+ info('Scheduled %d missing jobs.', count)
+ transaction.commit()
+
+ def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
+ connection=None, router=None, queues=None, **options):
+ """Create a task_id if none is specified.
+
+ Override the quite generic default task_id with one containing
+ the class name.
+
+ See also `celery.task.Task.apply_async()`.
+ """
+ if task_id is None:
+ task_id = '%s_%s' % (self.__class__.__name__, uuid4())
+ return super(RunMissingReady, self).apply_async(
+ args, kwargs, task_id, publisher, connection, router, queues,
+ **options)
needs_zcml = True
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/runner.py 2012-07-24 19:53:20 +0000
@@ -217,7 +217,7 @@
'result': SoftTimeLimitExceeded(1,),
'task_id': 'cba7d07b-37fe-4f1d-a5f6-79ad7c30222f'}
"""
- return '%s-%s-%s' % (
+ return '%s_%s_%s' % (
self.__class__.__name__, self.job_id, uuid4())
def runViaCelery(self, ignore_result=False):
=== modified file 'lib/lp/services/job/tests/celery_helpers.py'
--- lib/lp/services/job/tests/celery_helpers.py 2012-06-14 05:18:22 +0000
+++ lib/lp/services/job/tests/celery_helpers.py 2012-07-24 19:53:20 +0000
@@ -3,7 +3,10 @@
__metaclass__ = type
-__all__ = ['pop_notifications']
+__all__ = [
+ 'noop',
+ 'pop_notifications'
+ ]
# Force the correct celeryconfig to be used.
import lp.services.job.celeryjob
@@ -18,3 +21,11 @@
def pop_notifications():
from lp.testing.mail_helpers import pop_notifications
return pop_notifications()
+
+
+@task
+def noop():
+ """Task that does nothing.
+
+ Used to ensure that other tasks have completed.
+ """
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2012-07-24 19:53:20 +0000
@@ -25,7 +25,8 @@
# Four queues are defined; the binding key for each queue is
# just the queue name.
queue_names = [
- 'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
+ 'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
+ 'job_slow']
queues = config['CELERY_QUEUES']
self.assertEqual(queue_names, sorted(queues))
for name in queue_names:
=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2012-07-24 19:53:20 +0000
@@ -1,10 +1,15 @@
# Copyright 2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
+from cStringIO import StringIO
+import sys
+from time import sleep
+from lazr.jobrunner.bin.clear_queues import clear_queues
from lp.code.model.branchjob import BranchScanJob
from lp.scripts.helpers import TransactionFreeOperation
from lp.services.features.testing import FeatureFixture
from lp.services.job.tests import (
+ celeryd,
drain_celery_queues,
monitor_celery,
)
@@ -21,10 +26,10 @@
super(TestRunMissingJobs, self).setUp()
from lp.services.job.celeryjob import (
find_missing_ready,
- run_missing_ready,
+ RunMissingReady,
)
self.find_missing_ready = find_missing_ready
- self.run_missing_ready = run_missing_ready
+ self.RunMissingReady = RunMissingReady
def createMissingJob(self):
job = BranchScanJob.create(self.factory.makeBranch())
@@ -48,7 +53,7 @@
with monitor_celery() as responses:
with dbuser('run_missing_ready'):
with TransactionFreeOperation.require():
- self.run_missing_ready(_no_init=True)
+ self.RunMissingReady().run(_no_init=True)
self.assertEqual([], responses)
def test_run_missing_ready(self):
@@ -59,5 +64,61 @@
with monitor_celery() as responses:
with dbuser('run_missing_ready'):
with TransactionFreeOperation.require():
- self.run_missing_ready(_no_init=True)
+ self.RunMissingReady().run(_no_init=True)
self.assertEqual(1, len(responses))
+
+ def test_run_missing_ready_does_not_return_results(self):
+ """The celerybeat task run_missing_ready does not create a
+ result queue."""
+ from lazr.jobrunner.celerytask import list_queued
+ from lp.services.job.tests.celery_helpers import noop
+ job_queue_name = 'celerybeat'
+ request = self.RunMissingReady().apply_async(
+ kwargs={'_no_init': True}, queue=job_queue_name)
+ self.assertTrue(request.task_id.startswith('RunMissingReady_'))
+ result_queue_name = request.task_id.replace('-', '')
+ # Paranoia check: This test intends to prove that a Celery
+ # result queue for the task created above will _not_ be created.
+ # This would also happen when "with celeryd()" would do nothing.
+ # So let's be sure that a task is queued...
+ # Give the system some time to deliver the message
+ for x in range(10):
+ if list_queued(self.RunMissingReady.app, [job_queue_name]) > 0:
+ break
+ sleep(1)
+ self.assertEqual(
+ 1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+ # Wait at most 60 seconds for celeryd to start and process
+ # the task.
+ with celeryd(job_queue_name):
+ # Due to FIFO ordering, this will only return after
+ # RunMissingReady has finished.
+ noop.apply_async(queue=job_queue_name).wait(60)
+ # But now the message has been consumed by celeryd.
+ self.assertEqual(
+ 0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+ # No result queue was created for the task.
+ try:
+ real_stdout = sys.stdout
+ real_stderr = sys.stderr
+ sys.stdout = fake_stdout = StringIO()
+ sys.stderr = fake_stderr = StringIO()
+ clear_queues(
+ ['script_name', '-c', 'lp.services.job.celeryconfig',
+ result_queue_name])
+ finally:
+ sys.stdout = real_stdout
+ sys.stderr = real_stderr
+ fake_stdout = fake_stdout.getvalue()
+ fake_stderr = fake_stderr.getvalue()
+ self.assertEqual(
+ '', fake_stdout,
+ "Unexpected output from clear_queues:\n"
+ "stdout: %r\n"
+ "stderr: %r" % (fake_stdout, fake_stderr))
+ self.assertEqual(
+ "NOT_FOUND - no queue '%s' in vhost '/'\n" % result_queue_name,
+ fake_stderr,
+ "Unexpected output from clear_queues:\n"
+ "stdout: %r\n"
+ "stderr: %r" % (fake_stdout, fake_stderr))
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_runner.py 2012-07-24 19:53:20 +0000
@@ -384,7 +384,7 @@
task_id = job.taskId()
uuid_expr = (
'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
- mo = re.search('^NullJob-%s-%s$' % (job.job_id, uuid_expr), task_id)
+ mo = re.search('^NullJob_%s_%s$' % (job.job_id, uuid_expr), task_id)
self.assertIsNot(None, mo)