launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #25413
[Merge] ~ilasc/turnip:add-statsd-metrics into turnip:master
Ioana Lasc has proposed merging ~ilasc/turnip:add-statsd-metrics into turnip:master.
Commit message:
Send maxrss, stime and utime metrics to statsd
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~ilasc/turnip/+git/turnip/+merge/391552
This MP is based on Colin's suggestion in MP 391365 and his patch for inter-process communication.
Added the "send to statsd/Telegraf" code, the formatting and tags for the metrics, a singleton for the statsd client, and a mock for unit tests.
Tested the StatsdGitClient connection locally with Telegraf and Chronograf, but this branch still needs more work and unit tests.
This is a "please review direction" MP. Thanks, Colin!
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~ilasc/turnip:add-statsd-metrics into turnip:master.
diff --git a/config.yaml b/config.yaml
index 99b0e7b..f7d8612 100644
--- a/config.yaml
+++ b/config.yaml
@@ -21,3 +21,6 @@ openid_provider_root: https://testopenid.test/
site_name: git.launchpad.test
main_site_root: https://launchpad.test/
celery_broker: pyamqp://guest@localhost//
+statsd_host: launchpad.test
+statsd_port: 8125
+statsd_prefix: lp.turnip
diff --git a/packbackendserver.tac b/packbackendserver.tac
index 191ac16..e8157cc 100644
--- a/packbackendserver.tac
+++ b/packbackendserver.tac
@@ -20,7 +20,10 @@ from twisted.scripts.twistd import ServerOptions
from turnip.config import config
from turnip.log import RotatableFileLogObserver
-from turnip.pack.git import PackBackendFactory
+from turnip.pack.git import (
+ PackBackendFactory,
+ StatsdGitClient,
+ )
from turnip.pack.hookrpc import (
HookRPCHandler,
HookRPCServerFactory,
@@ -37,11 +40,16 @@ def getPackBackendServices():
     hookrpc_path = config.get('hookrpc_path') or repo_store
     hookrpc_sock_path = os.path.join(
         hookrpc_path, 'hookrpc_sock_%d' % pack_backend_port)
+    # Used to send per-git-process resource usage metrics to statsd.
+    statsd_client = StatsdGitClient(
+        config.get('statsd_host'), config.get('statsd_port'),
+        config.get('statsd_prefix'))
     pack_backend_service = internet.TCPServer(
         pack_backend_port,
         PackBackendFactory(repo_store,
                            hookrpc_handler,
-                           hookrpc_sock_path))
+                           hookrpc_sock_path,
+                           statsd_client))
     if os.path.exists(hookrpc_sock_path):
         os.unlink(hookrpc_sock_path)
     hookrpc_service = internet.UNIXServer(
diff --git a/requirements.txt b/requirements.txt
index fe3f1ac..ac502e3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -58,6 +58,7 @@ scandir==1.10.0
setuptools-scm==1.17.0
simplejson==3.6.5
six==1.15.0
+statsd==3.3.0
testscenarios==0.5.0
testtools==2.4.0
traceback2==1.4.0
diff --git a/setup.py b/setup.py
index 5cfae76..6bc2e94 100755
--- a/setup.py
+++ b/setup.py
@@ -28,6 +28,7 @@ requires = [
'pygit2>=0.27.4,<0.28.0',
'python-openid2',
'PyYAML',
+ 'statsd',
'Twisted[conch]',
'waitress',
'zope.interface',
diff --git a/turnip/pack/git.py b/turnip/pack/git.py
index 4407085..8998462 100644
--- a/turnip/pack/git.py
+++ b/turnip/pack/git.py
@@ -11,9 +11,13 @@ from __future__ import (
__metaclass__ = type
+import json
+import os.path
import uuid
-
import six
+import socket
+import statsd
+import threading
from twisted.internet import (
defer,
error,
@@ -47,6 +51,46 @@ VIRT_ERROR_PREFIX = b'turnip virt error: '
 SAFE_PARAMS = frozenset([b'host', b'version'])


+class ThreadSafeSingleton(type):
+    """Metaclass allowing at most one instance of each class using it.
+
+    The first call to such a class constructs its instance; later calls
+    return that same instance without calling `__init__` again, so their
+    arguments are ignored.  Construction happens under a lock so that
+    concurrent first calls from several threads still build only one.
+    """
+
+    _instances = {}
+    _singleton_lock = threading.Lock()
+
+    def __call__(cls, *args, **kwargs):
+        # Check without the lock first (the common case, once built), then
+        # again while holding it in case another thread won the race.
+        if cls not in cls._instances:
+            with cls._singleton_lock:
+                if cls not in cls._instances:
+                    cls._instances[cls] = super(
+                        ThreadSafeSingleton, cls).__call__(*args, **kwargs)
+        return cls._instances[cls]
+
+
+# six.add_metaclass rather than a `__metaclass__` class attribute, which
+# Python 3 ignores (and so would silently disable the singleton).
+@six.add_metaclass(ThreadSafeSingleton)
+class StatsdGitClient(object):
+    """Process-wide holder for the statsd client used for git metrics."""
+
+    def __init__(self, host, port, prefix):
+        self.host = host
+        self.port = port
+        self.prefix = prefix
+        self.client = statsd.StatsClient(self.host, self.port, self.prefix)
+
+    def get_client(self):
+        """Return the wrapped `statsd.StatsClient`."""
+        return self.client
+
+
class RequestIDLogger(Logger):
def emit(self, level, format=None, **kwargs):
@@ -235,13 +265,15 @@ class PackServerProtocol(PackProxyProtocol):
         return auth_params


-class GitProcessProtocol(protocol.ProcessProtocol):
+class GitProcessProtocol(protocol.ProcessProtocol, object):

     _err_buffer = b''
+    _resource_usage_buffer = b''

     def __init__(self, peer):
         self.peer = peer
         self.out_started = False
+        self.statsd_client = self.peer.factory.statsd_client  # may be None

     def connectionMade(self):
         self.peer.setPeer(self)
@@ -250,6 +282,12 @@ class GitProcessProtocol(protocol.ProcessProtocol):
             UnstoppableProducerWrapper(self.peer.transport), True)
         self.peer.resumeProducing()

+    def childDataReceived(self, childFD, data):
+        if childFD == 3:  # resource usage report from git_helper.py
+            self.resourceUsageReceived(data)
+        else:
+            super(GitProcessProtocol, self).childDataReceived(childFD, data)
+
     def outReceived(self, data):
         self.out_started = True
         self.peer.sendRawData(data)
@@ -259,6 +297,10 @@ class GitProcessProtocol(protocol.ProcessProtocol):
         # process is done.
         self._err_buffer += data

+    def resourceUsageReceived(self, data):
+        # Just store it up so we can deal with it when the process is done.
+        self._resource_usage_buffer += data
+
     def outConnectionLost(self):
         if self._err_buffer:
             # Originally we'd always return stderr as an ERR packet for
@@ -299,6 +341,53 @@ class GitProcessProtocol(protocol.ProcessProtocol):
                 code=code)
             self.peer.sendPacket(ERROR_PREFIX + 'backend exited %d' % code)
         self.peer.processEnded(reason)
+        if self._resource_usage_buffer and self.statsd_client is not None:
+            self._sendResourceUsageMetrics()
+
+    @staticmethod
+    def _statsdTagValue(value):
+        """Return `value` as text that is safe as a statsd tag value.
+
+        Spaces, non-printable or non-ASCII characters, and the ',', '=', ':'
+        and '|' separators of the statsd/Telegraf line format all become '_',
+        so an unusual repository path cannot corrupt or inject metrics.
+        """
+        if isinstance(value, (bytes, six.text_type)):
+            value = six.ensure_text(value, errors='replace')
+        else:
+            value = six.text_type(value)
+        return ''.join(
+            char if 32 < ord(char) < 127 and char not in ',=:|' else '_'
+            for char in value)
+
+    def _sendResourceUsageMetrics(self):
+        """Send the resource usage reported by git_helper.py to statsd.
+
+        The data buffered from FD 3 should be a JSON object.  Its "maxrss",
+        "stime" and "utime" entries, where present, are each sent as a gauge
+        named "<entry>,host=...,repo=...,operation=...": Telegraf's statsd
+        input treats the text before the first comma as the measurement
+        name and the rest as tags.  Malformed data is logged and ignored.
+        """
+        try:
+            resource_usage = json.loads(
+                self._resource_usage_buffer.decode('UTF-8'))
+        except ValueError:
+            resource_usage = None
+        if not isinstance(resource_usage, dict):
+            self.peer.log.info(
+                'Ignoring malformed git resource usage: {data}',
+                data=self._resource_usage_buffer)
+            return
+        tags = 'host={},repo={},operation={}'.format(
+            self._statsdTagValue(socket.gethostname()),
+            self._statsdTagValue(self.peer.raw_pathname),
+            self._statsdTagValue(self.peer.command))
+        client = self.statsd_client.get_client()
+        for metric in ('maxrss', 'stime', 'utime'):
+            if metric in resource_usage:
+                client.gauge(
+                    '{},{}'.format(metric, tags), resource_usage[metric])

     def pauseProducing(self):
         self.transport.pauseProducing()
@@ -485,8 +556,9 @@ class PackBackendProtocol(PackServerProtocol):
def spawnGit(self, subcmd, extra_args, write_operation=False,
send_path_as_option=False, auth_params=None,
cmd_env=None):
- cmd = b'git'
- args = [b'git']
+ cmd = os.path.join(
+ os.path.dirname(__file__), 'git_helper.py').encode('UTF-8')
+ args = [cmd]  # git_helper.py runs git with the remaining arguments
if send_path_as_option:
args.extend([b'-C', self.path])
args.append(subcmd)
@@ -507,10 +579,12 @@ class PackBackendProtocol(PackServerProtocol):
self.log.info('Spawning {args}', args=args)
self.peer = GitProcessProtocol(self)
- self.spawnProcess(cmd, args, env=env)
+ self.spawnProcess(  # FD 3 carries git_helper.py's resource usage JSON
+ cmd, args, env=env, childFDs={0: "w", 1: "r", 2: "r", 3: "r"})
- def spawnProcess(self, cmd, args, env=None):
- default_reactor.spawnProcess(self.peer, cmd, args, env=env)
+ def spawnProcess(self, cmd, args, env=None, childFDs=None):
+ default_reactor.spawnProcess(
+ self.peer, cmd, args, env=env, childFDs=childFDs)
def expectNextCommand(self):
"""Enables this connection to receive the next command."""
@@ -620,10 +694,12 @@ class PackBackendFactory(protocol.Factory):
def __init__(self,
root,
hookrpc_handler=None,
- hookrpc_sock=None):
+ hookrpc_sock=None,
+ statsd_client=None):  # needs get_client(); e.g. a StatsdGitClient
self.root = root
self.hookrpc_handler = hookrpc_handler
self.hookrpc_sock = hookrpc_sock
+ self.statsd_client = statsd_client
class PackVirtServerProtocol(PackProxyServerProtocol):
diff --git a/turnip/pack/git_helper.py b/turnip/pack/git_helper.py
new file mode 100755
index 0000000..0142834
--- /dev/null
+++ b/turnip/pack/git_helper.py
@@ -0,0 +1,79 @@
+#!/usr/bin/python
+
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Run git, then report its resource usage on file descriptor 3.
+
+turnip's pack backend runs this helper in place of git.  All arguments
+are passed through to git unchanged.  When git exits, its resource usage
+is written to FD 3 as a JSON object with "utime", "stime" and "maxrss"
+keys, and git's exit status is passed back to the caller.
+"""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import fcntl
+import json
+import os
+import resource
+import signal
+import subprocess
+import sys
+
+
+# The file descriptor on which the caller collects resource usage.
+RESOURCE_FD = 3
+
+
+def _restore_sigpipe():
+    # Python ignores SIGPIPE, and Python 2's subprocess passes that on to
+    # the child; restore the default so git behaves as if run directly.
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+
+def main(args):
+    """Run git with `args` and return its exit status."""
+    # We expect the caller to have opened RESOURCE_FD.  Mark it
+    # close-on-exec so that the git child process can't accidentally
+    # interfere with it.  If it isn't open (e.g. when run by hand), run
+    # git anyway but skip the report.
+    try:
+        flags = fcntl.fcntl(RESOURCE_FD, fcntl.F_GETFD)
+    except (IOError, OSError):
+        report = False
+    else:
+        fcntl.fcntl(RESOURCE_FD, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+        report = True
+
+    # Call git and wait for it to finish.
+    ret = subprocess.call(['git'] + args, preexec_fn=_restore_sigpipe)
+
+    if report:
+        # Dump resource usage information to RESOURCE_FD, closing (and so
+        # flushing) it explicitly rather than at interpreter shutdown.
+        rusage = resource.getrusage(resource.RUSAGE_CHILDREN)
+        with os.fdopen(RESOURCE_FD, 'w') as resource_fd:
+            resource_fd.write(json.dumps({
+                "utime": rusage.ru_utime,
+                "stime": rusage.ru_stime,
+                "maxrss": rusage.ru_maxrss,
+                }))
+
+    if ret < 0:
+        # git was killed by signal -ret.  Die the same way, so that the
+        # caller sees what it would have seen had it run git itself.
+        try:
+            signal.signal(-ret, signal.SIG_DFL)
+        except (OSError, RuntimeError, ValueError):
+            # e.g. SIGKILL, whose disposition cannot be changed.
+            pass
+        os.kill(os.getpid(), -ret)
+        # Only reached if that signal did not terminate this process.
+        ret = 128 - ret
+    return ret
+
+
+if __name__ == '__main__':
+    # Pass on git's exit status.
+    sys.exit(main(sys.argv[1:]))
diff --git a/turnip/pack/tests/test_functional.py b/turnip/pack/tests/test_functional.py
index c06c4e2..5ea77d3 100644
--- a/turnip/pack/tests/test_functional.py
+++ b/turnip/pack/tests/test_functional.py
@@ -39,7 +39,9 @@ import six
from testscenarios.testcase import WithScenarios
from testtools import TestCase
from testtools.content import text_content
-from testtools.deferredruntest import AsynchronousDeferredRunTest
+from testtools.deferredruntest import (
+ AsynchronousDeferredRunTestForBrokenTwisted,
+ )
from testtools.matchers import (
Equals,
Is,
@@ -76,12 +78,14 @@ from turnip.pack.tests.fake_servers import (
FakeAuthServerService,
FakeVirtInfoService,
)
+from turnip.pack.tests.test_helpers import MockStatsd
from turnip.version_info import version_info
class FunctionalTestMixin(WithScenarios):
- run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=30)
+ run_tests_with = AsynchronousDeferredRunTestForBrokenTwisted.make_factory(
+ timeout=30)
scenarios = [
('v0 protocol', {"protocol_version": b"0"}),
@@ -119,10 +123,12 @@ class FunctionalTestMixin(WithScenarios):
# get confused on Python 2.
self.root = tempfile.mkdtemp(prefix=b'turnip-test-root-')
self.addCleanup(shutil.rmtree, self.root, ignore_errors=True)
+ self.statsd_client = MockStatsd()  # keeps metrics in memory
self.backend_listener = reactor.listenTCP(
0,
PackBackendFactory(
- self.root, self.hookrpc_handler, self.hookrpc_sock_path))
+ self.root, self.hookrpc_handler, self.hookrpc_sock_path,
+ self.statsd_client))
self.backend_port = self.backend_listener.getHost().port
self.addCleanup(self.backend_listener.stopListening)
@@ -613,6 +619,7 @@ class FrontendFunctionalTestMixin(FunctionalTestMixin):
0, server.Site(self.authserver))
self.authserver_port = self.authserver_listener.getHost().port
self.authserver_url = b'http://localhost:%d/' % self.authserver_port
+ self.addCleanup(self.authserver_listener.stopListening)  # was in tearDown
# Run a backend server in a repo root containing an empty repo
# for the path '/test'.
@@ -629,16 +636,11 @@ class FrontendFunctionalTestMixin(FunctionalTestMixin):
PackVirtFactory(
b'localhost', self.backend_port, self.virtinfo_url, 15))
self.virt_port = self.virt_listener.getHost().port
+ self.addCleanup(self.virt_listener.stopListening)  # was in tearDown
self.virtinfo.ref_permissions = {
b'refs/heads/master': ['create', 'push']}
@defer.inlineCallbacks
- def tearDown(self):
- super(FrontendFunctionalTestMixin, self).tearDown()
- yield self.virt_listener.stopListening()
- yield self.authserver_listener.stopListening()
-
- @defer.inlineCallbacks
def test_read_only(self):
self.virtinfo.ref_permissions = {
b'refs/heads/master': ['create', 'push']}
@@ -711,16 +713,12 @@ class TestGitFrontendFunctional(FrontendFunctionalTestMixin, TestCase):
self.frontend_listener = reactor.listenTCP(
0, PackFrontendFactory(b'localhost', self.virt_port))
self.port = self.frontend_listener.getHost().port
+ self.addCleanup(self.frontend_listener.stopListening)  # was in tearDown
# Always use a writable URL for now.
self.url = b'git://localhost:%d/+rw/test' % self.port
self.ro_url = b'git://localhost:%d/test' % self.port
- @defer.inlineCallbacks
- def tearDown(self):
- yield super(TestGitFrontendFunctional, self).tearDown()
- yield self.frontend_listener.stopListening()
-
class TestSmartHTTPFrontendFunctional(FrontendFunctionalTestMixin, TestCase):
@@ -744,17 +742,13 @@ class TestSmartHTTPFrontendFunctional(FrontendFunctionalTestMixin, TestCase):
}))
self.frontend_listener = reactor.listenTCP(0, frontend_site)
self.port = self.frontend_listener.getHost().port
+ self.addCleanup(self.frontend_listener.stopListening)  # was in tearDown
# Always use a writable URL for now.
self.url = b'http://localhost:%d/+rw/test' % self.port
self.ro_url = b'http://localhost:%d/test' % self.port
@defer.inlineCallbacks
- def tearDown(self):
- yield super(TestSmartHTTPFrontendFunctional, self).tearDown()
- yield self.frontend_listener.stopListening()
-
- @defer.inlineCallbacks
def test_root_revision_header(self):
response = yield client.Agent(reactor).request(
b'HEAD', b'http://localhost:%d/' % self.port)
diff --git a/turnip/pack/tests/test_git.py b/turnip/pack/tests/test_git.py
index d1a8162..599c5f6 100644
--- a/turnip/pack/tests/test_git.py
+++ b/turnip/pack/tests/test_git.py
@@ -35,6 +35,7 @@ from turnip.pack import (
helpers,
)
from turnip.pack.tests.fake_servers import FakeVirtInfoService
+from turnip.pack.tests.test_helpers import MockStatsd
from turnip.pack.tests.test_hooks import MockHookRPCHandler
from turnip.tests.compat import mock
@@ -116,7 +117,7 @@ class DummyPackBackendProtocol(git.PackBackendProtocol):
test_process = None
- def spawnProcess(self, cmd, args, env=None):
+ def spawnProcess(self, cmd, args, env=None, childFDs=None):  # childFDs is not recorded
if self.test_process is not None:
raise AssertionError('Process already spawned.')
self.test_process = (cmd, args, env)
@@ -178,9 +179,11 @@ class TestPackBackendProtocol(TestCase):
super(TestPackBackendProtocol, self).setUp()
self.root = self.useFixture(TempDir()).path
self.hookrpc_handler = MockHookRPCHandler()
+ self.statsd_client = MockStatsd()  # keeps metrics in memory
self.hookrpc_sock = os.path.join(self.root, 'hookrpc_sock')
self.factory = git.PackBackendFactory(
- self.root, self.hookrpc_handler, self.hookrpc_sock)
+ self.root, self.hookrpc_handler,
+ self.hookrpc_sock, self.statsd_client)
self.proto = DummyPackBackendProtocol()
self.proto.factory = self.factory
self.transport = testing.StringTransportWithDisconnection()
@@ -196,6 +199,9 @@ class TestPackBackendProtocol(TestCase):
self.addCleanup(self.virtinfo_listener.stopListening)
self.setupConfig()
+ self.git_helper = os.path.join(  # the command spawnGit now runs
+ os.path.dirname(git.__file__), 'git_helper.py').encode('UTF-8')
+
def setupConfig(self):
config.defaults['virtinfo_endpoint'] = self.virtinfo_url
# Force timeout to be a string to make sure we are casting it
@@ -229,7 +235,7 @@ class TestPackBackendProtocol(TestCase):
[('foo.git', )], self.virtinfo.confirm_repo_creation_call_args)
self.assertEqual(
- (b'git', [b'git', b'upload-pack', full_path], {
+ (self.git_helper, [self.git_helper, b'upload-pack', full_path], {
'GIT_PROTOCOL': 'version=0'
}), self.proto.test_process)
@@ -266,8 +272,8 @@ class TestPackBackendProtocol(TestCase):
b'git-upload-pack', b'/foo.git', {b'host': b'example.com'})
full_path = os.path.join(six.ensure_binary(self.root), b'foo.git')
self.assertEqual(
- (b'git',
- [b'git', b'upload-pack', full_path],
+ (self.git_helper,
+ [self.git_helper, b'upload-pack', full_path],
{'GIT_PROTOCOL': 'version=0'}),
self.proto.test_process)
@@ -280,8 +286,11 @@ class TestPackBackendProtocol(TestCase):
b'git-receive-pack', b'/foo.git', {b'host': b'example.com'})
self.assertThat(
self.proto.test_process, MatchesListwise([
- Equals(b'git'),
- Equals([b'git', b'receive-pack', repo_dir.encode('utf-8')]),
+ Equals(self.git_helper),
+ Equals([
+ self.git_helper, b'receive-pack',
+ repo_dir.encode('utf-8'),
+ ]),
ContainsDict(
{b'TURNIP_HOOK_RPC_SOCK': Equals(self.hookrpc_sock)})]))
@@ -296,10 +305,10 @@ class TestPackBackendProtocol(TestCase):
self.proto.packetReceived(b'HEAD refs/heads/master')
self.assertThat(
self.proto.test_process, MatchesListwise([
- Equals(b'git'),
+ Equals(self.git_helper),
Equals([
- b'git', b'-C', repo_dir.encode('utf-8'), b'symbolic-ref',
- b'HEAD', b'refs/heads/master']),
+ self.git_helper, b'-C', repo_dir.encode('utf-8'),
+ b'symbolic-ref', b'HEAD', b'refs/heads/master']),
ContainsDict(
{b'TURNIP_HOOK_RPC_SOCK': Equals(self.hookrpc_sock)})]))
diff --git a/turnip/pack/tests/test_helpers.py b/turnip/pack/tests/test_helpers.py
index ff484e8..4ebfa58 100644
--- a/turnip/pack/tests/test_helpers.py
+++ b/turnip/pack/tests/test_helpers.py
@@ -32,6 +32,8 @@ from turnip.pack.helpers import (
)
import turnip.pack.hooks
from turnip.version_info import version_info
+from zope.interface import implementer
+from zope.interface import Interface
TEST_DATA = b'0123456789abcdef'
TEST_PKT = b'00140123456789abcdef'
@@ -339,3 +341,80 @@ class TestCapabilityAdvertisement(TestCase):
         self.assertEqual(
             turnip_capabilities,
             git_advertised_capabilities.replace(git_agent, turnip_agent))
+
+
+class IStats(Interface):
+    """Interface of the statsd client operations faked by MockStatsd.
+
+    As usual for zope.interface, the method declarations omit `self`.
+    """
+
+    def incr(key=None, count=1):
+        """Increment a counter.
+
+        :param key: The counter to increment.
+        :param count: The amount to add.
+        """
+
+    def decr(key=None, count=1):
+        """Decrement a counter.
+
+        :param key: The counter to decrement.
+        :param count: The amount to subtract.
+        """
+
+    def timing(key=None, ms=None):
+        """Record an execution time for a key.
+
+        :param key: The key to report for.
+        :param ms: The timing in milliseconds.
+        """
+
+    def gauge(key=None, value=None):
+        """Set a gauge to a value.
+
+        :param key: The gauge to set.
+        :param value: The gauged value.
+        """
+
+    def set(key=None, value=None):
+        """Record a value for a set.
+
+        :param key: The set to record for.
+        :param value: The recorded value.
+        """
+
+
+@implementer(IStats)
+class MockStatsd(object):
+    """In-memory stand-in for `StatsdGitClient` and its statsd client.
+
+    `get_client` returns the mock itself, so code under test can use it
+    either way.  Counter, gauge and set values are kept in `vals`, and
+    timings in `timings`, keyed by metric name.
+    """
+
+    def __init__(self):
+        self.vals = {}
+        self.timings = {}
+
+    def get_instance(self):
+        return self
+
+    def get_client(self):
+        return self
+
+    def incr(self, key=None, count=1):
+        self.vals[key] = self.vals.get(key, 0) + count
+
+    def decr(self, key=None, count=1):
+        self.vals[key] = self.vals.get(key, 0) - count
+
+    def timing(self, key=None, ms=None):
+        self.timings[key] = ms
+
+    def gauge(self, key=None, value=None):
+        self.vals[key] = value
+
+    def set(self, key=None, value=None):
+        self.vals[key] = value
index 8627392..0f50e3c 100644
--- a/turnipserver.py
+++ b/turnipserver.py
@@ -17,6 +17,7 @@ from turnip.pack.git import (
PackBackendFactory,
PackFrontendFactory,
PackVirtFactory,
+ StatsdGitClient,
)
from turnip.pack.hookrpc import (
HookRPCHandler,
@@ -37,6 +38,9 @@ REPO_STORE = config.get('repo_store')
HOOKRPC_PATH = config.get('hookrpc_path') or REPO_STORE
VIRTINFO_ENDPOINT = config.get('virtinfo_endpoint')
VIRTINFO_TIMEOUT = int(config.get('virtinfo_timeout'))
+STATSD_HOST = config.get('statsd_host')
+STATSD_PORT = config.get('statsd_port')  # 8125 in config.yaml
+STATSD_PREFIX = config.get('statsd_prefix')
# turnipserver.py is preserved for convenience in development, services
# in production are run in separate processes.
@@ -48,11 +52,15 @@ VIRTINFO_TIMEOUT = int(config.get('virtinfo_timeout'))
hookrpc_handler = HookRPCHandler(VIRTINFO_ENDPOINT, VIRTINFO_TIMEOUT)
hookrpc_sock_path = os.path.join(
HOOKRPC_PATH, 'hookrpc_sock_%d' % PACK_BACKEND_PORT)
+
+statsd_client = StatsdGitClient(STATSD_HOST, STATSD_PORT, STATSD_PREFIX)
+
reactor.listenTCP(
PACK_BACKEND_PORT,
PackBackendFactory(REPO_STORE,
hookrpc_handler,
- hookrpc_sock_path))
+ hookrpc_sock_path,
+ statsd_client))  # used for git resource-usage gauges
if os.path.exists(hookrpc_sock_path):
os.unlink(hookrpc_sock_path)
reactor.listenUNIX(hookrpc_sock_path, HookRPCServerFactory(hookrpc_handler))