launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #07923
[Merge] lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue into lp:launchpad
Abel Deuring has proposed merging lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~adeuring/launchpad/put-retry-jobs-into-celery-queue/+merge/106012
This branch fills in one of the last missing pieces needed to run jobs
via Celery: when a job raises a retry exception, a new Celery request
must be issued for the next attempt to run the job.
A new request is created by BaseRunnableJob.runViaCelery(), which is
called from a hook when the DB transaction is committed. When a retry
exception occurs, the job runner calls job.queue(). Until now, queue()
was implemented only in lp.services.job.model.job.Job, which knows
nothing about Celery, so I added a method BaseRunnableJob.queue().
Calling celeryRunOnCommit() directly in BaseRunnableJob.queue() causes
another problem. The hook must be added before the transaction is
committed, but lp.services.job.model.job.Job.queue() generally calls
transaction.commit() twice. The job status is set to WAITING only in
the transaction committed by the second call.
To get the Celery hook registered for the "right" commit, I added the
optional parameter add_commit_hook to
lp.services.job.model.job.Job.queue(). The hook is called after the
status has been set to WAITING and before the second commit.
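BaseRunnableJob.runViaCelery() now checks whether a lease for the job
exists. If so, it uses the lease expiration time as the ETA for the
new Celery request. Without an ETA, the task could start before the
lease expires, and lazr.jobrunner.celerytask.RunJob.run() would silently
ignore it. The implementation looks a bit convoluted because Launchpad
uses datetime instances _with_ a timezone, while Celery works only with
datetime instances _without_ a timezone. The ETA is therefore computed
as the current naive local time plus the time remaining until the lease
expires.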
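I changed UniversalJobSource so that we can create test jobs that,
unlike "regular" jobs, do not have a related DB table. If the job class
provides a makeInstance() class method, that method is used to create
the job instance instead of a DB lookup.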
test: ./bin/test -vvt lp.services.job.tests.test_retry_jobs_with_celery
no lint
--
https://code.launchpad.net/~adeuring/launchpad/put-retry-jobs-into-celery-queue/+merge/106012
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue into lp:launchpad.
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2012-05-14 14:57:15 +0000
+++ lib/lp/services/job/model/job.py 2012-05-16 16:24:18 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""ORM object representing jobs."""
@@ -185,7 +185,8 @@
if manage_transaction:
transaction.commit()
- def queue(self, manage_transaction=False, abort_transaction=False):
+ def queue(self, manage_transaction=False, abort_transaction=False,
+ add_commit_hook=None):
"""See `IJob`."""
if manage_transaction:
if abort_transaction:
@@ -194,6 +195,8 @@
transaction.commit()
self._set_status(JobStatus.WAITING)
self.date_finished = datetime.datetime.now(UTC)
+ if add_commit_hook is not None:
+ add_commit_hook()
if manage_transaction:
transaction.commit()
@@ -262,6 +265,10 @@
job_id, module_name, class_name = ujob_id
bc_module = __import__(module_name, fromlist=[class_name])
db_class = getattr(bc_module, class_name)
+ try:
+ return db_class.makeInstance(job_id)
+ except AttributeError:
+ pass
store = IStore(db_class)
db_job = store.find(db_class, db_class.job == job_id).one()
if db_job is None:
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-05-11 20:37:47 +0000
+++ lib/lp/services/job/runner.py 2012-05-16 16:24:18 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Facilities for running Jobs."""
@@ -19,9 +19,11 @@
from calendar import timegm
from collections import defaultdict
+from datetime import datetime
import contextlib
import logging
import os
+import pytz
from resource import (
getrlimit,
RLIMIT_AS,
@@ -202,8 +204,18 @@
cls = CeleryRunJob
db_class = self.getDBClass()
ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
+ if self.job.lease_expires is not None:
+ # Don't try to run the task before the lease expires:
+ # lazr.jobrunner.celerytask.RunJob.run() will silently ignore
+ # it. lease_expires has a timezone, but Celery does not like
+ # datetime instances with a timezone.
+ utc_now = datetime.now(tz=pytz.timezone('UTC'))
+ delta = self.job.lease_expires - utc_now
+ eta = datetime.now() + delta
+ else:
+ eta = None
return cls.apply_async(
- (ujob_id, self.config.dbuser), queue=self.task_queue)
+ (ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta)
def getDBClass(self):
return self.context.__class__
@@ -223,6 +235,12 @@
current = transaction.get()
current.addAfterCommitHook(self.celeryCommitHook)
+ def queue(self, manage_transaction=False, abort_transaction=False):
+ """See `IJob`."""
+ self.job.queue(
+ manage_transaction, abort_transaction,
+ add_commit_hook=self.celeryRunOnCommit)
+
class BaseJobRunner(LazrJobRunner):
"""Runner of Jobs."""
=== added file 'lib/lp/services/job/tests/test_retry_jobs_with_celery.py'
--- lib/lp/services/job/tests/test_retry_jobs_with_celery.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/job/tests/test_retry_jobs_with_celery.py 2012-05-16 16:24:18 +0000
@@ -0,0 +1,126 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for running jobs via Celery."""
+
+
+from datetime import datetime
+import pytz
+from time import (
+ sleep,
+ time,
+ )
+import transaction
+from lazr.delegates import delegates
+from zope.interface import implements
+
+from lp.services.config import config
+from lp.services.database.lpstorm import IStore
+from lp.services.features.testing import FeatureFixture
+from lp.services.job.interfaces.job import (
+ IJob,
+ IRunnableJob,
+ )
+from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.model.job import Job
+from lp.services.job.runner import BaseRunnableJob
+from lp.services.job.tests import block_on_job
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import CeleryJobLayer
+
+
+class TestJob(BaseRunnableJob):
+ """A dummy job."""
+
+ implements(IRunnableJob)
+ delegates(IJob, 'job')
+
+ config = config.launchpad
+
+ def __init__(self, job_id=None):
+ if job_id is not None:
+ store = IStore(Job)
+ self.job = store.find(Job, id=job_id)[0]
+ else:
+ self.job = Job(max_retries=2)
+
+ def run(self):
+ pass
+
+ @classmethod
+ def makeInstance(cls, job_id):
+ return cls(job_id)
+
+ @classmethod
+ def getDBClass(cls):
+ return cls
+
+
+class RetryException(Exception):
+ """An exception used as a retry exception in TestJobWithRetryError."""
+
+
+class TestJobWithRetryError(TestJob):
+ """A dummy job."""
+
+ retry_error_types = (RetryException, )
+
+ def run(self):
+ """Raise a retry exception on the the first attempt to run the job."""
+ if self.job.attempt_count < 2:
+ # Shorten the lease time: We don't want to wait the
+ # default 300 seconds until the job is queued again.
+ self.job.lease_expires = datetime.fromtimestamp(
+ time() + 1, pytz.timezone('UTC'))
+ raise RetryException
+
+
+class TestRetryJobsViaCelery(TestCaseWithFactory):
+ """Tests for running jobs via Celery."""
+
+ layer = CeleryJobLayer
+
+ def test_TestJob(self):
+ # TestJob can be run via Celery.
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classes': 'TestJob'
+ }))
+ with block_on_job(self):
+ job = TestJob()
+ job.celeryRunOnCommit()
+ job_id = job.job_id
+ transaction.commit()
+ store = IStore(Job)
+ dbjob = store.find(Job, id=job_id)[0]
+ self.assertEqual(JobStatus.COMPLETED, dbjob.status)
+
+ def test_jobs_with_retry_exceptions_are_queued_again(self):
+ # A job that raises a retry error is automatically queued
+ # and executed again.
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classes': 'TestJobWithRetryError'
+ }))
+ with block_on_job(self):
+ job = TestJobWithRetryError()
+ job.celeryRunOnCommit()
+ job_id = job.job_id
+ transaction.commit()
+ store = IStore(Job)
+
+ # block_on_job() is not aware of the Celery request
+ # issued when the retry exception occurs, but we can
+ # check the status of the job in the database.
+ def job_finished():
+ transaction.abort()
+ dbjob = store.find(Job, id=job_id)[0]
+ return (
+ dbjob.status == JobStatus.COMPLETED and
+ dbjob.attempt_count == 2)
+ count = 0
+ while count < 50 and not job_finished():
+ sleep(0.2)
+ count += 1
+
+ dbjob = store.find(Job, id=job_id)[0]
+ self.assertEqual(2, dbjob.attempt_count)
+ self.assertEqual(JobStatus.COMPLETED, dbjob.status)
Follow ups