txamqp-user team mailing list archive
-
txamqp-user team
-
Mailing list archive
-
Message #00039
Re: How do you listen to different queues asynchronously?
>> It depends on how you want it to be terminated :-) You can either keep
>> a counter or send a special message (like STOP).
>>
>
> Yeah :-), I was thinking more of breaking the consumer loop because
> I'm quiting my program and I want to end cleanly. Maybe using a global
> boolean variable? Sending a STOP message from the consumer to
> terminate that same process doesn't look right.
>
> The idea is that I have a bunch of users in a "conversation" (sending
> a receiving messages) and any of the users can leave the
> "conversation" any time without disturbing others.
>
This is the sort of thing I wanted to achieve but without trapping
exceptions. I don't know how "dirty" this is (a snip, for the whole
file see paste):
def whichEverFiresFirst(def1, def2):
d = Deferred()
def _callback(obj, d):
d.callback(obj)
def _errback(failure):
failure.trap(Closed)
def1.addCallback(_callback, d)
def1.addErrback(_errback)
def2.addCallback(_callback, d)
return d
class StopSignal(object):
def get_signal(self):
self.d = Deferred()
return self.d
def fire_signal(self):
self.d.callback("STOP")
@inlineCallbacks
def consumer_fanout(conn, chan, queue, sig):
while True:
msg = yield whichEverFiresFirst(queue.get(), sig.get_signal())
if isinstance(msg, str) and msg == "STOP":
print "-> STOP SIGNAL <-"
queue.close()
break
print '[FANOUT] Received: ' + msg.content.body + ' from
channel #' + str(chan.id)
yield chan.basic_cancel("testtag_fanout")
Cheers,
--
Ale.
Follow ups
References