← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/lazr.jobrunner/celery-4 into lp:lazr.jobrunner

 

Colin Watson has proposed merging lp:~cjwatson/lazr.jobrunner/celery-4 into lp:lazr.jobrunner with lp:~cjwatson/lazr.jobrunner/oops-0.0.11 as a prerequisite.

Commit message:
Support celery >= 4.0.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/lazr.jobrunner/celery-4/+merge/364032
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/lazr.jobrunner/celery-4 into lp:lazr.jobrunner.
=== modified file 'NEWS.txt'
--- NEWS.txt	2019-03-06 12:53:46 +0000
+++ NEWS.txt	2019-03-06 12:53:46 +0000
@@ -5,6 +5,7 @@
 ----------
 * Add tox testing support.
 * Support and require oops >= 0.0.11.
+* Support celery >= 4.0.
 
 0.13
 ----

=== modified file 'setup.py'
--- setup.py	2019-03-06 12:53:46 +0000
+++ setup.py	2019-03-06 12:53:46 +0000
@@ -28,7 +28,7 @@
     # List your project dependencies here.
     # For more details, see:
     # http://packages.python.org/distribute/setuptools.html#declaring-dependencies
-    'celery>=3.0',
+    'celery>=3.0,<5.0',
 ]
 
 tests_require = [

=== modified file 'src/lazr/jobrunner/celeryconfig.py'
--- src/lazr/jobrunner/celeryconfig.py	2012-05-10 14:18:11 +0000
+++ src/lazr/jobrunner/celeryconfig.py	2019-03-06 12:53:46 +0000
@@ -1,8 +1,3 @@
-#BROKER_PORT = 5672
-#BROKER_USER = "guest"
-#BROKER_PASSWORD = "guest"
-
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.jobrunner", )
-CELERYD_LOG_LEVEL = 'INFO'

=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py	2015-08-03 05:44:51 +0000
+++ src/lazr/jobrunner/celerytask.py	2019-03-06 12:53:46 +0000
@@ -77,7 +77,13 @@
     listings = []
 
     def add_listing(body, message):
-        listings.append((body['task'], body['args']))
+        try:
+            # celery >= 4.0.0
+            listings.append((
+                message.properties['application_headers']['task'],
+                tuple(body[0])))
+        except (AttributeError, KeyError):
+            listings.append((body['task'], body['args']))
 
     drain_queues(app, queue_names, callbacks=[add_listing], retain=True)
     return listings

=== modified file 'src/lazr/jobrunner/tests/config_no_prefetch.py'
--- src/lazr/jobrunner/tests/config_no_prefetch.py	2012-05-09 11:50:26 +0000
+++ src/lazr/jobrunner/tests/config_no_prefetch.py	2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
 CELERYD_CONCURRENCY = 1

=== modified file 'src/lazr/jobrunner/tests/config_two_queues.py'
--- src/lazr/jobrunner/tests/config_two_queues.py	2012-04-10 12:44:00 +0000
+++ src/lazr/jobrunner/tests/config_two_queues.py	2019-03-06 12:53:46 +0000
@@ -1,10 +1,10 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
 CELERYD_CONCURRENCY = 1
 CELERY_QUEUES = {
-    "standard": {"binding_key": "job.standard"},
-    "standard_slow": {"binding_key": "job.standard.slow"},
+    "standard": {"routing_key": "job.standard"},
+    "standard_slow": {"routing_key": "job.standard.slow"},
     }
 CELERY_DEFAULT_EXCHANGE = "standard"
 CELERY_DEFAULT_QUEUE = "standard"

=== modified file 'src/lazr/jobrunner/tests/simple_config.py'
--- src/lazr/jobrunner/tests/simple_config.py	2012-07-09 10:58:00 +0000
+++ src/lazr/jobrunner/tests/simple_config.py	2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
 CELERYD_CONCURRENCY = 1

=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py	2019-03-06 12:53:46 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py	2019-03-06 12:53:46 +0000
@@ -60,17 +60,22 @@
 
 @contextlib.contextmanager
 def running(cmd_name, cmd_args, env=None, cwd=None):
-    proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
-                            stderr=subprocess.PIPE, cwd=cwd)
-    try:
-        yield proc
-    finally:
-        proc.terminate()
-        proc.wait()
+    with open("/dev/null", "w") as devnull:
+        proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
+                                stdout=devnull, stderr=subprocess.PIPE,
+                                cwd=cwd)
+        try:
+            yield proc
+        finally:
+            proc.terminate()
+            proc.wait()
 
 
 def celery_worker(config_module, file_job_dir, queue='celery'):
-    cmd_args = ('worker', '--config', config_module, '--queue', queue)
+    cmd_args = (
+        'worker', '--config', config_module, '--queue', queue,
+        '--loglevel', 'INFO',
+        )
     environ = dict(os.environ)
     environ['FILE_JOB_DIR'] = file_job_dir
     return running('celery', cmd_args, environ, cwd=get_root())

=== modified file 'src/lazr/jobrunner/tests/test_jobrunnerctl.py'
--- src/lazr/jobrunner/tests/test_jobrunnerctl.py	2015-08-03 05:28:51 +0000
+++ src/lazr/jobrunner/tests/test_jobrunnerctl.py	2019-03-06 12:53:46 +0000
@@ -60,6 +60,41 @@
         job.save()
         return RunFileJob.apply_async(args=(job_id, ), eta=eta)
 
+    def getpids(self, control, argv):
+        if getattr(control, 'cluster_from_argv', None) is not None:
+            # celery >= 4.0.0
+            cluster = control.cluster_from_argv(argv)
+            return [node.pid for node in cluster.getpids()]
+        else:
+            parser = NamespacedOptionParser(argv)
+            return [info[2] for info in control.getpids(parser, 'celery')]
+
+    def start(self, control, argv):
+        if getattr(control, 'Cluster', None) is not None:
+            # celery >= 4.0.0
+            control.start(*argv)
+        else:
+            control.start(argv, 'celery')
+
+    def kill(self, control, argv):
+        if getattr(control, 'Cluster', None) is not None:
+            # celery >= 4.0.0
+            control.kill(*argv)
+        else:
+            control.kill(argv, 'celery')
+
+    def node_alive(self, control, argv, pid):
+        if getattr(control, 'Cluster', None) is not None:
+            # celery >= 4.0.0
+            cluster = control.cluster_from_argv(argv)
+            for node in cluster:
+                if node.pid == pid:
+                    return node.alive()
+            else:
+                return False
+        else:
+            return control.node_alive(pid)
+
     def test_JobRunnerCtl_starts_stops_celery_worker(self):
         with tempdir() as temp_dir:
             config = 'lazr.jobrunner.tests.config_no_prefetch'
@@ -68,21 +103,19 @@
                 'worker', '--config=%s' % config, 'node_name', '-Q:node_name',
                 'celery',
                 ]
-            parser = NamespacedOptionParser(argv)
             # We may have a stale PID file.
-            old_pids = [info[2] for info in control.getpids(parser, 'celery')]
-            control.start(argv, 'celery')
+            old_pids = self.getpids(control, argv)
+            self.start(control, argv)
             sleep(1)
-            current_pids = [
-                info[2] for info in control.getpids(parser, 'celery')]
+            current_pids = self.getpids(control, argv)
             self.assertTrue(len(current_pids) > 0)
             self.assertNotEqual(old_pids, current_pids)
             for pid in current_pids:
-                self.assertTrue(control.node_alive(pid))
-            control.kill(argv, 'celery')
+                self.assertTrue(self.node_alive(control, argv, pid))
+            self.kill(control, argv)
             sleep(1)
             for pid in current_pids:
-                self.assertFalse(control.node_alive(pid))
+                self.assertFalse(self.node_alive(control, argv, pid))
 
     def test_JobRunnerCtl_kill_does_not_lose_jobs(self):
         # If a celeryd instance is killed while it executes a task
@@ -99,13 +132,13 @@
                 '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
                 '-c:node_name', '1',
                 ]
-            control.start(argv, 'celery')
-            control.kill(argv, 'celery')
+            self.start(control, argv)
+            self.kill(control, argv)
             sleep(1)
-            control.start(argv, 'celery')
+            self.start(control, argv)
             for job in all_jobs:
                 job.wait(10)
-            control.kill(argv, 'celery')
+            self.kill(control, argv)
 
     def test_JobRunnerCtl_kill_does_not_lose_jobs_with_eta(self):
         with tempdir() as temp_dir:
@@ -120,11 +153,11 @@
                 '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
                 '-c:node_name', '1',
                 ]
-            control.start(argv, 'celery')
-            sleep(1)
-            control.kill(argv, 'celery')
-            sleep(1)
-            control.start(argv, 'celery')
+            self.start(control, argv)
+            sleep(1)
+            self.kill(control, argv)
+            sleep(1)
+            self.start(control, argv)
             for job in all_jobs:
                 job.wait(10)
-            control.kill(argv, 'celery')
+            self.kill(control, argv)

=== modified file 'src/lazr/jobrunner/tests/time_limit_config.py'
--- src/lazr/jobrunner/tests/time_limit_config.py	2012-03-21 20:38:50 +0000
+++ src/lazr/jobrunner/tests/time_limit_config.py	2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
 CELERYD_CONCURRENCY = 1

=== modified file 'tox.ini'
--- tox.ini	2019-03-06 12:53:46 +0000
+++ tox.ini	2019-03-06 12:53:46 +0000
@@ -1,6 +1,6 @@
 [tox]
 envlist =
-    py27-celery31
+    py27-celery{31,40,41,42}
 
 [testenv]
 commands =
@@ -9,3 +9,8 @@
     .[test]
     zope.testrunner
     celery31: celery>=3.1,<4.0
+    celery40: celery>=4.0,<4.1
+    # https://github.com/celery/kombu/issues/870
+    celery40: kombu<4.2
+    celery41: celery>=4.1,<4.2
+    celery42: celery>=4.2,<4.3