← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/launchpad/rollback-15656 into lp:launchpad

 

Abel Deuring has proposed merging lp:~adeuring/launchpad/rollback-15656 into lp:launchpad.

Requested reviews:
  Abel Deuring (adeuring)

For more details, see:
https://code.launchpad.net/~adeuring/launchpad/rollback-15656/+merge/116247

just a rollback
-- 
https://code.launchpad.net/~adeuring/launchpad/rollback-15656/+merge/116247
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf	2012-07-19 13:55:15 +0000
+++ lib/lp/services/config/schema-lazr.conf	2012-07-23 11:26:22 +0000
@@ -1805,7 +1805,7 @@
 
 [job_runner_queues]
 # The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow celerybeat
+queues: job job_slow branch_write_job branch_write_job_slow
 
 # The main job queue.
 [job]
@@ -1837,9 +1837,3 @@
 timeout: 86400
 fallback_queue:
 concurrency: 1
-
-# The queue used for the celerybeat task RunMissingReady
-[celerybeat]
-timeout: 86400
-fallback_queue:
-concurrency: 1

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2012-07-19 09:06:37 +0000
+++ lib/lp/services/job/celeryconfig.py	2012-07-23 11:26:22 +0000
@@ -35,7 +35,6 @@
     Doing this in a function is convenient for testing.
     """
     result = {}
-    CELERY_BEAT_QUEUE = 'celerybeat'
     celery_queues = {}
     queue_names = config.job_runner_queues.queues
     queue_names = queue_names.split(' ')
@@ -86,10 +85,7 @@
     result['CELERYBEAT_SCHEDULE'] = {
         'schedule-missing': {
             'task': 'lp.services.job.celeryjob.run_missing_ready',
-            'schedule': timedelta(seconds=600),
-            'options': {
-                'routing_key': CELERY_BEAT_QUEUE,
-                },
+            'schedule': timedelta(seconds=600)
         }
     }
     # See http://ask.github.com/celery/userguide/optimizing.html:

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py	2012-07-16 10:05:00 +0000
+++ lib/lp/services/job/celeryjob.py	2012-07-23 11:26:22 +0000
@@ -16,11 +16,10 @@
 
 from logging import info
 import os
-from uuid import uuid4
 
 
 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import Task
+from celery.task import task
 from lazr.jobrunner.celerytask import RunJob
 from storm.zope.interfaces import IZStorm
 import transaction
@@ -81,41 +80,24 @@
             queued_job_ids]
 
 
-class RunMissingReady(Task):
+@task
+def run_missing_ready(_no_init=False):
     """Task to run any jobs that are ready but not scheduled.
 
     Currently supports only BranchScanJob.
     :param _no_init: For tests.  If True, do not perform the initialization.
     """
-    ignore_result = True
-
-    def run(self, _no_init=False):
-        if not _no_init:
-            task_init('run_missing_ready')
-        with TransactionFreeOperation():
-            count = 0
-            for job in find_missing_ready(BranchScanJob):
-                if not celery_enabled(job.__class__.__name__):
-                    continue
-                job.celeryCommitHook(True)
-                count += 1
-            info('Scheduled %d missing jobs.', count)
-            transaction.commit()
-
-    def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
-                    connection=None, router=None, queues=None, **options):
-        """Create a task_id if none is specified.
-
-        Override the quite generic default task_id with one containing
-        the class name.
-
-        See also `celery.task.Task.apply_async()`.
-        """
-        if task_id is None:
-            task_id = '%s_%s' % (self.__class__.__name__, uuid4())
-        return super(RunMissingReady, self).apply_async(
-            args, kwargs, task_id, publisher, connection, router, queues,
-            **options)
+    if not _no_init:
+        task_init('run_missing_ready')
+    with TransactionFreeOperation():
+        count = 0
+        for job in find_missing_ready(BranchScanJob):
+            if not celery_enabled(job.__class__.__name__):
+                continue
+            job.celeryCommitHook(True)
+            count += 1
+        info('Scheduled %d missing jobs.', count)
+        transaction.commit()
 
 
 needs_zcml = True

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-07-16 10:05:00 +0000
+++ lib/lp/services/job/runner.py	2012-07-23 11:26:22 +0000
@@ -217,7 +217,7 @@
             'result': SoftTimeLimitExceeded(1,),
             'task_id': 'cba7d07b-37fe-4f1d-a5f6-79ad7c30222f'}
         """
-        return '%s_%s_%s' % (
+        return '%s-%s-%s' % (
             self.__class__.__name__, self.job_id, uuid4())
 
     def runViaCelery(self, ignore_result=False):

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	2012-07-19 09:06:37 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2012-07-23 11:26:22 +0000
@@ -25,8 +25,7 @@
         # Four queues are defined; the binding key for each queue is
         # just the queue name.
         queue_names = [
-            'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
-            'job_slow']
+            'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
         queues = config['CELERY_QUEUES']
         self.assertEqual(queue_names, sorted(queues))
         for name in queue_names:

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	2012-07-20 14:34:05 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	2012-07-23 11:26:22 +0000
@@ -1,18 +1,10 @@
 # Copyright 2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
-from cStringIO import StringIO
-import sys
-from time import (
-    sleep,
-    time,
-    )
-from lazr.jobrunner.bin.clear_queues import clear_queues
 from lp.code.model.branchjob import BranchScanJob
 from lp.scripts.helpers import TransactionFreeOperation
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.tests import (
-    celeryd,
     drain_celery_queues,
     monitor_celery,
     )
@@ -29,10 +21,10 @@
         super(TestRunMissingJobs, self).setUp()
         from lp.services.job.celeryjob import (
             find_missing_ready,
-            RunMissingReady,
+            run_missing_ready,
         )
         self.find_missing_ready = find_missing_ready
-        self.RunMissingReady = RunMissingReady
+        self.run_missing_ready = run_missing_ready
 
     def createMissingJob(self):
         job = BranchScanJob.create(self.factory.makeBranch())
@@ -56,7 +48,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.RunMissingReady().run(_no_init=True)
+                    self.run_missing_ready(_no_init=True)
         self.assertEqual([], responses)
 
     def test_run_missing_ready(self):
@@ -67,64 +59,5 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.RunMissingReady().run(_no_init=True)
+                    self.run_missing_ready().run(_no_init=True)
         self.assertEqual(1, len(responses))
-
-    # XXX: deryck 20-07-2012 bug=1027103
-    # Test fails spuriously in buildbot.
-    def disabled_test_run_missing_ready_does_not_return_results(self):
-        """The celerybeat task run_missing_ready does not create a
-        result queue."""
-        from lazr.jobrunner.celerytask import list_queued
-        job_queue_name = 'celerybeat'
-        request = self.RunMissingReady().apply_async(
-            kwargs={'_no_init': True}, queue=job_queue_name)
-        self.assertTrue(request.task_id.startswith('RunMissingReady_'))
-        result_queue_name = request.task_id.replace('-', '')
-        # Paranoia check: This test intends to prove that a Celery
-        # result queue fot the task created above will _not_ be created.
-        # This would also happen when "with celeryd()" would do nothing.
-        # So let's be sure that right now a task is queued...
-        self.assertEqual(
-            1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
-        # ...and that list_queued() calls do not consume messages.
-        self.assertEqual(
-            1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
-        # Wait at most 60 seconds for celeryd to start and process
-        # the task.
-        with celeryd(job_queue_name):
-            wait_until = time() + 60
-            while (time() < wait_until):
-                queued_tasks = list_queued(
-                    self.RunMissingReady.app, [job_queue_name])
-                if len(queued_tasks) == 0:
-                    break
-                sleep(.2)
-        # But now the message has been consumed by celeryd.
-        self.assertEqual(
-            0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
-        # No result queue was created for the task.
-        try:
-            real_stdout = sys.stdout
-            real_stderr = sys.stderr
-            sys.stdout = fake_stdout = StringIO()
-            sys.stderr = fake_stderr = StringIO()
-            clear_queues(
-                ['script_name', '-c', 'lp.services.job.celeryconfig',
-                 result_queue_name])
-        finally:
-            sys.stdout = real_stdout
-            sys.stderr = real_stderr
-        fake_stdout = fake_stdout.getvalue()
-        fake_stderr = fake_stderr.getvalue()
-        self.assertEqual(
-            '', fake_stdout,
-            "Unexpected output from clear_queues:\n"
-            "stdout: %r\n"
-            "stderr: %r" % (fake_stdout, fake_stderr))
-        self.assertEqual(
-            "NOT_FOUND - no queue '%s' in vhost '/'\n" % result_queue_name,
-            fake_stderr,
-            "Unexpected output from clear_queues:\n"
-            "stdout: %r\n"
-            "stderr: %r" % (fake_stdout, fake_stderr))

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2012-07-16 10:05:00 +0000
+++ lib/lp/services/job/tests/test_runner.py	2012-07-23 11:26:22 +0000
@@ -384,7 +384,7 @@
         task_id = job.taskId()
         uuid_expr = (
             '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
-        mo = re.search('^NullJob_%s_%s$' % (job.job_id, uuid_expr), task_id)
+        mo = re.search('^NullJob-%s-%s$' % (job.job_id, uuid_expr), task_id)
         self.assertIsNot(None, mo)
 
 


Follow ups