launchpad-reviewers team mailing list archive
Message #23349
[Merge] lp:~cjwatson/lazr.jobrunner/celery-4 into lp:lazr.jobrunner
Colin Watson has proposed merging lp:~cjwatson/lazr.jobrunner/celery-4 into lp:lazr.jobrunner with lp:~cjwatson/lazr.jobrunner/oops-0.0.11 as a prerequisite.
Commit message:
Support celery >= 4.0.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/lazr.jobrunner/celery-4/+merge/364032
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/lazr.jobrunner/celery-4 into lp:lazr.jobrunner.
=== modified file 'NEWS.txt'
--- NEWS.txt 2019-03-06 12:53:46 +0000
+++ NEWS.txt 2019-03-06 12:53:46 +0000
@@ -5,6 +5,7 @@
----------
* Add tox testing support.
* Support and require oops >= 0.0.11.
+* Support celery >= 4.0.
0.13
----
=== modified file 'setup.py'
--- setup.py 2019-03-06 12:53:46 +0000
+++ setup.py 2019-03-06 12:53:46 +0000
@@ -28,7 +28,7 @@
# List your project dependencies here.
# For more details, see:
# http://packages.python.org/distribute/setuptools.html#declaring-dependencies
- 'celery>=3.0',
+ 'celery>=3.0,<5.0',
]
tests_require = [
=== modified file 'src/lazr/jobrunner/celeryconfig.py'
--- src/lazr/jobrunner/celeryconfig.py 2012-05-10 14:18:11 +0000
+++ src/lazr/jobrunner/celeryconfig.py 2019-03-06 12:53:46 +0000
@@ -1,8 +1,3 @@
-#BROKER_PORT = 5672
-#BROKER_USER = "guest"
-#BROKER_PASSWORD = "guest"
-
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("lazr.jobrunner.jobrunner", )
-CELERYD_LOG_LEVEL = 'INFO'
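The split BROKER_* settings (port, user, password, vhost) collapse into a single BROKER_URL, which both celery 3.x and 4.x accept; "amqp://" resolves to the same guest@localhost:5672 on vhost "/" that the removed defaults described. CELERYD_LOG_LEVEL was long deprecated in favour of the worker's --loglevel option, so the level is now passed on the command line instead (see the test_celerytask.py change below). A minimal sketch of how an app would consume this module, assuming the usual config_from_object wiring:

    from celery import Celery

    app = Celery()
    # Load the settings above; BROKER_URL = "amqp://" is equivalent to the
    # old BROKER_HOST/PORT/USER/PASSWORD/VHOST defaults it replaces.
    app.config_from_object('lazr.jobrunner.celeryconfig')
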
=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py 2015-08-03 05:44:51 +0000
+++ src/lazr/jobrunner/celerytask.py 2019-03-06 12:53:46 +0000
@@ -77,7 +77,13 @@
listings = []
def add_listing(body, message):
- listings.append((body['task'], body['args']))
+ try:
+ # celery >= 4.0.0
+ listings.append((
+ message.properties['application_headers']['task'],
+ tuple(body[0])))
+ except (AttributeError, KeyError):
+ listings.append((body['task'], body['args']))
drain_queues(app, queue_names, callbacks=[add_listing], retain=True)
return listings
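Celery 4 switched the default task message protocol to version 2: the task name moves from the body into the message headers, and the body becomes an (args, kwargs, embed) triple, so add_listing first tries the v2 layout and falls back to the v1 dict. An illustrative sketch of the two shapes (task name and arguments are hypothetical):

    # Protocol v1 (celery < 4.0 default): everything lives in the body.
    body_v1 = {'task': 'RunFileJob', 'args': (42,), 'kwargs': {}}
    # Protocol v2 (celery >= 4.0 default): the body is (args, kwargs, embed)
    # and 'task' is carried in the message headers instead.
    body_v2 = ((42,), {}, {'callbacks': None, 'errbacks': None,
                           'chain': None, 'chord': None})
    headers_v2 = {'task': 'RunFileJob'}
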
=== modified file 'src/lazr/jobrunner/tests/config_no_prefetch.py'
--- src/lazr/jobrunner/tests/config_no_prefetch.py 2012-05-09 11:50:26 +0000
+++ src/lazr/jobrunner/tests/config_no_prefetch.py 2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
CELERYD_CONCURRENCY = 1
=== modified file 'src/lazr/jobrunner/tests/config_two_queues.py'
--- src/lazr/jobrunner/tests/config_two_queues.py 2012-04-10 12:44:00 +0000
+++ src/lazr/jobrunner/tests/config_two_queues.py 2019-03-06 12:53:46 +0000
@@ -1,10 +1,10 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
CELERYD_CONCURRENCY = 1
CELERY_QUEUES = {
- "standard": {"binding_key": "job.standard"},
- "standard_slow": {"binding_key": "job.standard.slow"},
+ "standard": {"routing_key": "job.standard"},
+ "standard_slow": {"routing_key": "job.standard.slow"},
}
CELERY_DEFAULT_EXCHANGE = "standard"
CELERY_DEFAULT_QUEUE = "standard"
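celery 4 only honours routing_key in dict-style CELERY_QUEUES entries, so the legacy binding_key spelling is switched to routing_key, which celery 3.x understands as well. The dict form is shorthand for declaring the queues explicitly with kombu, roughly as follows (illustrative only, reusing the default "standard" exchange named above):

    from kombu import Exchange, Queue

    CELERY_QUEUES = (
        Queue('standard', Exchange('standard'), routing_key='job.standard'),
        Queue('standard_slow', Exchange('standard'),
              routing_key='job.standard.slow'),
    )
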
=== modified file 'src/lazr/jobrunner/tests/simple_config.py'
--- src/lazr/jobrunner/tests/simple_config.py 2012-07-09 10:58:00 +0000
+++ src/lazr/jobrunner/tests/simple_config.py 2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
CELERYD_CONCURRENCY = 1
=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py 2019-03-06 12:53:46 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py 2019-03-06 12:53:46 +0000
@@ -60,17 +60,22 @@
@contextlib.contextmanager
def running(cmd_name, cmd_args, env=None, cwd=None):
- proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
- stderr=subprocess.PIPE, cwd=cwd)
- try:
- yield proc
- finally:
- proc.terminate()
- proc.wait()
+ with open("/dev/null", "w") as devnull:
+ proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
+ stdout=devnull, stderr=subprocess.PIPE,
+ cwd=cwd)
+ try:
+ yield proc
+ finally:
+ proc.terminate()
+ proc.wait()
def celery_worker(config_module, file_job_dir, queue='celery'):
- cmd_args = ('worker', '--config', config_module, '--queue', queue)
+ cmd_args = (
+ 'worker', '--config', config_module, '--queue', queue,
+ '--loglevel', 'INFO',
+ )
environ = dict(os.environ)
environ['FILE_JOB_DIR'] = file_job_dir
return running('celery', cmd_args, environ, cwd=get_root())
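Two things change here: the worker's stdout is discarded (presumably to keep the celery 4 startup banner out of the test output), and --loglevel INFO is passed explicitly now that CELERYD_LOG_LEVEL has left celeryconfig.py. A hypothetical use of the helper, assuming the tempdir() fixture used in these tests:

    with tempdir() as file_job_dir:
        with celery_worker('lazr.jobrunner.tests.config_no_prefetch',
                           file_job_dir) as proc:
            # Enqueue and wait for jobs here; the worker's log output is
            # available on proc.stderr if a test needs to inspect it.
            pass
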
=== modified file 'src/lazr/jobrunner/tests/test_jobrunnerctl.py'
--- src/lazr/jobrunner/tests/test_jobrunnerctl.py 2015-08-03 05:28:51 +0000
+++ src/lazr/jobrunner/tests/test_jobrunnerctl.py 2019-03-06 12:53:46 +0000
@@ -60,6 +60,41 @@
job.save()
return RunFileJob.apply_async(args=(job_id, ), eta=eta)
+ def getpids(self, control, argv):
+ if getattr(control, 'cluster_from_argv', None) is not None:
+ # celery >= 4.0.0
+ cluster = control.cluster_from_argv(argv)
+ return [node.pid for node in cluster.getpids()]
+ else:
+ parser = NamespacedOptionParser(argv)
+ return [info[2] for info in control.getpids(parser, 'celery')]
+
+ def start(self, control, argv):
+ if getattr(control, 'Cluster', None) is not None:
+ # celery >= 4.0.0
+ control.start(*argv)
+ else:
+ control.start(argv, 'celery')
+
+ def kill(self, control, argv):
+ if getattr(control, 'Cluster', None) is not None:
+ # celery >= 4.0.0
+ control.kill(*argv)
+ else:
+ control.kill(argv, 'celery')
+
+ def node_alive(self, control, argv, pid):
+ if getattr(control, 'Cluster', None) is not None:
+ # celery >= 4.0.0
+ cluster = control.cluster_from_argv(argv)
+ for node in cluster:
+ if node.pid == pid:
+ return node.alive()
+ else:
+ return False
+ else:
+ return control.node_alive(pid)
+
def test_JobRunnerCtl_starts_stops_celery_worker(self):
with tempdir() as temp_dir:
config = 'lazr.jobrunner.tests.config_no_prefetch'
@@ -68,21 +103,19 @@
'worker', '--config=%s' % config, 'node_name', '-Q:node_name',
'celery',
]
- parser = NamespacedOptionParser(argv)
# We may have a stale PID file.
- old_pids = [info[2] for info in control.getpids(parser, 'celery')]
- control.start(argv, 'celery')
+ old_pids = self.getpids(control, argv)
+ self.start(control, argv)
sleep(1)
- current_pids = [
- info[2] for info in control.getpids(parser, 'celery')]
+ current_pids = self.getpids(control, argv)
self.assertTrue(len(current_pids) > 0)
self.assertNotEqual(old_pids, current_pids)
for pid in current_pids:
- self.assertTrue(control.node_alive(pid))
- control.kill(argv, 'celery')
+ self.assertTrue(self.node_alive(control, argv, pid))
+ self.kill(control, argv)
sleep(1)
for pid in current_pids:
- self.assertFalse(control.node_alive(pid))
+ self.assertFalse(self.node_alive(control, argv, pid))
def test_JobRunnerCtl_kill_does_not_lose_jobs(self):
# If a celeryd instance is killed while it executes a task
@@ -99,13 +132,13 @@
'--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
'-c:node_name', '1',
]
- control.start(argv, 'celery')
- control.kill(argv, 'celery')
+ self.start(control, argv)
+ self.kill(control, argv)
sleep(1)
- control.start(argv, 'celery')
+ self.start(control, argv)
for job in all_jobs:
job.wait(10)
- control.kill(argv, 'celery')
+ self.kill(control, argv)
def test_JobRunnerCtl_kill_does_not_lose_jobs_with_eta(self):
with tempdir() as temp_dir:
@@ -120,11 +153,11 @@
'--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
'-c:node_name', '1',
]
- control.start(argv, 'celery')
- sleep(1)
- control.kill(argv, 'celery')
- sleep(1)
- control.start(argv, 'celery')
+ self.start(control, argv)
+ sleep(1)
+ self.kill(control, argv)
+ sleep(1)
+ self.start(control, argv)
for job in all_jobs:
job.wait(10)
- control.kill(argv, 'celery')
+ self.kill(control, argv)
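celery 4 reworked the celery multi machinery: MultiTool grew Cluster and cluster_from_argv, getpids() now yields Node objects rather than tuples with the pid in the third slot, and start/kill no longer take a separate command namespace. The helpers above therefore duck-type on those attributes instead of checking version numbers. A rough sketch of the celery >= 4 calls in isolation, assuming control behaves like celery's MultiTool (the test constructs it outside this hunk):

    cluster = control.cluster_from_argv(argv)
    pids = [node.pid for node in cluster.getpids()]
    control.start(*argv)   # celery 3.x spelling: control.start(argv, 'celery')
    control.kill(*argv)    # celery 3.x spelling: control.kill(argv, 'celery')
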
=== modified file 'src/lazr/jobrunner/tests/time_limit_config.py'
--- src/lazr/jobrunner/tests/time_limit_config.py 2012-03-21 20:38:50 +0000
+++ src/lazr/jobrunner/tests/time_limit_config.py 2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
CELERYD_CONCURRENCY = 1
=== modified file 'tox.ini'
--- tox.ini 2019-03-06 12:53:46 +0000
+++ tox.ini 2019-03-06 12:53:46 +0000
@@ -1,6 +1,6 @@
[tox]
envlist =
- py27-celery31
+ py27-celery{31,40,41,42}
[testenv]
commands =
@@ -9,3 +9,8 @@
.[test]
zope.testrunner
celery31: celery>=3.1,<4.0
+ celery40: celery>=4.0,<4.1
+ # https://github.com/celery/kombu/issues/870
+ celery40: kombu<4.2
+ celery41: celery>=4.1,<4.2
+ celery42: celery>=4.2,<4.3
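
The new environments can be exercised individually, for example with tox -e py27-celery40, or all together by running plain tox; the kombu<4.2 pin applies only to the celery 4.0 environment, per the linked kombu issue.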