← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~ilasc/turnip:add-statsd-metrics into turnip:master

 

Ioana Lasc has proposed merging ~ilasc/turnip:add-statsd-metrics into turnip:master.

Commit message:
Send maxrss, stime and utime metrics to statsd

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~ilasc/turnip/+git/turnip/+merge/391552

This MP is based on Colin's suggestion in MP 391365 and his patch for inter-process communication.
Added the "send to statsd/Telegraf" code, the formatting and tags for the metrics, a singleton for the statsd client, and a mock statsd client for unit tests.
I tested the StatsdGitClient connection locally with Telegraf and Chronograf, but this branch still needs work and unit tests.
This is a "please review direction" MP, thanks Colin!
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~ilasc/turnip:add-statsd-metrics into turnip:master.
diff --git a/config.yaml b/config.yaml
index 99b0e7b..f7d8612 100644
--- a/config.yaml
+++ b/config.yaml
@@ -21,3 +21,6 @@ openid_provider_root: https://testopenid.test/
 site_name: git.launchpad.test
 main_site_root: https://launchpad.test/
 celery_broker: pyamqp://guest@localhost//
+statsd_host: launchpad.test
+statsd_port: 8125
+statsd_prefix: lp.turnip
diff --git a/packbackendserver.tac b/packbackendserver.tac
index 191ac16..e8157cc 100644
--- a/packbackendserver.tac
+++ b/packbackendserver.tac
@@ -20,7 +20,10 @@ from twisted.scripts.twistd import ServerOptions
 
 from turnip.config import config
 from turnip.log import RotatableFileLogObserver
-from turnip.pack.git import PackBackendFactory
+from turnip.pack.git import (
+    PackBackendFactory,
+    StatsdGitClient,
+    )
 from turnip.pack.hookrpc import (
     HookRPCHandler,
     HookRPCServerFactory,
@@ -37,11 +40,14 @@ def getPackBackendServices():
     hookrpc_path = config.get('hookrpc_path') or repo_store
     hookrpc_sock_path = os.path.join(
         hookrpc_path, 'hookrpc_sock_%d' % pack_backend_port)
+    statsd_client = StatsdGitClient(config.get('statsd_host'), config.get('statsd_port')
+        config.get('statsd_prefix'))
     pack_backend_service = internet.TCPServer(
         pack_backend_port,
         PackBackendFactory(repo_store,
                            hookrpc_handler,
-                           hookrpc_sock_path))
+                           hookrpc_sock_path,
+                           statsd_client))
     if os.path.exists(hookrpc_sock_path):
         os.unlink(hookrpc_sock_path)
     hookrpc_service = internet.UNIXServer(
diff --git a/requirements.txt b/requirements.txt
index fe3f1ac..ac502e3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -58,6 +58,7 @@ scandir==1.10.0
 setuptools-scm==1.17.0
 simplejson==3.6.5
 six==1.15.0
+statsd==3.3.0
 testscenarios==0.5.0
 testtools==2.4.0
 traceback2==1.4.0
diff --git a/setup.py b/setup.py
index 5cfae76..6bc2e94 100755
--- a/setup.py
+++ b/setup.py
@@ -28,6 +28,7 @@ requires = [
     'pygit2>=0.27.4,<0.28.0',
     'python-openid2',
     'PyYAML',
+    'statsd',
     'Twisted[conch]',
     'waitress',
     'zope.interface',
diff --git a/turnip/pack/git.py b/turnip/pack/git.py
index 4407085..8998462 100644
--- a/turnip/pack/git.py
+++ b/turnip/pack/git.py
@@ -11,9 +11,13 @@ from __future__ import (
 __metaclass__ = type
 
 
+import json
+import os.path
 import uuid
-
 import six
+import socket
+import statsd
+import threading
 from twisted.internet import (
     defer,
     error,
@@ -47,6 +51,32 @@ VIRT_ERROR_PREFIX = b'turnip virt error: '
 SAFE_PARAMS = frozenset([b'host', b'version'])
 
 
+class ThreadSafeSingleton(type):
+    _instances = {}
+    _singleton_lock = threading.Lock()
+
+    def __call__(cls, *args, **kwargs):
+        if cls not in cls._instances:
+            with cls._singleton_lock:
+                if cls not in cls._instances:
+                    cls._instances[cls] = super(
+                        ThreadSafeSingleton, cls).__call__(*args, **kwargs)
+        return cls._instances[cls]
+
+
+class StatsdGitClient():
+    __metaclass__ = ThreadSafeSingleton
+
+    def __init__(self, *args, **kwargs):
+        self.host = args[0]
+        self.port = args[1]
+        self.prefix = args[2]
+        self.client = statsd.StatsClient(self.host, self.port, self.prefix)
+
+    def get_client(self):
+        return self.client
+
+
 class RequestIDLogger(Logger):
 
     def emit(self, level, format=None, **kwargs):
@@ -235,13 +265,15 @@ class PackServerProtocol(PackProxyProtocol):
         return auth_params
 
 
-class GitProcessProtocol(protocol.ProcessProtocol):
+class GitProcessProtocol(protocol.ProcessProtocol, object):
 
     _err_buffer = b''
+    _resource_usage_buffer = b''
 
     def __init__(self, peer):
         self.peer = peer
         self.out_started = False
+        self.client = self.peer.factory.statsd_client.get_client()
 
     def connectionMade(self):
         self.peer.setPeer(self)
@@ -250,6 +282,12 @@ class GitProcessProtocol(protocol.ProcessProtocol):
             UnstoppableProducerWrapper(self.peer.transport), True)
         self.peer.resumeProducing()
 
+    def childDataReceived(self, childFD, data):
+        if childFD == 3:
+            self.resourceUsageReceived(data)
+        else:
+            super(GitProcessProtocol, self).childDataReceived(childFD, data)
+
     def outReceived(self, data):
         self.out_started = True
         self.peer.sendRawData(data)
@@ -259,6 +297,10 @@ class GitProcessProtocol(protocol.ProcessProtocol):
         # process is done.
         self._err_buffer += data
 
+    def resourceUsageReceived(self, data):
+        # Just store it up so we can deal with it when the process is done.
+        self._resource_usage_buffer += data
+
     def outConnectionLost(self):
         if self._err_buffer:
             # Originally we'd always return stderr as an ERR packet for
@@ -299,6 +341,35 @@ class GitProcessProtocol(protocol.ProcessProtocol):
                 code=code)
             self.peer.sendPacket(ERROR_PREFIX + 'backend exited %d' % code)
         self.peer.processEnded(reason)
+        if self._resource_usage_buffer:
+            try:
+                resource_usage = json.loads(
+                    self._resource_usage_buffer.decode('UTF-8'))
+
+                gauge_name = ("host={},repo={},operation={},metric=maxrss"
+                              .format(
+                                  socket.gethostname(),
+                                  self.peer.raw_pathname,
+                                  self.peer.command))
+
+                self.client.gauge(gauge_name, resource_usage['maxrss'])
+
+                gauge_name = ("host={},repo={},operation={},metric=stime"
+                              .format(
+                                  socket.gethostname(),
+                                  self.peer.raw_pathname,
+                                  self.peer.command))
+                self.client.gauge(gauge_name, resource_usage['stime'])
+
+                gauge_name = ("host={},repo={},operation={},metric=utime"
+                              .format(
+                                  socket.gethostname(),
+                                  self.peer.raw_pathname,
+                                  self.peer.command))
+                self.client.gauge(gauge_name, resource_usage['utime'])
+
+            except ValueError:
+                pass
 
     def pauseProducing(self):
         self.transport.pauseProducing()
@@ -485,8 +556,9 @@ class PackBackendProtocol(PackServerProtocol):
     def spawnGit(self, subcmd, extra_args, write_operation=False,
                  send_path_as_option=False, auth_params=None,
                  cmd_env=None):
-        cmd = b'git'
-        args = [b'git']
+        cmd = os.path.join(
+            os.path.dirname(__file__), 'git_helper.py').encode('UTF-8')
+        args = [cmd]
         if send_path_as_option:
             args.extend([b'-C', self.path])
         args.append(subcmd)
@@ -507,10 +579,12 @@ class PackBackendProtocol(PackServerProtocol):
 
         self.log.info('Spawning {args}', args=args)
         self.peer = GitProcessProtocol(self)
-        self.spawnProcess(cmd, args, env=env)
+        self.spawnProcess(
+            cmd, args, env=env, childFDs={0: "w", 1: "r", 2: "r", 3: "r"})
 
-    def spawnProcess(self, cmd, args, env=None):
-        default_reactor.spawnProcess(self.peer, cmd, args, env=env)
+    def spawnProcess(self, cmd, args, env=None, childFDs=None):
+        default_reactor.spawnProcess(
+            self.peer, cmd, args, env=env, childFDs=childFDs)
 
     def expectNextCommand(self):
         """Enables this connection to receive the next command."""
@@ -620,10 +694,12 @@ class PackBackendFactory(protocol.Factory):
     def __init__(self,
                  root,
                  hookrpc_handler=None,
-                 hookrpc_sock=None):
+                 hookrpc_sock=None,
+                 statsd_client=None):
         self.root = root
         self.hookrpc_handler = hookrpc_handler
         self.hookrpc_sock = hookrpc_sock
+        self.statsd_client = statsd_client
 
 
 class PackVirtServerProtocol(PackProxyServerProtocol):
diff --git a/turnip/pack/git_helper.py b/turnip/pack/git_helper.py
new file mode 100755
index 0000000..0142834
--- /dev/null
+++ b/turnip/pack/git_helper.py
@@ -0,0 +1,36 @@
+#!/usr/bin/python
+
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import fcntl
+import json
+import os
+import resource
+import subprocess
+import sys
+
+
+if __name__ == '__main__':
+    # We expect the caller to have opened FD 3, and will send information
+    # about git's resource usage there.  Mark it close-on-exec so that the
+    # git child process can't accidentally interfere with it.
+    flags = fcntl.fcntl(3, fcntl.F_GETFD)
+    fcntl.fcntl(3, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+    # Call git and wait for it to finish.
+    ret = subprocess.call(['git'] + sys.argv[1:])
+
+    # Dump resource usage information to FD 3.
+    resource_fd = os.fdopen(3, 'w')
+    rusage = resource.getrusage(resource.RUSAGE_CHILDREN)
+    resource_fd.write(json.dumps({
+        "utime": rusage.ru_utime,
+        "stime": rusage.ru_stime,
+        "maxrss": rusage.ru_maxrss,
+        }))
+
+    # Pass on git's exit status.
+    sys.exit(ret)
diff --git a/turnip/pack/tests/test_functional.py b/turnip/pack/tests/test_functional.py
index c06c4e2..5ea77d3 100644
--- a/turnip/pack/tests/test_functional.py
+++ b/turnip/pack/tests/test_functional.py
@@ -39,7 +39,9 @@ import six
 from testscenarios.testcase import WithScenarios
 from testtools import TestCase
 from testtools.content import text_content
-from testtools.deferredruntest import AsynchronousDeferredRunTest
+from testtools.deferredruntest import (
+    AsynchronousDeferredRunTestForBrokenTwisted,
+    )
 from testtools.matchers import (
     Equals,
     Is,
@@ -76,12 +78,14 @@ from turnip.pack.tests.fake_servers import (
     FakeAuthServerService,
     FakeVirtInfoService,
     )
+from turnip.pack.tests.test_helpers import MockStatsd
 from turnip.version_info import version_info
 
 
 class FunctionalTestMixin(WithScenarios):
 
-    run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=30)
+    run_tests_with = AsynchronousDeferredRunTestForBrokenTwisted.make_factory(
+        timeout=30)
 
     scenarios = [
         ('v0 protocol', {"protocol_version": b"0"}),
@@ -119,10 +123,12 @@ class FunctionalTestMixin(WithScenarios):
         # get confused on Python 2.
         self.root = tempfile.mkdtemp(prefix=b'turnip-test-root-')
         self.addCleanup(shutil.rmtree, self.root, ignore_errors=True)
+        self.statsd_client = MockStatsd()
         self.backend_listener = reactor.listenTCP(
             0,
             PackBackendFactory(
-                self.root, self.hookrpc_handler, self.hookrpc_sock_path))
+                self.root, self.hookrpc_handler, self.hookrpc_sock_path,
+                self.statsd_client))
         self.backend_port = self.backend_listener.getHost().port
         self.addCleanup(self.backend_listener.stopListening)
 
@@ -613,6 +619,7 @@ class FrontendFunctionalTestMixin(FunctionalTestMixin):
             0, server.Site(self.authserver))
         self.authserver_port = self.authserver_listener.getHost().port
         self.authserver_url = b'http://localhost:%d/' % self.authserver_port
+        self.addCleanup(self.authserver_listener.stopListening)
 
         # Run a backend server in a repo root containing an empty repo
         # for the path '/test'.
@@ -629,16 +636,11 @@ class FrontendFunctionalTestMixin(FunctionalTestMixin):
             PackVirtFactory(
                 b'localhost', self.backend_port, self.virtinfo_url, 15))
         self.virt_port = self.virt_listener.getHost().port
+        self.addCleanup(self.virt_listener.stopListening)
         self.virtinfo.ref_permissions = {
             b'refs/heads/master': ['create', 'push']}
 
     @defer.inlineCallbacks
-    def tearDown(self):
-        super(FrontendFunctionalTestMixin, self).tearDown()
-        yield self.virt_listener.stopListening()
-        yield self.authserver_listener.stopListening()
-
-    @defer.inlineCallbacks
     def test_read_only(self):
         self.virtinfo.ref_permissions = {
             b'refs/heads/master': ['create', 'push']}
@@ -711,16 +713,12 @@ class TestGitFrontendFunctional(FrontendFunctionalTestMixin, TestCase):
         self.frontend_listener = reactor.listenTCP(
             0, PackFrontendFactory(b'localhost', self.virt_port))
         self.port = self.frontend_listener.getHost().port
+        self.addCleanup(self.frontend_listener.stopListening)
 
         # Always use a writable URL for now.
         self.url = b'git://localhost:%d/+rw/test' % self.port
         self.ro_url = b'git://localhost:%d/test' % self.port
 
-    @defer.inlineCallbacks
-    def tearDown(self):
-        yield super(TestGitFrontendFunctional, self).tearDown()
-        yield self.frontend_listener.stopListening()
-
 
 class TestSmartHTTPFrontendFunctional(FrontendFunctionalTestMixin, TestCase):
 
@@ -744,17 +742,13 @@ class TestSmartHTTPFrontendFunctional(FrontendFunctionalTestMixin, TestCase):
                 }))
         self.frontend_listener = reactor.listenTCP(0, frontend_site)
         self.port = self.frontend_listener.getHost().port
+        self.addCleanup(self.frontend_listener.stopListening)
 
         # Always use a writable URL for now.
         self.url = b'http://localhost:%d/+rw/test' % self.port
         self.ro_url = b'http://localhost:%d/test' % self.port
 
     @defer.inlineCallbacks
-    def tearDown(self):
-        yield super(TestSmartHTTPFrontendFunctional, self).tearDown()
-        yield self.frontend_listener.stopListening()
-
-    @defer.inlineCallbacks
     def test_root_revision_header(self):
         response = yield client.Agent(reactor).request(
             b'HEAD', b'http://localhost:%d/' % self.port)
diff --git a/turnip/pack/tests/test_git.py b/turnip/pack/tests/test_git.py
index d1a8162..599c5f6 100644
--- a/turnip/pack/tests/test_git.py
+++ b/turnip/pack/tests/test_git.py
@@ -35,6 +35,7 @@ from turnip.pack import (
     helpers,
     )
 from turnip.pack.tests.fake_servers import FakeVirtInfoService
+from turnip.pack.tests.test_helpers import MockStatsd
 from turnip.pack.tests.test_hooks import MockHookRPCHandler
 from turnip.tests.compat import mock
 
@@ -116,7 +117,7 @@ class DummyPackBackendProtocol(git.PackBackendProtocol):
 
     test_process = None
 
-    def spawnProcess(self, cmd, args, env=None):
+    def spawnProcess(self, cmd, args, env=None, childFDs=None):
         if self.test_process is not None:
             raise AssertionError('Process already spawned.')
         self.test_process = (cmd, args, env)
@@ -178,9 +179,11 @@ class TestPackBackendProtocol(TestCase):
         super(TestPackBackendProtocol, self).setUp()
         self.root = self.useFixture(TempDir()).path
         self.hookrpc_handler = MockHookRPCHandler()
+        self.statsd_client = MockStatsd()
         self.hookrpc_sock = os.path.join(self.root, 'hookrpc_sock')
         self.factory = git.PackBackendFactory(
-            self.root, self.hookrpc_handler, self.hookrpc_sock)
+            self.root, self.hookrpc_handler,
+            self.hookrpc_sock, self.statsd_client)
         self.proto = DummyPackBackendProtocol()
         self.proto.factory = self.factory
         self.transport = testing.StringTransportWithDisconnection()
@@ -196,6 +199,9 @@ class TestPackBackendProtocol(TestCase):
         self.addCleanup(self.virtinfo_listener.stopListening)
         self.setupConfig()
 
+        self.git_helper = os.path.join(
+            os.path.dirname(git.__file__), 'git_helper.py').encode('UTF-8')
+
     def setupConfig(self):
         config.defaults['virtinfo_endpoint'] = self.virtinfo_url
         # Force timeout to be a string to make sure we are casting it
@@ -229,7 +235,7 @@ class TestPackBackendProtocol(TestCase):
             [('foo.git', )], self.virtinfo.confirm_repo_creation_call_args)
 
         self.assertEqual(
-            (b'git', [b'git', b'upload-pack', full_path], {
+            (self.git_helper, [self.git_helper, b'upload-pack', full_path], {
                 'GIT_PROTOCOL': 'version=0'
             }), self.proto.test_process)
 
@@ -266,8 +272,8 @@ class TestPackBackendProtocol(TestCase):
             b'git-upload-pack', b'/foo.git', {b'host': b'example.com'})
         full_path = os.path.join(six.ensure_binary(self.root), b'foo.git')
         self.assertEqual(
-            (b'git',
-             [b'git', b'upload-pack', full_path],
+            (self.git_helper,
+             [self.git_helper, b'upload-pack', full_path],
              {'GIT_PROTOCOL': 'version=0'}),
             self.proto.test_process)
 
@@ -280,8 +286,11 @@ class TestPackBackendProtocol(TestCase):
             b'git-receive-pack', b'/foo.git', {b'host': b'example.com'})
         self.assertThat(
             self.proto.test_process, MatchesListwise([
-                Equals(b'git'),
-                Equals([b'git', b'receive-pack', repo_dir.encode('utf-8')]),
+                Equals(self.git_helper),
+                Equals([
+                    self.git_helper, b'receive-pack',
+                    repo_dir.encode('utf-8'),
+                    ]),
                 ContainsDict(
                     {b'TURNIP_HOOK_RPC_SOCK': Equals(self.hookrpc_sock)})]))
 
@@ -296,10 +305,10 @@ class TestPackBackendProtocol(TestCase):
         self.proto.packetReceived(b'HEAD refs/heads/master')
         self.assertThat(
             self.proto.test_process, MatchesListwise([
-                Equals(b'git'),
+                Equals(self.git_helper),
                 Equals([
-                    b'git', b'-C', repo_dir.encode('utf-8'), b'symbolic-ref',
-                    b'HEAD', b'refs/heads/master']),
+                    self.git_helper, b'-C', repo_dir.encode('utf-8'),
+                    b'symbolic-ref', b'HEAD', b'refs/heads/master']),
                 ContainsDict(
                     {b'TURNIP_HOOK_RPC_SOCK': Equals(self.hookrpc_sock)})]))
 
diff --git a/turnip/pack/tests/test_helpers.py b/turnip/pack/tests/test_helpers.py
index ff484e8..4ebfa58 100644
--- a/turnip/pack/tests/test_helpers.py
+++ b/turnip/pack/tests/test_helpers.py
@@ -32,6 +32,8 @@ from turnip.pack.helpers import (
     )
 import turnip.pack.hooks
 from turnip.version_info import version_info
+from zope.interface import implementer
+from zope.interface import Interface
 
 TEST_DATA = b'0123456789abcdef'
 TEST_PKT = b'00140123456789abcdef'
@@ -339,3 +341,73 @@ class TestCapabilityAdvertisement(TestCase):
         self.assertEqual(
             turnip_capabilities,
             git_advertised_capabilities.replace(git_agent, turnip_agent))
+
+
+class IStats(Interface):
+    def incr(self, key=None):
+        """
+        increment a key
+
+        :param key: the key to increment
+        :return: nothing
+        """
+
+    def decr(self, key=None):
+        """
+        decrement a key
+
+        :param key: the key to decrement
+        :return: nothing
+        """
+
+    def timing(self, key=None, ms=None):
+        """
+        record an execution time for this key
+
+        :param key: the key to report for
+        :param ms: the timing in milliseconds
+        :return: nothing
+        """
+
+    def gauge(self, key=None, value=None):
+        """
+        gauge a value
+
+        :param key: the key to gauge for
+        :param value: the gauged value
+        :return: nothing
+        """
+
+
+@implementer(IStats)
+class MockStatsd():
+    def __init__(self):
+        self.vals = dict()
+        self.timings = dict()
+
+    def get_instance(self):
+        return self
+
+    def incr(self, key=None):
+        if key not in self.vals:
+            self.vals[key] = 1
+        else:
+            self.vals[key] += 1
+
+    def decr(self, key=None):
+        if key not in self.vals:
+            self.vals[key] = -1
+        else:
+            self.vals[key] -= 1
+
+    def timing(self, key=None, ms=None):
+        self.timings[key] = ms
+
+    def gauge(self, key=None, value=None):
+        self.vals[key] = value
+
+    def set(self, key=None, value=None):
+        self.vals[key] = value
+
+    def get_client(self):
+        return self
diff --git a/turnipserver.py b/turnipserver.py
index 8627392..0f50e3c 100644
--- a/turnipserver.py
+++ b/turnipserver.py
@@ -17,6 +17,7 @@ from turnip.pack.git import (
     PackBackendFactory,
     PackFrontendFactory,
     PackVirtFactory,
+    StatsdGitClient,
     )
 from turnip.pack.hookrpc import (
     HookRPCHandler,
@@ -37,6 +38,9 @@ REPO_STORE = config.get('repo_store')
 HOOKRPC_PATH = config.get('hookrpc_path') or REPO_STORE
 VIRTINFO_ENDPOINT = config.get('virtinfo_endpoint')
 VIRTINFO_TIMEOUT = int(config.get('virtinfo_timeout'))
+STATSD_HOST = config.get('statsd_host')
+STATSD_PORT = config.get('statsd_port')
+STATSD_PREFIX = config.get('statsd_prefix')
 
 # turnipserver.py is preserved for convenience in development, services
 # in production are run in separate processes.
@@ -48,11 +52,15 @@ VIRTINFO_TIMEOUT = int(config.get('virtinfo_timeout'))
 hookrpc_handler = HookRPCHandler(VIRTINFO_ENDPOINT, VIRTINFO_TIMEOUT)
 hookrpc_sock_path = os.path.join(
     HOOKRPC_PATH, 'hookrpc_sock_%d' % PACK_BACKEND_PORT)
+
+statsd_client = StatsdGitClient(STATSD_HOST, STATSD_PORT, STATSD_PREFIX)
+
 reactor.listenTCP(
     PACK_BACKEND_PORT,
     PackBackendFactory(REPO_STORE,
                        hookrpc_handler,
-                       hookrpc_sock_path))
+                       hookrpc_sock_path,
+                       statsd_client))
 if os.path.exists(hookrpc_sock_path):
     os.unlink(hookrpc_sock_path)
 reactor.listenUNIX(hookrpc_sock_path, HookRPCServerFactory(hookrpc_handler))