← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~wgrant/launchpad/celery-3.1 into lp:launchpad

 

William Grant has proposed merging lp:~wgrant/launchpad/celery-3.1 into lp:launchpad.

Commit message:
Upgrade to Celery 3.1, mostly so webhook jobs can run with a custom soft time limit. Each queue still configures its worker with a soft time limit, but that can be removed once all new jobs set it themselves.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~wgrant/launchpad/celery-3.1/+merge/266687

Upgrade to Celery 3.1, mostly so webhook jobs can run with a custom soft time limit.

This branch depends on lazr.jobrunner 0.13, which is proposed for merging at https://code.launchpad.net/~wgrant/lazr.jobrunner/celery-3.1/+merge/266686
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/celery-3.1 into lp:launchpad.
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2015-07-24 06:55:21 +0000
+++ lib/lp/services/job/celeryconfig.py	2015-08-03 08:18:40 +0000
@@ -65,18 +65,25 @@
         if queue not in celery_queues:
             raise ConfigurationError(
                 'Queue %s is not configured in schema-lazr.conf' % queue)
+        # XXX wgrant 2015-08-03: This should be set in the apply_async
+        # now that we're on Celery 3.1.
         result['CELERYD_TASK_SOFT_TIME_LIMIT'] = config[queue].timeout
         if config[queue].fallback_queue != '':
+            # XXX wgrant 2015-08-03: lazr.jobrunner actually looks for
+            # FALLBACK_QUEUE; this probably isn't doing anything.
             result['FALLBACK'] = config[queue].fallback_queue
+        # XXX wgrant 2015-08-03: This is mostly per-queue because we
+        # can't run *_job and *_job_slow in the same worker, which will be
+        # fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
         result['CELERYD_CONCURRENCY'] = config[queue].concurrency
 
-    host, port = config.rabbitmq.host.split(':')
-
-    result['BROKER_HOST'] = host
-    result['BROKER_PORT'] = port
-    result['BROKER_USER'] = config.rabbitmq.userid
-    result['BROKER_PASSWORD'] = config.rabbitmq.password
-    result['BROKER_VHOST'] = config.rabbitmq.virtual_host
+    result['BROKER_URL'] = 'amqp://%s:%s@%s/%s' % (
+        config.rabbitmq.userid, config.rabbitmq.password,
+        config.rabbitmq.host, config.rabbitmq.virtual_host)
+    # XXX wgrant 2015-08-03: Celery 3.2 won't read pickles by default,
+    # and Celery 3.1 can send only pickles for some things. Let's accept
+    # both until they sort things out.
+    result['CELERY_ACCEPT_CONTENT'] = ['pickle', 'json']
     result['CELERY_CREATE_MISSING_QUEUES'] = False
     result['CELERY_DEFAULT_EXCHANGE'] = 'job'
     result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py	2013-06-20 05:50:00 +0000
+++ lib/lp/services/job/celeryjob.py	2015-08-03 08:18:40 +0000
@@ -114,8 +114,8 @@
             info('Scheduled %d missing jobs.', count)
             transaction.commit()
 
-    def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
-                    connection=None, router=None, queues=None, **options):
+    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
+                    link=None, link_error=None, **options):
         """Create a task_id if none is specified.
 
         Override the quite generic default task_id with one containing
@@ -126,8 +126,7 @@
         if task_id is None:
             task_id = '%s_%s' % (self.__class__.__name__, uuid4())
         return super(RunMissingReady, self).apply_async(
-            args, kwargs, task_id, publisher, connection, router, queues,
-            **options)
+            args, kwargs, task_id, producer, link, link_error, **options)
 
 
 needs_zcml = True

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2015-07-24 09:54:34 +0000
+++ lib/lp/services/job/runner.py	2015-08-03 08:18:40 +0000
@@ -110,6 +110,7 @@
     celery_responses = None
 
     retry_delay = timedelta(minutes=10)
+    soft_time_limit = timedelta(minutes=5)
 
     # We redefine __eq__ and __ne__ here to prevent the security proxy
     # from mucking up our comparisons in tests and elsewhere.
@@ -235,6 +236,7 @@
             eta = self.job.lease_expires
         return cls.apply_async(
             (ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta,
+            soft_time_limit=self.soft_time_limit.total_seconds(),
             task_id=self.taskId())
 
     def getDBClass(self):

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	2013-05-15 04:41:33 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2015-08-03 08:18:40 +0000
@@ -2,6 +2,7 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from contextlib import contextmanager
+from testtools.matchers import MatchesRegex
 
 from lp.services.config import config
 from lp.testing import TestCase
@@ -33,13 +34,10 @@
         for name in queue_names:
             self.assertEqual(name, queues[name]['binding_key'])
 
-        self.assertEqual('localhost', config['BROKER_HOST'])
-        # BROKER_PORT changes between test runs, so just check that it
-        # is defined.
-        self.assertTrue('BROKER_PORT' in config)
-        self.assertEqual('guest', config['BROKER_USER'])
-        self.assertEqual('guest', config['BROKER_PASSWORD'])
-        self.assertEqual('/', config['BROKER_VHOST'])
+        # The port changes between test runs.
+        self.assertThat(
+            config['BROKER_URL'],
+            MatchesRegex('^amqp://guest:guest@localhost:\\d+//'))
         self.assertFalse(config['CELERY_CREATE_MISSING_QUEUES'])
         self.assertEqual('job', config['CELERY_DEFAULT_EXCHANGE'])
         self.assertEqual('launchpad_job', config['CELERY_DEFAULT_QUEUE'])

=== modified file 'versions.cfg'
--- versions.cfg	2015-07-23 11:23:15 +0000
+++ versions.cfg	2015-08-03 08:18:40 +0000
@@ -8,6 +8,7 @@
 # Alphabetical, case-insensitive, please! :-)
 
 ampoule = 0.2.0
+amqp = 1.4.6
 amqplib = 1.0.2
 anyjson = 0.3.3
 argparse = 1.2.1
@@ -15,9 +16,10 @@
 auditorclient = 0.0.4
 auditorfixture = 0.0.5
 BeautifulSoup = 3.2.1
+billiard = 3.3.0.20
 bson = 0.3.3
 bzr = 2.6.0.lp.2
-celery = 2.5.1
+celery = 3.1.18
 Chameleon = 2.11
 cssselect = 0.9.1
 cssutils = 0.9.10
@@ -43,14 +45,14 @@
 iso8601 = 0.1.4
 jsautobuild = 0.2
 keyring = 0.6.2
-kombu = 2.1.1
+kombu = 3.0.26
 launchpadlib = 1.10.2
 lazr.authentication = 0.1.1
 lazr.batchnavigator = 1.2.11
 lazr.config = 1.1.3
 lazr.delegates = 2.0.3
 lazr.enum = 1.1.3
-lazr.jobrunner = 0.12
+lazr.jobrunner = 0.13
 lazr.lifecycle = 1.1
 lazr.restful = 0.19.10
 lazr.restfulclient = 0.13.2


Follow ups