← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~twom/launchpad:tasks-some-speedy-some-slow into launchpad:master

 

Tom Wardill has proposed merging ~twom/launchpad:tasks-some-speedy-some-slow into launchpad:master.

Commit message:
Enable slow-lane fallback in Celery

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~twom/launchpad/+git/launchpad/+merge/402919

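* Use the correct configuration key (FALLBACK_QUEUE, which lazr.jobrunner
  reads) for the fallback queue
* Don't re-queue the original task _and_ the slow-lane task when a job is
  moved to the fallback queue
* Add a test ensuring that a job exceeding its soft time limit is re-queued
  only on the slow lane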
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~twom/launchpad:tasks-some-speedy-some-slow into launchpad:master.
diff --git a/lib/lp/services/job/celeryconfig.py b/lib/lp/services/job/celeryconfig.py
index dcc931e..aee9919 100644
--- a/lib/lp/services/job/celeryconfig.py
+++ b/lib/lp/services/job/celeryconfig.py
@@ -72,9 +72,9 @@ def configure(argv):
         # now that we're on Celery 3.1.
         result['task_soft_time_limit'] = config[queue].timeout
         if config[queue].fallback_queue != '':
-            # XXX wgrant 2015-08-03: lazr.jobrunner actually looks for
-            # FALLBACK_QUEUE; this probably isn't doing anything.
-            result['FALLBACK'] = config[queue].fallback_queue
+            # lazr.jobrunner reads FALLBACK_QUEUE (not FALLBACK) to find the
+            # queue on which to re-run jobs that exceed their soft time limit.
+            result['FALLBACK_QUEUE'] = config[queue].fallback_queue
         # XXX wgrant 2015-08-03: This is mostly per-queue because we
         # can't run *_job and *_job_slow in the same worker, which will be
         # fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
diff --git a/lib/lp/services/job/celeryjob.py b/lib/lp/services/job/celeryjob.py
index 793330a..b4ae220 100644
--- a/lib/lp/services/job/celeryjob.py
+++ b/lib/lp/services/job/celeryjob.py
@@ -67,9 +67,23 @@ class CeleryRunJob(RunJob):
         :param dbuser: The database user to run under.  This should match the
             dbuser specified by the job's config.
         """
+        # Remember the database user so that reQueue can dispatch the
+        # fallback task with the same arguments as this one.
+        self.dbuser = dbuser
         task_init(dbuser)
         super(CeleryRunJob, self).run(job_id)
 
+    def reQueue(self, job_id, fallback_queue):
+        """Re-send this job's task to `fallback_queue`.
+
+        The task is sent with the same database user that `run` was
+        called with.
+
+        :param job_id: The id of the job to run.
+        :param fallback_queue: The name of the Celery queue to send it to.
+        """
+        self.apply_async(args=(job_id, self.dbuser), queue=fallback_queue)
+
 
 @celery_app.task(base=CeleryRunJob, bind=True)
 def celery_run_job(self, job_id, dbuser):
diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
index 1e8b0e6..2ec4506 100644
--- a/lib/lp/services/job/runner.py
+++ b/lib/lp/services/job/runner.py
@@ -321,9 +321,20 @@ class BaseRunnableJob(BaseRunnableJobSource):
         """See `IJob`."""
         if self.job.attempt_count > 0:
             self.job.scheduled_start = datetime.now(utc) + self.retry_delay
+        # When the caller manages the transaction and asks for it to be
+        # aborted, don't add the Celery commit hook.  The slow-lane
+        # fallback presumably re-dispatches the job itself, so running the
+        # hook as well would queue the same job twice.
+        # NOTE(review): this also applies to any other caller passing
+        # manage_transaction=True and abort_transaction=True (e.g. a
+        # retry path) -- confirm that none of them relies on the hook.
+        if manage_transaction and abort_transaction:
+            commit_hook = None
+        else:
+            commit_hook = self.celeryRunOnCommit
         self.job.queue(
             manage_transaction, abort_transaction,
-            add_commit_hook=self.celeryRunOnCommit)
+            add_commit_hook=commit_hook)
 
     def start(self, manage_transaction=False):
         """See `IJob`."""
diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
index 5db8436..a58d709 100644
--- a/lib/lp/services/job/tests/test_celery.py
+++ b/lib/lp/services/job/tests/test_celery.py
@@ -12,6 +12,7 @@ from time import sleep
 
 import iso8601
 from lazr.delegates import delegate_to
+from lazr.jobrunner.celerytask import drain_queues
 from pytz import UTC
 from testtools.matchers import (
     GreaterThan,
@@ -221,3 +222,50 @@ class TestJobsViaCelery(TestCaseWithFactory):
         store = IStore(Job)
         dbjob = store.find(Job, id=job_id)[0]
         self.assertEqual(JobStatus.WAITING, dbjob.status)
+
+
+class TestTimeoutJob(TestJob):
+    """A job that exceeds its soft time limit on its first attempt."""
+
+    def storeDateStarted(self):
+        existing = self.job.base_json_data or {}
+        existing.setdefault('dates_started', [])
+        existing['dates_started'].append(self.job.date_started.isoformat())
+        self.job.base_json_data = existing
+
+    def run(self):
+        """Raise `SoftTimeLimitExceeded` on the first attempt."""
+        if self.job.attempt_count == 1:
+            from celery.exceptions import SoftTimeLimitExceeded
+            raise SoftTimeLimitExceeded
+
+
+class TestCeleryLaneFallback(TestCaseWithFactory):
+
+    layer = CeleryJobLayer
+
+    def test_fallback_to_slow_lane(self):
+        # A job that exceeds its soft time limit on the fast lane is
+        # re-queued once, on the slow lane only.
+        from lp.services.job.celeryjob import celery_app
+
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'TestTimeoutJob',
+        }))
+
+        with block_on_job(self):
+            job = TestTimeoutJob()
+            job.celeryRunOnCommit()
+            transaction.commit()
+
+        # Collect every message still waiting on either lane; each
+        # callback receives the message as its second argument.
+        drained = []
+        drain_queues(
+            celery_app, ['launchpad_job', 'launchpad_job_slow'],
+            callbacks=[lambda body, message: drained.append(message)])
+
+        self.assertEqual(1, job.attempt_count)
+        self.assertEqual(1, len(drained))
+        self.assertEqual(
+            'launchpad_job_slow', drained[0].delivery_info['routing_key'])