launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/lazr.jobrunner/run-via-celery into lp:lazr.jobrunner

 

Aaron Bentley has proposed merging lp:~abentley/lazr.jobrunner/run-via-celery into lp:lazr.jobrunner.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/lazr.jobrunner/run-via-celery/+merge/97963

Support running jobs via Celery

- Implement memory_limit context manager, so that Jobs whose source specifies a memory limit are killed if they exceed it.
- Provide LeaseHeld exception so that lease handling can be tested.
- Implement RunJob Celery Task that can be subclassed for particular job sources (a usage sketch follows this list).
- Remove run_job and set_job_source, because job sources will be specified by the RunJob subclass.
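
A rough sketch of how a particular job source plugs in by subclassing RunJob (ExampleJob, ExampleJobSource and RunExampleJob below are hypothetical stand-ins, not part of this branch):

    from lazr.jobrunner.jobrunner import BaseJob, LeaseHeld, RunJob


    class ExampleJob(BaseJob):

        def acquireLease(self):
            # A real job source would take a lease in shared storage and
            # raise LeaseHeld if another worker already holds it.
            if getattr(self, 'lease_held', False):
                raise LeaseHeld
            self.lease_held = True

        def run(self):
            pass


    class ExampleJobSource:

        # Jobs from this source get their address space capped at this
        # many bytes while running; None means no limit.
        memory_limit = None

        def get(self, job_id):
            return ExampleJob(job_id)


    class RunExampleJob(RunJob):

        # Name under which Celery registers this task.
        name = 'run_example_job'

        job_source = ExampleJobSource()

Enqueueing a job from this source is then just RunExampleJob.delay(job_id).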

Testing
- Implement FileJob and FileJobSource for use in integration tests
- Add no-op save method to FakeJob to simplify FileJob implementation
- Add config1.py module for integration tests
- Implement tempdir context manager for integration tests
- Implement celeryd context manager for integration tests (a usage sketch follows this list)
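
A rough sketch of how the integration-test helpers fit together (assumes a local AMQP broker and the bin/celeryd script produced by buildout; the job id and output are arbitrary):

    from lazr.jobrunner.jobrunner import JobStatus
    from lazr.jobrunner.tests.test_jobrunner import (
        celeryd,
        FileJob,
        FileJobSource,
        RunFileJob,
        tempdir,
        )

    with tempdir() as temp_dir:
        # Write a waiting job to disk where the worker process can find it.
        js = FileJobSource(temp_dir)
        FileJob(js, 10, output='my_output').save()
        result = RunFileJob.delay(10)
        # Run a celeryd worker against the test config until the task
        # finishes, then read the job back from disk.
        with celeryd('lazr.jobrunner.tests.config1', temp_dir):
            result.wait(10)
        job = js.get(10)
        assert job.status == JobStatus.COMPLETED
        assert js.get_output(job) == 'my_output'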
-- 
https://code.launchpad.net/~abentley/lazr.jobrunner/run-via-celery/+merge/97963
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/lazr.jobrunner/run-via-celery into lp:lazr.jobrunner.
=== modified file 'Makefile'
--- Makefile	2012-02-28 20:16:39 +0000
+++ Makefile	2012-03-16 19:06:20 +0000
@@ -1,5 +1,5 @@
 check: bin/python
-	bin/python setup.py test
+	bin/nosetests
 
 develop: bin/python
 	bin/buildout

=== modified file 'src/lazr/jobrunner/jobrunner.py'
--- src/lazr/jobrunner/jobrunner.py	2012-03-15 15:14:59 +0000
+++ src/lazr/jobrunner/jobrunner.py	2012-03-16 19:06:20 +0000
@@ -16,31 +16,82 @@
 
 __metaclass__ = type
 
-import contextlib
+
+from contextlib import contextmanager
 import logging
+from resource import (
+    getrlimit,
+    RLIMIT_AS,
+    setrlimit,
+    )
 import sys
 
+from celery.task import Task
+
 
 class SuspendJobException(Exception):
     """Raised when a running job wants to suspend itself."""
     pass
 
 
+class LeaseHeld(Exception):
+    """Raised when a lease cannot be acquired."""
+
+
 class JobStatus:
+
+    class COMPLETED:
+        title = 'completed'
+        value = 1
+
     class WAITING:
-        title = "waiting"
+        title = 'waiting'
+        value = 2
 
     class RUNNING:
         title = "running"
-
-    class COMPLETED:
-        title = "completed"
+        value = 3
 
     class FAILED:
         title = "failed"
+        value = 4
 
     class SUSPENDED:
         title = "suspended"
+        value = 5
+
+    by_value = dict(
+        (cls.value, cls) for cls in [COMPLETED, WAITING, RUNNING,
+        FAILED, SUSPENDED])
+
+
+class RunJob(Task):
+
+    abstract = True
+
+    oops_config = None
+
+    def run(self, job_id):
+        job = self.job_source.get(job_id)
+        try:
+            job.acquireLease()
+        except LeaseHeld:
+            return
+        runner = JobRunner(oops_config=self.oops_config)
+        with memory_limit(self.job_source.memory_limit):
+            runner.runJobHandleError(job)
+
+
+@contextmanager
+def memory_limit(limit):
+    if limit is not None:
+        orig_soft_limit, hard_limit = getrlimit(RLIMIT_AS)
+        setrlimit(RLIMIT_AS, (limit, hard_limit))
+    try:
+        yield
+    finally:
+        if limit is not None:
+            setrlimit(RLIMIT_AS, (orig_soft_limit, hard_limit))
 
 
 class NoJobSource(Exception):
@@ -94,21 +145,6 @@
         return ()
 
 
-_job_source = None
-
-
-def set_job_source(job_source):
-    global _job_source
-    _job_source = job_source
-
-
-def run_job(job_id):
-    if _job_source is None:
-        raise NoJobSource
-    job = _job_source.get(job_id)
-    job.run()
-
-
 class JobRunner:
 
     def __init__(self, logger=None, oops_config=None, oopsMessage=None):
@@ -184,7 +220,7 @@
                 return report
 
     @staticmethod
-    @contextlib.contextmanager
+    @contextmanager
     def oopsMessage(message):
         """Add an oops message to be included in oopses from this context."""
         yield

=== added file 'src/lazr/jobrunner/tests/config1.py'
--- src/lazr/jobrunner/tests/config1.py	1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/config1.py	2012-03-16 19:06:20 +0000
@@ -0,0 +1,10 @@
+BROKER_VHOST = "/"
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_jobrunner", )
+CELERYD_CONCURRENCY = 1
+import os
+import oops
+CELERY_ANNOTATIONS = {
+    "run_file_job": {"file_job_dir": os.environ['FILE_JOB_DIR'],
+                     'oops_config': oops.Config()}
+    }

=== modified file 'src/lazr/jobrunner/tests/test_jobrunner.py'
--- src/lazr/jobrunner/tests/test_jobrunner.py	2012-03-15 15:14:59 +0000
+++ src/lazr/jobrunner/tests/test_jobrunner.py	2012-03-16 19:06:20 +0000
@@ -14,28 +14,40 @@
 # You should have received a copy of the GNU Lesser General Public License
 # along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>.
 
-_metaclass__ = type
+__metaclass__ = type
 
 import contextlib
+import errno
+import json
 import logging
+import os.path
+from resource import (
+    getrlimit,
+    RLIMIT_AS,
+    )
+import subprocess
+import shutil
+import tempfile
+from time import sleep
+from unittest import TestCase
+
+from celery.exceptions import SoftTimeLimitExceeded
 import oops
-from unittest import TestCase
 from zope.testing.loghandler import Handler
 
 from lazr.jobrunner.jobrunner import (
     BaseJob,
     JobRunner,
     JobStatus,
-    NoJobSource,
-    run_job,
-    set_job_source,
+    LeaseHeld,
+    RunJob,
     SuspendJobException,
     )
 
 
 class FakeJob(BaseJob):
 
-    retry_error_types = []
+    retry_error_types = ()
 
     def __init__(self, job_id, failure=None):
         super(FakeJob, self).__init__(job_id)
@@ -44,12 +56,33 @@
         self.notifyOops_called = False
         self.notifyUserError_called = False
         self.queue_call_count = 0
+        self.lease_held = False
+
+    def save(self):
+        pass
+
+    def acquireLease(self):
+        if self.lease_held:
+            raise LeaseHeld
+        self.lease_held = True
 
     def run(self):
         self.unrun = False
         if self.failure is not None:
             raise self.failure
 
+    def start(self, manage_transaction=False):
+        super(FakeJob, self).start(manage_transaction)
+        self.save()
+
+    def complete(self, manage_transaction=False):
+        super(FakeJob, self).complete(manage_transaction)
+        self.save()
+
+    def fail(self, manage_transaction=False):
+        super(FakeJob, self).fail(manage_transaction)
+        self.save()
+
     def notifyOops(self, oops_report):
         self.notifyOops_called = True
 
@@ -66,6 +99,8 @@
 
 class FakeJobSource:
 
+    memory_limit = None
+
     def __init__(self):
         self.jobs = {}
 
@@ -73,23 +108,230 @@
         return self.jobs[job_id]
 
 
-class TestRunJobs(TestCase):
-
-    def tearDown(self):
-        set_job_source(None)
-        super(TestRunJobs, self).tearDown()
-
-    def test_run_no_job_source(self):
-        self.assertRaises(NoJobSource, run_job, 10)
+class FileJob(FakeJob):
+
+    def __init__(self, job_source, job_id, output=None,
+                 status=JobStatus.WAITING, exception=None, sleep=None):
+        super(FileJob, self).__init__(job_id)
+        self.job_source = job_source
+        self.output = output
+        self.status = status
+        self.exception = exception
+        self.sleep = sleep
+
+    def save(self):
+        self.job_source.set(self)
+
+    def run(self):
+        super(FileJob, self).run()
+        if self.sleep is not None:
+            sleep(self.sleep)
+        if self.exception is not None:
+            raise Exception(self.exception)
+        if self.output is not None:
+            self.job_source.set_output(self, self.output)
+
+
+class FileJobSource:
+
+    memory_limit = None
+
+    def __init__(self, root):
+        self.root = root
+        self.job_root = os.path.join(self.root, 'job')
+        self.output_root = os.path.join(self.root, 'output')
+        def ensure_dir(path):
+            try:
+                os.mkdir(path)
+            except OSError, e:
+                if e.errno != errno.EEXIST:
+                    raise
+        ensure_dir(self.job_root)
+        ensure_dir(self.output_root)
+
+    def _job_file(self, job_id, mode):
+        return open(os.path.join(self.job_root, str(job_id)), mode)
+
+    def _job_output_file(self, job_id, mode):
+        return open(os.path.join(self.output_root, str(job_id)), mode)
+
+    def get(self, job_id):
+        with self._job_file(job_id, 'r') as job_file:
+            job_data = json.load(job_file)
+            job_data['status'] = JobStatus.by_value[job_data['status']]
+            return FileJob(self, **job_data)
+
+    def set(self, job):
+        with self._job_file(job.job_id, 'w') as job_file:
+            job_info = {
+                'job_id': job.job_id,
+                'output': job.output,
+                'status': job.status.value,
+                'exception': job.exception,
+                'sleep': job.sleep,
+            }
+            json.dump(job_info, job_file)
+
+    def get_output(self, job):
+        try:
+            with self._job_output_file(job.job_id, 'r') as job_output_file:
+                    return job_output_file.read()
+        except IOError, e:
+            if e.errno == errno.ENOENT:
+                return None
+            raise
+
+    def set_output(self, job, output):
+        with self._job_output_file(job.job_id, 'w') as job_output_file:
+            job_output_file.write(output)
+
+
+class TestRunJob(TestCase):
+
+    @staticmethod
+    def makeFakeJobSource(job=None):
+        js = FakeJobSource()
+        if job is None:
+            job = FakeJob(10)
+        js.jobs[job.job_id] = job
+        return js
+
+    @staticmethod
+    def runJob(js):
+        task = RunJob()
+        task.job_source = js
+        task.run(10)
 
     def test_run(self):
+        js = self.makeFakeJobSource()
+        self.assertTrue(js.jobs[10].unrun)
+        self.runJob(js)
+        self.assertFalse(js.jobs[10].unrun)
+
+    def test_memory_limit(self):
+
+        class MemoryCheckJob(FakeJob):
+
+            def run(self):
+                super(MemoryCheckJob, self).run()
+                self.current_memory_limit = getrlimit(RLIMIT_AS)[0]
+
+        start_limits = getrlimit(RLIMIT_AS)
         js = FakeJobSource()
-        set_job_source(js)
-        job = FakeJob(10)
+        job = MemoryCheckJob(10)
         js.jobs[10] = job
-        self.assertTrue(job.unrun)
-        run_job(10)
-        self.assertFalse(job.unrun)
+        js.memory_limit = 1024 ** 3
+        task = RunJob()
+        task.job_source = js
+        task.run(10)
+        self.assertEqual(1024 ** 3, job.current_memory_limit)
+        self.assertEqual(start_limits, getrlimit(RLIMIT_AS))
+
+    def test_acquires_lease(self):
+        js = self.makeFakeJobSource()
+        self.assertFalse(js.jobs[10].lease_held)
+        self.runJob(js)
+        self.assertTrue(js.jobs[10].lease_held)
+
+    def test_skips_failed_acquisition(self):
+        js = self.makeFakeJobSource()
+        js.jobs[10].acquireLease()
+        self.runJob(js)
+        self.assertTrue(js.jobs[10].unrun)
+
+
+def get_root():
+    import lazr.jobrunner
+    root = os.path.join(os.path.dirname(lazr.jobrunner.__file__), '../../../')
+    return os.path.normpath(root)
+
+
+@contextlib.contextmanager
+def tempdir():
+    dirname = tempfile.mkdtemp()
+    try:
+        yield dirname
+    finally:
+        shutil.rmtree(dirname)
+
+
+@contextlib.contextmanager
+def celeryd(config_module, file_job_dir):
+    cmdname = os.path.join(get_root(), 'bin/celeryd')
+    environ = dict(os.environ)
+    environ['FILE_JOB_DIR'] = file_job_dir
+    proc = subprocess.Popen([cmdname, '--config', config_module], env=environ,
+                            stderr=subprocess.PIPE)
+    try:
+        yield proc
+    finally:
+        proc.terminate()
+        proc.wait()
+
+
+class RunFileJob(RunJob):
+
+    name = 'run_file_job'
+
+    def __init__(self):
+        file_job_dir = None
+
+    @property
+    def job_source(self):
+        return FileJobSource(self.file_job_dir)
+
+
+class TestCeleryD(TestCase):
+
+    def test_run_job(self):
+        with tempdir() as temp_dir:
+            js = FileJobSource(temp_dir)
+            job = FileJob(js, 10, 'my_output')
+            job.save()
+            result = RunFileJob.delay(10)
+            self.assertIs(None, js.get_output(job))
+            self.assertEqual(JobStatus.WAITING, job.status)
+            with celeryd('lazr.jobrunner.tests.config1', temp_dir):
+                result.wait(10)
+            job = js.get(job.job_id)
+            self.assertEqual('my_output', js.get_output(job))
+            self.assertEqual(JobStatus.COMPLETED, job.status)
+
+    def run_file_job(self, temp_dir, config='lazr.jobrunner.tests.config1',
+                     **kwargs):
+        js = FileJobSource(temp_dir)
+        job = FileJob(js, 10, **kwargs)
+        job.save()
+        result = RunFileJob.delay(10)
+        with celeryd(config, temp_dir) as proc:
+            try:
+                result.wait(10)
+            except SoftTimeLimitExceeded:
+                pass
+        job = js.get(job.job_id)
+        return job, proc
+
+    def test_run_job_emits_oopses(self):
+        with tempdir() as temp_dir:
+            job, proc = self.run_file_job(
+                temp_dir, exception='Catch me if you can!')
+            err = proc.stderr.read()
+            self.assertEqual(JobStatus.FAILED, job.status)
+            self.assertIs(None, job.job_source.get_output(job))
+            self.assertIn(
+                "OOPS while executing job 10: [] Exception(u'Catch me if you"
+                " can!',)", err)
+
+    def test_timeout_long(self):
+        """Raises exception when a job exceeds the configured time limit."""
+        with tempdir() as temp_dir:
+            job, proc = self.run_file_job(
+                temp_dir, config='lazr.jobrunner.tests.time_limit_config',
+                sleep=10)
+        self.assertEqual(JobStatus.FAILED, job.status)
+        err = proc.stderr.read()
+        self.assertIn(
+            'OOPS while executing job 10: [] SoftTimeLimitExceeded', err)
 
 
 class OOPSTestRepository:

=== added file 'src/lazr/jobrunner/tests/time_limit_config.py'
--- src/lazr/jobrunner/tests/time_limit_config.py	1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/time_limit_config.py	2012-03-16 19:06:20 +0000
@@ -0,0 +1,11 @@
+BROKER_VHOST = "/"
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_jobrunner", )
+CELERYD_CONCURRENCY = 1
+CELERYD_TASK_SOFT_TIME_LIMIT = 1
+import os
+import oops
+CELERY_ANNOTATIONS = {
+    "run_file_job": {"file_job_dir": os.environ['FILE_JOB_DIR'],
+                     'oops_config': oops.Config()}
+    }