launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #23446
[Merge] lp:~cjwatson/launchpad/bionic-gpg into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/bionic-gpg into lp:launchpad.
Commit message:
Fix GPG handling to support bionic.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/bionic-gpg/+merge/364982
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/bionic-gpg into lp:launchpad.
=== modified file 'constraints.txt'
--- constraints.txt 2019-01-21 17:45:55 +0000
+++ constraints.txt 2019-03-22 19:20:45 +0000
@@ -217,8 +217,7 @@
# post1 Don't add a process back to the ready set if it received an error
# such as a timeout.
ampoule==0.2.0.post1
-amqp==1.4.9
-amqplib==1.0.2
+amqp==2.4.2
anyjson==0.3.3
appdirs==1.4.3
asn1crypto==0.23.0
@@ -231,10 +230,10 @@
backports.lzma==0.0.3
bcrypt==3.1.4
BeautifulSoup==3.2.1
-billiard==3.3.0.23
+billiard==3.5.0.5
bson==0.3.3
bzr==2.6.0.lp.3
-celery==3.1.26.post2
+celery==4.1.1
cffi==1.11.2
Chameleon==2.11
chardet==3.0.4
@@ -273,7 +272,7 @@
iso8601==0.1.12
jsautobuild==0.2
keyring==0.6.2
-kombu==3.0.37
+kombu==4.4.0
launchpad-buildd==159
launchpadlib==1.10.5
lazr.authentication==0.1.1
@@ -281,7 +280,7 @@
lazr.config==2.2.1
lazr.delegates==2.0.4
lazr.enum==1.1.3
-lazr.jobrunner==0.13
+lazr.jobrunner==0.14
lazr.lifecycle==1.1
lazr.restful==0.20.1
lazr.restfulclient==0.13.2
@@ -302,7 +301,7 @@
netaddr==0.7.19
oauth==1.0
oops==0.0.13
-oops-amqp==0.0.8b1
+oops-amqp==0.1.0
oops-datedir-repo==0.0.23
oops-timeline==0.0.1
oops-twisted==0.0.7
@@ -338,7 +337,7 @@
python-openid==2.2.5-fix1034376
python-swiftclient==2.0.3
PyYAML==3.10
-rabbitfixture==0.3.6
+rabbitfixture==0.4.0
requests==2.7.0
requests-file==1.4.3
requests-toolbelt==0.6.2
@@ -371,6 +370,7 @@
typing==3.6.2
unittest2==1.1.0
van.testing==3.0.0
+vine==1.1.4
virtualenv-tools3==2.0.0
wadllib==1.3.2
wheel==0.29.0
=== modified file 'lib/lp/services/gpg/doc/gpg-signatures.txt'
--- lib/lp/services/gpg/doc/gpg-signatures.txt 2019-03-07 15:05:17 +0000
+++ lib/lp/services/gpg/doc/gpg-signatures.txt 2019-03-22 19:20:45 +0000
@@ -168,8 +168,12 @@
>>> gpghandler.getVerifiedSignature(content)
Traceback (most recent call last):
...
+<<<<<<< TREE
GPGKeyDoesNotExistOnServer: GPG key E192C0543B1BB2EB does not exist on the
keyserver.
+=======
+ GPGVerificationError: (..., 9, u'No public key')
+>>>>>>> MERGE-SOURCE
Due to unpredictable behaviour between the production system and
the external keyserver, we have a resilient signature verifier,
@@ -183,9 +187,15 @@
Traceback (most recent call last):
...
GPGVerificationError: Verification failed 3 times:
+<<<<<<< TREE
['GPG key E192C0543B1BB2EB does not exist on the keyserver.',
'GPG key E192C0543B1BB2EB does not exist on the keyserver.',
'GPG key E192C0543B1BB2EB does not exist on the keyserver.']
+=======
+ ["(..., 9, u'No public key')",
+ "(..., 9, u'No public key')",
+ "(..., 9, u'No public key')"]
+>>>>>>> MERGE-SOURCE
Debugging exceptions
=== modified file 'lib/lp/services/gpg/handler.py'
--- lib/lp/services/gpg/handler.py 2019-03-18 10:12:37 +0000
+++ lib/lp/services/gpg/handler.py 2019-03-22 19:20:45 +0000
@@ -31,6 +31,8 @@
from lp.app.validators.email import valid_email
from lp.services.config import config
from lp.services.gpg.interfaces import (
+ get_gpg_path,
+ get_gpgme_context,
GPGKeyAlgorithm,
GPGKeyDoesNotExistOnServer,
GPGKeyExpired,
@@ -116,14 +118,6 @@
atexit.register(removeHome, self.home)
- def _getContext(self):
- """Return a new appropriately-configured GPGME context."""
- context = gpgme.Context()
- # Stick to GnuPG 1.
- context.set_engine_info(gpgme.PROTOCOL_OpenPGP, "/usr/bin/gpg", None)
- context.armor = True
- return context
-
def sanitizeFingerprint(self, fingerprint):
"""See IGPGHandler."""
return sanitize_fingerprint(fingerprint)
@@ -162,6 +156,7 @@
raise GPGVerificationError(
"Verification failed 3 times: %s " % stored_errors)
+<<<<<<< TREE
def _rawVerifySignature(self, ctx, content, signature=None):
"""Internals of `getVerifiedSignature`.
@@ -169,6 +164,16 @@
the correct fingerprint, and once after retrieving the corresponding
key from the keyserver.
"""
+=======
+ def getVerifiedSignature(self, content, signature=None):
+ """See IGPGHandler."""
+
+ assert not isinstance(content, unicode)
+ assert not isinstance(signature, unicode)
+
+ ctx = get_gpgme_context()
+
+>>>>>>> MERGE-SOURCE
# from `info gpgme` about gpgme_op_verify(SIG, SIGNED_TEXT, PLAIN):
#
# If SIG is a detached signature, then the signed text should be
@@ -273,7 +278,7 @@
def importPublicKey(self, content):
"""See IGPGHandler."""
assert isinstance(content, str)
- context = self._getContext()
+ context = get_gpgme_context()
newkey = StringIO(content)
with gpgme_timeline("import", "new public key"):
@@ -308,7 +313,7 @@
if 'GPG_AGENT_INFO' in os.environ:
del os.environ['GPG_AGENT_INFO']
- context = self._getContext()
+ context = get_gpgme_context()
newkey = StringIO(content)
with gpgme_timeline("import", "new secret key"):
import_result = context.import_(newkey)
@@ -334,7 +339,7 @@
def generateKey(self, name):
"""See `IGPGHandler`."""
- context = self._getContext()
+ context = get_gpgme_context()
# Make sure that gpg-agent doesn't interfere.
if 'GPG_AGENT_INFO' in os.environ:
@@ -372,8 +377,7 @@
if isinstance(content, unicode):
raise TypeError('Content cannot be Unicode.')
- # setup context
- ctx = self._getContext()
+ ctx = get_gpgme_context()
# setup containers
plain = StringIO(content)
@@ -404,7 +408,7 @@
# Find the key and make it the only one allowed to sign content
# during this session.
- context = self._getContext()
+ context = get_gpgme_context()
context.signers = [removeSecurityProxy(key.key)]
# Set up containers.
@@ -432,7 +436,7 @@
"""Get an iterator of the keys this gpg handler
already knows about.
"""
- ctx = self._getContext()
+ ctx = get_gpgme_context()
# XXX michaeln 2010-05-07 bug=576405
# Currently gpgme.Context().keylist fails if passed a unicode
@@ -457,8 +461,13 @@
if not key.exists_in_local_keyring:
pubkey = self._getPubKey(fingerprint)
key = self.importPublicKey(pubkey)
+<<<<<<< TREE
if not key.matches(fingerprint):
ctx = self._getContext()
+=======
+ if fingerprint != key.fingerprint:
+ ctx = get_gpgme_context()
+>>>>>>> MERGE-SOURCE
with gpgme_timeline("delete", key.fingerprint):
ctx.delete(key.key)
raise GPGKeyMismatchOnServer(fingerprint, key.fingerprint)
@@ -602,17 +611,9 @@
self._buildFromGpgmeKey(key)
return self
- def _getContext(self):
- """Return a new appropriately-configured GPGME context."""
- context = gpgme.Context()
- # Stick to GnuPG 1.
- context.set_engine_info(gpgme.PROTOCOL_OpenPGP, "/usr/bin/gpg", None)
- context.armor = True
- return context
-
def _buildFromFingerprint(self, fingerprint):
"""Build key information from a fingerprint."""
- context = self._getContext()
+ context = get_gpgme_context()
# retrive additional key information
try:
with gpgme_timeline("get-key", fingerprint):
@@ -658,11 +659,12 @@
# XXX cprov 20081014: gpgme_op_export() only supports public keys.
# See http://www.fifi.org/cgi-bin/info2www?(gpgme)Exporting+Keys
p = subprocess.Popen(
- ['gpg', '--export-secret-keys', '-a', self.fingerprint],
+ [get_gpg_path(), '--export-secret-keys', '-a',
+ self.fingerprint],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return p.stdout.read()
- context = self._getContext()
+ context = get_gpgme_context()
keydata = StringIO()
with gpgme_timeline("export", self.fingerprint):
context.export(self.fingerprint.encode('ascii'), keydata)
=== modified file 'lib/lp/services/gpg/interfaces.py'
--- lib/lp/services/gpg/interfaces.py 2019-03-18 10:12:37 +0000
+++ lib/lp/services/gpg/interfaces.py 2019-03-22 19:20:45 +0000
@@ -2,6 +2,8 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
__all__ = [
+ 'get_gpg_path',
+ 'get_gpgme_context',
'GPGKeyAlgorithm',
'GPGKeyDoesNotExistOnServer',
'GPGKeyExpired',
@@ -22,6 +24,7 @@
]
import httplib
+import os.path
import re
from lazr.enum import (
@@ -56,6 +59,28 @@
return False
+def get_gpg_path():
+ """Return the path to the GPG executable we prefer.
+
+ We stick to GnuPG 1 until we've worked out how to get things working
+ with GnuPG 2.
+ """
+ if os.path.exists("/usr/bin/gpg1"):
+ return "/usr/bin/gpg1"
+ else:
+ return "/usr/bin/gpg"
+
+
+def get_gpgme_context():
+ """Return a new appropriately-configured GPGME context."""
+ import gpgme
+
+ context = gpgme.Context()
+ context.set_engine_info(gpgme.PROTOCOL_OpenPGP, get_gpg_path(), None)
+ context.armor = True
+ return context
+
+
# XXX: cprov 2004-10-04:
# (gpg+dbschema) the data structure should be rearranged to support 4 field
# needed: keynumber(1,16,17,20), keyalias(R,g,D,G), title and description
=== modified file 'lib/lp/services/gpg/tests/test_gpghandler.py'
--- lib/lp/services/gpg/tests/test_gpghandler.py 2019-03-18 10:12:37 +0000
+++ lib/lp/services/gpg/tests/test_gpghandler.py 2019-03-22 19:20:45 +0000
@@ -12,6 +12,7 @@
from zope.security.proxy import removeSecurityProxy
from lp.services.gpg.interfaces import (
+ get_gpg_path,
GPGKeyDoesNotExistOnServer,
GPGKeyMismatchOnServer,
GPGKeyTemporarilyNotFoundError,
@@ -327,7 +328,8 @@
# plumbing.
with open(os.devnull, "w") as devnull:
gpg_proc = subprocess.Popen(
- ["gpg", "--quiet", "--status-fd", "1", "--verify"],
+ [get_gpg_path(), "--quiet", "--status-fd", "1",
+ "--verify"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=devnull, universal_newlines=True)
status = gpg_proc.communicate(signed_content)[0].splitlines()
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2019-03-06 14:52:01 +0000
+++ lib/lp/services/job/celeryconfig.py 2019-03-22 19:20:45 +0000
@@ -14,7 +14,7 @@
def check_circular_fallbacks(queue):
- """Check for curcular fallback queues.
+ """Check for circular fallback queues.
A circular chain of fallback queues could keep a job forever queued
if it times out in all queues.
@@ -41,7 +41,7 @@
queue_names = queue_names.split(' ')
for queue_name in queue_names:
celery_queues[queue_name] = {
- 'binding_key': queue_name,
+ 'routing_key': queue_name,
}
check_circular_fallbacks(queue_name)
@@ -52,7 +52,7 @@
# A queue must be specified as a command line parameter for each
# "celery worker" instance, but this is not required for a Launchpad app
# server.
- if 'celeryd' in argv[0] or ('celery' in argv[0] and argv[1] == 'worker'):
+ if 'celery' in argv[0] and argv[1] == 'worker':
if queues is None or queues == '':
raise ConfigurationError('A queue must be specified.')
queues = queues.split(',')
@@ -68,7 +68,7 @@
'Queue %s is not configured in schema-lazr.conf' % queue)
# XXX wgrant 2015-08-03: This should be set in the apply_async
# now that we're on Celery 3.1.
- result['CELERYD_TASK_SOFT_TIME_LIMIT'] = config[queue].timeout
+ result['task_soft_time_limit'] = config[queue].timeout
if config[queue].fallback_queue != '':
# XXX wgrant 2015-08-03: lazr.jobrunner actually looks for
# FALLBACK_QUEUE; this probably isn't doing anything.
@@ -76,42 +76,34 @@
# XXX wgrant 2015-08-03: This is mostly per-queue because we
# can't run *_job and *_job_slow in the same worker, which will be
# fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
- result['CELERYD_CONCURRENCY'] = config[queue].concurrency
+ result['worker_concurrency'] = config[queue].concurrency
- result['BROKER_URL'] = 'amqp://%s:%s@%s/%s' % (
+ result['broker_url'] = 'amqp://%s:%s@%s/%s' % (
config.rabbitmq.userid, config.rabbitmq.password,
config.rabbitmq.host, config.rabbitmq.virtual_host)
- # XXX wgrant 2015-08-03: Celery 3.2 won't read pickles by default,
- # and Celery 3.1 can send only pickles for some things. Let's accept
- # both until they sort things out.
- # XXX cjwatson 2019-03-06: Remove this once production is using json as
- # its task/result serialiser.
- result['CELERY_ACCEPT_CONTENT'] = ['pickle', 'json']
- result['CELERY_CREATE_MISSING_QUEUES'] = False
- result['CELERY_DEFAULT_EXCHANGE'] = 'job'
- result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'
- result['CELERY_ENABLE_UTC'] = True
- result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
- result['CELERY_QUEUES'] = celery_queues
- result['CELERY_RESULT_BACKEND'] = 'amqp'
- result['CELERY_RESULT_SERIALIZER'] = 'json'
- result['CELERY_TASK_SERIALIZER'] = 'json'
- result['CELERYBEAT_SCHEDULE'] = {
+ result['beat_schedule'] = {
'schedule-missing': {
'task': 'lp.services.job.celeryjob.run_missing_ready',
'schedule': timedelta(seconds=600),
'options': {
'routing_key': CELERY_BEAT_QUEUE,
},
+ }
}
- }
+ result['enable_utc'] = True
+ result['imports'] = ("lp.services.job.celeryjob", )
+ result['result_backend'] = 'amqp'
+ result['task_acks_late'] = True
+ result['task_create_missing_queues'] = False
+ result['task_default_exchange'] = 'job'
+ result['task_default_queue'] = 'launchpad_job'
+ result['task_queues'] = celery_queues
# See http://ask.github.com/celery/userguide/optimizing.html:
# The AMQP message of a job should stay in the RabbitMQ server
# until the job has been finished. This allows to simply kill
# a "celery worker" instance while a job is executed; when another
# instance is started later, it will run the aborted job again.
- result['CELERYD_PREFETCH_MULTIPLIER'] = 1
- result['CELERY_ACKS_LATE'] = True
+ result['worker_prefetch_multiplier'] = 1
return result
=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2015-08-03 07:06:45 +0000
+++ lib/lp/services/job/celeryjob.py 2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Celery-specific Job code.
@@ -10,8 +10,11 @@
__metaclass__ = type
__all__ = [
- 'CeleryRunJob',
- 'CeleryRunJobIgnoreResult',
+ 'celery_app',
+ 'celery_run_job',
+ 'celery_run_job_ignore_result',
+ 'find_missing_ready',
+ 'run_missing_ready',
]
from logging import info
@@ -20,7 +23,10 @@
os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import Task
+from celery import (
+ Celery,
+ Task,
+ )
from lazr.jobrunner.celerytask import RunJob
from storm.zope.interfaces import IZStorm
import transaction
@@ -46,6 +52,9 @@
from lp.services import scripts
+celery_app = Celery()
+
+
class CeleryRunJob(RunJob):
"""The Celery Task that runs a job."""
@@ -67,19 +76,22 @@
super(CeleryRunJob, self).run(job_id)
-class CeleryRunJobIgnoreResult(CeleryRunJob):
-
- ignore_result = True
+@celery_app.task(base=CeleryRunJob, bind=True)
+def celery_run_job(self, job_id, dbuser):
+ super(type(self), self).run(job_id, dbuser)
+
+
+@celery_app.task(base=CeleryRunJob, bind=True, ignore_result=True)
+def celery_run_job_ignore_result(self, job_id, dbuser):
+ super(type(self), self).run(job_id, dbuser)
class FindMissingReady:
def __init__(self, job_source):
- from lp.services.job.celeryjob import CeleryRunJob
from lazr.jobrunner.celerytask import list_queued
self.job_source = job_source
- self.queue_contents = list_queued(CeleryRunJob.app,
- [job_source.task_queue])
+ self.queue_contents = list_queued(celery_app, [job_source.task_queue])
self.queued_job_ids = set(task[1][0][0] for task in
self.queue_contents)
@@ -93,40 +105,46 @@
return FindMissingReady(job_source).find_missing_ready()
-class RunMissingReady(Task):
- """Task to run any jobs that are ready but not scheduled.
-
- Currently supports only BranchScanJob.
- :param _no_init: For tests. If True, do not perform the initialization.
- """
- ignore_result = True
-
- def run(self, _no_init=False):
- if not _no_init:
- task_init('run_missing_ready')
- with TransactionFreeOperation():
- count = 0
- for job in find_missing_ready(BranchScanJob):
- if not celery_enabled(job.__class__.__name__):
- continue
- job.celeryCommitHook(True)
- count += 1
- info('Scheduled %d missing jobs.', count)
- transaction.commit()
+class PrefixedTask(Task):
+ """A Task with more informative task_id defaults."""
+
+ task_id_prefix = None
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
- link=None, link_error=None, **options):
+ link=None, link_error=None, shadow=None, **options):
"""Create a task_id if none is specified.
Override the quite generic default task_id with one containing
- the class name.
+ the task_id_prefix.
See also `celery.task.Task.apply_async()`.
"""
- if task_id is None:
- task_id = '%s_%s' % (self.__class__.__name__, uuid4())
- return super(RunMissingReady, self).apply_async(
- args, kwargs, task_id, producer, link, link_error, **options)
+ if task_id is None and self.task_id_prefix is not None:
+ task_id = '%s_%s' % (self.task_id_prefix, uuid4())
+ return super(PrefixedTask, self).apply_async(
+ args=args, kwargs=kwargs, task_id=task_id, producer=producer,
+ link=link, link_error=link_error, shadow=shadow, **options)
+
+
+@celery_app.task(
+ base=PrefixedTask, task_id_prefix='RunMissingReady', ignore_result=True)
+def run_missing_ready(_no_init=False):
+ """Task to run any jobs that are ready but not scheduled.
+
+ Currently supports only BranchScanJob.
+ :param _no_init: For tests. If True, do not perform the initialization.
+ """
+ if not _no_init:
+ task_init('run_missing_ready')
+ with TransactionFreeOperation():
+ count = 0
+ for job in find_missing_ready(BranchScanJob):
+ if not celery_enabled(job.__class__.__name__):
+ continue
+ job.celeryCommitHook(True)
+ count += 1
+ info('Scheduled %d missing jobs.', count)
+ transaction.commit()
needs_zcml = True
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2018-03-30 20:42:14 +0000
+++ lib/lp/services/job/runner.py 2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Facilities for running Jobs."""
@@ -237,11 +237,13 @@
# Avoid importing from lp.services.job.celeryjob where not needed, to
# avoid configuring Celery when Rabbit is not configured.
from lp.services.job.celeryjob import (
- CeleryRunJob, CeleryRunJobIgnoreResult)
+ celery_run_job,
+ celery_run_job_ignore_result,
+ )
if ignore_result:
- cls = CeleryRunJobIgnoreResult
+ task = celery_run_job_ignore_result
else:
- cls = CeleryRunJob
+ task = celery_run_job
db_class = self.getDBClass()
ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
eta = self.job.scheduled_start
@@ -250,7 +252,7 @@
if (self.job.lease_expires is not None
and (eta is None or eta < self.job.lease_expires)):
eta = self.job.lease_expires
- return cls.apply_async(
+ return task.apply_async(
(ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta,
soft_time_limit=self.soft_time_limit.total_seconds(),
task_id=self.taskId())
=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py 2019-03-08 14:06:04 +0000
+++ lib/lp/services/job/tests/__init__.py 2019-03-22 19:20:45 +0000
@@ -28,9 +28,9 @@
The "celery worker" instance will be configured to use the
currently-configured BROKER_URL, and able to run CeleryRunJob tasks.
"""
- from lp.services.job.celeryjob import CeleryRunJob
+ from lp.services.job.celeryjob import celery_app
# convert config params to a URL, so they can be passed as --broker.
- with CeleryRunJob.app.broker_connection() as connection:
+ with celery_app.broker_connection() as connection:
broker_uri = connection.as_uri(include_password=True)
cmd_args = (
'worker',
@@ -89,8 +89,8 @@
def drain_celery_queues():
from lazr.jobrunner.celerytask import drain_queues
- from lp.services.job.celeryjob import CeleryRunJob
- drain_queues(CeleryRunJob.app, CeleryRunJob.app.conf.CELERY_QUEUES.keys())
+ from lp.services.job.celeryjob import celery_app
+ drain_queues(celery_app, celery_app.conf.CELERY_QUEUES.keys())
def pop_remote_notifications():
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2015-10-12 13:16:54 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012-2015 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from contextlib import contextmanager
@@ -32,21 +32,20 @@
'branch_write_job', 'branch_write_job_slow',
'bzrsyncd_job', 'bzrsyncd_job_slow', 'celerybeat',
'launchpad_job', 'launchpad_job_slow']
- queues = config['CELERY_QUEUES']
+ queues = config['task_queues']
self.assertEqual(queue_names, sorted(queues))
for name in queue_names:
- self.assertEqual(name, queues[name]['binding_key'])
+ self.assertEqual(name, queues[name]['routing_key'])
# The port changes between test runs.
self.assertThat(
- config['BROKER_URL'],
+ config['broker_url'],
MatchesRegex(r'amqp://guest:guest@localhost:\d+//\Z'))
- self.assertFalse(config['CELERY_CREATE_MISSING_QUEUES'])
- self.assertEqual('job', config['CELERY_DEFAULT_EXCHANGE'])
- self.assertEqual('launchpad_job', config['CELERY_DEFAULT_QUEUE'])
- self.assertEqual(
- ('lp.services.job.celeryjob', ), config['CELERY_IMPORTS'])
- self.assertEqual('amqp', config['CELERY_RESULT_BACKEND'])
+ self.assertFalse(config['task_create_missing_queues'])
+ self.assertEqual('job', config['task_default_exchange'])
+ self.assertEqual('launchpad_job', config['task_default_queue'])
+ self.assertEqual(('lp.services.job.celeryjob', ), config['imports'])
+ self.assertEqual('amqp', config['result_backend'])
def test_app_server_configuration(self):
from lp.services.job.celeryconfig import configure
@@ -55,10 +54,8 @@
def check_job_specific_celery_worker_configuration(self, expected, config):
self.check_default_common_parameters(config)
- self.assertEqual(
- expected['concurrency'], config['CELERYD_CONCURRENCY'])
- self.assertEqual(
- expected['timeout'], config['CELERYD_TASK_SOFT_TIME_LIMIT'])
+ self.assertEqual(expected['concurrency'], config['worker_concurrency'])
+ self.assertEqual(expected['timeout'], config['task_soft_time_limit'])
self.assertEqual(
expected['fallback'], config.get('FALLBACK', None))
@@ -151,9 +148,3 @@
self.assertRaisesWithContent(
ConfigurationError, error, configure,
self.command + ['--queue=foo'])
-
-
-class TestCelerydConfiguration(TestCeleryWorkerConfiguration):
- """Test behaviour with legacy "celeryd" command name."""
-
- command = ['celeryd']
=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2015-10-12 13:16:54 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012-2015 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from cStringIO import StringIO
@@ -29,13 +29,11 @@
def setUp(self):
super(TestRunMissingJobs, self).setUp()
from lp.services.job.celeryjob import (
- CeleryRunJob,
find_missing_ready,
- RunMissingReady,
- )
- self.CeleryRunJob = CeleryRunJob
+ run_missing_ready,
+ )
self.find_missing_ready = find_missing_ready
- self.RunMissingReady = RunMissingReady
+ self.run_missing_ready = run_missing_ready
def createMissingJob(self):
job = BranchScanJob.create(self.factory.makeBranch())
@@ -108,7 +106,7 @@
with monitor_celery() as responses:
with dbuser('run_missing_ready'):
with TransactionFreeOperation.require():
- self.RunMissingReady().run(_no_init=True)
+ self.run_missing_ready.run(_no_init=True)
self.assertEqual([], responses)
def test_run_missing_ready(self):
@@ -119,7 +117,7 @@
with monitor_celery() as responses:
with dbuser('run_missing_ready'):
with TransactionFreeOperation.require():
- self.RunMissingReady().run(_no_init=True)
+ self.run_missing_ready.run(_no_init=True)
self.assertEqual(1, len(responses))
def test_run_missing_ready_does_not_return_results(self):
@@ -127,7 +125,7 @@
result queue."""
from lp.services.job.tests.celery_helpers import noop
job_queue_name = 'celerybeat'
- request = self.RunMissingReady().apply_async(
+ request = self.run_missing_ready.apply_async(
kwargs={'_no_init': True}, queue=job_queue_name)
self.assertTrue(request.task_id.startswith('RunMissingReady_'))
result_queue_name = request.task_id.replace('-', '')
@@ -136,15 +134,15 @@
# This would also happen when "with celery_worker()" would do nothing.
# So let's be sure that a task is queued...
# Give the system some time to deliver the message
- self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 1)
+ self.assertQueueSize(self.run_missing_ready.app, [job_queue_name], 1)
# Wait at most 60 seconds for "celery worker" to start and process
# the task.
with celery_worker(job_queue_name):
# Due to FIFO ordering, this will only return after
- # RunMissingReady has finished.
+ # run_missing_ready has finished.
noop.apply_async(queue=job_queue_name).wait(60)
# But now the message has been consumed by "celery worker".
- self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 0)
+ self.assertQueueSize(self.run_missing_ready.app, [job_queue_name], 0)
# No result queue was created for the task.
try:
real_stdout = sys.stdout
=== modified file 'lib/lp/services/messaging/rabbit.py'
--- lib/lp/services/messaging/rabbit.py 2018-05-12 14:14:33 +0000
+++ lib/lp/services/messaging/rabbit.py 2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2011 Canonical Ltd. This software is licensed under the
+# Copyright 2011-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""An API for messaging systems in Launchpad, e.g. RabbitMQ."""
@@ -19,7 +19,7 @@
import threading
import time
-from amqplib import client_0_8 as amqp
+import amqp
import transaction
from transaction._transaction import Status as TransactionStatus
from zope.interface import implementer
@@ -73,10 +73,12 @@
"""
if not is_configured():
raise MessagingUnavailable("Incomplete configuration")
- return amqp.Connection(
+ connection = amqp.Connection(
host=config.rabbitmq.host, userid=config.rabbitmq.userid,
password=config.rabbitmq.password,
- virtual_host=config.rabbitmq.virtual_host, insist=False)
+ virtual_host=config.rabbitmq.virtual_host)
+ connection.connect()
+ return connection
@implementer(IMessageSession)
@@ -97,9 +99,7 @@
@property
def is_connected(self):
"""See `IMessageSession`."""
- return (
- self._connection is not None and
- self._connection.transport is not None)
+ return self._connection is not None and self._connection.connected
def connect(self):
"""See `IMessageSession`.
@@ -107,7 +107,7 @@
Open a connection for this thread if necessary. Connections cannot be
shared between threads.
"""
- if self._connection is None or self._connection.transport is None:
+ if self._connection is None or not self._connection.connected:
self._connection = connect()
return self._connection
@@ -281,8 +281,8 @@
else:
self.channel.basic_ack(message.delivery_tag)
return json.loads(message.body)
- except amqp.AMQPChannelException as error:
- if error.amqp_reply_code == 404:
+ except amqp.ChannelError as error:
+ if error.reply_code == 404:
raise QueueNotFound()
else:
raise
=== modified file 'lib/lp/testing/fixture.py'
--- lib/lp/testing/fixture.py 2018-05-12 14:14:33 +0000
+++ lib/lp/testing/fixture.py 2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Launchpad test fixtures that have no better home."""
@@ -21,7 +21,7 @@
import socket
import time
-import amqplib.client_0_8 as amqp
+import amqp
from fixtures import (
EnvironmentVariableFixture,
Fixture,
=== modified file 'lib/lp/testing/gpgkeys/__init__.py'
--- lib/lp/testing/gpgkeys/__init__.py 2018-05-06 08:52:34 +0000
+++ lib/lp/testing/gpgkeys/__init__.py 2019-03-22 19:20:45 +0000
@@ -28,7 +28,10 @@
from lp.registry.interfaces.gpg import IGPGKeySet
from lp.registry.interfaces.person import IPersonSet
-from lp.services.gpg.interfaces import IGPGHandler
+from lp.services.gpg.interfaces import (
+ get_gpgme_context,
+ IGPGHandler,
+ )
gpgkeysdir = os.path.join(os.path.dirname(__file__), 'data')
@@ -127,11 +130,7 @@
if isinstance(content, unicode):
raise TypeError('Content cannot be Unicode.')
- # setup context
- ctx = gpgme.Context()
- # Stick to GnuPG 1.
- ctx.set_engine_info(gpgme.PROTOCOL_OpenPGP, "/usr/bin/gpg", None)
- ctx.armor = True
+ ctx = get_gpgme_context()
# setup containers
cipher = StringIO(content)
=== modified file 'lib/lp/testing/tests/test_layers_functional.py'
--- lib/lp/testing/tests/test_layers_functional.py 2018-05-12 14:14:33 +0000
+++ lib/lp/testing/tests/test_layers_functional.py 2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2015 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from __future__ import with_statement
@@ -19,7 +19,7 @@
from urllib import urlopen
import uuid
-from amqplib import client_0_8 as amqp
+import amqp
from fixtures import (
EnvironmentVariableFixture,
Fixture,
@@ -277,8 +277,8 @@
host=rabbitmq.host,
userid=rabbitmq.userid,
password=rabbitmq.password,
- virtual_host=rabbitmq.virtual_host,
- insist=False)
+ virtual_host=rabbitmq.virtual_host)
+ conn.connect()
conn.close()
=== modified file 'lib/lp_sitecustomize.py'
--- lib/lp_sitecustomize.py 2018-05-12 14:14:33 +0000
+++ lib/lp_sitecustomize.py 2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
# This file is imported by _pythonpath.py and by the standard Launchpad
@@ -70,11 +70,11 @@
logger.parent = new_root
-def silence_amqplib_logger():
- """Install the NullHandler on the amqplib logger to silence logs."""
- amqplib_logger = logging.getLogger('amqplib')
- amqplib_logger.addHandler(logging.NullHandler())
- amqplib_logger.propagate = False
+def silence_amqp_logger():
+ """Install the NullHandler on the amqp logger to silence logs."""
+ amqp_logger = logging.getLogger('amqp')
+ amqp_logger.addHandler(logging.NullHandler())
+ amqp_logger.propagate = False
def silence_bzr_logger():
@@ -153,7 +153,7 @@
This function is also invoked by the test infrastructure to reset
logging between tests.
"""
- silence_amqplib_logger()
+ silence_amqp_logger()
silence_bzr_logger()
silence_zcml_logger()
silence_transaction_logger()
=== modified file 'utilities/make-lp-user'
--- utilities/make-lp-user 2019-02-23 07:41:11 +0000
+++ utilities/make-lp-user 2019-03-22 19:20:45 +0000
@@ -46,6 +46,7 @@
from lp.registry.interfaces.ssh import ISSHKeySet
from lp.registry.interfaces.teammembership import TeamMembershipStatus
from lp.services.gpg.interfaces import (
+ get_gpg_path,
GPGKeyAlgorithm,
IGPGHandler,
)
@@ -138,7 +139,7 @@
# Prevent translated gpg output from messing up our parsing.
env['LC_ALL'] = 'C'
- command_line = ['gpg'] + arguments
+ command_line = [get_gpg_path()] + arguments
pipe = subprocess.Popen(
command_line, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = pipe.communicate()
References