graphite-dev team mailing list archive
-
graphite-dev team
-
Mailing list archive
-
Message #02588
[Merge] lp:~jens-rantil/whisper/whisper into lp:whisper
Jens Rantil has proposed merging lp:~jens-rantil/whisper/whisper into lp:whisper.
Requested reviews:
graphite-dev (graphite-dev)
For more details, see:
https://code.launchpad.net/~jens-rantil/whisper/whisper/+merge/105582
This branch proposes the following changes:
* Removing an unnecessary debug print of the datapoints to stdout from whisper-update.py
* Using setuptools instead of distutils. This makes it possible to use 'python setup.py develop' for easier development.
* Introducing an '--aggregate' command-line option to whisper-resize.py, which aggregates the existing data points into the intervals of the new archive(s) instead of copying them over point by point.
The '--aggregate' change was tested using the following script:
===============
#!/bin/bash
# Manual test for whisper-resize.py.
#
# Starting from a 1s:20s archive (aggregationMethod=sum, xFilesFactor=0) that
# holds the value 1 at each of the last five seconds, this script:
#   1. resizes it to 2s:20s without --aggregate, then dumps the result;
#   2. resizes a fresh copy to 2s:20s with --aggregate, then dumps the result.

# Recreate test.wsp and write the value 1 at CREATED, CREATED-1, ..., CREATED-4.
# Sets the global CREATED to the creation time (output of `date +%s`).
setup_test_file() {
    local offset
    local points=()
    rm -f test.wsp
    whisper-create.py --xFilesFactor=0 --aggregationMethod=sum test.wsp 1s:20s
    CREATED=$(date +%s)
    echo "Created: $CREATED"
    for offset in 0 1 2 3 4; do
        points+=("$((CREATED - offset)):1")
    done
    whisper-update.py test.wsp "${points[@]}"
}

echo "================================================================="
setup_test_file
echo
echo "Resizing without aggregation:"
whisper-resize.py test.wsp 2s:20s
echo
echo "Dumping result without aggregation:"
whisper-dump.py test.wsp
echo "================================================================="

echo "================================================================="
setup_test_file
# The author saw wrong results from --aggregate unless a short pause was
# inserted here. NOTE(review): presumably this is because the --aggregate loop
# in whisper-resize.py builds its intervals with
# zip(timepoints[:-1], timepoints[1:]). That means the last bucket timestamp
# never starts an interval, so the newest data is dropped unless "now" has
# moved past it. Confirm before treating this sleep as a workaround.
sleep 2
echo
echo "Resizing with aggregation:"
whisper-resize.py --aggregate test.wsp 2s:20s
echo
echo "Dumping result with aggregation:"
whisper-dump.py test.wsp
echo "================================================================="
===============
--
https://code.launchpad.net/~jens-rantil/whisper/whisper/+merge/105582
Your team graphite-dev is requested to review the proposed merge of lp:~jens-rantil/whisper/whisper into lp:whisper.
=== modified file 'bin/whisper-resize.py'
--- bin/whisper-resize.py 2011-08-01 08:38:41 +0000
+++ bin/whisper-resize.py 2012-05-13 13:10:24 +0000
@@ -3,6 +3,8 @@
import sys, os, time, traceback
import whisper
from optparse import OptionParser
+import bisect
+import math
now = int(time.time())
@@ -33,6 +35,10 @@
option_parser.add_option(
'--nobackup', action='store_true',
help='Delete the .bak file after successful execution')
+option_parser.add_option(
+ '--aggregate', action='store_true',
+ help='Try to aggregate the values to fit the new archive better.'
+ ' Note that this will make things slower and use more memory.')
(options, args) = option_parser.parse_args()
@@ -80,12 +86,58 @@
size = os.stat(newfile).st_size
print 'Created: %s (%d bytes)' % (newfile,size)
-print 'Migrating data...'
-for archive in old_archives:
- timeinfo, values = archive['data']
- datapoints = zip( range(*timeinfo), values )
- datapoints = filter(lambda p: p[1] is not None, datapoints)
- whisper.update_many(newfile, datapoints)
+if options.aggregate:
+ # This is where data will be interpolated (best effort)
+ print 'Migrating data with aggregation...'
+ alldatapoints = []
+ for archive in old_archives:
+ # Loading all datapoints into memory for fast querying
+ timeinfo, values = archive['data']
+ datapoints = zip( range(*timeinfo), values )
+ alldatapoints += datapoints
+
+ # Must be sorted since we are doing binary search for all elements
+ alldatapoints.sort()
+
+ oldtimestamps = map( lambda p: p[0], alldatapoints)
+ oldvalues = map( lambda p: p[1], alldatapoints)
+
+ # Simply cleaning up some used memory
+ del alldatapoints
+
+ new_info = whisper.info(newfile)
+ new_archives = new_info['archives']
+
+ for archive in new_archives:
+ fromTime = now - archive['retention'] + archive['secondsPerPoint']
+ untilTime = now
+ timeinfo, _values = whisper.fetch(newfile, fromTime, untilTime)
+ del _values # Freeing up unused return value
+ timepoints_to_update = range(*timeinfo)
+ newdatapoints = []
+ for tinterval in zip( timepoints_to_update[:-1], timepoints_to_update[1:] ):
+ # TODO: Setting lo= parameter for 'lefti' based on righti from previous
+ # iteration. Obviously, this can only be done if
+ # timepoints_to_update is always updated. Is it?
+ lefti = bisect.bisect_left(oldtimestamps, tinterval[0])
+ righti = bisect.bisect_left(oldtimestamps, tinterval[1], lo=lefti)
+ newvalues = oldvalues[lefti:righti]
+ if newvalues:
+ non_none = filter( lambda x: x is not None, newvalues)
+ if 1.0*len(non_none)/len(newvalues) >= xff:
+ newdatapoints.append([tinterval[0],
+ whisper.aggregate(aggregationMethod,
+ non_none)])
+ elif xff == 0:
+ newdatapoints.append([tinterval[0], 0])
+ whisper.update_many(newfile, newdatapoints)
+else:
+ print 'Migrating data without aggregation...'
+ for archive in old_archives:
+ timeinfo, values = archive['data']
+ datapoints = zip( range(*timeinfo), values )
+ datapoints = filter(lambda p: p[1] is not None, datapoints)
+ whisper.update_many(newfile, datapoints)
if options.newfile is not None:
sys.exit(0)
=== modified file 'bin/whisper-update.py'
--- bin/whisper-update.py 2011-07-26 14:24:24 +0000
+++ bin/whisper-update.py 2012-05-13 13:10:24 +0000
@@ -25,5 +25,4 @@
timestamp,value = datapoints[0]
whisper.update(path, value, timestamp)
else:
- print datapoints
whisper.update_many(path, datapoints)
=== modified file 'setup.py'
--- setup.py 2012-05-01 08:09:32 +0000
+++ setup.py 2012-05-13 13:10:24 +0000
@@ -2,7 +2,7 @@
import os
from glob import glob
-from distutils.core import setup
+from setuptools import setup
setup(
=== modified file 'whisper.py'
--- whisper.py 2012-04-21 08:27:14 +0000
+++ whisper.py 2012-05-13 13:10:24 +0000
@@ -364,7 +364,7 @@
fh.close()
-def __aggregate(aggregationMethod, knownValues):
+def aggregate(aggregationMethod, knownValues):
if aggregationMethod == 'average':
return float(sum(knownValues)) / float(len(knownValues))
elif aggregationMethod == 'sum':
@@ -438,7 +438,7 @@
knownPercent = float(len(knownValues)) / float(len(neighborValues))
if knownPercent >= xff: #we have enough data to propagate a value!
- aggregateValue = __aggregate(aggregationMethod, knownValues)
+ aggregateValue = aggregate(aggregationMethod, knownValues)
myPackedPoint = struct.pack(pointFormat,lowerIntervalStart,aggregateValue)
fh.seek(lower['offset'])
packedPoint = fh.read(pointSize)
Follow ups