← Back to team overview

graphite-dev team mailing list archive

[Merge] lp:~drawks/graphite/graphite-msgpack into lp:graphite

 

Dave Rawks has proposed merging lp:~drawks/graphite/graphite-msgpack into lp:graphite.

Requested reviews:
  graphite-dev (graphite-dev)

For more details, see:
https://code.launchpad.net/~drawks/graphite/graphite-msgpack/+merge/92921

Added msgpack as a receiver protocol. msgpack is super fast at deserializing and much safer than pickle since it doesn't support serializing things like modules, functions, or class objects. Also added support to enable/disable individual listeners in carbon.conf

New code path can be tested with a simple metric writer like:

import msgpack, socket, time:
s=socket.create_connection(('localhost',2005))
fs=s.makefile()
for second in xrange(int(time.time()) - 86400,int(time.time())):
    msgpack.pack(("foo.bar",(second,1)),fs)
fs.flush()
fs.close()
s.shutdown(socket.SHUT_RDWR)


-- 
https://code.launchpad.net/~drawks/graphite/graphite-msgpack/+merge/92921
Your team graphite-dev is requested to review the proposed merge of lp:~drawks/graphite/graphite-msgpack into lp:graphite.
=== modified file 'carbon/conf/carbon.conf.example'
--- carbon/conf/carbon.conf.example	2011-12-14 16:22:17 +0000
+++ carbon/conf/carbon.conf.example	2012-02-14 07:19:18 +0000
@@ -56,6 +56,7 @@
 # the files quickly but at the risk of slowing I/O down considerably for a while.
 MAX_CREATES_PER_MINUTE = 50
 
+ENABLE_LINE_RECEIVER = True
 LINE_RECEIVER_INTERFACE = 0.0.0.0
 LINE_RECEIVER_PORT = 2003
 
@@ -66,6 +67,7 @@
 UDP_RECEIVER_INTERFACE = 0.0.0.0
 UDP_RECEIVER_PORT = 2003
 
+ENABLE_PICKLE_RECEIVER = True
 PICKLE_RECEIVER_INTERFACE = 0.0.0.0
 PICKLE_RECEIVER_PORT = 2004
 
@@ -74,6 +76,10 @@
 # Set this to True to revert to the old-fashioned insecure unpickler.
 USE_INSECURE_UNPICKLER = False
 
+ENABLE_MSGPACK_RECEIVER = True
+MSGPACK_RECEIVER_INTERFACE = 0.0.0.0
+MSGPACK_RECEIVER_PORT = 2005
+
 CACHE_QUERY_INTERFACE = 0.0.0.0
 CACHE_QUERY_PORT = 7002
 

=== modified file 'carbon/lib/carbon/conf.py'
--- carbon/lib/carbon/conf.py	2012-02-09 21:38:55 +0000
+++ carbon/lib/carbon/conf.py	2012-02-14 07:19:18 +0000
@@ -32,11 +32,16 @@
   MAX_CACHE_SIZE=float('inf'),
   MAX_UPDATES_PER_SECOND=500,
   MAX_CREATES_PER_MINUTE=float('inf'),
+  ENABLE_LINE_RECEIVER=True,
   LINE_RECEIVER_INTERFACE='0.0.0.0',
   LINE_RECEIVER_PORT=2003,
   ENABLE_UDP_LISTENER=False,
   UDP_RECEIVER_INTERFACE='0.0.0.0',
   UDP_RECEIVER_PORT=2003,
+  ENABLE_MSGPACK_RECEIVER=True,
+  MSGPACK_RECEIVER_INTERFACE='0.0.0.0',
+  MSGPACK_RECEIVER_PORT=2005,
+  ENABLE_PICKLE_RECEIVER=True,
   PICKLE_RECEIVER_INTERFACE='0.0.0.0',
   PICKLE_RECEIVER_PORT=2004,
   CACHE_QUERY_INTERFACE='0.0.0.0',

=== modified file 'carbon/lib/carbon/protocols.py'
--- carbon/lib/carbon/protocols.py	2012-02-11 06:26:28 +0000
+++ carbon/lib/carbon/protocols.py	2012-02-14 07:19:18 +0000
@@ -1,11 +1,13 @@
 from twisted.internet import reactor
-from twisted.internet.protocol import DatagramProtocol
+from twisted.internet.protocol import DatagramProtocol, Protocol
 from twisted.internet.error import ConnectionDone
 from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
 from carbon import log, events, state, management
 from carbon.conf import settings
 from carbon.regexlist import WhiteList, BlackList
 from carbon.util import pickle, get_unpickler
+if settings.ENABLE_MSGPACK_RECEIVER:
+  from msgpack import Unpacker
 
 
 class MetricReceiver:
@@ -106,6 +108,33 @@
       self.metricReceived(metric, datapoint)
 
 
+class MetricMessagePackReceiver(MetricReceiver, Protocol):
+
+  def connectionMade(self):
+    MetricReceiver.connectionMade(self)
+    self.unpacker = Unpacker()
+
+  def dataReceived(self, data):
+    if len(data) <= 0:
+      log.listener('msgpack short read from %s' % self.peerName)
+      return
+    try:
+      self.unpacker.feed(data)
+      for (metric, datapoint) in self.unpacker:
+        if not isinstance(metric,str):
+          log.listener('invalid metric name/type %r/%r received from %s' % ( metric, type(metric), self.peerName))
+          continue
+        try:
+          datapoint = ( float(datapoint[0]), float(datapoint[1]) )
+        except:
+          continue
+        self.metricReceived(metric, datapoint)
+    except:
+      log.listener('invalid msgpack received from %s, ignoring' % self.peerName)
+      return
+
+
+
 class CacheManagementHandler(Int32StringReceiver):
   def connectionMade(self):
     peer = self.transport.getPeer()

=== modified file 'carbon/lib/carbon/service.py'
--- carbon/lib/carbon/service.py	2011-12-14 16:22:17 +0000
+++ carbon/lib/carbon/service.py	2012-02-14 07:19:18 +0000
@@ -40,7 +40,7 @@
 def createBaseService(config):
     from carbon.conf import settings
     from carbon.protocols import (MetricLineReceiver, MetricPickleReceiver,
-                                  MetricDatagramReceiver)
+                                  MetricDatagramReceiver, MetricMessagePackReceiver)
 
     root_service = CarbonRootService()
     root_service.setName(settings.program)
@@ -59,12 +59,20 @@
         amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
 
 
-    for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE,
-                                       settings.LINE_RECEIVER_PORT,
-                                       MetricLineReceiver),
-                                      (settings.PICKLE_RECEIVER_INTERFACE,
-                                       settings.PICKLE_RECEIVER_PORT,
-                                       MetricPickleReceiver)):
+    receivers = []
+    if settings.ENABLE_LINE_RECEIVER:
+       receivers.append((settings.LINE_RECEIVER_INTERFACE,
+                         settings.LINE_RECEIVER_PORT,
+                         MetricLineReceiver))
+    if settings.ENABLE_PICKLE_RECEIVER:
+       receivers.append((settings.PICKLE_RECEIVER_INTERFACE,
+                         settings.PICKLE_RECEIVER_PORT,
+                         MetricPickleReceiver))
+    if settings.ENABLE_MSGPACK_RECEIVER:
+       receivers.append((settings.MSGPACK_RECEIVER_INTERFACE,
+                         settings.MSGPACK_RECEIVER_PORT,
+                         MetricMessagePackReceiver))
+    for (interface, port, protocol) in receivers:
         if port:
             factory = ServerFactory()
             factory.protocol = protocol

=== modified file 'check-dependencies.py'
--- check-dependencies.py	2012-02-10 05:14:15 +0000
+++ check-dependencies.py	2012-02-14 07:19:18 +0000
@@ -168,6 +168,14 @@
   print "Note that txamqp requires python 2.5 or greater."
   warning += 1
 
+# Test for msgpack
+try:
+  import msgpack
+except:
+  print "[WARNING]"
+  print "Unable to import the 'msgpack' module, this is required if you want to use msgpack receiver."
+  warning += 1
+
 
 if fatal:
   print "%d necessary dependencies not met. Graphite will not function until these dependencies are fulfilled." % fatal