← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/turnip:hookrpc-test into turnip:master

 

Colin Watson has proposed merging ~cjwatson/turnip:hookrpc-test into turnip:master.

Commit message:
Add unit tests for HookRPCHandler

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/turnip/+git/turnip/+merge/359119

It's tested by way of functional tests, but unit tests are easier to deal with in some cases.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/turnip:hookrpc-test into turnip:master.
diff --git a/turnip/pack/tests/fake_servers.py b/turnip/pack/tests/fake_servers.py
new file mode 100644
index 0000000..1e0f310
--- /dev/null
+++ b/turnip/pack/tests/fake_servers.py
@@ -0,0 +1,91 @@
+# Copyright 2015-2018 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from __future__ import (
+    absolute_import,
+    print_function,
+    unicode_literals,
+    )
+
+from collections import defaultdict
+import hashlib
+
+from lazr.sshserver.auth import NoSuchPersonWithName
+from twisted.web import xmlrpc
+
+__all__ = [
+    "FakeAuthServerService",
+    "FakeVirtInfoService",
+    ]
+
+
+class FakeAuthServerService(xmlrpc.XMLRPC):
+    """A fake version of the Launchpad authserver service."""
+
+    def __init__(self):
+        xmlrpc.XMLRPC.__init__(self)
+        self.keys = defaultdict(list)
+
+    def addSSHKey(self, username, public_key_path):
+        with open(public_key_path, "r") as f:
+            public_key = f.read()
+        kind, keytext, _ = public_key.split(" ", 2)
+        if kind == "ssh-rsa":
+            keytype = "RSA"
+        elif kind == "ssh-dss":
+            keytype = "DSA"
+        else:
+            raise Exception("Unrecognised public key type %s" % kind)
+        self.keys[username].append((keytype, keytext))
+
+    def xmlrpc_getUserAndSSHKeys(self, username):
+        if username not in self.keys:
+            raise NoSuchPersonWithName(username)
+        return {
+            "id": hash(username) % (2 ** 31),
+            "name": username,
+            "keys": self.keys[username],
+            }
+
+
+class FakeVirtInfoService(xmlrpc.XMLRPC):
+    """A trivial virt information XML-RPC service.
+
+    Translates a path to its SHA-256 hash. The repo is writable if the
+    path is prefixed with '/+rw'
+    """
+
+    def __init__(self, *args, **kwargs):
+        xmlrpc.XMLRPC.__init__(self, *args, **kwargs)
+        self.require_auth = False
+        self.translations = []
+        self.authentications = []
+        self.push_notifications = []
+        self.ref_permissions_checks = []
+        self.ref_permissions = {}
+        self.ref_permissions_as_dict = False
+
+    def xmlrpc_translatePath(self, pathname, permission, auth_params):
+        if self.require_auth and 'user' not in auth_params:
+            raise xmlrpc.Fault(3, "Unauthorized")
+
+        self.translations.append((pathname, permission, auth_params))
+        writable = False
+        if pathname.startswith('/+rw'):
+            writable = True
+            pathname = pathname[4:]
+
+        if permission != b'read' and not writable:
+            raise xmlrpc.Fault(2, "Repository is read-only")
+        return {'path': hashlib.sha256(pathname).hexdigest()}
+
+    def xmlrpc_authenticateWithPassword(self, username, password):
+        self.authentications.append((username, password))
+        return {'user': username}
+
+    def xmlrpc_notify(self, path):
+        self.push_notifications.append(path)
+
+    def xmlrpc_checkRefPermissions(self, path, ref_paths, auth_params):
+        self.ref_permissions_checks.append((path, ref_paths, auth_params))
+        return self.ref_permissions
diff --git a/turnip/pack/tests/test_functional.py b/turnip/pack/tests/test_functional.py
index 142de34..2fa1782 100644
--- a/turnip/pack/tests/test_functional.py
+++ b/turnip/pack/tests/test_functional.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2015-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from __future__ import (
@@ -9,7 +9,6 @@ from __future__ import (
     )
 
 import base64
-from collections import defaultdict
 import hashlib
 import io
 import os
@@ -32,7 +31,6 @@ from fixtures import (
     EnvironmentVariable,
     TempDir,
     )
-from lazr.sshserver.auth import NoSuchPersonWithName
 from pygit2 import GIT_OID_HEX_ZERO
 from testtools import TestCase
 from testtools.content import text_content
@@ -62,80 +60,13 @@ from turnip.pack.hookrpc import (
     )
 from turnip.pack.http import SmartHTTPFrontendResource
 from turnip.pack.ssh import SmartSSHService
+from turnip.pack.tests.fake_servers import (
+    FakeAuthServerService,
+    FakeVirtInfoService,
+    )
 from turnip.version_info import version_info
 
 
-class FakeAuthServerService(xmlrpc.XMLRPC):
-    """A fake version of the Launchpad authserver service."""
-
-    def __init__(self):
-        xmlrpc.XMLRPC.__init__(self)
-        self.keys = defaultdict(list)
-
-    def addSSHKey(self, username, public_key_path):
-        with open(public_key_path, "r") as f:
-            public_key = f.read()
-        kind, keytext, _ = public_key.split(" ", 2)
-        if kind == "ssh-rsa":
-            keytype = "RSA"
-        elif kind == "ssh-dss":
-            keytype = "DSA"
-        else:
-            raise Exception("Unrecognised public key type %s" % kind)
-        self.keys[username].append((keytype, keytext))
-
-    def xmlrpc_getUserAndSSHKeys(self, username):
-        if username not in self.keys:
-            raise NoSuchPersonWithName(username)
-        return {
-            "id": hash(username) % (2 ** 31),
-            "name": username,
-            "keys": self.keys[username],
-            }
-
-
-class FakeVirtInfoService(xmlrpc.XMLRPC):
-    """A trivial virt information XML-RPC service.
-
-    Translates a path to its SHA-256 hash. The repo is writable if the
-    path is prefixed with '/+rw'
-    """
-
-    def __init__(self, *args, **kwargs):
-        xmlrpc.XMLRPC.__init__(self, *args, **kwargs)
-        self.require_auth = False
-        self.translations = []
-        self.authentications = []
-        self.push_notifications = []
-        self.ref_permissions_checks = []
-        self.ref_permissions = {}
-
-    def xmlrpc_translatePath(self, pathname, permission, auth_params):
-        if self.require_auth and 'user' not in auth_params:
-            raise xmlrpc.Fault(3, "Unauthorized")
-
-        self.translations.append((pathname, permission, auth_params))
-        writable = False
-        if pathname.startswith('/+rw'):
-            writable = True
-            pathname = pathname[4:]
-
-        if permission != b'read' and not writable:
-            raise xmlrpc.Fault(2, "Repository is read-only")
-        return {'path': hashlib.sha256(pathname).hexdigest()}
-
-    def xmlrpc_authenticateWithPassword(self, username, password):
-        self.authentications.append((username, password))
-        return {'user': username}
-
-    def xmlrpc_notify(self, path):
-        self.push_notifications.append(path)
-
-    def xmlrpc_checkRefPermissions(self, path, ref_paths, auth_params):
-        self.ref_permissions_checks.append((path, ref_paths, auth_params))
-        return self.ref_permissions
-
-
 class FunctionalTestMixin(object):
 
     run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=30)
diff --git a/turnip/pack/tests/test_hookrpc.py b/turnip/pack/tests/test_hookrpc.py
index aca320a..9af0408 100644
--- a/turnip/pack/tests/test_hookrpc.py
+++ b/turnip/pack/tests/test_hookrpc.py
@@ -1,4 +1,4 @@
-# Copyright 2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2015-2018 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from __future__ import (
@@ -7,11 +7,20 @@ from __future__ import (
     unicode_literals,
     )
 
+import contextlib
+import uuid
+
 from testtools import TestCase
-from twisted.internet import defer
+from testtools.deferredruntest import AsynchronousDeferredRunTest
+from twisted.internet import (
+    defer,
+    reactor,
+    )
 from twisted.test import proto_helpers
+from twisted.web import server
 
 from turnip.pack import hookrpc
+from turnip.pack.tests.fake_servers import FakeVirtInfoService
 
 
 class DummyJSONNetstringProtocol(hookrpc.JSONNetstringProtocol):
@@ -127,3 +136,71 @@ class TestRPCServerProtocol(TestCase):
         self.assertEqual(
             b'42:{"error": "Command must be a JSON object"},',
             self.transport.value())
+
+
+class TestHookRPCHandler(TestCase):
+    """Test the hook RPC handler."""
+
+    run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=5)
+
+    def setUp(self):
+        super(TestHookRPCHandler, self).setUp()
+        self.virtinfo = FakeVirtInfoService(allowNone=True)
+        self.virtinfo_listener = reactor.listenTCP(
+            0, server.Site(self.virtinfo))
+        self.virtinfo_port = self.virtinfo_listener.getHost().port
+        self.virtinfo_url = b'http://localhost:%d/' % self.virtinfo_port
+        self.addCleanup(self.virtinfo_listener.stopListening)
+        self.hookrpc_handler = hookrpc.HookRPCHandler(self.virtinfo_url)
+
+    @contextlib.contextmanager
+    def registeredKey(self, path, auth_params=None, permissions=None):
+        key = str(uuid.uuid4())
+        self.hookrpc_handler.registerKey(key, path, auth_params or {})
+        if permissions is not None:
+            self.hookrpc_handler.ref_permissions[key] = permissions
+        try:
+            yield key
+        finally:
+            self.hookrpc_handler.unregisterKey(key)
+
+    def assertCheckedRefPermissions(self, path, ref_paths, auth_params):
+        self.assertEqual(
+            [(path, ref_paths, auth_params)],
+            self.virtinfo.ref_permissions_checks)
+
+    @defer.inlineCallbacks
+    def test_checkRefPermissions_fresh(self):
+        self.virtinfo.ref_permissions = {
+            'refs/heads/master': ['push'],
+            'refs/heads/next': ['force_push'],
+            }
+        with self.registeredKey('/translated', auth_params={'uid': 42}) as key:
+            permissions = yield self.hookrpc_handler.checkRefPermissions(
+                None,
+                {'key': key, 'paths': sorted(self.virtinfo.ref_permissions)})
+        self.assertEqual(self.virtinfo.ref_permissions, permissions)
+        self.assertCheckedRefPermissions(
+            '/translated', [b'refs/heads/master', b'refs/heads/next'],
+            {'uid': 42})
+
+    @defer.inlineCallbacks
+    def test_checkRefPermissions_cached(self):
+        cached_ref_permissions = {
+            'refs/heads/master': ['push'],
+            'refs/heads/next': ['force_push'],
+            }
+        with self.registeredKey(
+                '/translated', auth_params={'uid': 42},
+                permissions=cached_ref_permissions) as key:
+            permissions = yield self.hookrpc_handler.checkRefPermissions(
+                None, {'key': key, 'paths': ['refs/heads/master']})
+        expected_permissions = {'refs/heads/master': ['push']}
+        self.assertEqual(expected_permissions, permissions)
+        self.assertEqual([], self.virtinfo.ref_permissions_checks)
+
+    @defer.inlineCallbacks
+    def test_notifyPush(self):
+        with self.registeredKey('/translated') as key:
+            yield self.hookrpc_handler.notifyPush(None, {'key': key})
+        self.assertEqual(['/translated'], self.virtinfo.push_notifications)

Follow ups