
launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue into lp:launchpad

 

Abel Deuring has proposed merging lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~adeuring/launchpad/put-retry-jobs-into-celery-queue/+merge/106012

One of the last missing details for running jobs via Celery: when a job
raises a retry exception, a new Celery request for the next attempt
to run the job must be issued.

A new request is created by BaseRunnableJob.runViaCelery(), which is
called in a hook when the DB transaction is committed. When a retry
exception occurs, the job runner calls job.queue(). Until now, queue()
was implemented only in lp.services.job.model.job.Job, and that class
is not aware of any Celery-related details, so I added a method
BaseRunnableJob.queue().

Calling celeryRunOnCommit() directly in BaseRunnableJob.queue() causes
another problem: the hook must be added before the transaction is
committed, but lp.services.job.model.job.Job.queue() generally calls
txn.commit() twice, and the job status is set to WAITING only in the
second commit call. To get the Celery hook placed into the "right"
commit, I added the optional parameter add_commit_hook to
lp.services.job.model.job.Job.queue().
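
The ordering issue above can be illustrated with a minimal sketch. It
is not the Launchpad code: FakeTransaction and SketchJob are
hypothetical stand-ins, and the hook handling is only assumed to
resemble an after-commit hook that fires once on the next commit.

```python
class FakeTransaction:
    """Hypothetical stand-in for a transaction manager (assumption)."""

    def __init__(self):
        self.hooks = []
        self.log = []

    def add_after_commit_hook(self, hook):
        self.hooks.append(hook)

    def commit(self):
        self.log.append("commit")
        # Hooks fire once, after the commit they were registered for.
        hooks, self.hooks = self.hooks, []
        for hook in hooks:
            hook()


class SketchJob:
    """Hypothetical job mirroring the two-commit shape of Job.queue()."""

    def __init__(self, txn):
        self.txn = txn
        self.status = "RUNNING"

    def queue(self, manage_transaction=False, add_commit_hook=None):
        if manage_transaction:
            self.txn.commit()  # first commit: status is not WAITING yet
        self.status = "WAITING"
        if add_commit_hook is not None:
            add_commit_hook()  # registered just before the second commit
        if manage_transaction:
            self.txn.commit()  # second commit: WAITING becomes visible


txn = FakeTransaction()
job = SketchJob(txn)
seen = []


def register_hook():
    # Record the status the hook observes when it finally runs.
    txn.add_after_commit_hook(lambda: seen.append(job.status))


job.queue(manage_transaction=True, add_commit_hook=register_hook)
```

In this sketch the hook runs only after the second commit and observes
the WAITING status. Registering it before calling queue() would let it
fire after the first commit, before the status change.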

BaseRunnableJob.runViaCelery() now checks if a lease for the job
exists. If so, it uses the lease expiration time as the ETA for the
new Celery request. The implementation looks a bit convoluted: Launchpad
uses datetime instances _with_ a timezone, but Celery works only with
datetime instances _without_ a timezone...
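
A minimal sketch of the conversion described above, under stated
assumptions: naive_local_eta is a hypothetical helper name, the
standard library's timezone.utc is used here in place of pytz, and the
optional utc_now/local_now parameters exist only to make the example
deterministic.

```python
from datetime import datetime, timezone


def naive_local_eta(lease_expires, utc_now=None, local_now=None):
    """Turn a timezone-aware expiry into a naive local datetime.

    The remaining lease time is computed between two aware datetimes
    and then added to a naive "now", so no aware value is handed on.
    """
    if utc_now is None:
        utc_now = datetime.now(timezone.utc)
    if local_now is None:
        local_now = datetime.now()
    delta = lease_expires - utc_now  # aware - aware gives a timedelta
    return local_now + delta         # naive + timedelta stays naive


# Example with fixed values: the lease expires five minutes from "now",
# and the local clock is assumed to be two hours ahead of UTC.
lease = datetime(2012, 5, 16, 16, 30, tzinfo=timezone.utc)
eta = naive_local_eta(
    lease,
    utc_now=datetime(2012, 5, 16, 16, 25, tzinfo=timezone.utc),
    local_now=datetime(2012, 5, 16, 18, 25),
)
```

The two "now" values are read separately, so the result can be off by
the few microseconds between the two calls; for a lease-based ETA this
should not matter.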

I changed UniversalJobSource so that we can create test jobs which,
unlike "regular jobs", do not have a related DB table.
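
The lookup fallback can be sketched roughly as follows. This is an
illustration, not the code in the branch: make_job, TableLessJob,
RegularJob and fake_store_lookup are hypothetical names, and the sketch
uses getattr() where the branch uses try/except AttributeError.

```python
def make_job(job_class, job_id, find_in_store):
    """Prefer a class-provided factory; otherwise use the store lookup."""
    make_instance = getattr(job_class, "makeInstance", None)
    if make_instance is not None:
        return make_instance(job_id)
    return find_in_store(job_class, job_id)


class TableLessJob:
    """Hypothetical test job without a DB table of its own."""

    def __init__(self, job_id):
        self.job_id = job_id

    @classmethod
    def makeInstance(cls, job_id):
        return cls(job_id)


class RegularJob:
    """Hypothetical job class that is found via the store."""


def fake_store_lookup(job_class, job_id):
    # Stand-in for the store.find(...) call used for regular jobs.
    return ("row", job_class.__name__, job_id)


table_less = make_job(TableLessJob, 7, fake_store_lookup)
regular = make_job(RegularJob, 7, fake_store_lookup)
```

The getattr() variant only checks whether the factory exists, so an
AttributeError raised inside makeInstance() itself would propagate
instead of silently falling through to the store lookup.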


test: ./bin/test -vvt lp.services.job.tests.test_retry_jobs_with_celery

no lint

-- 
https://code.launchpad.net/~adeuring/launchpad/put-retry-jobs-into-celery-queue/+merge/106012
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue into lp:launchpad.
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2012-05-14 14:57:15 +0000
+++ lib/lp/services/job/model/job.py	2012-05-16 16:24:18 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """ORM object representing jobs."""
@@ -185,7 +185,8 @@
         if manage_transaction:
             transaction.commit()
 
-    def queue(self, manage_transaction=False, abort_transaction=False):
+    def queue(self, manage_transaction=False, abort_transaction=False,
+              add_commit_hook=None):
         """See `IJob`."""
         if manage_transaction:
             if abort_transaction:
@@ -194,6 +195,8 @@
             transaction.commit()
         self._set_status(JobStatus.WAITING)
         self.date_finished = datetime.datetime.now(UTC)
+        if add_commit_hook is not None:
+            add_commit_hook()
         if manage_transaction:
             transaction.commit()
 
@@ -262,6 +265,10 @@
         job_id, module_name, class_name = ujob_id
         bc_module = __import__(module_name, fromlist=[class_name])
         db_class = getattr(bc_module, class_name)
+        try:
+            return db_class.makeInstance(job_id)
+        except AttributeError:
+            pass
         store = IStore(db_class)
         db_job = store.find(db_class, db_class.job == job_id).one()
         if db_job is None:

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-05-11 20:37:47 +0000
+++ lib/lp/services/job/runner.py	2012-05-16 16:24:18 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Facilities for running Jobs."""
@@ -19,9 +19,11 @@
 
 from calendar import timegm
 from collections import defaultdict
+from datetime import datetime
 import contextlib
 import logging
 import os
+import pytz
 from resource import (
     getrlimit,
     RLIMIT_AS,
@@ -202,8 +204,18 @@
             cls = CeleryRunJob
         db_class = self.getDBClass()
         ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
+        if self.job.lease_expires is not None:
+            # Don't try to run the task before the lease expires:
+            # lazr.jobrunner.celerytask.RunJob.run() will silently ignore
+            # it. lease_expires has a timezone, but Celery does not like
+            # datetime instances with a timezone.
+            utc_now = datetime.now(tz=pytz.timezone('UTC'))
+            delta = self.job.lease_expires - utc_now
+            eta = datetime.now() + delta
+        else:
+            eta = None
         return cls.apply_async(
-            (ujob_id, self.config.dbuser), queue=self.task_queue)
+            (ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta)
 
     def getDBClass(self):
         return self.context.__class__
@@ -223,6 +235,12 @@
         current = transaction.get()
         current.addAfterCommitHook(self.celeryCommitHook)
 
+    def queue(self, manage_transaction=False, abort_transaction=False):
+        """See `IJob`."""
+        self.job.queue(
+            manage_transaction, abort_transaction,
+            add_commit_hook=self.celeryRunOnCommit)
+
 
 class BaseJobRunner(LazrJobRunner):
     """Runner of Jobs."""

=== added file 'lib/lp/services/job/tests/test_retry_jobs_with_celery.py'
--- lib/lp/services/job/tests/test_retry_jobs_with_celery.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/job/tests/test_retry_jobs_with_celery.py	2012-05-16 16:24:18 +0000
@@ -0,0 +1,126 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for running jobs via Celery."""
+
+
+from datetime import datetime
+import pytz
+from time import (
+    sleep,
+    time,
+    )
+import transaction
+from lazr.delegates import delegates
+from zope.interface import implements
+
+from lp.services.config import config
+from lp.services.database.lpstorm import IStore
+from lp.services.features.testing import FeatureFixture
+from lp.services.job.interfaces.job import (
+    IJob,
+    IRunnableJob,
+    )
+from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.model.job import Job
+from lp.services.job.runner import BaseRunnableJob
+from lp.services.job.tests import block_on_job
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import CeleryJobLayer
+
+
+class TestJob(BaseRunnableJob):
+    """A dummy job."""
+
+    implements(IRunnableJob)
+    delegates(IJob, 'job')
+
+    config = config.launchpad
+
+    def __init__(self, job_id=None):
+        if job_id is not None:
+            store = IStore(Job)
+            self.job = store.find(Job, id=job_id)[0]
+        else:
+            self.job = Job(max_retries=2)
+
+    def run(self):
+        pass
+
+    @classmethod
+    def makeInstance(cls, job_id):
+        return cls(job_id)
+
+    @classmethod
+    def getDBClass(cls):
+        return cls
+
+
+class RetryException(Exception):
+    """An exception used as a retry exception in TestJobWithRetryError."""
+
+
+class TestJobWithRetryError(TestJob):
+    """A dummy job."""
+
+    retry_error_types = (RetryException, )
+
+    def run(self):
+        """Raise a retry exception on the first attempt to run the job."""
+        if self.job.attempt_count < 2:
+            # Shorten the lease time: We don't want to wait the
+            # default 300 seconds until the job is queued again.
+            self.job.lease_expires = datetime.fromtimestamp(
+                time() + 1, pytz.timezone('UTC'))
+            raise RetryException
+
+
+class TestRetryJobsViaCelery(TestCaseWithFactory):
+    """Tests for running jobs via Celery."""
+
+    layer = CeleryJobLayer
+
+    def test_TestJob(self):
+        # TestJob can be run via Celery.
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'TestJob'
+        }))
+        with block_on_job(self):
+            job = TestJob()
+            job.celeryRunOnCommit()
+            job_id = job.job_id
+            transaction.commit()
+        store = IStore(Job)
+        dbjob = store.find(Job, id=job_id)[0]
+        self.assertEqual(JobStatus.COMPLETED, dbjob.status)
+
+    def test_jobs_with_retry_exceptions_are_queued_again(self):
+        # A job that raises a retry error is automatically queued
+        # and executed again.
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'TestJobWithRetryError'
+        }))
+        with block_on_job(self):
+            job = TestJobWithRetryError()
+            job.celeryRunOnCommit()
+            job_id = job.job_id
+            transaction.commit()
+            store = IStore(Job)
+
+            # block_on_job() is not aware of the Celery request
+            # issued when the retry exception occurs, but we can
+            # check the status of the job in the database.
+            def job_finished():
+                transaction.abort()
+                dbjob = store.find(Job, id=job_id)[0]
+                return (
+                    dbjob.status == JobStatus.COMPLETED and
+                    dbjob.attempt_count == 2)
+            count = 0
+            while count < 50 and not job_finished():
+                sleep(0.2)
+                count += 1
+
+        dbjob = store.find(Job, id=job_id)[0]
+        self.assertEqual(2, dbjob.attempt_count)
+        self.assertEqual(JobStatus.COMPLETED, dbjob.status)

