launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #10164
[Merge] lp:~adeuring/launchpad/rollback-15656 into lp:launchpad
Abel Deuring has proposed merging lp:~adeuring/launchpad/rollback-15656 into lp:launchpad.
Requested reviews:
Abel Deuring (adeuring)
For more details, see:
https://code.launchpad.net/~adeuring/launchpad/rollback-15656/+merge/116247
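Just a rollback: this removes the dedicated celerybeat queue, turns the RunMissingReady task class back into the run_missing_ready task function, and restores '-' as the separator in job task IDs.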
--
https://code.launchpad.net/~adeuring/launchpad/rollback-15656/+merge/116247
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2012-07-19 13:55:15 +0000
+++ lib/lp/services/config/schema-lazr.conf 2012-07-23 11:26:22 +0000
@@ -1805,7 +1805,7 @@
[job_runner_queues]
# The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow celerybeat
+queues: job job_slow branch_write_job branch_write_job_slow
# The main job queue.
[job]
@@ -1837,9 +1837,3 @@
timeout: 86400
fallback_queue:
concurrency: 1
-
-# The queue used for the celerybeat task RunMissingReady
-[celerybeat]
-timeout: 86400
-fallback_queue:
-concurrency: 1
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-07-19 09:06:37 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-07-23 11:26:22 +0000
@@ -35,7 +35,6 @@
Doing this in a function is convenient for testing.
"""
result = {}
- CELERY_BEAT_QUEUE = 'celerybeat'
celery_queues = {}
queue_names = config.job_runner_queues.queues
queue_names = queue_names.split(' ')
@@ -86,10 +85,7 @@
result['CELERYBEAT_SCHEDULE'] = {
'schedule-missing': {
'task': 'lp.services.job.celeryjob.run_missing_ready',
- 'schedule': timedelta(seconds=600),
- 'options': {
- 'routing_key': CELERY_BEAT_QUEUE,
- },
+ 'schedule': timedelta(seconds=600)
}
}
# See http://ask.github.com/celery/userguide/optimizing.html:
=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2012-07-16 10:05:00 +0000
+++ lib/lp/services/job/celeryjob.py 2012-07-23 11:26:22 +0000
@@ -16,11 +16,10 @@
from logging import info
import os
-from uuid import uuid4
os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import Task
+from celery.task import task
from lazr.jobrunner.celerytask import RunJob
from storm.zope.interfaces import IZStorm
import transaction
@@ -81,41 +80,24 @@
queued_job_ids]
-class RunMissingReady(Task):
+@task
+def run_missing_ready(_no_init=False):
"""Task to run any jobs that are ready but not scheduled.
Currently supports only BranchScanJob.
:param _no_init: For tests. If True, do not perform the initialization.
"""
- ignore_result = True
-
- def run(self, _no_init=False):
- if not _no_init:
- task_init('run_missing_ready')
- with TransactionFreeOperation():
- count = 0
- for job in find_missing_ready(BranchScanJob):
- if not celery_enabled(job.__class__.__name__):
- continue
- job.celeryCommitHook(True)
- count += 1
- info('Scheduled %d missing jobs.', count)
- transaction.commit()
-
- def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
- connection=None, router=None, queues=None, **options):
- """Create a task_id if none is specified.
-
- Override the quite generic default task_id with one containing
- the class name.
-
- See also `celery.task.Task.apply_async()`.
- """
- if task_id is None:
- task_id = '%s_%s' % (self.__class__.__name__, uuid4())
- return super(RunMissingReady, self).apply_async(
- args, kwargs, task_id, publisher, connection, router, queues,
- **options)
+ if not _no_init:
+ task_init('run_missing_ready')
+ with TransactionFreeOperation():
+ count = 0
+ for job in find_missing_ready(BranchScanJob):
+ if not celery_enabled(job.__class__.__name__):
+ continue
+ job.celeryCommitHook(True)
+ count += 1
+ info('Scheduled %d missing jobs.', count)
+ transaction.commit()
needs_zcml = True
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-07-16 10:05:00 +0000
+++ lib/lp/services/job/runner.py 2012-07-23 11:26:22 +0000
@@ -217,7 +217,7 @@
'result': SoftTimeLimitExceeded(1,),
'task_id': 'cba7d07b-37fe-4f1d-a5f6-79ad7c30222f'}
"""
- return '%s_%s_%s' % (
+ return '%s-%s-%s' % (
self.__class__.__name__, self.job_id, uuid4())
def runViaCelery(self, ignore_result=False):
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2012-07-19 09:06:37 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2012-07-23 11:26:22 +0000
@@ -25,8 +25,7 @@
# Four queues are defined; the binding key for each queue is
# just the queue name.
queue_names = [
- 'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
- 'job_slow']
+ 'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
queues = config['CELERY_QUEUES']
self.assertEqual(queue_names, sorted(queues))
for name in queue_names:
=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-07-20 14:34:05 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2012-07-23 11:26:22 +0000
@@ -1,18 +1,10 @@
# Copyright 2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-from cStringIO import StringIO
-import sys
-from time import (
- sleep,
- time,
- )
-from lazr.jobrunner.bin.clear_queues import clear_queues
from lp.code.model.branchjob import BranchScanJob
from lp.scripts.helpers import TransactionFreeOperation
from lp.services.features.testing import FeatureFixture
from lp.services.job.tests import (
- celeryd,
drain_celery_queues,
monitor_celery,
)
@@ -29,10 +21,10 @@
super(TestRunMissingJobs, self).setUp()
from lp.services.job.celeryjob import (
find_missing_ready,
- RunMissingReady,
+ run_missing_ready,
)
self.find_missing_ready = find_missing_ready
- self.RunMissingReady = RunMissingReady
+ self.run_missing_ready = run_missing_ready
def createMissingJob(self):
job = BranchScanJob.create(self.factory.makeBranch())
@@ -56,7 +48,7 @@
with monitor_celery() as responses:
with dbuser('run_missing_ready'):
with TransactionFreeOperation.require():
- self.RunMissingReady().run(_no_init=True)
+ self.run_missing_ready(_no_init=True)
self.assertEqual([], responses)
def test_run_missing_ready(self):
@@ -67,64 +59,5 @@
with monitor_celery() as responses:
with dbuser('run_missing_ready'):
with TransactionFreeOperation.require():
- self.RunMissingReady().run(_no_init=True)
+ self.run_missing_ready().run(_no_init=True)
self.assertEqual(1, len(responses))
-
- # XXX: deryck 20-07-2012 bug=1027103
- # Test fails spuriously in buildbot.
- def disabled_test_run_missing_ready_does_not_return_results(self):
- """The celerybeat task run_missing_ready does not create a
- result queue."""
- from lazr.jobrunner.celerytask import list_queued
- job_queue_name = 'celerybeat'
- request = self.RunMissingReady().apply_async(
- kwargs={'_no_init': True}, queue=job_queue_name)
- self.assertTrue(request.task_id.startswith('RunMissingReady_'))
- result_queue_name = request.task_id.replace('-', '')
- # Paranoia check: This test intends to prove that a Celery
- # result queue fot the task created above will _not_ be created.
- # This would also happen when "with celeryd()" would do nothing.
- # So let's be sure that right now a task is queued...
- self.assertEqual(
- 1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
- # ...and that list_queued() calls do not consume messages.
- self.assertEqual(
- 1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
- # Wait at most 60 seconds for celeryd to start and process
- # the task.
- with celeryd(job_queue_name):
- wait_until = time() + 60
- while (time() < wait_until):
- queued_tasks = list_queued(
- self.RunMissingReady.app, [job_queue_name])
- if len(queued_tasks) == 0:
- break
- sleep(.2)
- # But now the message has been consumed by celeryd.
- self.assertEqual(
- 0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
- # No result queue was created for the task.
- try:
- real_stdout = sys.stdout
- real_stderr = sys.stderr
- sys.stdout = fake_stdout = StringIO()
- sys.stderr = fake_stderr = StringIO()
- clear_queues(
- ['script_name', '-c', 'lp.services.job.celeryconfig',
- result_queue_name])
- finally:
- sys.stdout = real_stdout
- sys.stderr = real_stderr
- fake_stdout = fake_stdout.getvalue()
- fake_stderr = fake_stderr.getvalue()
- self.assertEqual(
- '', fake_stdout,
- "Unexpected output from clear_queues:\n"
- "stdout: %r\n"
- "stderr: %r" % (fake_stdout, fake_stderr))
- self.assertEqual(
- "NOT_FOUND - no queue '%s' in vhost '/'\n" % result_queue_name,
- fake_stderr,
- "Unexpected output from clear_queues:\n"
- "stdout: %r\n"
- "stderr: %r" % (fake_stdout, fake_stderr))
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2012-07-16 10:05:00 +0000
+++ lib/lp/services/job/tests/test_runner.py 2012-07-23 11:26:22 +0000
@@ -384,7 +384,7 @@
task_id = job.taskId()
uuid_expr = (
'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
- mo = re.search('^NullJob_%s_%s$' % (job.job_id, uuid_expr), task_id)
+ mo = re.search('^NullJob-%s-%s$' % (job.job_id, uuid_expr), task_id)
self.assertIsNot(None, mo)
Follow ups