← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] ~lgp171188/launchpad:enforce-stricter-permission-check-when-a-bug-is-locked-down into launchpad:master

 


Diff comments:

> diff --git a/database/sampledata/current-dev.sql b/database/sampledata/current-dev.sql

Removed the sampledata diff in the next revision.

> index 7af2686..4883fb8 100644
> diff --git a/lib/lp/bugs/adapters/bugchange.py b/lib/lp/bugs/adapters/bugchange.py
> index ba0e99a..6b15f36 100644
> --- a/lib/lp/bugs/adapters/bugchange.py
> +++ b/lib/lp/bugs/adapters/bugchange.py
> @@ -27,6 +27,8 @@ __all__ = [
>      'BugTaskStatusChange',
>      'BugTaskTargetChange',
>      'BugTitleChange',
> +    'BugLocked',

Fixed in the next revision.

> +    'BugUnlocked',
>      'BugWatchAdded',
>      'BugWatchRemoved',
>      'CHANGED_DUPLICATE_MARKER',
> @@ -735,6 +740,58 @@ class CveUnlinkedFromBug(BugChangeBase):
>          return {'text': "** CVE removed: %s" % self.cve.url}
>  
>  
> +class BugLocked(BugChangeBase):
> +    """Used to represent the locking of a bug."""
> +
> +    def __init__(self, when, person, new_status=BugLockStatus.COMMENT_ONLY,
> +                 lock_reason=None, **kwargs):
> +        super().__init__(when, person)
> +        self.new_status = new_status
> +        self.lock_reason = lock_reason
> +
> +    def getBugActivity(self):
> +        """See `IBugChange'."""
> +        activity = dict(
> +            whatchanged='bug lock status',

Fixed in the next revision.

> +            oldvalue=str(BugLockStatus.UNLOCKED),

Done in the next revision.

> +            newvalue=str(self.new_status),
> +        )
> +        if self.lock_reason:
> +            activity['message'] = self.lock_reason
> +
> +        return activity
> +
> +    def getBugNotification(self):
> +        """See `IBugChange`."""
> +        return {
> +            'text': (
> +                "** Bug locked\n"
> +                "   Reason: {}"

I have added a separate bug change adapter for setting 'lock_status' and have decoupled it from the 'BugLocked' adapter in the next revision.

> +            ).format(self.lock_reason)
> +        }
> +
> +class BugUnlocked(BugChangeBase):
> +    """Used to represent the unlocking of a bug."""
> +
> +    def __init__(self, when, person, old_status, **kwargs):
> +        super().__init__(when, person)
> +        self.old_status = old_status
> +
> +    def getBugActivity(self):
> +        """See `IBugChange'."""
> +        return dict(
> +            whatchanged='bug lock status',

Fixed in the next revision.

> +            newvalue=str(BugLockStatus.UNLOCKED),
> +            oldvalue=str(self.old_status),
> +        )
> +
> +    def getBugNotification(self):
> +        """See `IBugChange`."""
> +        return {
> +            'text': "** Bug unlocked"
> +        }

I am reconsidering this approach because it doesn't work well when a user changes the lock reason for an already locked bug. I will add a separate bugchange adapter to be used when the lock reason changes and also allow direct assignment to 'lock_reason' via the web service API for this scenario.

So if the lock reason was not set when locking the bug with 'lock()', it can then be set by direct assigning a value to 'lock_reason' via the API.

These changes will be there in the next revision.

> +
> +
>  class BugTaskAttributeChange(AttributeChange):
>      """Used to represent a change in a BugTask's attributes.
>  
> diff --git a/lib/lp/bugs/interfaces/bug.py b/lib/lp/bugs/interfaces/bug.py
> index db94b63..5250211 100644
> --- a/lib/lp/bugs/interfaces/bug.py
> +++ b/lib/lp/bugs/interfaces/bug.py
> @@ -978,9 +996,42 @@ class IBugAppend(Interface):
>          Return None if no bugtask was edited.
>          """
>  
> +class IBugModerate(Interface):
> +    """IBug attributes that require the launchpad.Moderate permission."""
> +
> +    @operation_parameters(
> +        status=copy_field(
> +            IBugView['lock_status'],

Done in the next revision.

> +            default=BugLockStatus.COMMENT_ONLY

I have updated this in the next revision to remove the default and always require a value.

> +        ),
> +        reason=copy_field(IBugView['lock_reason'])
> +    )
> +    @export_write_operation()
> +    @call_with(who=REQUEST_USER)
> +    @operation_for_version("devel")
> +    def lock(who, status=BugLockStatus.COMMENT_ONLY, reason=None):
> +        """
> +        Lock the bug metadata edits to the relevant roles.
> +
> +        :param status: The lock status of the bug - one of the values
> +                       in the BugLockStatus enum.
> +        :param reason: The reason for locking this bug.
> +        :param who: The IPerson who is making the change.
> +        """
> +
> +    @export_write_operation()
> +    @call_with(who=REQUEST_USER)
> +    @operation_for_version("devel")
> +    def unlock(who):
> +        """
> +        Unlock the bug metadata edits to the default roles.
> +
> +        :param who: The IPerson who is making the change.
> +        """
> +
>  
>  @exported_as_webservice_entry()
> -class IBug(IBugPublic, IBugView, IBugAppend, IHasLinkedBranches):
> +class IBug(IBugPublic, IBugView, IBugAppend, IBugModerate, IHasLinkedBranches):
>      """The core bug entry."""
>  
>      linked_bugbranches = exported(
> diff --git a/lib/lp/bugs/model/bug.py b/lib/lp/bugs/model/bug.py
> index f7c588c..04197bf 100644
> --- a/lib/lp/bugs/model/bug.py
> +++ b/lib/lp/bugs/model/bug.py
> @@ -2198,6 +2206,38 @@ class Bug(SQLBase, InformationTypeMixin):
>              BugActivity.datechanged <= end_date)
>          return activity_in_range
>  
> +    def lock(self, who, status=BugLockStatus.COMMENT_ONLY, reason=None):
> +        """See `IBug`."""
> +        if status != BugLockStatus.UNLOCKED and status != self.lock_status:

This is now restricted to only the locked states by the vocabulary specified in the next revision. I have also made the code changes to allow setting the lock_reason using direct assignment via the web service API.

> +            self.lock_status = status
> +            if reason:
> +                self.lock_reason = reason
> +
> +            Store.of(self).flush()

I wasn't sure when the changes to 'self' would get flushed and that is why I added this. I will check if things work okay without this and will remove it if there are no issues.

> +            self.addChange(
> +                BugLocked(
> +                    when=UTC_NOW,
> +                    person=who,
> +                    status=status,
> +                    lock_reason=reason
> +                )
> +            )
> +
> +    def unlock(self, who):
> +        """See `IBug`."""
> +        if self.lock_status != BugLockStatus.UNLOCKED:
> +            old_lock_status = self.lock_status
> +            self.lock_status = BugLockStatus.UNLOCKED
> +            self.lock_reason = None
> +            Store.of(self).flush()
> +            self.addChange(
> +                BugUnlocked(
> +                    when=UTC_NOW,
> +                    person=who,
> +                    old_status=old_lock_status
> +                )
> +            )
> +
>  
>  @ProxyFactory
>  def get_also_notified_subscribers(
> diff --git a/lib/lp/bugs/security.py b/lib/lp/bugs/security.py
> index b60b426..bdef636 100644
> --- a/lib/lp/bugs/security.py
> +++ b/lib/lp/bugs/security.py
> @@ -173,18 +189,40 @@ class EditBug(AuthorizationBase):
>              # If the user seems generally legitimate, let them through.
>              self.forwardCheckAuthenticated(
>                  user, permission='launchpad.AnyLegitimatePerson') or
> -            # The bug reporter can always edit their own bug.
> -            user.inTeam(self.obj.owner) or
> -            # Users with relevant roles can edit the bug.
> -            user.in_admin or user.in_commercial_admin or
> -            user.in_registry_experts or
> -            _has_any_bug_role(user, self.obj.bugtasks))
> +            in_allowed_roles())
>  
>      def checkUnauthenticated(self):
>          """Never allow unauthenticated users to edit a bug."""
>          return False
>  
>  
> +class ModerateBug(AuthorizationBase):
> +    """Security adapter for moderating bugs.
> +
> +    This is used for operations like locking and unlocking a bug to the
> +    relevant roles.
> +    """
> +    permission = 'launchpad.Moderate'
> +    usedfor = IBug
> +
> +    def checkAuthenticated(self, user):
> +        if not self.forwardCheckAuthenticated(
> +                user, permission='launchpad.Append'):
> +            # The user cannot even see the bug.
> +            return False
> +
> +        return (
> +            user.in_admin or
> +            user.in_commercial_admin or
> +            user.in_registry_experts or
> +            _has_any_bug_role(user, self.obj.bugtasks)
> +        )
> +
> +    def checkUnauthenticated(self):
> +        """Never allow unauthenticated users to moderate a bug."""
> +        return False

Removed in the next revision.

> +
> +
>  class PublicToAllOrPrivateToExplicitSubscribersForBug(AuthorizationBase):
>      permission = 'launchpad.View'
>      usedfor = IBug
> diff --git a/lib/lp/bugs/tests/test_bug.py b/lib/lp/bugs/tests/test_bug.py
> index 0c1e24b..bc28835 100644
> --- a/lib/lp/bugs/tests/test_bug.py
> +++ b/lib/lp/bugs/tests/test_bug.py
> @@ -410,3 +419,305 @@ class TestBugPermissions(TestCaseWithFactory, KarmaTestMixin):
>              person)
>          with person_logged_in(person):
>              self.assertTrue(checkPermission('launchpad.Edit', self.bug))
> +
> +
> +class TestBugLocking(TestCaseWithFactory):
> +    """
> +    Tests for the bug locking functionality.
> +    """
> +
> +    layer = DatabaseFunctionalLayer
> +
> +    def setUp(self):
> +        super().setUp()
> +        self.person = self.factory.makePerson()
> +        self.target = self.factory.makeProduct()
> +
> +    def test_bug_lock_status_lock_reason_default_values(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        self.assertEqual(bug.lock_status, BugLockStatus.UNLOCKED)

Fixed in the next revision.

> +        self.assertIsNone(bug.lock_reason)
> +
> +    def test_bug_lock_status_invalid_values(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        invalid_values_list = ["test", 1.23, 123, self.person]
> +        with person_logged_in(self.target.owner):
> +            for invalid_value in invalid_values_list:
> +                # Direct assignment
> +                with ExpectedException(TypeError):
> +                    removeSecurityProxy(bug).lock_status = invalid_value
> +
> +                # Via the lock() method
> +                with ExpectedException(TypeError):
> +                    bug.lock(
> +                        who=removeSecurityProxy(
> +                            bug.default_bugtask.target
> +                        ).owner,
> +                        status=invalid_value)
> +
> +    def test_bug_locking_when_bug_already_locked(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        with person_logged_in(self.target.owner):
> +            bug.lock(who=self.target.owner)
> +            self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> +            bug.lock(who=self.target.owner)
> +            self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> +
> +    def test_bug_locking_with_status_set_to_unlocked_does_nothing(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        with person_logged_in(self.target.owner):
> +            bug.lock(who=self.target.owner, status=BugLockStatus.UNLOCKED)
> +            self.assertEqual(bug.lock_status, BugLockStatus.UNLOCKED)
> +            bug.lock(who=self.target.owner)
> +            self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> +            bug.lock(who=self.target.owner, status=BugLockStatus.UNLOCKED)
> +            self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> +
> +    def test_bug_locking_with_a_reason_works(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        with person_logged_in(self.target.owner):
> +            bug.lock(who=self.person, reason='too hot')
> +            self.assertEqual(bug.lock_reason, 'too hot')
> +
> +    def test_bug_unlocking_clears_the_reason(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        with person_logged_in(self.target.owner):
> +            bug.lock(who=self.person, reason='too hot')
> +            bug.unlock(who=self.person)
> +            self.assertIsNone(bug.lock_reason)
> +
> +    def test_bug_locking_unlocking_adds_bug_activity_entries(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        self.assertEqual(bug.activity.count(), 1)
> +        with person_logged_in(self.target.owner):
> +            bug.lock(who=self.target.owner, reason='too hot')
> +            self.assertEqual(bug.activity.count(), 2)
> +            self.assertThat(
> +                bug.activity[1],
> +                MatchesStructure.byEquality(
> +                    person=self.target.owner,
> +                    whatchanged='bug lock status',
> +                    oldvalue=str(BugLockStatus.UNLOCKED),
> +                    newvalue=str(BugLockStatus.COMMENT_ONLY),
> +                )
> +            )
> +            bug.unlock(who=self.target.owner)
> +            self.assertEqual(bug.activity.count(), 3)
> +            self.assertThat(
> +                bug.activity[2],
> +                MatchesStructure.byEquality(
> +                    person=self.target.owner,
> +                    whatchanged='bug lock status',
> +                    oldvalue=str(BugLockStatus.COMMENT_ONLY),
> +                    newvalue=str(BugLockStatus.UNLOCKED),
> +                )
> +            )
> +
> +    def test_edit_permission_restrictions_when_a_bug_is_locked(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        another_person = self.factory.makePerson()
> +
> +        with person_logged_in(self.target.owner):
> +            bug.lock(who=self.target.owner)
> +
> +        self.assertEqual(bug.lock_status, BugLockStatus.COMMENT_ONLY)
> +
> +        # A user without the relevant role cannot edit a locked bug.
> +        with person_logged_in(another_person):
> +            self.assertFalse(checkPermission('launchpad.Edit', bug))
> +
> +        # The bug reporter can edit a locked bug.
> +        with person_logged_in(self.person):
> +            self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> +        # Target driver can can edit a locked bug.
> +        new_person = self.factory.makePerson()
> +        removeSecurityProxy(bug.default_bugtask.target).driver = new_person
> +        with person_logged_in(new_person):
> +            self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> +        # Admins can edit a locked bug.
> +        with admin_logged_in():
> +            self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> +        # Commercial admins can edit a locked bug.
> +        with celebrity_logged_in('commercial_admin'):
> +            self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> +        # Registry experts can edit a locked bug.
> +        with celebrity_logged_in('registry_experts'):
> +            self.assertTrue(checkPermission('launchpad.Edit', bug))
> +
> +
> +    def test_only_those_with_moderate_permission_can_lock_unlock_a_bug(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        another_person = self.factory.makePerson()
> +
> +        # Unauthenticated person cannot moderate a bug.
> +        self.assertFalse(checkPermission('launchpad.Moderate', bug))
> +
> +        # A user without the relevant role cannot moderate a bug.
> +        with person_logged_in(another_person):
> +            self.assertFalse(checkPermission('launchpad.Moderate', bug))
> +
> +        # The bug reporter cannot moderate a bug.
> +        with person_logged_in(self.person):
> +            self.assertFalse(checkPermission('launchpad.Moderate', bug))
> +
> +        # Admins can moderate a bug.
> +        with admin_logged_in():
> +            self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> +        # Commercial admins can moderate a bug.
> +        with celebrity_logged_in('commercial_admin'):
> +            self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> +        # Registry experts can moderate a bug.
> +        with celebrity_logged_in('registry_experts'):
> +            self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> +        # Target owner can moderate a bug.
> +        with person_logged_in(
> +                removeSecurityProxy(bug.default_bugtask.target).owner
> +        ):
> +            self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> +        # Target driver can moderate a bug.
> +        new_person = self.factory.makePerson()
> +        removeSecurityProxy(bug.default_bugtask.target).driver = new_person
> +        with person_logged_in(new_person):
> +            self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> +        yet_another_person = self.factory.makePerson()
> +        removeSecurityProxy(
> +            bug.default_bugtask.target
> +        ).bug_supervisor = yet_another_person
> +        with person_logged_in(yet_another_person):
> +            self.assertTrue(checkPermission('launchpad.Moderate', bug))
> +
> +
> +class TestBugLockingWebService(TestCaseWithFactory):
> +    """Tests for the bug locking and unlocking web service methods."""
> +
> +    layer = DatabaseFunctionalLayer
> +
> +    def setUp(self):
> +        super().setUp()
> +        self.person = self.factory.makePerson()
> +        self.target = self.factory.makeProduct()
> +
> +    def test_who_value_for_lock_is_correctly_set(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        self.assertEqual(bug.activity.count(), 1)
> +        bug_url = api_url(bug)
> +        webservice = webservice_for_person(
> +            self.target.owner,
> +            permission=OAuthPermission.WRITE_PRIVATE
> +        )
> +        webservice.default_api_version = "devel"
> +
> +        response = webservice.named_post(
> +            bug_url, 'lock'
> +        )
> +        with person_logged_in(ANONYMOUS):
> +            self.assertEqual(bug.activity.count(), 2)
> +            self.assertThat(
> +                bug.activity[1],
> +                MatchesStructure.byEquality(
> +                    person=self.target.owner,
> +                    oldvalue=str(BugLockStatus.UNLOCKED),
> +                    newvalue=str(BugLockStatus.COMMENT_ONLY)
> +                )
> +            )
> +
> +        response = webservice.named_post(
> +            bug_url, 'unlock'
> +        )
> +        self.assertEqual(200, response.status)
> +        with person_logged_in(ANONYMOUS):
> +            self.assertEqual(bug.activity.count(), 3)
> +            self.assertThat(
> +                bug.activity[2],
> +                MatchesStructure.byEquality(
> +                    person=self.target.owner,
> +                    whatchanged='bug lock status',
> +                    oldvalue=str(BugLockStatus.COMMENT_ONLY),
> +                    newvalue=str(BugLockStatus.UNLOCKED),
> +                )
> +            )
> +
> +    def test_who_value_for_unlock_is_correctly_set(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        self.assertEqual(bug.activity.count(), 1)
> +        with person_logged_in(self.target.owner):
> +            bug.lock(who=self.target.owner)
> +        self.assertEqual(bug.activity.count(), 2)
> +        bug_url = api_url(bug)
> +        webservice = webservice_for_person(
> +            self.target.owner,
> +            permission=OAuthPermission.WRITE_PRIVATE
> +        )
> +        webservice.default_api_version = "devel"
> +
> +        response = webservice.named_post(
> +            bug_url, 'unlock'
> +        )
> +        self.assertEqual(200, response.status)
> +        with person_logged_in(ANONYMOUS):
> +            self.assertEqual(bug.activity.count(), 3)
> +            self.assertThat(
> +                bug.activity[2],
> +                MatchesStructure.byEquality(
> +                    person=self.target.owner,
> +                    oldvalue=str(BugLockStatus.COMMENT_ONLY),
> +                    newvalue=str(BugLockStatus.UNLOCKED),
> +                )
> +            )
> +
> +    def test_lock_status_lock_reason_values_unlocked_bug(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        bug_url = api_url(bug)
> +
> +        webservice = webservice_for_person(
> +            self.target.owner,
> +            permission=OAuthPermission.WRITE_PRIVATE
> +        )
> +        webservice.default_api_version = 'devel'
> +
> +        response = webservice.get(bug_url)
> +        self.assertEqual(200, response.status)
> +        response_json = response.jsonBody()
> +        self.assertEqual(
> +            response_json['lock_status'],
> +            str(BugLockStatus.UNLOCKED)
> +        )
> +        self.assertIsNone(response_json['lock_reason'])
> +
> +    def test_lock_status_lock_reason_values_after_locking(self):
> +        bug = self.factory.makeBug(owner=self.person, target=self.target)
> +        bug_url = api_url(bug)
> +
> +        with person_logged_in(self.target.owner):
> +            bug.lock(
> +                who=self.target.owner,
> +                status=BugLockStatus.COMMENT_ONLY,
> +                reason='too hot'
> +            )
> +
> +        webservice = webservice_for_person(
> +            self.target.owner,
> +            permission=OAuthPermission.WRITE_PRIVATE
> +        )
> +        webservice.default_api_version = 'devel'
> +
> +        response = webservice.get(bug_url)
> +        self.assertEqual(200, response.status)
> +        response_json = response.jsonBody()
> +        self.assertEqual(
> +            response_json['lock_status'],
> +            str(BugLockStatus.COMMENT_ONLY)
> +        )
> +        self.assertEqual(
> +            response_json['lock_reason'],
> +            'too hot'
> +        )
> diff --git a/lib/lp/scripts/garbo.py b/lib/lp/scripts/garbo.py
> index 5eabeb6..b91acda 100644
> --- a/lib/lp/scripts/garbo.py
> +++ b/lib/lp/scripts/garbo.py
> @@ -2022,6 +2050,7 @@ class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
>          GitRepositoryPruner,
>          RevisionCachePruner,
>          UnusedSessionPruner,
> +        PopulateBugLockStatusDefaultUnlocked,

Fixed in the next revision.

>          ]
>      experimental_tunable_loops = []
>  


-- 
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/414097
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:enforce-stricter-permission-check-when-a-bug-is-locked-down into launchpad:master.



References