launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #19118
[Merge] lp:~wgrant/launchpad/celery-3.1 into lp:launchpad
William Grant has proposed merging lp:~wgrant/launchpad/celery-3.1 into lp:launchpad.
Commit message:
Upgrade to Celery 3.1, mostly so webhook jobs can run with a custom soft time limit. Each queue still configures its worker with a soft time limit, but that can be removed once all new jobs set it themselves.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~wgrant/launchpad/celery-3.1/+merge/266687
Upgrade to Celery 3.1, mostly so webhook jobs can run with a custom soft time limit.
Depends on lazr.jobrunner 0.13 from https://code.launchpad.net/~wgrant/lazr.jobrunner/celery-3.1/+merge/266686
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/celery-3.1 into lp:launchpad.
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2015-07-24 06:55:21 +0000
+++ lib/lp/services/job/celeryconfig.py 2015-08-03 08:18:40 +0000
@@ -65,18 +65,25 @@
if queue not in celery_queues:
raise ConfigurationError(
'Queue %s is not configured in schema-lazr.conf' % queue)
+ # XXX wgrant 2015-08-03: This should be set in the apply_async
+ # now that we're on Celery 3.1.
result['CELERYD_TASK_SOFT_TIME_LIMIT'] = config[queue].timeout
if config[queue].fallback_queue != '':
+ # XXX wgrant 2015-08-03: lazr.jobrunner actually looks for
+ # FALLBACK_QUEUE; this probably isn't doing anything.
result['FALLBACK'] = config[queue].fallback_queue
+ # XXX wgrant 2015-08-03: This is mostly per-queue because we
+ # can't run *_job and *_job_slow in the same worker, which will be
+ # fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
result['CELERYD_CONCURRENCY'] = config[queue].concurrency
- host, port = config.rabbitmq.host.split(':')
-
- result['BROKER_HOST'] = host
- result['BROKER_PORT'] = port
- result['BROKER_USER'] = config.rabbitmq.userid
- result['BROKER_PASSWORD'] = config.rabbitmq.password
- result['BROKER_VHOST'] = config.rabbitmq.virtual_host
+ result['BROKER_URL'] = 'amqp://%s:%s@%s/%s' % (
+ config.rabbitmq.userid, config.rabbitmq.password,
+ config.rabbitmq.host, config.rabbitmq.virtual_host)
+ # XXX wgrant 2015-08-03: Celery 3.2 won't read pickles by default,
+ # and Celery 3.1 can send only pickles for some things. Let's accept
+ # both until they sort things out.
+ result['CELERY_ACCEPT_CONTENT'] = ['pickle', 'json']
result['CELERY_CREATE_MISSING_QUEUES'] = False
result['CELERY_DEFAULT_EXCHANGE'] = 'job'
result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'
=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2013-06-20 05:50:00 +0000
+++ lib/lp/services/job/celeryjob.py 2015-08-03 08:18:40 +0000
@@ -114,8 +114,8 @@
info('Scheduled %d missing jobs.', count)
transaction.commit()
- def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
- connection=None, router=None, queues=None, **options):
+ def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
+ link=None, link_error=None, **options):
"""Create a task_id if none is specified.
Override the quite generic default task_id with one containing
@@ -126,8 +126,7 @@
if task_id is None:
task_id = '%s_%s' % (self.__class__.__name__, uuid4())
return super(RunMissingReady, self).apply_async(
- args, kwargs, task_id, publisher, connection, router, queues,
- **options)
+ args, kwargs, task_id, producer, link, link_error, **options)
needs_zcml = True
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2015-07-24 09:54:34 +0000
+++ lib/lp/services/job/runner.py 2015-08-03 08:18:40 +0000
@@ -110,6 +110,7 @@
celery_responses = None
retry_delay = timedelta(minutes=10)
+ soft_time_limit = timedelta(minutes=5)
# We redefine __eq__ and __ne__ here to prevent the security proxy
# from mucking up our comparisons in tests and elsewhere.
@@ -235,6 +236,7 @@
eta = self.job.lease_expires
return cls.apply_async(
(ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta,
+ soft_time_limit=self.soft_time_limit.total_seconds(),
task_id=self.taskId())
def getDBClass(self):
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2013-05-15 04:41:33 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2015-08-03 08:18:40 +0000
@@ -2,6 +2,7 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
from contextlib import contextmanager
+from testtools.matchers import MatchesRegex
from lp.services.config import config
from lp.testing import TestCase
@@ -33,13 +34,10 @@
for name in queue_names:
self.assertEqual(name, queues[name]['binding_key'])
- self.assertEqual('localhost', config['BROKER_HOST'])
- # BROKER_PORT changes between test runs, so just check that it
- # is defined.
- self.assertTrue('BROKER_PORT' in config)
- self.assertEqual('guest', config['BROKER_USER'])
- self.assertEqual('guest', config['BROKER_PASSWORD'])
- self.assertEqual('/', config['BROKER_VHOST'])
+ # The port changes between test runs.
+ self.assertThat(
+ config['BROKER_URL'],
+ MatchesRegex('^amqp://guest:guest@localhost:\\d+//'))
self.assertFalse(config['CELERY_CREATE_MISSING_QUEUES'])
self.assertEqual('job', config['CELERY_DEFAULT_EXCHANGE'])
self.assertEqual('launchpad_job', config['CELERY_DEFAULT_QUEUE'])
=== modified file 'versions.cfg'
--- versions.cfg 2015-07-23 11:23:15 +0000
+++ versions.cfg 2015-08-03 08:18:40 +0000
@@ -8,6 +8,7 @@
# Alphabetical, case-insensitive, please! :-)
ampoule = 0.2.0
+amqp = 1.4.6
amqplib = 1.0.2
anyjson = 0.3.3
argparse = 1.2.1
@@ -15,9 +16,10 @@
auditorclient = 0.0.4
auditorfixture = 0.0.5
BeautifulSoup = 3.2.1
+billiard = 3.3.0.20
bson = 0.3.3
bzr = 2.6.0.lp.2
-celery = 2.5.1
+celery = 3.1.18
Chameleon = 2.11
cssselect = 0.9.1
cssutils = 0.9.10
@@ -43,14 +45,14 @@
iso8601 = 0.1.4
jsautobuild = 0.2
keyring = 0.6.2
-kombu = 2.1.1
+kombu = 3.0.26
launchpadlib = 1.10.2
lazr.authentication = 0.1.1
lazr.batchnavigator = 1.2.11
lazr.config = 1.1.3
lazr.delegates = 2.0.3
lazr.enum = 1.1.3
-lazr.jobrunner = 0.12
+lazr.jobrunner = 0.13
lazr.lifecycle = 1.1
lazr.restful = 0.19.10
lazr.restfulclient = 0.13.2
Follow ups