launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #28012
Re: [Merge] ~lgp171188/launchpad:enforce-stricter-permission-check-when-a-bug-is-locked-down into launchpad:master
Diff comments:
> diff --git a/database/sampledata/current-dev.sql b/database/sampledata/current-dev.sql
Removed the sampledata diff in the next revision.
> index 7af2686..4883fb8 100644
> diff --git a/lib/lp/bugs/adapters/bugchange.py b/lib/lp/bugs/adapters/bugchange.py
> index ba0e99a..6b15f36 100644
> --- a/lib/lp/bugs/adapters/bugchange.py
> +++ b/lib/lp/bugs/adapters/bugchange.py
> @@ -27,6 +27,8 @@ __all__ = [
> 'BugTaskStatusChange',
> 'BugTaskTargetChange',
> 'BugTitleChange',
> + 'BugLocked',
Fixed in the next revision.
> + 'BugUnlocked',
> 'BugWatchAdded',
> 'BugWatchRemoved',
> 'CHANGED_DUPLICATE_MARKER',
> @@ -735,6 +740,58 @@ class CveUnlinkedFromBug(BugChangeBase):
> return {'text': "** CVE removed: %s" % self.cve.url}
>
>
> +class BugLocked(BugChangeBase):
> + """Used to represent the locking of a bug."""
> +
> + def __init__(self, when, person, new_status=BugLockStatus.COMMENT_ONLY,
> + lock_reason=None, **kwargs):
> + super().__init__(when, person)
> + self.new_status = new_status
> + self.lock_reason = lock_reason
> +
> + def getBugActivity(self):
> + """See `IBugChange'."""
> + activity = dict(
> + whatchanged='bug lock status',
Fixed in the next revision.
> + oldvalue=str(BugLockStatus.UNLOCKED),
Done in the next revision.
> + newvalue=str(self.new_status),
> + )
> + if self.lock_reason:
> + activity['message'] = self.lock_reason
> +
> + return activity
> +
> + def getBugNotification(self):
> + """See `IBugChange`."""
> + return {
> + 'text': (
> + "** Bug locked\n"
> + " Reason: {}"
I have added a separate bug change adapter for setting 'lock_status' and have decoupled it from the 'BugLocked' adapter in the next revision.
> + ).format(self.lock_reason)
> + }
> +
> +class BugUnlocked(BugChangeBase):
> + """Used to represent the unlocking of a bug."""
> +
> + def __init__(self, when, person, old_status, **kwargs):
> + super().__init__(when, person)
> + self.old_status = old_status
> +
> + def getBugActivity(self):
> + """See `IBugChange'."""
> + return dict(
> + whatchanged='bug lock status',
Fixed in the next revision.
> + newvalue=str(BugLockStatus.UNLOCKED),
> + oldvalue=str(self.old_status),
> + )
> +
> + def getBugNotification(self):
> + """See `IBugChange`."""
> + return {
> + 'text': "** Bug unlocked"
> + }
I am reconsidering this approach because it doesn't work well when a user changes the lock reason for an already locked bug. I will add a separate bugchange adapter to be used when the lock reason changes and also allow direct assignment to 'lock_reason' via the web service API for this scenario.
So if the lock reason was not set when locking the bug with 'lock()', it can then be set by direct assigning a value to 'lock_reason' via the API.
These changes will be there in the next revision.
> +
> +
> class BugTaskAttributeChange(AttributeChange):
> """Used to represent a change in a BugTask's attributes.
>
> diff --git a/lib/lp/bugs/interfaces/bug.py b/lib/lp/bugs/interfaces/bug.py
> index db94b63..5250211 100644
> --- a/lib/lp/bugs/interfaces/bug.py
> +++ b/lib/lp/bugs/interfaces/bug.py
> @@ -978,9 +996,42 @@ class IBugAppend(Interface):
> Return None if no bugtask was edited.
> """
>
> +class IBugModerate(Interface):
> + """IBug attributes that require the launchpad.Moderate permission."""
> +
> + @operation_parameters(
> + status=copy_field(
> + IBugView['lock_status'],
Done in the next revision.
> + default=BugLockStatus.COMMENT_ONLY
I have updated this in the next revision to remove the default and always require a value.
> + ),
> + reason=copy_field(IBugView['lock_reason'])
> + )
> + @export_write_operation()
> + @call_with(who=REQUEST_USER)
> + @operation_for_version("devel")
> + def lock(who, status=BugLockStatus.COMMENT_ONLY, reason=None):
> + """
> + Lock the bug metadata edits to the relevant roles.
> +
> + :param status: The lock status of the bug - one of the values
> + in the BugLockStatus enum.
> + :param reason: The reason for locking this bug.
> + :param who: The IPerson who is making the change.
> + """
> +
> + @export_write_operation()
> + @call_with(who=REQUEST_USER)
> + @operation_for_version("devel")
> + def unlock(who):
> + """
> + Unlock the bug metadata edits to the default roles.
> +
> + :param who: The IPerson who is making the change.
> + """
> +
>
> @exported_as_webservice_entry()
> -class IBug(IBugPublic, IBugView, IBugAppend, IHasLinkedBranches):
> +class IBug(IBugPublic, IBugView, IBugAppend, IBugModerate, IHasLinkedBranches):
> """The core bug entry."""
>
> linked_bugbranches = exported(
> diff --git a/lib/lp/bugs/model/bug.py b/lib/lp/bugs/model/bug.py
> index f7c588c..04197bf 100644
> --- a/lib/lp/bugs/model/bug.py
> +++ b/lib/lp/bugs/model/bug.py
> @@ -2198,6 +2206,38 @@ class Bug(SQLBase, InformationTypeMixin):
> BugActivity.datechanged <= end_date)
> return activity_in_range
>
> + def lock(self, who, status=BugLockStatus.COMMENT_ONLY, reason=None):
> + """See `IBug`."""
> + if status != BugLockStatus.UNLOCKED and status != self.lock_status:
This is now restricted to only the locked states by the vocabulary specified in the next revision. I have also made the code changes to allow setting the lock_reason using direct assignment via the web service API.
> + self.lock_status = status
> + if reason:
> + self.lock_reason = reason
> +
> + Store.of(self).flush()
I wasn't sure when the changes to 'self' would get flushed and that is why I added this. I will check if things work okay without this and will remove it if there are no issues.
> + self.addChange(
> + BugLocked(
> + when=UTC_NOW,
> + person=who,
> + status=status,
> + lock_reason=reason
> + )
> + )
> +
> + def unlock(self, who):
> + """See `IBug`."""
> + if self.lock_status != BugLockStatus.UNLOCKED:
> + old_lock_status = self.lock_status
> + self.lock_status = BugLockStatus.UNLOCKED
> + self.lock_reason = None
> + Store.of(self).flush()
> + self.addChange(
> + BugUnlocked(
> + when=UTC_NOW,
> + person=who,
> + old_status=old_lock_status
> + )
> + )
> +
>
> @ProxyFactory
> def get_also_notified_subscribers(
> diff --git a/lib/lp/bugs/security.py b/lib/lp/bugs/security.py
> index b60b426..bdef636 100644
> --- a/lib/lp/bugs/security.py
> +++ b/lib/lp/bugs/security.py
> @@ -173,18 +189,40 @@ class EditBug(AuthorizationBase):
> # If the user seems generally legitimate, let them through.
> self.forwardCheckAuthenticated(
> user, permission='launchpad.AnyLegitimatePerson') or
> - # The bug reporter can always edit their own bug.
> - user.inTeam(self.obj.owner) or
> - # Users with relevant roles can edit the bug.
> - user.in_admin or user.in_commercial_admin or
> - user.in_registry_experts or
> - _has_any_bug_role(user, self.obj.bugtasks))
> + in_allowed_roles())
>
> def checkUnauthenticated(self):
> """Never allow unauthenticated users to edit a bug."""
> return False
>
>
> +class ModerateBug(AuthorizationBase):
> + """Security adapter for moderating bugs.
> +
> + This is used for operations like locking and unlocking a bug to the
> + relevant roles.
> + """
> + permission = 'launchpad.Moderate'
> + usedfor = IBug
> +
> + def checkAuthenticated(self, user):
> + if not self.forwardCheckAuthenticated(
> + user, permission='launchpad.Append'):
> + # The user cannot even see the bug.
> + return False
> +
> + return (
> + user.in_admin or
> + user.in_commercial_admin or
> + user.in_registry_experts or
> + _has_any_bug_role(user, self.obj.bugtasks)
> + )
> +
> + def checkUnauthenticated(self):
> + """Never allow unauthenticated users to moderate a bug."""
> + return False
Removed in the next revision.
> +
> +
> class PublicToAllOrPrivateToExplicitSubscribersForBug(AuthorizationBase):
> permission = 'launchpad.View'
> usedfor = IBug
> diff --git a/lib/lp/bugs/tests/test_bug.py b/lib/lp/bugs/tests/test_bug.py
> index 0c1e24b..bc28835 100644
> --- a/lib/lp/bugs/tests/test_bug.py
> +++ b/lib/lp/bugs/tests/test_bug.py
> @@ -410,3 +419,305 @@ class TestBugPermissions(TestCaseWithFactory, KarmaTestMixin):
> person)
> with person_logged_in(person):
> self.assertTrue(checkPermission('launchpad.Edit', self.bug))
> +
> +
> +class TestBugLocking(TestCaseWithFactory):
> + """
> + Tests for the bug locking functionality.
> + """
> +
> + layer = DatabaseFunctionalLayer
> +
> + def setUp(self):
> + super().setUp()
> + self.person = self.factory.makePerson()
> + self.target = self.factory.makeProduct()
> +
> + def test_bug_lock_status_lock_reason_default_values(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + self.assertEqual(bug.lock_status, BugLockStatus.UNLOCKED)
Fixed in the next revision.
> + self.assertIsNone(bug.lock_reason)
> +
> + def test_bug_lock_status_invalid_values(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + invalid_values_list = ["test", 1.23, 123, self.person]
> + with person_logged_in(self.target.owner):
> + for invalid_value in invalid_values_list:
> + # Direct assignment
> + with ExpectedException(TypeError):
> + removeSecurityProxy(bug).lock_status = invalid_value
> +
> + # Via the lock() method
> + with ExpectedException(TypeError):
> + bug.lock(
> + who=removeSecurityProxy(
> + bug.default_bugtask.target
> + ).owner,
> + status=invalid_value)
> +
> + def test_bug_locking_when_bug_already_locked(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + with person_logged_in(self.target.owner):
> + bug.lock(who=self.target.owner)
> + self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> + bug.lock(who=self.target.owner)
> + self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> +
> + def test_bug_locking_with_status_set_to_unlocked_does_nothing(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + with person_logged_in(self.target.owner):
> + bug.lock(who=self.target.owner, status=BugLockStatus.UNLOCKED)
> + self.assertEqual(bug.lock_status, BugLockStatus.UNLOCKED)
> + bug.lock(who=self.target.owner)
> + self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> + bug.lock(who=self.target.owner, status=BugLockStatus.UNLOCKED)
> + self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> +
> + def test_bug_locking_with_a_reason_works(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + with person_logged_in(self.target.owner):
> + bug.lock(who=self.person, reason='too hot')
> + self.assertEqual(bug.lock_reason, 'too hot')
> +
> + def test_bug_unlocking_clears_the_reason(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + with person_logged_in(self.target.owner):
> + bug.lock(who=self.person, reason='too hot')
> + bug.unlock(who=self.person)
> + self.assertIsNone(bug.lock_reason)
> +
> + def test_bug_locking_unlocking_adds_bug_activity_entries(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + self.assertEqual(bug.activity.count(), 1)
> + with person_logged_in(self.target.owner):
> + bug.lock(who=self.target.owner, reason='too hot')
> + self.assertEqual(bug.activity.count(), 2)
> + self.assertThat(
> + bug.activity[1],
> + MatchesStructure.byEquality(
> + person=self.target.owner,
> + whatchanged='bug lock status',
> + oldvalue=str(BugLockStatus.UNLOCKED),
> + newvalue=str(BugLockStatus.COMMENT_ONLY),
> + )
> + )
> + bug.unlock(who=self.target.owner)
> + self.assertEqual(bug.activity.count(), 3)
> + self.assertThat(
> + bug.activity[2],
> + MatchesStructure.byEquality(
> + person=self.target.owner,
> + whatchanged='bug lock status',
> + oldvalue=str(BugLockStatus.COMMENT_ONLY),
> + newvalue=str(BugLockStatus.UNLOCKED),
> + )
> + )
> +
> + def test_edit_permission_restrictions_when_a_bug_is_locked(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + another_person = self.factory.makePerson()
> +
> + with person_logged_in(self.target.owner):
> + bug.lock(who=self.target.owner)
> +
> + self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> +
> + # A user without the relevant role cannot edit a locked bug.
> + with person_logged_in(another_person):
> + self.assertFalse(checkPermission('launchpad.Edit', bug))
> +
> + # The bug reporter can edit a locked bug.
> + with person_logged_in(self.person):
> + self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> + # Target driver can can edit a locked bug.
> + new_person = self.factory.makePerson()
> + removeSecurityProxy(bug.default_bugtask.target).driver = new_person
> + with person_logged_in(new_person):
> + self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> + # Admins can edit a locked bug.
> + with admin_logged_in():
> + self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> + # Commercial admins can edit a locked bug.
> + with celebrity_logged_in('commercial_admin'):
> + self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> + # Registry experts can edit a locked bug.
> + with celebrity_logged_in('registry_experts'):
> + self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> +
> + def test_only_those_with_moderate_permission_can_lock_unlock_a_bug(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + another_person = self.factory.makePerson()
> +
> + # Unauthenticated person cannot moderate a bug.
> + self.assertFalse(checkPermission('launchpad.Moderate', bug))
> +
> + # A user without the relevant role cannot moderate a bug.
> + with person_logged_in(another_person):
> + self.assertFalse(checkPermission('launchpad.Moderate', bug))
> +
> + # The bug reporter cannot moderate a bug.
> + with person_logged_in(self.person):
> + self.assertFalse(checkPermission('launchpad.Moderate', bug))
> +
> + # Admins can moderate a bug.
> + with admin_logged_in():
> + self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> + # Commercial admins can moderate a bug.
> + with celebrity_logged_in('commercial_admin'):
> + self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> + # Registry experts can moderate a bug.
> + with celebrity_logged_in('registry_experts'):
> + self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> + # Target owner can moderate a bug.
> + with person_logged_in(
> + removeSecurityProxy(bug.default_bugtask.target).owner
> + ):
> + self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> + # Target driver can moderate a bug.
> + new_person = self.factory.makePerson()
> + removeSecurityProxy(bug.default_bugtask.target).driver = new_person
> + with person_logged_in(new_person):
> + self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> + yet_another_person = self.factory.makePerson()
> + removeSecurityProxy(
> + bug.default_bugtask.target
> + ).bug_supervisor = yet_another_person
> + with person_logged_in(yet_another_person):
> + self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> +
> +class TestBugLockingWebService(TestCaseWithFactory):
> + """Tests for the bug locking and unlocking web service methods."""
> +
> + layer = DatabaseFunctionalLayer
> +
> + def setUp(self):
> + super().setUp()
> + self.person = self.factory.makePerson()
> + self.target = self.factory.makeProduct()
> +
> + def test_who_value_for_lock_is_correctly_set(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + self.assertEqual(bug.activity.count(), 1)
> + bug_url = api_url(bug)
> + webservice = webservice_for_person(
> + self.target.owner,
> + permission=OAuthPermission.WRITE_PRIVATE
> + )
> + webservice.default_api_version = "devel"
> +
> + response = webservice.named_post(
> + bug_url, 'lock'
> + )
> + with person_logged_in(ANONYMOUS):
> + self.assertEqual(bug.activity.count(), 2)
> + self.assertThat(
> + bug.activity[1],
> + MatchesStructure.byEquality(
> + person=self.target.owner,
> + oldvalue=str(BugLockStatus.UNLOCKED),
> + newvalue=str(BugLockStatus.COMMENT_ONLY)
> + )
> + )
> +
> + response = webservice.named_post(
> + bug_url, 'unlock'
> + )
> + self.assertEqual(200, response.status)
> + with person_logged_in(ANONYMOUS):
> + self.assertEqual(bug.activity.count(), 3)
> + self.assertThat(
> + bug.activity[2],
> + MatchesStructure.byEquality(
> + person=self.target.owner,
> + whatchanged='bug lock status',
> + oldvalue=str(BugLockStatus.COMMENT_ONLY),
> + newvalue=str(BugLockStatus.UNLOCKED),
> + )
> + )
> +
> + def test_who_value_for_unlock_is_correctly_set(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + self.assertEqual(bug.activity.count(), 1)
> + with person_logged_in(self.target.owner):
> + bug.lock(who=self.target.owner)
> + self.assertEqual(bug.activity.count(), 2)
> + bug_url = api_url(bug)
> + webservice = webservice_for_person(
> + self.target.owner,
> + permission=OAuthPermission.WRITE_PRIVATE
> + )
> + webservice.default_api_version = "devel"
> +
> + response = webservice.named_post(
> + bug_url, 'unlock'
> + )
> + self.assertEqual(200, response.status)
> + with person_logged_in(ANONYMOUS):
> + self.assertEqual(bug.activity.count(), 3)
> + self.assertThat(
> + bug.activity[2],
> + MatchesStructure.byEquality(
> + person=self.target.owner,
> + oldvalue=str(BugLockStatus.COMMENT_ONLY),
> + newvalue=str(BugLockStatus.UNLOCKED),
> + )
> + )
> +
> + def test_lock_status_lock_reason_values_unlocked_bug(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + bug_url = api_url(bug)
> +
> + webservice = webservice_for_person(
> + self.target.owner,
> + permission=OAuthPermission.WRITE_PRIVATE
> + )
> + webservice.default_api_version = 'devel'
> +
> + response = webservice.get(bug_url)
> + self.assertEqual(200, response.status)
> + response_json = response.jsonBody()
> + self.assertEqual(
> + response_json['lock_status'],
> + str(BugLockStatus.UNLOCKED)
> + )
> + self.assertIsNone(response_json['lock_reason'])
> +
> + def test_lock_status_lock_reason_values_after_locking(self):
> + bug = self.factory.makeBug(owner=self.person, target=self.target)
> + bug_url = api_url(bug)
> +
> + with person_logged_in(self.target.owner):
> + bug.lock(
> + who=self.target.owner,
> + status=BugLockStatus.COMMENT_ONLY,
> + reason='too hot'
> + )
> +
> + webservice = webservice_for_person(
> + self.target.owner,
> + permission=OAuthPermission.WRITE_PRIVATE
> + )
> + webservice.default_api_version = 'devel'
> +
> + response = webservice.get(bug_url)
> + self.assertEqual(200, response.status)
> + response_json = response.jsonBody()
> + self.assertEqual(
> + response_json['lock_status'],
> + str(BugLockStatus.COMMENT_ONLY)
> + )
> + self.assertEqual(
> + response_json['lock_reason'],
> + 'too hot'
> + )
> diff --git a/lib/lp/scripts/garbo.py b/lib/lp/scripts/garbo.py
> index 5eabeb6..b91acda 100644
> --- a/lib/lp/scripts/garbo.py
> +++ b/lib/lp/scripts/garbo.py
> @@ -2022,6 +2050,7 @@ class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
> GitRepositoryPruner,
> RevisionCachePruner,
> UnusedSessionPruner,
> + PopulateBugLockStatusDefaultUnlocked,
Fixed in the next revision.
> ]
> experimental_tunable_loops = []
>
--
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/414097
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:enforce-stricter-permission-check-when-a-bug-is-locked-down into launchpad:master.
References