graphite-dev team mailing list archive
-
graphite-dev team
-
Mailing list archive
-
Message #01067
[Merge] lp:~sidnei/graphite/twistd-plugins into lp:graphite
Sidnei da Silva has proposed merging lp:~sidnei/graphite/twistd-plugins into lp:graphite with lp:~sidnei/graphite/hardcoded-conf-dir as a prerequisite.
Requested reviews:
graphite-dev (graphite-dev)
For more details, see:
https://code.launchpad.net/~sidnei/graphite/twistd-plugins/+merge/67391
Refactor carbon startup scripts to use twistd, register each service as a separate twistd plugin.
--
https://code.launchpad.net/~sidnei/graphite/twistd-plugins/+merge/67391
Your team graphite-dev is requested to review the proposed merge of lp:~sidnei/graphite/twistd-plugins into lp:graphite.
=== modified file 'carbon/bin/carbon-aggregator.py'
--- carbon/bin/carbon-aggregator.py 2011-07-08 20:52:33 +0000
+++ carbon/bin/carbon-aggregator.py 2011-07-08 20:52:34 +0000
@@ -1,11 +1,35 @@
#!/usr/bin/env python
+"""Copyright 2009 Chris Davis
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
import sys
-import os
-import atexit
-from os.path import basename, dirname, exists, join, isdir
-
-
-program = basename( sys.argv[0] ).split('.')[0]
+from os.path import dirname, join, abspath
+
+# Figure out where we're installed
+BIN_DIR = dirname(abspath(__file__))
+ROOT_DIR = dirname(BIN_DIR)
+
+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
+# source.
+LIB_DIR = join(ROOT_DIR, 'lib')
+sys.path.insert(0, LIB_DIR)
+
+if __name__ == "__main__":
+ # If we were run directly, call ourselves as a tac file instead.
+ from carbon.util import run_tac
+
+ run_tac(__file__)
# Initialize twisted
try:
@@ -13,151 +37,11 @@
epollreactor.install()
except:
pass
-from twisted.internet import reactor
-
-
-# Figure out where we're installed
-BIN_DIR = dirname( os.path.abspath(__file__) )
-ROOT_DIR = dirname(BIN_DIR)
-LIB_DIR = join(ROOT_DIR, 'lib')
-
-sys.path.insert(0, LIB_DIR)
-os.environ['GRAPHITE_ROOT'] = ROOT_DIR
-
-# Capture useful debug info for this commonly reported problem
-try:
- import carbon
-except ImportError:
- print 'Failed to import carbon, debug information follows.'
- print 'pwd=%s' % os.getcwd()
- print 'sys.path=%s' % sys.path
- print '__file__=%s' % __file__
- sys.exit(1)
-
-
-# Read config (we want failures to occur before daemonizing)
-from carbon.conf import (get_default_parser, parse_options,
- read_config, settings as global_settings)
-
-
-parser = get_default_parser()
-parser.add_option(
- '--rules',
- default=None,
- help='Use the give aggregation rules file')
-
-(options, args) = parse_options(parser, sys.argv[1:])
-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
-global_settings.update(settings)
-
-if options.rules is None:
- options.rules = join(settings.CONF_DIR, "aggregation-rules.conf")
-
-pidfile = settings.pidfile
-logdir = settings.LOG_DIR
-
-__builtins__.program = program
-action = args[0]
-
-
-if action == 'stop':
- if not exists(pidfile):
- print 'Pidfile %s does not exist' % pidfile
- raise SystemExit(0)
-
- pf = open(pidfile, 'r')
- try:
- pid = int( pidfile.read().strip() )
- except:
- print 'Could not read pidfile %s' % pidfile
- raise SystemExit(1)
-
- print 'Deleting %s (contained pid %d)' % (pidfile, pid)
- os.unlink(pidfile)
-
- print 'Sending kill signal to pid %d' % pid
- os.kill(pid, 15)
- raise SystemExit(0)
-
-
-elif action == 'status':
- if not exists(pidfile):
- print '%s is not running' % program
- raise SystemExit(0)
-
- pf = open(pidfile, 'r')
- try:
- pid = int( pidfile.read().strip() )
- except:
- print 'Failed to read pid from %s' % pidfile
- raise SystemExit(1)
-
- if exists('/proc/%d' % pid):
- print "%s is running with pid %d" % (program, pid)
- raise SystemExit(0)
- else:
- print "%s is not running" % program
- raise SystemExit(0)
-
-# Import application components
-from carbon.log import logToStdout, logToDir
-from carbon.instrumentation import startRecording
-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener
-from carbon.aggregator.rules import RuleManager
-from carbon.aggregator import receiver
-from carbon.aggregator import client
-from carbon.rewrite import RewriteRuleManager
-from carbon.events import metricReceived
-from carbon.util import daemonize
-
-RuleManager.read_from(options.rules)
-
-rewrite_rules_conf = join(settings.CONF_DIR, 'rewrite-rules.conf')
-if exists(rewrite_rules_conf):
- RewriteRuleManager.read_from(rewrite_rules_conf)
-
-# --debug
-if options.debug:
- logToStdout()
-
-else:
- if not isdir(logdir):
- os.makedirs(logdir)
-
- daemonize()
-
- pf = open(pidfile, 'w')
- pf.write( str(os.getpid()) )
- pf.close()
-
- def shutdown():
- if os.path.exists(pidfile):
- os.unlink(pidfile)
-
- atexit.register(shutdown)
-
- logToDir(logdir)
-
-
-# Configure application components
-metricReceived.installHandler(receiver.process)
-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
-
-client.connect(settings.DESTINATION_HOST, settings.DESTINATION_PORT)
-startRecording()
-
-
-# Run the twisted reactor
-print "%s running with pid %d" % (program, os.getpid())
-
-if options.profile:
- import cProfile
-
- if exists(options.profile):
- os.unlink(options.profile)
-
- cProfile.run('reactor.run()', options.profile)
-
-else:
- reactor.run()
+
+from twisted.application.service import Application
+from carbon import service
+
+application = Application("carbon-aggregator")
+
+aggregator_service = service.createAggregatorService(None)
+aggregator_service.setServiceParent(application)
=== modified file 'carbon/bin/carbon-cache.py'
--- carbon/bin/carbon-cache.py 2011-07-08 20:52:33 +0000
+++ carbon/bin/carbon-cache.py 2011-07-08 20:52:34 +0000
@@ -14,15 +14,22 @@
limitations under the License."""
import sys
-import os
-import socket
-import pwd
-import atexit
-from os.path import basename, dirname, exists, join, isdir
-
-program = basename( sys.argv[0] ).split('.')[0]
-hostname = socket.gethostname().split('.')[0]
-os.umask(022)
+from os.path import dirname, join, abspath
+
+# Figure out where we're installed
+BIN_DIR = dirname(abspath(__file__))
+ROOT_DIR = dirname(BIN_DIR)
+
+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
+# source.
+LIB_DIR = join(ROOT_DIR, 'lib')
+sys.path.insert(0, LIB_DIR)
+
+if __name__ == "__main__":
+ # If we were run directly, call ourselves as a tac file instead.
+ from carbon.util import run_tac
+
+ run_tac(__file__)
# Initialize twisted
try:
@@ -30,176 +37,11 @@
epollreactor.install()
except:
pass
-from twisted.internet import reactor
-
-
-# Figure out where we're installed
-BIN_DIR = dirname( os.path.abspath(__file__) )
-ROOT_DIR = dirname(BIN_DIR)
-LIB_DIR = join(ROOT_DIR, 'lib')
-sys.path.insert(0, LIB_DIR)
-
-
-# Capture useful debug info for this commonly reported problem
-try:
- import carbon
-except ImportError:
- print 'Failed to import carbon, debug information follows.'
- print 'pwd=%s' % os.getcwd()
- print 'sys.path=%s' % sys.path
- print '__file__=%s' % __file__
- sys.exit(1)
-
-
-# Read config (we want failures to occur before daemonizing)
-from carbon.conf import (get_default_parser, parse_options,
- read_config, settings as global_settings)
-
-
-(options, args) = parse_options(get_default_parser(), sys.argv[1:])
-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
-global_settings.update(settings)
-
-instance = options.instance
-pidfile = settings.pidfile
-logdir = settings.LOG_DIR
-
-
-__builtins__.instance = instance # This isn't as evil as you might think
-__builtins__.program = program
-action = args[0]
-
-
-if action == 'stop':
- if not exists(pidfile):
- print 'Pidfile %s does not exist' % pidfile
- raise SystemExit(0)
-
- pf = open(pidfile, 'r')
- try:
- pid = int( pf.read().strip() )
- except:
- print 'Could not read pidfile %s' % pidfile
- raise SystemExit(1)
-
- print 'Deleting %s (contained pid %d)' % (pidfile, pid)
- os.unlink(pidfile)
-
- print 'Sending kill signal to pid %d' % pid
- os.kill(pid, 15)
- raise SystemExit(0)
-
-
-elif action == 'status':
- if not exists(pidfile):
- print '%s (instance %s) is not running' % (program, instance)
- raise SystemExit(0)
-
- pf = open(pidfile, 'r')
- try:
- pid = int( pf.read().strip() )
- except:
- print 'Failed to read pid from %s' % pidfile
- raise SystemExit(1)
-
- if exists('/proc/%d' % pid):
- print "%s (instance %s) is running with pid %d" % (program, instance, pid)
- raise SystemExit(0)
- else:
- print "%s (instance %s) is not running" % (program, instance)
- raise SystemExit(0)
-
-if exists(pidfile):
- print "Pidfile %s already exists, is %s already running?" % (pidfile, program)
- raise SystemExit(1)
-
-# Import application components
-from carbon.log import logToStdout, logToDir
-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, CacheQueryHandler, startListener
-from carbon.cache import MetricCache
-from carbon.instrumentation import startRecording
-from carbon.events import metricReceived
-
-storage_schemas = join(settings.CONF_DIR, 'storage-schemas.conf')
-if not exists(storage_schemas):
- print "Error: missing required config %s" % storage_schemas
- sys.exit(1)
-
-use_amqp = settings.get("ENABLE_AMQP", False)
-if use_amqp:
- from carbon import amqp_listener
- amqp_host = settings.get("AMQP_HOST", "localhost")
- amqp_port = settings.get("AMQP_PORT", 5672)
- amqp_user = settings.get("AMQP_USER", "guest")
- amqp_password = settings.get("AMQP_PASSWORD", "guest")
- amqp_verbose = settings.get("AMQP_VERBOSE", False)
- amqp_vhost = settings.get("AMQP_VHOST", "/")
- amqp_spec = settings.get("AMQP_SPEC", None)
- amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
-
-
-# --debug
-if options.debug:
- logToStdout()
-
-else:
- if not isdir(logdir):
- os.makedirs(logdir)
-
- if settings.USER:
- print "Dropping privileges to become the user %s" % settings.USER
-
- from carbon.util import daemonize, dropprivs
- daemonize()
-
- pf = open(pidfile, 'w')
- pf.write( str(os.getpid()) )
- pf.close()
-
- def shutdown():
- if os.path.exists(pidfile):
- os.unlink(pidfile)
-
- atexit.register(shutdown)
-
- if settings.USER:
- pwent = pwd.getpwnam(settings.USER)
- os.chown(pidfile, pwent.pw_uid, pwent.pw_gid)
- dropprivs(settings.USER)
-
- logToDir(logdir)
-
-# Configure application components
-metricReceived.installHandler(MetricCache.store)
-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
-startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler)
-
-if use_amqp:
- amqp_listener.startReceiver(amqp_host, amqp_port, amqp_user, amqp_password,
- vhost=amqp_vhost, spec=amqp_spec,
- exchange_name=amqp_exchange_name,
- verbose=amqp_verbose)
-
-if settings.ENABLE_MANHOLE:
- from carbon import manhole
- manhole.start()
-
-from carbon.writer import startWriter # have to import this *after* settings are defined
-startWriter()
-startRecording()
-
-
-# Run the twisted reactor
-print "%s running [instance %s]" % (program, instance)
-
-if options.profile:
- import cProfile
-
- if exists(options.profile):
- os.unlink(options.profile)
-
- cProfile.run('reactor.run()', options.profile)
-
-else:
- reactor.run()
+
+from twisted.application.service import Application
+from carbon import service
+
+application = Application("carbon-cache")
+
+cache_service = service.createCacheService(None)
+cache_service.setServiceParent(application)
=== modified file 'carbon/bin/carbon-relay.py'
--- carbon/bin/carbon-relay.py 2011-07-08 20:52:33 +0000
+++ carbon/bin/carbon-relay.py 2011-07-08 20:52:34 +0000
@@ -14,13 +14,22 @@
limitations under the License."""
import sys
-import os
-import atexit
-from os.path import basename, dirname, exists, join, isdir
-
-program = basename( sys.argv[0] ).split('.')[0]
-os.umask(022)
-
+from os.path import dirname, join, abspath
+
+# Figure out where we're installed
+BIN_DIR = dirname(abspath(__file__))
+ROOT_DIR = dirname(BIN_DIR)
+
+# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
+# source.
+LIB_DIR = join(ROOT_DIR, 'lib')
+sys.path.insert(0, LIB_DIR)
+
+if __name__ == "__main__":
+ # If we were run directly, call ourselves as a tac file instead.
+ from carbon.util import run_tac
+
+ run_tac(__file__)
# Initialize twisted
try:
@@ -28,156 +37,11 @@
epollreactor.install()
except:
pass
-from twisted.internet import reactor
-
-
-# Figure out where we're installed
-BIN_DIR = dirname(__file__)
-ROOT_DIR = dirname(BIN_DIR)
-LIB_DIR = join(ROOT_DIR, 'lib')
-sys.path.insert(0, LIB_DIR)
-
-
-# Capture useful debug info for this commonly reported problem
-try:
- import carbon
-except ImportError:
- print 'Failed to import carbon, debug information follows.'
- print 'pwd=%s' % os.getcwd()
- print 'sys.path=%s' % sys.path
- print '__file__=%s' % __file__
- sys.exit(1)
-
-
-# Read config (we want failures to occur before daemonizing)
-from carbon.conf import (get_default_parser, parse_options,
- read_config, settings as global_settings)
-
-
-parser = get_default_parser()
-parser.add_option(
- '--rules',
- default=None,
- help='Use the given relay rules file')
-
-(options, args) = parse_options(parser, sys.argv[1:])
-settings = read_config(program, options, ROOT_DIR=ROOT_DIR)
-global_settings.update(settings)
-
-if options.rules is None:
- options.rules = join(settings.CONF_DIR, "relay-rules.conf")
-
-pidfile = settings.pidfile
-logdir = settings.LOG_DIR
-
-__builtins__.program = program
-action = args[0]
-
-
-if action == 'stop':
- if not exists(pidfile):
- print 'Pidfile %s does not exist' % pidfile
- raise SystemExit(0)
-
- pf = open(pidfile, 'r')
- try:
- pid = int( pf.read().strip() )
- except:
- print 'Could not read pidfile %s' % pidfile
- raise SystemExit(1)
-
- print 'Deleting %s (contained pid %d)' % (pidfile, pid)
- os.unlink(pidfile)
-
- print 'Sending kill signal to pid %d' % pid
- os.kill(pid, 15)
- raise SystemExit(0)
-
-
-elif action == 'status':
- if not exists(pidfile):
- print '%s is not running' % program
- raise SystemExit(0)
-
- pf = open(pidfile, 'r')
- try:
- pid = int( pf.read().strip() )
- except:
- print 'Failed to read pid from %s' % pidfile
- raise SystemExit(1)
-
- if exists('/proc/%d' % pid):
- print "%s is running with pid %d" % (program, pid)
- raise SystemExit(0)
- else:
- print "%s is not running" % program
- raise SystemExit(0)
-
-if exists(pidfile):
- print "Pidfile %s already exists, is %s already running?" % (pidfile, program)
- raise SystemExit(1)
-
-# Quick validation
-if settings.RELAY_METHOD not in ('rules', 'consistent-hashing'):
- print "In carbon.conf, RELAY_METHOD must be either 'rules' or 'consistent-hashing'. Invalid value: '%s'" % settings.RELAY_METHOD
- sys.exit(1)
-
-# Import application components
-from carbon.log import logToStdout, logToDir, msg
-from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener
-from carbon.relay import createClientConnections, relay
-from carbon.events import metricReceived
-from carbon.instrumentation import startRecording
-from carbon.rules import loadRules, allDestinationServers, parseHostList
-from carbon.hashing import setDestinationHosts
-
-# --debug
-if options.debug:
- logToStdout()
-else:
- if not isdir(logdir):
- os.makedirs(logdir)
-
- from carbon.util import daemonize
- daemonize()
- logToDir(logdir)
-
- pidfile = open(pidfile, 'w')
- pidfile.write( str(os.getpid()) )
- pidfile.close()
-
- def shutdown():
- if os.path.exists(pidfile):
- os.unlink(pidfile)
-
- atexit.register(shutdown)
-
-
-# Configure application components
-metricReceived.installHandler(relay)
-startListener(settings.LINE_RECEIVER_INTERFACE, settings.LINE_RECEIVER_PORT, MetricLineReceiver)
-startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
-
-if settings.RELAY_METHOD == 'rules':
- loadRules(options.rules)
- createClientConnections( allDestinationServers() )
-elif settings.RELAY_METHOD == 'consistent-hashing':
- hosts = parseHostList(settings.CH_HOST_LIST)
- msg('consistent-hashing hosts = %s' % str(hosts))
- setDestinationHosts(hosts)
- createClientConnections(hosts)
-
-startRecording()
-
-
-# Run the twisted reactor
-if options.profile:
- import cProfile
-
- if exists(options.profile):
- os.unlink(options.profile)
-
- cProfile.run('reactor.run()', options.profile)
-
-else:
- reactor.run()
+
+from twisted.application.service import Application
+from carbon import service
+
+application = Application("carbon-relay")
+
+relay_service = service.createRelayService(None)
+relay_service.setServiceParent(application)
=== modified file 'carbon/lib/carbon/amqp_listener.py'
--- carbon/lib/carbon/amqp_listener.py 2011-04-25 15:50:10 +0000
+++ carbon/lib/carbon/amqp_listener.py 2011-07-08 20:52:34 +0000
@@ -147,11 +147,12 @@
p.factory = self
return p
-def startReceiver(host, port, username, password, vhost, exchange_name,
- spec=None, channel=1, verbose=False):
- """Starts a twisted process that will read messages on the amqp broker
- and post them as metrics."""
+def createAMQPListener(username, password, vhost, exchange_name,
+ spec=None, channel=1, verbose=False):
+ """
+ Create an C{AMQPReconnectingFactory} configured with the specified options.
+ """
# use provided spec if not specified
if not spec:
spec = txamqp.spec.load(os.path.normpath(
@@ -161,6 +162,17 @@
factory = AMQPReconnectingFactory(username, password, delegate, vhost,
spec, channel, exchange_name,
verbose=verbose)
+ return factory
+
+
+def startReceiver(host, port, username, password, vhost, exchange_name,
+ spec=None, channel=1, verbose=False):
+ """
+ Starts a twisted process that will read messages on the amqp broker and
+ post them as metrics.
+ """
+ factory = createAMQPListener(username, password, vhost, exchange_name,
+ spec=spec, channel=channel, verbose=verbose)
reactor.connectTCP(host, port, factory)
=== modified file 'carbon/lib/carbon/conf.py'
--- carbon/lib/carbon/conf.py 2011-07-08 20:52:33 +0000
+++ carbon/lib/carbon/conf.py 2011-07-08 20:52:34 +0000
@@ -13,10 +13,19 @@
limitations under the License."""
import os
-from os.path import join, dirname, normpath
+import sys
+import pwd
+
+from os.path import join, dirname, normpath, abspath, basename, exists
from optparse import OptionParser
from ConfigParser import ConfigParser
+import whisper
+from carbon import log
+
+from twisted.python import usage
+from twisted.scripts import twistd
+
defaults = dict(
LOCAL_DATA_DIR="/opt/graphite/storage/whisper/",
@@ -118,6 +127,89 @@
settings.update(defaults)
+class CarbonServerOptions(usage.Options):
+
+ optParameters = [
+ ["config", "c", None, "Use the given config file."],
+ ["instance", "", None, "Manage a specific carbon instance."],
+ ["logdir", "", None, "Write logs to the given directory."],
+ ["pidfile", "", None, "Name of the pidfile"],
+ ]
+
+ def postOptions(self):
+ global settings
+ ROOT_DIR = dirname(dirname(abspath(self["python"])))
+ program = basename(self["python"]).split('.')[0]
+ program_settings = read_config(program, self, ROOT_DIR=ROOT_DIR)
+ settings.update(program_settings)
+
+ settings["program"] = program
+
+ if settings.USER:
+ self["uid"], self["gid"] = pwd.getpwnam(settings.USER)[2:4]
+
+ self["pidfile"] = settings["pidfile"]
+
+ storage_schemas = join(settings["CONF_DIR"], "storage-schemas.conf")
+ if not exists(storage_schemas):
+ print "Error: missing required config %s" % storage_schemas
+ sys.exit(1)
+
+ if settings.WHISPER_AUTOFLUSH:
+ log.msg("enabling whisper autoflush")
+ whisper.AUTOFLUSH = True
+
+
+class CarbonCacheOptions(CarbonServerOptions, twistd.ServerOptions):
+
+ optParameters = (
+ twistd.ServerOptions.optParameters +
+ CarbonServerOptions.optParameters
+ )
+
+ def postOptions(self):
+ twistd.ServerOptions.postOptions(self)
+ CarbonServerOptions.postOptions(self)
+
+
+class CarbonAggregatorOptions(CarbonCacheOptions):
+
+ optParameters = [
+ ["rules", "", None, "Use the given aggregation rules file."],
+ ["rewrite-rules", "", None, "Use the given rewrite rules file."],
+ ] + CarbonCacheOptions.optParameters
+
+ def postOptions(self):
+ CarbonCacheOptions.postOptions(self)
+ if self["rules"] is None:
+ self["rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf")
+ settings["aggregation-rules"] = self["rules"]
+
+ if self["rewrite-rules"] is None:
+ self["rewrite-rules"] = join(settings["CONF_DIR"],
+ "rewrite-rules.conf")
+ settings["rewrite-rules"] = self["rewrite-rules"]
+
+
+class CarbonRelayOptions(CarbonCacheOptions):
+
+ optParameters = [
+ ["rules", "", None, "Use the given relay rules file."],
+ ] + CarbonCacheOptions.optParameters
+
+ def postOptions(self):
+ CarbonCacheOptions.postOptions(self)
+ if self["rules"] is None:
+ self["rules"] = join(settings["CONF_DIR"], "relay-rules.conf")
+ settings["relay-rules"] = self["rules"]
+
+ if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing"):
+ print ("In carbon.conf, RELAY_METHOD must be either 'rules' or "
+ "'consistent-hashing'. Invalid value: '%s'" %
+ settings.RELAY_METHOD)
+ sys.exit(1)
+
+
def get_default_parser(usage="%prog [options] <start|stop|status>"):
"""Create a parser for command line options."""
parser = OptionParser(usage=usage)
@@ -167,7 +259,7 @@
def read_config(program, options, **kwargs):
"""
Read settings for 'program' from configuration file specified by
- 'options.config', with missing values provided by 'defaults'.
+ 'options["config"]', with missing values provided by 'defaults'.
"""
settings = Settings()
settings.update(defaults)
@@ -184,40 +276,44 @@
# 'GRAPHITE_CONF_DIR' environment variable.
settings.setdefault("CONF_DIR",
os.environ.get("GRAPHITE_CONF_DIR",
- join(settings.ROOT_DIR, "conf")))
- if options.config is None:
- options.config = join(settings.CONF_DIR, "carbon.conf")
+ join(settings["ROOT_DIR"], "conf")))
+ if options["config"] is None:
+ options["config"] = join(settings["CONF_DIR"], "carbon.conf")
else:
# Set 'CONF_DIR' to the parent directory of the 'carbon.conf' config
# file.
- settings["CONF_DIR"] = dirname(normpath(options.config))
+ settings["CONF_DIR"] = dirname(normpath(options["config"]))
# Storage directory can be overriden by the 'GRAPHITE_STORAGE_DIR'
# environment variable. It defaults to a path relative to 'ROOT_DIR' for
# backwards compatibility though.
settings.setdefault("STORAGE_DIR",
os.environ.get("GRAPHITE_STORAGE_DIR",
- join(settings.ROOT_DIR, "storage")))
- settings.setdefault("LOG_DIR", join(settings.STORAGE_DIR, "log", program))
+ join(settings["ROOT_DIR"], "storage")))
+ settings.setdefault(
+ "LOG_DIR", join(settings["STORAGE_DIR"], "log", program))
# Read configuration options from program-specific section.
section = program[len("carbon-"):]
- settings.readFrom(options.config, section)
+ settings.readFrom(options["config"], section)
# If a specific instance of the program is specified, augment the settings
# with the instance-specific settings and provide sane defaults for
# optional settings.
- if options.instance:
- settings.readFrom(options.config, "%s:%s" % (section, options.instance))
- settings["pidfile"] = (options.pidfile or
- join(settings.STORAGE_DIR,
- "%s-%s.pid" % (program, options.instance)))
- settings["LOG_DIR"] = (options.logdir or
- "%s-%s" % (settings.LOG_DIR.rstrip('/'),
- options.instance))
+ if options["instance"]:
+ settings.readFrom(options["config"],
+ "%s:%s" % (section, options["instance"]))
+ settings["pidfile"] = (
+ options["pidfile"] or
+ join(settings["STORAGE_DIR"], "%s-%s.pid" %
+ (program, options["instance"])))
+ settings["LOG_DIR"] = (options["logdir"] or
+ "%s-%s" % (settings["LOG_DIR"].rstrip('/'),
+ options["instance"]))
else:
- settings["pidfile"] = (options.pidfile or
- join(settings.STORAGE_DIR, '%s.pid' % program))
- settings["LOG_DIR"] = (options.logdir or settings.LOG_DIR)
+ settings["pidfile"] = (
+ options["pidfile"] or
+ join(settings["STORAGE_DIR"], '%s.pid' % program))
+ settings["LOG_DIR"] = (options["logdir"] or settings["LOG_DIR"])
return settings
=== modified file 'carbon/lib/carbon/events.py'
--- carbon/lib/carbon/events.py 2009-09-10 19:28:51 +0000
+++ carbon/lib/carbon/events.py 2011-07-08 20:52:34 +0000
@@ -2,6 +2,7 @@
class EventHandler:
+
def __init__(self, defaultHandler=None):
self.handler = defaultHandler
=== modified file 'carbon/lib/carbon/instrumentation.py'
--- carbon/lib/carbon/instrumentation.py 2011-04-09 05:06:50 +0000
+++ carbon/lib/carbon/instrumentation.py 2011-07-08 20:52:34 +0000
@@ -2,13 +2,14 @@
import time
import socket
from resource import getrusage, RUSAGE_SELF
+
+from twisted.application.service import Service
from twisted.internet.task import LoopingCall
stats = {}
HOSTNAME = socket.gethostname().replace('.','_')
PAGESIZE = os.sysconf('SC_PAGESIZE')
-recordTask = None
rusage = getrusage(RUSAGE_SELF)
lastUsage = rusage.ru_utime + rusage.ru_stime
lastUsageTime = time.time()
@@ -56,8 +57,6 @@
def startRecording():
global recordTask
- recordTask = LoopingCall(recordMetrics)
- recordTask.start(60, now=False)
def recordMetrics():
@@ -138,6 +137,20 @@
send_metric(fullMetric, datapoint)
+class InstrumentationService(Service):
+
+ def __init__(self):
+ self.record_task = LoopingCall(recordMetrics)
+
+ def startService(self):
+ self.record_task.start(60, False)
+ Service.startService(self)
+
+ def stopService(self):
+ self.record_task.stop()
+ Service.stopService(self)
+
+
# Avoid import circularity
from carbon.aggregator.buffers import BufferManager
from carbon.aggregator.client import send_metric
=== modified file 'carbon/lib/carbon/manhole.py'
--- carbon/lib/carbon/manhole.py 2011-04-02 00:44:19 +0000
+++ carbon/lib/carbon/manhole.py 2011-07-08 20:52:34 +0000
@@ -1,4 +1,4 @@
-from twisted.cred import portal, checkers
+from twisted.cred import portal
from twisted.conch.ssh import keys
from twisted.conch.checkers import SSHPublicKeyDatabase
from twisted.conch.manhole import Manhole
@@ -21,8 +21,7 @@
keyBlob = self.userKeys[credentials.username]
return keyBlob == credentials.blob
-
-def start():
+def createManholeListener():
sshRealm = TerminalRealm()
sshRealm.chainedProtocolFactory.protocolFactory = lambda _: Manhole(namespace)
@@ -37,4 +36,8 @@
sshPortal = portal.Portal(sshRealm)
sshPortal.registerChecker(credChecker)
sessionFactory = ConchFactory(sshPortal)
- reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE)
+ return sessionFactory
+
+def start():
+ sessionFactory = createManholeListener()
+ reactor.listenTCP(settings.MANHOLE_PORT, sessionFactory, interface=settings.MANHOLE_INTERFACE)
=== added file 'carbon/lib/carbon/service.py'
--- carbon/lib/carbon/service.py 1970-01-01 00:00:00 +0000
+++ carbon/lib/carbon/service.py 2011-07-08 20:52:34 +0000
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+"""Copyright 2009 Chris Davis
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+from os.path import exists
+
+from twisted.application.service import MultiService
+from twisted.application.internet import TCPServer, TCPClient
+from twisted.internet.protocol import ServerFactory
+
+
+def createBaseService(config):
+ from carbon.conf import settings
+ from carbon.listeners import MetricLineReceiver, MetricPickleReceiver
+
+ root_service = MultiService()
+ root_service.setName(settings.program)
+
+ use_amqp = settings.get("ENABLE_AMQP", False)
+ if use_amqp:
+ from carbon import amqp_listener
+
+ amqp_host = settings.get("AMQP_HOST", "localhost")
+ amqp_port = settings.get("AMQP_PORT", 5672)
+ amqp_user = settings.get("AMQP_USER", "guest")
+ amqp_password = settings.get("AMQP_PASSWORD", "guest")
+ amqp_verbose = settings.get("AMQP_VERBOSE", False)
+ amqp_vhost = settings.get("AMQP_VHOST", "/")
+ amqp_spec = settings.get("AMQP_SPEC", None)
+ amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
+
+
+ for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE,
+ settings.LINE_RECEIVER_PORT,
+ MetricLineReceiver),
+ (settings.PICKLE_RECEIVER_INTERFACE,
+ settings.PICKLE_RECEIVER_PORT,
+ MetricPickleReceiver)):
+ factory = ServerFactory()
+ factory.protocol = protocol
+ service = TCPServer(int(port), factory, interface=interface)
+ service.setServiceParent(root_service)
+
+ if use_amqp:
+ factory = amqp_listener.createAMQPListener(
+ amqp_user, amqp_password,
+ vhost=amqp_vhost, spec=amqp_spec,
+ exchange_name=amqp_exchange_name,
+ verbose=amqp_verbose)
+ service = TCPClient(amqp_host, int(amqp_port), factory)
+ service.setServiceParent(root_service)
+
+ if settings.ENABLE_MANHOLE:
+ from carbon import manhole
+
+ factory = manhole.createManholeListener()
+ service = TCPServer(int(settings.MANHOLE_PORT), factory,
+ interface=settings.MANHOLE_INTERFACE)
+ service.setServiceParent(root_service)
+
+ # have to import this *after* settings are defined
+ from carbon.writer import WriterService
+
+ service = WriterService()
+ service.setServiceParent(root_service)
+
+ # Instantiate an instrumentation service that will record metrics about
+ # this service.
+ from carbon.instrumentation import InstrumentationService
+
+ service = InstrumentationService()
+ service.setServiceParent(root_service)
+
+ return root_service
+
+
+def createCacheService(config):
+ from carbon.cache import MetricCache
+ from carbon.conf import settings
+ from carbon.events import metricReceived
+ from carbon.listeners import CacheQueryHandler
+
+ # Configure application components
+ metricReceived.installHandler(MetricCache.store)
+
+ root_service = createBaseService(config)
+ factory = ServerFactory()
+ factory.protocol = CacheQueryHandler
+ service = TCPServer(int(settings.CACHE_QUERY_PORT), factory,
+ interface=settings.CACHE_QUERY_INTERFACE)
+ service.setServiceParent(root_service)
+
+ # have to import this *after* settings are defined
+ from carbon.writer import WriterService
+
+ service = WriterService()
+ service.setServiceParent(root_service)
+
+ return root_service
+
+
+def createAggregatorService(config):
+ from carbon.events import metricReceived
+ from carbon.aggregator import receiver
+ from carbon.aggregator.rules import RuleManager
+ from carbon.aggregator import client
+ from carbon.rewrite import RewriteRuleManager
+ from carbon.conf import settings
+
+ root_service = createBaseService(config)
+
+ # Configure application components
+ metricReceived.installHandler(receiver.process)
+ RuleManager.read_from(settings["aggregation-rules"])
+ if exists(settings["rewrite-rules"]):
+ RewriteRuleManager.read_from(settings["rewrite-rules"])
+
+ client.connect(settings["DESTINATION_HOST"],
+ int(settings["DESTINATION_PORT"]))
+
+ return root_service
+
+
+def createRelayService(config):
+ from carbon.log import msg
+ from carbon.conf import settings
+ from carbon.events import metricReceived
+ from carbon.hashing import setDestinationHosts
+ from carbon.relay import createClientConnections, relay
+ from carbon.rules import loadRules, allDestinationServers, parseHostList
+
+ root_service = createBaseService(config)
+
+ # Configure application components
+ metricReceived.installHandler(relay)
+
+ if settings["RELAY_METHOD"] == "rules":
+ loadRules(settings["relay-rules"])
+ createClientConnections(allDestinationServers())
+ elif settings["RELAY_METHOD"] == "consistent-hashing":
+ hosts = parseHostList(settings["CH_HOST_LIST"])
+ msg('consistent-hashing hosts = %s' % str(hosts))
+ setDestinationHosts(hosts)
+ createClientConnections(hosts)
+
+ return root_service
=== modified file 'carbon/lib/carbon/util.py'
--- carbon/lib/carbon/util.py 2009-12-13 01:35:28 +0000
+++ carbon/lib/carbon/util.py 2011-07-08 20:52:34 +0000
@@ -1,22 +1,50 @@
-import sys
import os
import pwd
-
-def daemonize():
- if os.fork() > 0: sys.exit(0)
- os.setsid()
- if os.fork() > 0: sys.exit(0)
- si = open('/dev/null', 'r')
- so = open('/dev/null', 'a+')
- se = open('/dev/null', 'a+', 0)
- os.dup2(si.fileno(), sys.stdin.fileno())
- os.dup2(so.fileno(), sys.stdout.fileno())
- os.dup2(se.fileno(), sys.stderr.fileno())
+from os.path import basename, isdir
+
+import carbon
+
+from twisted.application import service
+from twisted import plugin
+from twisted.python.util import initgroups
+from twisted.scripts.twistd import runApp
+from twisted.scripts._twistd_unix import daemonize
+
+
+daemonize = daemonize # Backwards compatibility
def dropprivs(user):
- uid,gid = pwd.getpwnam(user)[2:4]
- os.setregid(gid,gid)
- os.setreuid(uid,uid)
- return (uid,gid)
+ uid, gid = pwd.getpwnam(user)[2:4]
+ initgroups(uid, gid)
+ os.setregid(gid, gid)
+ os.setreuid(uid, uid)
+ return (uid, gid)
+
+
+def run_tac(tac_file):
+ from carbon.log import logToDir
+ from carbon.conf import settings
+
+ plugins = {}
+ for plug in plugin.getPlugins(service.IServiceMaker):
+ plugins[plug.tapname] = plug
+
+ program = basename(tac_file).split('.')[0]
+ config = plugins[program].options()
+ config["python"] = tac_file
+ config["umask"] = 022
+ config.parseOptions()
+
+ if not config["nodaemon"]:
+ logdir = settings.LOG_DIR
+ if not isdir(logdir):
+ os.makedirs(logdir)
+ logToDir(logdir)
+
+ # This isn't as evil as you might think
+ __builtins__["instance"] = config["instance"]
+ __builtins__["program"] = program
+
+ runApp(config)
=== modified file 'carbon/lib/carbon/writer.py'
--- carbon/lib/carbon/writer.py 2011-04-02 19:54:28 +0000
+++ carbon/lib/carbon/writer.py 2011-07-08 20:52:34 +0000
@@ -16,26 +16,29 @@
import os
import time
from os.path import join, exists, dirname, basename
-from threading import Thread
-from twisted.internet import reactor
-from twisted.internet.task import LoopingCall
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+
import whisper
+
from carbon.cache import MetricCache
from carbon.storage import getFilesystemPath, loadStorageSchemas
from carbon.conf import settings
from carbon.instrumentation import increment, append
from carbon import log
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
-
-if settings.WHISPER_AUTOFLUSH:
- log.msg("enabling whisper autoflush")
- whisper.AUTOFLUSH = True
+
+from twisted.internet import reactor
+from twisted.internet.task import LoopingCall
+from twisted.application.service import Service
+
lastCreateInterval = 0
createCount = 0
+schemas = loadStorageSchemas()
+
def optimalWriteOrder():
"Generates metrics with the most cached values first and applies a soft rate limit on new metrics"
@@ -177,10 +180,16 @@
log.err()
-schemaReloadTask = LoopingCall(reloadStorageSchemas)
-schemas = loadStorageSchemas()
-
-
-def startWriter():
- schemaReloadTask.start(60)
- reactor.callInThread(writeForever)
+class WriterService(Service):
+
+ def __init__(self):
+ self.reload_task = LoopingCall(reloadStorageSchemas)
+
+ def startService(self):
+ self.reload_task.start(60, False)
+ reactor.callInThread(writeForever)
+ Service.startService(self)
+
+ def stopService(self):
+ self.reload_task.stop()
+ Service.stopService(self)
=== added directory 'carbon/lib/twisted'
=== added directory 'carbon/lib/twisted/plugins'
=== added file 'carbon/lib/twisted/plugins/carbon_aggregator_plugin.py'
--- carbon/lib/twisted/plugins/carbon_aggregator_plugin.py 1970-01-01 00:00:00 +0000
+++ carbon/lib/twisted/plugins/carbon_aggregator_plugin.py 2011-07-08 20:52:34 +0000
@@ -0,0 +1,25 @@
+from zope.interface import implements
+
+from twisted.plugin import IPlugin
+from twisted.application.service import IServiceMaker
+
+from carbon import service
+from carbon import conf
+
+
+class CarbonAggregatorServiceMaker(object):
+
+ implements(IServiceMaker, IPlugin)
+ tapname = "carbon-aggregator"
+ description = "Aggregate stats for graphite."
+ options = conf.CarbonAggregatorOptions
+
+ def makeService(self, options):
+ """
+ Construct a C{carbon-aggregator} service.
+ """
+ return service.createAggregatorService(options)
+
+
+# Now construct an object which *provides* the relevant interfaces
+serviceMaker = CarbonAggregatorServiceMaker()
=== added file 'carbon/lib/twisted/plugins/carbon_cache_plugin.py'
--- carbon/lib/twisted/plugins/carbon_cache_plugin.py 1970-01-01 00:00:00 +0000
+++ carbon/lib/twisted/plugins/carbon_cache_plugin.py 2011-07-08 20:52:34 +0000
@@ -0,0 +1,25 @@
+from zope.interface import implements
+
+from twisted.plugin import IPlugin
+from twisted.application.service import IServiceMaker
+
+from carbon import service
+from carbon import conf
+
+
+class CarbonCacheServiceMaker(object):
+
+ implements(IServiceMaker, IPlugin)
+ tapname = "carbon-cache"
+ description = "Collect stats for graphite."
+ options = conf.CarbonCacheOptions
+
+ def makeService(self, options):
+ """
+ Construct a C{carbon-cache} service.
+ """
+ return service.createCacheService(options)
+
+
+# Now construct an object which *provides* the relevant interfaces
+serviceMaker = CarbonCacheServiceMaker()
=== added file 'carbon/lib/twisted/plugins/carbon_relay_plugin.py'
--- carbon/lib/twisted/plugins/carbon_relay_plugin.py 1970-01-01 00:00:00 +0000
+++ carbon/lib/twisted/plugins/carbon_relay_plugin.py 2011-07-08 20:52:34 +0000
@@ -0,0 +1,25 @@
+from zope.interface import implements
+
+from twisted.plugin import IPlugin
+from twisted.application.service import IServiceMaker
+
+from carbon import service
+from carbon import conf
+
+
+class CarbonRelayServiceMaker(object):
+
+ implements(IServiceMaker, IPlugin)
+ tapname = "carbon-relay"
+ description = "Relay stats for graphite."
+ options = conf.CarbonRelayOptions
+
+ def makeService(self, options):
+ """
+ Construct a C{carbon-aggregator} service.
+ """
+ return service.createRelayService(options)
+
+
+# Now construct an object which *provides* the relevant interfaces
+serviceMaker = CarbonRelayServiceMaker()