graphite-dev team mailing list archive

[Merge] lp:~jerith/graphite/storage-aggregation into lp:graphite

 

Jeremy Thurgood has proposed merging lp:~jerith/graphite/storage-aggregation into lp:graphite.

Requested reviews:
  graphite-dev (graphite-dev)
Related bugs:
  Bug #853955 in Graphite: "Whisper aggregation method cannot be specified in storage schema"
  https://bugs.launchpad.net/graphite/+bug/853955

For more details, see:
https://code.launchpad.net/~jerith/graphite/storage-aggregation/+merge/76355

This branch adds an optional storage-aggregation.conf that specifies whisper aggregation parameters (xFilesFactor and aggregationMethod) for new metrics, matched by metric name.
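
For illustration, a storage-aggregation.conf entry for counter-style metrics might look like the following sketch (the section name, pattern, and values are illustrative, not taken from this branch; see the docs/config-carbon.rst changes below for the full format):

  [counters]
  pattern = \.count$
  xFilesFactor = 0
  aggregationMethod = sum

When carbon creates the database file for a new metric that matches such a section, it passes the matched parameters through to whisper, roughly equivalent to this sketch (the path and retention values are made up for the example):

  import whisper

  # The (secondsPerPoint, points) retentions still come from storage-schemas.conf;
  # xFilesFactor and aggregationMethod come from the matching
  # storage-aggregation.conf section.
  whisper.create('/opt/graphite/storage/whisper/stats/requests/count.wsp',
                 [(60, 10080)],  # seven days of minutely data
                 xFilesFactor=0.0,
                 aggregationMethod='sum')
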
-- 
https://code.launchpad.net/~jerith/graphite/storage-aggregation/+merge/76355
Your team graphite-dev is requested to review the proposed merge of lp:~jerith/graphite/storage-aggregation into lp:graphite.
=== modified file 'carbon/lib/carbon/storage.py'
--- carbon/lib/carbon/storage.py	2011-09-11 06:43:44 +0000
+++ carbon/lib/carbon/storage.py	2011-09-21 08:56:30 +0000
@@ -26,6 +26,7 @@
 
 
 STORAGE_SCHEMAS_CONFIG = join(settings.CONF_DIR, 'storage-schemas.conf')
+STORAGE_AGGREGATION_CONFIG = join(settings.CONF_DIR, 'storage-aggregation.conf')
 STORAGE_LISTS_DIR = join(settings.CONF_DIR, 'lists')
 
 def getFilesystemPath(metric):
@@ -146,5 +147,56 @@
   schemaList.append(defaultSchema)
   return schemaList
 
+
+def loadAggregationSchemas():
+  # NOTE: This abuses the Schema classes above, and should probably be refactored.
+  schemaList = []
+  config = OrderedConfigParser()
+
+  try:
+    config.read(STORAGE_AGGREGATION_CONFIG)
+  except IOError:
+    log.msg("%s not found, ignoring." % STORAGE_AGGREGATION_CONFIG)
+
+  for section in config.sections():
+    options = dict( config.items(section) )
+    matchAll = options.get('match-all')
+    pattern = options.get('pattern')
+    listName = options.get('list')
+
+    xFilesFactor = options.get('xfilesfactor')
+    aggregationMethod = options.get('aggregationmethod')
+
+    try:
+      if xFilesFactor is not None:
+        xFilesFactor = float(xFilesFactor)
+        assert 0 <= xFilesFactor <= 1
+      if aggregationMethod is not None:
+        assert aggregationMethod in whisper.aggregationMethods
+    except (ValueError, AssertionError):
+      log.msg("Invalid aggregation schema in section '%s', skipping." % section)
+      continue
+
+    archives = (xFilesFactor, aggregationMethod)
+
+    if matchAll:
+      mySchema = DefaultSchema(section, archives)
+
+    elif pattern:
+      mySchema = PatternSchema(section, pattern, archives)
+
+    elif listName:
+      mySchema = ListSchema(section, listName, archives)
+
+    else:
+      log.msg("Section '%s' missing 'match-all', 'pattern' or 'list' option, skipping." % section)
+      continue
+
+    schemaList.append(mySchema)
+
+  schemaList.append(defaultAggregation)
+  return schemaList
+
 defaultArchive = Archive(60, 60 * 24 * 7) #default retention for unclassified data (7 days of minutely data)
 defaultSchema = DefaultSchema('default', [defaultArchive])
+defaultAggregation = DefaultSchema('default', (None, None))

=== modified file 'carbon/lib/carbon/writer.py'
--- carbon/lib/carbon/writer.py	2011-09-18 09:55:35 +0000
+++ carbon/lib/carbon/writer.py	2011-09-21 08:56:30 +0000
@@ -26,7 +26,7 @@
 import whisper
 
 from carbon.cache import MetricCache
-from carbon.storage import getFilesystemPath, loadStorageSchemas
+from carbon.storage import getFilesystemPath, loadStorageSchemas, loadAggregationSchemas
 from carbon.conf import settings
 from carbon import log, events, instrumentation
 
@@ -38,6 +38,7 @@
 lastCreateInterval = 0
 createCount = 0
 schemas = loadStorageSchemas()
+agg_schemas = loadAggregationSchemas()
 CACHE_SIZE_LOW_WATERMARK = settings.MAX_CACHE_SIZE * 0.95
 
 
@@ -98,6 +99,7 @@
 
       if not dbFileExists:
         archiveConfig = None
+        xFilesFactor, aggregationMethod = None, None
 
         for schema in schemas:
           if schema.matches(metric):
@@ -105,6 +107,12 @@
             archiveConfig = [archive.getTuple() for archive in schema.archives]
             break
 
+        for schema in agg_schemas:
+          if schema.matches(metric):
+            log.creates('new metric %s matched aggregation schema %s' % (metric, schema.name))
+            xFilesFactor, aggregationMethod = schema.archives
+            break
+
         if not archiveConfig:
           raise Exception("No storage schema matched the metric '%s', check your storage-schemas.conf file." % metric)
 
@@ -112,7 +120,8 @@
         os.system("mkdir -p -m 755 '%s'" % dbDir)
 
         log.creates("creating database file %s" % dbFilePath)
-        whisper.create(dbFilePath, archiveConfig)
+        log.creates("  archives=%s xFilesFactor=%s aggregationMethod=%s" % (archiveConfig, xFilesFactor, aggregationMethod))
+        whisper.create(dbFilePath, archiveConfig, xFilesFactor, aggregationMethod)
         os.chmod(dbFilePath, 0755)
         instrumentation.increment('creates')
 

=== modified file 'docs/config-carbon.rst'
--- docs/config-carbon.rst	2011-09-08 11:37:13 +0000
+++ docs/config-carbon.rst	2011-09-21 08:56:30 +0000
@@ -49,7 +49,7 @@
 
 The pattern matches server names that start with 'www', followed by anything, and end in '.workers.busyWorkers'.  This way, not all metrics associated with your webservers need this type of retention.
 
-As you can see there are multiple retentions.  Each is used in the order that it is provided.  As a general rule, they should be in most-precise:shortest-length to least-precise:longest-time.  Retentions are merely a way to save you disk space and decrease I/O for graphs that span a long period of time. When data moves from a higher precision to a lower precision, it is **averaged**.  This way, you can still find the **total** for a particular time period if you know the original precision.  
+As you can see there are multiple retentions.  Each is used in the order that it is provided.  As a general rule, they should be in most-precise:shortest-length to least-precise:longest-time.  Retentions are merely a way to save you disk space and decrease I/O for graphs that span a long period of time. By default, when data moves from a higher precision to a lower precision, it is **averaged**.  This way, you can still find the **total** for a particular time period if you know the original precision.  (To change the aggregation method, see the next section.)
 
 Example: You store the number of sales per minute for 1 year, and the sales per hour for 5 years after that.  You need to know the total sales for January 1st of the year before.  You can query whisper for the raw data, and you'll get 24 datapoints, one for each hour.  They will most likely be floating point numbers.  You can take each datapoint, multiply by 60 (the ratio of high-precision to low-precision datapoints) and still get the total sales per hour.  
 
@@ -63,6 +63,37 @@
 60 represents the number of seconds per datapoint, and 1440 represents the number of datapoints to store.  This required some unnecessarily complicated math, so although it's valid, it's not recommended.  It's left in so that large organizations with complicated retention rates need not rewrite their storage-schemas.conf when they upgrade.
 
 
+storage-aggregation.conf
+------------------------
+This file defines how to aggregate data to lower-precision retentions.  The format is similar to ``storage-schemas.conf``.
+Important notes before continuing:
+
+* This file is optional.  If it is not present, defaults will be used.
+* There is no ``retentions`` line.  Instead, there are ``xFilesFactor`` and/or ``aggregationMethod`` lines.
+* ``xFilesFactor`` should be a floating point number between 0 and 1, and specifies what fraction of the previous retention level's slots must have non-null values in order to aggregate to a non-null value.  The default is 0.5.
+* ``aggregationMethod`` specifies the function used to aggregate values for the next retention level.  Legal methods are ``average``, ``sum``, ``min``, ``max``, and ``last``. The default is ``average``.
+* These settings are applied when a metric's database file (.wsp) is first created, i.e. when carbon receives the first datapoint for that metric.
+* Changing this file will not affect .wsp files already created on disk. Use whisper-resize.py to change those.
+
+Here's an example:
+
+.. code-block:: none
+
+ [all_min]
+ pattern = \.min$
+ xFilesFactor = 0.1
+ aggregationMethod = min
+
+The pattern above will match any metric that ends with ``.min``.
+
+The ``xFilesFactor`` line says that a minimum of 10% of the slots in the previous retention level must have non-null values in order for the next retention level to contain an aggregate.  For example, if sixty 1-minute datapoints are aggregated into one 1-hour datapoint, at least six of the sixty must have values.
+The ``aggregationMethod`` line says that the aggregation function to use is ``min``.
+
+If either ``xFilesFactor`` or ``aggregationMethod`` is left out, the default value will be used.
+
+The aggregation parameters are kept separate from the retention parameters because the former depend on the type of data being collected, while the latter depend on data volume and importance.
+
+
 relay-rules.conf
 ----------------
 Relay rules are used to send certain metrics to a certain backend. This is handled by the carbon-relay system.  It must be running for relaying to work. You can use a regular expression to select the metrics and define the servers to which they should go with the servers line.

=== modified file 'whisper/whisper.py'
--- whisper/whisper.py	2011-09-11 06:43:44 +0000
+++ whisper/whisper.py	2011-09-21 08:56:30 +0000
@@ -246,7 +246,6 @@
   return aggregationTypeToMethod.get(aggregationType, 'average')
 
 
-
 def validateArchiveList(archiveList):
   """ Validates an archiveList.
   An ArchiveList must:
@@ -290,7 +289,7 @@
     return False
   return True
 
-def create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average'):
+def create(path,archiveList,xFilesFactor=None,aggregationMethod=None):
   """create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average')
 
 path is a string
@@ -298,6 +297,12 @@
 xFilesFactor specifies the fraction of data points in a propagation interval that must have known values for a propagation to occur
 aggregationMethod specifies the function to use when propagating data (see ``whisper.aggregationMethods``)
 """
+  # Set default params
+  if xFilesFactor is None:
+    xFilesFactor = 0.5
+  if aggregationMethod is None:
+    aggregationMethod = 'average'
+
   #Validate archive configurations...
   validArchive = validateArchiveList(archiveList)
   if not validArchive:
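
For reviewers trying this out: after a new metric has been created under a matching section, the stored parameters can be checked from Python with whisper's info() call, for example (the path here is hypothetical):

  import whisper

  # info() returns the header fields of an existing .wsp file,
  # including the aggregation settings chosen at creation time.
  info = whisper.info('/opt/graphite/storage/whisper/stats/requests/count.wsp')
  print info['aggregationMethod'], info['xFilesFactor']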