← Back to team overview

graphite-dev team mailing list archive

[Merge] lp:~sidnei/graphite/twistd-plugins into lp:graphite


Sidnei da Silva has proposed merging lp:~sidnei/graphite/twistd-plugins into lp:graphite with lp:~sidnei/graphite/hardcoded-conf-dir as a prerequisite.

Requested reviews:
  graphite-dev (graphite-dev)

For more details, see:

Refactor the carbon startup scripts to use twistd, registering each service as a separate twistd plugin.
Your team graphite-dev is requested to review the proposed merge of lp:~sidnei/graphite/twistd-plugins into lp:graphite.
=== modified file 'carbon/bin/carbon-aggregator.py'
--- carbon/bin/carbon-aggregator.py	2011-07-08 20:52:33 +0000
+++ carbon/bin/carbon-aggregator.py	2011-07-08 20:52:34 +0000
@@ -1,11 +1,35 @@
 #!/usr/bin/env python
+"""Copyright 2009 Chris Davis
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+   http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License."""
 import sys
-import os
-import atexit
-from os.path import basename, dirname, exists, join, isdir
-program = basename( sys.argv[0] ).split('.')[0]
+from os.path import dirname, join, abspath
+# Figure out where we're installed
+BIN_DIR = dirname(abspath(__file__))
+ROOT_DIR = dirname(BIN_DIR)
+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
+# source.
+LIB_DIR = join(ROOT_DIR, 'lib')
+sys.path.insert(0, LIB_DIR)
+if __name__ == "__main__":
+    # If we were run directly, call ourselves as a tac file instead.
+    from carbon.util import run_tac
+    run_tac(__file__)
 # Initialize twisted
@@ -13,151 +37,11 @@
-from twisted.internet import reactor
-# Figure out where we're installed
-BIN_DIR = dirname( os.path.abspath(__file__) )
-ROOT_DIR = dirname(BIN_DIR)
-LIB_DIR = join(ROOT_DIR, 'lib')
-sys.path.insert(0, LIB_DIR)
-os.environ['GRAPHITE_ROOT'] = ROOT_DIR
-# Capture useful debug info for this commonly reported problem
-  import carbon
-except ImportError:
-  print 'Failed to import carbon, debug information follows.'
-  print 'pwd=%s' % os.getcwd()
-  print 'sys.path=%s' % sys.path
-  print '__file__=%s' % __file__
-  sys.exit(1)
-# Read config (we want failures to occur before daemonizing)
-from carbon.conf import (get_default_parser, parse_options,
-                         read_config, settings as global_settings)
-parser = get_default_parser()
-    '--rules',
-    default=None,
-    help='Use the give aggregation rules file')
-(options, args) = parse_options(parser, sys.argv[1:])
-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
-if options.rules is None:
-    options.rules = join(settings.CONF_DIR, "aggregation-rules.conf")
-pidfile = settings.pidfile
-logdir = settings.LOG_DIR
-__builtins__.program = program
-action = args[0]
-if action == 'stop':
-  if not exists(pidfile):
-    print 'Pidfile %s does not exist' % pidfile
-    raise SystemExit(0)
-  pf = open(pidfile, 'r')
-  try:
-    pid = int( pidfile.read().strip() )
-  except:
-    print 'Could not read pidfile %s' % pidfile
-    raise SystemExit(1)
-  print 'Deleting %s (contained pid %d)' % (pidfile, pid)
-  os.unlink(pidfile)
-  print 'Sending kill signal to pid %d' % pid
-  os.kill(pid, 15)
-  raise SystemExit(0)
-elif action == 'status':
-  if not exists(pidfile):
-    print '%s is not running' % program
-    raise SystemExit(0)
-  pf = open(pidfile, 'r')
-  try:
-    pid = int( pidfile.read().strip() )
-  except:
-    print 'Failed to read pid from %s' % pidfile
-    raise SystemExit(1)
-  if exists('/proc/%d' % pid):
-    print "%s is running with pid %d" % (program, pid)
-    raise SystemExit(0)
-  else:
-    print "%s is not running" % program
-    raise SystemExit(0)
-# Import application components
-from carbon.log import logToStdout, logToDir
-from carbon.instrumentation import startRecording
-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener
-from carbon.aggregator.rules import RuleManager
-from carbon.aggregator import receiver
-from carbon.aggregator import client
-from carbon.rewrite import RewriteRuleManager
-from carbon.events import metricReceived
-from carbon.util import daemonize
-rewrite_rules_conf = join(settings.CONF_DIR, 'rewrite-rules.conf')
-if exists(rewrite_rules_conf):
-  RewriteRuleManager.read_from(rewrite_rules_conf)
-# --debug
-if options.debug:
-  logToStdout()
-  if not isdir(logdir):
-    os.makedirs(logdir)
-  daemonize()
-  pf = open(pidfile, 'w')
-  pf.write( str(os.getpid()) )
-  pf.close()
-  def shutdown():
-    if os.path.exists(pidfile):
-      os.unlink(pidfile)
-  atexit.register(shutdown)
-  logToDir(logdir)
-# Configure application components
-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
-client.connect(settings.DESTINATION_HOST, settings.DESTINATION_PORT)
-# Run the twisted reactor
-print "%s running with pid %d" % (program, os.getpid())
-if options.profile:
-  import cProfile
-  if exists(options.profile):
-    os.unlink(options.profile)
-  cProfile.run('reactor.run()', options.profile)
-  reactor.run()
+from twisted.application.service import Application
+from carbon import service
+application = Application("carbon-aggregator")
+aggregator_service = service.createAggregatorService(None)

=== modified file 'carbon/bin/carbon-cache.py'
--- carbon/bin/carbon-cache.py	2011-07-08 20:52:33 +0000
+++ carbon/bin/carbon-cache.py	2011-07-08 20:52:34 +0000
@@ -14,15 +14,22 @@
 limitations under the License."""
 import sys
-import os
-import socket
-import pwd
-import atexit
-from os.path import basename, dirname, exists, join, isdir
-program = basename( sys.argv[0] ).split('.')[0]
-hostname = socket.gethostname().split('.')[0]
+from os.path import dirname, join, abspath
+# Figure out where we're installed
+BIN_DIR = dirname(abspath(__file__))
+ROOT_DIR = dirname(BIN_DIR)
+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
+# source.
+LIB_DIR = join(ROOT_DIR, 'lib')
+sys.path.insert(0, LIB_DIR)
+if __name__ == "__main__":
+    # If we were run directly, call ourselves as a tac file instead.
+    from carbon.util import run_tac
+    run_tac(__file__)
 # Initialize twisted
@@ -30,176 +37,11 @@
-from twisted.internet import reactor
-# Figure out where we're installed
-BIN_DIR = dirname( os.path.abspath(__file__) )
-ROOT_DIR = dirname(BIN_DIR)
-LIB_DIR = join(ROOT_DIR, 'lib')
-sys.path.insert(0, LIB_DIR)
-# Capture useful debug info for this commonly reported problem
-  import carbon
-except ImportError:
-  print 'Failed to import carbon, debug information follows.'
-  print 'pwd=%s' % os.getcwd()
-  print 'sys.path=%s' % sys.path
-  print '__file__=%s' % __file__
-  sys.exit(1)
-# Read config (we want failures to occur before daemonizing)
-from carbon.conf import (get_default_parser, parse_options,
-                         read_config, settings as global_settings)
-(options, args) = parse_options(get_default_parser(), sys.argv[1:])
-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
-instance = options.instance
-pidfile = settings.pidfile
-logdir = settings.LOG_DIR
-__builtins__.instance = instance # This isn't as evil as you might think
-__builtins__.program = program
-action = args[0]
-if action == 'stop':
-  if not exists(pidfile):
-    print 'Pidfile %s does not exist' % pidfile
-    raise SystemExit(0)
-  pf = open(pidfile, 'r')
-  try:
-    pid = int( pf.read().strip() )
-  except:
-    print 'Could not read pidfile %s' % pidfile
-    raise SystemExit(1)
-  print 'Deleting %s (contained pid %d)' % (pidfile, pid)
-  os.unlink(pidfile)
-  print 'Sending kill signal to pid %d' % pid
-  os.kill(pid, 15)
-  raise SystemExit(0)
-elif action == 'status':
-  if not exists(pidfile):
-    print '%s (instance %s) is not running' % (program, instance)
-    raise SystemExit(0)
-  pf = open(pidfile, 'r')
-  try:
-    pid = int( pf.read().strip() )
-  except:
-    print 'Failed to read pid from %s' % pidfile
-    raise SystemExit(1)
-  if exists('/proc/%d' % pid):
-    print "%s (instance %s) is running with pid %d" % (program, instance, pid)
-    raise SystemExit(0)
-  else:
-    print "%s (instance %s) is not running" % (program, instance)
-    raise SystemExit(0)
-if exists(pidfile):
-  print "Pidfile %s already exists, is %s already running?" % (pidfile, program)
-  raise SystemExit(1)
-# Import application components
-from carbon.log import logToStdout, logToDir
-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, CacheQueryHandler, startListener
-from carbon.cache import MetricCache
-from carbon.instrumentation import startRecording
-from carbon.events import metricReceived
-storage_schemas = join(settings.CONF_DIR, 'storage-schemas.conf')
-if not exists(storage_schemas):
-  print "Error: missing required config %s" % storage_schemas
-  sys.exit(1)
-use_amqp = settings.get("ENABLE_AMQP", False)
-if use_amqp:
-  from carbon import amqp_listener
-  amqp_host = settings.get("AMQP_HOST", "localhost")
-  amqp_port = settings.get("AMQP_PORT", 5672)
-  amqp_user = settings.get("AMQP_USER", "guest")
-  amqp_password = settings.get("AMQP_PASSWORD", "guest")
-  amqp_verbose  = settings.get("AMQP_VERBOSE", False)
-  amqp_vhost    = settings.get("AMQP_VHOST", "/")
-  amqp_spec     = settings.get("AMQP_SPEC", None)
-  amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
-# --debug
-if options.debug:
-  logToStdout()
-  if not isdir(logdir):
-    os.makedirs(logdir)
-  if settings.USER:
-    print "Dropping privileges to become the user %s" % settings.USER
-  from carbon.util import daemonize, dropprivs
-  daemonize()
-  pf = open(pidfile, 'w')
-  pf.write( str(os.getpid()) )
-  pf.close()
-  def shutdown():
-    if os.path.exists(pidfile):
-      os.unlink(pidfile)
-  atexit.register(shutdown)
-  if settings.USER:
-    pwent = pwd.getpwnam(settings.USER)
-    os.chown(pidfile, pwent.pw_uid, pwent.pw_gid)
-    dropprivs(settings.USER)
-  logToDir(logdir)
-# Configure application components
-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
-startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler)
-if use_amqp:
-  amqp_listener.startReceiver(amqp_host, amqp_port, amqp_user, amqp_password,
-                              vhost=amqp_vhost, spec=amqp_spec,
-                              exchange_name=amqp_exchange_name,
-                              verbose=amqp_verbose)
-if settings.ENABLE_MANHOLE:
-  from carbon import manhole
-  manhole.start()
-from carbon.writer import startWriter # have to import this *after* settings are defined
-# Run the twisted reactor
-print "%s running [instance %s]" % (program, instance)
-if options.profile:
-  import cProfile
-  if exists(options.profile):
-    os.unlink(options.profile)
-  cProfile.run('reactor.run()', options.profile)
-  reactor.run()
+from twisted.application.service import Application
+from carbon import service
+application = Application("carbon-cache")
+cache_service = service.createCacheService(None)

=== modified file 'carbon/bin/carbon-relay.py'
--- carbon/bin/carbon-relay.py	2011-07-08 20:52:33 +0000
+++ carbon/bin/carbon-relay.py	2011-07-08 20:52:34 +0000
@@ -14,13 +14,22 @@
 limitations under the License."""
 import sys
-import os
-import atexit
-from os.path import basename, dirname, exists, join, isdir
-program = basename( sys.argv[0] ).split('.')[0]
+from os.path import dirname, join, abspath
+# Figure out where we're installed
+BIN_DIR = dirname(abspath(__file__))
+ROOT_DIR = dirname(BIN_DIR)
+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
+# source.
+LIB_DIR = join(ROOT_DIR, 'lib')
+sys.path.insert(0, LIB_DIR)
+if __name__ == "__main__":
+    # If we were run directly, call ourselves as a tac file instead.
+    from carbon.util import run_tac
+    run_tac(__file__)
 # Initialize twisted
@@ -28,156 +37,11 @@
-from twisted.internet import reactor
-# Figure out where we're installed
-BIN_DIR = dirname(__file__)
-ROOT_DIR = dirname(BIN_DIR)
-LIB_DIR = join(ROOT_DIR, 'lib')
-sys.path.insert(0, LIB_DIR)
-# Capture useful debug info for this commonly reported problem
-  import carbon
-except ImportError:
-  print 'Failed to import carbon, debug information follows.'
-  print 'pwd=%s' % os.getcwd()
-  print 'sys.path=%s' % sys.path
-  print '__file__=%s' % __file__
-  sys.exit(1)
-# Read config (we want failures to occur before daemonizing)
-from carbon.conf import (get_default_parser, parse_options,
-                         read_config, settings as global_settings)
-parser = get_default_parser()
-    '--rules',
-    default=None,
-    help='Use the given relay rules file')
-(options, args) = parse_options(parser, sys.argv[1:])
-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
-if options.rules is None:
-    options.rules = join(settings.CONF_DIR, "relay-rules.conf")
-pidfile = settings.pidfile
-logdir = settings.LOG_DIR
-__builtins__.program = program
-action = args[0]
-if action == 'stop':
-  if not exists(pidfile):
-    print 'Pidfile %s does not exist' % pidfile
-    raise SystemExit(0)
-  pf = open(pidfile, 'r')
-  try:
-    pid = int( pf.read().strip() )
-  except:
-    print 'Could not read pidfile %s' % pidfile
-    raise SystemExit(1)
-  print 'Deleting %s (contained pid %d)' % (pidfile, pid)
-  os.unlink(pidfile)
-  print 'Sending kill signal to pid %d' % pid
-  os.kill(pid, 15)
-  raise SystemExit(0)
-elif action == 'status':
-  if not exists(pidfile):
-    print '%s is not running' % program
-    raise SystemExit(0)
-  pf = open(pidfile, 'r')
-  try:
-    pid = int( pf.read().strip() )
-  except:
-    print 'Failed to read pid from %s' % pidfile
-    raise SystemExit(1)
-  if exists('/proc/%d' % pid):
-    print "%s is running with pid %d" % (program, pid)
-    raise SystemExit(0)
-  else:
-    print "%s is not running" % program
-    raise SystemExit(0)
-if exists(pidfile):
-  print "Pidfile %s already exists, is %s already running?" % (pidfile, program)
-  raise SystemExit(1)
-# Quick validation
-if settings.RELAY_METHOD not in ('rules', 'consistent-hashing'):
-  print "In carbon.conf, RELAY_METHOD must be either 'rules' or 'consistent-hashing'. Invalid value: '%s'" % settings.RELAY_METHOD
-  sys.exit(1)
-# Import application components
-from carbon.log import logToStdout, logToDir, msg
-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener
-from carbon.relay import createClientConnections, relay
-from carbon.events import metricReceived
-from carbon.instrumentation import startRecording
-from carbon.rules import loadRules, allDestinationServers, parseHostList
-from carbon.hashing import setDestinationHosts
-# --debug
-if options.debug:
-  logToStdout()
-  if not isdir(logdir):
-    os.makedirs(logdir)
-  from carbon.util import daemonize
-  daemonize()
-  logToDir(logdir)
-  pidfile = open(pidfile, 'w')
-  pidfile.write( str(os.getpid()) )
-  pidfile.close()
-  def shutdown():
-    if os.path.exists(pidfile):
-      os.unlink(pidfile)
-  atexit.register(shutdown)
-# Configure application components
-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
-if settings.RELAY_METHOD == 'rules':
-  loadRules(options.rules)
-  createClientConnections( allDestinationServers() )
-elif settings.RELAY_METHOD == 'consistent-hashing':
-  hosts = parseHostList(settings.CH_HOST_LIST)
-  msg('consistent-hashing hosts = %s' % str(hosts))
-  setDestinationHosts(hosts)
-  createClientConnections(hosts)
-# Run the twisted reactor
-if options.profile:
-  import cProfile
-  if exists(options.profile):
-    os.unlink(options.profile)
-  cProfile.run('reactor.run()', options.profile)
-  reactor.run()
+from twisted.application.service import Application
+from carbon import service
+application = Application("carbon-relay")
+relay_service = service.createRelayService(None)

=== modified file 'carbon/lib/carbon/amqp_listener.py'
--- carbon/lib/carbon/amqp_listener.py	2011-04-25 15:50:10 +0000
+++ carbon/lib/carbon/amqp_listener.py	2011-07-08 20:52:34 +0000
@@ -147,11 +147,12 @@
         p.factory = self
         return p
-def startReceiver(host, port, username, password, vhost, exchange_name,
-                  spec=None, channel=1, verbose=False):
-    """Starts a twisted process that will read messages on the amqp broker
-    and post them as metrics."""
+def createAMQPListener(username, password, vhost, exchange_name,
+                       spec=None, channel=1, verbose=False):
+    """
+    Create an C{AMQPReconnectingFactory} configured with the specified options.
+    """
     # use provided spec if not specified
     if not spec:
         spec = txamqp.spec.load(os.path.normpath(
@@ -161,6 +162,17 @@
     factory = AMQPReconnectingFactory(username, password, delegate, vhost,
                                       spec, channel, exchange_name,
+    return factory
+def startReceiver(host, port, username, password, vhost, exchange_name,
+                  spec=None, channel=1, verbose=False):
+    """
+    Starts a twisted process that will read messages on the amqp broker and
+    post them as metrics.
+    """
+    factory = createAMQPListener(username, password, vhost, exchange_name,
+                                 spec=spec, channel=channel, verbose=verbose)
     reactor.connectTCP(host, port, factory)

=== modified file 'carbon/lib/carbon/conf.py'
--- carbon/lib/carbon/conf.py	2011-07-08 20:52:33 +0000
+++ carbon/lib/carbon/conf.py	2011-07-08 20:52:34 +0000
@@ -13,10 +13,19 @@
 limitations under the License."""
 import os
-from os.path import join, dirname, normpath
+import sys
+import pwd
+from os.path import join, dirname, normpath, abspath, basename, exists
 from optparse import OptionParser
 from ConfigParser import ConfigParser
+import whisper
+from carbon import log
+from twisted.python import usage
+from twisted.scripts import twistd
 defaults = dict(
@@ -118,6 +127,89 @@
+class CarbonServerOptions(usage.Options):
+    optParameters = [
+        ["config", "c", None, "Use the given config file."],
+        ["instance", "", None, "Manage a specific carbon instance."],
+        ["logdir", "", None, "Write logs to the given directory."],
+        ["pidfile", "", None, "Name of the pidfile"],
+        ]
+    def postOptions(self):
+        global settings
+        ROOT_DIR = dirname(dirname(abspath(self["python"])))
+        program = basename(self["python"]).split('.')[0]
+        program_settings = read_config(program, self, ROOT_DIR=ROOT_DIR)
+        settings.update(program_settings)
+        settings["program"] = program
+        if settings.USER:
+            self["uid"], self["gid"] = pwd.getpwnam(settings.USER)[2:4]
+        self["pidfile"] = settings["pidfile"]
+        storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf")
+        if not exists(storage_schemas):
+            print "Error: missing required config %s" % storage_schemas
+            sys.exit(1)
+        if settings.WHISPER_AUTOFLUSH:
+            log.msg("enabling whisper autoflush")
+            whisper.AUTOFLUSH = True
+class CarbonCacheOptions(CarbonServerOptions, twistd.ServerOptions):
+    optParameters = (
+        twistd.ServerOptions.optParameters +
+        CarbonServerOptions.optParameters
+        )
+    def postOptions(self):
+        twistd.ServerOptions.postOptions(self)
+        CarbonServerOptions.postOptions(self)
+class CarbonAggregatorOptions(CarbonCacheOptions):
+    optParameters = [
+        ["rules", "", None, "Use the given aggregation rules file."],
+        ["rewrite-rules", "", None, "Use the given rewrite rules file."],
+        ] + CarbonCacheOptions.optParameters
+    def postOptions(self):
+        CarbonCacheOptions.postOptions(self)
+        if self["rules"] is None:
+            self["rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf")
+        settings["aggregation-rules"] = self["rules"]
+        if self["rewrite-rules"] is None:
+            self["rewrite-rules"] = join(settings["CONF_DIR"],
+                                         "rewrite-rules.conf")
+        settings["rewrite-rules"] = self["rewrite-rules"]
+class CarbonRelayOptions(CarbonCacheOptions):
+    optParameters = [
+        ["rules", "", None, "Use the given relay rules file."],
+        ] + CarbonCacheOptions.optParameters
+    def postOptions(self):
+        CarbonCacheOptions.postOptions(self)
+        if self["rules"] is None:
+            self["rules"] = join(settings["CONF_DIR"], "relay-rules.conf")
+        settings["relay-rules"] = self["rules"]
+        if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing"):
+            print ("In carbon.conf, RELAY_METHOD must be either 'rules' or "
+                   "'consistent-hashing'. Invalid value: '%s'" %
+                   settings.RELAY_METHOD)
+            sys.exit(1)
 def get_default_parser(usage="%prog [options] <start|stop|status>"):
     """Create a parser for command line options."""
     parser = OptionParser(usage=usage)
@@ -167,7 +259,7 @@
 def read_config(program, options, **kwargs):
     Read settings for 'program' from configuration file specified by
-    'options.config', with missing values provided by 'defaults'.
+    'options["config"]', with missing values provided by 'defaults'.
     settings = Settings()
@@ -184,40 +276,44 @@
     # 'GRAPHITE_CONF_DIR' environment variable.
-                                       join(settings.ROOT_DIR, "conf")))
-    if options.config is None:
-        options.config = join(settings.CONF_DIR, "carbon.conf")
+                                       join(settings["ROOT_DIR"], "conf")))
+    if options["config"] is None:
+        options["config"] = join(settings["CONF_DIR"], "carbon.conf")
         # Set 'CONF_DIR' to the parent directory of the 'carbon.conf' config
         # file.
-        settings["CONF_DIR"] = dirname(normpath(options.config))
+        settings["CONF_DIR"] = dirname(normpath(options["config"]))
     # Storage directory can be overriden by the 'GRAPHITE_STORAGE_DIR'
     # environment variable. It defaults to a path relative to 'ROOT_DIR' for
     # backwards compatibility though.
-                                       join(settings.ROOT_DIR, "storage")))
-    settings.setdefault("LOG_DIR", join(settings.STORAGE_DIR, "log", program))
+                                       join(settings["ROOT_DIR"], "storage")))
+    settings.setdefault(
+        "LOG_DIR", join(settings["STORAGE_DIR"], "log", program))
     # Read configuration options from program-specific section.
     section = program[len("carbon-"):]
-    settings.readFrom(options.config, section)
+    settings.readFrom(options["config"], section)
     # If a specific instance of the program is specified, augment the settings
     # with the instance-specific settings and provide sane defaults for
     # optional settings.
-    if options.instance:
-        settings.readFrom(options.config, "%s:%s" % (section, options.instance))
-        settings["pidfile"] = (options.pidfile or
-                               join(settings.STORAGE_DIR,
-                                    "%s-%s.pid" % (program, options.instance)))
-        settings["LOG_DIR"] = (options.logdir or
-                              "%s-%s" % (settings.LOG_DIR.rstrip('/'),
-                                          options.instance))
+    if options["instance"]:
+        settings.readFrom(options["config"],
+                          "%s:%s" % (section, options["instance"]))
+        settings["pidfile"] = (
+            options["pidfile"] or
+            join(settings["STORAGE_DIR"], "%s-%s.pid" %
+                 (program, options["instance"])))
+        settings["LOG_DIR"] = (options["logdir"] or
+                              "%s-%s" % (settings["LOG_DIR"].rstrip('/'),
+                                          options["instance"]))
-        settings["pidfile"] = (options.pidfile or
-                               join(settings.STORAGE_DIR, '%s.pid' % program))
-        settings["LOG_DIR"] = (options.logdir or settings.LOG_DIR)
+        settings["pidfile"] = (
+            options["pidfile"] or
+            join(settings["STORAGE_DIR"], '%s.pid' % program))
+        settings["LOG_DIR"] = (options["logdir"] or settings["LOG_DIR"])
     return settings

=== modified file 'carbon/lib/carbon/events.py'
--- carbon/lib/carbon/events.py	2009-09-10 19:28:51 +0000
+++ carbon/lib/carbon/events.py	2011-07-08 20:52:34 +0000
@@ -2,6 +2,7 @@
 class EventHandler:
   def __init__(self, defaultHandler=None):
     self.handler = defaultHandler

=== modified file 'carbon/lib/carbon/instrumentation.py'
--- carbon/lib/carbon/instrumentation.py	2011-04-09 05:06:50 +0000
+++ carbon/lib/carbon/instrumentation.py	2011-07-08 20:52:34 +0000
@@ -2,13 +2,14 @@
 import time
 import socket
 from resource import getrusage, RUSAGE_SELF
+from twisted.application.service import Service
 from twisted.internet.task import LoopingCall
 stats = {}
 HOSTNAME = socket.gethostname().replace('.','_')
 PAGESIZE = os.sysconf('SC_PAGESIZE')
-recordTask = None
 rusage = getrusage(RUSAGE_SELF)
 lastUsage = rusage.ru_utime + rusage.ru_stime
 lastUsageTime = time.time()
@@ -56,8 +57,6 @@
 def startRecording():
   global recordTask
-  recordTask = LoopingCall(recordMetrics)
-  recordTask.start(60, now=False)
 def recordMetrics():
@@ -138,6 +137,20 @@
   send_metric(fullMetric, datapoint)
+class InstrumentationService(Service):
+    def __init__(self):
+        self.record_task = LoopingCall(recordMetrics)
+    def startService(self):
+        self.record_task.start(60, False)
+        Service.startService(self)
+    def stopService(self):
+        self.record_task.stop()
+        Service.stopService(self)
 # Avoid import circularity
 from carbon.aggregator.buffers import BufferManager
 from carbon.aggregator.client import send_metric

=== modified file 'carbon/lib/carbon/manhole.py'
--- carbon/lib/carbon/manhole.py	2011-04-02 00:44:19 +0000
+++ carbon/lib/carbon/manhole.py	2011-07-08 20:52:34 +0000
@@ -1,4 +1,4 @@
-from twisted.cred import portal, checkers
+from twisted.cred import portal
 from twisted.conch.ssh import keys
 from twisted.conch.checkers import SSHPublicKeyDatabase
 from twisted.conch.manhole import Manhole
@@ -21,8 +21,7 @@
       keyBlob = self.userKeys[credentials.username]
       return keyBlob == credentials.blob
-def start():
+def createManholeListener():
   sshRealm = TerminalRealm()
   sshRealm.chainedProtocolFactory.protocolFactory = lambda _: Manhole(namespace)
@@ -37,4 +36,8 @@
   sshPortal = portal.Portal(sshRealm)
   sessionFactory = ConchFactory(sshPortal)
-  reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE)
+  return sessionFactory
+def start():
+    sessionFactory = createManholeListener()
+    reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE)

=== added file 'carbon/lib/carbon/service.py'
--- carbon/lib/carbon/service.py	1970-01-01 00:00:00 +0000
+++ carbon/lib/carbon/service.py	2011-07-08 20:52:34 +0000
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+"""Copyright 2009 Chris Davis
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+   http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+See the License for the specific language governing permissions and
+limitations under the License."""
+from os.path import exists
+from twisted.application.service import MultiService
+from twisted.application.internet import TCPServer, TCPClient
+from twisted.internet.protocol import ServerFactory
+def createBaseService(config):
+    from carbon.conf import settings
+    from carbon.listeners import MetricLineReceiver, MetricPickleReceiver
+    root_service = MultiService()
+    root_service.setName(settings.program)
+    use_amqp = settings.get("ENABLE_AMQP", False)
+    if use_amqp:
+        from carbon import amqp_listener
+        amqp_host = settings.get("AMQP_HOST", "localhost")
+        amqp_port = settings.get("AMQP_PORT", 5672)
+        amqp_user = settings.get("AMQP_USER", "guest")
+        amqp_password = settings.get("AMQP_PASSWORD", "guest")
+        amqp_verbose  = settings.get("AMQP_VERBOSE", False)
+        amqp_vhost    = settings.get("AMQP_VHOST", "/")
+        amqp_spec     = settings.get("AMQP_SPEC", None)
+        amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
+    for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE,
+                                       settings.LINE_RECEIVER_PORT,
+                                       MetricLineReceiver),
+                                      (settings.PICKLE_RECEIVER_INTERFACE,
+                                       settings.PICKLE_RECEIVER_PORT,
+                                       MetricPickleReceiver)):
+        factory = ServerFactory()
+        factory.protocol = protocol
+        service = TCPServer(int(port), factory, interface=interface)
+        service.setServiceParent(root_service)
+    if use_amqp:
+        factory = amqp_listener.createAMQPListener(
+            amqp_user, amqp_password,
+            vhost=amqp_vhost, spec=amqp_spec,
+            exchange_name=amqp_exchange_name,
+            verbose=amqp_verbose)
+        service = TCPClient(amqp_host, int(amqp_port), factory)
+        service.setServiceParent(root_service)
+    if settings.ENABLE_MANHOLE:
+        from carbon import manhole
+        factory = manhole.createManholeListener()
+        service = TCPServer(int(settings.MANHOLE_PORT), factory,
+                            interface=settings.MANHOLE_INTERFACE)
+        service.setServiceParent(root_service)
+    # have to import this *after* settings are defined
+    from carbon.writer import WriterService
+    service = WriterService()
+    service.setServiceParent(root_service)
+    # Instantiate an instrumentation service that will record metrics about
+    # this service.
+    from carbon.instrumentation import InstrumentationService
+    service = InstrumentationService()
+    service.setServiceParent(root_service)
+    return root_service
+def createCacheService(config):
+    from carbon.cache import MetricCache
+    from carbon.conf import settings
+    from carbon.events import metricReceived
+    from carbon.listeners import CacheQueryHandler
+    # Configure application components
+    metricReceived.installHandler(MetricCache.store)
+    root_service = createBaseService(config)
+    factory = ServerFactory()
+    factory.protocol = CacheQueryHandler
+    service = TCPServer(int(settings.CACHE_QUERY_PORT), factory,
+                        interface=settings.CACHE_QUERY_INTERFACE)
+    service.setServiceParent(root_service)
+    # have to import this *after* settings are defined
+    from carbon.writer import WriterService
+    service = WriterService()
+    service.setServiceParent(root_service)
+    return root_service
+def createAggregatorService(config):
+    from carbon.events import metricReceived
+    from carbon.aggregator import receiver
+    from carbon.aggregator.rules import RuleManager
+    from carbon.aggregator import client
+    from carbon.rewrite import RewriteRuleManager
+    from carbon.conf import settings
+    root_service = createBaseService(config)
+    # Configure application components
+    metricReceived.installHandler(receiver.process)
+    RuleManager.read_from(settings["aggregation-rules"])
+    if exists(settings["rewrite-rules"]):
+        RewriteRuleManager.read_from(settings["rewrite-rules"])
+    client.connect(settings["DESTINATION_HOST"],
+                   int(settings["DESTINATION_PORT"]))
+    return root_service
+def createRelayService(config):
+    from carbon.log import msg
+    from carbon.conf import settings
+    from carbon.events import metricReceived
+    from carbon.hashing import setDestinationHosts
+    from carbon.relay import createClientConnections, relay
+    from carbon.rules import loadRules, allDestinationServers, parseHostList
+    root_service = createBaseService(config)
+    # Configure application components
+    metricReceived.installHandler(relay)
+    if settings["RELAY_METHOD"] == "rules":
+        loadRules(settings["relay-rules"])
+        createClientConnections(allDestinationServers())
+    elif settings["RELAY_METHOD"] == "consistent-hashing":
+        hosts = parseHostList(settings["CH_HOST_LIST"])
+        msg('consistent-hashing hosts = %s' % str(hosts))
+        setDestinationHosts(hosts)
+        createClientConnections(hosts)
+    return root_service

=== modified file 'carbon/lib/carbon/util.py'
--- carbon/lib/carbon/util.py	2009-12-13 01:35:28 +0000
+++ carbon/lib/carbon/util.py	2011-07-08 20:52:34 +0000
@@ -1,22 +1,50 @@
-import sys
 import os
 import pwd
-def daemonize():
-  if os.fork() > 0: sys.exit(0)
-  os.setsid()
-  if os.fork() > 0: sys.exit(0)
-  si = open('/dev/null', 'r')
-  so = open('/dev/null', 'a+')
-  se = open('/dev/null', 'a+', 0)
-  os.dup2(si.fileno(), sys.stdin.fileno())
-  os.dup2(so.fileno(), sys.stdout.fileno())
-  os.dup2(se.fileno(), sys.stderr.fileno())
+from os.path import basename, isdir
+import carbon
+from twisted.application import service
+from twisted import plugin
+from twisted.python.util import initgroups
+from twisted.scripts.twistd import runApp
+from twisted.scripts._twistd_unix import daemonize
+daemonize = daemonize # Backwards compatibility
 def dropprivs(user):
-  uid,gid = pwd.getpwnam(user)[2:4]
-  os.setregid(gid,gid)
-  os.setreuid(uid,uid)
-  return (uid,gid)
+  """Switch the current process to the uid/gid of the named user.
+  Looks the user up with pwd.getpwnam(), calls initgroups(uid, gid), then
+  sets both the real and effective group id and user id. Returns the
+  (uid, gid) tuple that was applied.
+  """
+  # Fields 2 and 3 of the passwd entry are pw_uid and pw_gid.
+  uid, gid = pwd.getpwnam(user)[2:4]
+  initgroups(uid, gid)
+  # The gid is changed before the uid: once the uid has been dropped the
+  # process would normally no longer be permitted to change its gid.
+  os.setregid(gid, gid)
+  os.setreuid(uid, uid)
+  return (uid, gid)
+def run_tac(tac_file):
+    """Run twistd's runApp() for the carbon program named after tac_file.
+    The program name is the basename of tac_file up to its first '.'; it is
+    used to pick the IServiceMaker plugin with the same tapname, whose
+    options class supplies the configuration. Raises KeyError when no
+    plugin with that tapname is registered.
+    """
+    from carbon.log import logToDir
+    from carbon.conf import settings
+    # Index every available service-maker plugin by its tapname.
+    plugins = {}
+    for plug in plugin.getPlugins(service.IServiceMaker):
+        plugins[plug.tapname] = plug
+    program = basename(tac_file).split('.')[0]
+    config = plugins[program].options()
+    config["python"] = tac_file
+    # 022 is a Python 2 octal literal.
+    config["umask"] = 022
+    # Called without arguments, so the options come from the command line.
+    config.parseOptions()
+    # Only when daemonizing: make sure LOG_DIR exists and log into it.
+    if not config["nodaemon"]:
+        logdir = settings.LOG_DIR
+        if not isdir(logdir):
+            os.makedirs(logdir)
+        logToDir(logdir)
+    # This isn't as evil as you might think
+    # Publishes `instance` and `program` as builtin names.
+    # NOTE(review): subscripting __builtins__ relies on it being a dict,
+    # which holds for imported modules in CPython but not for __main__ --
+    # confirm this module is always imported.
+    __builtins__["instance"] = config["instance"]
+    __builtins__["program"] = program
+    runApp(config)

=== modified file 'carbon/lib/carbon/writer.py'
--- carbon/lib/carbon/writer.py	2011-04-02 19:54:28 +0000
+++ carbon/lib/carbon/writer.py	2011-07-08 20:52:34 +0000
@@ -16,26 +16,29 @@
 import os
 import time
 from os.path import join, exists, dirname, basename
-from threading import Thread
-from twisted.internet import reactor
-from twisted.internet.task import LoopingCall
+  import cPickle as pickle
+except ImportError:
+  import pickle
 import whisper
 from carbon.cache import MetricCache
 from carbon.storage import getFilesystemPath, loadStorageSchemas
 from carbon.conf import settings
 from carbon.instrumentation import increment, append
 from carbon import log
-  import cPickle as pickle
-except ImportError:
-  import pickle
-  log.msg("enabling whisper autoflush")
-  whisper.AUTOFLUSH = True
+from twisted.internet import reactor
+from twisted.internet.task import LoopingCall
+from twisted.application.service import Service
 lastCreateInterval = 0
 createCount = 0
+schemas = loadStorageSchemas()
 def optimalWriteOrder():
   "Generates metrics with the most cached values first and applies a soft rate limit on new metrics"
@@ -177,10 +180,16 @@
-schemaReloadTask = LoopingCall(reloadStorageSchemas)
-schemas = loadStorageSchemas()
-def startWriter():
-  schemaReloadTask.start(60)
-  reactor.callInThread(writeForever)
+class WriterService(Service):
+    """Twisted Service that runs the writer loop and the schema reload task."""
+    def __init__(self):
+        # Periodic task calling reloadStorageSchemas; started in startService.
+        self.reload_task = LoopingCall(reloadStorageSchemas)
+    def startService(self):
+        """Start the 60-second schema reload loop and the writer thread."""
+        # The second argument is LoopingCall.start's `now` flag: False skips
+        # the immediate first call, so the first reload happens after 60s.
+        self.reload_task.start(60, False)
+        # writeForever is handed to the reactor's thread pool.
+        reactor.callInThread(writeForever)
+        Service.startService(self)
+    def stopService(self):
+        """Stop the schema reload loop; nothing here stops writeForever."""
+        self.reload_task.stop()
+        Service.stopService(self)

=== added directory 'carbon/lib/twisted'
=== added directory 'carbon/lib/twisted/plugins'
=== added file 'carbon/lib/twisted/plugins/carbon_aggregator_plugin.py'
--- carbon/lib/twisted/plugins/carbon_aggregator_plugin.py	1970-01-01 00:00:00 +0000
+++ carbon/lib/twisted/plugins/carbon_aggregator_plugin.py	2011-07-08 20:52:34 +0000
@@ -0,0 +1,25 @@
+from zope.interface import implements
+from twisted.plugin import IPlugin
+from twisted.application.service import IServiceMaker
+from carbon import service
+from carbon import conf
+class CarbonAggregatorServiceMaker(object):
+    """twistd plugin that builds the carbon-aggregator service.
+    `tapname` is the name the plugin is looked up by, `options` is the
+    options class instantiated for it, and makeService() turns the parsed
+    options into the service.
+    """
+    implements(IServiceMaker, IPlugin)
+    tapname = "carbon-aggregator"
+    description = "Aggregate stats for graphite."
+    options = conf.CarbonAggregatorOptions
+    def makeService(self, options):
+        """
+        Construct a C{carbon-aggregator} service.
+        """
+        return service.createAggregatorService(options)
+# Now construct an object which *provides* the relevant interfaces
+serviceMaker = CarbonAggregatorServiceMaker()

=== added file 'carbon/lib/twisted/plugins/carbon_cache_plugin.py'
--- carbon/lib/twisted/plugins/carbon_cache_plugin.py	1970-01-01 00:00:00 +0000
+++ carbon/lib/twisted/plugins/carbon_cache_plugin.py	2011-07-08 20:52:34 +0000
@@ -0,0 +1,25 @@
+from zope.interface import implements
+from twisted.plugin import IPlugin
+from twisted.application.service import IServiceMaker
+from carbon import service
+from carbon import conf
+class CarbonCacheServiceMaker(object):
+    """twistd plugin that builds the carbon-cache service.
+    `tapname` is the name the plugin is looked up by, `options` is the
+    options class instantiated for it, and makeService() turns the parsed
+    options into the service.
+    """
+    implements(IServiceMaker, IPlugin)
+    tapname = "carbon-cache"
+    description = "Collect stats for graphite."
+    options = conf.CarbonCacheOptions
+    def makeService(self, options):
+        """
+        Construct a C{carbon-cache} service.
+        """
+        return service.createCacheService(options)
+# Now construct an object which *provides* the relevant interfaces
+serviceMaker = CarbonCacheServiceMaker()

=== added file 'carbon/lib/twisted/plugins/carbon_relay_plugin.py'
--- carbon/lib/twisted/plugins/carbon_relay_plugin.py	1970-01-01 00:00:00 +0000
+++ carbon/lib/twisted/plugins/carbon_relay_plugin.py	2011-07-08 20:52:34 +0000
@@ -0,0 +1,25 @@
+from zope.interface import implements
+from twisted.plugin import IPlugin
+from twisted.application.service import IServiceMaker
+from carbon import service
+from carbon import conf
+class CarbonRelayServiceMaker(object):
+    """twistd plugin that builds the carbon-relay service.
+    `tapname` is the name the plugin is looked up by, `options` is the
+    options class instantiated for it, and makeService() turns the parsed
+    options into the service.
+    """
+    implements(IServiceMaker, IPlugin)
+    tapname = "carbon-relay"
+    description = "Relay stats for graphite."
+    options = conf.CarbonRelayOptions
+    def makeService(self, options):
+        """
+        Construct a C{carbon-relay} service.
+        """
+        return service.createRelayService(options)
+# Now construct an object which *provides* the relevant interfaces
+serviceMaker = CarbonRelayServiceMaker()