

[Merge] lp:~cjwatson/launchpad/bionic-gpg into lp:launchpad


Colin Watson has proposed merging lp:~cjwatson/launchpad/bionic-gpg into lp:launchpad.

Commit message:
Fix GPG handling to support bionic.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/bionic-gpg/+merge/364982
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/bionic-gpg into lp:launchpad.
=== modified file 'constraints.txt'
--- constraints.txt	2019-01-21 17:45:55 +0000
+++ constraints.txt	2019-03-22 19:20:45 +0000
@@ -217,8 +217,7 @@
 # post1 Don't add a process back to the ready set if it received an error
 # such as a timeout.
 ampoule==0.2.0.post1
-amqp==1.4.9
-amqplib==1.0.2
+amqp==2.4.2
 anyjson==0.3.3
 appdirs==1.4.3
 asn1crypto==0.23.0
@@ -231,10 +230,10 @@
 backports.lzma==0.0.3
 bcrypt==3.1.4
 BeautifulSoup==3.2.1
-billiard==3.3.0.23
+billiard==3.5.0.5
 bson==0.3.3
 bzr==2.6.0.lp.3
-celery==3.1.26.post2
+celery==4.1.1
 cffi==1.11.2
 Chameleon==2.11
 chardet==3.0.4
@@ -273,7 +272,7 @@
 iso8601==0.1.12
 jsautobuild==0.2
 keyring==0.6.2
-kombu==3.0.37
+kombu==4.4.0
 launchpad-buildd==159
 launchpadlib==1.10.5
 lazr.authentication==0.1.1
@@ -281,7 +280,7 @@
 lazr.config==2.2.1
 lazr.delegates==2.0.4
 lazr.enum==1.1.3
-lazr.jobrunner==0.13
+lazr.jobrunner==0.14
 lazr.lifecycle==1.1
 lazr.restful==0.20.1
 lazr.restfulclient==0.13.2
@@ -302,7 +301,7 @@
 netaddr==0.7.19
 oauth==1.0
 oops==0.0.13
-oops-amqp==0.0.8b1
+oops-amqp==0.1.0
 oops-datedir-repo==0.0.23
 oops-timeline==0.0.1
 oops-twisted==0.0.7
@@ -338,7 +337,7 @@
 python-openid==2.2.5-fix1034376
 python-swiftclient==2.0.3
 PyYAML==3.10
-rabbitfixture==0.3.6
+rabbitfixture==0.4.0
 requests==2.7.0
 requests-file==1.4.3
 requests-toolbelt==0.6.2
@@ -371,6 +370,7 @@
 typing==3.6.2
 unittest2==1.1.0
 van.testing==3.0.0
+vine==1.1.4
 virtualenv-tools3==2.0.0
 wadllib==1.3.2
 wheel==0.29.0
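
This hunk moves the messaging stack from amqplib and Celery 3 to the py-amqp
and Celery 4 family: celery 4.1 needs kombu 4.x, amqp 2.x, billiard 3.5.x,
and the new vine dependency, so they are bumped together and amqplib is
dropped. A quick sanity check (a hypothetical snippet, not part of this
branch) that the installed stack matches these pins:

    import amqp, billiard, celery, kombu, vine

    # Each of these Celery-family packages exposes __version__.
    for mod in (amqp, billiard, celery, kombu, vine):
        print(mod.__name__, mod.__version__)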

=== modified file 'lib/lp/services/gpg/doc/gpg-signatures.txt'
--- lib/lp/services/gpg/doc/gpg-signatures.txt	2019-03-07 15:05:17 +0000
+++ lib/lp/services/gpg/doc/gpg-signatures.txt	2019-03-22 19:20:45 +0000
@@ -168,8 +168,12 @@
     >>> gpghandler.getVerifiedSignature(content)
     Traceback (most recent call last):
     ...
+<<<<<<< TREE
     GPGKeyDoesNotExistOnServer: GPG key E192C0543B1BB2EB does not exist on the
     keyserver.
+=======
+    GPGVerificationError: (..., 9, u'No public key')
+>>>>>>> MERGE-SOURCE
 
 Due to unpredictable behaviour between the production system and
 the external keyserver, we have a resilient signature verifier,
@@ -183,9 +187,15 @@
     Traceback (most recent call last):
     ...
     GPGVerificationError: Verification failed 3 times:
+<<<<<<< TREE
     ['GPG key E192C0543B1BB2EB does not exist on the keyserver.',
      'GPG key E192C0543B1BB2EB does not exist on the keyserver.',
      'GPG key E192C0543B1BB2EB does not exist on the keyserver.']
+=======
+    ["(..., 9, u'No public key')",
+     "(..., 9, u'No public key')",
+     "(..., 9, u'No public key')"]
+>>>>>>> MERGE-SOURCE
 
 
 Debugging exceptions
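
The doctest above exercises the resilient verifier: the keyserver lookup is
retried three times and the per-attempt errors are aggregated into a single
GPGVerificationError. A reduced sketch of that retry shape (hypothetical
helper name; the real logic lives in IGPGHandler.getVerifiedSignature):

    from lp.services.gpg.interfaces import GPGVerificationError

    def verify_with_retries(verify, content, attempts=3):
        errors = []
        for _ in range(attempts):
            try:
                return verify(content)
            except GPGVerificationError as error:
                errors.append(str(error))
        raise GPGVerificationError(
            "Verification failed %d times: %s" % (attempts, errors))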

=== modified file 'lib/lp/services/gpg/handler.py'
--- lib/lp/services/gpg/handler.py	2019-03-18 10:12:37 +0000
+++ lib/lp/services/gpg/handler.py	2019-03-22 19:20:45 +0000
@@ -31,6 +31,8 @@
 from lp.app.validators.email import valid_email
 from lp.services.config import config
 from lp.services.gpg.interfaces import (
+    get_gpg_path,
+    get_gpgme_context,
     GPGKeyAlgorithm,
     GPGKeyDoesNotExistOnServer,
     GPGKeyExpired,
@@ -116,14 +118,6 @@
 
         atexit.register(removeHome, self.home)
 
-    def _getContext(self):
-        """Return a new appropriately-configured GPGME context."""
-        context = gpgme.Context()
-        # Stick to GnuPG 1.
-        context.set_engine_info(gpgme.PROTOCOL_OpenPGP, "/usr/bin/gpg", None)
-        context.armor = True
-        return context
-
     def sanitizeFingerprint(self, fingerprint):
         """See IGPGHandler."""
         return sanitize_fingerprint(fingerprint)
@@ -162,6 +156,7 @@
         raise GPGVerificationError(
             "Verification failed 3 times: %s " % stored_errors)
 
+<<<<<<< TREE
     def _rawVerifySignature(self, ctx, content, signature=None):
         """Internals of `getVerifiedSignature`.
 
@@ -169,6 +164,16 @@
         the correct fingerprint, and once after retrieving the corresponding
         key from the keyserver.
         """
+=======
+    def getVerifiedSignature(self, content, signature=None):
+        """See IGPGHandler."""
+
+        assert not isinstance(content, unicode)
+        assert not isinstance(signature, unicode)
+
+        ctx = get_gpgme_context()
+
+>>>>>>> MERGE-SOURCE
         # from `info gpgme` about gpgme_op_verify(SIG, SIGNED_TEXT, PLAIN):
         #
         # If SIG is a detached signature, then the signed text should be
@@ -273,7 +278,7 @@
     def importPublicKey(self, content):
         """See IGPGHandler."""
         assert isinstance(content, str)
-        context = self._getContext()
+        context = get_gpgme_context()
 
         newkey = StringIO(content)
         with gpgme_timeline("import", "new public key"):
@@ -308,7 +313,7 @@
         if 'GPG_AGENT_INFO' in os.environ:
             del os.environ['GPG_AGENT_INFO']
 
-        context = self._getContext()
+        context = get_gpgme_context()
         newkey = StringIO(content)
         with gpgme_timeline("import", "new secret key"):
             import_result = context.import_(newkey)
@@ -334,7 +339,7 @@
 
     def generateKey(self, name):
         """See `IGPGHandler`."""
-        context = self._getContext()
+        context = get_gpgme_context()
 
         # Make sure that gpg-agent doesn't interfere.
         if 'GPG_AGENT_INFO' in os.environ:
@@ -372,8 +377,7 @@
         if isinstance(content, unicode):
             raise TypeError('Content cannot be Unicode.')
 
-        # setup context
-        ctx = self._getContext()
+        ctx = get_gpgme_context()
 
         # setup containers
         plain = StringIO(content)
@@ -404,7 +408,7 @@
 
         # Find the key and make it the only one allowed to sign content
         # during this session.
-        context = self._getContext()
+        context = get_gpgme_context()
         context.signers = [removeSecurityProxy(key.key)]
 
         # Set up containers.
@@ -432,7 +436,7 @@
         """Get an iterator of the keys this gpg handler
         already knows about.
         """
-        ctx = self._getContext()
+        ctx = get_gpgme_context()
 
         # XXX michaeln 2010-05-07 bug=576405
         # Currently gpgme.Context().keylist fails if passed a unicode
@@ -457,8 +461,13 @@
         if not key.exists_in_local_keyring:
             pubkey = self._getPubKey(fingerprint)
             key = self.importPublicKey(pubkey)
+<<<<<<< TREE
             if not key.matches(fingerprint):
                 ctx = self._getContext()
+=======
+            if fingerprint != key.fingerprint:
+                ctx = get_gpgme_context()
+>>>>>>> MERGE-SOURCE
                 with gpgme_timeline("delete", key.fingerprint):
                     ctx.delete(key.key)
                 raise GPGKeyMismatchOnServer(fingerprint, key.fingerprint)
@@ -602,17 +611,9 @@
         self._buildFromGpgmeKey(key)
         return self
 
-    def _getContext(self):
-        """Return a new appropriately-configured GPGME context."""
-        context = gpgme.Context()
-        # Stick to GnuPG 1.
-        context.set_engine_info(gpgme.PROTOCOL_OpenPGP, "/usr/bin/gpg", None)
-        context.armor = True
-        return context
-
     def _buildFromFingerprint(self, fingerprint):
         """Build key information from a fingerprint."""
-        context = self._getContext()
+        context = get_gpgme_context()
         # retrive additional key information
         try:
             with gpgme_timeline("get-key", fingerprint):
@@ -658,11 +659,12 @@
             # XXX cprov 20081014: gpgme_op_export() only supports public keys.
             # See http://www.fifi.org/cgi-bin/info2www?(gpgme)Exporting+Keys
             p = subprocess.Popen(
-                ['gpg', '--export-secret-keys', '-a', self.fingerprint],
+                [get_gpg_path(), '--export-secret-keys', '-a',
+                 self.fingerprint],
                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
             return p.stdout.read()
 
-        context = self._getContext()
+        context = get_gpgme_context()
         keydata = StringIO()
         with gpgme_timeline("export", self.fingerprint):
             context.export(self.fingerprint.encode('ascii'), keydata)

=== modified file 'lib/lp/services/gpg/interfaces.py'
--- lib/lp/services/gpg/interfaces.py	2019-03-18 10:12:37 +0000
+++ lib/lp/services/gpg/interfaces.py	2019-03-22 19:20:45 +0000
@@ -2,6 +2,8 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __all__ = [
+    'get_gpg_path',
+    'get_gpgme_context',
     'GPGKeyAlgorithm',
     'GPGKeyDoesNotExistOnServer',
     'GPGKeyExpired',
@@ -22,6 +24,7 @@
     ]
 
 import httplib
+import os.path
 import re
 
 from lazr.enum import (
@@ -56,6 +59,28 @@
         return False
 
 
+def get_gpg_path():
+    """Return the path to the GPG executable we prefer.
+
+    We stick to GnuPG 1 until we've worked out how to get things working
+    with GnuPG 2.
+    """
+    if os.path.exists("/usr/bin/gpg1"):
+        return "/usr/bin/gpg1"
+    else:
+        return "/usr/bin/gpg"
+
+
+def get_gpgme_context():
+    """Return a new appropriately-configured GPGME context."""
+    import gpgme
+
+    context = gpgme.Context()
+    context.set_engine_info(gpgme.PROTOCOL_OpenPGP, get_gpg_path(), None)
+    context.armor = True
+    return context
+
+
 # XXX: cprov 2004-10-04:
 # (gpg+dbschema) the data structure should be rearranged to support 4 field
 # needed: keynumber(1,16,17,20), keyalias(R,g,D,G), title and description
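
get_gpg_path() prefers /usr/bin/gpg1 where it exists (on bionic,
/usr/bin/gpg is GnuPG 2 and the old binary is packaged as gpg1), and
get_gpgme_context() centralises the context setup that handler.py previously
duplicated in two _getContext() methods. A sketch of how calling code uses
the helpers (assuming pygpgme is installed and the keyring is populated):

    from lp.services.gpg.interfaces import get_gpgme_context

    ctx = get_gpgme_context()        # armored, pointed at GnuPG 1
    for key in ctx.keylist():        # iterate the local keyring
        print(key.subkeys[0].fpr)    # primary key fingerprint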

=== modified file 'lib/lp/services/gpg/tests/test_gpghandler.py'
--- lib/lp/services/gpg/tests/test_gpghandler.py	2019-03-18 10:12:37 +0000
+++ lib/lp/services/gpg/tests/test_gpghandler.py	2019-03-22 19:20:45 +0000
@@ -12,6 +12,7 @@
 from zope.security.proxy import removeSecurityProxy
 
 from lp.services.gpg.interfaces import (
+    get_gpg_path,
     GPGKeyDoesNotExistOnServer,
     GPGKeyMismatchOnServer,
     GPGKeyTemporarilyNotFoundError,
@@ -327,7 +328,8 @@
             # plumbing.
             with open(os.devnull, "w") as devnull:
                 gpg_proc = subprocess.Popen(
-                    ["gpg", "--quiet", "--status-fd", "1", "--verify"],
+                    [get_gpg_path(), "--quiet", "--status-fd", "1",
+                     "--verify"],
                     stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                     stderr=devnull, universal_newlines=True)
             status = gpg_proc.communicate(signed_content)[0].splitlines()

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2019-03-06 14:52:01 +0000
+++ lib/lp/services/job/celeryconfig.py	2019-03-22 19:20:45 +0000
@@ -14,7 +14,7 @@
 
 
 def check_circular_fallbacks(queue):
-    """Check for curcular fallback queues.
+    """Check for circular fallback queues.
 
     A circular chain of fallback queues could keep a job forever queued
     if it times out in all queues.
@@ -41,7 +41,7 @@
     queue_names = queue_names.split(' ')
     for queue_name in queue_names:
         celery_queues[queue_name] = {
-            'binding_key': queue_name,
+            'routing_key': queue_name,
             }
         check_circular_fallbacks(queue_name)
 
@@ -52,7 +52,7 @@
     # A queue must be specified as a command line parameter for each
     # "celery worker" instance, but this is not required for a Launchpad app
     # server.
-    if 'celeryd' in argv[0] or ('celery' in argv[0] and argv[1] == 'worker'):
+    if 'celery' in argv[0] and argv[1] == 'worker':
         if queues is None or queues == '':
             raise ConfigurationError('A queue must be specified.')
         queues = queues.split(',')
@@ -68,7 +68,7 @@
                 'Queue %s is not configured in schema-lazr.conf' % queue)
         # XXX wgrant 2015-08-03: This should be set in the apply_async
         # now that we're on Celery 3.1.
-        result['CELERYD_TASK_SOFT_TIME_LIMIT'] = config[queue].timeout
+        result['task_soft_time_limit'] = config[queue].timeout
         if config[queue].fallback_queue != '':
             # XXX wgrant 2015-08-03: lazr.jobrunner actually looks for
             # FALLBACK_QUEUE; this probably isn't doing anything.
@@ -76,42 +76,34 @@
         # XXX wgrant 2015-08-03: This is mostly per-queue because we
         # can't run *_job and *_job_slow in the same worker, which will be
         # fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
-        result['CELERYD_CONCURRENCY'] = config[queue].concurrency
+        result['worker_concurrency'] = config[queue].concurrency
 
-    result['BROKER_URL'] = 'amqp://%s:%s@%s/%s' % (
+    result['broker_url'] = 'amqp://%s:%s@%s/%s' % (
         config.rabbitmq.userid, config.rabbitmq.password,
         config.rabbitmq.host, config.rabbitmq.virtual_host)
-    # XXX wgrant 2015-08-03: Celery 3.2 won't read pickles by default,
-    # and Celery 3.1 can send only pickles for some things. Let's accept
-    # both until they sort things out.
-    # XXX cjwatson 2019-03-06: Remove this once production is using json as
-    # its task/result serialiser.
-    result['CELERY_ACCEPT_CONTENT'] = ['pickle', 'json']
-    result['CELERY_CREATE_MISSING_QUEUES'] = False
-    result['CELERY_DEFAULT_EXCHANGE'] = 'job'
-    result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'
-    result['CELERY_ENABLE_UTC'] = True
-    result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
-    result['CELERY_QUEUES'] = celery_queues
-    result['CELERY_RESULT_BACKEND'] = 'amqp'
-    result['CELERY_RESULT_SERIALIZER'] = 'json'
-    result['CELERY_TASK_SERIALIZER'] = 'json'
-    result['CELERYBEAT_SCHEDULE'] = {
+    result['beat_schedule'] = {
         'schedule-missing': {
             'task': 'lp.services.job.celeryjob.run_missing_ready',
             'schedule': timedelta(seconds=600),
             'options': {
                 'routing_key': CELERY_BEAT_QUEUE,
                 },
+            }
         }
-    }
+    result['enable_utc'] = True
+    result['imports'] = ("lp.services.job.celeryjob", )
+    result['result_backend'] = 'amqp'
+    result['task_acks_late'] = True
+    result['task_create_missing_queues'] = False
+    result['task_default_exchange'] = 'job'
+    result['task_default_queue'] = 'launchpad_job'
+    result['task_queues'] = celery_queues
     # See http://ask.github.com/celery/userguide/optimizing.html:
     # The AMQP message of a job should stay in the RabbitMQ server
     # until the job has been finished. This allows to simply kill
     # a "celery worker" instance while a job is executed; when another
     # instance is started later, it will run the aborted job again.
-    result['CELERYD_PREFETCH_MULTIPLIER'] = 1
-    result['CELERY_ACKS_LATE'] = True
+    result['worker_prefetch_multiplier'] = 1
 
     return result
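
Most of this hunk tracks the Celery 4 setting renames: the upper-case
CELERY_*/CELERYD_* names become lower-case (BROKER_URL -> broker_url,
CELERY_QUEUES -> task_queues, CELERYD_CONCURRENCY -> worker_concurrency, and
so on), and the legacy "celeryd" command-name check goes away. A minimal
sketch (with hypothetical values) of a Celery 4 app consuming lower-case
settings like the ones built here:

    from celery import Celery

    app = Celery()
    app.conf.update(
        broker_url='amqp://guest:guest@localhost:5672//',
        task_default_queue='launchpad_job',
        task_acks_late=True,
        worker_prefetch_multiplier=1,
        )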
 

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py	2015-08-03 07:06:45 +0000
+++ lib/lp/services/job/celeryjob.py	2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Celery-specific Job code.
@@ -10,8 +10,11 @@
 __metaclass__ = type
 
 __all__ = [
-    'CeleryRunJob',
-    'CeleryRunJobIgnoreResult',
+    'celery_app',
+    'celery_run_job',
+    'celery_run_job_ignore_result',
+    'find_missing_ready',
+    'run_missing_ready',
     ]
 
 from logging import info
@@ -20,7 +23,10 @@
 
 
 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import Task
+from celery import (
+    Celery,
+    Task,
+    )
 from lazr.jobrunner.celerytask import RunJob
 from storm.zope.interfaces import IZStorm
 import transaction
@@ -46,6 +52,9 @@
 from lp.services import scripts
 
 
+celery_app = Celery()
+
+
 class CeleryRunJob(RunJob):
     """The Celery Task that runs a job."""
 
@@ -67,19 +76,22 @@
         super(CeleryRunJob, self).run(job_id)
 
 
-class CeleryRunJobIgnoreResult(CeleryRunJob):
-
-    ignore_result = True
+@celery_app.task(base=CeleryRunJob, bind=True)
+def celery_run_job(self, job_id, dbuser):
+    super(type(self), self).run(job_id, dbuser)
+
+
+@celery_app.task(base=CeleryRunJob, bind=True, ignore_result=True)
+def celery_run_job_ignore_result(self, job_id, dbuser):
+    super(type(self), self).run(job_id, dbuser)
 
 
 class FindMissingReady:
 
     def __init__(self, job_source):
-        from lp.services.job.celeryjob import CeleryRunJob
         from lazr.jobrunner.celerytask import list_queued
         self.job_source = job_source
-        self.queue_contents = list_queued(CeleryRunJob.app,
-                                          [job_source.task_queue])
+        self.queue_contents = list_queued(celery_app, [job_source.task_queue])
         self.queued_job_ids = set(task[1][0][0] for task in
                                   self.queue_contents)
 
@@ -93,40 +105,46 @@
     return FindMissingReady(job_source).find_missing_ready()
 
 
-class RunMissingReady(Task):
-    """Task to run any jobs that are ready but not scheduled.
-
-    Currently supports only BranchScanJob.
-    :param _no_init: For tests.  If True, do not perform the initialization.
-    """
-    ignore_result = True
-
-    def run(self, _no_init=False):
-        if not _no_init:
-            task_init('run_missing_ready')
-        with TransactionFreeOperation():
-            count = 0
-            for job in find_missing_ready(BranchScanJob):
-                if not celery_enabled(job.__class__.__name__):
-                    continue
-                job.celeryCommitHook(True)
-                count += 1
-            info('Scheduled %d missing jobs.', count)
-            transaction.commit()
+class PrefixedTask(Task):
+    """A Task with more informative task_id defaults."""
+
+    task_id_prefix = None
 
     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
-                    link=None, link_error=None, **options):
+                    link=None, link_error=None, shadow=None, **options):
         """Create a task_id if none is specified.
 
         Override the quite generic default task_id with one containing
-        the class name.
+        the task_id_prefix.
 
         See also `celery.task.Task.apply_async()`.
         """
-        if task_id is None:
-            task_id = '%s_%s' % (self.__class__.__name__, uuid4())
-        return super(RunMissingReady, self).apply_async(
-            args, kwargs, task_id, producer, link, link_error, **options)
+        if task_id is None and self.task_id_prefix is not None:
+            task_id = '%s_%s' % (self.task_id_prefix, uuid4())
+        return super(PrefixedTask, self).apply_async(
+            args=args, kwargs=kwargs, task_id=task_id, producer=producer,
+            link=link, link_error=link_error, shadow=shadow, **options)
+
+
+@celery_app.task(
+    base=PrefixedTask, task_id_prefix='RunMissingReady', ignore_result=True)
+def run_missing_ready(_no_init=False):
+    """Task to run any jobs that are ready but not scheduled.
+
+    Currently supports only BranchScanJob.
+    :param _no_init: For tests.  If True, do not perform the initialization.
+    """
+    if not _no_init:
+        task_init('run_missing_ready')
+    with TransactionFreeOperation():
+        count = 0
+        for job in find_missing_ready(BranchScanJob):
+            if not celery_enabled(job.__class__.__name__):
+                continue
+            job.celeryCommitHook(True)
+            count += 1
+        info('Scheduled %d missing jobs.', count)
+        transaction.commit()
 
 
 needs_zcml = True
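
Celery 4 no longer registers Task subclasses implicitly, so
CeleryRunJobIgnoreResult and RunMissingReady become functions attached to an
explicit app via the task decorator; base= supplies the shared class
behaviour and bind=True exposes the task instance as self. A reduced sketch
of the conversion (hypothetical task body):

    from celery import Celery, Task

    app = Celery()

    class BaseRunJob(Task):
        """Stands in for CeleryRunJob above."""
        def run(self, job_id):
            print('running %s' % job_id)

    @app.task(base=BaseRunJob, bind=True, ignore_result=True)
    def run_job_ignore_result(self, job_id):
        # The generated task class subclasses BaseRunJob, so the same
        # super() idiom as celery_run_job above reaches the base run().
        super(type(self), self).run(job_id)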

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2018-03-30 20:42:14 +0000
+++ lib/lp/services/job/runner.py	2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Facilities for running Jobs."""
@@ -237,11 +237,13 @@
         # Avoid importing from lp.services.job.celeryjob where not needed, to
         # avoid configuring Celery when Rabbit is not configured.
         from lp.services.job.celeryjob import (
-            CeleryRunJob, CeleryRunJobIgnoreResult)
+            celery_run_job,
+            celery_run_job_ignore_result,
+            )
         if ignore_result:
-            cls = CeleryRunJobIgnoreResult
+            task = celery_run_job_ignore_result
         else:
-            cls = CeleryRunJob
+            task = celery_run_job
         db_class = self.getDBClass()
         ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
         eta = self.job.scheduled_start
@@ -250,7 +252,7 @@
         if (self.job.lease_expires is not None
                 and (eta is None or eta < self.job.lease_expires)):
             eta = self.job.lease_expires
-        return cls.apply_async(
+        return task.apply_async(
             (ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta,
             soft_time_limit=self.soft_time_limit.total_seconds(),
             task_id=self.taskId())
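
The per-call options are unchanged in Celery 4; only the target moves from a
Task class to a decorated task function. A hypothetical direct invocation of
the task above (the job tuple, dbuser, and queue name are made up):

    from datetime import datetime, timedelta

    from lp.services.job.celeryjob import celery_run_job

    celery_run_job.apply_async(
        args=((1, 'lp.code.model.branchjob', 'BranchScanJob'),
              'some_dbuser'),
        queue='launchpad_job',
        eta=datetime.utcnow() + timedelta(seconds=10),
        soft_time_limit=300,
        task_id='CeleryRunJob_example')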

=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py	2019-03-08 14:06:04 +0000
+++ lib/lp/services/job/tests/__init__.py	2019-03-22 19:20:45 +0000
@@ -28,9 +28,9 @@
     The "celery worker" instance will be configured to use the
     currently-configured BROKER_URL, and able to run CeleryRunJob tasks.
     """
-    from lp.services.job.celeryjob import CeleryRunJob
+    from lp.services.job.celeryjob import celery_app
     # convert config params to a URL, so they can be passed as --broker.
-    with CeleryRunJob.app.broker_connection() as connection:
+    with celery_app.broker_connection() as connection:
         broker_uri = connection.as_uri(include_password=True)
     cmd_args = (
         'worker',
@@ -89,8 +89,8 @@
 
 def drain_celery_queues():
     from lazr.jobrunner.celerytask import drain_queues
-    from lp.services.job.celeryjob import CeleryRunJob
-    drain_queues(CeleryRunJob.app, CeleryRunJob.app.conf.CELERY_QUEUES.keys())
+    from lp.services.job.celeryjob import celery_app
+    drain_queues(celery_app, celery_app.conf.CELERY_QUEUES.keys())
 
 
 def pop_remote_notifications():

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	2015-10-12 13:16:54 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from contextlib import contextmanager
@@ -32,21 +32,20 @@
             'branch_write_job', 'branch_write_job_slow',
             'bzrsyncd_job', 'bzrsyncd_job_slow', 'celerybeat',
             'launchpad_job', 'launchpad_job_slow']
-        queues = config['CELERY_QUEUES']
+        queues = config['task_queues']
         self.assertEqual(queue_names, sorted(queues))
         for name in queue_names:
-            self.assertEqual(name, queues[name]['binding_key'])
+            self.assertEqual(name, queues[name]['routing_key'])
 
         # The port changes between test runs.
         self.assertThat(
-            config['BROKER_URL'],
+            config['broker_url'],
             MatchesRegex(r'amqp://guest:guest@localhost:\d+//\Z'))
-        self.assertFalse(config['CELERY_CREATE_MISSING_QUEUES'])
-        self.assertEqual('job', config['CELERY_DEFAULT_EXCHANGE'])
-        self.assertEqual('launchpad_job', config['CELERY_DEFAULT_QUEUE'])
-        self.assertEqual(
-            ('lp.services.job.celeryjob', ), config['CELERY_IMPORTS'])
-        self.assertEqual('amqp', config['CELERY_RESULT_BACKEND'])
+        self.assertFalse(config['task_create_missing_queues'])
+        self.assertEqual('job', config['task_default_exchange'])
+        self.assertEqual('launchpad_job', config['task_default_queue'])
+        self.assertEqual(('lp.services.job.celeryjob', ), config['imports'])
+        self.assertEqual('amqp', config['result_backend'])
 
     def test_app_server_configuration(self):
         from lp.services.job.celeryconfig import configure
@@ -55,10 +54,8 @@
 
     def check_job_specific_celery_worker_configuration(self, expected, config):
         self.check_default_common_parameters(config)
-        self.assertEqual(
-            expected['concurrency'], config['CELERYD_CONCURRENCY'])
-        self.assertEqual(
-            expected['timeout'], config['CELERYD_TASK_SOFT_TIME_LIMIT'])
+        self.assertEqual(expected['concurrency'], config['worker_concurrency'])
+        self.assertEqual(expected['timeout'], config['task_soft_time_limit'])
         self.assertEqual(
             expected['fallback'], config.get('FALLBACK', None))
 
@@ -151,9 +148,3 @@
         self.assertRaisesWithContent(
             ConfigurationError, error, configure,
             self.command + ['--queue=foo'])
-
-
-class TestCelerydConfiguration(TestCeleryWorkerConfiguration):
-    """Test behaviour with legacy "celeryd" command name."""
-
-    command = ['celeryd']

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	2015-10-12 13:16:54 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from cStringIO import StringIO
@@ -29,13 +29,11 @@
     def setUp(self):
         super(TestRunMissingJobs, self).setUp()
         from lp.services.job.celeryjob import (
-            CeleryRunJob,
             find_missing_ready,
-            RunMissingReady,
-        )
-        self.CeleryRunJob = CeleryRunJob
+            run_missing_ready,
+            )
         self.find_missing_ready = find_missing_ready
-        self.RunMissingReady = RunMissingReady
+        self.run_missing_ready = run_missing_ready
 
     def createMissingJob(self):
         job = BranchScanJob.create(self.factory.makeBranch())
@@ -108,7 +106,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.RunMissingReady().run(_no_init=True)
+                    self.run_missing_ready.run(_no_init=True)
         self.assertEqual([], responses)
 
     def test_run_missing_ready(self):
@@ -119,7 +117,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.RunMissingReady().run(_no_init=True)
+                    self.run_missing_ready.run(_no_init=True)
         self.assertEqual(1, len(responses))
 
     def test_run_missing_ready_does_not_return_results(self):
@@ -127,7 +125,7 @@
         result queue."""
         from lp.services.job.tests.celery_helpers import noop
         job_queue_name = 'celerybeat'
-        request = self.RunMissingReady().apply_async(
+        request = self.run_missing_ready.apply_async(
             kwargs={'_no_init': True}, queue=job_queue_name)
         self.assertTrue(request.task_id.startswith('RunMissingReady_'))
         result_queue_name = request.task_id.replace('-', '')
@@ -136,15 +134,15 @@
         # This would also happen when "with celery_worker()" would do nothing.
         # So let's be sure that a task is queued...
         # Give the system some time to deliver the message
-        self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 1)
+        self.assertQueueSize(self.run_missing_ready.app, [job_queue_name], 1)
         # Wait at most 60 seconds for "celery worker" to start and process
         # the task.
         with celery_worker(job_queue_name):
             # Due to FIFO ordering, this will only return after
-            # RunMissingReady has finished.
+            # run_missing_ready has finished.
             noop.apply_async(queue=job_queue_name).wait(60)
         # But now the message has been consumed by "celery worker".
-        self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 0)
+        self.assertQueueSize(self.run_missing_ready.app, [job_queue_name], 0)
         # No result queue was created for the task.
         try:
             real_stdout = sys.stdout

=== modified file 'lib/lp/services/messaging/rabbit.py'
--- lib/lp/services/messaging/rabbit.py	2018-05-12 14:14:33 +0000
+++ lib/lp/services/messaging/rabbit.py	2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """An API for messaging systems in Launchpad, e.g. RabbitMQ."""
@@ -19,7 +19,7 @@
 import threading
 import time
 
-from amqplib import client_0_8 as amqp
+import amqp
 import transaction
 from transaction._transaction import Status as TransactionStatus
 from zope.interface import implementer
@@ -73,10 +73,12 @@
     """
     if not is_configured():
         raise MessagingUnavailable("Incomplete configuration")
-    return amqp.Connection(
+    connection = amqp.Connection(
         host=config.rabbitmq.host, userid=config.rabbitmq.userid,
         password=config.rabbitmq.password,
-        virtual_host=config.rabbitmq.virtual_host, insist=False)
+        virtual_host=config.rabbitmq.virtual_host)
+    connection.connect()
+    return connection
 
 
 @implementer(IMessageSession)
@@ -97,9 +99,7 @@
     @property
     def is_connected(self):
         """See `IMessageSession`."""
-        return (
-            self._connection is not None and
-            self._connection.transport is not None)
+        return self._connection is not None and self._connection.connected
 
     def connect(self):
         """See `IMessageSession`.
@@ -107,7 +107,7 @@
         Open a connection for this thread if necessary. Connections cannot be
         shared between threads.
         """
-        if self._connection is None or self._connection.transport is None:
+        if self._connection is None or not self._connection.connected:
             self._connection = connect()
         return self._connection
 
@@ -281,8 +281,8 @@
                 else:
                     self.channel.basic_ack(message.delivery_tag)
                     return json.loads(message.body)
-            except amqp.AMQPChannelException as error:
-                if error.amqp_reply_code == 404:
+            except amqp.ChannelError as error:
+                if error.reply_code == 404:
                     raise QueueNotFound()
                 else:
                     raise
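
Unlike amqplib, py-amqp 2.x does not connect in the constructor: callers
must call connect() explicitly (hence the new connection.connect() above)
and can test the .connected property, and channel errors are reported as
amqp.ChannelError with a reply_code attribute instead of
AMQPChannelException with amqp_reply_code. A sketch of the new connection
lifecycle (broker credentials are hypothetical):

    import amqp

    connection = amqp.Connection(
        host='localhost:5672', userid='guest', password='guest',
        virtual_host='/')
    connection.connect()    # explicit since py-amqp 2.x
    try:
        channel = connection.channel()
        channel.basic_publish(
            amqp.Message('hello'), exchange='', routing_key='test')
    finally:
        connection.close()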

=== modified file 'lib/lp/testing/fixture.py'
--- lib/lp/testing/fixture.py	2018-05-12 14:14:33 +0000
+++ lib/lp/testing/fixture.py	2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Launchpad test fixtures that have no better home."""
@@ -21,7 +21,7 @@
 import socket
 import time
 
-import amqplib.client_0_8 as amqp
+import amqp
 from fixtures import (
     EnvironmentVariableFixture,
     Fixture,

=== modified file 'lib/lp/testing/gpgkeys/__init__.py'
--- lib/lp/testing/gpgkeys/__init__.py	2018-05-06 08:52:34 +0000
+++ lib/lp/testing/gpgkeys/__init__.py	2019-03-22 19:20:45 +0000
@@ -28,7 +28,10 @@
 
 from lp.registry.interfaces.gpg import IGPGKeySet
 from lp.registry.interfaces.person import IPersonSet
-from lp.services.gpg.interfaces import IGPGHandler
+from lp.services.gpg.interfaces import (
+    get_gpgme_context,
+    IGPGHandler,
+    )
 
 
 gpgkeysdir = os.path.join(os.path.dirname(__file__), 'data')
@@ -127,11 +130,7 @@
     if isinstance(content, unicode):
         raise TypeError('Content cannot be Unicode.')
 
-    # setup context
-    ctx = gpgme.Context()
-    # Stick to GnuPG 1.
-    ctx.set_engine_info(gpgme.PROTOCOL_OpenPGP, "/usr/bin/gpg", None)
-    ctx.armor = True
+    ctx = get_gpgme_context()
 
     # setup containers
     cipher = StringIO(content)

=== modified file 'lib/lp/testing/tests/test_layers_functional.py'
--- lib/lp/testing/tests/test_layers_functional.py	2018-05-12 14:14:33 +0000
+++ lib/lp/testing/tests/test_layers_functional.py	2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from __future__ import with_statement
@@ -19,7 +19,7 @@
 from urllib import urlopen
 import uuid
 
-from amqplib import client_0_8 as amqp
+import amqp
 from fixtures import (
     EnvironmentVariableFixture,
     Fixture,
@@ -277,8 +277,8 @@
                 host=rabbitmq.host,
                 userid=rabbitmq.userid,
                 password=rabbitmq.password,
-                virtual_host=rabbitmq.virtual_host,
-                insist=False)
+                virtual_host=rabbitmq.virtual_host)
+            conn.connect()
             conn.close()
 
 

=== modified file 'lib/lp_sitecustomize.py'
--- lib/lp_sitecustomize.py	2018-05-12 14:14:33 +0000
+++ lib/lp_sitecustomize.py	2019-03-22 19:20:45 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 # This file is imported by _pythonpath.py and by the standard Launchpad
@@ -70,11 +70,11 @@
             logger.parent = new_root
 
 
-def silence_amqplib_logger():
-    """Install the NullHandler on the amqplib logger to silence logs."""
-    amqplib_logger = logging.getLogger('amqplib')
-    amqplib_logger.addHandler(logging.NullHandler())
-    amqplib_logger.propagate = False
+def silence_amqp_logger():
+    """Install the NullHandler on the amqp logger to silence logs."""
+    amqp_logger = logging.getLogger('amqp')
+    amqp_logger.addHandler(logging.NullHandler())
+    amqp_logger.propagate = False
 
 
 def silence_bzr_logger():
@@ -153,7 +153,7 @@
     This function is also invoked by the test infrastructure to reset
     logging between tests.
     """
-    silence_amqplib_logger()
+    silence_amqp_logger()
     silence_bzr_logger()
     silence_zcml_logger()
     silence_transaction_logger()

=== modified file 'utilities/make-lp-user'
--- utilities/make-lp-user	2019-02-23 07:41:11 +0000
+++ utilities/make-lp-user	2019-03-22 19:20:45 +0000
@@ -46,6 +46,7 @@
 from lp.registry.interfaces.ssh import ISSHKeySet
 from lp.registry.interfaces.teammembership import TeamMembershipStatus
 from lp.services.gpg.interfaces import (
+    get_gpg_path,
     GPGKeyAlgorithm,
     IGPGHandler,
     )
@@ -138,7 +139,7 @@
     # Prevent translated gpg output from messing up our parsing.
     env['LC_ALL'] = 'C'
 
-    command_line = ['gpg'] + arguments
+    command_line = [get_gpg_path()] + arguments
     pipe = subprocess.Popen(
         command_line, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     stdout, stderr = pipe.communicate()

