← Back to team overview

txamqp-user team mailing list archive

Re: How do you listen to different queues asynchronously?

 

>> It depends on how you want it to be terminated :-) You can either keep
>> a counter or send a special message (like STOP).
>>
>
> Yeah :-), I was thinking more of breaking the consumer loop because
> I'm quiting my program and I want to end cleanly. Maybe using a global
> boolean variable? Sending a STOP message from the consumer to
> terminate that same process doesn't look right.
>
> The idea is that I have a bunch of users in a "conversation" (sending
> a receiving messages) and any of the users can leave the
> "conversation" any time without disturbing others.
>

This is the sort of thing I wanted to achieve but without trapping
exceptions. I don't know how "dirty" this is (a snip, for the whole
file see paste):

def whichEverFiresFirst(def1, def2):
    d = Deferred()

    def _callback(obj, d):
        d.callback(obj)

    def _errback(failure):
        failure.trap(Closed)

    def1.addCallback(_callback, d)
    def1.addErrback(_errback)
    def2.addCallback(_callback, d)
    return d

class StopSignal(object):

    def get_signal(self):
        self.d = Deferred()
        return self.d

    def fire_signal(self):
        self.d.callback("STOP")

@inlineCallbacks
def consumer_fanout(conn, chan, queue, sig):

    while True:
        msg = yield whichEverFiresFirst(queue.get(), sig.get_signal())

        if isinstance(msg, str) and msg == "STOP":
            print "-> STOP SIGNAL <-"
            queue.close()
            break

        print '[FANOUT] Received: ' + msg.content.body + ' from
channel #' + str(chan.id)

    yield chan.basic_cancel("testtag_fanout")

Cheers,

-- 
Ale.



Follow ups

References