← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~lgp171188/launchpad:enforce-stricter-permission-check-when-a-bug-is-locked-down into launchpad:master

 

Guruprasad has proposed merging ~lgp171188/launchpad:enforce-stricter-permission-check-when-a-bug-is-locked-down into launchpad:master.

Commit message:
* Add lock_status, lock_reason fields and lock, unlock methods to Bug
* Add a ModerateBug security adapter for IBugModerate

  This adapter uses the 'launchpad.Moderate' permission to allow users
  to lock and unlock a bug.
* Create bug change entries when locking and unlocking a bug
* Allow only the relevant roles to edit a locked bug
* Add unit tests for the previous changes related to bug (un)locking

Requested reviews:
  Colin Watson (cjwatson)
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/414097
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:enforce-stricter-permission-check-when-a-bug-is-locked-down into launchpad:master.
diff --git a/lib/lp/bugs/adapters/bugchange.py b/lib/lp/bugs/adapters/bugchange.py
index ba0e99a..6b15f36 100644
--- a/lib/lp/bugs/adapters/bugchange.py
+++ b/lib/lp/bugs/adapters/bugchange.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Implementations for bug changes."""
@@ -27,6 +27,8 @@ __all__ = [
     'BugTaskStatusChange',
     'BugTaskTargetChange',
     'BugTitleChange',
+    'BugLocked',
+    'BugUnlocked',
     'BugWatchAdded',
     'BugWatchRemoved',
     'CHANGED_DUPLICATE_MARKER',
@@ -52,7 +54,10 @@ from textwrap import dedent
 from zope.interface import implementer
 from zope.security.proxy import isinstance as zope_isinstance
 
-from lp.bugs.enums import BugNotificationLevel
+from lp.bugs.enums import (
+    BugLockStatus,
+    BugNotificationLevel,
+    )
 from lp.bugs.interfaces.bugchange import IBugChange
 from lp.bugs.interfaces.bugtask import (
     IBugTask,
@@ -735,6 +740,58 @@ class CveUnlinkedFromBug(BugChangeBase):
         return {'text': "** CVE removed: %s" % self.cve.url}
 
 
+class BugLocked(BugChangeBase):
+    """Used to represent the locking of a bug."""
+
+    def __init__(self, when, person, new_status=BugLockStatus.COMMENT_ONLY,
+                 lock_reason=None, **kwargs):
+        super().__init__(when, person)
+        self.new_status = new_status
+        self.lock_reason = lock_reason
+
+    def getBugActivity(self):
+        """See `IBugChange`."""
+        activity = dict(
+            whatchanged='bug lock status',
+            oldvalue=str(BugLockStatus.UNLOCKED),
+            newvalue=str(self.new_status),
+        )
+        if self.lock_reason:
+            activity['message'] = self.lock_reason
+
+        return activity
+
+    def getBugNotification(self):
+        """See `IBugChange`."""
+        text = "** Bug locked"
+        # Mention the reason only when one was given, so the notification
+        # never reads "Reason: None".
+        if self.lock_reason:
+            text += "\n   Reason: {}".format(self.lock_reason)
+        return {'text': text}
+
+class BugUnlocked(BugChangeBase):
+    """Used to represent the unlocking of a bug."""
+
+    def __init__(self, when, person, old_status, **kwargs):
+        super().__init__(when, person)
+        self.old_status = old_status
+
+    def getBugActivity(self):
+        """See `IBugChange`."""
+        return dict(
+            whatchanged='bug lock status',
+            newvalue=str(BugLockStatus.UNLOCKED),
+            oldvalue=str(self.old_status),
+        )
+
+    def getBugNotification(self):
+        """See `IBugChange`."""
+        return {
+            'text': "** Bug unlocked"
+        }
+
+
 class BugTaskAttributeChange(AttributeChange):
     """Used to represent a change in a BugTask's attributes.
 
diff --git a/lib/lp/bugs/configure.zcml b/lib/lp/bugs/configure.zcml
index 5b46af7..a83dae0 100644
--- a/lib/lp/bugs/configure.zcml
+++ b/lib/lp/bugs/configure.zcml
@@ -1,4 +1,4 @@
-<!-- Copyright 2009-2021 Canonical Ltd.  This software is licensed under the
+<!-- Copyright 2009-2022 Canonical Ltd.  This software is licensed under the
      GNU Affero General Public License version 3 (see the file LICENSE).
 -->
 
@@ -740,6 +740,9 @@
                     tags
                     title
                     description"/>
+            <require
+                permission="launchpad.Moderate"
+                interface="lp.bugs.interfaces.bug.IBugModerate"/>
         </class>
         <adapter
             for="lp.bugs.interfaces.bug.IBug"
diff --git a/lib/lp/bugs/enums.py b/lib/lp/bugs/enums.py
index 6290763..5276c92 100644
--- a/lib/lp/bugs/enums.py
+++ b/lib/lp/bugs/enums.py
@@ -1,9 +1,10 @@
-# Copyright 2010-2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Enums for the Bugs app."""
 
 __all__ = [
+    'BugLockStatus',
     'BugNotificationLevel',
     'BugNotificationStatus',
     ]
@@ -74,3 +75,22 @@ class BugNotificationStatus(DBEnumeratedType):
         The notification is deferred.  The recipient list was not calculated
         at creation time but is done when processed.
         """)
+
+
+class BugLockStatus(DBEnumeratedType):
+    """
+    The lock status of a bug.
+    """
+
+    UNLOCKED = DBItem(0, """
+        Unlocked
+
+        The bug is unlocked and the usual permissions apply.
+        """)
+
+    COMMENT_ONLY = DBItem(1, """
+        Comment-only
+
+        The bug allows those without a relevant role to comment,
+        but not to edit metadata.
+        """)
diff --git a/lib/lp/bugs/interfaces/bug.py b/lib/lp/bugs/interfaces/bug.py
index db94b63..5250211 100644
--- a/lib/lp/bugs/interfaces/bug.py
+++ b/lib/lp/bugs/interfaces/bug.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Interfaces related to bugs."""
@@ -66,7 +66,10 @@ from lp.app.errors import NotFoundError
 from lp.app.interfaces.launchpad import IPrivacy
 from lp.app.validators.attachment import attachment_size_constraint
 from lp.app.validators.name import bug_name_validator
-from lp.bugs.enums import BugNotificationLevel
+from lp.bugs.enums import (
+    BugLockStatus,
+    BugNotificationLevel,
+    )
 from lp.bugs.interfaces.bugactivity import IBugActivity
 from lp.bugs.interfaces.bugattachment import IBugAttachment
 from lp.bugs.interfaces.bugbranch import IBugBranch
@@ -382,6 +385,21 @@ class IBugView(Interface):
             readonly=True,
             value_type=Reference(schema=IMessage)),
         exported_as='messages'))
+    lock_status = exported(
+        Choice(
+            title=_('Lock Status'), vocabulary=BugLockStatus,
+            default=BugLockStatus.UNLOCKED,
+            description=_("The lock status of this bug."),
+            readonly=True,
+            required=False),
+        as_of="devel")
+    lock_reason = exported(
+        Text(
+            title=_('Lock Reason'),
+            description=_('The reason for locking this bug.'),
+            readonly=True,
+            required=False),
+        as_of="devel")
 
     def getSpecifications(user):
         """List of related specifications that the user can view."""
@@ -978,9 +996,42 @@ class IBugAppend(Interface):
         Return None if no bugtask was edited.
         """
 
+class IBugModerate(Interface):
+    """IBug attributes that require the launchpad.Moderate permission."""
+
+    @operation_parameters(
+        status=copy_field(
+            IBugView['lock_status'],
+            default=BugLockStatus.COMMENT_ONLY
+        ),
+        reason=copy_field(IBugView['lock_reason'])
+    )
+    @export_write_operation()
+    @call_with(who=REQUEST_USER)
+    @operation_for_version("devel")
+    def lock(who, status=BugLockStatus.COMMENT_ONLY, reason=None):
+        """
+        Lock the bug metadata edits to the relevant roles.
+
+        :param status: The lock status of the bug - one of the values
+                       in the BugLockStatus enum.
+        :param reason: The reason for locking this bug.
+        :param who: The IPerson who is making the change.
+        """
+
+    @export_write_operation()
+    @call_with(who=REQUEST_USER)
+    @operation_for_version("devel")
+    def unlock(who):
+        """
+        Unlock the bug metadata edits to the default roles.
+
+        :param who: The IPerson who is making the change.
+        """
+
 
 @exported_as_webservice_entry()
-class IBug(IBugPublic, IBugView, IBugAppend, IHasLinkedBranches):
+class IBug(IBugPublic, IBugView, IBugAppend, IBugModerate, IHasLinkedBranches):
     """The core bug entry."""
 
     linked_bugbranches = exported(
diff --git a/lib/lp/bugs/model/bug.py b/lib/lp/bugs/model/bug.py
index f7c588c..04197bf 100644
--- a/lib/lp/bugs/model/bug.py
+++ b/lib/lp/bugs/model/bug.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2021 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Launchpad bug-related database table classes."""
@@ -95,12 +95,17 @@ from lp.bugs.adapters.bugchange import (
     BranchUnlinkedFromBug,
     BugConvertedToQuestion,
     BugDuplicateChange,
+    BugLocked,
+    BugUnlocked,
     BugWatchAdded,
     BugWatchRemoved,
     SeriesNominated,
     UnsubscribedFromBug,
     )
-from lp.bugs.enums import BugNotificationLevel
+from lp.bugs.enums import (
+    BugLockStatus,
+    BugNotificationLevel,
+    )
 from lp.bugs.errors import InvalidDuplicateValue
 from lp.bugs.interfaces.bug import (
     IBug,
@@ -387,6 +392,9 @@ class Bug(SQLBase, InformationTypeMixin):
     heat = IntCol(notNull=True, default=0)
     heat_last_updated = UtcDateTimeCol(default=None)
     latest_patch_uploaded = UtcDateTimeCol(default=None)
+    lock_status = DBEnum(
+        enum=BugLockStatus, allow_none=True, default=BugLockStatus.UNLOCKED)
+    lock_reason = StringCol(notNull=False, default=None)
 
     @property
     def linked_branches(self):
@@ -2198,6 +2206,38 @@ class Bug(SQLBase, InformationTypeMixin):
             BugActivity.datechanged <= end_date)
         return activity_in_range
 
+    def lock(self, who, status=BugLockStatus.COMMENT_ONLY, reason=None):
+        """See `IBug`."""
+        if status != BugLockStatus.UNLOCKED and status != self.lock_status:
+            self.lock_status = status
+            if reason:
+                self.lock_reason = reason
+
+            Store.of(self).flush()
+            # BugLocked names this parameter `new_status`; a `status`
+            # keyword would vanish into its **kwargs and be ignored.
+            self.addChange(
+                BugLocked(
+                    when=UTC_NOW, person=who,
+                    new_status=status, lock_reason=reason
+                )
+            )
+
+    def unlock(self, who):
+        """See `IBug`."""
+        if self.lock_status != BugLockStatus.UNLOCKED:
+            old_lock_status = self.lock_status
+            self.lock_status = BugLockStatus.UNLOCKED
+            self.lock_reason = None
+            Store.of(self).flush()
+            self.addChange(
+                BugUnlocked(
+                    when=UTC_NOW,
+                    person=who,
+                    old_status=old_lock_status
+                )
+            )
+
 
 @ProxyFactory
 def get_also_notified_subscribers(
diff --git a/lib/lp/bugs/security.py b/lib/lp/bugs/security.py
index b60b426..bdef636 100644
--- a/lib/lp/bugs/security.py
+++ b/lib/lp/bugs/security.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2020 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Security adapters for the bugs module."""
@@ -10,6 +10,7 @@ from lp.app.security import (
     AuthorizationBase,
     DelegatedAuthorization,
     )
+from lp.bugs.enums import BugLockStatus
 from lp.bugs.interfaces.bug import IBug
 from lp.bugs.interfaces.bugactivity import IBugActivity
 from lp.bugs.interfaces.bugattachment import IBugAttachment
@@ -166,6 +167,21 @@ class EditBug(AuthorizationBase):
                 user, permission='launchpad.Append'):
             # The user cannot even see the bug.
             return False
+
+        def in_allowed_roles():
+            return (
+                # The bug reporter can always edit their own bug.
+                user.inTeam(self.obj.owner) or
+                # Users with relevant roles can edit the bug.
+                user.in_admin or
+                user.in_commercial_admin or
+                user.in_registry_experts or
+                _has_any_bug_role(user, self.obj.bugtasks)
+            )
+
+        if self.obj.lock_status == BugLockStatus.COMMENT_ONLY:
+            return in_allowed_roles()
+
         return (
             # If the bug is private, then we don't need more elaborate
             # checks as they must have been explicitly subscribed.
@@ -173,18 +189,40 @@ class EditBug(AuthorizationBase):
             # If the user seems generally legitimate, let them through.
             self.forwardCheckAuthenticated(
                 user, permission='launchpad.AnyLegitimatePerson') or
-            # The bug reporter can always edit their own bug.
-            user.inTeam(self.obj.owner) or
-            # Users with relevant roles can edit the bug.
-            user.in_admin or user.in_commercial_admin or
-            user.in_registry_experts or
-            _has_any_bug_role(user, self.obj.bugtasks))
+            in_allowed_roles())
 
     def checkUnauthenticated(self):
         """Never allow unauthenticated users to edit a bug."""
         return False
 
 
+class ModerateBug(AuthorizationBase):
+    """Security adapter for moderating bugs.
+
+    This is used for operations like locking and unlocking a bug to the
+    relevant roles.
+    """
+    permission = 'launchpad.Moderate'
+    usedfor = IBug
+
+    def checkAuthenticated(self, user):
+        if not self.forwardCheckAuthenticated(
+                user, permission='launchpad.Append'):
+            # The user cannot even see the bug.
+            return False
+
+        return (
+            user.in_admin or
+            user.in_commercial_admin or
+            user.in_registry_experts or
+            _has_any_bug_role(user, self.obj.bugtasks)
+        )
+
+    def checkUnauthenticated(self):
+        """Never allow unauthenticated users to moderate a bug."""
+        return False
+
+
 class PublicToAllOrPrivateToExplicitSubscribersForBug(AuthorizationBase):
     permission = 'launchpad.View'
     usedfor = IBug
diff --git a/lib/lp/bugs/tests/test_bug.py b/lib/lp/bugs/tests/test_bug.py
index 0c1e24b..bc28835 100644
--- a/lib/lp/bugs/tests/test_bug.py
+++ b/lib/lp/bugs/tests/test_bug.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for lp.bugs.model.Bug."""
@@ -6,12 +6,17 @@
 from datetime import timedelta
 
 from lazr.lifecycle.snapshot import Snapshot
+from testtools import ExpectedException
+from testtools.matchers import MatchesStructure
 from zope.component import getUtility
 from zope.interface import providedBy
 from zope.security.management import checkPermission
 from zope.security.proxy import removeSecurityProxy
 
-from lp.bugs.enums import BugNotificationLevel
+from lp.bugs.enums import (
+    BugLockStatus,
+    BugNotificationLevel,
+    )
 from lp.bugs.interfaces.bug import (
     CreateBugParams,
     IBugSet,
@@ -25,14 +30,18 @@ from lp.bugs.interfaces.bugtask import (
     UserCannotEditBugTaskMilestone,
     )
 from lp.registry.tests.test_person import KarmaTestMixin
+from lp.services.webapp.interfaces import OAuthPermission
 from lp.testing import (
     admin_logged_in,
+    ANONYMOUS,
+    api_url,
     celebrity_logged_in,
     person_logged_in,
     StormStatementRecorder,
     TestCaseWithFactory,
     )
 from lp.testing.layers import DatabaseFunctionalLayer
+from lp.testing.pages import webservice_for_person
 
 
 class TestBug(TestCaseWithFactory):
@@ -410,3 +419,305 @@ class TestBugPermissions(TestCaseWithFactory, KarmaTestMixin):
             person)
         with person_logged_in(person):
             self.assertTrue(checkPermission('launchpad.Edit', self.bug))
+
+
+class TestBugLocking(TestCaseWithFactory):
+    """
+    Tests for the bug locking functionality.
+    """
+
+    layer = DatabaseFunctionalLayer
+
+    def setUp(self):
+        super().setUp()
+        self.person = self.factory.makePerson()
+        self.target = self.factory.makeProduct()
+
+    def test_bug_lock_status_lock_reason_default_values(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        self.assertEqual(bug.lock_status, BugLockStatus.UNLOCKED)
+        self.assertIsNone(bug.lock_reason)
+
+    def test_bug_lock_status_invalid_values(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        invalid_values_list = ["test", 1.23, 123, self.person]
+        with person_logged_in(self.target.owner):
+            for invalid_value in invalid_values_list:
+                # Direct assignment
+                with ExpectedException(TypeError):
+                    removeSecurityProxy(bug).lock_status = invalid_value
+
+                # Via the lock() method
+                with ExpectedException(TypeError):
+                    bug.lock(
+                        who=removeSecurityProxy(
+                            bug.default_bugtask.target
+                        ).owner,
+                        status=invalid_value)
+
+    def test_bug_locking_when_bug_already_locked(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        with person_logged_in(self.target.owner):
+            bug.lock(who=self.target.owner)
+            self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
+            bug.lock(who=self.target.owner)
+            self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
+
+    def test_bug_locking_with_status_set_to_unlocked_does_nothing(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        with person_logged_in(self.target.owner):
+            bug.lock(who=self.target.owner, status=BugLockStatus.UNLOCKED)
+            self.assertEqual(bug.lock_status, BugLockStatus.UNLOCKED)
+            bug.lock(who=self.target.owner)
+            self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
+            bug.lock(who=self.target.owner, status=BugLockStatus.UNLOCKED)
+            self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
+
+    def test_bug_locking_with_a_reason_works(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        with person_logged_in(self.target.owner):
+            bug.lock(who=self.person, reason='too hot')
+            self.assertEqual(bug.lock_reason, 'too hot')
+
+    def test_bug_unlocking_clears_the_reason(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        with person_logged_in(self.target.owner):
+            bug.lock(who=self.person, reason='too hot')
+            bug.unlock(who=self.person)
+            self.assertIsNone(bug.lock_reason)
+
+    def test_bug_locking_unlocking_adds_bug_activity_entries(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        self.assertEqual(bug.activity.count(), 1)
+        with person_logged_in(self.target.owner):
+            bug.lock(who=self.target.owner, reason='too hot')
+            self.assertEqual(bug.activity.count(), 2)
+            self.assertThat(
+                bug.activity[1],
+                MatchesStructure.byEquality(
+                    person=self.target.owner,
+                    whatchanged='bug lock status',
+                    oldvalue=str(BugLockStatus.UNLOCKED),
+                    newvalue=str(BugLockStatus.COMMENT_ONLY),
+                )
+            )
+            bug.unlock(who=self.target.owner)
+            self.assertEqual(bug.activity.count(), 3)
+            self.assertThat(
+                bug.activity[2],
+                MatchesStructure.byEquality(
+                    person=self.target.owner,
+                    whatchanged='bug lock status',
+                    oldvalue=str(BugLockStatus.COMMENT_ONLY),
+                    newvalue=str(BugLockStatus.UNLOCKED),
+                )
+            )
+
+    def test_edit_permission_restrictions_when_a_bug_is_locked(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        another_person = self.factory.makePerson()
+
+        with person_logged_in(self.target.owner):
+            bug.lock(who=self.target.owner)
+
+        self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
+
+        # A user without the relevant role cannot edit a locked bug.
+        with person_logged_in(another_person):
+            self.assertFalse(checkPermission('launchpad.Edit', bug))
+
+        # The bug reporter can edit a locked bug.
+        with person_logged_in(self.person):
+            self.assertTrue(checkPermission('launchpad.Edit', bug))
+
+        # Target driver can edit a locked bug.
+        new_person = self.factory.makePerson()
+        removeSecurityProxy(bug.default_bugtask.target).driver = new_person
+        with person_logged_in(new_person):
+            self.assertTrue(checkPermission('launchpad.Edit', bug))
+
+        # Admins can edit a locked bug.
+        with admin_logged_in():
+            self.assertTrue(checkPermission('launchpad.Edit', bug))
+
+        # Commercial admins can edit a locked bug.
+        with celebrity_logged_in('commercial_admin'):
+            self.assertTrue(checkPermission('launchpad.Edit', bug))
+
+        # Registry experts can edit a locked bug.
+        with celebrity_logged_in('registry_experts'):
+            self.assertTrue(checkPermission('launchpad.Edit', bug))
+
+
+    def test_only_those_with_moderate_permission_can_lock_unlock_a_bug(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        another_person = self.factory.makePerson()
+
+        # Unauthenticated person cannot moderate a bug.
+        self.assertFalse(checkPermission('launchpad.Moderate', bug))
+
+        # A user without the relevant role cannot moderate a bug.
+        with person_logged_in(another_person):
+            self.assertFalse(checkPermission('launchpad.Moderate', bug))
+
+        # The bug reporter cannot moderate a bug.
+        with person_logged_in(self.person):
+            self.assertFalse(checkPermission('launchpad.Moderate', bug))
+
+        # Admins can moderate a bug.
+        with admin_logged_in():
+            self.assertTrue(checkPermission('launchpad.Moderate', bug))
+
+        # Commercial admins can moderate a bug.
+        with celebrity_logged_in('commercial_admin'):
+            self.assertTrue(checkPermission('launchpad.Moderate', bug))
+
+        # Registry experts can moderate a bug.
+        with celebrity_logged_in('registry_experts'):
+            self.assertTrue(checkPermission('launchpad.Moderate', bug))
+
+        # Target owner can moderate a bug.
+        with person_logged_in(
+                removeSecurityProxy(bug.default_bugtask.target).owner
+        ):
+            self.assertTrue(checkPermission('launchpad.Moderate', bug))
+
+        # Target driver can moderate a bug.
+        new_person = self.factory.makePerson()
+        removeSecurityProxy(bug.default_bugtask.target).driver = new_person
+        with person_logged_in(new_person):
+            self.assertTrue(checkPermission('launchpad.Moderate', bug))
+
+        yet_another_person = self.factory.makePerson()
+        removeSecurityProxy(
+            bug.default_bugtask.target
+        ).bug_supervisor = yet_another_person
+        with person_logged_in(yet_another_person):
+            self.assertTrue(checkPermission('launchpad.Moderate', bug))
+
+
+class TestBugLockingWebService(TestCaseWithFactory):
+    """Tests for the bug locking and unlocking web service methods."""
+
+    layer = DatabaseFunctionalLayer
+
+    def setUp(self):
+        super().setUp()
+        self.person = self.factory.makePerson()
+        self.target = self.factory.makeProduct()
+
+    def test_who_value_for_lock_is_correctly_set(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        self.assertEqual(bug.activity.count(), 1)
+        bug_url = api_url(bug)
+        webservice = webservice_for_person(
+            self.target.owner,
+            permission=OAuthPermission.WRITE_PRIVATE
+        )
+        webservice.default_api_version = "devel"
+
+        response = webservice.named_post(
+            bug_url, 'lock'
+        )
+        with person_logged_in(ANONYMOUS):
+            self.assertEqual(bug.activity.count(), 2)
+            self.assertThat(
+                bug.activity[1],
+                MatchesStructure.byEquality(
+                    person=self.target.owner,
+                    oldvalue=str(BugLockStatus.UNLOCKED),
+                    newvalue=str(BugLockStatus.COMMENT_ONLY)
+                )
+            )
+
+        response = webservice.named_post(
+            bug_url, 'unlock'
+        )
+        self.assertEqual(200, response.status)
+        with person_logged_in(ANONYMOUS):
+            self.assertEqual(bug.activity.count(), 3)
+            self.assertThat(
+                bug.activity[2],
+                MatchesStructure.byEquality(
+                    person=self.target.owner,
+                    whatchanged='bug lock status',
+                    oldvalue=str(BugLockStatus.COMMENT_ONLY),
+                    newvalue=str(BugLockStatus.UNLOCKED),
+                )
+            )
+
+    def test_who_value_for_unlock_is_correctly_set(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        self.assertEqual(bug.activity.count(), 1)
+        with person_logged_in(self.target.owner):
+            bug.lock(who=self.target.owner)
+        self.assertEqual(bug.activity.count(), 2)
+        bug_url = api_url(bug)
+        webservice = webservice_for_person(
+            self.target.owner,
+            permission=OAuthPermission.WRITE_PRIVATE
+        )
+        webservice.default_api_version = "devel"
+
+        response = webservice.named_post(
+            bug_url, 'unlock'
+        )
+        self.assertEqual(200, response.status)
+        with person_logged_in(ANONYMOUS):
+            self.assertEqual(bug.activity.count(), 3)
+            self.assertThat(
+                bug.activity[2],
+                MatchesStructure.byEquality(
+                    person=self.target.owner,
+                    oldvalue=str(BugLockStatus.COMMENT_ONLY),
+                    newvalue=str(BugLockStatus.UNLOCKED),
+                )
+            )
+
+    def test_lock_status_lock_reason_values_unlocked_bug(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        bug_url = api_url(bug)
+
+        webservice = webservice_for_person(
+            self.target.owner,
+            permission=OAuthPermission.WRITE_PRIVATE
+        )
+        webservice.default_api_version = 'devel'
+
+        response = webservice.get(bug_url)
+        self.assertEqual(200, response.status)
+        response_json = response.jsonBody()
+        self.assertEqual(
+            response_json['lock_status'],
+            str(BugLockStatus.UNLOCKED)
+        )
+        self.assertIsNone(response_json['lock_reason'])
+
+    def test_lock_status_lock_reason_values_after_locking(self):
+        bug = self.factory.makeBug(owner=self.person, target=self.target)
+        bug_url = api_url(bug)
+
+        with person_logged_in(self.target.owner):
+            bug.lock(
+                who=self.target.owner,
+                status=BugLockStatus.COMMENT_ONLY,
+                reason='too hot'
+            )
+
+        webservice = webservice_for_person(
+            self.target.owner,
+            permission=OAuthPermission.WRITE_PRIVATE
+        )
+        webservice.default_api_version = 'devel'
+
+        response = webservice.get(bug_url)
+        self.assertEqual(200, response.status)
+        response_json = response.jsonBody()
+        self.assertEqual(
+            response_json['lock_status'],
+            str(BugLockStatus.COMMENT_ONLY)
+        )
+        self.assertEqual(
+            response_json['lock_reason'],
+            'too hot'
+        )

Follow ups