launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #22509
[Merge] lp:~cjwatson/launchpad/celery-3.1.26 into lp:launchpad
Colin Watson has proposed merging lp:~cjwatson/launchpad/celery-3.1.26 into lp:launchpad.
Commit message:
Upgrade to celery 3.1.26.post2.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/celery-3.1.26/+merge/345462
This reverts my recent rabbitfixture/oops-amqp upgrade. That upgrade pulled in amqp >= 2.0, and all the tests that run celery workers hung after upgrading celery. I bisected this to https://github.com/celery/celery/commit/ee88f6a42938d740c79f3cc670c692dc9fca4a8b. I think we were just lucky not to hit this earlier: the worker simply hangs when connection.drain_events is called from that event loop as opposed to directly (for some reason). When I reverted the previous upgrade, all was well again. I should have noticed this earlier because kombu explicitly depends on amqp < 2.0, so there's probably something a bit wrong with our dependency upgrade procedures.
celery 3.1.24, or a later 3.1.x version, supports the new task message protocol introduced in 4.0 and is therefore recommended as an intermediate step to upgrading to celery 4.x. So I think the correct order for this is to deploy 3.1.26 and only then upgrade to celery/kombu 4.x and the newer versions of rabbitfixture and oops-amqp.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/celery-3.1.26 into lp:launchpad.
=== modified file 'constraints.txt'
--- constraints.txt 2018-05-10 08:21:03 +0000
+++ constraints.txt 2018-05-12 14:47:15 +0000
@@ -217,7 +217,8 @@
# post1 Don't add a process back to the ready set if it received an error
# such as a timeout.
ampoule==0.2.0.post1
-amqp==2.2.2
+amqp==1.4.9
+amqplib==1.0.2
anyjson==0.3.3
appdirs==1.4.3
asn1crypto==0.23.0
@@ -229,10 +230,10 @@
Babel==2.5.1
backports.lzma==0.0.3
BeautifulSoup==3.2.1
-billiard==3.3.0.20
+billiard==3.3.0.23
bson==0.3.3
bzr==2.6.0.lp.2
-celery==3.1.18
+celery==3.1.26.post2
cffi==1.11.2
Chameleon==2.11
constantly==15.1.0
@@ -268,7 +269,7 @@
iso8601==0.1.12
jsautobuild==0.2
keyring==0.6.2
-kombu==3.0.30
+kombu==3.0.37
launchpad-buildd==159
launchpadlib==1.10.5
lazr.authentication==0.1.1
@@ -297,7 +298,7 @@
netaddr==0.7.19
oauth==1.0
oops==0.0.13
-oops-amqp==0.1.0
+oops-amqp==0.0.8b1
oops-datedir-repo==0.0.23
oops-timeline==0.0.1
oops-twisted==0.0.7
@@ -331,7 +332,7 @@
python-openid==2.2.5-fix1034376
python-swiftclient==2.0.3
PyYAML==3.10
-rabbitfixture==0.4.0
+rabbitfixture==0.3.6
requests==2.7.0
requests-toolbelt==0.6.2
scandir==1.7
@@ -361,7 +362,6 @@
typing==3.6.2
unittest2==1.1.0
van.testing==3.0.0
-vine==1.1.4
virtualenv-tools3==2.0.0
wadllib==1.3.2
wheel==0.29.0
=== modified file 'lib/lp/services/messaging/rabbit.py'
--- lib/lp/services/messaging/rabbit.py 2018-05-09 16:46:39 +0000
+++ lib/lp/services/messaging/rabbit.py 2018-05-12 14:47:15 +0000
@@ -19,7 +19,7 @@
import threading
import time
-import amqp
+from amqplib import client_0_8 as amqp
import transaction
from transaction._transaction import Status as TransactionStatus
from zope.interface import implementer
@@ -73,12 +73,10 @@
"""
if not is_configured():
raise MessagingUnavailable("Incomplete configuration")
- connection = amqp.Connection(
+ return amqp.Connection(
host=config.rabbitmq.host, userid=config.rabbitmq.userid,
password=config.rabbitmq.password,
- virtual_host=config.rabbitmq.virtual_host)
- connection.connect()
- return connection
+ virtual_host=config.rabbitmq.virtual_host, insist=False)
@implementer(IMessageSession)
@@ -99,7 +97,9 @@
@property
def is_connected(self):
"""See `IMessageSession`."""
- return self._connection is not None and self._connection.connected
+ return (
+ self._connection is not None and
+ self._connection.transport is not None)
def connect(self):
"""See `IMessageSession`.
@@ -107,7 +107,7 @@
Open a connection for this thread if necessary. Connections cannot be
shared between threads.
"""
- if self._connection is None or not self._connection.connected:
+ if self._connection is None or self._connection.transport is None:
self._connection = connect()
return self._connection
@@ -281,8 +281,8 @@
else:
self.channel.basic_ack(message.delivery_tag)
return json.loads(message.body)
- except amqp.ChannelError as error:
- if error.reply_code == 404:
+ except amqp.AMQPChannelException as error:
+ if error.amqp_reply_code == 404:
raise QueueNotFound()
else:
raise
=== modified file 'lib/lp/testing/fixture.py'
--- lib/lp/testing/fixture.py 2018-05-09 16:46:39 +0000
+++ lib/lp/testing/fixture.py 2018-05-12 14:47:15 +0000
@@ -21,7 +21,7 @@
import socket
import time
-import amqp
+import amqplib.client_0_8 as amqp
from fixtures import (
EnvironmentVariableFixture,
Fixture,
=== modified file 'lib/lp/testing/tests/test_layers_functional.py'
--- lib/lp/testing/tests/test_layers_functional.py 2018-05-09 16:46:39 +0000
+++ lib/lp/testing/tests/test_layers_functional.py 2018-05-12 14:47:15 +0000
@@ -19,7 +19,7 @@
from urllib import urlopen
import uuid
-import amqp
+from amqplib import client_0_8 as amqp
from fixtures import (
EnvironmentVariableFixture,
Fixture,
@@ -277,8 +277,8 @@
host=rabbitmq.host,
userid=rabbitmq.userid,
password=rabbitmq.password,
- virtual_host=rabbitmq.virtual_host)
- conn.connect()
+ virtual_host=rabbitmq.virtual_host,
+ insist=False)
conn.close()
=== modified file 'lib/lp_sitecustomize.py'
--- lib/lp_sitecustomize.py 2018-05-09 16:46:39 +0000
+++ lib/lp_sitecustomize.py 2018-05-12 14:47:15 +0000
@@ -70,11 +70,11 @@
logger.parent = new_root
-def silence_amqp_logger():
- """Install the NullHandler on the amqp logger to silence logs."""
- amqp_logger = logging.getLogger('amqp')
- amqp_logger.addHandler(logging.NullHandler())
- amqp_logger.propagate = False
+def silence_amqplib_logger():
+ """Install the NullHandler on the amqplib logger to silence logs."""
+ amqplib_logger = logging.getLogger('amqplib')
+ amqplib_logger.addHandler(logging.NullHandler())
+ amqplib_logger.propagate = False
def silence_bzr_logger():
@@ -153,7 +153,7 @@
This function is also invoked by the test infrastructure to reset
logging between tests.
"""
- silence_amqp_logger()
+ silence_amqplib_logger()
silence_bzr_logger()
silence_zcml_logger()
silence_transaction_logger()
Follow ups