graphite-dev team mailing list archive
-
graphite-dev team
-
Mailing list archive
-
Message #01819
[Merge] lp:~sidnei/graphite/only-notify-once-when-queue-full into lp:graphite
Sidnei da Silva has proposed merging lp:~sidnei/graphite/only-notify-once-when-queue-full into lp:graphite.
Requested reviews:
graphite-dev (graphite-dev)
For more details, see:
https://code.launchpad.net/~sidnei/graphite/only-notify-once-when-queue-full/+merge/84330
Only notify once when the send queue is full. Reset state once there's enough
space available again.
--
https://code.launchpad.net/~sidnei/graphite/only-notify-once-when-queue-full/+merge/84330
Your team graphite-dev is requested to review the proposed merge of lp:~sidnei/graphite/only-notify-once-when-queue-full into lp:graphite.
=== modified file 'carbon/lib/carbon/client.py'
--- carbon/lib/carbon/client.py 2011-10-06 09:22:43 +0000
+++ carbon/lib/carbon/client.py 2011-12-02 21:11:34 +0000
@@ -56,23 +56,27 @@
self.sendQueued()
else:
- datapoints = [ (metric, datapoint) ]
- self.sendString( pickle.dumps(datapoints, protocol=-1) )
- instrumentation.increment(self.sent)
+ self._sendDatapoints([(metric, datapoint)])
+
+ def _sendDatapoints(self, datapoints):
+ self.sendString(pickle.dumps(datapoints, protocol=-1))
+ instrumentation.increment(self.sent, len(datapoints))
self.factory.checkQueue()
def sendQueued(self):
while (not self.paused) and self.factory.hasQueuedDatapoints():
datapoints = self.factory.takeSomeFromQueue()
- self.sendString( pickle.dumps(datapoints, protocol=-1) )
- self.factory.checkQueue()
- instrumentation.increment(self.sent, len(datapoints))
-
- if (settings.USE_FLOW_CONTROL and
- state.metricReceiversPaused and
- self.factory.queueSize < SEND_QUEUE_LOW_WATERMARK):
- log.clients('send queue has space available, resuming paused clients')
- events.resumeReceivingMetrics()
+ self._sendDatapoints(datapoints)
+
+ queueSize = self.factory.queueSize
+ if queueSize < SEND_QUEUE_LOW_WATERMARK:
+ self.factory.queueHasSpace.callback(queueSize)
+ log.clients('%s send queue has space available' % self)
+
+ if (settings.USE_FLOW_CONTROL and
+ state.metricReceiversPaused):
+ log.clients('%s resuming paused clients' % self)
+ events.resumeReceivingMetrics()
def __str__(self):
return 'CarbonClientProtocol(%s:%d:%s)' % (self.factory.destination)
@@ -84,7 +88,7 @@
def __init__(self, destination):
self.destination = destination
- self.destinationName = ('%s:%d:%s' % destination).replace('.', '_')
+ self.destinationName = ('%s_%d_%s' % destination).replace('.', '_')
self.host, self.port, self.carbon_instance = destination
self.addr = (self.host, self.port)
self.started = False
@@ -92,6 +96,10 @@
self.queue = [] # including datapoints that still need to be sent
self.connectedProtocol = None
self.queueEmpty = Deferred()
+ self.queueFull = Deferred()
+ self.queueFull.addCallback(self.queueFullCallback)
+ self.queueHasSpace = Deferred()
+ self.queueHasSpace.addCallback(self.queueSpaceCallback)
self.connectFailed = Deferred()
self.connectionMade = Deferred()
self.connectionLost = Deferred()
@@ -100,6 +108,16 @@
self.fullQueueDrops = 'destinations.%s.fullQueueDrops' % self.destinationName
self.queuedUntilConnected = 'destinations.%s.queuedUntilConnected' % self.destinationName
+ def queueFullCallback(self, result):
+ log.clients('%s send queue is full (%d datapoints)' % (self, result))
+
+ def queueSpaceCallback(self, result):
+ if self.queueFull.called:
+ self.queueFull = Deferred()
+ self.queueFull.addCallback(self.queueFullCallback)
+ self.queueHasSpace = Deferred()
+ self.queueHasSpace.addCallback(self.queueSpaceCallback)
+
def buildProtocol(self, addr):
self.connectedProtocol = CarbonClientProtocol()
self.connectedProtocol.factory = self
@@ -133,12 +151,14 @@
self.queueEmpty = Deferred()
def enqueue(self, metric, datapoint):
- self.queue.append( (metric, datapoint) )
+ self.queue.append((metric, datapoint))
def sendDatapoint(self, metric, datapoint):
instrumentation.increment(self.attemptedRelays)
- if len(self.queue) >= settings.MAX_QUEUE_SIZE:
- log.clients('%s::sendDatapoint send queue full, dropping datapoint')
+ queueSize = self.queueSize
+ if queueSize >= settings.MAX_QUEUE_SIZE:
+ if not self.queueFull.called:
+ self.queueFull.callback(queueSize)
instrumentation.increment(self.fullQueueDrops)
elif self.connectedProtocol:
self.connectedProtocol.sendDatapoint(metric, datapoint)