[Merge] lp:~wgrant/lazr.jobrunner/celery-3.1 into lp:lazr.jobrunner
William Grant has proposed merging lp:~wgrant/lazr.jobrunner/celery-3.1 into lp:lazr.jobrunner.
Commit message:
Support and require celery >= 3.0.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~wgrant/lazr.jobrunner/celery-3.1/+merge/266686
Support and require celery >= 3.0.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/lazr.jobrunner/celery-3.1 into lp:lazr.jobrunner.
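The branch adapts lazr.jobrunner to the celery 3.x / kombu 3.x APIs: the worker is started as bin/celery worker instead of bin/celeryd, the multi tool moved from celery.bin.celeryd_multi to celery.bin.multi, missing queues are reported via amqp.exceptions.NotFound rather than amqplib's AMQPChannelException, and kombu consumers must now declare which serializers they accept. As a rough illustration only (not part of the diff below), a kombu 3.x consumer along the lines of the updated drain_queues() could look like the following sketch; app, queue_names and callbacks are assumed to be a configured Celery app, a list of queue names, and a list of message callbacks:

    # Sketch only, assuming celery/kombu 3.x; mirrors the drain_queues()
    # change in src/lazr/jobrunner/celerytask.py below.
    from kombu import Consumer

    def drain_sketch(app, queue_names, callbacks):
        # Queue objects now come from the app's router instead of being
        # built by hand from Exchange/Queue.
        router = app.amqp.Router(
            create_missing=True,
            queues=app.amqp.Queues(app.conf.CELERY_QUEUES, create_missing=True))
        bindings = [router.expand_destination(name)['queue']
                    for name in queue_names]
        with app.broker_connection() as connection:
            # kombu 3 accepts only json by default, so the serializers in
            # use have to be whitelisted explicitly.
            with Consumer(connection, bindings, callbacks=callbacks,
                          accept=['json', 'pickle']):
                connection.drain_events(timeout=1)

The test suite and jobrunnerctl changes follow the same renames (celeryd -> celery worker, 'celeryd' -> 'celery' node commands), as shown in the diff.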
=== modified file 'NEWS.txt'
--- NEWS.txt 2013-06-14 00:38:07 +0000
+++ NEWS.txt 2015-08-03 08:08:38 +0000
@@ -1,6 +1,10 @@
News
====
+0.13
+----
+* Support and require celery >= 3.0.
+
0.12
----
* Only run the job if its is_runnable property is True.
=== modified file 'bootstrap.py'
--- bootstrap.py 2012-02-23 10:32:17 +0000
+++ bootstrap.py 2015-08-03 08:08:38 +0000
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2006 Zope Corporation and Contributors.
+# Copyright (c) 2006 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -16,98 +16,245 @@
Simply run this script in a directory containing a buildout.cfg.
The script accepts buildout command-line options, so you can
use the -c option to specify an alternate configuration file.
-
-$Id: bootstrap.py 102545 2009-08-06 14:49:47Z chrisw $
"""
-import os, shutil, sys, tempfile, urllib2
+import os, shutil, sys, tempfile, textwrap, urllib, urllib2, subprocess
from optparse import OptionParser
-tmpeggs = tempfile.mkdtemp()
+if sys.platform == 'win32':
+ def quote(c):
+ if ' ' in c:
+ return '"%s"' % c # work around spawn lamosity on windows
+ else:
+ return c
+else:
+ quote = str
+
+# See zc.buildout.easy_install._has_broken_dash_S for motivation and comments.
+stdout, stderr = subprocess.Popen(
+ [sys.executable, '-Sc',
+ 'try:\n'
+ ' import ConfigParser\n'
+ 'except ImportError:\n'
+ ' print 1\n'
+ 'else:\n'
+ ' print 0\n'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
+has_broken_dash_S = bool(int(stdout.strip()))
+
+# In order to be more robust in the face of system Pythons, we want to
+# run without site-packages loaded. This is somewhat tricky, in
+# particular because Python 2.6's distutils imports site, so starting
+# with the -S flag is not sufficient. However, we'll start with that:
+if not has_broken_dash_S and 'site' in sys.modules:
+ # We will restart with python -S.
+ args = sys.argv[:]
+ args[0:0] = [sys.executable, '-S']
+ args = map(quote, args)
+ os.execv(sys.executable, args)
+# Now we are running with -S. We'll get the clean sys.path, import site
+# because distutils will do it later, and then reset the path and clean
+# out any namespace packages from site-packages that might have been
+# loaded by .pth files.
+clean_path = sys.path[:]
+import site
+sys.path[:] = clean_path
+for k, v in sys.modules.items():
+ if k in ('setuptools', 'pkg_resources') or (
+ hasattr(v, '__path__') and
+ len(v.__path__)==1 and
+ not os.path.exists(os.path.join(v.__path__[0],'__init__.py'))):
+ # This is a namespace package. Remove it.
+ sys.modules.pop(k)
is_jython = sys.platform.startswith('java')
+setuptools_source = 'http://peak.telecommunity.com/dist/ez_setup.py'
+distribute_source = 'http://python-distribute.org/distribute_setup.py'
+
# parsing arguments
-parser = OptionParser()
+def normalize_to_url(option, opt_str, value, parser):
+ if value:
+ if '://' not in value: # It doesn't smell like a URL.
+ value = 'file://%s' % (
+ urllib.pathname2url(
+ os.path.abspath(os.path.expanduser(value))),)
+ if opt_str == '--download-base' and not value.endswith('/'):
+ # Download base needs a trailing slash to make the world happy.
+ value += '/'
+ else:
+ value = None
+ name = opt_str[2:].replace('-', '_')
+ setattr(parser.values, name, value)
+
+usage = '''\
+[DESIRED PYTHON FOR BUILDOUT] bootstrap.py [options]
+
+Bootstraps a buildout-based project.
+
+Simply run this script in a directory containing a buildout.cfg, using the
+Python that you want bin/buildout to use.
+
+Note that by using --setup-source and --download-base to point to
+local resources, you can keep this script from going over the network.
+'''
+
+parser = OptionParser(usage=usage)
parser.add_option("-v", "--version", dest="version",
help="use a specific zc.buildout version")
parser.add_option("-d", "--distribute",
- action="store_true", dest="distribute", default=True,
- help="Use Disribute rather than Setuptools.")
+ action="store_true", dest="use_distribute", default=False,
+ help="Use Distribute rather than Setuptools.")
+parser.add_option("--setup-source", action="callback", dest="setup_source",
+ callback=normalize_to_url, nargs=1, type="string",
+ help=("Specify a URL or file location for the setup file. "
+ "If you use Setuptools, this will default to " +
+ setuptools_source + "; if you use Distribute, this "
+ "will default to " + distribute_source +"."))
+parser.add_option("--download-base", action="callback", dest="download_base",
+ callback=normalize_to_url, nargs=1, type="string",
+ help=("Specify a URL or directory for downloading "
+ "zc.buildout and either Setuptools or Distribute. "
+ "Defaults to PyPI."))
+parser.add_option("--eggs",
+ help=("Specify a directory for storing eggs. Defaults to "
+ "a temporary directory that is deleted when the "
+ "bootstrap script completes."))
+parser.add_option("-t", "--accept-buildout-test-releases",
+ dest='accept_buildout_test_releases',
+ action="store_true", default=False,
+ help=("Normally, if you do not specify a --version, the "
+ "bootstrap script and buildout gets the newest "
+ "*final* versions of zc.buildout and its recipes and "
+ "extensions for you. If you use this flag, "
+ "bootstrap and buildout will get the newest releases "
+ "even if they are alphas or betas."))
+parser.add_option("-c", None, action="store", dest="config_file",
+ help=("Specify the path to the buildout configuration "
+ "file to be used."))
options, args = parser.parse_args()
-if options.version is not None:
- VERSION = '==%s' % options.version
+# if -c was provided, we push it back into args for buildout's main function
+if options.config_file is not None:
+ args += ['-c', options.config_file]
+
+if options.eggs:
+ eggs_dir = os.path.abspath(os.path.expanduser(options.eggs))
else:
- VERSION = ''
-
-USE_DISTRIBUTE = options.distribute
-args = args + ['bootstrap']
-
-to_reload = False
+ eggs_dir = tempfile.mkdtemp()
+
+if options.setup_source is None:
+ if options.use_distribute:
+ options.setup_source = distribute_source
+ else:
+ options.setup_source = setuptools_source
+
+if options.accept_buildout_test_releases:
+ args.append('buildout:accept-buildout-test-releases=true')
+args.append('bootstrap')
+
try:
import pkg_resources
+ import setuptools # A flag. Sometimes pkg_resources is installed alone.
if not hasattr(pkg_resources, '_distribute'):
- to_reload = True
raise ImportError
except ImportError:
+ ez_code = urllib2.urlopen(
+ options.setup_source).read().replace('\r\n', '\n')
ez = {}
- if USE_DISTRIBUTE:
- exec urllib2.urlopen('http://python-distribute.org/distribute_setup.py'
- ).read() in ez
- ez['use_setuptools'](to_dir=tmpeggs, download_delay=0, no_fake=True)
- else:
- exec urllib2.urlopen('http://peak.telecommunity.com/dist/ez_setup.py'
- ).read() in ez
- ez['use_setuptools'](to_dir=tmpeggs, download_delay=0)
-
- if to_reload:
- reload(pkg_resources)
- else:
- import pkg_resources
-
-if sys.platform == 'win32':
- def quote(c):
- if ' ' in c:
- return '"%s"' % c # work around spawn lamosity on windows
- else:
- return c
-else:
- def quote (c):
- return c
-
-cmd = 'from setuptools.command.easy_install import main; main()'
-ws = pkg_resources.working_set
-
-if USE_DISTRIBUTE:
- requirement = 'distribute'
-else:
- requirement = 'setuptools'
+ exec ez_code in ez
+ setup_args = dict(to_dir=eggs_dir, download_delay=0)
+ if options.download_base:
+ setup_args['download_base'] = options.download_base
+ if options.use_distribute:
+ setup_args['no_fake'] = True
+ ez['use_setuptools'](**setup_args)
+ if 'pkg_resources' in sys.modules:
+ reload(sys.modules['pkg_resources'])
+ import pkg_resources
+ # This does not (always?) update the default working set. We will
+ # do it.
+ for path in sys.path:
+ if path not in pkg_resources.working_set.entries:
+ pkg_resources.working_set.add_entry(path)
+
+cmd = [quote(sys.executable),
+ '-c',
+ quote('from setuptools.command.easy_install import main; main()'),
+ '-mqNxd',
+ quote(eggs_dir)]
+
+if not has_broken_dash_S:
+ cmd.insert(1, '-S')
+
+find_links = options.download_base
+if not find_links:
+ find_links = os.environ.get('bootstrap-testing-find-links')
+if find_links:
+ cmd.extend(['-f', quote(find_links)])
+
+if options.use_distribute:
+ setup_requirement = 'distribute'
+else:
+ setup_requirement = 'setuptools'
+ws = pkg_resources.working_set
+setup_requirement_path = ws.find(
+ pkg_resources.Requirement.parse(setup_requirement)).location
+env = dict(
+ os.environ,
+ PYTHONPATH=setup_requirement_path)
+
+requirement = 'zc.buildout'
+version = options.version
+if version is None and not options.accept_buildout_test_releases:
+ # Figure out the most recent final version of zc.buildout.
+ import setuptools.package_index
+ _final_parts = '*final-', '*final'
+ def _final_version(parsed_version):
+ for part in parsed_version:
+ if (part[:1] == '*') and (part not in _final_parts):
+ return False
+ return True
+ index = setuptools.package_index.PackageIndex(
+ search_path=[setup_requirement_path])
+ if find_links:
+ index.add_find_links((find_links,))
+ req = pkg_resources.Requirement.parse(requirement)
+ if index.obtain(req) is not None:
+ best = []
+ bestv = None
+ for dist in index[req.project_name]:
+ distv = dist.parsed_version
+ if _final_version(distv):
+ if bestv is None or distv > bestv:
+ best = [dist]
+ bestv = distv
+ elif distv == bestv:
+ best.append(dist)
+ if best:
+ best.sort()
+ version = best[-1].version
+if version:
+ requirement = '=='.join((requirement, version))
+cmd.append(requirement)
if is_jython:
import subprocess
-
- assert subprocess.Popen([sys.executable] + ['-c', quote(cmd), '-mqNxd',
- quote(tmpeggs), 'zc.buildout' + VERSION],
- env=dict(os.environ,
- PYTHONPATH=
- ws.find(pkg_resources.Requirement.parse(requirement)).location
- ),
- ).wait() == 0
-
-else:
- assert os.spawnle(
- os.P_WAIT, sys.executable, quote (sys.executable),
- '-c', quote (cmd), '-mqNxd', quote (tmpeggs), 'zc.buildout' + VERSION,
- dict(os.environ,
- PYTHONPATH=
- ws.find(pkg_resources.Requirement.parse(requirement)).location
- ),
- ) == 0
-
-ws.add_entry(tmpeggs)
-ws.require('zc.buildout' + VERSION)
+ exitcode = subprocess.Popen(cmd, env=env).wait()
+else: # Windows prefers this, apparently; otherwise we would prefer subprocess
+ exitcode = os.spawnle(*([os.P_WAIT, sys.executable] + cmd + [env]))
+if exitcode != 0:
+ sys.stdout.flush()
+ sys.stderr.flush()
+ print ("An error occurred when trying to install zc.buildout. "
+ "Look above this message for any errors that "
+ "were output by easy_install.")
+ sys.exit(exitcode)
+
+ws.add_entry(eggs_dir)
+ws.require(requirement)
import zc.buildout.buildout
zc.buildout.buildout.main(args)
-shutil.rmtree(tmpeggs)
+if not options.eggs: # clean up temporary egg directory
+ shutil.rmtree(eggs_dir)
=== modified file 'buildout.cfg'
--- buildout.cfg 2012-07-10 09:41:52 +0000
+++ buildout.cfg 2015-08-03 08:08:38 +0000
@@ -1,5 +1,6 @@
[buildout]
parts = python scripts
+download-cache = download-cache
develop = .
eggs =
lazr.jobrunner
@@ -21,5 +22,8 @@
eggs = ${buildout:eggs}
[versions]
-celery = 2.5.1
-kombu = 2.1.1
+celery = 3.1.18
+keyring = 0.6.2
+kombu = 3.0.26
+zc.recipe.egg = 1.3.2
+z3c.recipe.scripts = 1.0.1
=== modified file 'setup.py'
--- setup.py 2013-06-13 05:22:01 +0000
+++ setup.py 2015-08-03 08:08:38 +0000
@@ -22,13 +22,13 @@
NEWS = open(os.path.join(here, 'NEWS.txt')).read()
-version = '0.12'
+version = '0.13'
install_requires = [
# List your project dependencies here.
# For more details, see:
# http://packages.python.org/distribute/setuptools.html#declaring-dependencies
- 'celery',
+ 'celery>=3.0',
]
=== modified file 'src/lazr/jobrunner/bin/clear_queues.py'
--- src/lazr/jobrunner/bin/clear_queues.py 2012-07-04 16:34:25 +0000
+++ src/lazr/jobrunner/bin/clear_queues.py 2015-08-03 08:08:38 +0000
@@ -22,7 +22,8 @@
from argparse import ArgumentParser
import os
import sys
-from amqplib.client_0_8.exceptions import AMQPChannelException
+
+import amqp
def show_queue_data(body, message):
@@ -51,13 +52,8 @@
drain_queues(
RunJob.app, [queue], callbacks=[show_queue_data],
retain=True, passive_queues=True)
- except AMQPChannelException as exc:
- if exc.amqp_reply_code == 404:
- # Unknown queue name specified; amqp_reply_text is
- # self-explaining.
- print >>sys.stderr, exc.amqp_reply_text
- else:
- raise
+ except amqp.exceptions.NotFound as exc:
+ print >>sys.stderr, exc.reply_text
def main():
=== modified file 'src/lazr/jobrunner/bin/jobrunnerctl.py'
--- src/lazr/jobrunner/bin/jobrunnerctl.py 2012-05-09 15:08:25 +0000
+++ src/lazr/jobrunner/bin/jobrunnerctl.py 2015-08-03 08:08:38 +0000
@@ -21,7 +21,7 @@
import os
import sys
-from celery.bin.celeryd_multi import MultiTool
+from celery.bin.multi import MultiTool
class JobRunnerCtl(MultiTool):
=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py 2013-06-14 00:38:07 +0000
+++ src/lazr/jobrunner/celerytask.py 2015-08-03 08:08:38 +0000
@@ -21,7 +21,7 @@
from socket import timeout
from celery.task import Task
-from kombu import Consumer, Exchange, Queue
+from kombu import Consumer
from lazr.jobrunner.jobrunner import (
JobRunner,
@@ -97,18 +97,18 @@
if callbacks is None:
callbacks = [lambda x, y: None]
bindings = []
- router = app.amqp.Router(create_missing=True)
+ router = app.amqp.Router(
+ create_missing=True,
+ queues=app.amqp.Queues(app.conf.CELERY_QUEUES, create_missing=True))
for queue_name in queue_names:
destination = router.expand_destination(queue_name)
- exchange = Exchange(destination['exchange'])
- queue = Queue(queue_name, exchange=exchange)
- bindings.append(queue)
+ bindings.append(destination['queue'])
with app.broker_connection() as connection:
# The no_ack flag is misleadingly named.
# See: https://github.com/ask/kombu/issues/130
consumer = Consumer(
connection, bindings, callbacks=callbacks, no_ack=not retain,
- auto_declare=not passive_queues)
+ auto_declare=not passive_queues, accept=['json', 'pickle'])
if passive_queues:
# This is basically copied from kombu.Queue.declare().
# We can't use this method directly because queue_declare()
=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py 2012-07-09 15:53:48 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py 2015-08-03 08:08:38 +0000
@@ -69,11 +69,11 @@
proc.wait()
-def celeryd(config_module, file_job_dir, queue='celery'):
- cmd_args = ('--config', config_module, '--queue', queue)
+def celery_worker(config_module, file_job_dir, queue='celery'):
+ cmd_args = ('worker', '--config', config_module, '--queue', queue)
environ = dict(os.environ)
environ['FILE_JOB_DIR'] = file_job_dir
- return running('bin/celeryd', cmd_args, environ, cwd=get_root())
+ return running('bin/celery', cmd_args, environ, cwd=get_root())
@contextlib.contextmanager
@@ -342,7 +342,7 @@
result = RunFileJob.delay(10)
self.assertIs(None, js.get_output(job))
self.assertEqual(JobStatus.WAITING, job.status)
- with celeryd('lazr.jobrunner.tests.config1', temp_dir):
+ with celery_worker('lazr.jobrunner.tests.config1', temp_dir):
result.wait(10)
job = js.get(job.job_id)
self.assertEqual('my_output', js.get_output(job))
@@ -354,7 +354,7 @@
job = FileJob(js, 10, **kwargs)
job.save()
result = RunFileJob.apply_async(args=(10, ), queue=queue)
- with celeryd(config, temp_dir, queue) as proc:
+ with celery_worker(config, temp_dir, queue) as proc:
try:
result.wait(10)
except SoftTimeLimitExceeded:
@@ -373,7 +373,7 @@
job = FileJob(js, 10, **kwargs)
job.save()
RunFileJobNoResult.apply_async(args=(10, ), queue=queue)
- with celeryd(config, temp_dir, queue) as proc:
+ with celery_worker(config, temp_dir, queue) as proc:
sleep(wait_time)
job = js.get(job.job_id)
return job, js, proc
@@ -393,7 +393,7 @@
"""Raises exception when a job exceeds the configured time limit."""
with tempdir() as temp_dir:
job, js, proc = self.run_file_job_ignore_result(
- temp_dir, wait_time=2,
+ temp_dir, wait_time=5,
config='lazr.jobrunner.tests.time_limit_config',
sleep=3)
self.assertEqual(JobStatus.FAILED, job.status)
@@ -405,7 +405,7 @@
# If a fast and a slow lane are configured, jobs which time out
# in the fast lane are queued again in the slow lane.
with tempdir() as temp_dir:
- with celeryd(
+ with celery_worker(
'lazr.jobrunner.tests.time_limit_config_slow_lane',
temp_dir, queue='standard_slow'):
# The fast lane times out after one second; the job
@@ -427,7 +427,7 @@
# If a fast and a slow lane are configured, jobs which time out
# in the fast lane are queued again in the slow lane.
with tempdir() as temp_dir:
- with celeryd(
+ with celery_worker(
'lazr.jobrunner.tests.time_limit_config_slow_lane',
temp_dir, queue='standard_slow'):
# The fast lane times out after one second; the job
@@ -501,7 +501,7 @@
stdout, stderr = proc.communicate()
return stdout, stderr
- def invokeJob(self, celery_config, task, delay=1, job_args={}):
+ def invokeJob(self, celery_config, task, delay=5, job_args={}):
"""Run the given task.
:return: The name of the result queue.
@@ -511,7 +511,7 @@
job = FileJob(js, 11, **job_args)
job.save()
task_info = task.apply_async(args=(11, ))
- with celeryd(celery_config, temp_dir):
+ with celery_worker(celery_config, temp_dir):
# Wait just long enough so that celeryd can start and
# process the job.
sleep(delay)
@@ -520,7 +520,8 @@
def successMessage(self, task_id):
return (
"%s: {'status': 'SUCCESS', 'traceback': None, 'result': None, "
- "'task_id': '%s'}\n" % (self.queueName(task_id), task_id))
+ "'task_id': '%s', 'children': []}\n"
+ % (self.queueName(task_id), task_id))
def noQueueMessage(self, task_id):
return (
=== modified file 'src/lazr/jobrunner/tests/test_jobrunnerctl.py'
--- src/lazr/jobrunner/tests/test_jobrunnerctl.py 2012-05-09 12:18:45 +0000
+++ src/lazr/jobrunner/tests/test_jobrunnerctl.py 2015-08-03 08:08:38 +0000
@@ -23,7 +23,7 @@
from time import sleep
from unittest import TestCase
-from celery.bin.celeryd_multi import NamespacedOptionParser
+from celery.bin.multi import NamespacedOptionParser
from lazr.jobrunner.bin.jobrunnerctl import JobRunnerCtl
from lazr.jobrunner.tests.test_celerytask import (
FileJob,
@@ -60,25 +60,26 @@
job.save()
return RunFileJob.apply_async(args=(job_id, ), eta=eta)
- def test_JobRunnerCtl_starts_stops_celeryd(self):
+ def test_JobRunnerCtl_starts_stops_celery_worker(self):
with tempdir() as temp_dir:
config = 'lazr.jobrunner.tests.config_no_prefetch'
control = self.getController(config, temp_dir)
argv = [
- '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
+ 'worker', '--config=%s' % config, 'node_name', '-Q:node_name',
+ 'celery',
]
parser = NamespacedOptionParser(argv)
# We may have a stale PID file.
- old_pids = [info[2] for info in control.getpids(parser, 'celeryd')]
- control.start(argv, 'celeryd')
+ old_pids = [info[2] for info in control.getpids(parser, 'celery')]
+ control.start(argv, 'celery')
sleep(1)
current_pids = [
- info[2] for info in control.getpids(parser, 'celeryd')]
+ info[2] for info in control.getpids(parser, 'celery')]
self.assertTrue(len(current_pids) > 0)
self.assertNotEqual(old_pids, current_pids)
for pid in current_pids:
self.assertTrue(control.node_alive(pid))
- control.kill(argv, 'celeryd')
+ control.kill(argv, 'celery')
sleep(1)
for pid in current_pids:
self.assertFalse(control.node_alive(pid))
@@ -98,13 +99,13 @@
'--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
'-c:node_name', '1',
]
- control.start(argv, 'celeryd')
- control.kill(argv, 'celeryd')
+ control.start(argv, 'celery')
+ control.kill(argv, 'celery')
sleep(1)
- control.start(argv, 'celeryd')
+ control.start(argv, 'celery')
for job in all_jobs:
job.wait(10)
- control.kill(argv, 'celeryd')
+ control.kill(argv, 'celery')
def test_JobRunnerCtl_kill_does_not_lose_jobs_with_eta(self):
with tempdir() as temp_dir:
@@ -119,11 +120,11 @@
'--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
'-c:node_name', '1',
]
- control.start(argv, 'celeryd')
- sleep(1)
- control.kill(argv, 'celeryd')
- sleep(1)
- control.start(argv, 'celeryd')
+ control.start(argv, 'celery')
+ sleep(1)
+ control.kill(argv, 'celery')
+ sleep(1)
+ control.start(argv, 'celery')
for job in all_jobs:
job.wait(10)
- control.kill(argv, 'celeryd')
+ control.kill(argv, 'celery')
=== modified file 'src/lazr/jobrunner/version.txt'
--- src/lazr/jobrunner/version.txt 2013-06-13 05:22:01 +0000
+++ src/lazr/jobrunner/version.txt 2015-08-03 08:08:38 +0000
@@ -1,1 +1,1 @@
-0.12
+0.13