← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~lgp171188/launchpad:vulnerability-subscription-model into launchpad:master

 

Guruprasad has proposed merging ~lgp171188/launchpad:vulnerability-subscription-model into launchpad:master.

Commit message:
WIP changes

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/427274
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:vulnerability-subscription-model into launchpad:master.
diff --git a/lib/lp/bugs/interfaces/vulnerability.py b/lib/lp/bugs/interfaces/vulnerability.py
index ac60ca2..6fe9abc 100644
--- a/lib/lp/bugs/interfaces/vulnerability.py
+++ b/lib/lp/bugs/interfaces/vulnerability.py
@@ -13,7 +13,7 @@ __all__ = [
 
 from lazr.enum import DBEnumeratedType, DBItem
 from lazr.restful.declarations import exported, exported_as_webservice_entry
-from lazr.restful.fields import Reference
+from lazr.restful.fields import CollectionField, Reference
 from zope.interface import Interface
 from zope.schema import Choice, Datetime, Int, TextLine
 
@@ -133,6 +133,35 @@ class IVulnerabilityView(Interface):
         ),
         as_of="devel",
     )
+    subscriptions = CollectionField(
+        title=_("VulnerabilitySubscriptions for this vulnerability."),
+        readonly=True,
+        value_type=Reference(Interface),
+    )
+
+    subscribers = CollectionField(
+        title=_("Persons subscribed to this vulnerability."),
+        readonly=True,
+        value_type=Reference(IPerson),
+    )
+
+    def visibleByUser(user):
+        """Can this user see this vulnerability?"""
+
+    def getSubscription(person):
+        """Returns the person's subscription for this vulnerability."""
+
+    def hasSubscription(person):
+        """Is this person subscribed to this vulnerability?"""
+
+    def userCanBeSubscribed(person):
+        """Can this person be subscribed to this vulnerability?"""
+
+    def subscribe(person, subscribed_by):
+        """Subscribe a person to this vulnerability."""
+
+    def unsubscribe(person, unsubscribed_by):
+        """Unsubscribe a person from this vulnerability."""
 
 
 class IVulnerabilityEditableAttributes(Interface):
@@ -285,6 +314,9 @@ class IVulnerabilitySet(Interface):
         :param date_made_public: The date this vulnerability was made public.
         """
 
+    def findByIds(vulnerability_ids, visible_by_user=None):
+        """Returns the vulnerabilities with the given IDs."""
+
 
 class IVulnerabilityActivity(Interface):
     """`IVulnerabilityActivity` attributes that require launchpad.View."""
diff --git a/lib/lp/bugs/interfaces/vulnerabilitysubscription.py b/lib/lp/bugs/interfaces/vulnerabilitysubscription.py
new file mode 100644
index 0000000..06977d1
--- /dev/null
+++ b/lib/lp/bugs/interfaces/vulnerabilitysubscription.py
@@ -0,0 +1,43 @@
+# Copyright 2022 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Vulnerability subscription model."""
+
+__all__ = ["IVulnerabilitySubscription"]
+
+from lazr.restful.fields import Reference
+from zope.interface import Interface
+from zope.schema import Datetime, Int
+
+from lp import _
+from lp.bugs.interfaces.vulnerability import IVulnerability
+from lp.services.fields import PersonChoice
+
+
+class IVulnerabilitySubscription(Interface):
+    """A person subscription to a specific Vulnerability."""
+
+    id = Int(title=_("ID"), readonly=True, required=True)
+    person = PersonChoice(
+        title=_("Person"),
+        required=True,
+        vocabulary="ValidPersonOrTeam",
+        readonly=True,
+        description=_("The person subscribed to the related vulnerability."),
+    )
+    vulnerability = Reference(
+        IVulnerability, title=_("OCI recipe"), required=True, readonly=True
+    )
+    subscribed_by = PersonChoice(
+        title=("Subscribed by"),
+        required=True,
+        vocabulary="ValidPersonOrTeam",
+        readonly=True,
+        description=_("The person who created this subscription."),
+    )
+    date_created = Datetime(
+        title=_("Date subscribed"), required=True, readonly=True
+    )
+
+    def canBeUnsubscribedByUser(user):
+        """Can the user unsubscribe the subscriber from the vulnerability?"""
diff --git a/lib/lp/bugs/model/tests/test_vulnerability.py b/lib/lp/bugs/model/tests/test_vulnerability.py
index 32a902f..b5cd04a 100644
--- a/lib/lp/bugs/model/tests/test_vulnerability.py
+++ b/lib/lp/bugs/model/tests/test_vulnerability.py
@@ -2,16 +2,29 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the vulnerability and related models."""
+from unittest.mock import patch
+
 from testtools.matchers import MatchesStructure
 from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
 
+from lp.app.enums import InformationType
+from lp.app.errors import (
+    SubscriptionPrivacyViolation,
+    UserCannotUnsubscribePerson,
+)
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.app.interfaces.services import IService
 from lp.bugs.enums import VulnerabilityStatus
 from lp.bugs.interfaces.buglink import IBugLinkTarget
+from lp.bugs.interfaces.bugtask import BugTaskImportance
 from lp.bugs.interfaces.vulnerability import (
     IVulnerability,
     IVulnerabilitySet,
     VulnerabilityChange,
 )
+from lp.bugs.model.vulnerabilitysubscription import VulnerabilitySubscription
+from lp.registry.enums import SpecificationSharingPolicy, TeamMembershipPolicy
 from lp.services.webapp.authorization import check_permission
 from lp.testing import (
     TestCaseWithFactory,
@@ -23,6 +36,16 @@ from lp.testing import (
 from lp.testing.layers import DatabaseFunctionalLayer
 
 
+def grant_access_to_non_public_vulnerability(vulnerability, person):
+    distribution = removeSecurityProxy(vulnerability).distribution
+    with person_logged_in(distribution.owner):
+        getUtility(IService, "sharing").ensureAccessGrants(
+            [person],
+            distribution.owner,
+            vulnerabilities=[vulnerability],
+        )
+
+
 class TestVulnerability(TestCaseWithFactory):
 
     layer = DatabaseFunctionalLayer
@@ -34,6 +57,19 @@ class TestVulnerability(TestCaseWithFactory):
             distribution=self.distribution
         )
 
+    def makeProprietaryDistribution(self):
+        return self.factory.makeDistribution(
+            specification_sharing_policy=SpecificationSharingPolicy.PROPRIETARY
+        )
+
+    def makeProprietaryVulnerability(self, distribution=None):
+        if distribution is None:
+            distribution = self.makeProprietaryDistribution()
+        return self.factory.makeVulnerability(
+            distribution=distribution,
+            information_type=InformationType.PROPRIETARY,
+        )
+
     def test_Vulnerability_implements_IVulnerability(self):
         vulnerability = self.factory.makeVulnerability()
         self.assertTrue(verifyObject(IVulnerability, vulnerability))
@@ -42,6 +78,301 @@ class TestVulnerability(TestCaseWithFactory):
         vulnerability = self.factory.makeVulnerability()
         self.assertTrue(verifyObject(IBugLinkTarget, vulnerability))
 
+    def test_Vulnerability_subscriptions_subscribers_empty_default(self):
+        vulnerability = self.factory.makeVulnerability()
+        self.assertEqual(0, vulnerability.subscribers.count())
+        self.assertEqual(0, vulnerability.subscriptions.count())
+
+    def test_public_vulnerability_visibleByUser(self):
+        vulnerability = self.factory.makeVulnerability()
+        self.assertTrue(vulnerability.visibleByUser(None))
+        self.assertTrue(vulnerability.visibleByUser(self.factory.makePerson()))
+
+    def test_non_public_vulnerability_visibleByUser(self):
+        # XXX lgp171188 - We use the 'Proprietary' sharing policy
+        # as an example non-public information_type and may have to
+        # add tests for other non-public types in the future.
+        distribution = self.makeProprietaryDistribution()
+        vulnerability = self.makeProprietaryVulnerability(distribution)
+        allowed_user = self.factory.makePerson()
+        grant_access_to_non_public_vulnerability(
+            vulnerability,
+            allowed_user,
+        )
+        with person_logged_in(distribution.owner):
+            self.assertFalse(vulnerability.visibleByUser(None))
+            self.assertFalse(
+                vulnerability.visibleByUser(self.factory.makePerson())
+            )
+            self.assertTrue(vulnerability.visibleByUser(allowed_user))
+
+    @patch("lp.bugs.model.vulnerability.reconcile_access_for_artifacts")
+    def test_setting_information_type_reconciles_access(
+        self, mock_reconcile_method
+    ):
+        vulnerability = self.factory.makeVulnerability()
+        self.assertEqual(
+            InformationType.PUBLIC, vulnerability.information_type
+        )
+        with person_logged_in(vulnerability.distribution.owner):
+            vulnerability.information_type = InformationType.PROPRIETARY
+        mock_reconcile_method.assert_called_with(
+            [vulnerability],
+            InformationType.PROPRIETARY,
+            [vulnerability.distribution],
+        )
+
+    def test_getSubscription_person_is_None(self):
+        self.assertIsNone(
+            self.factory.makeVulnerability().getSubscription(None)
+        )
+
+    def test_getSubscription_person_is_not_subscribed(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        self.assertIsNone(vulnerability.getSubscription(person))
+
+    def test_getSubscription_person_is_subscribed(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        subscription = VulnerabilitySubscription(
+            person=person, vulnerability=vulnerability, subscribed_by=person
+        )
+        self.assertEqual(subscription, vulnerability.getSubscription(person))
+
+    def test_hasSubscription(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        self.assertFalse(vulnerability.hasSubscription(person))
+        VulnerabilitySubscription(
+            person=person,
+            vulnerability=vulnerability,
+            subscribed_by=person,
+        )
+        self.assertTrue(vulnerability.hasSubscription(person))
+
+    def test_userCanBeSubscribed_person_public_vulnerability(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        self.assertTrue(vulnerability.userCanBeSubscribed(person))
+
+    def test_userCanBeSubscribed_person_non_public_vulnerability(self):
+        person = self.factory.makePerson()
+        vulnerability = removeSecurityProxy(
+            self.makeProprietaryVulnerability(
+                self.makeProprietaryDistribution()
+            )
+        )
+        self.assertTrue(vulnerability.userCanBeSubscribed(person))
+
+    def test_userCanBeSubscribed_public_vulnerability_non_open_team(self):
+        team = self.factory.makeTeam(
+            membership_policy=TeamMembershipPolicy.RESTRICTED
+        )
+        self.assertFalse(team.anyone_can_join())
+        vulnerability = self.factory.makeVulnerability()
+        self.assertTrue(vulnerability.userCanBeSubscribed(team))
+
+    def test_userCanBeSubscribed_non_public_vulnerability_non_open_team(self):
+        team = self.factory.makeTeam(
+            membership_policy=TeamMembershipPolicy.RESTRICTED,
+        )
+        vulnerability = removeSecurityProxy(
+            self.makeProprietaryVulnerability(
+                self.makeProprietaryDistribution()
+            )
+        )
+        self.assertTrue(vulnerability.userCanBeSubscribed(team))
+
+    def test_userCanBeSubscribed_non_public_vulnerability_open_team(self):
+        team = removeSecurityProxy(self.factory.makeTeam())
+        self.assertTrue(team.anyone_can_join())
+        vulnerability = removeSecurityProxy(
+            self.makeProprietaryVulnerability(
+                self.makeProprietaryDistribution()
+            )
+        )
+        self.assertFalse(vulnerability.userCanBeSubscribed(team))
+
+    def test_subscribe_person_to_vulnerability(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        vulnerability.subscribe(person, vulnerability.distribution.owner)
+        self.assertTrue(vulnerability.hasSubscription(person))
+
+        non_public_vulnerability = removeSecurityProxy(
+            self.makeProprietaryVulnerability()
+        )
+        distribution_owner = non_public_vulnerability.distribution.owner
+        with person_logged_in(distribution_owner):
+            non_public_vulnerability.subscribe(
+                person,
+                distribution_owner,
+            )
+            self.assertTrue(non_public_vulnerability.hasSubscription(person))
+
+    def test_subscribe_open_team_non_public_vulnerability(self):
+        open_team = self.factory.makeTeam()
+        vulnerability = removeSecurityProxy(
+            self.makeProprietaryVulnerability()
+        )
+        distribution_owner = vulnerability.distribution.owner
+        with person_logged_in(distribution_owner):
+            self.assertRaises(
+                SubscriptionPrivacyViolation,
+                vulnerability.subscribe,
+                open_team,
+                distribution_owner,
+            )
+
+    def test_subscribe_open_team_public_vulnerability(self):
+        open_team = self.factory.makeTeam()
+        vulnerability = self.factory.makeVulnerability()
+        self.assertFalse(vulnerability.hasSubscription(open_team))
+        vulnerability.subscribe(open_team, vulnerability.distribution.owner)
+        self.assertTrue(vulnerability.hasSubscription(open_team))
+
+    def test_subscribe_subscribing_a_person_with_existing_subscription(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        vulnerability.subscribe(
+            person,
+            vulnerability.distribution.owner,
+        )
+        self.assertTrue(vulnerability.hasSubscription(person))
+        vulnerability.subscribe(
+            person,
+            vulnerability.distribution.owner,
+        )
+        self.assertTrue(vulnerability.hasSubscription(person))
+
+        vulnerability2 = removeSecurityProxy(
+            self.makeProprietaryVulnerability()
+        )
+        distribution_owner = vulnerability2.distribution.owner
+        with person_logged_in(distribution_owner):
+            vulnerability2.subscribe(person, distribution_owner)
+            self.assertTrue(vulnerability2.hasSubscription(person))
+            vulnerability2.subscribe(person, distribution_owner)
+            self.assertTrue(vulnerability2.hasSubscription(person))
+
+    def test_subscribing_to_non_public_vulnerability_makes_it_visible(self):
+        person = self.factory.makePerson()
+        vulnerability = self.makeProprietaryVulnerability()
+        distribution_owner = removeSecurityProxy(
+            vulnerability
+        ).distribution.owner
+        with person_logged_in(person):
+            self.assertFalse(check_permission("launchpad.View", vulnerability))
+            self.assertFalse(check_permission("launchpad.Edit", vulnerability))
+
+        with person_logged_in(distribution_owner):
+            vulnerability.subscribe(person, distribution_owner)
+        with person_logged_in(person):
+            self.assertTrue(check_permission("launchpad.View", vulnerability))
+            self.assertFalse(check_permission("launchpad.Edit", vulnerability))
+
+    def test_subscribers_subscriptions(self):
+        person1 = self.factory.makePerson()
+        person2 = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        self.assertEqual(0, vulnerability.subscriptions.count())
+        self.assertEqual(0, vulnerability.subscribers.count())
+        vulnerability.subscribe(person1, person1)
+        vulnerability.subscribe(person2, person2)
+        self.assertContentEqual({person1, person2}, vulnerability.subscribers)
+        self.assertEqual(2, vulnerability.subscriptions.count())
+
+    def test_unsubscribe_user_not_subscribed(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        self.assertFalse(vulnerability.hasSubscription(person))
+        vulnerability.unsubscribe(person, person)
+        self.assertFalse(vulnerability.hasSubscription(person))
+
+    def test_unsubscribe_random_user_cannot_unsubscribe_a_subscriber(self):
+        person = self.factory.makePerson()
+        person2 = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        vulnerability.subscribe(person, person)
+        self.assertRaises(
+            UserCannotUnsubscribePerson,
+            vulnerability.unsubscribe,
+            person,
+            person2,
+        )
+
+    def test_unsubscribe_self(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        vulnerability.subscribe(person, person)
+        self.assertTrue(vulnerability.hasSubscription(person))
+        vulnerability.unsubscribe(person, person)
+        self.assertFalse(vulnerability.hasSubscription(person))
+
+    def test_privileged_users_can_unsubscribe_other_subscribers(self):
+        person = self.factory.makePerson()
+        person2 = self.factory.makePerson()
+        person3 = self.factory.makePerson()
+        person4 = self.factory.makePerson()
+        creator = self.factory.makeTeam(members=[person2])
+        vulnerability = self.factory.makeVulnerability(creator=creator)
+        distribution = vulnerability.distribution
+        with person_logged_in(distribution.owner):
+            distribution.security_admin = self.factory.makeTeam(
+                members=[person4]
+            )
+
+        vulnerability.subscribe(person, person)
+        self.assertTrue(vulnerability.hasSubscription(person))
+        # Users in the vulnerability's creator team can unsubscribe user.
+        vulnerability.unsubscribe(person, person2)
+        self.assertFalse(vulnerability.hasSubscription(person))
+
+        vulnerability.subscribe(person, person)
+        self.assertTrue(vulnerability.hasSubscription(person))
+        # Vulnerability's distribution owner can unsubscribe subscribers.
+        vulnerability.unsubscribe(person, vulnerability.distribution.owner)
+        self.assertFalse(vulnerability.hasSubscription(person))
+
+        vulnerability.subscribe(person, person)
+        self.assertTrue(vulnerability.hasSubscription(person))
+        # Vulnerability's distribution's security admins can unsubscribe
+        # subscribers.
+        vulnerability.unsubscribe(person, person4)
+        self.assertFalse(vulnerability.hasSubscription(person))
+
+        vulnerability.subscribe(person, person3)
+        self.assertTrue(vulnerability.hasSubscription(person))
+        # The person who created the subscription can unsubscribe the person.
+        vulnerability.unsubscribe(person, person3)
+        self.assertFalse(vulnerability.hasSubscription(person))
+
+        # Admins can unsubscribe the person.
+        vulnerability.subscribe(person, person)
+        self.assertTrue(vulnerability.hasSubscription(person))
+        vulnerability.unsubscribe(
+            person, getUtility(ILaunchpadCelebrities).admin.teamowner
+        )
+        self.assertFalse(vulnerability.hasSubscription(person))
+
+    def test_unsubscribe_removes_visibility_of_non_public_vulnerability(self):
+        person = self.factory.makePerson()
+        vulnerability = removeSecurityProxy(
+            self.makeProprietaryVulnerability()
+        )
+        distribution_owner = vulnerability.distribution.owner
+        with person_logged_in(distribution_owner):
+            vulnerability.subscribe(person, distribution_owner)
+
+        with person_logged_in(person):
+            self.assertTrue(check_permission("launchpad.View", vulnerability))
+            vulnerability.unsubscribe(person, person)
+
+        # Have to re-login again for the permission cache to get invalidated.
+        with person_logged_in(person):
+            self.assertFalse(check_permission("launchpad.View", vulnerability))
+
     def test_random_user_permissions(self):
         with person_logged_in(self.factory.makePerson()):
             self.assertTrue(
@@ -51,6 +382,18 @@ class TestVulnerability(TestCaseWithFactory):
                 check_permission("launchpad.Edit", self.vulnerability)
             )
 
+    def test_random_user_permissions_non_public_vulnerability(self):
+        vulnerability = self.makeProprietaryVulnerability()
+        with person_logged_in(self.factory.makePerson()):
+            self.assertFalse(check_permission("launchpad.View", vulnerability))
+
+    def test_user_can_view_shared_non_public_vulnerability(self):
+        person = self.factory.makePerson()
+        vulnerability = self.makeProprietaryVulnerability()
+        grant_access_to_non_public_vulnerability(vulnerability, person)
+        with person_logged_in(person):
+            self.assertTrue(check_permission("launchpad.View", vulnerability))
+
     def test_admin_permissions(self):
         with admin_logged_in():
             self.assertTrue(
@@ -82,12 +425,22 @@ class TestVulnerability(TestCaseWithFactory):
 
     def test_anonymous_permissions(self):
         with anonymous_logged_in():
-            self.assertFalse(
+            self.assertTrue(
                 check_permission("launchpad.View", self.vulnerability)
             )
             self.assertFalse(
                 check_permission("launchpad.Edit", self.vulnerability)
             )
+        distribution = self.factory.makeDistribution(
+            specification_sharing_policy=SpecificationSharingPolicy.PROPRIETARY
+        )
+        vulnerability = self.factory.makeVulnerability(
+            distribution=distribution,
+            information_type=InformationType.PROPRIETARY,
+        )
+        with anonymous_logged_in():
+            self.assertFalse(check_permission("launchpad.View", vulnerability))
+            self.assertFalse(check_permission("launchpad.Edit", vulnerability))
 
     def test_edit_vulnerability_security_admin(self):
         person = self.factory.makePerson()
@@ -174,3 +527,63 @@ class TestVulnerabilitySet(TestCaseWithFactory):
             initial_number,
             (len(vulnerability1.bugs) + len(vulnerability2.bugs)),
         )
+
+    @patch("lp.bugs.model.vulnerability.reconcile_access_for_artifacts")
+    def test_access_reconciled_after_creating_a_vulnerability(
+        self, mock_reconcile_method
+    ):
+        distribution = self.factory.makeDistribution()
+        creator = self.factory.makePerson()
+        vulnerability = getUtility(IVulnerabilitySet).new(
+            distribution=distribution,
+            status=VulnerabilityStatus.NEEDS_TRIAGE,
+            importance=BugTaskImportance.UNDECIDED,
+            creator=creator,
+        )
+        mock_reconcile_method.assert_called_with(
+            [vulnerability], vulnerability.information_type, [distribution]
+        )
+
+    def test_findByIds(self):
+        person = self.factory.makePerson()
+        proprietary_distribution = self.factory.makeDistribution(
+            specification_sharing_policy=(
+                SpecificationSharingPolicy.PROPRIETARY
+            )
+        )
+        vulnerability1 = removeSecurityProxy(self.factory.makeVulnerability())
+        vulnerability2 = removeSecurityProxy(
+            self.factory.makeVulnerability(
+                distribution=proprietary_distribution,
+                information_type=InformationType.PROPRIETARY,
+            )
+        )
+        vulnerability3 = removeSecurityProxy(
+            self.factory.makeVulnerability(
+                distribution=proprietary_distribution,
+                information_type=InformationType.PROPRIETARY,
+            )
+        )
+        grant_access_to_non_public_vulnerability(vulnerability2, person)
+        vulnerability_set = getUtility(IVulnerabilitySet)
+        self.assertContentEqual(
+            {vulnerability1, vulnerability2, vulnerability3},
+            vulnerability_set.findByIds(
+                [
+                    vulnerability1.id,
+                    vulnerability2.id,
+                    vulnerability3.id,
+                ]
+            ),
+        )
+        self.assertContentEqual(
+            {vulnerability1, vulnerability2},
+            vulnerability_set.findByIds(
+                [
+                    vulnerability1.id,
+                    vulnerability2.id,
+                    vulnerability3.id,
+                ],
+                visible_by_user=person,
+            ),
+        )
diff --git a/lib/lp/bugs/model/tests/test_vulnerabilitysubscription.py b/lib/lp/bugs/model/tests/test_vulnerabilitysubscription.py
new file mode 100644
index 0000000..14a611d
--- /dev/null
+++ b/lib/lp/bugs/model/tests/test_vulnerabilitysubscription.py
@@ -0,0 +1,68 @@
+# Copyright 2022 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for the VulnerabilitySubscription model."""
+from zope.component import getUtility
+
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.bugs.interfaces.vulnerabilitysubscription import (
+    IVulnerabilitySubscription,
+)
+from lp.bugs.model.vulnerabilitysubscription import VulnerabilitySubscription
+from lp.testing import TestCaseWithFactory, person_logged_in, verifyObject
+from lp.testing.layers import DatabaseFunctionalLayer
+
+
+class TestVulnerabilitySubscription(TestCaseWithFactory):
+
+    layer = DatabaseFunctionalLayer
+
+    def test_VulnerabilitySubscription_implements_its_interface(self):
+        person = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        subscription = VulnerabilitySubscription(vulnerability, person, person)
+        self.assertTrue(verifyObject(IVulnerabilitySubscription, subscription))
+
+    def test_canBeUnsubscribedByUser(self):
+        person = self.factory.makePerson()
+        person2 = self.factory.makePerson()
+        person3 = self.factory.makePerson()
+        person4 = self.factory.makePerson()
+        person5 = self.factory.makePerson()
+        creator = self.factory.makeTeam(members=[person3])
+        distribution = self.factory.makeDistribution()
+        with person_logged_in(distribution.owner):
+            distribution.security_admin = self.factory.makeTeam(
+                members=[person5]
+            )
+        vulnerability = self.factory.makeVulnerability(
+            creator=creator, distribution=distribution
+        )
+        subscription = VulnerabilitySubscription(
+            vulnerability, person, person4
+        )
+        # A user account is needed to unsubscribe a subscriber.
+        self.assertFalse(subscription.canBeUnsubscribedByUser(None))
+        # A user can unsubscribe from their own subscription.
+        self.assertTrue(subscription.canBeUnsubscribedByUser(person))
+        # A random user cannot unsubscribe a subscriber.
+        self.assertFalse(subscription.canBeUnsubscribedByUser(person2))
+        # A user in the vulnerability's creator team can unsubscribe
+        # a subscriber.
+        self.assertTrue(subscription.canBeUnsubscribedByUser(person3))
+        # The vulnerability's distribution's owner can unsubscribe
+        # a subscriber.
+        self.assertTrue(
+            subscription.canBeUnsubscribedByUser(distribution.owner)
+        )
+        # The vulnerability's distribution's security admins can unsubscribe
+        # a subscriber
+        self.assertTrue(subscription.canBeUnsubscribedByUser(person5))
+        # The person who subscribed the subscriber can unsubscribe them.
+        self.assertTrue(subscription.canBeUnsubscribedByUser(person4))
+        # Admins can unsubscribe the subscriber.
+        self.assertTrue(
+            subscription.canBeUnsubscribedByUser(
+                getUtility(ILaunchpadCelebrities).admin.teamowner
+            )
+        )
diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
index 25ebea9..30cabf4 100644
--- a/lib/lp/bugs/model/vulnerability.py
+++ b/lib/lp/bugs/model/vulnerability.py
@@ -9,11 +9,18 @@ __all__ = [
 import operator
 
 import pytz
+from storm.expr import SQL, Coalesce, Join, Or, Select
 from storm.locals import DateTime, Int, Reference, Unicode
+from storm.store import Store
 from zope.component import getUtility
 from zope.interface import implementer
 
-from lp.app.enums import InformationType
+from lp.app.enums import PUBLIC_INFORMATION_TYPES, InformationType
+from lp.app.errors import (
+    SubscriptionPrivacyViolation,
+    UserCannotUnsubscribePerson,
+)
+from lp.app.interfaces.services import IService
 from lp.app.model.launchpad import InformationTypeMixin
 from lp.bugs.enums import VulnerabilityStatus
 from lp.bugs.interfaces.buglink import IBugLinkTarget
@@ -27,11 +34,21 @@ from lp.bugs.interfaces.vulnerability import (
 )
 from lp.bugs.model.bug import Bug
 from lp.bugs.model.buglinktarget import BugLinkTargetMixin
+from lp.bugs.model.vulnerabilitysubscription import VulnerabilitySubscription
+from lp.registry.interfaces.accesspolicy import (
+    IAccessArtifactGrantSource,
+    IAccessArtifactSource,
+)
+from lp.registry.interfaces.role import IPersonRoles
+from lp.registry.model.accesspolicy import reconcile_access_for_artifacts
+from lp.registry.model.person import Person
+from lp.registry.model.teammembership import TeamParticipation
 from lp.services.database import bulk
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.enumcol import DBEnum
 from lp.services.database.interfaces import IStore
 from lp.services.database.stormbase import StormBase
+from lp.services.database.stormexpr import Array, ArrayAgg, ArrayIntersects
 from lp.services.xref.interfaces import IXRefSet
 
 
@@ -66,7 +83,7 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
         name="importance_explanation", allow_none=True
     )
 
-    information_type = DBEnum(
+    _information_type = DBEnum(
         enum=InformationType,
         default=InformationType.PUBLIC,
         allow_none=False,
@@ -103,7 +120,11 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
         self.cve = cve
         self.status = status
         self.importance = importance
-        self.information_type = information_type
+        # Set `self._information_type` rather than `self.information_type`
+        # to avoid the call to `self._reconcileAccess` while constructing
+        # the instance. `VulnerabilitySet.new` deals with calling
+        # `_reconcileAccess` once the instance has been fully constructed.
+        self._information_type = information_type
         self.creator = creator
         self.description = description
         self.notes = notes
@@ -138,6 +159,129 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
             {("vulnerability", str(self.id)): [("bug", str(bug.id))]}
         )
 
+    @property
+    def information_type(self):
+        return self._information_type
+
+    @information_type.setter
+    def information_type(self, information_type):
+        if information_type != self._information_type:
+            self._information_type = information_type
+            self._reconcileAccess()
+
+    def visibleByUser(self, user):
+        """See `IVulnerability`."""
+        if self.information_type in PUBLIC_INFORMATION_TYPES:
+            return True
+        if user is None:
+            return False
+        return (
+            not IStore(self)
+            .find(
+                Vulnerability,
+                Vulnerability.id == self.id,
+                get_vulnerability_privacy_filter(user),
+            )
+            .is_empty()
+        )
+
+    def _reconcileAccess(self):
+        """Reconcile the vulnerability's sharing information.
+        Takes the privacy ad distribution and makes the related AccessArtifact
+        and AccessPolicyArtifacts match."""
+        reconcile_access_for_artifacts(
+            [self], self.information_type, [self.distribution]
+        )
+
+    @property
+    def subscriptions(self):
+        return Store.of(self).find(
+            VulnerabilitySubscription,
+            VulnerabilitySubscription.vulnerability == self,
+        )
+
+    @property
+    def subscribers(self):
+        return Store.of(self).find(
+            Person,
+            VulnerabilitySubscription.person_id == Person.id,
+            VulnerabilitySubscription.vulnerability == self,
+        )
+
+    def getSubscription(self, person):
+        """Returns the person's subscription or None."""
+        if person is None:
+            return None
+        return (
+            Store.of(self)
+            .find(
+                VulnerabilitySubscription,
+                VulnerabilitySubscription.person == person,
+                VulnerabilitySubscription.vulnerability == self,
+            )
+            .one()
+        )
+
+    def hasSubscription(self, person):
+        """See `IVulnerability`."""
+        return self.getSubscription(person) is not None
+
+    def userCanBeSubscribed(self, person):
+        """See `IVulnerability`."""
+        return not (
+            self.information_type not in PUBLIC_INFORMATION_TYPES
+            and person.is_team
+            and person.anyone_can_join()
+        )
+
+    def subscribe(self, person, subscribed_by, ignore_permissions=False):
+        """See `IVulnerability`."""
+        if not self.userCanBeSubscribed(person):
+            raise SubscriptionPrivacyViolation(
+                "Open and delegated teams cannot be subscribed to private"
+                "vulnerabilities."
+            )
+        if self.getSubscription(person) is None:
+            subscription = VulnerabilitySubscription(
+                person=person, vulnerability=self, subscribed_by=subscribed_by
+            )
+            Store.of(subscription).flush()
+        service = getUtility(IService, "sharing")
+        vulnerabilities = service.getVisibleArtifacts(
+            person, vulnerabilities=[self], ignore_permissions=True
+        )["vulnerabilities"]
+        if not vulnerabilities:
+            service.ensureAccessGrants(
+                [person],
+                subscribed_by,
+                vulnerabilities=[self],
+                ignore_permissions=ignore_permissions,
+            )
+
+    def unsubscribe(self, person, unsubscribed_by, ignore_permissions=False):
+        """See `IVulnerability`."""
+        subscription = self.getSubscription(person)
+        if subscription is None:
+            return
+        if (
+            not ignore_permissions
+            and not subscription.canBeUnsubscribedByUser(unsubscribed_by)
+        ):
+            raise UserCannotUnsubscribePerson(
+                "%s does not have permission to unsubscribe %s"
+                % (
+                    unsubscribed_by.displayname,
+                    person.displayname,
+                )
+            )
+        artifact = getUtility(IAccessArtifactSource).find([self])
+        getUtility(IAccessArtifactGrantSource).revokeByArtifact(
+            artifact, [person]
+        )
+        store = Store.of(subscription)
+        store.remove(subscription)
+        IStore(self).flush()
+
 
 @implementer(IVulnerabilitySet)
 class VulnerabilitySet:
@@ -171,9 +315,17 @@ class VulnerabilitySet:
             date_made_public=date_made_public,
         )
         store.add(vulnerability)
+        vulnerability._reconcileAccess()
         store.flush()
         return vulnerability
 
+    def findByIds(self, vulnerability_ids, visible_by_user=None):
+        """See `IVulnerabilitySet`."""
+        clauses = [Vulnerability.id.is_in(vulnerability_ids)]
+        if visible_by_user is not None:
+            clauses.append(get_vulnerability_privacy_filter(visible_by_user))
+        return IStore(Vulnerability).find(Vulnerability, *clauses)
+
 
 @implementer(IVulnerabilityActivity)
 class VulnerabilityActivity(StormBase):
@@ -233,3 +385,62 @@ class VulnerabilityActivitySet:
         )
         store.add(activity)
         return activity
+
+
+def get_vulnerability_privacy_filter(user):
+    """Returns the filter for all vulnerabilities that the given user has
+    access to, including private vulnerabilities where the user has proper
+    permission.
+
+    :param user: An IPerson, or a class attribute tha references an IPerson
+                 in the database.
+    :return: A Storm condition.
+    """
+    from lp.registry.model.accesspolicy import AccessPolicyGrant
+
+    public_vulnerabilities_filter = Vulnerability._information_type.is_in(
+        PUBLIC_INFORMATION_TYPES
+    )
+
+    if user is None:
+        return [public_vulnerabilities_filter]
+    elif IPersonRoles.providedBy(user):
+        user = user.person
+
+    artifact_grant_query = Coalesce(
+        ArrayIntersects(
+            SQL("Vulnerability.access_grants"),
+            Select(
+                ArrayAgg(TeamParticipation.teamID),
+                tables=TeamParticipation,
+                where=(TeamParticipation.person == user),
+            ),
+        ),
+        False,
+    )
+
+    policy_grant_query = Coalesce(
+        ArrayIntersects(
+            Array(SQL("Vulnerability.access_policy")),
+            Select(
+                ArrayAgg(AccessPolicyGrant.policy_id),
+                tables=(
+                    AccessPolicyGrant,
+                    Join(
+                        TeamParticipation,
+                        TeamParticipation.teamID
+                        == AccessPolicyGrant.grantee_id,
+                    ),
+                ),
+                where=(TeamParticipation.person == user),
+            ),
+        ),
+        False,
+    )
+    return [
+        Or(
+            public_vulnerabilities_filter,
+            artifact_grant_query,
+            policy_grant_query,
+        )
+    ]
diff --git a/lib/lp/bugs/model/vulnerabilitysubscription.py b/lib/lp/bugs/model/vulnerabilitysubscription.py
new file mode 100644
index 0000000..2e46b93
--- /dev/null
+++ b/lib/lp/bugs/model/vulnerabilitysubscription.py
@@ -0,0 +1,60 @@
+# Copyright 2022 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Vulnerability subscription model."""
+
+__all__ = ["VulnerabilitySubscription"]
+
+import pytz
+from storm.properties import DateTime, Int
+from storm.references import Reference
+from zope.interface import implementer
+
+from lp.bugs.interfaces.vulnerabilitysubscription import (
+    IVulnerabilitySubscription,
+)
+from lp.registry.interfaces.person import validate_person
+from lp.registry.interfaces.role import IPersonRoles
+from lp.services.database.constants import UTC_NOW
+from lp.services.database.stormbase import StormBase
+
+
+@implementer(IVulnerabilitySubscription)
+class VulnerabilitySubscription(StormBase):
+    """A relationship between a person and a vulnerability."""
+
+    __storm_table__ = "VulnerabilitySubscription"
+
+    id = Int(primary=True)
+
+    person_id = Int("person", allow_none=False, validator=validate_person)
+    person = Reference(person_id, "Person.id")
+
+    vulnerability_id = Int("vulnerability", allow_none=False)
+    vulnerability = Reference(vulnerability_id, "Vulnerability.id")
+
+    date_created = DateTime(allow_none=False, default=UTC_NOW, tzinfo=pytz.UTC)
+
+    subscribed_by_id = Int(
+        "subscribed_by", allow_none=False, validator=validate_person
+    )
+    subscribed_by = Reference(subscribed_by_id, "Person.id")
+
+    def __init__(self, vulnerability, person, subscribed_by):
+        super().__init__()
+        self.vulnerability = vulnerability
+        self.person = person
+        self.subscribed_by = subscribed_by
+
+    def canBeUnsubscribedByUser(self, user):
+        """See `IVulnerabilitySubscription`."""
+        if user is None:
+            return False
+        return (
+            user.inTeam(self.vulnerability.creator)
+            or user.inTeam(self.vulnerability.distribution.owner)
+            or user.inTeam(self.vulnerability.distribution.security_admin)
+            or user.inTeam(self.person)
+            or user.inTeam(self.subscribed_by)
+            or IPersonRoles(user).in_admin
+        )
diff --git a/lib/lp/bugs/security.py b/lib/lp/bugs/security.py
index 6141de3..11d74e9 100644
--- a/lib/lp/bugs/security.py
+++ b/lib/lp/bugs/security.py
@@ -417,6 +417,21 @@ class EditBugSubscriptionFilter(AuthorizationBase):
         return user.inTeam(self.obj.structural_subscription.subscriber)
 
 
+class ViewVulnerability(AnonymousAuthorization):
+    """Anyone can view public vulnerabilities, but only subscribers
+    can view private ones.
+    """
+
+    permission = "launchpad.View"
+    usedfor = IVulnerability
+
+    def checkUnauthenticated(self):
+        return self.obj.visibleByUser(None)
+
+    def checkAuthenticated(self, user):
+        return self.obj.visibleByUser(user.person)
+
+
 class EditVulnerability(DelegatedAuthorization):
     """The security admins of a distribution should be able to edit
     vulnerabilities in that distribution."""
diff --git a/lib/lp/registry/browser/pillar.py b/lib/lp/registry/browser/pillar.py
index 4f776e9..a5e5eab 100644
--- a/lib/lp/registry/browser/pillar.py
+++ b/lib/lp/registry/browser/pillar.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2021 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Common views for objects that implement `IPillar`."""
@@ -445,10 +445,13 @@ class PillarPersonSharingView(LaunchpadView):
         spec_data = self._build_specification_template_data(
             self.specifications, request
         )
-        snap_data = self._build_ocirecipe_template_data(self.snaps, request)
+        snap_data = self._build_snap_template_data(self.snaps, request)
         ocirecipe_data = self._build_ocirecipe_template_data(
             self.ocirecipes, request
         )
+        vulnerability_data = self._build_vulnerability_template_data(
+            self.vulnerabilities, request
+        )
         grantee_data = {
             "displayname": self.person.displayname,
             "self_link": absoluteURL(self.person, request),
@@ -462,6 +465,7 @@ class PillarPersonSharingView(LaunchpadView):
         cache.objects["specifications"] = spec_data
         cache.objects["snaps"] = snap_data
         cache.objects["ocirecipes"] = ocirecipe_data
+        cache.objects["vulnerabilities"] = vulnerability_data
 
     def _loadSharedArtifacts(self):
         # As a concrete can by linked via more than one policy, we use sets to
@@ -475,6 +479,7 @@ class PillarPersonSharingView(LaunchpadView):
         self.snaps = artifacts["snaps"]
         self.specifications = artifacts["specifications"]
         self.ocirecipes = artifacts["ocirecipes"]
+        self.vulnerabilities = artifacts["vulnerabilities"]
 
         bug_ids = {bugtask.bug.id for bugtask in self.bugtasks}
         self.shared_bugs_count = len(bug_ids)
@@ -483,6 +488,7 @@ class PillarPersonSharingView(LaunchpadView):
         self.shared_snaps_count = len(self.snaps)
         self.shared_specifications_count = len(self.specifications)
         self.shared_ocirecipe_count = len(self.ocirecipes)
+        self.shared_vulnerabilities_count = len(self.vulnerabilities)
 
     def _build_specification_template_data(self, specs, request):
         spec_data = []
@@ -574,3 +580,19 @@ class PillarPersonSharingView(LaunchpadView):
                 )
             )
         return snap_data
+
+    def _build_vulnerability_template_data(self, vulnerabilities, request):
+        vulnerability_data = []
+        for vulnerability in vulnerabilities:
+            vulnerability_data.append(
+                dict(
+                    self_link=absoluteURL(vulnerability, request),
+                    web_link=canonical_url(
+                        vulnerability, path_only_if_possible=True
+                    ),
+                    name=vulnerability.cve.sequence,
+                    id=vulnerability.id,
+                    information_type=vulnerability.information_type.title,
+                )
+            )
+        return vulnerability_data
diff --git a/lib/lp/registry/interfaces/accesspolicy.py b/lib/lp/registry/interfaces/accesspolicy.py
index 233cc6c..1e31ea6 100644
--- a/lib/lp/registry/interfaces/accesspolicy.py
+++ b/lib/lp/registry/interfaces/accesspolicy.py
@@ -1,4 +1,4 @@
-# Copyright 2011-2021 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Interfaces for pillar and artifact access policies."""
@@ -34,6 +34,7 @@ class IAccessArtifact(Interface):
     snap_id = Attribute("snap_id")
     specification_id = Attribute("specification_id")
     ocirecipe_id = Attribute("ocirecipe_id")
+    vulnerability_id = Attribute("vulnerability_id")
 
 
 class IAccessArtifactGrant(Interface):
diff --git a/lib/lp/registry/interfaces/sharingservice.py b/lib/lp/registry/interfaces/sharingservice.py
index ee08d9a..b5fc95b 100644
--- a/lib/lp/registry/interfaces/sharingservice.py
+++ b/lib/lp/registry/interfaces/sharingservice.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Interfaces for sharing service."""
@@ -26,6 +26,7 @@ from lp.app.enums import InformationType
 from lp.app.interfaces.services import IService
 from lp.blueprints.interfaces.specification import ISpecification
 from lp.bugs.interfaces.bug import IBug
+from lp.bugs.interfaces.vulnerability import IVulnerability
 from lp.code.interfaces.branch import IBranch
 from lp.code.interfaces.gitrepository import IGitRepository
 from lp.oci.interfaces.ocirecipe import IOCIRecipe
@@ -192,6 +193,14 @@ class ISharingService(IService):
         :return: a collection of OCI recipes.
         """
 
+    def getSharedVulnerabilities(pillar, person, user):
+        """Return the vulnerabilities shared between the pillar and person.
+
+        :param user: the user making the request. Only the vulnerabilities
+            visible to the user will be included in the result.
+        :param: a collection of vulnerabilities.
+        """
+
     def getVisibleArtifacts(
         person,
         bugs=None,
@@ -200,6 +209,7 @@ class ISharingService(IService):
         snaps=None,
         specifications=None,
         ocirecipes=None,
+        vulnerabilities=None,
     ):
         """Return the artifacts shared with person.
 
@@ -216,6 +226,8 @@ class ISharingService(IService):
             person has access.
         :param ocirecipes: the OCI recipes to check for which a person
             has access.
+        :param vulnerabilities: the vulnerabilities to check for which person
+            has access.
         :return: a collection of artifacts the person can see.
         """
 
@@ -375,6 +387,11 @@ class ISharingService(IService):
             title=_("OCI recipes"),
             required=False,
         ),
+        vulnerabilities=List(
+            Reference(schema=IVulnerability),
+            title=_("Vulnerabilities"),
+            required=False,
+        ),
     )
     @operation_for_version("devel")
     def revokeAccessGrants(
@@ -387,6 +404,7 @@ class ISharingService(IService):
         snaps=None,
         specifications=None,
         ocirecipes=None,
+        vulnerabilities=None,
     ):
         """Remove a grantee's access to the specified artifacts.
 
@@ -399,6 +417,7 @@ class ISharingService(IService):
         :param snaps: The snap recipes for which to revoke access
         :param specifications: the specifications for which to revoke access
         :param ocirecipes: The OCI recipes for which to revoke access
+        :param vulnerabilities: The vulnerabilities for which to revoke access
         """
 
     @export_write_operation()
@@ -423,6 +442,11 @@ class ISharingService(IService):
             title=_("OCI recipes"),
             required=False,
         ),
+        vulnerabilities=List(
+            Reference(schema=IVulnerability),
+            title=_("Vulnerabilities"),
+            required=False,
+        ),
     )
     @operation_for_version("devel")
     def ensureAccessGrants(
@@ -434,6 +458,7 @@ class ISharingService(IService):
         snaps=None,
         specifications=None,
         ocirecipes=None,
+        vulnerabilities=None,
     ):
         """Ensure a grantee has an access grant to the specified artifacts.
 
@@ -445,6 +470,7 @@ class ISharingService(IService):
         :param snaps: the snap recipes for which to grant access
         :param specifications: the specifications for which to grant access
         :param ocirecipes: the OCI recipes for which to grant access
+        :param vulnerabilities: the vulnerabilities for which to grant access
         """
 
     @export_write_operation()
diff --git a/lib/lp/registry/model/accesspolicy.py b/lib/lp/registry/model/accesspolicy.py
index cb881b9..dd47d69 100644
--- a/lib/lp/registry/model/accesspolicy.py
+++ b/lib/lp/registry/model/accesspolicy.py
@@ -1,4 +1,4 @@
-# Copyright 2011-2021 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Model classes for pillar and artifact access policies."""
@@ -95,6 +95,8 @@ class AccessArtifact(StormBase):
     specification = Reference(specification_id, "Specification.id")
     ocirecipe_id = Int(name="ocirecipe")
     ocirecipe = Reference(ocirecipe_id, "OCIRecipe.id")
+    vulnerability_id = Int(name="vulnerability")
+    vulnerability = Reference(vulnerability_id, "Vulnerability.id")
 
     @property
     def concrete_artifact(self):
@@ -107,6 +109,7 @@ class AccessArtifact(StormBase):
     def _constraintForConcrete(cls, concrete_artifact):
         from lp.blueprints.interfaces.specification import ISpecification
         from lp.bugs.interfaces.bug import IBug
+        from lp.bugs.interfaces.vulnerability import IVulnerability
         from lp.code.interfaces.branch import IBranch
         from lp.code.interfaces.gitrepository import IGitRepository
         from lp.oci.interfaces.ocirecipe import IOCIRecipe
@@ -124,6 +127,8 @@ class AccessArtifact(StormBase):
             col = cls.specification
         elif IOCIRecipe.providedBy(concrete_artifact):
             col = cls.ocirecipe
+        elif IVulnerability.providedBy(concrete_artifact):
+            col = cls.vulnerability
         else:
             raise ValueError("%r is not a valid artifact" % concrete_artifact)
         return col == concrete_artifact
@@ -146,6 +151,7 @@ class AccessArtifact(StormBase):
         """See `IAccessArtifactSource`."""
         from lp.blueprints.interfaces.specification import ISpecification
         from lp.bugs.interfaces.bug import IBug
+        from lp.bugs.interfaces.vulnerability import IVulnerability
         from lp.code.interfaces.branch import IBranch
         from lp.code.interfaces.gitrepository import IGitRepository
         from lp.oci.interfaces.ocirecipe import IOCIRecipe
@@ -163,17 +169,33 @@ class AccessArtifact(StormBase):
         insert_values = []
         for concrete in needed:
             if IBug.providedBy(concrete):
-                insert_values.append((concrete, None, None, None, None, None))
+                insert_values.append(
+                    (concrete, None, None, None, None, None, None)
+                )
             elif IBranch.providedBy(concrete):
-                insert_values.append((None, concrete, None, None, None, None))
+                insert_values.append(
+                    (None, concrete, None, None, None, None, None)
+                )
             elif IGitRepository.providedBy(concrete):
-                insert_values.append((None, None, concrete, None, None, None))
+                insert_values.append(
+                    (None, None, concrete, None, None, None, None)
+                )
             elif ISnap.providedBy(concrete):
-                insert_values.append((None, None, None, concrete, None, None))
+                insert_values.append(
+                    (None, None, None, concrete, None, None, None)
+                )
             elif ISpecification.providedBy(concrete):
-                insert_values.append((None, None, None, None, concrete, None))
+                insert_values.append(
+                    (None, None, None, None, concrete, None, None)
+                )
             elif IOCIRecipe.providedBy(concrete):
-                insert_values.append((None, None, None, None, None, concrete))
+                insert_values.append(
+                    (None, None, None, None, None, concrete, None)
+                )
+            elif IVulnerability.providedBy(concrete):
+                insert_values.append(
+                    (None, None, None, None, None, None, concrete)
+                )
             else:
                 raise ValueError("%r is not a supported artifact" % concrete)
         columns = (
@@ -183,6 +205,7 @@ class AccessArtifact(StormBase):
             cls.snap,
             cls.specification,
             cls.ocirecipe,
+            cls.vulnerability,
         )
         new = create(columns, insert_values, get_objects=True)
         return list(existing) + new
diff --git a/lib/lp/registry/personmerge.py b/lib/lp/registry/personmerge.py
index 66b19b3..7e2dda3 100644
--- a/lib/lp/registry/personmerge.py
+++ b/lib/lp/registry/personmerge.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2021 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Person/team merger implementation."""
@@ -895,7 +895,12 @@ def _mergeOCIRecipeSubscription(cur, from_id, to_id):
 
 def _mergeVulnerabilitySubscription(cur, from_id, to_id):
     # Update only the VulnerabilitySubscription that will not conflict.
+<<<<<<< lib/lp/registry/personmerge.py
     cur.execute('''
+=======
+    cur.execute(
+        """
+>>>>>>> lib/lp/registry/personmerge.py
         UPDATE VulnerabilitySubscription
         SET person=%(to_id)d
         WHERE person=%(from_id)d AND vulnerability NOT IN
@@ -904,11 +909,24 @@ def _mergeVulnerabilitySubscription(cur, from_id, to_id):
             FROM VulnerabilitySubscription
             WHERE person = %(to_id)d
             )
+<<<<<<< lib/lp/registry/personmerge.py
     ''' % vars())
     # and delete those left over.
     cur.execute('''
         DELETE FROM VulnerabilitySubscription WHERE person=%(from_id)d
         ''' % vars())
+=======
+    """
+        % vars()
+    )
+    # and delete those left over.
+    cur.execute(
+        """
+        DELETE FROM VulnerabilitySubscription WHERE person=%(from_id)d
+        """
+        % vars()
+    )
+>>>>>>> lib/lp/registry/personmerge.py
 
 
 def _mergeCharmRecipe(cur, from_person, to_person):
@@ -1181,7 +1199,11 @@ def merge_people(from_person, to_person, reviewer, delete=False):
     skip.append(("charmrecipe", "owner"))
 
     _mergeVulnerabilitySubscription(cur, from_id, to_id)
+<<<<<<< lib/lp/registry/personmerge.py
     skip.append(('vulnerabilitysubscription', 'person'))
+=======
+    skip.append(("vulnerabilitysubscription", "person"))
+>>>>>>> lib/lp/registry/personmerge.py
 
     # Sanity check. If we have a reference that participates in a
     # UNIQUE index, it must have already been handled by this point.
diff --git a/lib/lp/registry/services/sharingservice.py b/lib/lp/registry/services/sharingservice.py
index 9dc9bdf..5548b4a 100644
--- a/lib/lp/registry/services/sharingservice.py
+++ b/lib/lp/registry/services/sharingservice.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Classes for pillar and artifact sharing service."""
@@ -35,6 +35,8 @@ from lp.app.enums import PRIVATE_INFORMATION_TYPES
 from lp.blueprints.model.specification import Specification
 from lp.bugs.interfaces.bugtask import IBugTaskSet
 from lp.bugs.interfaces.bugtasksearch import BugTaskSearchParams
+from lp.bugs.interfaces.vulnerability import IVulnerabilitySet
+from lp.bugs.model.vulnerability import Vulnerability
 from lp.code.interfaces.branchcollection import IAllBranches
 from lp.code.interfaces.gitcollection import IAllGitRepositories
 from lp.oci.interfaces.ocirecipe import IOCIRecipeSet
@@ -239,6 +241,7 @@ class SharingService:
         include_snaps=True,
         include_specifications=True,
         include_ocirecipes=True,
+        include_vulnerabilities=True,
     ):
         """See `ISharingService`."""
         bug_ids = set()
@@ -247,6 +250,7 @@ class SharingService:
         snap_ids = set()
         specification_ids = set()
         ocirecipe_ids = set()
+        vulnerability_ids = set()
         for artifact in self.getArtifactGrantsForPersonOnPillar(
             pillar, person
         ):
@@ -262,6 +266,8 @@ class SharingService:
                 specification_ids.add(artifact.specification_id)
             elif artifact.ocirecipe_id and include_ocirecipes:
                 ocirecipe_ids.add(artifact.ocirecipe_id)
+            elif artifact.vulnerability_id and include_vulnerabilities:
+                vulnerability_ids.add(artifact.vulnerability_id)
 
         # Load the bugs.
         bugtasks = []
@@ -295,6 +301,9 @@ class SharingService:
         ocirecipes = []
         if ocirecipe_ids:
             ocirecipes = load(OCIRecipe, ocirecipe_ids)
+        vulnerabilities = []
+        if vulnerability_ids:
+            vulnerabilities = load(Vulnerability, vulnerability_ids)
 
         return {
             "bugtasks": bugtasks,
@@ -303,6 +312,7 @@ class SharingService:
             "snaps": snaps,
             "specifications": specifications,
             "ocirecipes": ocirecipes,
+            "vulnerabilities": vulnerabilities,
         }
 
     @available_with_permission("launchpad.Driver", "pillar")
@@ -317,6 +327,7 @@ class SharingService:
             include_specifications=False,
             include_snaps=False,
             include_ocirecipes=False,
+            include_vulnerabilities=False,
         )
         return artifacts["bugtasks"]
 
@@ -332,6 +343,7 @@ class SharingService:
             include_specifications=False,
             include_snaps=False,
             include_ocirecipes=False,
+            include_vulnerabilities=False,
         )
         return artifacts["branches"]
 
@@ -347,6 +359,7 @@ class SharingService:
             include_specifications=False,
             include_snaps=False,
             include_ocirecipes=False,
+            include_vulnerabilities=False,
         )
         return artifacts["gitrepositories"]
 
@@ -362,6 +375,7 @@ class SharingService:
             include_gitrepositories=False,
             include_specifications=False,
             include_ocirecipes=False,
+            include_vulnerabilities=False,
         )
         return artifacts["snaps"]
 
@@ -377,6 +391,7 @@ class SharingService:
             include_gitrepositories=False,
             include_snaps=False,
             include_ocirecipes=False,
+            include_vulnerabilities=False,
         )
         return artifacts["specifications"]
 
@@ -392,9 +407,26 @@ class SharingService:
             include_gitrepositories=False,
             include_snaps=False,
             include_specifications=False,
+            include_vulnerabilities=False,
         )
         return artifacts["ocirecipes"]
 
+    @available_with_permission("launchpad.Driver", "pillar")
+    def getSharedVulnerabilities(self, pillar, person, user):
+        """See `ISharingService`."""
+        artifacts = self.getSharedArtifacts(
+            pillar,
+            person,
+            user,
+            include_bugs=False,
+            include_branches=False,
+            include_gitrepositories=False,
+            include_snaps=False,
+            include_specifications=False,
+            include_ocirecipes=False,
+        )
+        return artifacts["vulnerabilities"]
+
     def _getVisiblePrivateSpecificationIDs(self, person, specifications):
         store = Store.of(specifications[0])
         tables = (
@@ -447,6 +479,7 @@ class SharingService:
         specifications=None,
         ignore_permissions=False,
         ocirecipes=None,
+        vulnerabilities=None,
     ):
         """See `ISharingService`."""
         bug_ids = []
@@ -454,6 +487,7 @@ class SharingService:
         gitrepository_ids = []
         snap_ids = []
         ocirecipes_ids = []
+        vulnerability_ids = []
         for bug in bugs or []:
             if not ignore_permissions and not check_permission(
                 "launchpad.View", bug
@@ -489,6 +523,12 @@ class SharingService:
             ):
                 raise Unauthorized
             ocirecipes_ids.append(ocirecipe.id)
+        for vulnerability in vulnerabilities or []:
+            if not ignore_permissions and not check_permission(
+                "launchpad.View", vulnerability
+            ):
+                raise Unauthorized
+            vulnerability_ids.append(vulnerability.id)
 
         # Load the bugs.
         visible_bugs = []
@@ -547,6 +587,14 @@ class SharingService:
                 )
             )
 
+        visible_vulnerabilities = []
+        if vulnerabilities:
+            visible_vulnerabilities = list(
+                getUtility(IVulnerabilitySet).findByIds(
+                    vulnerability_ids, visible_by_user=person
+                )
+            )
+
         return {
             "bugs": visible_bugs,
             "branches": visible_branches,
@@ -554,6 +602,7 @@ class SharingService:
             "snaps": visible_snaps,
             "specifications": visible_specs,
             "ocirecipes": visible_ocirecipes,
+            "vulnerabilities": visible_vulnerabilities,
         }
 
     def getInvisibleArtifacts(
@@ -1056,6 +1105,7 @@ class SharingService:
         snaps=None,
         specifications=None,
         ocirecipes=None,
+        vulnerabilities=None,
         ignore_permissions=False,
     ):
         """See `ISharingService`."""
@@ -1073,6 +1123,8 @@ class SharingService:
             artifacts.extend(specifications)
         if ocirecipes:
             artifacts.extend(ocirecipes)
+        if vulnerabilities:
+            artifacts.extend(vulnerabilities)
         if not ignore_permissions:
             # The user needs to have launchpad.Edit permission on all supplied
             # bugs and branches or else we raise an Unauthorized exception.
diff --git a/lib/lp/registry/services/tests/test_sharingservice.py b/lib/lp/registry/services/tests/test_sharingservice.py
index 477db80..3b2cc0f 100644
--- a/lib/lp/registry/services/tests/test_sharingservice.py
+++ b/lib/lp/registry/services/tests/test_sharingservice.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2022 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 import six
@@ -45,7 +45,9 @@ from lp.registry.interfaces.person import TeamMembershipPolicy
 from lp.registry.interfaces.product import IProduct
 from lp.registry.interfaces.role import IPersonRoles
 from lp.registry.interfaces.sourcepackage import ISourcePackage
+from lp.registry.model.accesspolicy import AccessArtifact
 from lp.registry.services.sharingservice import SharingService
+from lp.services.database.interfaces import IStore
 from lp.services.job.tests import block_on_job
 from lp.services.webapp.interaction import ANONYMOUS
 from lp.services.webapp.interfaces import ILaunchpadRoot
@@ -90,6 +92,10 @@ class PillarScenariosMixin(WithScenarios):
         if self.pillar_factory_name != "makeProduct":
             self.skipTest("Only relevant for Product.")
 
+    def _skipUnlessDistribution(self):
+        if self.pillar_factory_name != "makeDistribution":
+            self.skipTest("Only relevant for Distribution.")
+
     def _makePillar(self, **kwargs):
         return getattr(self.factory, self.pillar_factory_name)(**kwargs)
 
@@ -1761,8 +1767,38 @@ class TestSharingService(
         getUtility(IService, "sharing").ensureAccessGrants(
             [grantee], pillar.owner, ocirecipes=ocirecipes[:9]
         )
+
         return bug_tasks, branches, gitrepositories, snaps, specs, ocirecipes
 
+    def create_shared_vulnerabilities(self, pillar, grantee, user):
+        ocirecipes = []
+        for x in range(0, 10):
+            ociproject = self.factory.makeOCIProject(
+                pillar=pillar, registrant=pillar.owner
+            )
+            ocirecipe = self.factory.makeOCIRecipe(
+                oci_project=ociproject,
+                owner=pillar.owner,
+                registrant=pillar.owner,
+                information_type=InformationType.USERDATA,
+            )
+            ocirecipes.append(ocirecipe)
+        getUtility(IService, "sharing").ensureAccessGrants(
+            [grantee], pillar.owner, ocirecipes=ocirecipes[:9]
+        )
+        vulnerabilities = []
+        for _ in range(10):
+            vulnerability = self.factory.makeVulnerability(
+                distribution=pillar,
+                creator=pillar.owner,
+                information_type=InformationType.USERDATA,
+            )
+            vulnerabilities.append(vulnerability)
+        getUtility(IService, "sharing").ensureAccessGrants(
+            [grantee], pillar.owner, vulnerabilities=vulnerabilities[:9]
+        )
+        return vulnerabilities
+
     def test_getSharedArtifacts(self):
         # Test the getSharedArtifacts method.
         owner = self.factory.makePerson()
@@ -1800,6 +1836,26 @@ class TestSharingService(
         self.assertContentEqual(specs[:9], shared_specs)
         self.assertContentEqual(ocirecipes[:9], shared_ocirecipes)
 
+    def test_getSharedArtifacts_distribution_vulnerabilities(self):
+        self._skipUnlessDistribution()
+        owner = self.factory.makePerson()
+        pillar = self._makePillar(
+            owner=owner,
+            specification_sharing_policy=(
+                SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY
+            ),
+        )
+        login_person(owner)
+        grantee = self.factory.makePerson()
+        user = self.factory.makePerson()
+        vulnerabilities = self.create_shared_vulnerabilities(
+            pillar, grantee, user
+        )
+        IStore(AccessArtifact).flush()
+        artifacts = self.service.getSharedArtifacts(pillar, grantee, user)
+        shared_vulnerabilities = artifacts["vulnerabilities"]
+        self.assertContentEqual(vulnerabilities[:9], shared_vulnerabilities)
+
     def _assert_getSharedPillars(self, pillar, who=None):
         # Test that 'who' can query the shared pillars for a grantee.
 
@@ -1999,9 +2055,14 @@ class TestSharingService(
         login_person(owner)
         grantee = self.factory.makePerson()
         user = self.factory.makePerson()
-        _, _, _, _, _, ocirecipes = self.create_shared_artifacts(
-            pillar, grantee, user
-        )
+        (
+            _,
+            _,
+            _,
+            _,
+            _,
+            ocirecipes,
+        ) = self.create_shared_artifacts(pillar, grantee, user)
 
         # Check the results.
         shared_ocirecipes = self.service.getSharedOCIRecipes(
@@ -2009,6 +2070,29 @@ class TestSharingService(
         )
         self.assertContentEqual(ocirecipes[:9], shared_ocirecipes)
 
+    def test_getSharedVulnerabilities(self):
+        # Test the getSharedVulnerabilities method.
+        self._skipUnlessDistribution()
+        owner = self.factory.makePerson()
+        pillar = self._makePillar(
+            owner=owner,
+            specification_sharing_policy=(
+                SpecificationSharingPolicy.PROPRIETARY
+            ),
+        )
+        login_person(owner)
+        grantee = self.factory.makePerson()
+        user = self.factory.makePerson()
+        vulnerabilities = self.create_shared_vulnerabilities(
+            pillar, grantee, user
+        )
+
+        # Check the results.
+        shared_vulnerabilities = self.service.getSharedVulnerabilities(
+            pillar, grantee, user
+        )
+        self.assertContentEqual(vulnerabilities[:9], shared_vulnerabilities)
+
     def test_getPeopleWithAccessBugs(self):
         # Test the getPeopleWithoutAccess method with bugs.
         owner = self.factory.makePerson()
diff --git a/lib/lp/registry/tests/test_personmerge.py b/lib/lp/registry/tests/test_personmerge.py
index 024697f..f67cc7c 100644
--- a/lib/lp/registry/tests/test_personmerge.py
+++ b/lib/lp/registry/tests/test_personmerge.py
@@ -25,6 +25,7 @@ from lp.charms.interfaces.charmrecipe import (
 )
 from lp.code.interfaces.gitrepository import IGitRepositorySet
 from lp.oci.interfaces.ocirecipe import OCI_RECIPE_ALLOW_CREATE, IOCIRecipeSet
+from lp.registry.enums import SpecificationSharingPolicy
 from lp.registry.interfaces.accesspolicy import (
     IAccessArtifactGrantSource,
     IAccessPolicyGrantSource,
@@ -48,6 +49,7 @@ from lp.services.identity.interfaces.emailaddress import (
     EmailAddressStatus,
     IEmailAddressSet,
 )
+from lp.services.webapp.authorization import check_permission
 from lp.snappy.interfaces.snap import SNAP_TESTING_FLAGS, ISnapSet
 from lp.soyuz.enums import ArchiveStatus
 from lp.soyuz.interfaces.livefs import LIVEFS_FEATURE_FLAG, ILiveFSSet
@@ -750,6 +752,71 @@ class TestMergePeople(TestCaseWithFactory, KarmaTestMixin):
         self.assertFalse(snap.visibleByUser(duplicate))
         self.assertIsNone(snap.getSubscription(duplicate))
 
+    def test_merge_vulnerability_subscriptions(self):
+        # Checks that merging users moves subscriptions.
+        duplicate = self.factory.makePerson()
+        mergee = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        vulnerability.subscribe(duplicate, duplicate)
+        self.assertTrue(vulnerability.hasSubscription(duplicate))
+        self.assertFalse(vulnerability.hasSubscription(mergee))
+        self._do_premerge(duplicate, mergee)
+        login_person(mergee)
+        duplicate, mergee = self._do_merge(duplicate, mergee)
+        self.assertFalse(vulnerability.hasSubscription(duplicate))
+        self.assertTrue(vulnerability.hasSubscription(mergee))
+
+    def test_merge_vulnerability_subscriptions_mergee_already_subscribed(self):
+        duplicate = self.factory.makePerson()
+        mergee = self.factory.makePerson()
+        vulnerability = self.factory.makeVulnerability()
+        vulnerability.subscribe(duplicate, duplicate)
+        vulnerability.subscribe(mergee, mergee)
+        mergee_subscription = vulnerability.getSubscription(mergee)
+        self.assertTrue(vulnerability.hasSubscription(duplicate))
+        self.assertTrue(vulnerability.hasSubscription(mergee))
+        self._do_premerge(duplicate, mergee)
+        login_person(mergee)
+        duplicate, mergee = self._do_merge(duplicate, mergee)
+        self.assertFalse(vulnerability.hasSubscription(duplicate))
+        self.assertTrue(vulnerability.hasSubscription(mergee))
+        self.assertEqual(
+            mergee_subscription, vulnerability.getSubscription(mergee)
+        )
+
+    def test_merge_vulnerability_subscriptions_non_public_vulnerability(self):
+        duplicate = self.factory.makePerson()
+        mergee = self.factory.makePerson()
+        distribution = self.factory.makeDistribution(
+            specification_sharing_policy=SpecificationSharingPolicy.PROPRIETARY
+        )
+        vulnerability = self.factory.makeVulnerability(
+            distribution=distribution,
+            information_type=InformationType.PROPRIETARY,
+        )
+        with person_logged_in(distribution.owner):
+            vulnerability.subscribe(duplicate, distribution.owner)
+            self.assertTrue(vulnerability.hasSubscription(duplicate))
+            self.assertFalse(vulnerability.hasSubscription(mergee))
+
+        with person_logged_in(duplicate):
+            self.assertTrue(check_permission("launchpad.View", vulnerability))
+
+        with person_logged_in(mergee):
+            self.assertFalse(check_permission("launchpad.View", vulnerability))
+
+        self._do_premerge(duplicate, mergee)
+        login_person(mergee)
+        duplicate, mergee = self._do_merge(duplicate, mergee)
+        with person_logged_in(distribution.owner):
+            self.assertFalse(vulnerability.hasSubscription(duplicate))
+            self.assertTrue(vulnerability.hasSubscription(mergee))
+
+        # Cannot log in as the duplicate user any more to test that they do not
+        # have the permission.
+        with person_logged_in(mergee):
+            self.assertTrue(check_permission("launchpad.View", vulnerability))
+
     def test_merge_moves_oci_recipes(self):
         # When person/teams are merged, oci recipes owned by the from
         # person are moved.

Follow ups