← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~wgrant/lazr.jobrunner/celery-3.1 into lp:lazr.jobrunner

 

William Grant has proposed merging lp:~wgrant/lazr.jobrunner/celery-3.1 into lp:lazr.jobrunner.

Commit message:
Support and require celery >= 3.0.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~wgrant/lazr.jobrunner/celery-3.1/+merge/266686

Support and require celery >= 3.0.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/lazr.jobrunner/celery-3.1 into lp:lazr.jobrunner.
=== modified file 'NEWS.txt'
--- NEWS.txt	2013-06-14 00:38:07 +0000
+++ NEWS.txt	2015-08-03 08:08:38 +0000
@@ -1,6 +1,10 @@
 News
 ====
 
+0.13
+----
+* Support and require celery >= 3.0.
+
 0.12
 ----
 * Only run the job if its is_runnable property is True.

=== modified file 'bootstrap.py'
--- bootstrap.py	2012-02-23 10:32:17 +0000
+++ bootstrap.py	2015-08-03 08:08:38 +0000
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2006 Zope Corporation and Contributors.
+# Copyright (c) 2006 Zope Foundation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -16,98 +16,245 @@
 Simply run this script in a directory containing a buildout.cfg.
 The script accepts buildout command-line options, so you can
 use the -c option to specify an alternate configuration file.
-
-$Id: bootstrap.py 102545 2009-08-06 14:49:47Z chrisw $
 """
 
-import os, shutil, sys, tempfile, urllib2
+import os, shutil, sys, tempfile, textwrap, urllib, urllib2, subprocess
 from optparse import OptionParser
 
-tmpeggs = tempfile.mkdtemp()
+if sys.platform == 'win32':
+    def quote(c):
+        if ' ' in c:
+            return '"%s"' % c # work around spawn lamosity on windows
+        else:
+            return c
+else:
+    quote = str
+
+# See zc.buildout.easy_install._has_broken_dash_S for motivation and comments.
+stdout, stderr = subprocess.Popen(
+    [sys.executable, '-Sc',
+     'try:\n'
+     '    import ConfigParser\n'
+     'except ImportError:\n'
+     '    print 1\n'
+     'else:\n'
+     '    print 0\n'],
+    stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
+has_broken_dash_S = bool(int(stdout.strip()))
+
+# In order to be more robust in the face of system Pythons, we want to
+# run without site-packages loaded.  This is somewhat tricky, in
+# particular because Python 2.6's distutils imports site, so starting
+# with the -S flag is not sufficient.  However, we'll start with that:
+if not has_broken_dash_S and 'site' in sys.modules:
+    # We will restart with python -S.
+    args = sys.argv[:]
+    args[0:0] = [sys.executable, '-S']
+    args = map(quote, args)
+    os.execv(sys.executable, args)
+# Now we are running with -S.  We'll get the clean sys.path, import site
+# because distutils will do it later, and then reset the path and clean
+# out any namespace packages from site-packages that might have been
+# loaded by .pth files.
+clean_path = sys.path[:]
+import site
+sys.path[:] = clean_path
+for k, v in sys.modules.items():
+    if k in ('setuptools', 'pkg_resources') or (
+        hasattr(v, '__path__') and
+        len(v.__path__)==1 and
+        not os.path.exists(os.path.join(v.__path__[0],'__init__.py'))):
+        # This is a namespace package.  Remove it.
+        sys.modules.pop(k)
 
 is_jython = sys.platform.startswith('java')
 
+setuptools_source = 'http://peak.telecommunity.com/dist/ez_setup.py'
+distribute_source = 'http://python-distribute.org/distribute_setup.py'
+
 # parsing arguments
-parser = OptionParser()
+def normalize_to_url(option, opt_str, value, parser):
+    if value:
+        if '://' not in value: # It doesn't smell like a URL.
+            value = 'file://%s' % (
+                urllib.pathname2url(
+                    os.path.abspath(os.path.expanduser(value))),)
+        if opt_str == '--download-base' and not value.endswith('/'):
+            # Download base needs a trailing slash to make the world happy.
+            value += '/'
+    else:
+        value = None
+    name = opt_str[2:].replace('-', '_')
+    setattr(parser.values, name, value)
+
+usage = '''\
+[DESIRED PYTHON FOR BUILDOUT] bootstrap.py [options]
+
+Bootstraps a buildout-based project.
+
+Simply run this script in a directory containing a buildout.cfg, using the
+Python that you want bin/buildout to use.
+
+Note that by using --setup-source and --download-base to point to
+local resources, you can keep this script from going over the network.
+'''
+
+parser = OptionParser(usage=usage)
 parser.add_option("-v", "--version", dest="version",
                           help="use a specific zc.buildout version")
 parser.add_option("-d", "--distribute",
-                   action="store_true", dest="distribute", default=True,
-                   help="Use Disribute rather than Setuptools.")
+                   action="store_true", dest="use_distribute", default=False,
+                   help="Use Distribute rather than Setuptools.")
+parser.add_option("--setup-source", action="callback", dest="setup_source",
+                  callback=normalize_to_url, nargs=1, type="string",
+                  help=("Specify a URL or file location for the setup file. "
+                        "If you use Setuptools, this will default to " +
+                        setuptools_source + "; if you use Distribute, this "
+                        "will default to " + distribute_source +"."))
+parser.add_option("--download-base", action="callback", dest="download_base",
+                  callback=normalize_to_url, nargs=1, type="string",
+                  help=("Specify a URL or directory for downloading "
+                        "zc.buildout and either Setuptools or Distribute. "
+                        "Defaults to PyPI."))
+parser.add_option("--eggs",
+                  help=("Specify a directory for storing eggs.  Defaults to "
+                        "a temporary directory that is deleted when the "
+                        "bootstrap script completes."))
+parser.add_option("-t", "--accept-buildout-test-releases",
+                  dest='accept_buildout_test_releases',
+                  action="store_true", default=False,
+                  help=("Normally, if you do not specify a --version, the "
+                        "bootstrap script and buildout gets the newest "
+                        "*final* versions of zc.buildout and its recipes and "
+                        "extensions for you.  If you use this flag, "
+                        "bootstrap and buildout will get the newest releases "
+                        "even if they are alphas or betas."))
+parser.add_option("-c", None, action="store", dest="config_file",
+                   help=("Specify the path to the buildout configuration "
+                         "file to be used."))
 
 options, args = parser.parse_args()
 
-if options.version is not None:
-    VERSION = '==%s' % options.version
+# if -c was provided, we push it back into args for buildout's main function
+if options.config_file is not None:
+    args += ['-c', options.config_file]
+
+if options.eggs:
+    eggs_dir = os.path.abspath(os.path.expanduser(options.eggs))
 else:
-    VERSION = ''
-
-USE_DISTRIBUTE = options.distribute
-args = args + ['bootstrap']
-
-to_reload = False
+    eggs_dir = tempfile.mkdtemp()
+
+if options.setup_source is None:
+    if options.use_distribute:
+        options.setup_source = distribute_source
+    else:
+        options.setup_source = setuptools_source
+
+if options.accept_buildout_test_releases:
+    args.append('buildout:accept-buildout-test-releases=true')
+args.append('bootstrap')
+
 try:
     import pkg_resources
+    import setuptools # A flag.  Sometimes pkg_resources is installed alone.
     if not hasattr(pkg_resources, '_distribute'):
-        to_reload = True
         raise ImportError
 except ImportError:
+    ez_code = urllib2.urlopen(
+        options.setup_source).read().replace('\r\n', '\n')
     ez = {}
-    if USE_DISTRIBUTE:
-        exec urllib2.urlopen('http://python-distribute.org/distribute_setup.py'
-                         ).read() in ez
-        ez['use_setuptools'](to_dir=tmpeggs, download_delay=0, no_fake=True)
-    else:
-        exec urllib2.urlopen('http://peak.telecommunity.com/dist/ez_setup.py'
-                             ).read() in ez
-        ez['use_setuptools'](to_dir=tmpeggs, download_delay=0)
-
-    if to_reload:
-        reload(pkg_resources)
-    else:
-        import pkg_resources
-
-if sys.platform == 'win32':
-    def quote(c):
-        if ' ' in c:
-            return '"%s"' % c # work around spawn lamosity on windows
-        else:
-            return c
-else:
-    def quote (c):
-        return c
-
-cmd = 'from setuptools.command.easy_install import main; main()'
-ws  = pkg_resources.working_set
-
-if USE_DISTRIBUTE:
-    requirement = 'distribute'
-else:
-    requirement = 'setuptools'
+    exec ez_code in ez
+    setup_args = dict(to_dir=eggs_dir, download_delay=0)
+    if options.download_base:
+        setup_args['download_base'] = options.download_base
+    if options.use_distribute:
+        setup_args['no_fake'] = True
+    ez['use_setuptools'](**setup_args)
+    if 'pkg_resources' in sys.modules:
+        reload(sys.modules['pkg_resources'])
+    import pkg_resources
+    # This does not (always?) update the default working set.  We will
+    # do it.
+    for path in sys.path:
+        if path not in pkg_resources.working_set.entries:
+            pkg_resources.working_set.add_entry(path)
+
+cmd = [quote(sys.executable),
+       '-c',
+       quote('from setuptools.command.easy_install import main; main()'),
+       '-mqNxd',
+       quote(eggs_dir)]
+
+if not has_broken_dash_S:
+    cmd.insert(1, '-S')
+
+find_links = options.download_base
+if not find_links:
+    find_links = os.environ.get('bootstrap-testing-find-links')
+if find_links:
+    cmd.extend(['-f', quote(find_links)])
+
+if options.use_distribute:
+    setup_requirement = 'distribute'
+else:
+    setup_requirement = 'setuptools'
+ws = pkg_resources.working_set
+setup_requirement_path = ws.find(
+    pkg_resources.Requirement.parse(setup_requirement)).location
+env = dict(
+    os.environ,
+    PYTHONPATH=setup_requirement_path)
+
+requirement = 'zc.buildout'
+version = options.version
+if version is None and not options.accept_buildout_test_releases:
+    # Figure out the most recent final version of zc.buildout.
+    import setuptools.package_index
+    _final_parts = '*final-', '*final'
+    def _final_version(parsed_version):
+        for part in parsed_version:
+            if (part[:1] == '*') and (part not in _final_parts):
+                return False
+        return True
+    index = setuptools.package_index.PackageIndex(
+        search_path=[setup_requirement_path])
+    if find_links:
+        index.add_find_links((find_links,))
+    req = pkg_resources.Requirement.parse(requirement)
+    if index.obtain(req) is not None:
+        best = []
+        bestv = None
+        for dist in index[req.project_name]:
+            distv = dist.parsed_version
+            if _final_version(distv):
+                if bestv is None or distv > bestv:
+                    best = [dist]
+                    bestv = distv
+                elif distv == bestv:
+                    best.append(dist)
+        if best:
+            best.sort()
+            version = best[-1].version
+if version:
+    requirement = '=='.join((requirement, version))
+cmd.append(requirement)
 
 if is_jython:
     import subprocess
-
-    assert subprocess.Popen([sys.executable] + ['-c', quote(cmd), '-mqNxd',
-           quote(tmpeggs), 'zc.buildout' + VERSION],
-           env=dict(os.environ,
-               PYTHONPATH=
-               ws.find(pkg_resources.Requirement.parse(requirement)).location
-               ),
-           ).wait() == 0
-
-else:
-    assert os.spawnle(
-        os.P_WAIT, sys.executable, quote (sys.executable),
-        '-c', quote (cmd), '-mqNxd', quote (tmpeggs), 'zc.buildout' + VERSION,
-        dict(os.environ,
-            PYTHONPATH=
-            ws.find(pkg_resources.Requirement.parse(requirement)).location
-            ),
-        ) == 0
-
-ws.add_entry(tmpeggs)
-ws.require('zc.buildout' + VERSION)
+    exitcode = subprocess.Popen(cmd, env=env).wait()
+else: # Windows prefers this, apparently; otherwise we would prefer subprocess
+    exitcode = os.spawnle(*([os.P_WAIT, sys.executable] + cmd + [env]))
+if exitcode != 0:
+    sys.stdout.flush()
+    sys.stderr.flush()
+    print ("An error occurred when trying to install zc.buildout. "
+           "Look above this message for any errors that "
+           "were output by easy_install.")
+    sys.exit(exitcode)
+
+ws.add_entry(eggs_dir)
+ws.require(requirement)
 import zc.buildout.buildout
 zc.buildout.buildout.main(args)
-shutil.rmtree(tmpeggs)
+if not options.eggs: # clean up temporary egg directory
+    shutil.rmtree(eggs_dir)

=== modified file 'buildout.cfg'
--- buildout.cfg	2012-07-10 09:41:52 +0000
+++ buildout.cfg	2015-08-03 08:08:38 +0000
@@ -1,5 +1,6 @@
 [buildout]
 parts = python scripts
+download-cache = download-cache
 develop = .
 eggs =
     lazr.jobrunner
@@ -21,5 +22,8 @@
 eggs = ${buildout:eggs}
 
 [versions]
-celery = 2.5.1
-kombu = 2.1.1
+celery = 3.1.18
+keyring = 0.6.2
+kombu = 3.0.26
+zc.recipe.egg = 1.3.2
+z3c.recipe.scripts = 1.0.1

=== modified file 'setup.py'
--- setup.py	2013-06-13 05:22:01 +0000
+++ setup.py	2015-08-03 08:08:38 +0000
@@ -22,13 +22,13 @@
 NEWS = open(os.path.join(here, 'NEWS.txt')).read()
 
 
-version = '0.12'
+version = '0.13'
 
 install_requires = [
     # List your project dependencies here.
     # For more details, see:
     # http://packages.python.org/distribute/setuptools.html#declaring-dependencies
-    'celery',
+    'celery>=3.0',
 ]
 
 

=== modified file 'src/lazr/jobrunner/bin/clear_queues.py'
--- src/lazr/jobrunner/bin/clear_queues.py	2012-07-04 16:34:25 +0000
+++ src/lazr/jobrunner/bin/clear_queues.py	2015-08-03 08:08:38 +0000
@@ -22,7 +22,8 @@
 from argparse import ArgumentParser
 import os
 import sys
-from amqplib.client_0_8.exceptions import AMQPChannelException
+
+import amqp
 
 
 def show_queue_data(body, message):
@@ -51,13 +52,8 @@
             drain_queues(
                 RunJob.app, [queue], callbacks=[show_queue_data],
                 retain=True, passive_queues=True)
-        except AMQPChannelException as exc:
-            if exc.amqp_reply_code == 404:
-                # Unknown queue name specified; amqp_reply_text is
-                # self-explaining.
-                print >>sys.stderr, exc.amqp_reply_text
-            else:
-                raise
+        except amqp.exceptions.NotFound as exc:
+            print >>sys.stderr, exc.reply_text
 
 
 def main():

=== modified file 'src/lazr/jobrunner/bin/jobrunnerctl.py'
--- src/lazr/jobrunner/bin/jobrunnerctl.py	2012-05-09 15:08:25 +0000
+++ src/lazr/jobrunner/bin/jobrunnerctl.py	2015-08-03 08:08:38 +0000
@@ -21,7 +21,7 @@
 import os
 import sys
 
-from celery.bin.celeryd_multi import MultiTool
+from celery.bin.multi import MultiTool
 
 
 class JobRunnerCtl(MultiTool):

=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py	2013-06-14 00:38:07 +0000
+++ src/lazr/jobrunner/celerytask.py	2015-08-03 08:08:38 +0000
@@ -21,7 +21,7 @@
 from socket import timeout
 
 from celery.task import Task
-from kombu import Consumer, Exchange, Queue
+from kombu import Consumer
 
 from lazr.jobrunner.jobrunner import (
     JobRunner,
@@ -97,18 +97,18 @@
     if callbacks is None:
         callbacks = [lambda x, y: None]
     bindings = []
-    router = app.amqp.Router(create_missing=True)
+    router = app.amqp.Router(
+        create_missing=True,
+        queues=app.amqp.Queues(app.conf.CELERY_QUEUES, create_missing=True))
     for queue_name in queue_names:
         destination = router.expand_destination(queue_name)
-        exchange = Exchange(destination['exchange'])
-        queue = Queue(queue_name, exchange=exchange)
-        bindings.append(queue)
+        bindings.append(destination['queue'])
     with app.broker_connection() as connection:
         # The no_ack flag is misleadingly named.
         # See: https://github.com/ask/kombu/issues/130
         consumer = Consumer(
             connection, bindings, callbacks=callbacks, no_ack=not retain,
-            auto_declare=not passive_queues)
+            auto_declare=not passive_queues, accept=['json', 'pickle'])
         if passive_queues:
             # This is basically copied from kombu.Queue.declare().
             # We can't use this method directly because queue_declare()

=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py	2012-07-09 15:53:48 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py	2015-08-03 08:08:38 +0000
@@ -69,11 +69,11 @@
         proc.wait()
 
 
-def celeryd(config_module, file_job_dir, queue='celery'):
-    cmd_args = ('--config', config_module, '--queue', queue)
+def celery_worker(config_module, file_job_dir, queue='celery'):
+    cmd_args = ('worker', '--config', config_module, '--queue', queue)
     environ = dict(os.environ)
     environ['FILE_JOB_DIR'] = file_job_dir
-    return running('bin/celeryd', cmd_args, environ, cwd=get_root())
+    return running('bin/celery', cmd_args, environ, cwd=get_root())
 
 
 @contextlib.contextmanager
@@ -342,7 +342,7 @@
             result = RunFileJob.delay(10)
             self.assertIs(None, js.get_output(job))
             self.assertEqual(JobStatus.WAITING, job.status)
-            with celeryd('lazr.jobrunner.tests.config1', temp_dir):
+            with celery_worker('lazr.jobrunner.tests.config1', temp_dir):
                 result.wait(10)
             job = js.get(job.job_id)
             self.assertEqual('my_output', js.get_output(job))
@@ -354,7 +354,7 @@
         job = FileJob(js, 10, **kwargs)
         job.save()
         result = RunFileJob.apply_async(args=(10, ), queue=queue)
-        with celeryd(config, temp_dir, queue) as proc:
+        with celery_worker(config, temp_dir, queue) as proc:
             try:
                 result.wait(10)
             except SoftTimeLimitExceeded:
@@ -373,7 +373,7 @@
         job = FileJob(js, 10, **kwargs)
         job.save()
         RunFileJobNoResult.apply_async(args=(10, ), queue=queue)
-        with celeryd(config, temp_dir, queue) as proc:
+        with celery_worker(config, temp_dir, queue) as proc:
             sleep(wait_time)
         job = js.get(job.job_id)
         return job, js, proc
@@ -393,7 +393,7 @@
         """Raises exception when a job exceeds the configured time limit."""
         with tempdir() as temp_dir:
             job, js, proc = self.run_file_job_ignore_result(
-                temp_dir, wait_time=2,
+                temp_dir, wait_time=5,
                 config='lazr.jobrunner.tests.time_limit_config',
                 sleep=3)
         self.assertEqual(JobStatus.FAILED, job.status)
@@ -405,7 +405,7 @@
         # If a fast and a slow lane are configured, jobs which time out
         # in the fast lane are queued again in the slow lane.
         with tempdir() as temp_dir:
-            with celeryd(
+            with celery_worker(
                 'lazr.jobrunner.tests.time_limit_config_slow_lane',
                 temp_dir, queue='standard_slow'):
                 # The fast lane times out after one second; the job
@@ -427,7 +427,7 @@
         # If a fast and a slow lane are configured, jobs which time out
         # in the fast lane are queued again in the slow lane.
         with tempdir() as temp_dir:
-            with celeryd(
+            with celery_worker(
                 'lazr.jobrunner.tests.time_limit_config_slow_lane',
                 temp_dir, queue='standard_slow'):
                 # The fast lane times out after one second; the job
@@ -501,7 +501,7 @@
         stdout, stderr = proc.communicate()
         return stdout, stderr
 
-    def invokeJob(self, celery_config, task, delay=1, job_args={}):
+    def invokeJob(self, celery_config, task, delay=5, job_args={}):
         """Run the given task.
 
         :return: The name of the result queue.
@@ -511,7 +511,7 @@
             job = FileJob(js, 11, **job_args)
             job.save()
             task_info = task.apply_async(args=(11, ))
-            with celeryd(celery_config, temp_dir):
+            with celery_worker(celery_config, temp_dir):
                 # Wait just long enough so that celeryd can start and
                 # process the job.
                 sleep(delay)
@@ -520,7 +520,8 @@
     def successMessage(self, task_id):
         return (
             "%s: {'status': 'SUCCESS', 'traceback': None, 'result': None, "
-            "'task_id': '%s'}\n" % (self.queueName(task_id), task_id))
+            "'task_id': '%s', 'children': []}\n"
+            % (self.queueName(task_id), task_id))
 
     def noQueueMessage(self, task_id):
         return (

=== modified file 'src/lazr/jobrunner/tests/test_jobrunnerctl.py'
--- src/lazr/jobrunner/tests/test_jobrunnerctl.py	2012-05-09 12:18:45 +0000
+++ src/lazr/jobrunner/tests/test_jobrunnerctl.py	2015-08-03 08:08:38 +0000
@@ -23,7 +23,7 @@
 from time import sleep
 from unittest import TestCase
 
-from celery.bin.celeryd_multi import NamespacedOptionParser
+from celery.bin.multi import NamespacedOptionParser
 from lazr.jobrunner.bin.jobrunnerctl import JobRunnerCtl
 from lazr.jobrunner.tests.test_celerytask import (
     FileJob,
@@ -60,25 +60,26 @@
         job.save()
         return RunFileJob.apply_async(args=(job_id, ), eta=eta)
 
-    def test_JobRunnerCtl_starts_stops_celeryd(self):
+    def test_JobRunnerCtl_starts_stops_celery_worker(self):
         with tempdir() as temp_dir:
             config = 'lazr.jobrunner.tests.config_no_prefetch'
             control = self.getController(config, temp_dir)
             argv = [
-                '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
+                'worker', '--config=%s' % config, 'node_name', '-Q:node_name',
+                'celery',
                 ]
             parser = NamespacedOptionParser(argv)
             # We may have a stale PID file.
-            old_pids = [info[2] for info in control.getpids(parser, 'celeryd')]
-            control.start(argv, 'celeryd')
+            old_pids = [info[2] for info in control.getpids(parser, 'celery')]
+            control.start(argv, 'celery')
             sleep(1)
             current_pids = [
-                info[2] for info in control.getpids(parser, 'celeryd')]
+                info[2] for info in control.getpids(parser, 'celery')]
             self.assertTrue(len(current_pids) > 0)
             self.assertNotEqual(old_pids, current_pids)
             for pid in current_pids:
                 self.assertTrue(control.node_alive(pid))
-            control.kill(argv, 'celeryd')
+            control.kill(argv, 'celery')
             sleep(1)
             for pid in current_pids:
                 self.assertFalse(control.node_alive(pid))
@@ -98,13 +99,13 @@
                 '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
                 '-c:node_name', '1',
                 ]
-            control.start(argv, 'celeryd')
-            control.kill(argv, 'celeryd')
+            control.start(argv, 'celery')
+            control.kill(argv, 'celery')
             sleep(1)
-            control.start(argv, 'celeryd')
+            control.start(argv, 'celery')
             for job in all_jobs:
                 job.wait(10)
-            control.kill(argv, 'celeryd')
+            control.kill(argv, 'celery')
 
     def test_JobRunnerCtl_kill_does_not_lose_jobs_with_eta(self):
         with tempdir() as temp_dir:
@@ -119,11 +120,11 @@
                 '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
                 '-c:node_name', '1',
                 ]
-            control.start(argv, 'celeryd')
-            sleep(1)
-            control.kill(argv, 'celeryd')
-            sleep(1)
-            control.start(argv, 'celeryd')
+            control.start(argv, 'celery')
+            sleep(1)
+            control.kill(argv, 'celery')
+            sleep(1)
+            control.start(argv, 'celery')
             for job in all_jobs:
                 job.wait(10)
-            control.kill(argv, 'celeryd')
+            control.kill(argv, 'celery')

=== modified file 'src/lazr/jobrunner/version.txt'
--- src/lazr/jobrunner/version.txt	2013-06-13 05:22:01 +0000
+++ src/lazr/jobrunner/version.txt	2015-08-03 08:08:38 +0000
@@ -1,1 +1,1 @@
-0.12
+0.13