← Back to team overview

txamqp-user team mailing list archive

Re: after some code

 

On Wed, Oct 7, 2009 at 4:49 PM, <jacopo.pecci@xxxxxxxxx> wrote:

> I have often seen people referring to the code below, written by Esteve.
> Unfortunately the original links are broken. Does anyone have the two files
> (or know where to get them)?
>

I don't remember where I found them, but below are two links to
pastebin and the complete code.
All the code is from Esteve, and you will have to adjust some parameters to
make it work.
Fabrizio

consumer: http://pastebin.org/38643
publisher: http://pastebin.org/38642


> Consumer: http://fluidinfo.com/esteve/stuff/txamqp_consumer.py
>

# -*- coding: utf-8 -*-
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
import pickle

@inlineCallbacks
def gotConnection(conn, authentication):
    """Consume pickled messages from the "po_box" queue until "STOP" arrives.

    Intended as the callback of the connection Deferred created in the
    script entry point, which passes the connected client as ``conn``.

    Steps performed, in order:
      1. authenticate and open channel 1;
      2. declare the durable queue "po_box" and the durable direct exchange
         "sorting_room", and bind them with routing key "jason";
      3. start a consumer tagged "testtag" and print every unpickled message
         body until one equals "STOP";
      4. cancel the consumer, close the channel and the connection, and stop
         the reactor.

    :param conn: connected client object exposing ``start``, ``channel`` and
        ``queue`` (an ``AMQClient`` when wired up by the entry point).
    :param authentication: credentials passed unchanged to ``conn.start``.
    """
    yield conn.start(authentication)

    chan = yield conn.channel(1)
    yield chan.channel_open()

    yield chan.queue_declare(queue="po_box", durable=True, exclusive=False,
                             auto_delete=False)
    yield chan.exchange_declare(exchange="sorting_room", type="direct",
                                durable=True, auto_delete=False)

    yield chan.queue_bind(queue="po_box", exchange="sorting_room",
                          routing_key="jason")

    # no_ack=True: this consumer never sends acknowledgements.
    yield chan.basic_consume(queue='po_box', no_ack=True,
                             consumer_tag="testtag")

    # The local delivery queue is looked up with the same tag that was
    # passed to basic_consume above.
    queue = yield conn.queue("testtag")

    # Pull messages one at a time. The loop replaces a callback that
    # re-registered itself after every message; the observable behaviour
    # (print each body, stop on "STOP") is the same.
    while True:
        msg = yield queue.get()
        # SECURITY NOTE: pickle.loads executes arbitrary code embedded in
        # the payload. Only consume from brokers/publishers you trust, or
        # switch to a data-only format such as JSON.
        msgbody = pickle.loads(msg.content.body)
        if msgbody == "STOP":
            print("Received STOP signal. Quitting")
            break
        print('Received: ' + str(msgbody) + ' from channel #' + str(chan.id))

    yield chan.basic_cancel("testtag")

    yield chan.channel_close()

    # Connection-level methods are sent on channel 0.
    chan0 = yield conn.channel(0)

    yield chan0.connection_close()

    reactor.stop()


if __name__ == "__main__":
    # Consumer entry point: expects exactly one argument, the path to the
    # AMQP spec file, then connects and hands the client to gotConnection.
    import sys

    if len(sys.argv) != 2:
        print("%s path_to_spec" % sys.argv[0])
        sys.exit(1)

    spec = txamqp.spec.load(sys.argv[1])
    authentication = {"LOGIN": "guest", "PASSWORD": "guest"}
    delegate = TwistedDelegate()

    # Build the protocol factory first, then open the TCP connection.
    # Host and port are hard-coded; adjust them for your broker.
    creator = ClientCreator(
        reactor, AMQClient, delegate=delegate, vhost="/", spec=spec
    )
    d = creator.connectTCP("10.105.101.152", 5672)
    d.addCallback(gotConnection, authentication)

    reactor.run()



> Publisher: http://fluidinfo.com/esteve/stuff/txamqp_publisher.py
>

# -*- coding: utf-8 -*-
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
from txamqp.content import Content
import txamqp.spec
import pickle

@inlineCallbacks
def gotConnection(conn, authentication, body):
    """Publish one pickled message to "sorting_room", then shut down.

    Authenticates, opens channel 1, publishes ``body`` (pickled) with
    routing key "jason", closes the channel and the connection, and stops
    the reactor.

    :param conn: connected client object exposing ``start`` and ``channel``.
    :param authentication: credentials passed unchanged to ``conn.start``.
    :param body: object to pickle and send as the message payload.
    """
    yield conn.start(authentication)

    channel = yield conn.channel(1)
    yield channel.channel_open()

    payload = pickle.dumps(body)
    message = Content(payload)
    # presumably 2 marks the message as persistent (AMQP delivery mode) -
    # confirm against the spec file in use
    message["delivery mode"] = 2
    # The result of basic_publish is intentionally not waited on here.
    channel.basic_publish(
        exchange="sorting_room",
        content=message,
        routing_key="jason",
    )

    yield channel.channel_close()

    # Connection-level methods are sent on channel 0.
    control_channel = yield conn.channel(0)
    yield control_channel.connection_close()

    reactor.stop()

if __name__ == "__main__":
    # Publisher entry point: expects two arguments, the path to the AMQP
    # spec file and the content to publish.
    import sys

    if len(sys.argv) != 3:
        print("%s path_to_spec content" % sys.argv[0])
        sys.exit(1)

    spec = txamqp.spec.load(sys.argv[1])
    authentication = {"LOGIN": "guest", "PASSWORD": "guest"}
    delegate = TwistedDelegate()

    # Build the protocol factory first, then open the TCP connection.
    # Host and port are hard-coded; adjust them for your broker.
    creator = ClientCreator(
        reactor, AMQClient, delegate=delegate, vhost="/", spec=spec
    )
    d = creator.connectTCP("10.105.101.152", 5672)
    d.addCallback(gotConnection, authentication, sys.argv[2])

    reactor.run()

Follow ups

References