← Back to team overview

graphite-dev team mailing list archive

[Merge] lp:~sidnei/graphite/whisper-merge-command into lp:graphite

 

Sidnei da Silva has proposed merging lp:~sidnei/graphite/whisper-merge-command into lp:graphite.

Requested reviews:
  graphite-dev (graphite-dev)

For more details, see:
https://code.launchpad.net/~sidnei/graphite/whisper-merge-command/+merge/92472

Add a whisper-merge command, which copies the data points stored in one whisper
file into another.
-- 
https://code.launchpad.net/~sidnei/graphite/whisper-merge-command/+merge/92472
Your team graphite-dev is requested to review the proposed merge of lp:~sidnei/graphite/whisper-merge-command into lp:graphite.
=== added file 'whisper/bin/whisper-merge.py'
--- whisper/bin/whisper-merge.py	1970-01-01 00:00:00 +0000
+++ whisper/bin/whisper-merge.py	2012-02-10 13:49:32 +0000
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+"""Merge the datapoints of one whisper database into another.
+
+Usage: whisper-merge.py [options] from_path to_path
+
+Copies the datapoints stored in from_path into the existing database at
+to_path via whisper.file_merge(). from_path is opened read-only.
+"""
+
+import sys
+import whisper
+
+from optparse import OptionParser
+
+option_parser = OptionParser(
+    usage='''%prog [options] from_path to_path''')
+
+(options, args) = option_parser.parse_args()
+
+# Require exactly two positional arguments; extra ones are a usage error
+# rather than being silently ignored.
+if len(args) != 2:
+  option_parser.print_usage(sys.stderr)
+  sys.exit(1)
+
+(path_from, path_to) = args
+
+# The source is only read, so it is opened read-only; the destination is
+# updated in place ('r+b' fails if it does not already exist).
+with open(path_from, 'rb') as from_fh:
+  with open(path_to, 'r+b') as to_fh:
+    whisper.file_merge(from_fh, to_fh)

=== modified file 'whisper/whisper.py'
--- whisper/whisper.py	2012-02-10 01:39:20 +0000
+++ whisper/whisper.py	2012-02-10 13:49:32 +0000
@@ -25,7 +25,7 @@
 #		Archive = Point+
 #			Point = timestamp,value
 
-import os, struct, time
+import os, struct, time, operator, itertools
 
 try:
   import fcntl
@@ -730,3 +730,61 @@
  fh.close()
  timeInfo = (fromInterval,untilInterval,step)
  return (timeInfo,valueList)
+
+def file_merge(from_fh, to_fh, step=1<<12):
+  """Copy every non-empty datapoint of one whisper database into another.
+
+  from_fh -- open file object for the source database. Only its header and
+             its .name are used: each window of data is read through a new
+             read-only handle opened from from_fh.name.
+  to_fh   -- open file object for the destination database.
+  step    -- maximum number of source points requested per file_fetch() call.
+
+  Archives are visited from shortest to longest retention, and each one
+  supplies only the period older than what the previous archive covered.
+  All collected points are held in memory and written with a single
+  file_update_many() call, newest first.
+
+  Raises ValueError if step is smaller than 1.
+  """
+  if step < 1:
+    raise ValueError("step must be at least 1, got %r" % (step,))
+
+  headerFrom = __readHeader(from_fh)
+  # sorted() returns a new list, so the header's archive list is not reordered.
+  archives = sorted(headerFrom['archives'], key=operator.itemgetter('retention'))
+
+  now = int(time.time())
+  pointsToWrite = []
+  # Newest timestamp the current archive still has to cover.
+  untilArchive = now
+  for archive in archives:
+    fromArchive = now - archive['retention']
+    # NOTE(review): windows are sized from this archive's resolution on the
+    # assumption that file_fetch() reads this same archive for them - confirm.
+    windowSpan = step * archive['secondsPerPoint']
+    windowFrom = fromArchive
+    while windowFrom < untilArchive:
+      windowUntil = min(windowFrom + windowSpan, untilArchive)
+      # file_fetch() closes the handle it is passed, so every window gets its
+      # own read-only handle instead of reusing (and closing) from_fh.
+      fetch_fh = open(from_fh.name, 'rb')
+      try:
+        (timeInfo, values) = file_fetch(fetch_fh, windowFrom, windowUntil)
+      finally:
+        fetch_fh.close()
+      # fetchStep, not step: rebinding step would resize every later window.
+      (start, end, fetchStep) = timeInfo
+      pointsToWrite.extend(
+        (timestamp, value)
+        for (timestamp, value) in itertools.izip(xrange(start, end, fetchStep), values)
+        if value is not None)
+      windowFrom = windowUntil
+    untilArchive = min(untilArchive, fromArchive)
+
+  # Write everything in one call so to_fh is handed to file_update_many()
+  # exactly once. NOTE(review): guards against file_update_many() closing its
+  # handle the way file_fetch() does - confirm whether it does.
+  if pointsToWrite:
+    pointsToWrite.sort(key=operator.itemgetter(0), reverse=True) #order points by timestamp, newest first
+    file_update_many(to_fh, pointsToWrite)