← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/launchpad/sharingjob-remove-bp-subscriptions into lp:launchpad

 

Abel Deuring has proposed merging lp:~adeuring/launchpad/sharingjob-remove-bp-subscriptions into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~adeuring/launchpad/sharingjob-remove-bp-subscriptions/+merge/126008

This branch extends RemoveArtifactSubscriptionsJob so that it
removes subscriptions to specifications if a subscriber does no longer
have grants to access these specifications.

The change in lib/lp/registry/model/sharingjob.py is mostly
a mechanical change that adds specification handling to bug and branch
handling.

The method issues a Storm query to find subscribers of a specification
without access grants. The Storm expression "person with[out] access
to a spec" is defined in a new function get_specification_privacy_filter(),
similar to the existing functions get_bug_privacy_filter() and
get_branch_privacy_filter().

RemoveArtifactSubscriptionsJob accepted not only bugs and branches but
arbitrary objects, which I found a bit confusing while working on this
branch, so I added a check to ensure that only those objects can be
passed to this object that the class can handle, i.e. bugs, branches
and blueprints.

Many tests of RemoveArtifactSubscriptionsJob rely on two methods
_assert_bug_change_unsubscribes() and _assert_branch_change_unsubscribes()
which were quite long but nearly identical. The new test
test_change_information_type_specification() requires a third
variant of these methods, _assert_specification_change_unsubscribes().

I refactored all three _assert.* methods to use a common method.


tests:

./bin/test -vvt lp.registry.tests.test_sharingjob
./bin/test -vvt lp.blueprints.tests.test_specification.*test_get_specification_privacy_filter

no lint

-- 
https://code.launchpad.net/~adeuring/launchpad/sharingjob-remove-bp-subscriptions/+merge/126008
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/launchpad/sharingjob-remove-bp-subscriptions into lp:launchpad.
=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg	2012-09-21 00:08:41 +0000
+++ database/schema/security.cfg	2012-09-24 13:18:20 +0000
@@ -1966,6 +1966,8 @@
 public.person                           = SELECT
 public.product                          = SELECT
 public.sharingjob                       = SELECT, INSERT, UPDATE
+public.specification                    = SELECT
+public.specificationsubscription        = SELECT, DELETE
 public.teamparticipation                = SELECT
 type=user
 

=== modified file 'lib/lp/blueprints/model/specification.py'
--- lib/lp/blueprints/model/specification.py	2012-09-18 19:41:02 +0000
+++ lib/lp/blueprints/model/specification.py	2012-09-24 13:18:20 +0000
@@ -5,6 +5,7 @@
 
 __metaclass__ = type
 __all__ = [
+    'get_specification_privacy_filter',
     'HasSpecificationsMixin',
     'recursive_blocked_query',
     'recursive_dependent_query',
@@ -31,6 +32,7 @@
     And,
     In,
     Join,
+    LeftJoin,
     Or,
     Select,
     )
@@ -724,14 +726,15 @@
         notify(ObjectCreatedEvent(sub, user=subscribed_by))
         return sub
 
-    def unsubscribe(self, person, unsubscribed_by):
+    def unsubscribe(self, person, unsubscribed_by, ignore_permissions=False):
         """See ISpecification."""
         # see if a relevant subscription exists, and if so, delete it
         if person is None:
             person = unsubscribed_by
         for sub in self.subscriptions:
             if sub.person.id == person.id:
-                if not sub.canBeUnsubscribedByUser(unsubscribed_by):
+                if (not sub.canBeUnsubscribedByUser(unsubscribed_by) and
+                    not ignore_permissions):
                     raise UserCannotUnsubscribePerson(
                         '%s does not have permission to unsubscribe %s.' % (
                             unsubscribed_by.displayname,
@@ -1226,3 +1229,55 @@
     def get(self, spec_id):
         """See lp.blueprints.interfaces.specification.ISpecificationSet."""
         return Specification.get(spec_id)
+
+
+def get_specification_privacy_filter(user):
+    """Return a Storm expression for filtering specifications by privacy.
+
+    :param user: A Person ID or a column reference.
+    :return: A Storm expression to check if a peron has access grants
+         for a specification.
+    """
+    # Avoid circular imports.
+    from lp.registry.model.accesspolicy import (
+        AccessArtifact,
+        AccessPolicy,
+        AccessPolicyGrantFlat,
+        )
+    public_specification_filter = (
+        Specification.information_type.is_in(PUBLIC_INFORMATION_TYPES))
+    if user is None:
+        return public_specification_filter
+    return Or(
+        public_specification_filter,
+        Specification.id.is_in(
+            Select(
+                Specification.id,
+                tables=(
+                    Specification,
+                    Join(
+                        AccessPolicy,
+                        And(
+                            Or(
+                                Specification.productID ==
+                                    AccessPolicy.product_id,
+                                Specification.distributionID ==
+                                    AccessPolicy.distribution_id),
+                            Specification.information_type ==
+                                AccessPolicy.type)),
+                    Join(
+                        AccessPolicyGrantFlat,
+                        AccessPolicy.id == AccessPolicyGrantFlat.policy_id),
+                    LeftJoin(
+                        AccessArtifact,
+                        AccessPolicyGrantFlat.abstract_artifact_id ==
+                            AccessArtifact.id),
+                    Join(
+                        TeamParticipation,
+                        And(
+                            TeamParticipation.team ==
+                                AccessPolicyGrantFlat.grantee_id,
+                            TeamParticipation.person == user))),
+                where=Or(
+                    AccessPolicyGrantFlat.abstract_artifact_id == None,
+                    AccessArtifact.specification_id == Specification.id))))

=== modified file 'lib/lp/blueprints/tests/test_specification.py'
--- lib/lp/blueprints/tests/test_specification.py	2012-09-17 15:19:10 +0000
+++ lib/lp/blueprints/tests/test_specification.py	2012-09-24 13:18:20 +0000
@@ -6,6 +6,7 @@
 __metaclass__ = type
 
 
+from storm.store import Store
 from zope.component import (
     getUtility,
     queryAdapter,
@@ -31,7 +32,21 @@
     )
 from lp.blueprints.errors import TargetAlreadyHasSpecification
 from lp.blueprints.interfaces.specification import ISpecificationSet
+<<<<<<< TREE
 from lp.registry.enums import SharingPermission
+=======
+from lp.registry.enums import (
+    InformationType,
+    PRIVATE_INFORMATION_TYPES,
+    PUBLIC_INFORMATION_TYPES,
+    SharingPermission,
+    SpecificationSharingPolicy,
+    )
+from lp.blueprints.model.specification import (
+    get_specification_privacy_filter,
+    Specification,
+    )
+>>>>>>> MERGE-SOURCE
 from lp.security import (
     AdminSpecification,
     EditSpecificationByRelatedPeople,
@@ -409,6 +424,49 @@
                 specification.target.owner, specification,
                 error_expected=False, attribute='name', value='foo')
 
+    def test_get_specification_privacy_filter(self):
+        # get_specification_privacy_filter() returns a Storm expression
+        # that can be used to filter specifications by their visibility-
+        owner = self.factory.makePerson()
+        product = self.factory.makeProduct(
+            owner=owner,
+            specification_sharing_policy=(
+                SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY))
+        public_spec = self.factory.makeSpecification(product=product)
+        proprietary_spec_1 = self.factory.makeSpecification(
+            product=product, information_type=InformationType.PROPRIETARY)
+        proprietary_spec_2 = self.factory.makeSpecification(
+            product=product, information_type=InformationType.PROPRIETARY)
+        all_specs = [
+            public_spec, proprietary_spec_1, proprietary_spec_2]
+        store = Store.of(product)
+        specs_for_anon = store.find(
+            Specification, get_specification_privacy_filter(None),
+            Specification.productID == product.id)
+        self.assertContentEqual([public_spec], specs_for_anon)
+        # Product owners havae grants on the product, the privacy
+        # filter returns thus all specifications for them.
+        specs_for_owner = store.find(
+            Specification, get_specification_privacy_filter(owner.id),
+            Specification.productID == product.id)
+        self.assertContentEqual(all_specs, specs_for_owner)
+        # The filter returns only public specs for ordinary users.
+        user = self.factory.makePerson()
+        specs_for_other_user = store.find(
+            Specification, get_specification_privacy_filter(user.id),
+            Specification.productID == product.id)
+        self.assertContentEqual([public_spec], specs_for_other_user)
+        # If the user has a grant for a specification, the filter returns
+        # this specification too.
+        with person_logged_in(owner):
+            getUtility(IService, 'sharing').ensureAccessGrants(
+                [user], owner, specifications=[proprietary_spec_1])
+        specs_for_other_user = store.find(
+            Specification, get_specification_privacy_filter(user.id),
+            Specification.productID == product.id)
+        self.assertContentEqual(
+            [public_spec, proprietary_spec_1], specs_for_other_user)
+
 
 class TestSpecificationSet(TestCaseWithFactory):
 

=== modified file 'lib/lp/registry/model/sharingjob.py'
--- lib/lp/registry/model/sharingjob.py	2012-09-18 19:41:02 +0000
+++ lib/lp/registry/model/sharingjob.py	2012-09-24 13:18:20 +0000
@@ -46,6 +46,14 @@
     IBug,
     IBugSet,
     )
+from lp.blueprints.interfaces.specification import ISpecification
+from lp.blueprints.model.specification import (
+    get_specification_privacy_filter,
+    Specification,
+    )
+from lp.blueprints.model.specificationsubscription import (
+    SpecificationSubscription,
+    )
 from lp.bugs.model.bugsubscription import BugSubscription
 from lp.bugs.model.bugtaskflat import BugTaskFlat
 from lp.bugs.model.bugtasksearch import get_bug_privacy_filter_terms
@@ -265,18 +273,25 @@
 
         bug_ids = []
         branch_ids = []
+        specification_ids = []
         if artifacts:
             for artifact in artifacts:
                 if IBug.providedBy(artifact):
                     bug_ids.append(artifact.id)
                 elif IBranch.providedBy(artifact):
                     branch_ids.append(artifact.id)
+                elif ISpecification.providedBy(artifact):
+                    specification_ids.append(artifact.id)
+                else:
+                    raise ValueError(
+                        'Unsupported artifact: %r' % artifact)
         information_types = [
             info_type.value for info_type in information_types or []
         ]
         metadata = {
             'bug_ids': bug_ids,
             'branch_ids': branch_ids,
+            'specification_ids': specification_ids,
             'information_types': information_types,
             'requestor.id': requestor.id
         }
@@ -308,6 +323,10 @@
         return [getUtility(IBranchLookup).get(id) for id in self.branch_ids]
 
     @property
+    def specification_ids(self):
+        return self.metadata.get('specification_ids', [])
+
+    @property
     def information_types(self):
         if not 'information_types' in self.metadata:
             return []
@@ -332,6 +351,7 @@
             'requestor': self.requestor.name,
             'bug_ids': self.bug_ids,
             'branch_ids': self.branch_ids,
+            'specification_ids': self.specification_ids,
             'pillar': getattr(self.pillar, 'name', None),
             'grantee': getattr(self.grantee, 'name', None)
             }
@@ -346,9 +366,13 @@
 
         bug_filters = []
         branch_filters = []
+        specification_filters = []
 
         if self.branch_ids:
             branch_filters.append(Branch.id.is_in(self.branch_ids))
+        if self.specification_ids:
+            specification_filters.append(Specification.id.is_in(
+                self.specification_ids))
         if self.bug_ids:
             bug_filters.append(BugTaskFlat.bug_id.is_in(self.bug_ids))
         else:
@@ -358,14 +382,21 @@
                         self.information_types))
                 branch_filters.append(
                     Branch.information_type.is_in(self.information_types))
+                specification_filters.append(
+                    Specification.information_type.is_in(
+                        self.information_types))
             if self.product:
                 bug_filters.append(
                     BugTaskFlat.product == self.product)
                 branch_filters.append(Branch.product == self.product)
+                specification_filters.append(
+                    Specification.product == self.product)
             if self.distro:
                 bug_filters.append(
                     BugTaskFlat.distribution == self.distro)
                 branch_filters.append(Branch.distribution == self.distro)
+                specification_filters.append(
+                    Specification.distribution == self.distro)
 
         if self.grantee:
             bug_filters.append(
@@ -378,6 +409,11 @@
                     Select(
                         TeamParticipation.personID,
                         where=TeamParticipation.team == self.grantee)))
+            specification_filters.append(
+                In(SpecificationSubscription.personID,
+                    Select(
+                        TeamParticipation.personID,
+                        where=TeamParticipation.team == self.grantee)))
 
         if bug_filters:
             bug_filters.append(Not(
@@ -402,3 +438,20 @@
             for sub in branch_subscriptions:
                 sub.branch.unsubscribe(
                     sub.person, self.requestor, ignore_permissions=True)
+        if specification_filters:
+            specification_filters.append(
+                Not(get_specification_privacy_filter(
+                    SpecificationSubscription.personID)))
+            tables = (
+                SpecificationSubscription,
+                Join(
+                    Specification,
+                    Specification.id ==
+                        SpecificationSubscription.specificationID))
+            specifications_subscriptions = IStore(
+                SpecificationSubscription).using(*tables).find(
+                SpecificationSubscription, *specification_filters).config(
+                distinct=True)
+            for sub in specifications_subscriptions:
+                sub.specification.unsubscribe(
+                    sub.person, self.requestor, ignore_permissions=True)

=== modified file 'lib/lp/registry/tests/test_sharingjob.py'
--- lib/lp/registry/tests/test_sharingjob.py	2012-09-18 19:41:02 +0000
+++ lib/lp/registry/tests/test_sharingjob.py	2012-09-24 13:18:20 +0000
@@ -17,6 +17,13 @@
     BranchSubscriptionNotificationLevel,
     CodeReviewNotificationLevel,
     )
+<<<<<<< TREE
+=======
+from lp.registry.enums import (
+    InformationType,
+    SpecificationSharingPolicy,
+    )
+>>>>>>> MERGE-SOURCE
 from lp.registry.interfaces.accesspolicy import (
     IAccessArtifactGrantSource,
     IAccessPolicySource,
@@ -117,6 +124,16 @@
             'for branch_ids=[%d], requestor=%s>'
             % (branch.id, requestor.name), repr(job))
 
+    def test_repr_specifications(self):
+        requestor = self.factory.makePerson()
+        specification = self.factory.makeSpecification()
+        job = getUtility(IRemoveArtifactSubscriptionsJobSource).create(
+            requestor, artifacts=[specification])
+        self.assertEqual(
+            '<REMOVE_ARTIFACT_SUBSCRIPTIONS job reconciling subscriptions '
+            'for requestor=%s, specification_ids=[%d]>'
+            % (requestor.name, specification.id), repr(job))
+
     def test_create_success(self):
         # Create an instance of SharingJobDerived that delegates to SharingJob.
         self.assertIs(True, ISharingJobSource.providedBy(SharingJobDerived))
@@ -244,6 +261,17 @@
         self.assertEqual(requestor.id, naked_job.requestor_id)
         self.assertContentEqual([bug.id], naked_job.bug_ids)
 
+    def test_create__bad_artifact_class(self):
+        # A ValueError is raised if an object of an unsupported type
+        # is passed as an artifact to
+        # IRemoveArtifactSubscriptionsJob.create().
+        requestor = self.factory.makePerson()
+        wrong_object = self.factory.makePerson()
+        self.assertRaises(
+            ValueError,
+            getUtility(IRemoveArtifactSubscriptionsJobSource).create,
+            requestor, [wrong_object])
+
     def test_getErrorRecipients(self):
         # The pillar owner and job requestor are the error recipients.
         requestor = self.factory.makePerson()
@@ -257,10 +285,19 @@
         self.assertContentEqual(
             expected_emails, job.getErrorRecipients())
 
-    def _assert_bug_change_unsubscribes(self, change_callback):
-        # Subscribers are unsubscribed if the bug becomes invisible due to a
-        # change in information_type.
-        product = self.factory.makeProduct()
+    CHANGE_BUG = 0
+    CHANGE_BRANCH = 1
+    CHANGE_SPECIFICATION = 2
+
+    def _assert_artifact_change_unsubscribes(self, change_callback,
+                                             test_type):
+        # Subscribers are unsubscribed if the artifact becomes invisible
+        # due to a change in information_type.
+        product = self.factory.makeProduct(
+            specification_sharing_policy=(
+                SpecificationSharingPolicy.EMBARGOED_OR_PROPRIETARY))
+        self.factory.makeAccessPolicy(product, InformationType.PROPRIETARY)
+        self.factory.makeAccessPolicy(product, InformationType.EMBARGOED)
         owner = self.factory.makePerson()
         [policy] = getUtility(IAccessPolicySource).find(
             [(product, InformationType.USERDATA)])
@@ -278,6 +315,9 @@
         branch = self.factory.makeBranch(
             owner=owner, product=product,
             information_type=InformationType.USERDATA)
+        specification = self.factory.makeSpecification(
+            owner=owner, product=product,
+            information_type=InformationType.PROPRIETARY)
 
         # The artifact grantees will not lose access when the job is run.
         artifact_indirect_grantee = self.factory.makePerson()
@@ -285,113 +325,117 @@
             membership_policy=TeamMembershipPolicy.RESTRICTED,
             members=[artifact_indirect_grantee])
 
-        bug.subscribe(policy_team_grantee, owner)
-        bug.subscribe(policy_indirect_grantee, owner)
-        bug.subscribe(artifact_team_grantee, owner)
         bug.subscribe(artifact_indirect_grantee, owner)
         branch.subscribe(artifact_indirect_grantee,
             BranchSubscriptionNotificationLevel.NOEMAIL, None,
             CodeReviewNotificationLevel.NOEMAIL, owner)
+        # Subscribing somebody to a specification does not automatically
+        # create an artifact grant.
+        spec_artifact = self.factory.makeAccessArtifact(specification)
+        self.factory.makeAccessArtifactGrant(
+            spec_artifact, artifact_indirect_grantee)
+        specification.subscribe(artifact_indirect_grantee, owner)
+
+        if test_type == self.CHANGE_BUG:
+            concrete_artifact = bug
+            bug.subscribe(policy_team_grantee, owner)
+            bug.subscribe(policy_indirect_grantee, owner)
+            bug.subscribe(artifact_team_grantee, owner)
+
+            def get_bug_pillars(concrete_artifact):
+                return concrete_artifact.affected_pillars
+
+            def get_bug_subscribers(concrete_artifact):
+                return removeSecurityProxy(
+                    concrete_artifact).getDirectSubscribers()
+
+            get_pillars = get_bug_pillars
+            get_subscribers = get_bug_subscribers
+
+        elif test_type == self.CHANGE_BRANCH:
+            concrete_artifact = branch
+            branch.subscribe(
+                policy_team_grantee,
+                BranchSubscriptionNotificationLevel.NOEMAIL,
+                None, CodeReviewNotificationLevel.NOEMAIL, owner)
+            branch.subscribe(
+                policy_indirect_grantee,
+                BranchSubscriptionNotificationLevel.NOEMAIL, None,
+                CodeReviewNotificationLevel.NOEMAIL, owner)
+            branch.subscribe(
+                artifact_team_grantee,
+                BranchSubscriptionNotificationLevel.NOEMAIL, None,
+                CodeReviewNotificationLevel.NOEMAIL, owner)
+
+            def get_branch_pillars(concrete_artifact):
+                return [concrete_artifact.product]
+
+            def get_branch_subscribers(concrete_artifact):
+                return concrete_artifact.subscribers
+
+            get_pillars = get_branch_pillars
+            get_subscribers = get_branch_subscribers
+
+        elif test_type == self.CHANGE_SPECIFICATION:
+            concrete_artifact = specification
+            specification.subscribe(policy_team_grantee, owner)
+            specification.subscribe(policy_indirect_grantee, owner)
+            self.factory.makeAccessArtifactGrant(
+                spec_artifact, artifact_team_grantee)
+            specification.subscribe(artifact_team_grantee, owner)
+
+            def get_spec_pillars(concrete_artifact):
+                return [concrete_artifact.product]
+
+            def get_spec_subscribers(concrete_artifact):
+                return concrete_artifact.subscribers
+
+            get_pillars = get_spec_pillars
+            get_subscribers = get_spec_subscribers
+
         # Subscribing policy_team_grantee has created an artifact grant so we
         # need to revoke that to test the job.
-        artifact = self.factory.makeAccessArtifact(concrete=bug)
+        artifact = self.factory.makeAccessArtifact(concrete=concrete_artifact)
         getUtility(IAccessArtifactGrantSource).revokeByArtifact(
             [artifact], [policy_team_grantee])
 
         # policy grantees are subscribed because the job has not been run yet.
-        subscribers = removeSecurityProxy(bug).getDirectSubscribers()
+        subscribers = get_subscribers(concrete_artifact)
         self.assertIn(policy_team_grantee, subscribers)
         self.assertIn(policy_indirect_grantee, subscribers)
 
-        # Change bug attributes so that it can become inaccessible for
+        # Change artifact attributes so that it can become inaccessible for
         # some users.
-        change_callback(bug)
+        change_callback(concrete_artifact)
         reconcile_access_for_artifact(
-            bug, bug.information_type, bug.affected_pillars)
+            concrete_artifact, concrete_artifact.information_type,
+            get_pillars(concrete_artifact))
 
         getUtility(IRemoveArtifactSubscriptionsJobSource).create(
-            owner, [bug])
+            owner, [concrete_artifact])
         with block_on_job(self):
             transaction.commit()
 
         # Check the result. Policy grantees will be unsubscribed.
-        subscribers = removeSecurityProxy(bug).getDirectSubscribers()
+        subscribers = get_subscribers(concrete_artifact)
         self.assertNotIn(policy_team_grantee, subscribers)
         self.assertNotIn(policy_indirect_grantee, subscribers)
         self.assertIn(artifact_team_grantee, subscribers)
-        self.assertIn(artifact_indirect_grantee, subscribers)
+        self.assertIn(artifact_indirect_grantee, bug.getDirectSubscribers())
         self.assertIn(artifact_indirect_grantee, branch.subscribers)
+        self.assertIn(artifact_indirect_grantee, specification.subscribers)
+
+    def _assert_bug_change_unsubscribes(self, change_callback):
+        self._assert_artifact_change_unsubscribes(
+            change_callback, self.CHANGE_BUG)
 
     def _assert_branch_change_unsubscribes(self, change_callback):
-        product = self.factory.makeProduct()
-        owner = self.factory.makePerson()
-        [policy] = getUtility(IAccessPolicySource).find(
-            [(product, InformationType.USERDATA)])
-        # The policy grantees will lose access.
-        policy_indirect_grantee = self.factory.makePerson()
-        policy_team_grantee = self.factory.makeTeam(
-            membership_policy=TeamMembershipPolicy.RESTRICTED,
-            members=[policy_indirect_grantee])
-
-        self.factory.makeAccessPolicyGrant(policy, policy_team_grantee, owner)
-        login_person(owner)
-        bug = self.factory.makeBug(
-            owner=owner, target=product,
-            information_type=InformationType.USERDATA)
-        branch = self.factory.makeBranch(
-            owner=owner, product=product,
-            information_type=InformationType.USERDATA)
-
-        # The artifact grantees will not lose access when the job is run.
-        artifact_indirect_grantee = self.factory.makePerson()
-        artifact_team_grantee = self.factory.makeTeam(
-            membership_policy=TeamMembershipPolicy.RESTRICTED,
-            members=[artifact_indirect_grantee])
-
-        branch.subscribe(
-            policy_team_grantee, BranchSubscriptionNotificationLevel.NOEMAIL,
-            None, CodeReviewNotificationLevel.NOEMAIL, owner)
-        branch.subscribe(
-            policy_indirect_grantee,
-            BranchSubscriptionNotificationLevel.NOEMAIL, None,
-            CodeReviewNotificationLevel.NOEMAIL, owner)
-        branch.subscribe(
-            artifact_team_grantee,
-            BranchSubscriptionNotificationLevel.NOEMAIL, None,
-            CodeReviewNotificationLevel.NOEMAIL, owner)
-        branch.subscribe(
-            artifact_indirect_grantee,
-            BranchSubscriptionNotificationLevel.NOEMAIL, None,
-            CodeReviewNotificationLevel.NOEMAIL, owner)
-        bug.subscribe(artifact_indirect_grantee, owner)
-        # Subscribing policy_team_grantee has created an artifact grant so we
-        # need to revoke that to test the job.
-        artifact = self.factory.makeAccessArtifact(concrete=branch)
-        getUtility(IAccessArtifactGrantSource).revokeByArtifact(
-            [artifact], [policy_team_grantee])
-
-        # policy grantees are subscribed because the job has not been run yet.
-        #subscribers = removeSecurityProxy(branch).subscribers
-        self.assertIn(policy_team_grantee, branch.subscribers)
-        self.assertIn(policy_indirect_grantee, branch.subscribers)
-
-        # Change branch attributes so that it can become inaccessible for
-        # some users.
-        change_callback(branch)
-        reconcile_access_for_artifact(
-            branch, branch.information_type, [branch.product])
-
-        getUtility(IRemoveArtifactSubscriptionsJobSource).create(
-            owner, [branch])
-        with block_on_job(self):
-            transaction.commit()
-
-        # Check the result. Policy grantees will be unsubscribed.
-        self.assertNotIn(policy_team_grantee, branch.subscribers)
-        self.assertNotIn(policy_indirect_grantee, branch.subscribers)
-        self.assertIn(artifact_team_grantee, branch.subscribers)
-        self.assertIn(artifact_indirect_grantee, branch.subscribers)
-        self.assertIn(artifact_indirect_grantee, bug.getDirectSubscribers())
+        self._assert_artifact_change_unsubscribes(
+            change_callback, self.CHANGE_BRANCH)
+
+    def _assert_specification_change_unsubscribes(self, change_callback):
+        self._assert_artifact_change_unsubscribes(
+            change_callback, self.CHANGE_SPECIFICATION)
 
     def test_change_information_type_branch(self):
         def change_information_type(branch):
@@ -400,6 +444,13 @@
 
         self._assert_branch_change_unsubscribes(change_information_type)
 
+    def test_change_information_type_specification(self):
+        def change_information_type(specification):
+            removeSecurityProxy(specification).information_type = (
+                InformationType.EMBARGOED)
+
+        self._assert_specification_change_unsubscribes(change_information_type)
+
     def test_change_information_type(self):
         # Changing the information type of a bug unsubscribes users who can no
         # longer see the bug.


Follow ups