txamqp-user team mailing list archive
-
txamqp-user team
-
Mailing list archive
-
Message #00076
Re: rabbitmq: channel.basic_consume crashes channel
On Fri, 26 Aug 2011 18:42:13 +0200
Esteve Fernandez <esteve.fernandez@xxxxxxxxx> wrote:
> Hey,
>
> 2011/8/26 bartek <bartek@xxxxxxxxxxxx>:
> > blocks, the channel disappears from rabbitmq, and rabbit's sasl log
> > shows a crash report, the most important line being:
> >
> > exception exit: {amqp_error,frame_error,
> > "cannot decode
> > <<0,0,16,98,114,111,97,100,99,97,115,116,95,108,105,115,116,101,110,0,0,0,0,0,0>>",
> > 'basic.consume'}
>
> This usually happens when there's a Deferred that's not yield
> elsewhere in the code. The way AMQP works is that all commands must be
> sent in order, that is, you can't send a command if you haven't waited
> for the previous one to come back (if it produces a response).
Ah, so in other words rabbit's channel is in the wrong state so it does
not accept the command I send?
>
> Is this the only place where you're using AMQP? Is there anywhere else
> in the code where it's used?
Not really - the code is trivial, mostly copied from the test suite.
Maybe I'm running it in the wrong way? See attached - all I do is start
and listen...
B.
>
> Thanks for using txAMQP!
>
> _______________________________________________
> Mailing list: https://launchpad.net/~txamqp-user
> Post to : txamqp-user@xxxxxxxxxxxxxxxxxxx
> Unsubscribe : https://launchpad.net/~txamqp-user
> More help : https://help.launchpad.net/ListHelp
--
Mam wiele powodów żeby ciągle iść do przodu trzy z nich witam od progu
gdy wracam do domu... (Abradab/O.S.T.R.)
'''
Created on Aug 24, 2011
@author: bartek
'''
import os
import sys
import time
import warnings
from txamqp.content import Content
import txamqp.spec
from txamqp.protocol import AMQChannel, AMQClient, TwistedDelegate
from twisted.internet import error, protocol, reactor
from twisted.trial import unittest
from twisted.internet.defer import inlineCallbacks, Deferred, returnValue, DeferredQueue, DeferredLock
from twisted.python import failure
from txamqp.queue import Empty
RABBITMQ = "RABBITMQ"
OPENAMQ = "OPENAMQ"
QPID = "QPID"
class supportedBrokers(object):
def __init__(self, *supporterBrokers):
self.supporterBrokers = supporterBrokers
def __call__(self, f):
if _get_broker() not in self.supporterBrokers:
f.skip = "Not supported for this broker."
return f
def _get_broker():
return os.environ.get("TXAMQP_BROKER")
USERNAME='guest'
PASSWORD='guest'
VHOST='/'
HEARTBEAT = 0
class Base(object):
"""
copied from txamqp test suite, nearly in extenso
"""
clientClass = AMQClient
heartbeat = HEARTBEAT
instance_id = 'base'
id = '123'
def __init__(self, id=None, *args, **kwargs):
self.id = id or self.id
self.host = 'localhost'
self.port = 5672
self.spec = '/home/bartek/Downloads/python-txamqp-0.3/src/specs/qpid/amqp.0-8.xml'
self.user = USERNAME
self.password = PASSWORD
self.vhost = VHOST
self.queues = []
self.exchanges = []
self.connectors = []
@inlineCallbacks
def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None,
heartbeat=None, clientClass=None):
host = host or self.host
port = port or self.port
spec = spec or self.spec
user = user or self.user
password = password or self.password
vhost = vhost or self.vhost
heartbeat = heartbeat or self.heartbeat
clientClass = clientClass or self.clientClass
delegate = TwistedDelegate()
onConn = Deferred()
p = clientClass(delegate, vhost, txamqp.spec.load(spec), heartbeat=heartbeat)
f = protocol._InstanceFactory(reactor, p, onConn)
c = reactor.connectTCP(host, port, f)
def errb(thefailure):
thefailure.trap(error.ConnectionRefusedError)
print "failed to connect to host: %s, port: %s; These tests are designed to run against a running instance" \
" of the %s AMQP broker on the given host and port. failure: %r" % (host, port, self.broker, thefailure,)
thefailure.raiseException()
onConn.addErrback(errb)
self.connectors.append(c)
client = yield onConn
yield client.authenticate(user, password)
returnValue(client)
@inlineCallbacks
def start(self, v=None):
print 'starting'
# TODO reconnecting, handle calls in the meantime
try:
self.client = yield self.connect()
except txamqp.client.Closed, le:
le.args = tuple(("Unable to connect to AMQP broker in order to run tests (perhaps due to auth failure?). " \
"The tests assume that an instance of the %s AMQP broker is already set up and that this test script " \
"can connect to it and use it as user '%s', password '%s', vhost '%s'." % (_get_broker(),
USERNAME, PASSWORD, VHOST),) + le.args)
raise
print 'setting channel'
self.channel = yield self.client.channel(1)
print 'channel set'
yield self.channel.channel_open()
print 'channel open'
@inlineCallbacks
def stop(self, v=None):
for connector in self.connectors:
yield connector.disconnect()
@inlineCallbacks
def exchange_declare(self, channel=None, ticket=0, exchange='',
type='', passive=False, durable=False,
auto_delete=False, internal=False, nowait=False,
arguments={}):
print 'declaring'
channel = channel or self.channel
reply = yield channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
self.exchanges.append((channel,exchange))
returnValue(reply)
@inlineCallbacks
def queue_declare(self, channel=None, *args, **keys):
channel = channel or self.channel
reply = yield channel.queue_declare(*args, **keys)
self.queues.append((channel, reply.queue))
returnValue(reply)
class MyBase(Base):
@inlineCallbacks
def broadcast(self, v=None, message=None):
exid = '%s.broadcast' % self.instance_id
yield self.exchange_declare(0, exchange=exid, type='fanout')
qid = '%s.broadcast.%s' % (self.instance_id, self.id)
yield self.queue_declare(queue=qid)
yield self.channel.queue_bind(queue=qid, exchange=exid)
for i in range(1, 11):
print "sending", i
msg=Content("Message %d" % i)
self.channel.basic_publish(content=msg, exchange=exid)
@inlineCallbacks
def listen(self, v=None, exid=None):
# deklarujemy na wypadek gdyby nie bylo
print 1
exid = '%s.broadcast' % self.instance_id
yield self.exchange_declare(0, exchange=exid, type='fanout')
print 2
qid = '%s_broadcast_listen_%s' % (self.instance_id, self.id)
yield self.queue_declare(queue=qid)
print 2.5
yield self.channel.queue_bind(queue=qid, exchange=exid)
print 3
subscription = yield self.channel.basic_consume(queue=qid)
print 4
queue = yield self.client.queue(consumer_tag=subscription.consumer_tag)
print 5
while True:
print 'waiting'
msg = yield queue.get()
print '[FANOUT] Received: ' + msg.content.body + '...'
if __name__ == '__main__':
a = sys.argv[1]
if a == 'send':
b = MyBase()
d = Deferred()
d.addCallback(b.start)
d.addCallback(b.broadcast)
if a == 'listen':
b = MyBase(sys.argv[2])
d = Deferred()
d.addCallback(b.start)
d.addCallback(b.listen)
d.callback(None)
reactor.run()
# exid = '%s.broadcast' % self.instance_id
# yield self.exchange_declare(0, exchange=exid, type='fanout')
# qid = '%s.broadcast_listen.%s' % (self.instance_id, self.id)
# yield self.channel.queue_declare(queue=qid)
# yield self.channel.queue_bind(queue=qid, exchange=exid)
# import pdb
# pdb.set_trace()
# while True:
# print 'waiting'
# msg = yield self.channel.queue.get()
# print '[FANOUT] Received: ' + msg.content.body + '...'
#
# def b(self):
# def _got(msg):
# print '[FANOUT] Received: ' + msg.content.body + '...'
# return msg
# def _err(f):
# print '[ERR] Received: ', f
# d = self.channel.basic_consume(queue=qid, no_ack=True)
# d.addCallback(_got)
# d.addErrback(_err)
# yield d
#
#
# def a(self):
# done = False
# def _cancel(failure):
# print 'cancel'
# done = True
# return self.channel.basic_cancel()
# def _got(msg):
# print '[FANOUT] Received: ' + msg.content.body + '...'
# return msg
# while not done:
# print 'waiting'
## d = self.channel.basic_get(no_ack=True)
# d = self.channel.queue.get()
# d.addCallback(_got)
# d.addErrback(_cancel)
# msg = yield d
## msg = yield self.channel.basic_consume(queue=qid)
# print '[FANOUT] Received: ' + msg.content.body + '...'
Follow ups
References