← Back to team overview

graphite-dev team mailing list archive

[Merge] lp:~jens-rantil/whisper/whisper into lp:whisper

 

Jens Rantil has proposed merging lp:~jens-rantil/whisper/whisper into lp:whisper.

Requested reviews:
  graphite-dev (graphite-dev)

For more details, see:
https://code.launchpad.net/~jens-rantil/whisper/whisper/+merge/105582

This branch proposes the following changes:
 * Removing unnecessary output to stdout (a leftover 'print datapoints') from whisper-update.py
 * Using setuptools instead of distutils. This makes it possible to use 'python setup.py develop' for easier development.
 * Introducing an '--aggregate' command line parameter to whisper-resize.py, which tries to aggregate the existing values so that they fit the new archive better when resizing.

The last change was tested using the following script:
===============
#!/bin/bash
# Manual test for the '--aggregate' option of whisper-resize.py.
#
# Runs the same scenario twice -- once resizing without aggregation and once
# with '--aggregate' -- and dumps the resulting archive after each run so the
# two outputs can be compared by eye.

SEPARATOR="================================================================="

# Recreate test.wsp as a 1s:20s archive and write five datapoints of value 1,
# at the creation second and at each of the four seconds before it.
# Sets the global CREATED to the timestamp taken right after creation.
create_test_archive() {
  rm -f test.wsp
  whisper-create.py --xFilesFactor=0 --aggregationMethod=sum test.wsp 1s:20s
  CREATED=$(date +%s)
  echo "Created: $CREATED"
  whisper-update.py test.wsp $((CREATED)):1 $((CREATED-1)):1 $((CREATED-2)):1 $((CREATED-3)):1 $((CREATED-4)):1
}

echo "$SEPARATOR"
create_test_archive

echo
echo "Resizing without aggregation:"
whisper-resize.py test.wsp 2s:20s

echo
echo "Dumping result without aggregation:"
whisper-dump.py test.wsp

echo "$SEPARATOR"
echo "$SEPARATOR"

create_test_archive

# Seems to be a bug if I am not waiting a bit here
# Please help figuring this out if you have any input!
sleep 2

echo
echo "Resizing with aggregation:"
whisper-resize.py --aggregate test.wsp 2s:20s

echo
# This run used --aggregate, so label the dump accordingly.
echo "Dumping result with aggregation:"
whisper-dump.py test.wsp
echo "$SEPARATOR"
===============
-- 
https://code.launchpad.net/~jens-rantil/whisper/whisper/+merge/105582
Your team graphite-dev is requested to review the proposed merge of lp:~jens-rantil/whisper/whisper into lp:whisper.
=== modified file 'bin/whisper-resize.py'
--- bin/whisper-resize.py	2011-08-01 08:38:41 +0000
+++ bin/whisper-resize.py	2012-05-13 13:10:24 +0000
@@ -3,6 +3,8 @@
 import sys, os, time, traceback
 import whisper
 from optparse import OptionParser
+import bisect
+import math
 
 now = int(time.time())
 
@@ -33,6 +35,10 @@
 option_parser.add_option(
     '--nobackup', action='store_true',
     help='Delete the .bak file after successful execution')
+option_parser.add_option(
+    '--aggregate', action='store_true',
+    help='Try to aggregate the values to fit the new archive better.'
+         ' Note that this will make things slower and use more memory.')
 
 (options, args) = option_parser.parse_args()
 
@@ -80,12 +86,58 @@
 size = os.stat(newfile).st_size
 print 'Created: %s (%d bytes)' % (newfile,size)
 
-print 'Migrating data...'
-for archive in old_archives:
-  timeinfo, values = archive['data']
-  datapoints = zip( range(*timeinfo), values )
-  datapoints = filter(lambda p: p[1] is not None, datapoints)
-  whisper.update_many(newfile, datapoints)
+if options.aggregate:
+  # This is where data will be interpolated (best effort)
+  print 'Migrating data with aggregation...'
+  alldatapoints = []
+  for archive in old_archives:
+    # Loading all datapoints into memory for fast querying
+    timeinfo, values = archive['data']
+    datapoints = zip( range(*timeinfo), values )
+    alldatapoints += datapoints
+
+  # Must be sorted since we are doing binary search for all elements
+  alldatapoints.sort()
+
+  oldtimestamps = map( lambda p: p[0], alldatapoints)
+  oldvalues = map( lambda p: p[1], alldatapoints)
+
+  # Simply cleaning up some used memory
+  del alldatapoints
+
+  new_info = whisper.info(newfile)
+  new_archives = new_info['archives']
+
+  for archive in new_archives:
+    fromTime = now - archive['retention'] + archive['secondsPerPoint']
+    untilTime = now
+    timeinfo, _values = whisper.fetch(newfile, fromTime, untilTime)
+    del _values # Freeing up unused return value
+    timepoints_to_update = range(*timeinfo)
+    newdatapoints = []
+    for tinterval in zip( timepoints_to_update[:-1], timepoints_to_update[1:] ):
+      # TODO: Setting lo= parameter for 'lefti' based on righti from previous
+      #       iteration. Obviously, this can only be done if
+      #       timepoints_to_update is always updated. Is it?
+      lefti = bisect.bisect_left(oldtimestamps, tinterval[0])
+      righti = bisect.bisect_left(oldtimestamps, tinterval[1], lo=lefti)
+      newvalues = oldvalues[lefti:righti]
+      if newvalues:
+        non_none = filter( lambda x: x is not None, newvalues)
+        if 1.0*len(non_none)/len(newvalues) >= xff:
+          newdatapoints.append([tinterval[0],
+                                whisper.aggregate(aggregationMethod,
+                                                  non_none)])
+      elif xff == 0:
+        newdatapoints.append([tinterval[0], 0])
+    whisper.update_many(newfile, newdatapoints)
+else:
+  print 'Migrating data without aggregation...'
+  for archive in old_archives:
+    timeinfo, values = archive['data']
+    datapoints = zip( range(*timeinfo), values )
+    datapoints = filter(lambda p: p[1] is not None, datapoints)
+    whisper.update_many(newfile, datapoints)
 
 if options.newfile is not None:
   sys.exit(0)

=== modified file 'bin/whisper-update.py'
--- bin/whisper-update.py	2011-07-26 14:24:24 +0000
+++ bin/whisper-update.py	2012-05-13 13:10:24 +0000
@@ -25,5 +25,4 @@
   timestamp,value = datapoints[0]
   whisper.update(path, value, timestamp)
 else:
-  print datapoints
   whisper.update_many(path, datapoints)

=== modified file 'setup.py'
--- setup.py	2012-05-01 08:09:32 +0000
+++ setup.py	2012-05-13 13:10:24 +0000
@@ -2,7 +2,7 @@
 
 import os
 from glob import glob
-from distutils.core import setup
+from setuptools import setup
 
 
 setup(

=== modified file 'whisper.py'
--- whisper.py	2012-04-21 08:27:14 +0000
+++ whisper.py	2012-05-13 13:10:24 +0000
@@ -364,7 +364,7 @@
 
   fh.close()
 
-def __aggregate(aggregationMethod, knownValues):
+def aggregate(aggregationMethod, knownValues):
   if aggregationMethod == 'average':
     return float(sum(knownValues)) / float(len(knownValues))
   elif aggregationMethod == 'sum':
@@ -438,7 +438,7 @@
 
   knownPercent = float(len(knownValues)) / float(len(neighborValues))
   if knownPercent >= xff: #we have enough data to propagate a value!
-    aggregateValue = __aggregate(aggregationMethod, knownValues)
+    aggregateValue = aggregate(aggregationMethod, knownValues)
     myPackedPoint = struct.pack(pointFormat,lowerIntervalStart,aggregateValue)
     fh.seek(lower['offset'])
     packedPoint = fh.read(pointSize)


Follow ups