launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #27065
[Merge] ~twom/launchpad:tasks-some-speedy-some-slow into launchpad:master
Tom Wardill has proposed merging ~twom/launchpad:tasks-some-speedy-some-slow into launchpad:master.
Commit message:
Enable slow lane fallback in celery
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~twom/launchpad/+git/launchpad/+merge/402919
* Use the correct config key: lazr.jobrunner looks for FALLBACK_QUEUE, not FALLBACK
* Don't re-queue the same task _and_ the slow lane task when the transaction is being aborted
* Add a test to ensure we're queueing correctly
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~twom/launchpad:tasks-some-speedy-some-slow into launchpad:master.
diff --git a/lib/lp/services/job/celeryconfig.py b/lib/lp/services/job/celeryconfig.py
index dcc931e..aee9919 100644
--- a/lib/lp/services/job/celeryconfig.py
+++ b/lib/lp/services/job/celeryconfig.py
@@ -72,9 +72,7 @@ def configure(argv):
# now that we're on Celery 3.1.
result['task_soft_time_limit'] = config[queue].timeout
if config[queue].fallback_queue != '':
- # XXX wgrant 2015-08-03: lazr.jobrunner actually looks for
- # FALLBACK_QUEUE; this probably isn't doing anything.
- result['FALLBACK'] = config[queue].fallback_queue
+ result['FALLBACK_QUEUE'] = config[queue].fallback_queue
# XXX wgrant 2015-08-03: This is mostly per-queue because we
# can't run *_job and *_job_slow in the same worker, which will be
# fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
diff --git a/lib/lp/services/job/celeryjob.py b/lib/lp/services/job/celeryjob.py
index 793330a..b4ae220 100644
--- a/lib/lp/services/job/celeryjob.py
+++ b/lib/lp/services/job/celeryjob.py
@@ -67,9 +67,13 @@ class CeleryRunJob(RunJob):
:param dbuser: The database user to run under. This should match the
dbuser specified by the job's config.
"""
+ self.dbuser = dbuser
task_init(dbuser)
super(CeleryRunJob, self).run(job_id)
+ def reQueue(self, job_id, fallback_queue):
+ self.apply_async(args=(job_id, self.dbuser), queue=fallback_queue)
+
@celery_app.task(base=CeleryRunJob, bind=True)
def celery_run_job(self, job_id, dbuser):
diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
index 1e8b0e6..2ec4506 100644
--- a/lib/lp/services/job/runner.py
+++ b/lib/lp/services/job/runner.py
@@ -321,9 +321,15 @@ class BaseRunnableJob(BaseRunnableJobSource):
"""See `IJob`."""
if self.job.attempt_count > 0:
self.job.scheduled_start = datetime.now(utc) + self.retry_delay
+ # If we're aborting the transaction, we probably don't want to
+ # start the task again
+ if manage_transaction and abort_transaction:
+ commit_hook = None
+ else:
+ commit_hook = self.celeryRunOnCommit
self.job.queue(
manage_transaction, abort_transaction,
- add_commit_hook=self.celeryRunOnCommit)
+ add_commit_hook=commit_hook)
def start(self, manage_transaction=False):
"""See `IJob`."""
diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
index 5db8436..a58d709 100644
--- a/lib/lp/services/job/tests/test_celery.py
+++ b/lib/lp/services/job/tests/test_celery.py
@@ -12,6 +12,7 @@ from time import sleep
import iso8601
from lazr.delegates import delegate_to
+from lazr.jobrunner.celerytask import drain_queues
from pytz import UTC
from testtools.matchers import (
GreaterThan,
@@ -35,6 +36,7 @@ from lp.services.job.model.job import Job
from lp.services.job.runner import BaseRunnableJob
from lp.services.job.tests import (
block_on_job,
+ drain_celery_queues,
monitor_celery,
)
from lp.testing import TestCaseWithFactory
@@ -221,3 +223,51 @@ class TestJobsViaCelery(TestCaseWithFactory):
store = IStore(Job)
dbjob = store.find(Job, id=job_id)[0]
self.assertEqual(JobStatus.WAITING, dbjob.status)
+
+
+class TestTimeoutJob(TestJob):
+
+ def storeDateStarted(self):
+ existing = self.job.base_json_data or {}
+ existing.setdefault('dates_started', [])
+ existing['dates_started'].append(self.job.date_started.isoformat())
+ self.job.base_json_data = existing
+
+ def run(self):
+ """Raise SoftTimeLimitExceeded when the job's attempt_count is 1."""
+
+ if self.job.attempt_count == 1:
+ from celery.exceptions import SoftTimeLimitExceeded
+ raise SoftTimeLimitExceeded
+
+
+class TestCeleryLaneFallback(TestCaseWithFactory):
+
+ layer = CeleryJobLayer
+
+ def test_fallback_to_slow_lane(self):
+
+ from lp.services.job.celeryjob import celery_app
+
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classes': 'TestTimeoutJob'
+ }))
+
+ with block_on_job(self):
+ job = TestTimeoutJob()
+ job.celeryRunOnCommit()
+ transaction.commit()
+
+ from unittest import mock
+ message_drain = mock.Mock()
+
+ drain_queues(
+ celery_app,
+ ['launchpad_job', 'launchpad_job_slow'], callbacks=[message_drain])
+
+ self.assertEqual(1, job.attempt_count)
+ self.assertEqual(1, message_drain.call_count)
+ self.assertEqual(
+ 'launchpad_job_slow',
+ message_drain.call_args[0][1].delivery_info['routing_key'])
+