graphite-dev team mailing list archive
-
graphite-dev team
-
Mailing list archive
-
Message #02128
[Merge] lp:~drawks/graphite/graphite-msgpack into lp:graphite
Dave Rawks has proposed merging lp:~drawks/graphite/graphite-msgpack into lp:graphite.
Requested reviews:
graphite-dev (graphite-dev)
For more details, see:
https://code.launchpad.net/~drawks/graphite/graphite-msgpack/+merge/92921
Added msgpack as a receiver protocol. msgpack is super fast at deserializing and much safer than pickle since it doesn't support serializing things like modules, functions, or class objects. Also added support to enable/disable individual listeners in carbon.conf
New code path can be tested with a simple metric writer like:
import msgpack, socket, time:
s=socket.create_connection(('localhost',2005))
fs=s.makefile()
for second in xrange(int(time.time()) - 86400,int(time.time())):
msgpack.pack(("foo.bar",(second,1)),fs)
fs.flush()
fs.close()
s.shutdown(socket.SHUT_RDWR)
--
https://code.launchpad.net/~drawks/graphite/graphite-msgpack/+merge/92921
Your team graphite-dev is requested to review the proposed merge of lp:~drawks/graphite/graphite-msgpack into lp:graphite.
=== modified file 'carbon/conf/carbon.conf.example'
--- carbon/conf/carbon.conf.example 2011-12-14 16:22:17 +0000
+++ carbon/conf/carbon.conf.example 2012-02-14 07:19:18 +0000
@@ -56,6 +56,7 @@
# the files quickly but at the risk of slowing I/O down considerably for a while.
MAX_CREATES_PER_MINUTE = 50
+ENABLE_LINE_RECEIVER = True
LINE_RECEIVER_INTERFACE = 0.0.0.0
LINE_RECEIVER_PORT = 2003
@@ -66,6 +67,7 @@
UDP_RECEIVER_INTERFACE = 0.0.0.0
UDP_RECEIVER_PORT = 2003
+ENABLE_PICKLE_RECEIVER = True
PICKLE_RECEIVER_INTERFACE = 0.0.0.0
PICKLE_RECEIVER_PORT = 2004
@@ -74,6 +76,10 @@
# Set this to True to revert to the old-fashioned insecure unpickler.
USE_INSECURE_UNPICKLER = False
+ENABLE_MSGPACK_RECEIVER = True
+MSGPACK_RECEIVER_INTERFACE = 0.0.0.0
+MSGPACK_RECEIVER_PORT = 2005
+
CACHE_QUERY_INTERFACE = 0.0.0.0
CACHE_QUERY_PORT = 7002
=== modified file 'carbon/lib/carbon/conf.py'
--- carbon/lib/carbon/conf.py 2012-02-09 21:38:55 +0000
+++ carbon/lib/carbon/conf.py 2012-02-14 07:19:18 +0000
@@ -32,11 +32,16 @@
MAX_CACHE_SIZE=float('inf'),
MAX_UPDATES_PER_SECOND=500,
MAX_CREATES_PER_MINUTE=float('inf'),
+ ENABLE_LINE_RECEIVER=True,
LINE_RECEIVER_INTERFACE='0.0.0.0',
LINE_RECEIVER_PORT=2003,
ENABLE_UDP_LISTENER=False,
UDP_RECEIVER_INTERFACE='0.0.0.0',
UDP_RECEIVER_PORT=2003,
+ ENABLE_MSGPACK_RECEIVER=True,
+ MSGPACK_RECEIVER_INTERFACE='0.0.0.0',
+ MSGPACK_RECEIVER_PORT=2005,
+ ENABLE_PICKLE_RECEIVER=True,
PICKLE_RECEIVER_INTERFACE='0.0.0.0',
PICKLE_RECEIVER_PORT=2004,
CACHE_QUERY_INTERFACE='0.0.0.0',
=== modified file 'carbon/lib/carbon/protocols.py'
--- carbon/lib/carbon/protocols.py 2012-02-11 06:26:28 +0000
+++ carbon/lib/carbon/protocols.py 2012-02-14 07:19:18 +0000
@@ -1,11 +1,13 @@
from twisted.internet import reactor
-from twisted.internet.protocol import DatagramProtocol
+from twisted.internet.protocol import DatagramProtocol, Protocol
from twisted.internet.error import ConnectionDone
from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
from carbon import log, events, state, management
from carbon.conf import settings
from carbon.regexlist import WhiteList, BlackList
from carbon.util import pickle, get_unpickler
+if settings.ENABLE_MSGPACK_RECEIVER:
+ from msgpack import Unpacker
class MetricReceiver:
@@ -106,6 +108,33 @@
self.metricReceived(metric, datapoint)
+class MetricMessagePackReceiver(MetricReceiver, Protocol):
+
+ def connectionMade(self):
+ MetricReceiver.connectionMade(self)
+ self.unpacker = Unpacker()
+
+ def dataReceived(self, data):
+ if len(data) <= 0:
+ log.listener('msgpack short read from %s' % self.peerName)
+ return
+ try:
+ self.unpacker.feed(data)
+ for (metric, datapoint) in self.unpacker:
+ if not isinstance(metric,str):
+ log.listener('invalid metric name/type %r/%r received from %s' % ( metric, type(metric), self.peerName))
+ continue
+ try:
+ datapoint = ( float(datapoint[0]), float(datapoint[1]) )
+ except:
+ continue
+ self.metricReceived(metric, datapoint)
+ except:
+ log.listener('invalid msgpack received from %s, ignoring' % self.peerName)
+ return
+
+
+
class CacheManagementHandler(Int32StringReceiver):
def connectionMade(self):
peer = self.transport.getPeer()
=== modified file 'carbon/lib/carbon/service.py'
--- carbon/lib/carbon/service.py 2011-12-14 16:22:17 +0000
+++ carbon/lib/carbon/service.py 2012-02-14 07:19:18 +0000
@@ -40,7 +40,7 @@
def createBaseService(config):
from carbon.conf import settings
from carbon.protocols import (MetricLineReceiver, MetricPickleReceiver,
- MetricDatagramReceiver)
+ MetricDatagramReceiver, MetricMessagePackReceiver)
root_service = CarbonRootService()
root_service.setName(settings.program)
@@ -59,12 +59,20 @@
amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
- for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE,
- settings.LINE_RECEIVER_PORT,
- MetricLineReceiver),
- (settings.PICKLE_RECEIVER_INTERFACE,
- settings.PICKLE_RECEIVER_PORT,
- MetricPickleReceiver)):
+ receivers = []
+ if settings.ENABLE_LINE_RECEIVER:
+ receivers.append((settings.LINE_RECEIVER_INTERFACE,
+ settings.LINE_RECEIVER_PORT,
+ MetricLineReceiver))
+ if settings.ENABLE_PICKLE_RECEIVER:
+ receivers.append((settings.PICKLE_RECEIVER_INTERFACE,
+ settings.PICKLE_RECEIVER_PORT,
+ MetricPickleReceiver))
+ if settings.ENABLE_MSGPACK_RECEIVER:
+ receivers.append((settings.MSGPACK_RECEIVER_INTERFACE,
+ settings.MSGPACK_RECEIVER_PORT,
+ MetricMessagePackReceiver))
+ for (interface, port, protocol) in receivers:
if port:
factory = ServerFactory()
factory.protocol = protocol
=== modified file 'check-dependencies.py'
--- check-dependencies.py 2012-02-10 05:14:15 +0000
+++ check-dependencies.py 2012-02-14 07:19:18 +0000
@@ -168,6 +168,14 @@
print "Note that txamqp requires python 2.5 or greater."
warning += 1
+# Test for msgpack
+try:
+ import msgpack
+except:
+ print "[WARNING]"
+ print "Unable to import the 'msgpack' module, this is required if you want to use msgpack receiver."
+ warning += 1
+
if fatal:
print "%d necessary dependencies not met. Graphite will not function until these dependencies are fulfilled." % fatal