← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/turnip:parallel-rpc-call into turnip:master

 

Thiago F. Pappacena has proposed merging ~pappacena/turnip:parallel-rpc-call into turnip:master with ~pappacena/turnip:paginated-check-refs-permissions as a prerequisite.

Commit message:
Running branch permission checks in parallel, in order to speed up git push operations for large sets of tags & branches.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~pappacena/turnip/+git/turnip/+merge/384684
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/turnip:parallel-rpc-call into turnip:master.
diff --git a/requirements.txt b/requirements.txt
index cb86dfe..4f71780 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,6 +11,7 @@ cornice==3.6.1
 cryptography==2.8
 docutils==0.14
 enum34==1.1.9
+futures==3.3.0
 envdir==0.7
 extras==1.0.0
 fixtures==3.0.0
diff --git a/setup.py b/setup.py
index b5821f7..bb22bae 100755
--- a/setup.py
+++ b/setup.py
@@ -21,6 +21,7 @@ requires = [
     'contextlib2',
     'cornice',
     'enum34; python_version < "3.4"',
+    'futures; python_version < "3.2"',
     'lazr.sshserver>=0.1.7',
     'Paste',
     'pygit2>=0.27.4,<0.28.0',
diff --git a/turnip/pack/hooks/hook.py b/turnip/pack/hooks/hook.py
index b1348fa..a5ba739 100755
--- a/turnip/pack/hooks/hook.py
+++ b/turnip/pack/hooks/hook.py
@@ -10,6 +10,7 @@ from __future__ import (
     )
 
 import base64
+from concurrent.futures import ThreadPoolExecutor
 import json
 import os
 import socket
@@ -24,6 +25,20 @@ from turnip.compat.files import fd_buffer
 # that currently causes CFFI warnings to be returned to the client.
 GIT_OID_HEX_ZERO = '0'*40
 
+# Be careful when adjusting these numbers. A page size that is too big may
+# lead to stalled connections and timeouts, while a small page size could
+# lead to poor performance.
+# As for the number of workers, too many might hammer Launchpad too hard,
+# and too few of them might lead to poor performance.
+XML_RPC_PAGE_SIZE = 25
+XML_RPC_MAX_WORKERS = 4
+
+
+def get_socket():
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    sock.connect(os.environ['TURNIP_HOOK_RPC_SOCK'])
+    return sock
+
 
 def check_ancestor(old, new):
     # This is a delete, setting the new ref.
@@ -130,7 +145,8 @@ def netstring_recv(sock):
     return bytes(s)
 
 
-def rpc_invoke(sock, method, args):
+def rpc_invoke(method, args):
+    sock = get_socket()
     msg = dict(args)
     assert 'op' not in msg
     msg['op'] = method
@@ -147,17 +163,33 @@ def split_list(lst, n):
     return [lst[i:i + n] for i in range(0, len(lst), n)]
 
 
-def check_ref_permissions(sock, rpc_key, ref_paths, page_size=25):
+def check_ref_permissions(rpc_key, ref_paths, page_size=XML_RPC_PAGE_SIZE,
+                          max_workers=XML_RPC_MAX_WORKERS):
+    """Run XML RPC call to check refs permissions.
+
+    For large sets of ref_paths, this method paginates them into several
+    XML RPC calls of at most `page_size` refs each, and runs them in
+    parallel, `max_workers` at a time.
+    """
     ref_paths = [base64.b64encode(path).decode('UTF-8') for path in ref_paths]
     permissions = {}
-    # Paginate the rpc calls to avoid timeouts.
-    for ref_paths_chunk in split_list(ref_paths, page_size):
-        rule_lines = rpc_invoke(
-            sock, 'check_ref_permissions',
-            {'key': rpc_key, 'paths': ref_paths_chunk})
-        permissions.update({
-            base64.b64decode(path.encode('UTF-8')): permissions
-            for path, permissions in rule_lines.items()})
+
+    # Paginate the rpc calls and run them in parallel, to avoid timeouts and
+    # speed up the request a bit.
+    chunks = split_list(ref_paths, page_size)
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = []
+        for ref_paths_chunk in chunks:
+            future = executor.submit(
+                rpc_invoke, 'check_ref_permissions',
+                {'key': rpc_key, 'paths': ref_paths_chunk})
+            futures.append(future)
+
+        for future in futures:
+            rule_lines = future.result()
+            permissions.update({
+                base64.b64decode(path.encode('UTF-8')): permissions
+                for path, permissions in rule_lines.items()})
     return permissions
 
 
@@ -175,7 +207,7 @@ def send_mp_url(received_line):
         pushed_branch = ref[len(b'refs/heads/'):]
         if not is_default_branch(pushed_branch):
             mp_url = rpc_invoke(
-                sock, 'get_mp_url',
+                'get_mp_url',
                 {'key': rpc_key,
                  'branch': six.ensure_text(pushed_branch, "UTF-8")})
             if mp_url is not None:
@@ -193,14 +225,12 @@ if __name__ == '__main__':
     stdin = fd_buffer(sys.stdin)
     stdout = fd_buffer(sys.stdout)
     rpc_key = os.environ['TURNIP_HOOK_RPC_KEY']
-    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-    sock.connect(os.environ['TURNIP_HOOK_RPC_SOCK'])
     hook = os.path.basename(sys.argv[0])
     if hook == 'pre-receive':
         # Verify the proposed changes against rules from the server.
         raw_paths = stdin.readlines()
         ref_paths = [p.rstrip(b'\n').split(b' ', 2)[2] for p in raw_paths]
-        rule_lines = check_ref_permissions(sock, rpc_key, ref_paths)
+        rule_lines = check_ref_permissions(rpc_key, ref_paths)
         errors = match_rules(rule_lines, raw_paths)
         for error in errors:
             stdout.write(error + b'\n')
@@ -210,13 +240,13 @@ if __name__ == '__main__':
         # Details of the changes aren't currently included.
         lines = stdin.readlines()
         if lines:
-            rpc_invoke(sock, 'notify_push', {'key': rpc_key})
+            rpc_invoke('notify_push', {'key': rpc_key})
         if len(lines) == 1:
             send_mp_url(lines[0])
         sys.exit(0)
     elif hook == 'update':
         ref = sys.argv[1]
-        rule_lines = check_ref_permissions(sock, rpc_key, [ref])
+        rule_lines = check_ref_permissions(rpc_key, [ref])
         errors = match_update_rules(rule_lines, sys.argv[1:4])
         for error in errors:
             stdout.write(error + b'\n')
diff --git a/turnip/pack/tests/test_hooks.py b/turnip/pack/tests/test_hooks.py
index b383d6b..0bd2aaa 100644
--- a/turnip/pack/tests/test_hooks.py
+++ b/turnip/pack/tests/test_hooks.py
@@ -17,6 +17,8 @@ from fixtures import (
     MonkeyPatch,
     TempDir,
     )
+
+
 try:
     from unittest import mock
 except ImportError:
@@ -35,6 +37,7 @@ from turnip.pack import hookrpc
 from turnip.pack.helpers import ensure_hooks
 from turnip.pack.hooks import hook
 from turnip.pack.hooks.hook import split_list, check_ref_permissions
+from turnip.tests.test_helpers import MockThreadPoolExecutor
 
 
 class HookProcessProtocol(protocol.ProcessProtocol):
@@ -472,10 +475,19 @@ class TestSplitRefPathsCalls(TestCase):
             {encode(b'master'): [], encode(b'develop'): []},
             {encode(b'head'): []}
         ]
-        sock = mock.Mock()
-        # Call it with page size = 2
-        result = check_ref_permissions(
-            sock, "rpc-key", [b"master", b"develop", b"head"], 2)
+
+        # Call it with page size = 2 and maximum 10 parallel workers.
+        pool = MockThreadPoolExecutor()
+        executor = mock.Mock()
+        executor.return_value = pool
+        with mock.patch('turnip.pack.hooks.hook.ThreadPoolExecutor', executor):
+            result = check_ref_permissions(
+                "rpc-key", [b"master", b"develop", b"head"], 2, 10)
+
+        # Make sure it executed in parallel using ThreadPoolExecutor.
+        self.assertEqual(1, executor.call_count)
+        self.assertEqual(mock.call(max_workers=10), executor.call_args)
+        self.assertEqual(2, len(pool.futures))
 
         # The final result should have been joined into.
         self.assertEqual(
@@ -483,13 +495,9 @@ class TestSplitRefPathsCalls(TestCase):
 
         # Check that it called correctly the rpc_invoke method.
         self.assertEqual(rpc_invoke.call_args_list, [
-            mock.call(
-                sock, 'check_ref_permissions', {
-                    'key': 'rpc-key', 'paths': [
-                        encode(b"master"), encode(b"develop")]
-                }),
-            mock.call(
-                sock, 'check_ref_permissions', {
-                    'key': 'rpc-key', 'paths': [encode(b"head")]
-                }),
+            mock.call('check_ref_permissions', {
+                'key': 'rpc-key', 'paths': [
+                    encode(b"master"), encode(b"develop")]}),
+            mock.call('check_ref_permissions', {
+                'key': 'rpc-key', 'paths': [encode(b"head")]}),
         ])
diff --git a/turnip/tests/test_helpers.py b/turnip/tests/test_helpers.py
index 9dc4101..23fda1e 100644
--- a/turnip/tests/test_helpers.py
+++ b/turnip/tests/test_helpers.py
@@ -39,3 +39,44 @@ class TestComposePath(TestCase):
             ValueError, helpers.compose_path, b'/foo', b'../bar')
         self.assertRaises(
             ValueError, helpers.compose_path, b'/foo', b'/foo/../../bar')
+
+
+class MockThreadPoolExecutor:
+    """A mock class for concurrent.futures.*PoolExecutor that executes
+    everything sequentially, keeping every generated Future in
+    self.futures.
+    """
+    class Future:
+        def __init__(self, value=None, exception=None):
+            self.value = value
+            self.exeception = exception
+
+        def result(self):
+            if self.exeception:
+                raise self.exeception
+            return self.value
+
+    def __init__(self, **kwargs):
+        self.futures = []
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        pass
+
+    def submit(self, fn, *args, **kwargs):
+        # Execute functions in series, keeping the generated futures in
+        # the self.futures list.
+        exception = None
+        result = None
+        try:
+            result = fn(*args, **kwargs)
+        except Exception as e:
+            exception = e
+        future = MockThreadPoolExecutor.Future(result, exception)
+        self.futures.append(future)
+        return future
+
+    def shutdown(self, wait=True):
+        pass

Follow ups