txamqp-user team mailing list archive
-
txamqp-user team
-
Mailing list archive
-
Message #00020
Re: after some code
On Wed, Oct 7, 2009 at 4:49 PM, <jacopo.pecci@xxxxxxxxx> wrote:
> I have often seen people referring to code below written by Esteve.
> Unfortunately the original links are broken, does anyone have the two files
> (or know where to get them)
>
I don't remember where i've found them, but below there are 2 links to
pastebin and the complete code.
All the code is from Esteve and you have to adjust some parameters to make
it function.
Fabrizio
consumer: http://pastebin.org/38643
publisher: http://pastebin.org/38642
> Consumer: http://fluidinfo.com/esteve/stuff/txamqp_consumer.py
>
# -*- coding: utf-8 -*-
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
import pickle
@inlineCallbacks
def gotConnection(conn, authentication):
yield conn.start(authentication)
chan = yield conn.channel(1)
yield chan.channel_open()
yield chan.queue_declare(queue="po_box", durable=True, exclusive=False,
auto_delete=False)
yield chan.exchange_declare(exchange="sorting_room", type="direct",
durable=True, auto_delete=False)
yield chan.queue_bind(queue="po_box", exchange="sorting_room",
routing_key="jason")
yield chan.basic_consume(queue='po_box', no_ack=True,
consumer_tag="testtag")
def recv_callback(msg, chan, queue):
msgbody = pickle.loads(msg.content.body)
if msgbody == "STOP":
print "Received STOP signal. Quitting"
return
print 'Received: ' + str(msgbody) + ' from channel #' + str(chan.id)
return (queue.get().addCallback(recv_callback, chan, queue))
queue = yield conn.queue("testtag")
yield (queue.get().addCallback(recv_callback, chan, queue))
yield chan.basic_cancel("testtag")
yield chan.channel_close()
chan0 = yield conn.channel(0)
yield chan0.connection_close()
reactor.stop()
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print "%s path_to_spec" % sys.argv[0]
sys.exit(1)
spec = txamqp.spec.load(sys.argv[1])
authentication = {"LOGIN": "guest", "PASSWORD": "guest"}
delegate = TwistedDelegate()
d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost="/",
spec=spec).connectTCP("10.105.101.152", 5672)
d.addCallback(gotConnection, authentication)
reactor.run()
Publisher: http://fluidinfo.com/esteve/stuff/txamqp_publisher.py
>
# -*- coding: utf-8 -*-
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
from txamqp.content import Content
import txamqp.spec
import pickle
@inlineCallbacks
def gotConnection(conn, authentication, body):
yield conn.start(authentication)
chan = yield conn.channel(1)
yield chan.channel_open()
msg = Content(pickle.dumps(body))
msg["delivery mode"] = 2
chan.basic_publish(exchange="sorting_room", content=msg,
routing_key="jason")
yield chan.channel_close()
chan0 = yield conn.channel(0)
yield chan0.connection_close()
reactor.stop()
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print "%s path_to_spec content" % sys.argv[0]
sys.exit(1)
spec = txamqp.spec.load(sys.argv[1])
authentication = {"LOGIN": "guest", "PASSWORD": "guest"}
delegate = TwistedDelegate()
d = ClientCreator(reactor, AMQClient, delegate=delegate, vhost="/",
spec=spec).connectTCP("10.105.101.152", 5672)
d.addCallback(gotConnection, authentication, sys.argv[2])
reactor.run()
Follow ups
References