← Back to team overview

graphite-dev team mailing list archive

[Merge] lp:~sidnei/graphite/only-notify-once-when-queue-full into lp:graphite

 

Sidnei da Silva has proposed merging lp:~sidnei/graphite/only-notify-once-when-queue-full into lp:graphite.

Requested reviews:
  graphite-dev (graphite-dev)

For more details, see:
https://code.launchpad.net/~sidnei/graphite/only-notify-once-when-queue-full/+merge/84330

Log the "send queue is full" notification only once while the queue stays full.
Reset that state once the queue size drops below the low watermark again.
-- 
https://code.launchpad.net/~sidnei/graphite/only-notify-once-when-queue-full/+merge/84330
Your team graphite-dev is requested to review the proposed merge of lp:~sidnei/graphite/only-notify-once-when-queue-full into lp:graphite.
=== modified file 'carbon/lib/carbon/client.py'
--- carbon/lib/carbon/client.py	2011-10-06 09:22:43 +0000
+++ carbon/lib/carbon/client.py	2011-12-02 21:11:34 +0000
@@ -56,23 +56,27 @@
       self.sendQueued()
 
     else:
-      datapoints = [ (metric, datapoint) ]
-      self.sendString( pickle.dumps(datapoints, protocol=-1) )
-      instrumentation.increment(self.sent)
+      self._sendDatapoints([(metric, datapoint)])
+
+  def _sendDatapoints(self, datapoints):
+      self.sendString(pickle.dumps(datapoints, protocol=-1))
+      instrumentation.increment(self.sent, len(datapoints))
       self.factory.checkQueue()
 
   def sendQueued(self):
     while (not self.paused) and self.factory.hasQueuedDatapoints():
       datapoints = self.factory.takeSomeFromQueue()
-      self.sendString( pickle.dumps(datapoints, protocol=-1) )
-      self.factory.checkQueue()
-      instrumentation.increment(self.sent, len(datapoints))
-
-    if (settings.USE_FLOW_CONTROL and
-        state.metricReceiversPaused and
-        self.factory.queueSize < SEND_QUEUE_LOW_WATERMARK):
-      log.clients('send queue has space available, resuming paused clients')
-      events.resumeReceivingMetrics()
+      self._sendDatapoints(datapoints)
+
+    queueSize = self.factory.queueSize
+    if queueSize < SEND_QUEUE_LOW_WATERMARK:
+      self.factory.queueHasSpace.callback(queueSize)
+      log.clients('%s send queue has space available' % self)
+
+      if (settings.USE_FLOW_CONTROL and
+          state.metricReceiversPaused):
+        log.clients('%s resuming paused clients' % self)
+        events.resumeReceivingMetrics()
 
   def __str__(self):
     return 'CarbonClientProtocol(%s:%d:%s)' % (self.factory.destination)
@@ -84,7 +88,7 @@
 
   def __init__(self, destination):
     self.destination = destination
-    self.destinationName = ('%s:%d:%s' % destination).replace('.', '_')
+    self.destinationName = ('%s_%d_%s' % destination).replace('.', '_')
     self.host, self.port, self.carbon_instance = destination
     self.addr = (self.host, self.port)
     self.started = False
@@ -92,6 +96,10 @@
     self.queue = [] # including datapoints that still need to be sent
     self.connectedProtocol = None
     self.queueEmpty = Deferred()
+    self.queueFull = Deferred()
+    self.queueFull.addCallback(self.queueFullCallback)
+    self.queueHasSpace = Deferred()
+    self.queueHasSpace.addCallback(self.queueSpaceCallback)
     self.connectFailed = Deferred()
     self.connectionMade = Deferred()
     self.connectionLost = Deferred()
@@ -100,6 +108,16 @@
     self.fullQueueDrops = 'destinations.%s.fullQueueDrops' % self.destinationName
     self.queuedUntilConnected = 'destinations.%s.queuedUntilConnected' % self.destinationName
 
+  def queueFullCallback(self, result):
+    log.clients('%s send queue is full (%d datapoints)' % (self, result))
+    
+  def queueSpaceCallback(self, result):
+    if self.queueFull.called:
+      self.queueFull = Deferred()
+      self.queueFull.addCallback(self.queueFullCallback)
+    self.queueHasSpace = Deferred()
+    self.queueHasSpace.addCallback(self.queueSpaceCallback)
+
   def buildProtocol(self, addr):
     self.connectedProtocol = CarbonClientProtocol()
     self.connectedProtocol.factory = self
@@ -133,12 +151,14 @@
       self.queueEmpty = Deferred()
 
   def enqueue(self, metric, datapoint):
-    self.queue.append( (metric, datapoint) )
+    self.queue.append((metric, datapoint))
 
   def sendDatapoint(self, metric, datapoint):
     instrumentation.increment(self.attemptedRelays)
-    if len(self.queue) >= settings.MAX_QUEUE_SIZE:
-      log.clients('%s::sendDatapoint send queue full, dropping datapoint')
+    queueSize = self.queueSize
+    if queueSize >= settings.MAX_QUEUE_SIZE:
+      if not self.queueFull.called:
+        self.queueFull.callback(queueSize)
       instrumentation.increment(self.fullQueueDrops)
     elif self.connectedProtocol:
       self.connectedProtocol.sendDatapoint(metric, datapoint)