[Merge] lp:~adeuring/lazr.jobrunner/slow-lane into lp:lazr.jobrunner
Abel Deuring has proposed merging lp:~adeuring/lazr.jobrunner/slow-lane into lp:lazr.jobrunner.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~adeuring/lazr.jobrunner/slow-lane/+merge/101576
This branch implements a "fast lane/slow lane" mechanism in lazr.jobrunner.
The core idea is simple: if the configuration of a celeryd instance
defines the parameter FALLBACK_QUEUE, a job that is aborted by a
SoftTimeLimitExceeded exception is put back into the queue specified
by FALLBACK_QUEUE.
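In practice this means running two celeryd instances against two
configurations, roughly like the following sketch (the queue name and
the time limits are illustrative; the real test configurations are in
the diff below):

    # fast lane: short soft time limit, fall back to the slow queue
    CELERYD_TASK_SOFT_TIME_LIMIT = 1
    FALLBACK_QUEUE = 'standard_slow'

    # slow lane: longer soft time limit, no further fallback
    CELERYD_TASK_SOFT_TIME_LIMIT = 5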
Working on this change, I noticed that celery tasks which time out
leave a message behind in the result queue if task.ignore_result == False.
I tried to consume this message via
    try:
        result.wait()
    except SoftTimeLimitExceeded:
        # Consume the leftover message from the timed-out task.
        result.wait()
but this did not work, so the timeout-related tests (test_timeout_long,
test_timeout_in_fast_lane_passes_in_slow_lane,
test_timeout_in_fast_lane_and_slow_lane) now use another Task, whose
ignore_result attribute is True. This requires a variant of the method
run_file_job() that does not call result.wait(). The new method
run_file_job_ignore_result() instead waits for some time before it
stops the celery instance.
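The no-result task is just a subclass that flips the flag (this mirrors
RunFileJobNoResult in the diff below):

    class RunFileJobNoResult(RunFileJob):
        # With ignore_result == True, a timed-out run leaves no
        # message behind in the result queue.
        ignore_result = True
        name = 'run_file_job_no_result'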
To keep the AMQP server "clean", I added a check of the queues of the
local rabbitmq server to the methods setUp() and tearDown() of TestCeleryD.
AMQP itself does not seem to offer any mechanism to list all current
queues on a server, so I used the rabbitmq webservice API instead.
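The check boils down to a query like this (a sketch of what
getQueueInfo() in the diff does, assuming the RabbitMQ management
plugin on its default port 55672 and the default guest account):

    import json
    import urllib2

    # Authenticate against the RabbitMQ management API and fetch the
    # list of queues on the local server.
    auth_handler = urllib2.HTTPBasicAuthHandler()
    auth_handler.add_password(
        realm='Management: Web UI', user='guest', passwd='guest',
        uri='http://localhost:55672/api/queues')
    opener = urllib2.build_opener(auth_handler)
    info = json.loads(opener.open('http://localhost:55672/api/queues').read())
    # Keep only each queue's name and message count.
    print dict((item['name'], item['messages']) for item in info)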
--
https://code.launchpad.net/~adeuring/lazr.jobrunner/slow-lane/+merge/101576
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/lazr.jobrunner/slow-lane into lp:lazr.jobrunner.
=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py 2012-03-22 18:02:23 +0000
+++ src/lazr/jobrunner/celerytask.py 2012-04-11 14:59:23 +0000
@@ -18,6 +18,7 @@
from celery.task import Task
+from functools import partial
from lazr.jobrunner.jobrunner import (
JobRunner,
LeaseHeld,
@@ -42,4 +43,19 @@
return
runner = self.getJobRunner()
with memory_limit(self.job_source.memory_limit):
- runner.runJobHandleError(job)
+ runner.runJobHandleError(job, self.fallbackToSlowerLane(job_id))
+
+ def fallbackToSlowerLane(self, job_id):
+ """Return a callable that is called by the job runner when
+ a request times out.
+
+ The callable should try to put the job into another queue. If
+ such a queue is not defined, return None.
+ """
+ fallback_queue = self.app.conf.get('FALLBACK_QUEUE')
+ if fallback_queue is None:
+ return None
+ return partial(self.reQueue, job_id, fallback_queue)
+
+ def reQueue(self, job_id, fallback_queue):
+ self.apply_async(args=(job_id, ), queue=fallback_queue)
=== modified file 'src/lazr/jobrunner/jobrunner.py'
--- src/lazr/jobrunner/jobrunner.py 2012-03-27 09:23:46 +0000
+++ src/lazr/jobrunner/jobrunner.py 2012-04-11 14:59:23 +0000
@@ -17,6 +17,7 @@
__metaclass__ = type
+from celery.exceptions import SoftTimeLimitExceeded
from contextlib import contextmanager
import logging
from resource import (
@@ -122,7 +123,7 @@
"""Send notifications about a user error."""
raise NotImplementedError
- def queue(self, manage_transaction=False):
+ def queue(self, manage_transaction=False, abort_transaction=False):
self.status = JobStatus.WAITING
def getOopsVars(self):
@@ -149,7 +150,7 @@
def job_str(job):
return '%r (ID %d)' % (job, job.job_id)
- def runJob(self, job):
+ def runJob(self, job, fallback=None):
"""Attempt to run a job, updating its status as appropriate."""
self.logger.info(
'Running %s in status %s' % (
@@ -171,6 +172,12 @@
self.logger.debug("Job suspended itself")
job.suspend(manage_transaction=True)
self.incomplete_jobs.append(job)
+ except SoftTimeLimitExceeded:
+ if fallback is not None:
+ job.queue(manage_transaction=True, abort_transaction=True)
+ fallback()
+ else:
+ raise
else:
job.complete(manage_transaction=True)
self.completed_jobs.append(job)
@@ -179,12 +186,12 @@
self.incomplete_jobs.append(job)
raise
- def runJobHandleError(self, job):
+ def runJobHandleError(self, job, fallback=None):
"""Run the specified job, handling errors."""
with self.oopsMessage(dict(job.getOopsVars())):
try:
try:
- self.runJob(job)
+ self.runJob(job, fallback)
except job.user_error_types, e:
self.logger.info(
'%s failed with user error %r.'
=== added file 'src/lazr/jobrunner/tests/config_two_queues.py'
--- src/lazr/jobrunner/tests/config_two_queues.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/config_two_queues.py 2012-04-11 14:59:23 +0000
@@ -0,0 +1,17 @@
+BROKER_VHOST = "/"
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
+CELERYD_CONCURRENCY = 1
+CELERY_QUEUES = {
+ "standard": {"binding_key": "job.standard"},
+ "standard_slow": {"binding_key": "job.standard.slow"},
+ }
+CELERY_DEFAULT_EXCHANGE = "standard"
+CELERY_DEFAULT_QUEUE = "standard"
+CELERY_CREATE_MISSING_QUEUES = False
+import os
+import oops
+CELERY_ANNOTATIONS = {
+ "run_file_job": {"file_job_dir": os.environ['FILE_JOB_DIR'],
+ 'oops_config': oops.Config()}
+ }
=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py 2012-03-26 14:17:00 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py 2012-04-11 14:59:23 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd. All rights reserved.
+ # Copyright 2012 Canonical Ltd. All rights reserved.
#
# This file is part of lazr.jobrunner
#
@@ -31,6 +31,8 @@
import tempfile
from time import sleep
from unittest import TestCase
+import urllib2
+
os.environ.setdefault('CELERY_CONFIG_MODULE', 'lazr.jobrunner.celeryconfig')
from celery.exceptions import SoftTimeLimitExceeded
@@ -61,8 +63,8 @@
proc.wait()
-def celeryd(config_module, file_job_dir):
- cmd_args = ('--config', config_module)
+def celeryd(config_module, file_job_dir, queue='celery'):
+ cmd_args = ('--config', config_module, '--queue', queue)
environ = dict(os.environ)
environ['FILE_JOB_DIR'] = file_job_dir
return running('bin/celeryd', cmd_args, environ, cwd=get_root())
@@ -102,6 +104,12 @@
def save(self):
self.job_source.set(self)
+ def queue(self, manage_transaction=False, abort_transaction=False):
+ self.job_source.set_output(
+ self, 'queue(manage_transaction=%s, abort_transaction=%s)\n'
+ % (manage_transaction, abort_transaction))
+ self.status = JobStatus.WAITING
+
def run(self):
super(FileJob, self).run()
if self.sleep is not None:
@@ -163,7 +171,7 @@
raise
def set_output(self, job, output):
- with self._job_output_file(job.job_id, 'w') as job_output_file:
+ with self._job_output_file(job.job_id, 'a') as job_output_file:
job_output_file.write(output)
@@ -178,6 +186,13 @@
return FileJobSource(self.file_job_dir)
+class RunFileJobNoResult(RunFileJob):
+
+ ignore_result = True
+
+ name = 'run_file_job_no_result'
+
+
class TestRunJob(TestCase):
@staticmethod
@@ -234,6 +249,48 @@
class TestCeleryD(TestCase):
+ def getQueueInfo(self):
+ auth_handler = urllib2.HTTPBasicAuthHandler()
+ auth_handler.add_password(
+ realm='Management: Web UI', user='guest', passwd='guest',
+ uri='http://localhost:55672/api/queues')
+ opener = urllib2.build_opener(auth_handler)
+ info = opener.open('http://localhost:55672/api/queues').read()
+ info = json.loads(info)
+ # info is a list of dictionaries with details about the queues.
+ # We are only interested in the name of the queues and the
+ # number of messages they hold.
+ info = [(item['name'], item['messages']) for item in info]
+ return dict(info)
+
+ def setUp(self):
+ super(TestCeleryD, self).setUp()
+ self.queue_status_during_setup = self.getQueueInfo()
+
+ def tearDown(self):
+ current_queue_status = self.getQueueInfo()
+ bad_queues = []
+ for name in current_queue_status:
+ old_value = self.queue_status_during_setup.get(name)
+ new_value = current_queue_status[name]
+ if old_value is not None:
+ if old_value != new_value:
+ bad_queues.append(
+ 'number of messages in queue %s changed from %i to %i'
+ % (name, old_value, new_value))
+ elif new_value != 0:
+ bad_queues.append(
+ 'new queue %s with %r messages' % (name, new_value))
+ else:
+ # We have the same number of messages in an existing
+ # queue. That is probably fine.
+ pass
+ if bad_queues:
+ error = (
+ 'Test left message queues in a different state:\n%s'
+ % '\n'.join(bad_queues))
+ self.fail(error)
+
def test_run_job(self):
with tempdir() as temp_dir:
js = FileJobSource(temp_dir)
@@ -249,22 +306,38 @@
self.assertEqual(JobStatus.COMPLETED, job.status)
def run_file_job(self, temp_dir, config='lazr.jobrunner.tests.config1',
- **kwargs):
+ queue='celery', **kwargs):
js = FileJobSource(temp_dir)
job = FileJob(js, 10, **kwargs)
job.save()
- result = RunFileJob.delay(10)
- with celeryd(config, temp_dir) as proc:
+ result = RunFileJob.apply_async(args=(10, ), queue=queue)
+ with celeryd(config, temp_dir, queue) as proc:
try:
result.wait(10)
except SoftTimeLimitExceeded:
pass
job = js.get(job.job_id)
- return job, proc
+ return job, js, proc
+
+ def run_file_job_ignore_result(self, temp_dir, wait_time,
+ config='lazr.jobrunner.tests.config1',
+ queue='celery', **kwargs):
+ # If a timeout occurs when Task.ignore_results == True,
+ # two messages are sent, a call of result.wait() will
+ # consume the first message; the second message will stay in
+ # the result message queue.
+ js = FileJobSource(temp_dir)
+ job = FileJob(js, 10, **kwargs)
+ job.save()
+ RunFileJobNoResult.apply_async(args=(10, ), queue=queue)
+ with celeryd(config, temp_dir, queue) as proc:
+ sleep(wait_time)
+ job = js.get(job.job_id)
+ return job, js, proc
def test_run_job_emits_oopses(self):
with tempdir() as temp_dir:
- job, proc = self.run_file_job(
+ job, js, proc = self.run_file_job(
temp_dir, exception='Catch me if you can!')
err = proc.stderr.read()
self.assertEqual(JobStatus.FAILED, job.status)
@@ -276,10 +349,56 @@
def test_timeout_long(self):
"""Raises exception when a job exceeds the configured time limit."""
with tempdir() as temp_dir:
- job, proc = self.run_file_job(
- temp_dir, config='lazr.jobrunner.tests.time_limit_config',
- sleep=10)
+ job, js, proc = self.run_file_job_ignore_result(
+ temp_dir, wait_time=2,
+ config='lazr.jobrunner.tests.time_limit_config',
+ sleep=3)
self.assertEqual(JobStatus.FAILED, job.status)
err = proc.stderr.read()
self.assertIn(
'OOPS while executing job 10: [] SoftTimeLimitExceeded', err)
+
+ def test_timeout_in_fast_lane_passes_in_slow_lane(self):
+ # If a fast and a slow lane are configured, jobs which time out
+ # in the fast lane are queued again in the slow lane.
+ with tempdir() as temp_dir:
+ with celeryd(
+ 'lazr.jobrunner.tests.time_limit_config_slow_lane',
+ temp_dir, queue='standard_slow'):
+ # The fast lane times out after one second; the job
+ # is then queued again in the slow lane, where it runs
+ # three seconds. Wait five seconds to check the result.
+ job, js, proc = self.run_file_job_ignore_result(
+ temp_dir, wait_time=5,
+ config='lazr.jobrunner.tests.time_limit_config_fast_lane',
+ queue='standard', sleep=3)
+ job = js.get(job.job_id)
+ job_output = js.get_output(job)
+ self.assertEqual(
+ 'queue(manage_transaction=True, abort_transaction=True)\n',
+ job_output)
+
+ self.assertEqual(JobStatus.COMPLETED, job.status)
+
+ def test_timeout_in_fast_lane_and_slow_lane(self):
+ # If a fast and a slow lane are configured, jobs which time out
+ # in the fast lane are queued again in the slow lane.
+ with tempdir() as temp_dir:
+ with celeryd(
+ 'lazr.jobrunner.tests.time_limit_config_slow_lane',
+ temp_dir, queue='standard_slow'):
+ # The fast lane times out after one second; the job
+ # is then queued again in the slow lane, where it times
+ # out again after five seconds. Wait seven seconds to
+ # check the result.
+ job, js, proc = self.run_file_job_ignore_result(
+ temp_dir, wait_time=7,
+ config='lazr.jobrunner.tests.time_limit_config_fast_lane',
+ queue='standard', sleep=7)
+ job = js.get(job.job_id)
+ job_output = js.get_output(job)
+ self.assertEqual(
+ 'queue(manage_transaction=True, abort_transaction=True)\n',
+ job_output)
+
+ self.assertEqual(JobStatus.FAILED, job.status)
=== added file 'src/lazr/jobrunner/tests/time_limit_config_fast_lane.py'
--- src/lazr/jobrunner/tests/time_limit_config_fast_lane.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/time_limit_config_fast_lane.py 2012-04-11 14:59:23 +0000
@@ -0,0 +1,3 @@
+from config_two_queues import *
+CELERYD_TASK_SOFT_TIME_LIMIT = 1
+FALLBACK_QUEUE = 'standard_slow'
=== added file 'src/lazr/jobrunner/tests/time_limit_config_slow_lane.py'
--- src/lazr/jobrunner/tests/time_limit_config_slow_lane.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/time_limit_config_slow_lane.py 2012-04-11 14:59:23 +0000
@@ -0,0 +1,2 @@
+from config_two_queues import *
+CELERYD_TASK_SOFT_TIME_LIMIT = 5