launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #23593
[Merge] lp:~cjwatson/launchpad/celery-retry-policy into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/celery-retry-policy into lp:launchpad.
Commit message:
Fix Celery job-running behaviour when RabbitMQ is unconfigured or unresponsive.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/celery-retry-policy/+merge/366757
I couldn't figure out how to test the first of these two commits in the test suite, but I've tested each of them in sequence on dogfood and it's fixed the problems I was running into there.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/celery-retry-policy into lp:launchpad.
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2019-03-09 09:26:01 +0000
+++ lib/lp/services/job/celeryconfig.py 2019-05-01 12:33:18 +0000
@@ -78,6 +78,14 @@
# fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
result['worker_concurrency'] = config[queue].concurrency
+ # Don't spend too long failing when RabbitMQ isn't running. We can fall
+ # back to waiting for the job to be run via cron.
+ result['broker_transport_options'] = {
+ 'max_retries': 3,
+ 'interval_start': 0,
+ 'interval_step': 0.1,
+ 'interval_max': 0.1,
+ }
result['broker_url'] = 'amqp://%s:%s@%s/%s' % (
config.rabbitmq.userid, config.rabbitmq.password,
config.rabbitmq.host, config.rabbitmq.virtual_host)
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2019-03-09 09:26:01 +0000
+++ lib/lp/services/job/runner.py 2019-05-01 12:33:18 +0000
@@ -270,7 +270,8 @@
def celeryRunOnCommit(self):
"""Configure transaction so that commit runs this job via Celery."""
- if not celery_enabled(self.__class__.__name__):
+ if (config.rabbitmq.host is None or
+ not celery_enabled(self.__class__.__name__)):
return
current = transaction.get()
current.addAfterCommitHook(self.celeryCommitHook)
=== modified file 'lib/lp/services/job/tests/test_celery.py'
--- lib/lp/services/job/tests/test_celery.py 2017-06-29 18:06:03 +0000
+++ lib/lp/services/job/tests/test_celery.py 2019-05-01 12:33:18 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for running jobs via Celery."""
@@ -33,7 +33,10 @@
)
from lp.services.job.model.job import Job
from lp.services.job.runner import BaseRunnableJob
-from lp.services.job.tests import block_on_job
+from lp.services.job.tests import (
+ block_on_job,
+ monitor_celery,
+ )
from lp.testing import TestCaseWithFactory
from lp.testing.layers import CeleryJobLayer
@@ -202,3 +205,19 @@
]))
self.assertEqual(3, job.attempt_count)
self.assertEqual(JobStatus.COMPLETED, job.status)
+
+ def test_without_rabbitmq(self):
+ # If no RabbitMQ host is configured, the job is not run via Celery.
+ self.pushConfig('rabbitmq', host='none')
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classes': 'TestJob'
+ }))
+ with monitor_celery() as responses:
+ job = TestJob()
+ job.celeryRunOnCommit()
+ job_id = job.job_id
+ transaction.commit()
+ self.assertEqual([], responses)
+ store = IStore(Job)
+ dbjob = store.find(Job, id=job_id)[0]
+ self.assertEqual(JobStatus.WAITING, dbjob.status)
Follow ups