← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/celery-3.1.26 into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/celery-3.1.26 into lp:launchpad.

Commit message:
Upgrade to celery 3.1.26.post2.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/celery-3.1.26/+merge/345462

This reverts my recent rabbitfixture/oops-amqp upgrade.  That upgrade pulled in amqp >= 2.0, and after upgrading celery, all the tests that run celery workers hung.  I bisected this to https://github.com/celery/celery/commit/ee88f6a42938d740c79f3cc670c692dc9fca4a8b.  I think we were just lucky not to hit this earlier: the worker simply hangs when calling connection.drain_events from that event loop as opposed to directly (for some reason).  When I reverted the previous upgrade, all was well again.  I should have noticed this earlier because kombu explicitly depends on amqp < 2.0, so there's probably something a bit wrong with our dependency upgrade procedures.

celery 3.1.24, or a later 3.1.x version, supports the new task message protocol introduced in 4.0 and is therefore recommended as an intermediate step to upgrading to celery 4.x.  So I think the correct order for this is to deploy 3.1.26 and only then upgrade to celery/kombu 4.x and the newer versions of rabbitfixture and oops-amqp.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/celery-3.1.26 into lp:launchpad.
=== modified file 'constraints.txt'
--- constraints.txt	2018-05-10 08:21:03 +0000
+++ constraints.txt	2018-05-12 14:47:15 +0000
@@ -217,7 +217,8 @@
 # post1 Don't add a process back to the ready set if it received an error
 # such as a timeout.
 ampoule==0.2.0.post1
-amqp==2.2.2
+amqp==1.4.9
+amqplib==1.0.2
 anyjson==0.3.3
 appdirs==1.4.3
 asn1crypto==0.23.0
@@ -229,10 +230,10 @@
 Babel==2.5.1
 backports.lzma==0.0.3
 BeautifulSoup==3.2.1
-billiard==3.3.0.20
+billiard==3.3.0.23
 bson==0.3.3
 bzr==2.6.0.lp.2
-celery==3.1.18
+celery==3.1.26.post2
 cffi==1.11.2
 Chameleon==2.11
 constantly==15.1.0
@@ -268,7 +269,7 @@
 iso8601==0.1.12
 jsautobuild==0.2
 keyring==0.6.2
-kombu==3.0.30
+kombu==3.0.37
 launchpad-buildd==159
 launchpadlib==1.10.5
 lazr.authentication==0.1.1
@@ -297,7 +298,7 @@
 netaddr==0.7.19
 oauth==1.0
 oops==0.0.13
-oops-amqp==0.1.0
+oops-amqp==0.0.8b1
 oops-datedir-repo==0.0.23
 oops-timeline==0.0.1
 oops-twisted==0.0.7
@@ -331,7 +332,7 @@
 python-openid==2.2.5-fix1034376
 python-swiftclient==2.0.3
 PyYAML==3.10
-rabbitfixture==0.4.0
+rabbitfixture==0.3.6
 requests==2.7.0
 requests-toolbelt==0.6.2
 scandir==1.7
@@ -361,7 +362,6 @@
 typing==3.6.2
 unittest2==1.1.0
 van.testing==3.0.0
-vine==1.1.4
 virtualenv-tools3==2.0.0
 wadllib==1.3.2
 wheel==0.29.0

=== modified file 'lib/lp/services/messaging/rabbit.py'
--- lib/lp/services/messaging/rabbit.py	2018-05-09 16:46:39 +0000
+++ lib/lp/services/messaging/rabbit.py	2018-05-12 14:47:15 +0000
@@ -19,7 +19,7 @@
 import threading
 import time
 
-import amqp
+from amqplib import client_0_8 as amqp
 import transaction
 from transaction._transaction import Status as TransactionStatus
 from zope.interface import implementer
@@ -73,12 +73,10 @@
     """
     if not is_configured():
         raise MessagingUnavailable("Incomplete configuration")
-    connection = amqp.Connection(
+    return amqp.Connection(
         host=config.rabbitmq.host, userid=config.rabbitmq.userid,
         password=config.rabbitmq.password,
-        virtual_host=config.rabbitmq.virtual_host)
-    connection.connect()
-    return connection
+        virtual_host=config.rabbitmq.virtual_host, insist=False)
 
 
 @implementer(IMessageSession)
@@ -99,7 +97,9 @@
     @property
     def is_connected(self):
         """See `IMessageSession`."""
-        return self._connection is not None and self._connection.connected
+        return (
+            self._connection is not None and
+            self._connection.transport is not None)
 
     def connect(self):
         """See `IMessageSession`.
@@ -107,7 +107,7 @@
         Open a connection for this thread if necessary. Connections cannot be
         shared between threads.
         """
-        if self._connection is None or not self._connection.connected:
+        if self._connection is None or self._connection.transport is None:
             self._connection = connect()
         return self._connection
 
@@ -281,8 +281,8 @@
                 else:
                     self.channel.basic_ack(message.delivery_tag)
                     return json.loads(message.body)
-            except amqp.ChannelError as error:
-                if error.reply_code == 404:
+            except amqp.AMQPChannelException as error:
+                if error.amqp_reply_code == 404:
                     raise QueueNotFound()
                 else:
                     raise

=== modified file 'lib/lp/testing/fixture.py'
--- lib/lp/testing/fixture.py	2018-05-09 16:46:39 +0000
+++ lib/lp/testing/fixture.py	2018-05-12 14:47:15 +0000
@@ -21,7 +21,7 @@
 import socket
 import time
 
-import amqp
+import amqplib.client_0_8 as amqp
 from fixtures import (
     EnvironmentVariableFixture,
     Fixture,

=== modified file 'lib/lp/testing/tests/test_layers_functional.py'
--- lib/lp/testing/tests/test_layers_functional.py	2018-05-09 16:46:39 +0000
+++ lib/lp/testing/tests/test_layers_functional.py	2018-05-12 14:47:15 +0000
@@ -19,7 +19,7 @@
 from urllib import urlopen
 import uuid
 
-import amqp
+from amqplib import client_0_8 as amqp
 from fixtures import (
     EnvironmentVariableFixture,
     Fixture,
@@ -277,8 +277,8 @@
                 host=rabbitmq.host,
                 userid=rabbitmq.userid,
                 password=rabbitmq.password,
-                virtual_host=rabbitmq.virtual_host)
-            conn.connect()
+                virtual_host=rabbitmq.virtual_host,
+                insist=False)
             conn.close()
 
 

=== modified file 'lib/lp_sitecustomize.py'
--- lib/lp_sitecustomize.py	2018-05-09 16:46:39 +0000
+++ lib/lp_sitecustomize.py	2018-05-12 14:47:15 +0000
@@ -70,11 +70,11 @@
             logger.parent = new_root
 
 
-def silence_amqp_logger():
-    """Install the NullHandler on the amqp logger to silence logs."""
-    amqp_logger = logging.getLogger('amqp')
-    amqp_logger.addHandler(logging.NullHandler())
-    amqp_logger.propagate = False
+def silence_amqplib_logger():
+    """Install the NullHandler on the amqplib logger to silence logs."""
+    amqplib_logger = logging.getLogger('amqplib')
+    amqplib_logger.addHandler(logging.NullHandler())
+    amqplib_logger.propagate = False
 
 
 def silence_bzr_logger():
@@ -153,7 +153,7 @@
     This function is also invoked by the test infrastructure to reset
     logging between tests.
     """
-    silence_amqp_logger()
+    silence_amqplib_logger()
     silence_bzr_logger()
     silence_zcml_logger()
     silence_transaction_logger()


Follow ups