← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/celery-retry-policy into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/celery-retry-policy into lp:launchpad.

Commit message:
Fix Celery job-running behaviour when RabbitMQ is unconfigured or unresponsive.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/celery-retry-policy/+merge/366757

I couldn't figure out how to test the first of these two commits in the test suite, but I've tested each of them in sequence on dogfood and it's fixed the problems I was running into there.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/celery-retry-policy into lp:launchpad.
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2019-03-09 09:26:01 +0000
+++ lib/lp/services/job/celeryconfig.py	2019-05-01 12:33:18 +0000
@@ -78,6 +78,14 @@
         # fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
         result['worker_concurrency'] = config[queue].concurrency
 
+    # Don't spend too long failing when RabbitMQ isn't running.  We can fall
+    # back to waiting for the job to be run via cron.
+    result['broker_transport_options'] = {
+        'max_retries': 3,
+        'interval_start': 0,
+        'interval_step': 0.1,
+        'interval_max': 0.1,
+        }
     result['broker_url'] = 'amqp://%s:%s@%s/%s' % (
         config.rabbitmq.userid, config.rabbitmq.password,
         config.rabbitmq.host, config.rabbitmq.virtual_host)

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2019-03-09 09:26:01 +0000
+++ lib/lp/services/job/runner.py	2019-05-01 12:33:18 +0000
@@ -270,7 +270,8 @@
 
     def celeryRunOnCommit(self):
         """Configure transaction so that commit runs this job via Celery."""
-        if not celery_enabled(self.__class__.__name__):
+        if (config.rabbitmq.host is None or
+                not celery_enabled(self.__class__.__name__)):
             return
         current = transaction.get()
         current.addAfterCommitHook(self.celeryCommitHook)

=== modified file 'lib/lp/services/job/tests/test_celery.py'
--- lib/lp/services/job/tests/test_celery.py	2017-06-29 18:06:03 +0000
+++ lib/lp/services/job/tests/test_celery.py	2019-05-01 12:33:18 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for running jobs via Celery."""
@@ -33,7 +33,10 @@
     )
 from lp.services.job.model.job import Job
 from lp.services.job.runner import BaseRunnableJob
-from lp.services.job.tests import block_on_job
+from lp.services.job.tests import (
+    block_on_job,
+    monitor_celery,
+    )
 from lp.testing import TestCaseWithFactory
 from lp.testing.layers import CeleryJobLayer
 
@@ -202,3 +205,19 @@
                 ]))
         self.assertEqual(3, job.attempt_count)
         self.assertEqual(JobStatus.COMPLETED, job.status)
+
+    def test_without_rabbitmq(self):
+        # If no RabbitMQ host is configured, the job is not run via Celery.
+        self.pushConfig('rabbitmq', host='none')
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'TestJob'
+        }))
+        with monitor_celery() as responses:
+            job = TestJob()
+            job.celeryRunOnCommit()
+            job_id = job.job_id
+            transaction.commit()
+        self.assertEqual([], responses)
+        store = IStore(Job)
+        dbjob = store.find(Job, id=job_id)[0]
+        self.assertEqual(JobStatus.WAITING, dbjob.status)


Follow ups