← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/lazr.jobrunner/slow-lane into lp:lazr.jobrunner

 

Abel Deuring has proposed merging lp:~adeuring/lazr.jobrunner/slow-lane into lp:lazr.jobrunner.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~adeuring/lazr.jobrunner/slow-lane/+merge/101576

This branch implements a "fast lane/slow lane" mechanism in lazr.jobrunner.

The core idea is simple: if the configuration of a celeryd instance
defines the parameter FALLBACK_QUEUE, a job that is aborted by a
SoftTimeLimitExceeded exception is put back into the queue specified
by FALLBACK_QUEUE.

While working on this change, I noticed that celery tasks which time
out leave a message in the result queue if task.ignore_result == False.
I tried to consume this message via

    try:
        result.wait()
    except SoftTimeLimitExceeded:
        result.wait()

but this did not work, so the timeout-related tests (test_timeout_long,
test_timeout_in_fast_lane_passes_in_slow_lane,
test_timeout_in_fast_lane_and_slow_lane) now use another Task, where
ignore_result is True. This requires a variant of the method
run_file_job() which does not call result.wait(). The new
method run_file_job_ignore_result() instead waits for a fixed time
before it stops the celery instance.

To keep the AMQP server "clean", I added a check of the queues of the
local rabbitmq server in the methods setUp() and tearDown() of TestCeleryD.
AMQP does not seem to provide any mechanism to get a list of all
current queues from a server, so I used the RabbitMQ management HTTP API
instead.
-- 
https://code.launchpad.net/~adeuring/lazr.jobrunner/slow-lane/+merge/101576
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/lazr.jobrunner/slow-lane into lp:lazr.jobrunner.
=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py	2012-03-22 18:02:23 +0000
+++ src/lazr/jobrunner/celerytask.py	2012-04-11 14:59:23 +0000
@@ -18,6 +18,7 @@
 
 
 from celery.task import Task
+from functools import partial
 from lazr.jobrunner.jobrunner import (
     JobRunner,
     LeaseHeld,
@@ -42,4 +43,19 @@
             return
         runner = self.getJobRunner()
         with memory_limit(self.job_source.memory_limit):
-            runner.runJobHandleError(job)
+            runner.runJobHandleError(job, self.fallbackToSlowerLane(job_id))
+
+    def fallbackToSlowerLane(self, job_id):
+        """Return a callable that is called by the job runner when
+        a request times out.
+
+        The callable should try to put the job into another queue. If
+        such a queue is not defined, return None.
+        """
+        fallback_queue = self.app.conf.get('FALLBACK_QUEUE')
+        if fallback_queue is None:
+            return None
+        return partial(self.reQueue, job_id, fallback_queue)
+
+    def reQueue(self, job_id, fallback_queue):
+        self.apply_async(args=(job_id, ), queue=fallback_queue)

=== modified file 'src/lazr/jobrunner/jobrunner.py'
--- src/lazr/jobrunner/jobrunner.py	2012-03-27 09:23:46 +0000
+++ src/lazr/jobrunner/jobrunner.py	2012-04-11 14:59:23 +0000
@@ -17,6 +17,7 @@
 __metaclass__ = type
 
 
+from celery.exceptions import SoftTimeLimitExceeded
 from contextlib import contextmanager
 import logging
 from resource import (
@@ -122,7 +123,7 @@
         """Send notifications about a user error."""
         raise NotImplementedError
 
-    def queue(self, manage_transaction=False):
+    def queue(self, manage_transaction=False, abort_transaction=False):
         self.status = JobStatus.WAITING
 
     def getOopsVars(self):
@@ -149,7 +150,7 @@
     def job_str(job):
         return '%r (ID %d)' % (job, job.job_id)
 
-    def runJob(self, job):
+    def runJob(self, job, fallback=None):
         """Attempt to run a job, updating its status as appropriate."""
         self.logger.info(
             'Running %s in status %s' % (
@@ -171,6 +172,12 @@
                 self.logger.debug("Job suspended itself")
                 job.suspend(manage_transaction=True)
                 self.incomplete_jobs.append(job)
+            except SoftTimeLimitExceeded:
+                if fallback is not None:
+                    job.queue(manage_transaction=True, abort_transaction=True)
+                    fallback()
+                else:
+                    raise
             else:
                 job.complete(manage_transaction=True)
                 self.completed_jobs.append(job)
@@ -179,12 +186,12 @@
             self.incomplete_jobs.append(job)
             raise
 
-    def runJobHandleError(self, job):
+    def runJobHandleError(self, job, fallback=None):
         """Run the specified job, handling errors."""
         with self.oopsMessage(dict(job.getOopsVars())):
             try:
                 try:
-                    self.runJob(job)
+                    self.runJob(job, fallback)
                 except job.user_error_types, e:
                     self.logger.info(
                         '%s failed with user error %r.'

=== added file 'src/lazr/jobrunner/tests/config_two_queues.py'
--- src/lazr/jobrunner/tests/config_two_queues.py	1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/config_two_queues.py	2012-04-11 14:59:23 +0000
@@ -0,0 +1,17 @@
+BROKER_VHOST = "/"
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
+CELERYD_CONCURRENCY = 1
+CELERY_QUEUES = {
+    "standard": {"binding_key": "job.standard"},
+    "standard_slow": {"binding_key": "job.standard.slow"},
+    }
+CELERY_DEFAULT_EXCHANGE = "standard"
+CELERY_DEFAULT_QUEUE = "standard"
+CELERY_CREATE_MISSING_QUEUES = False
+import os
+import oops
+CELERY_ANNOTATIONS = {
+    "run_file_job": {"file_job_dir": os.environ['FILE_JOB_DIR'],
+                     'oops_config': oops.Config()}
+    }

=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py	2012-03-26 14:17:00 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py	2012-04-11 14:59:23 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  All rights reserved.
+ # Copyright 2012 Canonical Ltd.  All rights reserved.
 #
 # This file is part of lazr.jobrunner
 #
@@ -31,6 +31,8 @@
 import tempfile
 from time import sleep
 from unittest import TestCase
+import urllib2
+
 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lazr.jobrunner.celeryconfig')
 
 from celery.exceptions import SoftTimeLimitExceeded
@@ -61,8 +63,8 @@
         proc.wait()
 
 
-def celeryd(config_module, file_job_dir):
-    cmd_args = ('--config', config_module)
+def celeryd(config_module, file_job_dir, queue='celery'):
+    cmd_args = ('--config', config_module, '--queue', queue)
     environ = dict(os.environ)
     environ['FILE_JOB_DIR'] = file_job_dir
     return running('bin/celeryd', cmd_args, environ, cwd=get_root())
@@ -102,6 +104,12 @@
     def save(self):
         self.job_source.set(self)
 
+    def queue(self, manage_transaction=False, abort_transaction=False):
+        self.job_source.set_output(
+            self, 'queue(manage_transaction=%s, abort_transaction=%s)\n'
+            % (manage_transaction, abort_transaction))
+        self.status = JobStatus.WAITING
+
     def run(self):
         super(FileJob, self).run()
         if self.sleep is not None:
@@ -163,7 +171,7 @@
             raise
 
     def set_output(self, job, output):
-        with self._job_output_file(job.job_id, 'w') as job_output_file:
+        with self._job_output_file(job.job_id, 'a') as job_output_file:
             job_output_file.write(output)
 
 
@@ -178,6 +186,13 @@
         return FileJobSource(self.file_job_dir)
 
 
+class RunFileJobNoResult(RunFileJob):
+
+    ignore_result = True
+
+    name = 'run_file_job_no_result'
+
+
 class TestRunJob(TestCase):
 
     @staticmethod
@@ -234,6 +249,48 @@
 
 class TestCeleryD(TestCase):
 
+    def getQueueInfo(self):
+        auth_handler = urllib2.HTTPBasicAuthHandler()
+        auth_handler.add_password(
+            realm='Management: Web UI', user='guest', passwd='guest',
+            uri='http://localhost:55672/api/queues')
+        opener = urllib2.build_opener(auth_handler)
+        info = opener.open('http://localhost:55672/api/queues').read()
+        info = json.loads(info)
+        # info is a list of dictionaries with details about the queues.
+        # We are only interested in the name of the queues and the
+        # number of messages they hold.
+        info = [(item['name'], item['messages']) for item in info]
+        return dict(info)
+
+    def setUp(self):
+        super(TestCeleryD, self).setUp()
+        self.queue_status_during_setup = self.getQueueInfo()
+
+    def tearDown(self):
+        current_queue_status = self.getQueueInfo()
+        bad_queues = []
+        for name in current_queue_status:
+            old_value = self.queue_status_during_setup.get(name)
+            new_value = current_queue_status[name]
+            if old_value is not None:
+                if old_value != new_value:
+                    bad_queues.append(
+                        'number of messages in queue %s changed from %i to %i'
+                        % (name, old_value, new_value))
+            elif new_value != 0:
+                bad_queues.append(
+                    'new queue %s with %r messages' % (name, new_value))
+            else:
+                # A queue that did not exist during setUp() but holds
+                # no messages. That is probably fine.
+                pass
+        if bad_queues:
+            error = (
+                'Test left message queues in a different state:\n%s'
+                % '\n'.join(bad_queues))
+            self.fail(error)
+
     def test_run_job(self):
         with tempdir() as temp_dir:
             js = FileJobSource(temp_dir)
@@ -249,22 +306,38 @@
             self.assertEqual(JobStatus.COMPLETED, job.status)
 
     def run_file_job(self, temp_dir, config='lazr.jobrunner.tests.config1',
-                     **kwargs):
+                     queue='celery', **kwargs):
         js = FileJobSource(temp_dir)
         job = FileJob(js, 10, **kwargs)
         job.save()
-        result = RunFileJob.delay(10)
-        with celeryd(config, temp_dir) as proc:
+        result = RunFileJob.apply_async(args=(10, ), queue=queue)
+        with celeryd(config, temp_dir, queue) as proc:
             try:
                 result.wait(10)
             except SoftTimeLimitExceeded:
                 pass
         job = js.get(job.job_id)
-        return job, proc
+        return job, js, proc
+
+    def run_file_job_ignore_result(self, temp_dir, wait_time,
+                                   config='lazr.jobrunner.tests.config1',
+                                   queue='celery', **kwargs):
+        # If a timeout occurs when Task.ignore_result == False, two
+        # result messages are sent; result.wait() consumes only the
+        # first, so the second stays in the result message queue. Hence
+        # use RunFileJobNoResult and just sleep for wait_time instead.
+        js = FileJobSource(temp_dir)
+        job = FileJob(js, 10, **kwargs)
+        job.save()
+        RunFileJobNoResult.apply_async(args=(10, ), queue=queue)
+        with celeryd(config, temp_dir, queue) as proc:
+            sleep(wait_time)
+        job = js.get(job.job_id)
+        return job, js, proc
 
     def test_run_job_emits_oopses(self):
         with tempdir() as temp_dir:
-            job, proc = self.run_file_job(
+            job, js, proc = self.run_file_job(
                 temp_dir, exception='Catch me if you can!')
             err = proc.stderr.read()
             self.assertEqual(JobStatus.FAILED, job.status)
@@ -276,10 +349,56 @@
     def test_timeout_long(self):
         """Raises exception when a job exceeds the configured time limit."""
         with tempdir() as temp_dir:
-            job, proc = self.run_file_job(
-                temp_dir, config='lazr.jobrunner.tests.time_limit_config',
-                sleep=10)
+            job, js, proc = self.run_file_job_ignore_result(
+                temp_dir, wait_time=2,
+                config='lazr.jobrunner.tests.time_limit_config',
+                sleep=3)
         self.assertEqual(JobStatus.FAILED, job.status)
         err = proc.stderr.read()
         self.assertIn(
             'OOPS while executing job 10: [] SoftTimeLimitExceeded', err)
+
+    def test_timeout_in_fast_lane_passes_in_slow_lane(self):
+        # If a fast and a slow lane are configured, jobs which time out
+        # in the fast lane are queued again in the slow lane.
+        with tempdir() as temp_dir:
+            with celeryd(
+                'lazr.jobrunner.tests.time_limit_config_slow_lane',
+                temp_dir, queue='standard_slow'):
+                # The fast lane times out after one second; the job
+                # is then queued again in the slow lane, where it runs
+                # three seconds. Wait five seconds to check the result.
+                job, js, proc = self.run_file_job_ignore_result(
+                    temp_dir, wait_time=5,
+                    config='lazr.jobrunner.tests.time_limit_config_fast_lane',
+                    queue='standard', sleep=3)
+            job = js.get(job.job_id)
+            job_output = js.get_output(job)
+            self.assertEqual(
+                'queue(manage_transaction=True, abort_transaction=True)\n',
+                job_output)
+
+        self.assertEqual(JobStatus.COMPLETED, job.status)
+
+    def test_timeout_in_fast_lane_and_slow_lane(self):
+        # If a job times out in the fast lane and again in the slow
+        # lane, it is requeued only once and ends up FAILED.
+        with tempdir() as temp_dir:
+            with celeryd(
+                'lazr.jobrunner.tests.time_limit_config_slow_lane',
+                temp_dir, queue='standard_slow'):
+                # The fast lane times out after one second; the job
+                # is then queued again in the slow lane, where it times
+                # out again after five seconds. Wait seven seconds to
+                # check the result.
+                job, js, proc = self.run_file_job_ignore_result(
+                    temp_dir, wait_time=7,
+                    config='lazr.jobrunner.tests.time_limit_config_fast_lane',
+                    queue='standard', sleep=7)
+            job = js.get(job.job_id)
+            job_output = js.get_output(job)
+            self.assertEqual(
+                'queue(manage_transaction=True, abort_transaction=True)\n',
+                job_output)
+
+        self.assertEqual(JobStatus.FAILED, job.status)

=== added file 'src/lazr/jobrunner/tests/time_limit_config_fast_lane.py'
--- src/lazr/jobrunner/tests/time_limit_config_fast_lane.py	1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/time_limit_config_fast_lane.py	2012-04-11 14:59:23 +0000
@@ -0,0 +1,3 @@
+from config_two_queues import *
+CELERYD_TASK_SOFT_TIME_LIMIT = 1
+FALLBACK_QUEUE = 'standard_slow'

=== added file 'src/lazr/jobrunner/tests/time_limit_config_slow_lane.py'
--- src/lazr/jobrunner/tests/time_limit_config_slow_lane.py	1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/time_limit_config_slow_lane.py	2012-04-11 14:59:23 +0000
@@ -0,0 +1,2 @@
+from config_two_queues import *
+CELERYD_TASK_SOFT_TIME_LIMIT = 5