launchpad-reviewers team mailing list archive - Message #24781
[Merge] ~pappacena/turnip:parallel-rpc-call into turnip:master
Thiago F. Pappacena has proposed merging ~pappacena/turnip:parallel-rpc-call into turnip:master with ~pappacena/turnip:paginated-check-refs-permissions as a prerequisite.
Commit message:
Run the branch permission checks in parallel in order to speed up the git push operation for large sets of tags and branches.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/turnip/+git/turnip/+merge/384684
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/turnip:parallel-rpc-call into turnip:master.
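
For reviewers who want the shape of the change before reading the diff: the hook now splits the pushed ref paths into fixed-size pages and fans the per-page XML-RPC permission checks out over a small thread pool, merging the per-page results into one dict. Below is a minimal, self-contained sketch of that pattern, not the hook code itself; call_rpc and check_permissions are hypothetical stand-ins for the real rpc_invoke and check_ref_permissions helpers, and the constants mirror XML_RPC_PAGE_SIZE and XML_RPC_MAX_WORKERS from the diff.

    from concurrent.futures import ThreadPoolExecutor

    PAGE_SIZE = 25     # refs per XML-RPC call (XML_RPC_PAGE_SIZE in the diff)
    MAX_WORKERS = 4    # concurrent calls (XML_RPC_MAX_WORKERS in the diff)


    def call_rpc(paths):
        # Hypothetical stand-in for rpc_invoke('check_ref_permissions', ...).
        return {path: ['push'] for path in paths}


    def check_permissions(ref_paths):
        # Split the refs into pages so no single RPC call gets too large.
        chunks = [ref_paths[i:i + PAGE_SIZE]
                  for i in range(0, len(ref_paths), PAGE_SIZE)]
        permissions = {}
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Submit every page first, then collect the results in order.
            futures = [executor.submit(call_rpc, chunk) for chunk in chunks]
            for future in futures:
                permissions.update(future.result())
        return permissions


    if __name__ == '__main__':
        refs = ['refs/tags/v%d' % i for i in range(1000)]
        print(len(check_permissions(refs)))  # -> 1000

With 1000 pushed refs this works out to 40 RPC calls of 25 refs each, processed roughly four at a time instead of strictly one after another.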
diff --git a/requirements.txt b/requirements.txt
index cb86dfe..4f71780 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,6 +11,7 @@ cornice==3.6.1
cryptography==2.8
docutils==0.14
enum34==1.1.9
+futures==3.3.0
envdir==0.7
extras==1.0.0
fixtures==3.0.0
diff --git a/setup.py b/setup.py
index b5821f7..bb22bae 100755
--- a/setup.py
+++ b/setup.py
@@ -21,6 +21,7 @@ requires = [
'contextlib2',
'cornice',
'enum34; python_version < "3.4"',
+ 'futures; python_version < "3.2"',
'lazr.sshserver>=0.1.7',
'Paste',
'pygit2>=0.27.4,<0.28.0',
diff --git a/turnip/pack/hooks/hook.py b/turnip/pack/hooks/hook.py
index b1348fa..a5ba739 100755
--- a/turnip/pack/hooks/hook.py
+++ b/turnip/pack/hooks/hook.py
@@ -10,6 +10,7 @@ from __future__ import (
)
import base64
+from concurrent.futures import ThreadPoolExecutor
import json
import os
import socket
@@ -24,6 +25,20 @@ from turnip.compat.files import fd_buffer
# that currently causes CFFI warnings to be returned to the client.
GIT_OID_HEX_ZERO = '0'*40
+# Be careful when adjusting these numbers. A page size that is too big may
+# lead to stalled connections and timeouts, while a page size that is too
+# small could lead to poor performance.
+# For the number of workers, too many workers might hit Launchpad too hard,
+# while too few might also lead to poor performance.
+XML_RPC_PAGE_SIZE = 25
+XML_RPC_MAX_WORKERS = 4
+
+
+def get_socket():
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.connect(os.environ['TURNIP_HOOK_RPC_SOCK'])
+ return sock
+
def check_ancestor(old, new):
# This is a delete, setting the new ref.
@@ -130,7 +145,8 @@ def netstring_recv(sock):
return bytes(s)
-def rpc_invoke(sock, method, args):
+def rpc_invoke(method, args):
+ sock = get_socket()
msg = dict(args)
assert 'op' not in msg
msg['op'] = method
@@ -147,17 +163,33 @@ def split_list(lst, n):
return [lst[i:i + n] for i in range(0, len(lst), n)]
-def check_ref_permissions(sock, rpc_key, ref_paths, page_size=25):
+def check_ref_permissions(rpc_key, ref_paths, page_size=XML_RPC_PAGE_SIZE,
+ max_workers=XML_RPC_MAX_WORKERS):
+ """Run XML RPC call to check refs permissions.
+
+ For large sets of ref_paths, this method paginates them to do several
+ XML RPC calls of maximun `page_size` refs in each, and run it in
+ parallel, `max_workers` at a time.
+ """
ref_paths = [base64.b64encode(path).decode('UTF-8') for path in ref_paths]
permissions = {}
- # Paginate the rpc calls to avoid timeouts.
- for ref_paths_chunk in split_list(ref_paths, page_size):
- rule_lines = rpc_invoke(
- sock, 'check_ref_permissions',
- {'key': rpc_key, 'paths': ref_paths_chunk})
- permissions.update({
- base64.b64decode(path.encode('UTF-8')): permissions
- for path, permissions in rule_lines.items()})
+
+ # Paginate the rpc calls and run them in parallel, to avoid timeouts and
+ # to speed up the request a bit.
+ chunks = split_list(ref_paths, page_size)
+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
+ futures = []
+ for ref_paths_chunk in chunks:
+ future = executor.submit(
+ rpc_invoke, 'check_ref_permissions',
+ {'key': rpc_key, 'paths': ref_paths_chunk})
+ futures.append(future)
+
+ for future in futures:
+ rule_lines = future.result()
+ permissions.update({
+ base64.b64decode(path.encode('UTF-8')): permissions
+ for path, permissions in rule_lines.items()})
return permissions
@@ -175,7 +207,7 @@ def send_mp_url(received_line):
pushed_branch = ref[len(b'refs/heads/'):]
if not is_default_branch(pushed_branch):
mp_url = rpc_invoke(
- sock, 'get_mp_url',
+ 'get_mp_url',
{'key': rpc_key,
'branch': six.ensure_text(pushed_branch, "UTF-8")})
if mp_url is not None:
@@ -193,14 +225,12 @@ if __name__ == '__main__':
stdin = fd_buffer(sys.stdin)
stdout = fd_buffer(sys.stdout)
rpc_key = os.environ['TURNIP_HOOK_RPC_KEY']
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.connect(os.environ['TURNIP_HOOK_RPC_SOCK'])
hook = os.path.basename(sys.argv[0])
if hook == 'pre-receive':
# Verify the proposed changes against rules from the server.
raw_paths = stdin.readlines()
ref_paths = [p.rstrip(b'\n').split(b' ', 2)[2] for p in raw_paths]
- rule_lines = check_ref_permissions(sock, rpc_key, ref_paths)
+ rule_lines = check_ref_permissions(rpc_key, ref_paths)
errors = match_rules(rule_lines, raw_paths)
for error in errors:
stdout.write(error + b'\n')
@@ -210,13 +240,13 @@ if __name__ == '__main__':
# Details of the changes aren't currently included.
lines = stdin.readlines()
if lines:
- rpc_invoke(sock, 'notify_push', {'key': rpc_key})
+ rpc_invoke('notify_push', {'key': rpc_key})
if len(lines) == 1:
send_mp_url(lines[0])
sys.exit(0)
elif hook == 'update':
ref = sys.argv[1]
- rule_lines = check_ref_permissions(sock, rpc_key, [ref])
+ rule_lines = check_ref_permissions(rpc_key, [ref])
errors = match_update_rules(rule_lines, sys.argv[1:4])
for error in errors:
stdout.write(error + b'\n')
diff --git a/turnip/pack/tests/test_hooks.py b/turnip/pack/tests/test_hooks.py
index b383d6b..0bd2aaa 100644
--- a/turnip/pack/tests/test_hooks.py
+++ b/turnip/pack/tests/test_hooks.py
@@ -17,6 +17,8 @@ from fixtures import (
MonkeyPatch,
TempDir,
)
+
+
try:
from unittest import mock
except ImportError:
@@ -35,6 +37,7 @@ from turnip.pack import hookrpc
from turnip.pack.helpers import ensure_hooks
from turnip.pack.hooks import hook
from turnip.pack.hooks.hook import split_list, check_ref_permissions
+from turnip.tests.test_helpers import MockThreadPoolExecutor
class HookProcessProtocol(protocol.ProcessProtocol):
@@ -472,10 +475,19 @@ class TestSplitRefPathsCalls(TestCase):
{encode(b'master'): [], encode(b'develop'): []},
{encode(b'head'): []}
]
- sock = mock.Mock()
- # Call it with page size = 2
- result = check_ref_permissions(
- sock, "rpc-key", [b"master", b"develop", b"head"], 2)
+
+ # Call it with page size = 2 and maximum 10 parallel workers.
+ pool = MockThreadPoolExecutor()
+ executor = mock.Mock()
+ executor.return_value = pool
+ with mock.patch('turnip.pack.hooks.hook.ThreadPoolExecutor', executor):
+ result = check_ref_permissions(
+ "rpc-key", [b"master", b"develop", b"head"], 2, 10)
+
+ # Make sure it executed in parallel using ThreadPoolExecutor.
+ self.assertEqual(1, executor.call_count)
+ self.assertEqual(mock.call(max_workers=10), executor.call_args)
+ self.assertEqual(2, len(pool.futures))
# The final result should have been joined into.
self.assertEqual(
@@ -483,13 +495,9 @@ class TestSplitRefPathsCalls(TestCase):
# Check that it called correctly the rpc_invoke method.
self.assertEqual(rpc_invoke.call_args_list, [
- mock.call(
- sock, 'check_ref_permissions', {
- 'key': 'rpc-key', 'paths': [
- encode(b"master"), encode(b"develop")]
- }),
- mock.call(
- sock, 'check_ref_permissions', {
- 'key': 'rpc-key', 'paths': [encode(b"head")]
- }),
+ mock.call('check_ref_permissions', {
+ 'key': 'rpc-key', 'paths': [
+ encode(b"master"), encode(b"develop")]}),
+ mock.call('check_ref_permissions', {
+ 'key': 'rpc-key', 'paths': [encode(b"head")]}),
])
diff --git a/turnip/tests/test_helpers.py b/turnip/tests/test_helpers.py
index 9dc4101..23fda1e 100644
--- a/turnip/tests/test_helpers.py
+++ b/turnip/tests/test_helpers.py
@@ -39,3 +39,44 @@ class TestComposePath(TestCase):
ValueError, helpers.compose_path, b'/foo', b'../bar')
self.assertRaises(
ValueError, helpers.compose_path, b'/foo', b'/foo/../../bar')
+
+
+class MockThreadPoolExecutor:
+ """A mock class for concurrent.futures.*PoolExecutor that executes
+ everything sequentially, keeping every generated Future in
+ self.futures.
+ """
+ class Future:
+ def __init__(self, value=None, exception=None):
+ self.value = value
+ self.exception = exception
+
+ def result(self):
+ if self.exception:
+ raise self.exception
+ return self.value
+
+ def __init__(self, **kwargs):
+ self.futures = []
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_traceback):
+ pass
+
+ def submit(self, fn, *args, **kwargs):
+ # Execute the functions in series, keeping the generated futures in
+ # the self.futures list.
+ exception = None
+ result = None
+ try:
+ result = fn(*args, **kwargs)
+ except Exception as e:
+ exception = e
+ future = MockThreadPoolExecutor.Future(result, exception)
+ self.futures.append(future)
+ return future
+
+ def shutdown(self, wait=True):
+ pass
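
A usage note on the new test helper: the idea is to patch the real ThreadPoolExecutor with MockThreadPoolExecutor so the chunks run sequentially inside the test process while the generated futures stay inspectable. A rough, self-contained sketch of that pattern, assuming the module layout shown above; fake_rpc and example are hypothetical stand-ins, not code from this branch:

    from unittest import mock  # or `import mock` on Python 2, as in test_hooks.py

    from turnip.pack.hooks import hook
    from turnip.tests.test_helpers import MockThreadPoolExecutor


    def fake_rpc(method, args):
        # Hypothetical replacement for hook.rpc_invoke: grant every path.
        return {path: [] for path in args['paths']}


    def example():
        pool = MockThreadPoolExecutor()
        executor_factory = mock.Mock(return_value=pool)
        with mock.patch('turnip.pack.hooks.hook.rpc_invoke', fake_rpc), \
                mock.patch('turnip.pack.hooks.hook.ThreadPoolExecutor',
                           executor_factory):
            result = hook.check_ref_permissions(
                'rpc-key', [b'refs/heads/a', b'refs/heads/b'], page_size=1)
        # Two chunks of one ref each went through the mock pool.
        assert len(pool.futures) == 2
        assert set(result) == {b'refs/heads/a', b'refs/heads/b'}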