← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:status-check-view into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:status-check-view into launchpad:master.

Commit message:
Add a /_status/check view for use by Talisker

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/405875
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:status-check-view into launchpad:master.
diff --git a/lib/lp/services/webapp/configure.zcml b/lib/lp/services/webapp/configure.zcml
index 95f495d..11749a3 100644
--- a/lib/lp/services/webapp/configure.zcml
+++ b/lib/lp/services/webapp/configure.zcml
@@ -463,4 +463,15 @@
         permission="zope.Public"
         class="lp.services.webapp.haproxy.HAProxyStatusView"
         />
+
+    <!-- Support Talisker's /_status/check URL. -->
+    <browser:page
+        for="lp.services.webapp.interfaces.ILaunchpadRoot"
+        name="_status"
+        permission="zope.Public"
+        class="lp.services.webapp.status.StatusView"
+        />
+    <class class="lp.services.webapp.status.StatusCheckView">
+        <allow attributes="__call__" />
+    </class>
 </configure>
diff --git a/lib/lp/services/webapp/status.py b/lib/lp/services/webapp/status.py
new file mode 100644
index 0000000..684dd69
--- /dev/null
+++ b/lib/lp/services/webapp/status.py
@@ -0,0 +1,44 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Health check view for Talisker."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+    "StatusCheckView",
+    ]
+
+from zope.publisher.interfaces import NotFound
+
+
+class StatusView:
+
+    def __init__(self, context, request):
+        self.context = context
+
+    def browserDefault(self, request):
+        return self, ()
+
+    def publishTraverse(self, request, name):
+        if name == "check":
+            return StatusCheckView(self.context, request)
+        else:
+            raise NotFound(self.context, name)
+
+    def __call__(self):
+        raise NotFound(self.context, self.__name__)
+
+
+class StatusCheckView:
+    """/_status/check view for use by Talisker.
+
+    This currently just checks that we have a working database connection.
+    """
+
+    def __init__(self, context, request):
+        pass
+
+    def __call__(self):
+        return b""
diff --git a/lib/lp/services/webapp/tests/test_status.py b/lib/lp/services/webapp/tests/test_status.py
new file mode 100644
index 0000000..2e22da5
--- /dev/null
+++ b/lib/lp/services/webapp/tests/test_status.py
@@ -0,0 +1,51 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test the health check view for Talisker."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+
+from fixtures import FakeLogger
+from zope.publisher.interfaces.http import IHTTPRequest
+
+from lp.services.database.interfaces import IDatabasePolicy
+from lp.services.database.policy import DatabaseBlockedPolicy
+from lp.testing import TestCase
+from lp.testing.fixture import ZopeAdapterFixture
+from lp.testing.layers import DatabaseFunctionalLayer
+from lp.testing.pages import http
+
+
+class TestStatusView(TestCase):
+
+    layer = DatabaseFunctionalLayer
+
+    # The successful case of traversal is tested in TestStatusCheckView.
+    def test_traverse_not_found(self):
+        self.useFixture(FakeLogger())
+        response = http("GET /_status/nonexistent HTTP/1.0")
+        self.assertEqual(404, response.getStatus())
+
+    def test_call_not_found(self):
+        self.useFixture(FakeLogger())
+        response = http("GET /_status HTTP/1.0")
+        self.assertEqual(404, response.getStatus())
+
+
+class TestStatusCheckView(TestCase):
+
+    layer = DatabaseFunctionalLayer
+
+    def test_ok(self):
+        response = http("GET /_status/check HTTP/1.0")
+        self.assertEqual(200, response.getStatus())
+        self.assertEqual(b"", response.getBody())
+
+    def test_no_database(self):
+        policy = DatabaseBlockedPolicy()
+        self.useFixture(ZopeAdapterFixture(
+            policy, (IHTTPRequest,), IDatabasePolicy))
+        response = http("GET /_status/check HTTP/1.0")
+        self.assertEqual(500, response.getStatus())
diff --git a/lib/lp/testing/fixture.py b/lib/lp/testing/fixture.py
index 0c1ff7a..5ad7317 100644
--- a/lib/lp/testing/fixture.py
+++ b/lib/lp/testing/fixture.py
@@ -33,7 +33,11 @@ from zope.component import (
     adapter,
     getGlobalSiteManager,
     )
-from zope.interface import Interface
+from zope.interface import (
+    implementedBy,
+    Interface,
+    )
+from zope.interface.interfaces import ISpecification
 from zope.publisher.interfaces.browser import IDefaultBrowserLayer
 from zope.security.checker import (
     defineChecker,
@@ -144,16 +148,34 @@ class PGBouncerFixture(pgbouncer.fixture.PGBouncerFixture):
 class ZopeAdapterFixture(Fixture):
     """A fixture to register and unregister an adapter."""
 
-    def __init__(self, *args, **kwargs):
-        self._args, self._kwargs = args, kwargs
+    def __init__(self, factory, required=None, provided=None, name=u"",
+                 info=u"", event=True):
+        # We use some private functions from here since we need them to work
+        # out how to query for existing adapters.  We could copy and paste
+        # the code instead, but it doesn't seem worth it.
+        from zope.interface import registry
+
+        self._factory = factory
+        self._required = registry._getAdapterRequired(factory, required)
+        if provided is None:
+            provided = registry._getAdapterProvided(factory)
+        self._provided = provided
+        self._name = name
+        self._info = info
+        self._event = event
 
     def _setUp(self):
         site_manager = getGlobalSiteManager()
+        original = site_manager.adapters.lookup(
+            self._required, self._provided, self._name)
         site_manager.registerAdapter(
-            *self._args, **self._kwargs)
+            self._factory, required=self._required, provided=self._provided,
+            name=self._name, info=self._info, event=self._event)
+        # Equivalent to unregisterAdapter if original is None.
         self.addCleanup(
-            site_manager.unregisterAdapter,
-            *self._args, **self._kwargs)
+            site_manager.registerAdapter,
+            original, required=self._required, provided=self._provided,
+            name=self._name, info=self._info, event=self._event)
 
 
 class ZopeEventHandlerFixture(Fixture):
diff --git a/lib/lp/testing/tests/test_fixture.py b/lib/lp/testing/tests/test_fixture.py
index bbcea67..d47d957 100644
--- a/lib/lp/testing/tests/test_fixture.py
+++ b/lib/lp/testing/tests/test_fixture.py
@@ -95,6 +95,33 @@ class TestZopeAdapterFixture(TestCase):
         # The adapter is no longer registered.
         self.assertIs(None, queryAdapter(context, IBar))
 
+    def test_restores_previous_adapter(self):
+        # If there was a previous adapter, ZopeAdapterFixture restores it on
+        # cleanup.
+        @adapter(IFoo)
+        @implementer(IBar)
+        class OriginalFooToBar:
+
+            def __init__(self, foo):
+                self.foo = foo
+
+        sm = getGlobalSiteManager()
+        sm.registerAdapter(OriginalFooToBar, (IFoo,), IBar)
+        try:
+            context = Foo()
+            bar_adapter = queryAdapter(context, IBar)
+            self.assertIsNot(None, bar_adapter)
+            self.assertIsInstance(bar_adapter, OriginalFooToBar)
+            with ZopeAdapterFixture(FooToBar):
+                bar_adapter = queryAdapter(context, IBar)
+                self.assertIsNot(None, bar_adapter)
+                self.assertIsInstance(bar_adapter, FooToBar)
+            bar_adapter = queryAdapter(context, IBar)
+            self.assertIsNot(None, bar_adapter)
+            self.assertIsInstance(bar_adapter, OriginalFooToBar)
+        finally:
+            sm.unregisterAdapter(OriginalFooToBar, (IFoo,), IBar)
+
 
 @implementer(IMailDelivery)
 class DummyMailer(object):