[Merge] lp:~abentley/lazr.jobrunner/run-via-celery into lp:lazr.jobrunner
Aaron Bentley has proposed merging lp:~abentley/lazr.jobrunner/run-via-celery into lp:lazr.jobrunner.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~abentley/lazr.jobrunner/run-via-celery/+merge/97963
Support running jobs via Celery
- Implement a memory_limit context manager, so that jobs whose source specifies a memory limit are killed if they exceed it.
- Provide a LeaseHeld exception so that lease handling can be tested.
- Implement a RunJob Celery Task that can be subclassed for particular job sources (see the sketch after this list).
- Remove run_job and set_job_source, because job sources will be specified by the RunJob subclass.
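
A minimal sketch of how a job source is expected to plug into this (MyJob, MyJobSource and RunMyJob are hypothetical names used only for illustration; only BaseJob, LeaseHeld, RunJob and the memory_limit handling come from this branch):

from lazr.jobrunner.jobrunner import BaseJob, LeaseHeld, RunJob


class MyJob(BaseJob):
    """Hypothetical job type; BaseJob provides the shared job plumbing."""

    def __init__(self, job_id):
        super(MyJob, self).__init__(job_id)
        self.lease_held = False

    def acquireLease(self):
        # RunJob.run() treats LeaseHeld as "another worker already has
        # this job" and returns without running it.
        if self.lease_held:
            raise LeaseHeld
        self.lease_held = True

    def run(self):
        pass  # the real work goes here


class MyJobSource:
    """Hypothetical job source; RunJob only needs get() and memory_limit."""

    # In bytes; RunJob runs the job inside the new memory_limit()
    # context manager, so a job that allocates past this limit fails.
    # None means no limit.
    memory_limit = 512 * 1024 * 1024

    def __init__(self):
        self.jobs = {}

    def get(self, job_id):
        return self.jobs[job_id]


class RunMyJob(RunJob):
    """One RunJob subclass per job source, replacing set_job_source()."""

    name = 'run_my_job'
    job_source = MyJobSource()

A producer then only needs RunMyJob.delay(job.job_id), plus a Celery worker configured to import the module defining the task.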
Testing
- Implement FileJob and FileJobSource for use in integration tests.
- Add a no-op save method to FakeJob to simplify the FileJob implementation.
- Add a config1.py module for integration tests.
- Implement a tempdir context manager for integration tests.
- Implement a celeryd context manager for integration tests (the sketch after this list shows how these helpers fit together).
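
Roughly, the integration tests exercise the full stack as follows (a sketch using the helpers added in test_jobrunner.py below; it assumes a local AMQP broker and the buildout's bin/celeryd):

from lazr.jobrunner.tests.test_jobrunner import (
    celeryd,
    FileJob,
    FileJobSource,
    RunFileJob,
    tempdir,
    )

with tempdir() as temp_dir:
    # Jobs round-trip through JSON files under temp_dir.
    js = FileJobSource(temp_dir)
    FileJob(js, 10, output='my_output').save()
    result = RunFileJob.delay(10)
    # config1.py reads FILE_JOB_DIR from the environment; the celeryd
    # context manager exports it, starts the worker, and terminates it
    # on exit.
    with celeryd('lazr.jobrunner.tests.config1', temp_dir):
        result.wait(10)
    assert js.get_output(js.get(10)) == 'my_output'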
--
https://code.launchpad.net/~abentley/lazr.jobrunner/run-via-celery/+merge/97963
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/lazr.jobrunner/run-via-celery into lp:lazr.jobrunner.
=== modified file 'Makefile'
--- Makefile 2012-02-28 20:16:39 +0000
+++ Makefile 2012-03-16 19:06:20 +0000
@@ -1,5 +1,5 @@
check: bin/python
- bin/python setup.py test
+ bin/nosetests
develop: bin/python
bin/buildout
=== modified file 'src/lazr/jobrunner/jobrunner.py'
--- src/lazr/jobrunner/jobrunner.py 2012-03-15 15:14:59 +0000
+++ src/lazr/jobrunner/jobrunner.py 2012-03-16 19:06:20 +0000
@@ -16,31 +16,82 @@
__metaclass__ = type
-import contextlib
+
+from contextlib import contextmanager
import logging
+from resource import (
+ getrlimit,
+ RLIMIT_AS,
+ setrlimit,
+ )
import sys
+from celery.task import Task
+
class SuspendJobException(Exception):
"""Raised when a running job wants to suspend itself."""
pass
+class LeaseHeld(Exception):
+ """Raised when a lease cannot be acquired."""
+
+
class JobStatus:
+
+ class COMPLETED:
+ title = 'completed'
+ value = 1
+
class WAITING:
- title = "waiting"
+ title = 'waiting'
+ value = 2
class RUNNING:
title = "running"
-
- class COMPLETED:
- title = "completed"
+ value = 3
class FAILED:
title = "failed"
+ value = 4
class SUSPENDED:
title = "suspended"
+ value = 5
+
+ by_value = dict(
+ (cls.value, cls) for cls in [COMPLETED, WAITING, RUNNING,
+ FAILED, SUSPENDED])
+
+
+class RunJob(Task):
+
+ abstract = True
+
+ oops_config = None
+
+ def run(self, job_id):
+ job = self.job_source.get(job_id)
+ try:
+ job.acquireLease()
+ except LeaseHeld:
+ return
+ runner = JobRunner(oops_config=self.oops_config)
+ with memory_limit(self.job_source.memory_limit):
+ runner.runJobHandleError(job)
+
+
+@contextmanager
+def memory_limit(limit):
+ if limit is not None:
+ orig_soft_limit, hard_limit = getrlimit(RLIMIT_AS)
+ setrlimit(RLIMIT_AS, (limit, hard_limit))
+ try:
+ yield
+ finally:
+ if limit is not None:
+ setrlimit(RLIMIT_AS, (orig_soft_limit, hard_limit))
class NoJobSource(Exception):
@@ -94,21 +145,6 @@
return ()
-_job_source = None
-
-
-def set_job_source(job_source):
- global _job_source
- _job_source = job_source
-
-
-def run_job(job_id):
- if _job_source is None:
- raise NoJobSource
- job = _job_source.get(job_id)
- job.run()
-
-
class JobRunner:
def __init__(self, logger=None, oops_config=None, oopsMessage=None):
@@ -184,7 +220,7 @@
return report
@staticmethod
- @contextlib.contextmanager
+ @contextmanager
def oopsMessage(message):
"""Add an oops message to be included in oopses from this context."""
yield
=== added file 'src/lazr/jobrunner/tests/config1.py'
--- src/lazr/jobrunner/tests/config1.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/config1.py 2012-03-16 19:06:20 +0000
@@ -0,0 +1,10 @@
+BROKER_VHOST = "/"
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_jobrunner", )
+CELERYD_CONCURRENCY = 1
+import os
+import oops
+CELERY_ANNOTATIONS = {
+ "run_file_job": {"file_job_dir": os.environ['FILE_JOB_DIR'],
+ 'oops_config': oops.Config()}
+ }
=== modified file 'src/lazr/jobrunner/tests/test_jobrunner.py'
--- src/lazr/jobrunner/tests/test_jobrunner.py 2012-03-15 15:14:59 +0000
+++ src/lazr/jobrunner/tests/test_jobrunner.py 2012-03-16 19:06:20 +0000
@@ -14,28 +14,40 @@
# You should have received a copy of the GNU Lesser General Public License
# along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>.
-_metaclass__ = type
+__metaclass__ = type
import contextlib
+import errno
+import json
import logging
+import os.path
+from resource import (
+ getrlimit,
+ RLIMIT_AS,
+ )
+import subprocess
+import shutil
+import tempfile
+from time import sleep
+from unittest import TestCase
+
+from celery.exceptions import SoftTimeLimitExceeded
import oops
-from unittest import TestCase
from zope.testing.loghandler import Handler
from lazr.jobrunner.jobrunner import (
BaseJob,
JobRunner,
JobStatus,
- NoJobSource,
- run_job,
- set_job_source,
+ LeaseHeld,
+ RunJob,
SuspendJobException,
)
class FakeJob(BaseJob):
- retry_error_types = []
+ retry_error_types = ()
def __init__(self, job_id, failure=None):
super(FakeJob, self).__init__(job_id)
@@ -44,12 +56,33 @@
self.notifyOops_called = False
self.notifyUserError_called = False
self.queue_call_count = 0
+ self.lease_held = False
+
+ def save(self):
+ pass
+
+ def acquireLease(self):
+ if self.lease_held:
+ raise LeaseHeld
+ self.lease_held = True
def run(self):
self.unrun = False
if self.failure is not None:
raise self.failure
+ def start(self, manage_transaction=False):
+ super(FakeJob, self).start(manage_transaction)
+ self.save()
+
+ def complete(self, manage_transaction=False):
+ super(FakeJob, self).complete(manage_transaction)
+ self.save()
+
+ def fail(self, manage_transaction=False):
+ super(FakeJob, self).fail(manage_transaction)
+ self.save()
+
def notifyOops(self, oops_report):
self.notifyOops_called = True
@@ -66,6 +99,8 @@
class FakeJobSource:
+ memory_limit = None
+
def __init__(self):
self.jobs = {}
@@ -73,23 +108,230 @@
return self.jobs[job_id]
-class TestRunJobs(TestCase):
-
- def tearDown(self):
- set_job_source(None)
- super(TestRunJobs, self).tearDown()
-
- def test_run_no_job_source(self):
- self.assertRaises(NoJobSource, run_job, 10)
+class FileJob(FakeJob):
+
+ def __init__(self, job_source, job_id, output=None,
+ status=JobStatus.WAITING, exception=None, sleep=None):
+ super(FileJob, self).__init__(job_id)
+ self.job_source = job_source
+ self.output = output
+ self.status = status
+ self.exception = exception
+ self.sleep = sleep
+
+ def save(self):
+ self.job_source.set(self)
+
+ def run(self):
+ super(FileJob, self).run()
+ if self.sleep is not None:
+ sleep(self.sleep)
+ if self.exception is not None:
+ raise Exception(self.exception)
+ if self.output is not None:
+ self.job_source.set_output(self, self.output)
+
+
+class FileJobSource:
+
+ memory_limit = None
+
+ def __init__(self, root):
+ self.root = root
+ self.job_root = os.path.join(self.root, 'job')
+ self.output_root = os.path.join(self.root, 'output')
+ def ensure_dir(path):
+ try:
+ os.mkdir(path)
+ except OSError, e:
+ if e.errno != errno.EEXIST:
+ raise
+ ensure_dir(self.job_root)
+ ensure_dir(self.output_root)
+
+ def _job_file(self, job_id, mode):
+ return open(os.path.join(self.job_root, str(job_id)), mode)
+
+ def _job_output_file(self, job_id, mode):
+ return open(os.path.join(self.output_root, str(job_id)), mode)
+
+ def get(self, job_id):
+ with self._job_file(job_id, 'r') as job_file:
+ job_data = json.load(job_file)
+ job_data['status'] = JobStatus.by_value[job_data['status']]
+ return FileJob(self, **job_data)
+
+ def set(self, job):
+ with self._job_file(job.job_id, 'w') as job_file:
+ job_info = {
+ 'job_id': job.job_id,
+ 'output': job.output,
+ 'status': job.status.value,
+ 'exception': job.exception,
+ 'sleep': job.sleep,
+ }
+ json.dump(job_info, job_file)
+
+ def get_output(self, job):
+ try:
+ with self._job_output_file(job.job_id, 'r') as job_output_file:
+ return job_output_file.read()
+ except IOError, e:
+ if e.errno == errno.ENOENT:
+ return None
+ raise
+
+ def set_output(self, job, output):
+ with self._job_output_file(job.job_id, 'w') as job_output_file:
+ job_output_file.write(output)
+
+
+class TestRunJob(TestCase):
+
+ @staticmethod
+ def makeFakeJobSource(job=None):
+ js = FakeJobSource()
+ if job is None:
+ job = FakeJob(10)
+ js.jobs[job.job_id] = job
+ return js
+
+ @staticmethod
+ def runJob(js):
+ task = RunJob()
+ task.job_source = js
+ task.run(10)
def test_run(self):
+ js = self.makeFakeJobSource()
+ self.assertTrue(js.jobs[10].unrun)
+ self.runJob(js)
+ self.assertFalse(js.jobs[10].unrun)
+
+ def test_memory_limit(self):
+
+ class MemoryCheckJob(FakeJob):
+
+ def run(self):
+ super(MemoryCheckJob, self).run()
+ self.current_memory_limit = getrlimit(RLIMIT_AS)[0]
+
+ start_limits = getrlimit(RLIMIT_AS)
js = FakeJobSource()
- set_job_source(js)
- job = FakeJob(10)
+ job = MemoryCheckJob(10)
js.jobs[10] = job
- self.assertTrue(job.unrun)
- run_job(10)
- self.assertFalse(job.unrun)
+ js.memory_limit = 1024 ** 3
+ task = RunJob()
+ task.job_source = js
+ task.run(10)
+ self.assertEqual(1024 ** 3, job.current_memory_limit)
+ self.assertEqual(start_limits, getrlimit(RLIMIT_AS))
+
+ def test_acquires_lease(self):
+ js = self.makeFakeJobSource()
+ self.assertFalse(js.jobs[10].lease_held)
+ self.runJob(js)
+ self.assertTrue(js.jobs[10].lease_held)
+
+ def test_skips_failed_acquisition(self):
+ js = self.makeFakeJobSource()
+ js.jobs[10].acquireLease()
+ self.runJob(js)
+ self.assertTrue(js.jobs[10].unrun)
+
+
+def get_root():
+ import lazr.jobrunner
+ root = os.path.join(os.path.dirname(lazr.jobrunner.__file__), '../../../')
+ return os.path.normpath(root)
+
+
+@contextlib.contextmanager
+def tempdir():
+ dirname = tempfile.mkdtemp()
+ try:
+ yield dirname
+ finally:
+ shutil.rmtree(dirname)
+
+
+@contextlib.contextmanager
+def celeryd(config_module, file_job_dir):
+ cmdname = os.path.join(get_root(), 'bin/celeryd')
+ environ = dict(os.environ)
+ environ['FILE_JOB_DIR'] = file_job_dir
+ proc = subprocess.Popen([cmdname, '--config', config_module], env=environ,
+ stderr=subprocess.PIPE)
+ try:
+ yield proc
+ finally:
+ proc.terminate()
+ proc.wait()
+
+
+class RunFileJob(RunJob):
+
+ name = 'run_file_job'
+
+    # Overridden via CELERY_ANNOTATIONS in the test config modules.
+    file_job_dir = None
+
+ @property
+ def job_source(self):
+ return FileJobSource(self.file_job_dir)
+
+
+class TestCeleryD(TestCase):
+
+ def test_run_job(self):
+ with tempdir() as temp_dir:
+ js = FileJobSource(temp_dir)
+ job = FileJob(js, 10, 'my_output')
+ job.save()
+ result = RunFileJob.delay(10)
+ self.assertIs(None, js.get_output(job))
+ self.assertEqual(JobStatus.WAITING, job.status)
+ with celeryd('lazr.jobrunner.tests.config1', temp_dir):
+ result.wait(10)
+ job = js.get(job.job_id)
+ self.assertEqual('my_output', js.get_output(job))
+ self.assertEqual(JobStatus.COMPLETED, job.status)
+
+ def run_file_job(self, temp_dir, config='lazr.jobrunner.tests.config1',
+ **kwargs):
+ js = FileJobSource(temp_dir)
+ job = FileJob(js, 10, **kwargs)
+ job.save()
+ result = RunFileJob.delay(10)
+ with celeryd(config, temp_dir) as proc:
+ try:
+ result.wait(10)
+ except SoftTimeLimitExceeded:
+ pass
+ job = js.get(job.job_id)
+ return job, proc
+
+ def test_run_job_emits_oopses(self):
+ with tempdir() as temp_dir:
+ job, proc = self.run_file_job(
+ temp_dir, exception='Catch me if you can!')
+ err = proc.stderr.read()
+ self.assertEqual(JobStatus.FAILED, job.status)
+ self.assertIs(None, job.job_source.get_output(job))
+ self.assertIn(
+ "OOPS while executing job 10: [] Exception(u'Catch me if you"
+ " can!',)", err)
+
+ def test_timeout_long(self):
+ """Raises exception when a job exceeds the configured time limit."""
+ with tempdir() as temp_dir:
+ job, proc = self.run_file_job(
+ temp_dir, config='lazr.jobrunner.tests.time_limit_config',
+ sleep=10)
+ self.assertEqual(JobStatus.FAILED, job.status)
+ err = proc.stderr.read()
+ self.assertIn(
+ 'OOPS while executing job 10: [] SoftTimeLimitExceeded', err)
class OOPSTestRepository:
=== added file 'src/lazr/jobrunner/tests/time_limit_config.py'
--- src/lazr/jobrunner/tests/time_limit_config.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/tests/time_limit_config.py 2012-03-16 19:06:20 +0000
@@ -0,0 +1,11 @@
+BROKER_VHOST = "/"
+CELERY_RESULT_BACKEND = "amqp"
+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_jobrunner", )
+CELERYD_CONCURRENCY = 1
+CELERYD_TASK_SOFT_TIME_LIMIT = 1
+import os
+import oops
+CELERY_ANNOTATIONS = {
+ "run_file_job": {"file_job_dir": os.environ['FILE_JOB_DIR'],
+ 'oops_config': oops.Config()}
+ }