← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/celery-4 into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/celery-4 into lp:launchpad.

Commit message:
Upgrade to celery 4.1.1.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/celery-4/+merge/364202

Celery has been gradually deprecating class-based tasks; in particular, they are no longer automatically added to the app's registry. I have therefore switched over to explicitly creating an app and using function-based tasks (it's still OK to use custom base classes for tasks, though).

Before landing this, we need to ensure that the init scripts being used for celery on gandwana (qastaging/staging) and ackee (production) have been changed to use "celery worker" rather than the obsolete "celeryd".
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/celery-4 into lp:launchpad.
=== modified file 'constraints.txt'
--- constraints.txt	2019-01-21 17:45:55 +0000
+++ constraints.txt	2019-03-09 09:26:51 +0000
@@ -217,8 +217,7 @@
 # post1 Don't add a process back to the ready set if it received an error
 # such as a timeout.
 ampoule==0.2.0.post1
-amqp==1.4.9
-amqplib==1.0.2
+amqp==2.4.2
 anyjson==0.3.3
 appdirs==1.4.3
 asn1crypto==0.23.0
@@ -231,10 +230,10 @@
 backports.lzma==0.0.3
 bcrypt==3.1.4
 BeautifulSoup==3.2.1
-billiard==3.3.0.23
+billiard==3.5.0.5
 bson==0.3.3
 bzr==2.6.0.lp.3
-celery==3.1.26.post2
+celery==4.1.1
 cffi==1.11.2
 Chameleon==2.11
 chardet==3.0.4
@@ -273,7 +272,7 @@
 iso8601==0.1.12
 jsautobuild==0.2
 keyring==0.6.2
-kombu==3.0.37
+kombu==4.4.0
 launchpad-buildd==159
 launchpadlib==1.10.5
 lazr.authentication==0.1.1
@@ -281,7 +280,7 @@
 lazr.config==2.2.1
 lazr.delegates==2.0.4
 lazr.enum==1.1.3
-lazr.jobrunner==0.13
+lazr.jobrunner==0.14
 lazr.lifecycle==1.1
 lazr.restful==0.20.1
 lazr.restfulclient==0.13.2
@@ -302,7 +301,7 @@
 netaddr==0.7.19
 oauth==1.0
 oops==0.0.13
-oops-amqp==0.0.8b1
+oops-amqp==0.1.0
 oops-datedir-repo==0.0.23
 oops-timeline==0.0.1
 oops-twisted==0.0.7
@@ -338,7 +337,7 @@
 python-openid==2.2.5-fix1034376
 python-swiftclient==2.0.3
 PyYAML==3.10
-rabbitfixture==0.3.6
+rabbitfixture==0.4.0
 requests==2.7.0
 requests-file==1.4.3
 requests-toolbelt==0.6.2
@@ -371,6 +370,7 @@
 typing==3.6.2
 unittest2==1.1.0
 van.testing==3.0.0
+vine==1.1.4
 virtualenv-tools3==2.0.0
 wadllib==1.3.2
 wheel==0.29.0

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2019-03-06 14:52:01 +0000
+++ lib/lp/services/job/celeryconfig.py	2019-03-09 09:26:51 +0000
@@ -14,7 +14,7 @@
 
 
 def check_circular_fallbacks(queue):
-    """Check for curcular fallback queues.
+    """Check for circular fallback queues.
 
     A circular chain of fallback queues could keep a job forever queued
     if it times out in all queues.
@@ -41,7 +41,7 @@
     queue_names = queue_names.split(' ')
     for queue_name in queue_names:
         celery_queues[queue_name] = {
-            'binding_key': queue_name,
+            'routing_key': queue_name,
             }
         check_circular_fallbacks(queue_name)
 
@@ -52,7 +52,7 @@
     # A queue must be specified as a command line parameter for each
     # "celery worker" instance, but this is not required for a Launchpad app
     # server.
-    if 'celeryd' in argv[0] or ('celery' in argv[0] and argv[1] == 'worker'):
+    if 'celery' in argv[0] and argv[1] == 'worker':
         if queues is None or queues == '':
             raise ConfigurationError('A queue must be specified.')
         queues = queues.split(',')
@@ -68,7 +68,7 @@
                 'Queue %s is not configured in schema-lazr.conf' % queue)
         # XXX wgrant 2015-08-03: This should be set in the apply_async
         # now that we're on Celery 3.1.
-        result['CELERYD_TASK_SOFT_TIME_LIMIT'] = config[queue].timeout
+        result['task_soft_time_limit'] = config[queue].timeout
         if config[queue].fallback_queue != '':
             # XXX wgrant 2015-08-03: lazr.jobrunner actually looks for
             # FALLBACK_QUEUE; this probably isn't doing anything.
@@ -76,42 +76,34 @@
         # XXX wgrant 2015-08-03: This is mostly per-queue because we
         # can't run *_job and *_job_slow in the same worker, which will be
         # fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
-        result['CELERYD_CONCURRENCY'] = config[queue].concurrency
+        result['worker_concurrency'] = config[queue].concurrency
 
-    result['BROKER_URL'] = 'amqp://%s:%s@%s/%s' % (
+    result['broker_url'] = 'amqp://%s:%s@%s/%s' % (
         config.rabbitmq.userid, config.rabbitmq.password,
         config.rabbitmq.host, config.rabbitmq.virtual_host)
-    # XXX wgrant 2015-08-03: Celery 3.2 won't read pickles by default,
-    # and Celery 3.1 can send only pickles for some things. Let's accept
-    # both until they sort things out.
-    # XXX cjwatson 2019-03-06: Remove this once production is using json as
-    # its task/result serialiser.
-    result['CELERY_ACCEPT_CONTENT'] = ['pickle', 'json']
-    result['CELERY_CREATE_MISSING_QUEUES'] = False
-    result['CELERY_DEFAULT_EXCHANGE'] = 'job'
-    result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'
-    result['CELERY_ENABLE_UTC'] = True
-    result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
-    result['CELERY_QUEUES'] = celery_queues
-    result['CELERY_RESULT_BACKEND'] = 'amqp'
-    result['CELERY_RESULT_SERIALIZER'] = 'json'
-    result['CELERY_TASK_SERIALIZER'] = 'json'
-    result['CELERYBEAT_SCHEDULE'] = {
+    result['beat_schedule'] = {
         'schedule-missing': {
             'task': 'lp.services.job.celeryjob.run_missing_ready',
             'schedule': timedelta(seconds=600),
             'options': {
                 'routing_key': CELERY_BEAT_QUEUE,
                 },
+            }
         }
-    }
+    result['enable_utc'] = True
+    result['imports'] = ("lp.services.job.celeryjob", )
+    result['result_backend'] = 'amqp'
+    result['task_acks_late'] = True
+    result['task_create_missing_queues'] = False
+    result['task_default_exchange'] = 'job'
+    result['task_default_queue'] = 'launchpad_job'
+    result['task_queues'] = celery_queues
     # See http://ask.github.com/celery/userguide/optimizing.html:
     # The AMQP message of a job should stay in the RabbitMQ server
     # until the job has been finished. This allows to simply kill
     # a "celery worker" instance while a job is executed; when another
     # instance is started later, it will run the aborted job again.
-    result['CELERYD_PREFETCH_MULTIPLIER'] = 1
-    result['CELERY_ACKS_LATE'] = True
+    result['worker_prefetch_multiplier'] = 1
 
     return result
 

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py	2015-08-03 07:06:45 +0000
+++ lib/lp/services/job/celeryjob.py	2019-03-09 09:26:51 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Celery-specific Job code.
@@ -10,8 +10,11 @@
 __metaclass__ = type
 
 __all__ = [
-    'CeleryRunJob',
-    'CeleryRunJobIgnoreResult',
+    'celery_app',
+    'celery_run_job',
+    'celery_run_job_ignore_result',
+    'find_missing_ready',
+    'run_missing_ready',
     ]
 
 from logging import info
@@ -20,7 +23,10 @@
 
 
 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import Task
+from celery import (
+    Celery,
+    Task,
+    )
 from lazr.jobrunner.celerytask import RunJob
 from storm.zope.interfaces import IZStorm
 import transaction
@@ -46,6 +52,9 @@
 from lp.services import scripts
 
 
+celery_app = Celery()
+
+
 class CeleryRunJob(RunJob):
     """The Celery Task that runs a job."""
 
@@ -67,19 +76,22 @@
         super(CeleryRunJob, self).run(job_id)
 
 
-class CeleryRunJobIgnoreResult(CeleryRunJob):
-
-    ignore_result = True
+@celery_app.task(base=CeleryRunJob, bind=True)
+def celery_run_job(self, job_id, dbuser):
+    super(type(self), self).run(job_id, dbuser)
+
+
+@celery_app.task(base=CeleryRunJob, bind=True, ignore_result=True)
+def celery_run_job_ignore_result(self, job_id, dbuser):
+    super(type(self), self).run(job_id, dbuser)
 
 
 class FindMissingReady:
 
     def __init__(self, job_source):
-        from lp.services.job.celeryjob import CeleryRunJob
         from lazr.jobrunner.celerytask import list_queued
         self.job_source = job_source
-        self.queue_contents = list_queued(CeleryRunJob.app,
-                                          [job_source.task_queue])
+        self.queue_contents = list_queued(celery_app, [job_source.task_queue])
         self.queued_job_ids = set(task[1][0][0] for task in
                                   self.queue_contents)
 
@@ -93,40 +105,46 @@
     return FindMissingReady(job_source).find_missing_ready()
 
 
-class RunMissingReady(Task):
-    """Task to run any jobs that are ready but not scheduled.
-
-    Currently supports only BranchScanJob.
-    :param _no_init: For tests.  If True, do not perform the initialization.
-    """
-    ignore_result = True
-
-    def run(self, _no_init=False):
-        if not _no_init:
-            task_init('run_missing_ready')
-        with TransactionFreeOperation():
-            count = 0
-            for job in find_missing_ready(BranchScanJob):
-                if not celery_enabled(job.__class__.__name__):
-                    continue
-                job.celeryCommitHook(True)
-                count += 1
-            info('Scheduled %d missing jobs.', count)
-            transaction.commit()
+class PrefixedTask(Task):
+    """A Task with more informative task_id defaults."""
+
+    task_id_prefix = None
 
     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
-                    link=None, link_error=None, **options):
+                    link=None, link_error=None, shadow=None, **options):
         """Create a task_id if none is specified.
 
         Override the quite generic default task_id with one containing
-        the class name.
+        the task_id_prefix.
 
         See also `celery.task.Task.apply_async()`.
         """
-        if task_id is None:
-            task_id = '%s_%s' % (self.__class__.__name__, uuid4())
-        return super(RunMissingReady, self).apply_async(
-            args, kwargs, task_id, producer, link, link_error, **options)
+        if task_id is None and self.task_id_prefix is not None:
+            task_id = '%s_%s' % (self.task_id_prefix, uuid4())
+        return super(PrefixedTask, self).apply_async(
+            args=args, kwargs=kwargs, task_id=task_id, producer=producer,
+            link=link, link_error=link_error, shadow=shadow, **options)
+
+
+@celery_app.task(
+    base=PrefixedTask, task_id_prefix='RunMissingReady', ignore_result=True)
+def run_missing_ready(_no_init=False):
+    """Task to run any jobs that are ready but not scheduled.
+
+    Currently supports only BranchScanJob.
+    :param _no_init: For tests.  If True, do not perform the initialization.
+    """
+    if not _no_init:
+        task_init('run_missing_ready')
+    with TransactionFreeOperation():
+        count = 0
+        for job in find_missing_ready(BranchScanJob):
+            if not celery_enabled(job.__class__.__name__):
+                continue
+            job.celeryCommitHook(True)
+            count += 1
+        info('Scheduled %d missing jobs.', count)
+        transaction.commit()
 
 
 needs_zcml = True

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2018-03-30 20:42:14 +0000
+++ lib/lp/services/job/runner.py	2019-03-09 09:26:51 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Facilities for running Jobs."""
@@ -237,11 +237,13 @@
         # Avoid importing from lp.services.job.celeryjob where not needed, to
         # avoid configuring Celery when Rabbit is not configured.
         from lp.services.job.celeryjob import (
-            CeleryRunJob, CeleryRunJobIgnoreResult)
+            celery_run_job,
+            celery_run_job_ignore_result,
+            )
         if ignore_result:
-            cls = CeleryRunJobIgnoreResult
+            task = celery_run_job_ignore_result
         else:
-            cls = CeleryRunJob
+            task = celery_run_job
         db_class = self.getDBClass()
         ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
         eta = self.job.scheduled_start
@@ -250,7 +252,7 @@
         if (self.job.lease_expires is not None
                 and (eta is None or eta < self.job.lease_expires)):
             eta = self.job.lease_expires
-        return cls.apply_async(
+        return task.apply_async(
             (ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta,
             soft_time_limit=self.soft_time_limit.total_seconds(),
             task_id=self.taskId())

=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py	2019-03-08 14:06:04 +0000
+++ lib/lp/services/job/tests/__init__.py	2019-03-09 09:26:51 +0000
@@ -28,9 +28,9 @@
     The "celery worker" instance will be configured to use the
     currently-configured BROKER_URL, and able to run CeleryRunJob tasks.
     """
-    from lp.services.job.celeryjob import CeleryRunJob
+    from lp.services.job.celeryjob import celery_app
     # convert config params to a URL, so they can be passed as --broker.
-    with CeleryRunJob.app.broker_connection() as connection:
+    with celery_app.broker_connection() as connection:
         broker_uri = connection.as_uri(include_password=True)
     cmd_args = (
         'worker',
@@ -89,8 +89,8 @@
 
 def drain_celery_queues():
     from lazr.jobrunner.celerytask import drain_queues
-    from lp.services.job.celeryjob import CeleryRunJob
-    drain_queues(CeleryRunJob.app, CeleryRunJob.app.conf.CELERY_QUEUES.keys())
+    from lp.services.job.celeryjob import celery_app
+    drain_queues(celery_app, celery_app.conf.CELERY_QUEUES.keys())
 
 
 def pop_remote_notifications():

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	2015-10-12 13:16:54 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2019-03-09 09:26:51 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from contextlib import contextmanager
@@ -32,21 +32,20 @@
             'branch_write_job', 'branch_write_job_slow',
             'bzrsyncd_job', 'bzrsyncd_job_slow', 'celerybeat',
             'launchpad_job', 'launchpad_job_slow']
-        queues = config['CELERY_QUEUES']
+        queues = config['task_queues']
         self.assertEqual(queue_names, sorted(queues))
         for name in queue_names:
-            self.assertEqual(name, queues[name]['binding_key'])
+            self.assertEqual(name, queues[name]['routing_key'])
 
         # The port changes between test runs.
         self.assertThat(
-            config['BROKER_URL'],
+            config['broker_url'],
             MatchesRegex(r'amqp://guest:guest@localhost:\d+//\Z'))
-        self.assertFalse(config['CELERY_CREATE_MISSING_QUEUES'])
-        self.assertEqual('job', config['CELERY_DEFAULT_EXCHANGE'])
-        self.assertEqual('launchpad_job', config['CELERY_DEFAULT_QUEUE'])
-        self.assertEqual(
-            ('lp.services.job.celeryjob', ), config['CELERY_IMPORTS'])
-        self.assertEqual('amqp', config['CELERY_RESULT_BACKEND'])
+        self.assertFalse(config['task_create_missing_queues'])
+        self.assertEqual('job', config['task_default_exchange'])
+        self.assertEqual('launchpad_job', config['task_default_queue'])
+        self.assertEqual(('lp.services.job.celeryjob', ), config['imports'])
+        self.assertEqual('amqp', config['result_backend'])
 
     def test_app_server_configuration(self):
         from lp.services.job.celeryconfig import configure
@@ -55,10 +54,8 @@
 
     def check_job_specific_celery_worker_configuration(self, expected, config):
         self.check_default_common_parameters(config)
-        self.assertEqual(
-            expected['concurrency'], config['CELERYD_CONCURRENCY'])
-        self.assertEqual(
-            expected['timeout'], config['CELERYD_TASK_SOFT_TIME_LIMIT'])
+        self.assertEqual(expected['concurrency'], config['worker_concurrency'])
+        self.assertEqual(expected['timeout'], config['task_soft_time_limit'])
         self.assertEqual(
             expected['fallback'], config.get('FALLBACK', None))
 
@@ -151,9 +148,3 @@
         self.assertRaisesWithContent(
             ConfigurationError, error, configure,
             self.command + ['--queue=foo'])
-
-
-class TestCelerydConfiguration(TestCeleryWorkerConfiguration):
-    """Test behaviour with legacy "celeryd" command name."""
-
-    command = ['celeryd']

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	2015-10-12 13:16:54 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	2019-03-09 09:26:51 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from cStringIO import StringIO
@@ -29,13 +29,11 @@
     def setUp(self):
         super(TestRunMissingJobs, self).setUp()
         from lp.services.job.celeryjob import (
-            CeleryRunJob,
             find_missing_ready,
-            RunMissingReady,
-        )
-        self.CeleryRunJob = CeleryRunJob
+            run_missing_ready,
+            )
         self.find_missing_ready = find_missing_ready
-        self.RunMissingReady = RunMissingReady
+        self.run_missing_ready = run_missing_ready
 
     def createMissingJob(self):
         job = BranchScanJob.create(self.factory.makeBranch())
@@ -108,7 +106,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.RunMissingReady().run(_no_init=True)
+                    self.run_missing_ready.run(_no_init=True)
         self.assertEqual([], responses)
 
     def test_run_missing_ready(self):
@@ -119,7 +117,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.RunMissingReady().run(_no_init=True)
+                    self.run_missing_ready.run(_no_init=True)
         self.assertEqual(1, len(responses))
 
     def test_run_missing_ready_does_not_return_results(self):
@@ -127,7 +125,7 @@
         result queue."""
         from lp.services.job.tests.celery_helpers import noop
         job_queue_name = 'celerybeat'
-        request = self.RunMissingReady().apply_async(
+        request = self.run_missing_ready.apply_async(
             kwargs={'_no_init': True}, queue=job_queue_name)
         self.assertTrue(request.task_id.startswith('RunMissingReady_'))
         result_queue_name = request.task_id.replace('-', '')
@@ -136,15 +134,15 @@
         # This would also happen when "with celery_worker()" would do nothing.
         # So let's be sure that a task is queued...
         # Give the system some time to deliver the message
-        self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 1)
+        self.assertQueueSize(self.run_missing_ready.app, [job_queue_name], 1)
         # Wait at most 60 seconds for "celery worker" to start and process
         # the task.
         with celery_worker(job_queue_name):
             # Due to FIFO ordering, this will only return after
-            # RunMissingReady has finished.
+            # run_missing_ready has finished.
             noop.apply_async(queue=job_queue_name).wait(60)
         # But now the message has been consumed by "celery worker".
-        self.assertQueueSize(self.RunMissingReady.app, [job_queue_name], 0)
+        self.assertQueueSize(self.run_missing_ready.app, [job_queue_name], 0)
         # No result queue was created for the task.
         try:
             real_stdout = sys.stdout

=== modified file 'lib/lp/services/messaging/rabbit.py'
--- lib/lp/services/messaging/rabbit.py	2018-05-12 14:14:33 +0000
+++ lib/lp/services/messaging/rabbit.py	2019-03-09 09:26:51 +0000
@@ -1,4 +1,4 @@
-# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """An API for messaging systems in Launchpad, e.g. RabbitMQ."""
@@ -19,7 +19,7 @@
 import threading
 import time
 
-from amqplib import client_0_8 as amqp
+import amqp
 import transaction
 from transaction._transaction import Status as TransactionStatus
 from zope.interface import implementer
@@ -73,10 +73,12 @@
     """
     if not is_configured():
         raise MessagingUnavailable("Incomplete configuration")
-    return amqp.Connection(
+    connection = amqp.Connection(
         host=config.rabbitmq.host, userid=config.rabbitmq.userid,
         password=config.rabbitmq.password,
-        virtual_host=config.rabbitmq.virtual_host, insist=False)
+        virtual_host=config.rabbitmq.virtual_host)
+    connection.connect()
+    return connection
 
 
 @implementer(IMessageSession)
@@ -97,9 +99,7 @@
     @property
     def is_connected(self):
         """See `IMessageSession`."""
-        return (
-            self._connection is not None and
-            self._connection.transport is not None)
+        return self._connection is not None and self._connection.connected
 
     def connect(self):
         """See `IMessageSession`.
@@ -107,7 +107,7 @@
         Open a connection for this thread if necessary. Connections cannot be
         shared between threads.
         """
-        if self._connection is None or self._connection.transport is None:
+        if self._connection is None or not self._connection.connected:
             self._connection = connect()
         return self._connection
 
@@ -281,8 +281,8 @@
                 else:
                     self.channel.basic_ack(message.delivery_tag)
                     return json.loads(message.body)
-            except amqp.AMQPChannelException as error:
-                if error.amqp_reply_code == 404:
+            except amqp.ChannelError as error:
+                if error.reply_code == 404:
                     raise QueueNotFound()
                 else:
                     raise

=== modified file 'lib/lp/testing/fixture.py'
--- lib/lp/testing/fixture.py	2018-05-12 14:14:33 +0000
+++ lib/lp/testing/fixture.py	2019-03-09 09:26:51 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Launchpad test fixtures that have no better home."""
@@ -21,7 +21,7 @@
 import socket
 import time
 
-import amqplib.client_0_8 as amqp
+import amqp
 from fixtures import (
     EnvironmentVariableFixture,
     Fixture,

=== modified file 'lib/lp/testing/tests/test_layers_functional.py'
--- lib/lp/testing/tests/test_layers_functional.py	2018-05-12 14:14:33 +0000
+++ lib/lp/testing/tests/test_layers_functional.py	2019-03-09 09:26:51 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from __future__ import with_statement
@@ -19,7 +19,7 @@
 from urllib import urlopen
 import uuid
 
-from amqplib import client_0_8 as amqp
+import amqp
 from fixtures import (
     EnvironmentVariableFixture,
     Fixture,
@@ -277,8 +277,8 @@
                 host=rabbitmq.host,
                 userid=rabbitmq.userid,
                 password=rabbitmq.password,
-                virtual_host=rabbitmq.virtual_host,
-                insist=False)
+                virtual_host=rabbitmq.virtual_host)
+            conn.connect()
             conn.close()
 
 

=== modified file 'lib/lp_sitecustomize.py'
--- lib/lp_sitecustomize.py	2018-05-12 14:14:33 +0000
+++ lib/lp_sitecustomize.py	2019-03-09 09:26:51 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 # This file is imported by _pythonpath.py and by the standard Launchpad
@@ -70,11 +70,11 @@
             logger.parent = new_root
 
 
-def silence_amqplib_logger():
-    """Install the NullHandler on the amqplib logger to silence logs."""
-    amqplib_logger = logging.getLogger('amqplib')
-    amqplib_logger.addHandler(logging.NullHandler())
-    amqplib_logger.propagate = False
+def silence_amqp_logger():
+    """Install the NullHandler on the amqp logger to silence logs."""
+    amqp_logger = logging.getLogger('amqp')
+    amqp_logger.addHandler(logging.NullHandler())
+    amqp_logger.propagate = False
 
 
 def silence_bzr_logger():
@@ -153,7 +153,7 @@
     This function is also invoked by the test infrastructure to reset
     logging between tests.
     """
-    silence_amqplib_logger()
+    silence_amqp_logger()
     silence_bzr_logger()
     silence_zcml_logger()
     silence_transaction_logger()


Follow ups