launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #23094
[Merge] ~cjwatson/turnip:hookrpc-test into turnip:master
Colin Watson has proposed merging ~cjwatson/turnip:hookrpc-test into turnip:master.
Commit message:
Add unit tests for HookRPCHandler
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/turnip/+git/turnip/+merge/359119
HookRPCHandler is already tested by way of functional tests, but unit tests are easier to deal with in some cases.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/turnip:hookrpc-test into turnip:master.
diff --git a/turnip/pack/tests/fake_servers.py b/turnip/pack/tests/fake_servers.py
new file mode 100644
index 0000000..1e0f310
--- /dev/null
+++ b/turnip/pack/tests/fake_servers.py
@@ -0,0 +1,91 @@
+# Copyright 2015-2018 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from __future__ import (
+ absolute_import,
+ print_function,
+ unicode_literals,
+ )
+
+from collections import defaultdict
+import hashlib
+
+from lazr.sshserver.auth import NoSuchPersonWithName
+from twisted.web import xmlrpc
+
+__all__ = [
+ "FakeAuthServerService",
+ "FakeVirtInfoService",
+ ]
+
+
+class FakeAuthServerService(xmlrpc.XMLRPC):
+ """A fake version of the Launchpad authserver service."""
+
+ def __init__(self):
+ xmlrpc.XMLRPC.__init__(self)
+ self.keys = defaultdict(list)
+
+ def addSSHKey(self, username, public_key_path):
+ with open(public_key_path, "r") as f:
+ public_key = f.read()
+ kind, keytext, _ = public_key.split(" ", 2)
+ if kind == "ssh-rsa":
+ keytype = "RSA"
+ elif kind == "ssh-dss":
+ keytype = "DSA"
+ else:
+ raise Exception("Unrecognised public key type %s" % kind)
+ self.keys[username].append((keytype, keytext))
+
+ def xmlrpc_getUserAndSSHKeys(self, username):
+ if username not in self.keys:
+ raise NoSuchPersonWithName(username)
+ return {
+ "id": hash(username) % (2 ** 31),
+ "name": username,
+ "keys": self.keys[username],
+ }
+
+
+class FakeVirtInfoService(xmlrpc.XMLRPC):
+ """A trivial virt information XML-RPC service.
+
+ Translates a path to its SHA-256 hash. The repo is writable if the
+ path is prefixed with '/+rw'
+ """
+
+ def __init__(self, *args, **kwargs):
+ xmlrpc.XMLRPC.__init__(self, *args, **kwargs)
+ self.require_auth = False
+ self.translations = []
+ self.authentications = []
+ self.push_notifications = []
+ self.ref_permissions_checks = []
+ self.ref_permissions = {}
+ self.ref_permissions_as_dict = False
+
+ def xmlrpc_translatePath(self, pathname, permission, auth_params):
+ if self.require_auth and 'user' not in auth_params:
+ raise xmlrpc.Fault(3, "Unauthorized")
+
+ self.translations.append((pathname, permission, auth_params))
+ writable = False
+ if pathname.startswith('/+rw'):
+ writable = True
+ pathname = pathname[4:]
+
+ if permission != b'read' and not writable:
+ raise xmlrpc.Fault(2, "Repository is read-only")
+ return {'path': hashlib.sha256(pathname).hexdigest()}
+
+ def xmlrpc_authenticateWithPassword(self, username, password):
+ self.authentications.append((username, password))
+ return {'user': username}
+
+ def xmlrpc_notify(self, path):
+ self.push_notifications.append(path)
+
+ def xmlrpc_checkRefPermissions(self, path, ref_paths, auth_params):
+ self.ref_permissions_checks.append((path, ref_paths, auth_params))
+ return self.ref_permissions
diff --git a/turnip/pack/tests/test_functional.py b/turnip/pack/tests/test_functional.py
index 142de34..2fa1782 100644
--- a/turnip/pack/tests/test_functional.py
+++ b/turnip/pack/tests/test_functional.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 Canonical Ltd. This software is licensed under the
+# Copyright 2015-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from __future__ import (
@@ -9,7 +9,6 @@ from __future__ import (
)
import base64
-from collections import defaultdict
import hashlib
import io
import os
@@ -32,7 +31,6 @@ from fixtures import (
EnvironmentVariable,
TempDir,
)
-from lazr.sshserver.auth import NoSuchPersonWithName
from pygit2 import GIT_OID_HEX_ZERO
from testtools import TestCase
from testtools.content import text_content
@@ -62,80 +60,13 @@ from turnip.pack.hookrpc import (
)
from turnip.pack.http import SmartHTTPFrontendResource
from turnip.pack.ssh import SmartSSHService
+from turnip.pack.tests.fake_servers import (
+ FakeAuthServerService,
+ FakeVirtInfoService,
+ )
from turnip.version_info import version_info
-class FakeAuthServerService(xmlrpc.XMLRPC):
- """A fake version of the Launchpad authserver service."""
-
- def __init__(self):
- xmlrpc.XMLRPC.__init__(self)
- self.keys = defaultdict(list)
-
- def addSSHKey(self, username, public_key_path):
- with open(public_key_path, "r") as f:
- public_key = f.read()
- kind, keytext, _ = public_key.split(" ", 2)
- if kind == "ssh-rsa":
- keytype = "RSA"
- elif kind == "ssh-dss":
- keytype = "DSA"
- else:
- raise Exception("Unrecognised public key type %s" % kind)
- self.keys[username].append((keytype, keytext))
-
- def xmlrpc_getUserAndSSHKeys(self, username):
- if username not in self.keys:
- raise NoSuchPersonWithName(username)
- return {
- "id": hash(username) % (2 ** 31),
- "name": username,
- "keys": self.keys[username],
- }
-
-
-class FakeVirtInfoService(xmlrpc.XMLRPC):
- """A trivial virt information XML-RPC service.
-
- Translates a path to its SHA-256 hash. The repo is writable if the
- path is prefixed with '/+rw'
- """
-
- def __init__(self, *args, **kwargs):
- xmlrpc.XMLRPC.__init__(self, *args, **kwargs)
- self.require_auth = False
- self.translations = []
- self.authentications = []
- self.push_notifications = []
- self.ref_permissions_checks = []
- self.ref_permissions = {}
-
- def xmlrpc_translatePath(self, pathname, permission, auth_params):
- if self.require_auth and 'user' not in auth_params:
- raise xmlrpc.Fault(3, "Unauthorized")
-
- self.translations.append((pathname, permission, auth_params))
- writable = False
- if pathname.startswith('/+rw'):
- writable = True
- pathname = pathname[4:]
-
- if permission != b'read' and not writable:
- raise xmlrpc.Fault(2, "Repository is read-only")
- return {'path': hashlib.sha256(pathname).hexdigest()}
-
- def xmlrpc_authenticateWithPassword(self, username, password):
- self.authentications.append((username, password))
- return {'user': username}
-
- def xmlrpc_notify(self, path):
- self.push_notifications.append(path)
-
- def xmlrpc_checkRefPermissions(self, path, ref_paths, auth_params):
- self.ref_permissions_checks.append((path, ref_paths, auth_params))
- return self.ref_permissions
-
-
class FunctionalTestMixin(object):
run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=30)
diff --git a/turnip/pack/tests/test_hookrpc.py b/turnip/pack/tests/test_hookrpc.py
index aca320a..9af0408 100644
--- a/turnip/pack/tests/test_hookrpc.py
+++ b/turnip/pack/tests/test_hookrpc.py
@@ -1,4 +1,4 @@
-# Copyright 2015 Canonical Ltd. This software is licensed under the
+# Copyright 2015-2018 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
from __future__ import (
@@ -7,11 +7,20 @@ from __future__ import (
unicode_literals,
)
+import contextlib
+import uuid
+
from testtools import TestCase
-from twisted.internet import defer
+from testtools.deferredruntest import AsynchronousDeferredRunTest
+from twisted.internet import (
+ defer,
+ reactor,
+ )
from twisted.test import proto_helpers
+from twisted.web import server
from turnip.pack import hookrpc
+from turnip.pack.tests.fake_servers import FakeVirtInfoService
class DummyJSONNetstringProtocol(hookrpc.JSONNetstringProtocol):
@@ -127,3 +136,71 @@ class TestRPCServerProtocol(TestCase):
self.assertEqual(
b'42:{"error": "Command must be a JSON object"},',
self.transport.value())
+
+
+class TestHookRPCHandler(TestCase):
+ """Test the hook RPC handler."""
+
+ run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=5)
+
+ def setUp(self):
+ super(TestHookRPCHandler, self).setUp()
+ self.virtinfo = FakeVirtInfoService(allowNone=True)
+ self.virtinfo_listener = reactor.listenTCP(
+ 0, server.Site(self.virtinfo))
+ self.virtinfo_port = self.virtinfo_listener.getHost().port
+ self.virtinfo_url = b'http://localhost:%d/' % self.virtinfo_port
+ self.addCleanup(self.virtinfo_listener.stopListening)
+ self.hookrpc_handler = hookrpc.HookRPCHandler(self.virtinfo_url)
+
+ @contextlib.contextmanager
+ def registeredKey(self, path, auth_params=None, permissions=None):
+ key = str(uuid.uuid4())
+ self.hookrpc_handler.registerKey(key, path, auth_params or {})
+ if permissions is not None:
+ self.hookrpc_handler.ref_permissions[key] = permissions
+ try:
+ yield key
+ finally:
+ self.hookrpc_handler.unregisterKey(key)
+
+ def assertCheckedRefPermissions(self, path, ref_paths, auth_params):
+ self.assertEqual(
+ [(path, ref_paths, auth_params)],
+ self.virtinfo.ref_permissions_checks)
+
+ @defer.inlineCallbacks
+ def test_checkRefPermissions_fresh(self):
+ self.virtinfo.ref_permissions = {
+ 'refs/heads/master': ['push'],
+ 'refs/heads/next': ['force_push'],
+ }
+ with self.registeredKey('/translated', auth_params={'uid': 42}) as key:
+ permissions = yield self.hookrpc_handler.checkRefPermissions(
+ None,
+ {'key': key, 'paths': sorted(self.virtinfo.ref_permissions)})
+ self.assertEqual(self.virtinfo.ref_permissions, permissions)
+ self.assertCheckedRefPermissions(
+ '/translated', [b'refs/heads/master', b'refs/heads/next'],
+ {'uid': 42})
+
+ @defer.inlineCallbacks
+ def test_checkRefPermissions_cached(self):
+ cached_ref_permissions = {
+ 'refs/heads/master': ['push'],
+ 'refs/heads/next': ['force_push'],
+ }
+ with self.registeredKey(
+ '/translated', auth_params={'uid': 42},
+ permissions=cached_ref_permissions) as key:
+ permissions = yield self.hookrpc_handler.checkRefPermissions(
+ None, {'key': key, 'paths': ['refs/heads/master']})
+ expected_permissions = {'refs/heads/master': ['push']}
+ self.assertEqual(expected_permissions, permissions)
+ self.assertEqual([], self.virtinfo.ref_permissions_checks)
+
+ @defer.inlineCallbacks
+ def test_notifyPush(self):
+ with self.registeredKey('/translated') as key:
+ yield self.hookrpc_handler.notifyPush(None, {'key': key})
+ self.assertEqual(['/translated'], self.virtinfo.push_notifications)
Follow ups