launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #28901
[Merge] ~lgp171188/launchpad:vulnerability-subscription-model into launchpad:master
Guruprasad has proposed merging ~lgp171188/launchpad:vulnerability-subscription-model into launchpad:master.
Commit message:
WIP changes
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~lgp171188/launchpad/+git/launchpad/+merge/427274
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~lgp171188/launchpad:vulnerability-subscription-model into launchpad:master.
diff --git a/lib/lp/bugs/interfaces/vulnerability.py b/lib/lp/bugs/interfaces/vulnerability.py
index ac60ca2..6fe9abc 100644
--- a/lib/lp/bugs/interfaces/vulnerability.py
+++ b/lib/lp/bugs/interfaces/vulnerability.py
@@ -13,7 +13,7 @@ __all__ = [
from lazr.enum import DBEnumeratedType, DBItem
from lazr.restful.declarations import exported, exported_as_webservice_entry
-from lazr.restful.fields import Reference
+from lazr.restful.fields import CollectionField, Reference
from zope.interface import Interface
from zope.schema import Choice, Datetime, Int, TextLine
@@ -133,6 +133,35 @@ class IVulnerabilityView(Interface):
),
as_of="devel",
)
+ subscriptions = CollectionField(
+ title=_("VulnerabilitySubscriptions for this vulnerability."),
+ readonly=True,
+ value_type=Reference(Interface),
+ )
+
+ subscribers = CollectionField(
+ title=_("Persons subscribed to this vulnerability."),
+ readonly=True,
+ value_type=Reference(IPerson),
+ )
+
+ def visibleByUser(user):
+ """Can this user see this vulnerability?"""
+
+ def getSubscription(person):
+ """Returns the person's subscription for this vulnerability."""
+
+ def hasSubscription(person):
+ """Is this person subscribed to this vulnerability?"""
+
+ def userCanBeSubscribed(person):
+ """Can this person be subscribed to this vulnerability?"""
+
+ def subscribe(person, subscribed_by):
+ """Subscribe a person to this vulnerability."""
+
+ def unsubscribe(person, unsubscribed_by):
+ """Unsubscribe a person from this vulnerability."""
class IVulnerabilityEditableAttributes(Interface):
@@ -285,6 +314,9 @@ class IVulnerabilitySet(Interface):
:param date_made_public: The date this vulnerability was made public.
"""
+ def findByIds(vulnerability_ids, visible_by_user=None):
+ """Returns the vulnerabilities with the given IDs."""
+
class IVulnerabilityActivity(Interface):
"""`IVulnerabilityActivity` attributes that require launchpad.View."""
diff --git a/lib/lp/bugs/interfaces/vulnerabilitysubscription.py b/lib/lp/bugs/interfaces/vulnerabilitysubscription.py
new file mode 100644
index 0000000..06977d1
--- /dev/null
+++ b/lib/lp/bugs/interfaces/vulnerabilitysubscription.py
@@ -0,0 +1,43 @@
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Vulnerability subscription interfaces."""
+
+__all__ = ["IVulnerabilitySubscription"]
+
+from lazr.restful.fields import Reference
+from zope.interface import Interface
+from zope.schema import Datetime, Int
+
+from lp import _
+from lp.bugs.interfaces.vulnerability import IVulnerability
+from lp.services.fields import PersonChoice
+
+
+class IVulnerabilitySubscription(Interface):
+ """A person's subscription to a specific vulnerability."""
+
+ id = Int(title=_("ID"), readonly=True, required=True)
+ person = PersonChoice(
+ title=_("Person"),
+ required=True,
+ vocabulary="ValidPersonOrTeam",
+ readonly=True,
+ description=_("The person subscribed to the related vulnerability."),
+ )
+ vulnerability = Reference(
+ IVulnerability, title=_("Vulnerability"), required=True, readonly=True
+ )
+ subscribed_by = PersonChoice(
+ title=("Subscribed by"),
+ required=True,
+ vocabulary="ValidPersonOrTeam",
+ readonly=True,
+ description=_("The person who created this subscription."),
+ )
+ date_created = Datetime(
+ title=_("Date subscribed"), required=True, readonly=True
+ )
+
+ def canBeUnsubscribedByUser(user):
+ """Can the user unsubscribe the subscriber from the vulnerability?"""
diff --git a/lib/lp/bugs/model/tests/test_vulnerability.py b/lib/lp/bugs/model/tests/test_vulnerability.py
index 32a902f..b5cd04a 100644
--- a/lib/lp/bugs/model/tests/test_vulnerability.py
+++ b/lib/lp/bugs/model/tests/test_vulnerability.py
@@ -2,16 +2,29 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for the vulnerability and related models."""
+from unittest.mock import patch
+
from testtools.matchers import MatchesStructure
from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+from lp.app.enums import InformationType
+from lp.app.errors import (
+ SubscriptionPrivacyViolation,
+ UserCannotUnsubscribePerson,
+)
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.app.interfaces.services import IService
from lp.bugs.enums import VulnerabilityStatus
from lp.bugs.interfaces.buglink import IBugLinkTarget
+from lp.bugs.interfaces.bugtask import BugTaskImportance
from lp.bugs.interfaces.vulnerability import (
IVulnerability,
IVulnerabilitySet,
VulnerabilityChange,
)
+from lp.bugs.model.vulnerabilitysubscription import VulnerabilitySubscription
+from lp.registry.enums import SpecificationSharingPolicy, TeamMembershipPolicy
from lp.services.webapp.authorization import check_permission
from lp.testing import (
TestCaseWithFactory,
@@ -23,6 +36,16 @@ from lp.testing import (
from lp.testing.layers import DatabaseFunctionalLayer
+def grant_access_to_non_public_vulnerability(vulnerability, person):
+ distribution = removeSecurityProxy(vulnerability).distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, "sharing").ensureAccessGrants(
+ [person],
+ distribution.owner,
+ vulnerabilities=[vulnerability],
+ )
+
+
class TestVulnerability(TestCaseWithFactory):
layer = DatabaseFunctionalLayer
@@ -34,6 +57,19 @@ class TestVulnerability(TestCaseWithFactory):
distribution=self.distribution
)
+ def makeProprietaryDistribution(self):
+ return self.factory.makeDistribution(
+ specification_sharing_policy=SpecificationSharingPolicy.PROPRIETARY
+ )
+
+ def makeProprietaryVulnerability(self, distribution=None):
+ if distribution is None:
+ distribution = self.makeProprietaryDistribution()
+ return self.factory.makeVulnerability(
+ distribution=distribution,
+ information_type=InformationType.PROPRIETARY,
+ )
+
def test_Vulnerability_implements_IVulnerability(self):
vulnerability = self.factory.makeVulnerability()
self.assertTrue(verifyObject(IVulnerability, vulnerability))
@@ -42,6 +78,301 @@ class TestVulnerability(TestCaseWithFactory):
vulnerability = self.factory.makeVulnerability()
self.assertTrue(verifyObject(IBugLinkTarget, vulnerability))
+ def test_Vulnerability_subscriptions_subscribers_empty_default(self):
+ vulnerability = self.factory.makeVulnerability()
+ self.assertEqual(0, vulnerability.subscribers.count())
+ self.assertEqual(0, vulnerability.subscriptions.count())
+
+ def test_public_vulnerability_visibleByUser(self):
+ vulnerability = self.factory.makeVulnerability()
+ self.assertTrue(vulnerability.visibleByUser(None))
+ self.assertTrue(vulnerability.visibleByUser(self.factory.makePerson()))
+
+ def test_non_public_vulnerability_visibleByUser(self):
+ # XXX lgp171188 - We use the 'Proprietary' sharing policy
+ # as an example non-public information_type and may have to
+ # add tests for other non-public types in the future.
+ distribution = self.makeProprietaryDistribution()
+ vulnerability = self.makeProprietaryVulnerability(distribution)
+ allowed_user = self.factory.makePerson()
+ grant_access_to_non_public_vulnerability(
+ vulnerability,
+ allowed_user,
+ )
+ with person_logged_in(distribution.owner):
+ self.assertFalse(vulnerability.visibleByUser(None))
+ self.assertFalse(
+ vulnerability.visibleByUser(self.factory.makePerson())
+ )
+ self.assertTrue(vulnerability.visibleByUser(allowed_user))
+
+ @patch("lp.bugs.model.vulnerability.reconcile_access_for_artifacts")
+ def test_setting_information_type_reconciles_access(
+ self, mock_reconcile_method
+ ):
+ vulnerability = self.factory.makeVulnerability()
+ self.assertEqual(
+ InformationType.PUBLIC, vulnerability.information_type
+ )
+ with person_logged_in(vulnerability.distribution.owner):
+ vulnerability.information_type = InformationType.PROPRIETARY
+ mock_reconcile_method.assert_called_with(
+ [vulnerability],
+ InformationType.PROPRIETARY,
+ [vulnerability.distribution],
+ )
+
+ def test_getSubscription_person_is_None(self):
+ self.assertIsNone(
+ self.factory.makeVulnerability().getSubscription(None)
+ )
+
+ def test_getSubscription_person_is_not_subscribed(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ self.assertIsNone(vulnerability.getSubscription(person))
+
+ def test_getSubscription_person_is_subscribed(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ subscription = VulnerabilitySubscription(
+ person=person, vulnerability=vulnerability, subscribed_by=person
+ )
+ self.assertEqual(subscription, vulnerability.getSubscription(person))
+
+ def test_hasSubscription(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ self.assertFalse(vulnerability.hasSubscription(person))
+ VulnerabilitySubscription(
+ person=person,
+ vulnerability=vulnerability,
+ subscribed_by=person,
+ )
+ self.assertTrue(vulnerability.hasSubscription(person))
+
+ def test_userCanBeSubscribed_person_public_vulnerability(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ self.assertTrue(vulnerability.userCanBeSubscribed(person))
+
+ def test_userCanBeSubscribed_person_non_public_vulnerability(self):
+ person = self.factory.makePerson()
+ vulnerability = removeSecurityProxy(
+ self.makeProprietaryVulnerability(
+ self.makeProprietaryDistribution()
+ )
+ )
+ self.assertTrue(vulnerability.userCanBeSubscribed(person))
+
+ def test_userCanBeSubscribed_public_vulnerability_non_open_team(self):
+ team = self.factory.makeTeam(
+ membership_policy=TeamMembershipPolicy.RESTRICTED
+ )
+ self.assertFalse(team.anyone_can_join())
+ vulnerability = self.factory.makeVulnerability()
+ self.assertTrue(vulnerability.userCanBeSubscribed(team))
+
+ def test_userCanBeSubscribed_non_public_vulnerability_non_open_team(self):
+ team = self.factory.makeTeam(
+ membership_policy=TeamMembershipPolicy.RESTRICTED,
+ )
+ vulnerability = removeSecurityProxy(
+ self.makeProprietaryVulnerability(
+ self.makeProprietaryDistribution()
+ )
+ )
+ self.assertTrue(vulnerability.userCanBeSubscribed(team))
+
+ def test_userCanBeSubscribed_non_public_vulnerability_open_team(self):
+ team = removeSecurityProxy(self.factory.makeTeam())
+ self.assertTrue(team.anyone_can_join())
+ vulnerability = removeSecurityProxy(
+ self.makeProprietaryVulnerability(
+ self.makeProprietaryDistribution()
+ )
+ )
+ self.assertFalse(vulnerability.userCanBeSubscribed(team))
+
+ def test_subscribe_person_to_vulnerability(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ vulnerability.subscribe(person, vulnerability.distribution.owner)
+ self.assertTrue(vulnerability.hasSubscription(person))
+
+ non_public_vulnerability = removeSecurityProxy(
+ self.makeProprietaryVulnerability()
+ )
+ distribution_owner = non_public_vulnerability.distribution.owner
+ with person_logged_in(distribution_owner):
+ non_public_vulnerability.subscribe(
+ person,
+ distribution_owner,
+ )
+ self.assertTrue(non_public_vulnerability.hasSubscription(person))
+
+ def test_subscribe_open_team_non_public_vulnerability(self):
+ open_team = self.factory.makeTeam()
+ vulnerability = removeSecurityProxy(
+ self.makeProprietaryVulnerability()
+ )
+ distribution_owner = vulnerability.distribution.owner
+ with person_logged_in(distribution_owner):
+ self.assertRaises(
+ SubscriptionPrivacyViolation,
+ vulnerability.subscribe,
+ open_team,
+ distribution_owner,
+ )
+
+ def test_subscribe_open_team_public_vulnerability(self):
+ open_team = self.factory.makeTeam()
+ vulnerability = self.factory.makeVulnerability()
+ self.assertFalse(vulnerability.hasSubscription(open_team))
+ vulnerability.subscribe(open_team, vulnerability.distribution.owner)
+ self.assertTrue(vulnerability.hasSubscription(open_team))
+
+ def test_subscribe_subscribing_a_person_with_existing_subscription(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ vulnerability.subscribe(
+ person,
+ vulnerability.distribution.owner,
+ )
+ self.assertTrue(vulnerability.hasSubscription(person))
+ vulnerability.subscribe(
+ person,
+ vulnerability.distribution.owner,
+ )
+ self.assertTrue(vulnerability.hasSubscription(person))
+
+ vulnerability2 = removeSecurityProxy(
+ self.makeProprietaryVulnerability()
+ )
+ distribution_owner = vulnerability2.distribution.owner
+ with person_logged_in(distribution_owner):
+ vulnerability2.subscribe(person, distribution_owner)
+ self.assertTrue(vulnerability2.hasSubscription(person))
+ vulnerability2.subscribe(person, distribution_owner)
+ self.assertTrue(vulnerability2.hasSubscription(person))
+
+ def test_subscribing_to_non_public_vulnerability_makes_it_visible(self):
+ person = self.factory.makePerson()
+ vulnerability = self.makeProprietaryVulnerability()
+ distribution_owner = removeSecurityProxy(
+ vulnerability
+ ).distribution.owner
+ with person_logged_in(person):
+ self.assertFalse(check_permission("launchpad.View", vulnerability))
+ self.assertFalse(check_permission("launchpad.Edit", vulnerability))
+
+ with person_logged_in(distribution_owner):
+ vulnerability.subscribe(person, distribution_owner)
+ with person_logged_in(person):
+ self.assertTrue(check_permission("launchpad.View", vulnerability))
+ self.assertFalse(check_permission("launchpad.Edit", vulnerability))
+
+ def test_subscribers_subscriptions(self):
+ person1 = self.factory.makePerson()
+ person2 = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ self.assertEqual(0, vulnerability.subscriptions.count())
+ self.assertEqual(0, vulnerability.subscribers.count())
+ vulnerability.subscribe(person1, person1)
+ vulnerability.subscribe(person2, person2)
+ self.assertContentEqual({person1, person2}, vulnerability.subscribers)
+ self.assertEqual(2, vulnerability.subscriptions.count())
+
+ def test_unsubscribe_user_not_subscribed(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ self.assertFalse(vulnerability.hasSubscription(person))
+ vulnerability.unsubscribe(person, person)
+ self.assertFalse(vulnerability.hasSubscription(person))
+
+ def test_unsubscribe_random_user_cannot_unsubscribe_a_subscriber(self):
+ person = self.factory.makePerson()
+ person2 = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ vulnerability.subscribe(person, person)
+ self.assertRaises(
+ UserCannotUnsubscribePerson,
+ vulnerability.unsubscribe,
+ person,
+ person2,
+ )
+
+ def test_unsubscribe_self(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ vulnerability.subscribe(person, person)
+ self.assertTrue(vulnerability.hasSubscription(person))
+ vulnerability.unsubscribe(person, person)
+ self.assertFalse(vulnerability.hasSubscription(person))
+
+ def test_privileged_users_can_unsubscribe_other_subscribers(self):
+ person = self.factory.makePerson()
+ person2 = self.factory.makePerson()
+ person3 = self.factory.makePerson()
+ person4 = self.factory.makePerson()
+ creator = self.factory.makeTeam(members=[person2])
+ vulnerability = self.factory.makeVulnerability(creator=creator)
+ distribution = vulnerability.distribution
+ with person_logged_in(distribution.owner):
+ distribution.security_admin = self.factory.makeTeam(
+ members=[person4]
+ )
+
+ vulnerability.subscribe(person, person)
+ self.assertTrue(vulnerability.hasSubscription(person))
+ # Users in the vulnerability's creator team can unsubscribe user.
+ vulnerability.unsubscribe(person, person2)
+ self.assertFalse(vulnerability.hasSubscription(person))
+
+ vulnerability.subscribe(person, person)
+ self.assertTrue(vulnerability.hasSubscription(person))
+ # Vulnerability's distribution owner can unsubscribe subscribers.
+ vulnerability.unsubscribe(person, vulnerability.distribution.owner)
+ self.assertFalse(vulnerability.hasSubscription(person))
+
+ vulnerability.subscribe(person, person)
+ self.assertTrue(vulnerability.hasSubscription(person))
+ # Vulnerability's distribution's security admins can unsubscribe
+ # subscribers.
+ vulnerability.unsubscribe(person, person4)
+ self.assertFalse(vulnerability.hasSubscription(person))
+
+ vulnerability.subscribe(person, person3)
+ self.assertTrue(vulnerability.hasSubscription(person))
+ # The person who created the subscription can unsubscribe the person.
+ vulnerability.unsubscribe(person, person3)
+ self.assertFalse(vulnerability.hasSubscription(person))
+
+ # Admins can unsubscribe the person.
+ vulnerability.subscribe(person, person)
+ self.assertTrue(vulnerability.hasSubscription(person))
+ vulnerability.unsubscribe(
+ person, getUtility(ILaunchpadCelebrities).admin.teamowner
+ )
+ self.assertFalse(vulnerability.hasSubscription(person))
+
+ def test_unsubscribe_removes_visibility_of_non_public_vulnerability(self):
+ person = self.factory.makePerson()
+ vulnerability = removeSecurityProxy(
+ self.makeProprietaryVulnerability()
+ )
+ distribution_owner = vulnerability.distribution.owner
+ with person_logged_in(distribution_owner):
+ vulnerability.subscribe(person, distribution_owner)
+
+ with person_logged_in(person):
+ self.assertTrue(check_permission("launchpad.View", vulnerability))
+ vulnerability.unsubscribe(person, person)
+
+ # Have to log in again for the permission cache to be invalidated.
+ with person_logged_in(person):
+ self.assertFalse(check_permission("launchpad.View", vulnerability))
+
def test_random_user_permissions(self):
with person_logged_in(self.factory.makePerson()):
self.assertTrue(
@@ -51,6 +382,18 @@ class TestVulnerability(TestCaseWithFactory):
check_permission("launchpad.Edit", self.vulnerability)
)
+ def test_random_user_permissions_non_public_vulnerability(self):
+ vulnerability = self.makeProprietaryVulnerability()
+ with person_logged_in(self.factory.makePerson()):
+ self.assertFalse(check_permission("launchpad.View", vulnerability))
+
+ def test_user_can_view_shared_non_public_vulnerability(self):
+ person = self.factory.makePerson()
+ vulnerability = self.makeProprietaryVulnerability()
+ grant_access_to_non_public_vulnerability(vulnerability, person)
+ with person_logged_in(person):
+ self.assertTrue(check_permission("launchpad.View", vulnerability))
+
def test_admin_permissions(self):
with admin_logged_in():
self.assertTrue(
@@ -82,12 +425,22 @@ class TestVulnerability(TestCaseWithFactory):
def test_anonymous_permissions(self):
with anonymous_logged_in():
- self.assertFalse(
+ self.assertTrue(
check_permission("launchpad.View", self.vulnerability)
)
self.assertFalse(
check_permission("launchpad.Edit", self.vulnerability)
)
+ distribution = self.factory.makeDistribution(
+ specification_sharing_policy=SpecificationSharingPolicy.PROPRIETARY
+ )
+ vulnerability = self.factory.makeVulnerability(
+ distribution=distribution,
+ information_type=InformationType.PROPRIETARY,
+ )
+ with anonymous_logged_in():
+ self.assertFalse(check_permission("launchpad.View", vulnerability))
+ self.assertFalse(check_permission("launchpad.Edit", vulnerability))
def test_edit_vulnerability_security_admin(self):
person = self.factory.makePerson()
@@ -174,3 +527,63 @@ class TestVulnerabilitySet(TestCaseWithFactory):
initial_number,
(len(vulnerability1.bugs) + len(vulnerability2.bugs)),
)
+
+ @patch("lp.bugs.model.vulnerability.reconcile_access_for_artifacts")
+ def test_access_reconciled_after_creating_a_vulnerability(
+ self, mock_reconcile_method
+ ):
+ distribution = self.factory.makeDistribution()
+ creator = self.factory.makePerson()
+ vulnerability = getUtility(IVulnerabilitySet).new(
+ distribution=distribution,
+ status=VulnerabilityStatus.NEEDS_TRIAGE,
+ importance=BugTaskImportance.UNDECIDED,
+ creator=creator,
+ )
+ mock_reconcile_method.assert_called_with(
+ [vulnerability], vulnerability.information_type, [distribution]
+ )
+
+ def test_findByIds(self):
+ person = self.factory.makePerson()
+ proprietary_distribution = self.factory.makeDistribution(
+ specification_sharing_policy=(
+ SpecificationSharingPolicy.PROPRIETARY
+ )
+ )
+ vulnerability1 = removeSecurityProxy(self.factory.makeVulnerability())
+ vulnerability2 = removeSecurityProxy(
+ self.factory.makeVulnerability(
+ distribution=proprietary_distribution,
+ information_type=InformationType.PROPRIETARY,
+ )
+ )
+ vulnerability3 = removeSecurityProxy(
+ self.factory.makeVulnerability(
+ distribution=proprietary_distribution,
+ information_type=InformationType.PROPRIETARY,
+ )
+ )
+ grant_access_to_non_public_vulnerability(vulnerability2, person)
+ vulnerability_set = getUtility(IVulnerabilitySet)
+ self.assertContentEqual(
+ {vulnerability1, vulnerability2, vulnerability3},
+ vulnerability_set.findByIds(
+ [
+ vulnerability1.id,
+ vulnerability2.id,
+ vulnerability3.id,
+ ]
+ ),
+ )
+ self.assertContentEqual(
+ {vulnerability1, vulnerability2},
+ vulnerability_set.findByIds(
+ [
+ vulnerability1.id,
+ vulnerability2.id,
+ vulnerability3.id,
+ ],
+ visible_by_user=person,
+ ),
+ )
diff --git a/lib/lp/bugs/model/tests/test_vulnerabilitysubscription.py b/lib/lp/bugs/model/tests/test_vulnerabilitysubscription.py
new file mode 100644
index 0000000..14a611d
--- /dev/null
+++ b/lib/lp/bugs/model/tests/test_vulnerabilitysubscription.py
@@ -0,0 +1,68 @@
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for the VulnerabilitySubscription model."""
+from zope.component import getUtility
+
+from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.bugs.interfaces.vulnerabilitysubscription import (
+ IVulnerabilitySubscription,
+)
+from lp.bugs.model.vulnerabilitysubscription import VulnerabilitySubscription
+from lp.testing import TestCaseWithFactory, person_logged_in, verifyObject
+from lp.testing.layers import DatabaseFunctionalLayer
+
+
+class TestVulnerabilitySubscription(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def test_VulnerabilitySubscription_implements_its_interface(self):
+ person = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ subscription = VulnerabilitySubscription(vulnerability, person, person)
+ self.assertTrue(verifyObject(IVulnerabilitySubscription, subscription))
+
+ def test_canBeUnsubscribedByUser(self):
+ person = self.factory.makePerson()
+ person2 = self.factory.makePerson()
+ person3 = self.factory.makePerson()
+ person4 = self.factory.makePerson()
+ person5 = self.factory.makePerson()
+ creator = self.factory.makeTeam(members=[person3])
+ distribution = self.factory.makeDistribution()
+ with person_logged_in(distribution.owner):
+ distribution.security_admin = self.factory.makeTeam(
+ members=[person5]
+ )
+ vulnerability = self.factory.makeVulnerability(
+ creator=creator, distribution=distribution
+ )
+ subscription = VulnerabilitySubscription(
+ vulnerability, person, person4
+ )
+ # A user account is needed to unsubscribe a subscriber.
+ self.assertFalse(subscription.canBeUnsubscribedByUser(None))
+ # A user can unsubscribe from their own subscription.
+ self.assertTrue(subscription.canBeUnsubscribedByUser(person))
+ # A random user cannot unsubscribe a subscriber.
+ self.assertFalse(subscription.canBeUnsubscribedByUser(person2))
+ # A user in the vulnerability's creator team can unsubscribe
+ # a subscriber.
+ self.assertTrue(subscription.canBeUnsubscribedByUser(person3))
+ # The vulnerability's distribution's owner can unsubscribe
+ # a subscriber.
+ self.assertTrue(
+ subscription.canBeUnsubscribedByUser(distribution.owner)
+ )
+ # The vulnerability's distribution's security admins can unsubscribe
+ # a subscriber.
+ self.assertTrue(subscription.canBeUnsubscribedByUser(person5))
+ # The person who subscribed the subscriber can unsubscribe them.
+ self.assertTrue(subscription.canBeUnsubscribedByUser(person4))
+ # Admins can unsubscribe the subscriber.
+ self.assertTrue(
+ subscription.canBeUnsubscribedByUser(
+ getUtility(ILaunchpadCelebrities).admin.teamowner
+ )
+ )
diff --git a/lib/lp/bugs/model/vulnerability.py b/lib/lp/bugs/model/vulnerability.py
index 25ebea9..30cabf4 100644
--- a/lib/lp/bugs/model/vulnerability.py
+++ b/lib/lp/bugs/model/vulnerability.py
@@ -9,11 +9,18 @@ __all__ = [
import operator
import pytz
+from storm.expr import SQL, Coalesce, Join, Or, Select
from storm.locals import DateTime, Int, Reference, Unicode
+from storm.store import Store
from zope.component import getUtility
from zope.interface import implementer
-from lp.app.enums import InformationType
+from lp.app.enums import PUBLIC_INFORMATION_TYPES, InformationType
+from lp.app.errors import (
+ SubscriptionPrivacyViolation,
+ UserCannotUnsubscribePerson,
+)
+from lp.app.interfaces.services import IService
from lp.app.model.launchpad import InformationTypeMixin
from lp.bugs.enums import VulnerabilityStatus
from lp.bugs.interfaces.buglink import IBugLinkTarget
@@ -27,11 +34,21 @@ from lp.bugs.interfaces.vulnerability import (
)
from lp.bugs.model.bug import Bug
from lp.bugs.model.buglinktarget import BugLinkTargetMixin
+from lp.bugs.model.vulnerabilitysubscription import VulnerabilitySubscription
+from lp.registry.interfaces.accesspolicy import (
+ IAccessArtifactGrantSource,
+ IAccessArtifactSource,
+)
+from lp.registry.interfaces.role import IPersonRoles
+from lp.registry.model.accesspolicy import reconcile_access_for_artifacts
+from lp.registry.model.person import Person
+from lp.registry.model.teammembership import TeamParticipation
from lp.services.database import bulk
from lp.services.database.constants import UTC_NOW
from lp.services.database.enumcol import DBEnum
from lp.services.database.interfaces import IStore
from lp.services.database.stormbase import StormBase
+from lp.services.database.stormexpr import Array, ArrayAgg, ArrayIntersects
from lp.services.xref.interfaces import IXRefSet
@@ -66,7 +83,7 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
name="importance_explanation", allow_none=True
)
- information_type = DBEnum(
+ _information_type = DBEnum(
enum=InformationType,
default=InformationType.PUBLIC,
allow_none=False,
@@ -103,7 +120,11 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
self.cve = cve
self.status = status
self.importance = importance
- self.information_type = information_type
+ # Set `self._information_type` rather than `self.information_type`
+ # to avoid the call to `self._reconcileAccess` while constructing
+ # the instance. `VulnerabilitySet.new` deals with calling
+ # `_reconcileAccess` once the instance has been fully constructed.
+ self._information_type = information_type
self.creator = creator
self.description = description
self.notes = notes
@@ -138,6 +159,129 @@ class Vulnerability(StormBase, BugLinkTargetMixin, InformationTypeMixin):
{("vulnerability", str(self.id)): [("bug", str(bug.id))]}
)
+ @property
+ def information_type(self):
+ return self._information_type
+
+ @information_type.setter
+ def information_type(self, information_type):
+ if information_type != self._information_type:
+ self._information_type = information_type
+ self._reconcileAccess()
+
+ def visibleByUser(self, user):
+ """See `IVulnerability`."""
+ if self.information_type in PUBLIC_INFORMATION_TYPES:
+ return True
+ if user is None:
+ return False
+ return (
+ not IStore(self)
+ .find(
+ Vulnerability,
+ Vulnerability.id == self.id,
+ get_vulnerability_privacy_filter(user),
+ )
+ .is_empty()
+ )
+
+ def _reconcileAccess(self):
+ """Reconcile the vulnerability's sharing information.
+ Takes the privacy and distribution and makes the related AccessArtifact
+ and AccessPolicyArtifacts match."""
+ reconcile_access_for_artifacts(
+ [self], self.information_type, [self.distribution]
+ )
+
+ @property
+ def subscriptions(self):
+ return Store.of(self).find(
+ VulnerabilitySubscription,
+ VulnerabilitySubscription.vulnerability == self,
+ )
+
+ @property
+ def subscribers(self):
+ return Store.of(self).find(
+ Person,
+ VulnerabilitySubscription.person_id == Person.id,
+ VulnerabilitySubscription.vulnerability == self,
+ )
+
+ def getSubscription(self, person):
+ """Returns the person's subscription or None."""
+ if person is None:
+ return None
+ return (
+ Store.of(self)
+ .find(
+ VulnerabilitySubscription,
+ VulnerabilitySubscription.person == person,
+ VulnerabilitySubscription.vulnerability == self,
+ )
+ .one()
+ )
+
+ def hasSubscription(self, person):
+ """See `IVulnerability`."""
+ return self.getSubscription(person) is not None
+
+ def userCanBeSubscribed(self, person):
+ """See `IVulnerability`."""
+ return not (
+ self.information_type not in PUBLIC_INFORMATION_TYPES
+ and person.is_team
+ and person.anyone_can_join()
+ )
+
+ def subscribe(self, person, subscribed_by, ignore_permissions=False):
+ """See `IVulnerability`."""
+ if not self.userCanBeSubscribed(person):
+ raise SubscriptionPrivacyViolation(
+ "Open and delegated teams cannot be subscribed to private "
+ "vulnerabilities."
+ )
+ if self.getSubscription(person) is None:
+ subscription = VulnerabilitySubscription(
+ person=person, vulnerability=self, subscribed_by=subscribed_by
+ )
+ Store.of(subscription).flush()
+ service = getUtility(IService, "sharing")
+ vulnerabilities = service.getVisibleArtifacts(
+ person, vulnerabilities=[self], ignore_permissions=True
+ )["vulnerabilities"]
+ if not vulnerabilities:
+ service.ensureAccessGrants(
+ [person],
+ subscribed_by,
+ vulnerabilities=[self],
+ ignore_permissions=ignore_permissions,
+ )
+
+ def unsubscribe(self, person, unsubscribed_by, ignore_permissions=False):
+ """See `IVulnerability`."""
+ subscription = self.getSubscription(person)
+ if subscription is None:
+ return
+ if (
+ not ignore_permissions
+ and not subscription.canBeUnsubscribedByUser(unsubscribed_by)
+ ):
+ raise UserCannotUnsubscribePerson(
+ "%s does not have permission to unsubscribe %s"
+ % (
+ unsubscribed_by.displayname,
+ person.displayname,
+ )
+ )
+ artifact = getUtility(IAccessArtifactSource).find([self])
+ getUtility(IAccessArtifactGrantSource).revokeByArtifact(
+ artifact, [person]
+ )
+ store = Store.of(subscription)
+ store.remove(subscription)
+ IStore(self).flush()
+
@implementer(IVulnerabilitySet)
class VulnerabilitySet:
@@ -171,9 +315,17 @@ class VulnerabilitySet:
date_made_public=date_made_public,
)
store.add(vulnerability)
+ vulnerability._reconcileAccess()
store.flush()
return vulnerability
+ def findByIds(self, vulnerability_ids, visible_by_user=None):
+ """See `IVulnerabilitySet`."""
+ clauses = [Vulnerability.id.is_in(vulnerability_ids)]
+ if visible_by_user is not None:
+ clauses.append(get_vulnerability_privacy_filter(visible_by_user))
+ return IStore(Vulnerability).find(Vulnerability, *clauses)
+
@implementer(IVulnerabilityActivity)
class VulnerabilityActivity(StormBase):
@@ -233,3 +385,62 @@ class VulnerabilityActivitySet:
)
store.add(activity)
return activity
+
+
+def get_vulnerability_privacy_filter(user):
+    """Build the Storm clauses selecting vulnerabilities visible to `user`.
+
+    Vulnerabilities with a public information type always match.  For a
+    non-None user a vulnerability also matches when its access grants, or
+    the grants on its access policy, include a team the user participates
+    in.
+
+    :param user: An IPerson, or a class attribute that references an IPerson
+        in the database.  An `IPersonRoles` is unwrapped to its person;
+        None limits the filter to public vulnerabilities.
+    :return: A one-item list holding the Storm condition.
+    """
+    from lp.registry.model.accesspolicy import AccessPolicyGrant
+
+    is_public = Vulnerability._information_type.is_in(
+        PUBLIC_INFORMATION_TYPES
+    )
+    if user is None:
+        return [is_public]
+    if IPersonRoles.providedBy(user):
+        user = user.person
+
+    # IDs of every team the user participates in.
+    participated_team_ids = Select(
+        ArrayAgg(TeamParticipation.teamID),
+        tables=TeamParticipation,
+        where=(TeamParticipation.person == user),
+    )
+    # Coalesce to False so a NULL array comparison never matches.
+    has_artifact_grant = Coalesce(
+        ArrayIntersects(
+            SQL("Vulnerability.access_grants"),
+            participated_team_ids,
+        ),
+        False,
+    )
+
+    # IDs of the access policies granted to any of those teams.
+    granted_policy_ids = Select(
+        ArrayAgg(AccessPolicyGrant.policy_id),
+        tables=(
+            AccessPolicyGrant,
+            Join(
+                TeamParticipation,
+                TeamParticipation.teamID == AccessPolicyGrant.grantee_id,
+            ),
+        ),
+        where=(TeamParticipation.person == user),
+    )
+    has_policy_grant = Coalesce(
+        ArrayIntersects(
+            Array(SQL("Vulnerability.access_policy")),
+            granted_policy_ids,
+        ),
+        False,
+    )
+    return [Or(is_public, has_artifact_grant, has_policy_grant)]
diff --git a/lib/lp/bugs/model/vulnerabilitysubscription.py b/lib/lp/bugs/model/vulnerabilitysubscription.py
new file mode 100644
index 0000000..2e46b93
--- /dev/null
+++ b/lib/lp/bugs/model/vulnerabilitysubscription.py
@@ -0,0 +1,60 @@
+# Copyright 2022 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Vulnerability subscription model."""
+
+__all__ = ["VulnerabilitySubscription"]
+
+import pytz
+from storm.properties import DateTime, Int
+from storm.references import Reference
+from zope.interface import implementer
+
+from lp.bugs.interfaces.vulnerabilitysubscription import (
+ IVulnerabilitySubscription,
+)
+from lp.registry.interfaces.person import validate_person
+from lp.registry.interfaces.role import IPersonRoles
+from lp.services.database.constants import UTC_NOW
+from lp.services.database.stormbase import StormBase
+
+
+@implementer(IVulnerabilitySubscription)
+class VulnerabilitySubscription(StormBase):
+    """Storm model linking a subscribed person to a vulnerability."""
+
+    __storm_table__ = "VulnerabilitySubscription"
+
+    id = Int(primary=True)
+
+    # The subscriber.
+    person_id = Int("person", allow_none=False, validator=validate_person)
+    person = Reference(person_id, "Person.id")
+
+    # The vulnerability being subscribed to.
+    vulnerability_id = Int("vulnerability", allow_none=False)
+    vulnerability = Reference(vulnerability_id, "Vulnerability.id")
+
+    # Defaults to the database's current UTC time.
+    date_created = DateTime(allow_none=False, default=UTC_NOW, tzinfo=pytz.UTC)
+
+    # The person who created the subscription.
+    subscribed_by_id = Int(
+        "subscribed_by", allow_none=False, validator=validate_person
+    )
+    subscribed_by = Reference(subscribed_by_id, "Person.id")
+
+    def __init__(self, vulnerability, person, subscribed_by):
+        """Record `person` as subscribed to `vulnerability`.
+
+        :param vulnerability: the vulnerability subscribed to.
+        :param person: the subscriber.
+        :param subscribed_by: the person creating the subscription.
+        """
+        super().__init__()
+        self.vulnerability = vulnerability
+        self.person = person
+        self.subscribed_by = subscribed_by
+
+    def canBeUnsubscribedByUser(self, user):
+        """See `IVulnerabilitySubscription`.
+
+        Anonymous (None) users never qualify.  Otherwise the user must be
+        in the team of the vulnerability's creator, its distribution's
+        owner or security admin, the subscriber, or the original
+        subscribing person, or else be a Launchpad admin.
+        """
+        if user is None:
+            return False
+        vulnerability = self.vulnerability
+        is_vulnerability_stakeholder = (
+            user.inTeam(vulnerability.creator)
+            or user.inTeam(vulnerability.distribution.owner)
+            or user.inTeam(vulnerability.distribution.security_admin)
+        )
+        return (
+            is_vulnerability_stakeholder
+            or user.inTeam(self.person)
+            or user.inTeam(self.subscribed_by)
+            or IPersonRoles(user).in_admin
+        )
diff --git a/lib/lp/bugs/security.py b/lib/lp/bugs/security.py
index 6141de3..11d74e9 100644
--- a/lib/lp/bugs/security.py
+++ b/lib/lp/bugs/security.py
@@ -417,6 +417,21 @@ class EditBugSubscriptionFilter(AuthorizationBase):
return user.inTeam(self.obj.structural_subscription.subscriber)
+class ViewVulnerability(AnonymousAuthorization):
+    """Grant `launchpad.View` on a vulnerability via its `visibleByUser`.
+
+    The intent is that anyone can view public vulnerabilities, while
+    private ones are limited to people with access; the actual decision
+    is delegated to `IVulnerability.visibleByUser` for both anonymous and
+    authenticated requests.
+    """
+
+    usedfor = IVulnerability
+    permission = "launchpad.View"
+
+    def checkAuthenticated(self, user):
+        # `user` wraps the logged-in person; the model wants the person.
+        person = user.person
+        return self.obj.visibleByUser(person)
+
+    def checkUnauthenticated(self):
+        # Anonymous requests are checked as "no user".
+        return self.obj.visibleByUser(None)
+
+
class EditVulnerability(DelegatedAuthorization):
"""The security admins of a distribution should be able to edit
vulnerabilities in that distribution."""
diff --git a/lib/lp/registry/browser/pillar.py b/lib/lp/registry/browser/pillar.py
index 4f776e9..a5e5eab 100644
--- a/lib/lp/registry/browser/pillar.py
+++ b/lib/lp/registry/browser/pillar.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2021 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2022 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Common views for objects that implement `IPillar`."""
@@ -445,10 +445,13 @@ class PillarPersonSharingView(LaunchpadView):
spec_data = self._build_specification_template_data(
self.specifications, request
)
- snap_data = self._build_ocirecipe_template_data(self.snaps, request)
+ snap_data = self._build_snap_template_data(self.snaps, request)
ocirecipe_data = self._build_ocirecipe_template_data(
self.ocirecipes, request
)
+ vulnerability_data = self._build_vulnerability_template_data(
+ self.vulnerabilities, request
+ )
grantee_data = {
"displayname": self.person.displayname,
"self_link": absoluteURL(self.person, request),
@@ -462,6 +465,7 @@ class PillarPersonSharingView(LaunchpadView):
cache.objects["specifications"] = spec_data
cache.objects["snaps"] = snap_data
cache.objects["ocirecipes"] = ocirecipe_data
+ cache.objects["vulnerabilities"] = vulnerability_data
def _loadSharedArtifacts(self):
# As a concrete can by linked via more than one policy, we use sets to
@@ -475,6 +479,7 @@ class PillarPersonSharingView(LaunchpadView):
self.snaps = artifacts["snaps"]
self.specifications = artifacts["specifications"]
self.ocirecipes = artifacts["ocirecipes"]
+ self.vulnerabilities = artifacts["vulnerabilities"]
bug_ids = {bugtask.bug.id for bugtask in self.bugtasks}
self.shared_bugs_count = len(bug_ids)
@@ -483,6 +488,7 @@ class PillarPersonSharingView(LaunchpadView):
self.shared_snaps_count = len(self.snaps)
self.shared_specifications_count = len(self.specifications)
self.shared_ocirecipe_count = len(self.ocirecipes)
+ self.shared_vulnerabilities_count = len(self.vulnerabilities)
def _build_specification_template_data(self, specs, request):
spec_data = []
@@ -574,3 +580,19 @@ class PillarPersonSharingView(LaunchpadView):
)
)
return snap_data
+
+    def _build_vulnerability_template_data(self, vulnerabilities, request):
+        """Return one dict of display data per shared vulnerability.
+
+        :param vulnerabilities: the vulnerabilities to describe.
+        :param request: the request used to build absolute URLs.
+        :return: a list of dicts with the keys `self_link`, `web_link`,
+            `name`, `id` and `information_type`.
+        """
+        # NOTE(review): `name` assumes every vulnerability has a linked
+        # CVE -- confirm that `vulnerability.cve` can never be None here.
+        return [
+            {
+                "self_link": absoluteURL(vulnerability, request),
+                "web_link": canonical_url(
+                    vulnerability, path_only_if_possible=True
+                ),
+                "name": vulnerability.cve.sequence,
+                "id": vulnerability.id,
+                "information_type": vulnerability.information_type.title,
+            }
+            for vulnerability in vulnerabilities
+        ]
diff --git a/lib/lp/registry/interfaces/accesspolicy.py b/lib/lp/registry/interfaces/accesspolicy.py
index 233cc6c..1e31ea6 100644
--- a/lib/lp/registry/interfaces/accesspolicy.py
+++ b/lib/lp/registry/interfaces/accesspolicy.py
@@ -1,4 +1,4 @@
-# Copyright 2011-2021 Canonical Ltd. This software is licensed under the
+# Copyright 2011-2022 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Interfaces for pillar and artifact access policies."""
@@ -34,6 +34,7 @@ class IAccessArtifact(Interface):
snap_id = Attribute("snap_id")
specification_id = Attribute("specification_id")
ocirecipe_id = Attribute("ocirecipe_id")
+ vulnerability_id = Attribute("vulnerability_id")
class IAccessArtifactGrant(Interface):
diff --git a/lib/lp/registry/interfaces/sharingservice.py b/lib/lp/registry/interfaces/sharingservice.py
index ee08d9a..b5fc95b 100644
--- a/lib/lp/registry/interfaces/sharingservice.py
+++ b/lib/lp/registry/interfaces/sharingservice.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2022 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Interfaces for sharing service."""
@@ -26,6 +26,7 @@ from lp.app.enums import InformationType
from lp.app.interfaces.services import IService
from lp.blueprints.interfaces.specification import ISpecification
from lp.bugs.interfaces.bug import IBug
+from lp.bugs.interfaces.vulnerability import IVulnerability
from lp.code.interfaces.branch import IBranch
from lp.code.interfaces.gitrepository import IGitRepository
from lp.oci.interfaces.ocirecipe import IOCIRecipe
@@ -192,6 +193,14 @@ class ISharingService(IService):
:return: a collection of OCI recipes.
"""
+ def getSharedVulnerabilities(pillar, person, user):
+ """Return the vulnerabilities shared between the pillar and person.
+
+ :param user: the user making the request. Only the vulnerabilities
+ visible to the user will be included in the result.
+ :return: a collection of vulnerabilities.
+ """
+
def getVisibleArtifacts(
person,
bugs=None,
@@ -200,6 +209,7 @@ class ISharingService(IService):
snaps=None,
specifications=None,
ocirecipes=None,
+ vulnerabilities=None,
):
"""Return the artifacts shared with person.
@@ -216,6 +226,8 @@ class ISharingService(IService):
person has access.
:param ocirecipes: the OCI recipes to check for which a person
has access.
+ :param vulnerabilities: the vulnerabilities to check for which person
+ has access.
:return: a collection of artifacts the person can see.
"""
@@ -375,6 +387,11 @@ class ISharingService(IService):
title=_("OCI recipes"),
required=False,
),
+ vulnerabilities=List(
+ Reference(schema=IVulnerability),
+ title=_("Vulnerabilities"),
+ required=False,
+ ),
)
@operation_for_version("devel")
def revokeAccessGrants(
@@ -387,6 +404,7 @@ class ISharingService(IService):
snaps=None,
specifications=None,
ocirecipes=None,
+ vulnerabilities=None,
):
"""Remove a grantee's access to the specified artifacts.
@@ -399,6 +417,7 @@ class ISharingService(IService):
:param snaps: The snap recipes for which to revoke access
:param specifications: the specifications for which to revoke access
:param ocirecipes: The OCI recipes for which to revoke access
+ :param vulnerabilities: The vulnerabilities for which to revoke access
"""
@export_write_operation()
@@ -423,6 +442,11 @@ class ISharingService(IService):
title=_("OCI recipes"),
required=False,
),
+ vulnerabilities=List(
+ Reference(schema=IVulnerability),
+ title=_("Vulnerabilities"),
+ required=False,
+ ),
)
@operation_for_version("devel")
def ensureAccessGrants(
@@ -434,6 +458,7 @@ class ISharingService(IService):
snaps=None,
specifications=None,
ocirecipes=None,
+ vulnerabilities=None,
):
"""Ensure a grantee has an access grant to the specified artifacts.
@@ -445,6 +470,7 @@ class ISharingService(IService):
:param snaps: the snap recipes for which to grant access
:param specifications: the specifications for which to grant access
:param ocirecipes: the OCI recipes for which to grant access
+ :param vulnerabilities: the vulnerabilities for which to grant access
"""
@export_write_operation()
diff --git a/lib/lp/registry/model/accesspolicy.py b/lib/lp/registry/model/accesspolicy.py
index cb881b9..dd47d69 100644
--- a/lib/lp/registry/model/accesspolicy.py
+++ b/lib/lp/registry/model/accesspolicy.py
@@ -1,4 +1,4 @@
-# Copyright 2011-2021 Canonical Ltd. This software is licensed under the
+# Copyright 2011-2022 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Model classes for pillar and artifact access policies."""
@@ -95,6 +95,8 @@ class AccessArtifact(StormBase):
specification = Reference(specification_id, "Specification.id")
ocirecipe_id = Int(name="ocirecipe")
ocirecipe = Reference(ocirecipe_id, "OCIRecipe.id")
+ vulnerability_id = Int(name="vulnerability")
+ vulnerability = Reference(vulnerability_id, "Vulnerability.id")
@property
def concrete_artifact(self):
@@ -107,6 +109,7 @@ class AccessArtifact(StormBase):
def _constraintForConcrete(cls, concrete_artifact):
from lp.blueprints.interfaces.specification import ISpecification
from lp.bugs.interfaces.bug import IBug
+ from lp.bugs.interfaces.vulnerability import IVulnerability
from lp.code.interfaces.branch import IBranch
from lp.code.interfaces.gitrepository import IGitRepository
from lp.oci.interfaces.ocirecipe import IOCIRecipe
@@ -124,6 +127,8 @@ class AccessArtifact(StormBase):
col = cls.specification
elif IOCIRecipe.providedBy(concrete_artifact):
col = cls.ocirecipe
+ elif IVulnerability.providedBy(concrete_artifact):
+ col = cls.vulnerability
else:
raise ValueError("%r is not a valid artifact" % concrete_artifact)
return col == concrete_artifact
@@ -146,6 +151,7 @@ class AccessArtifact(StormBase):
"""See `IAccessArtifactSource`."""
from lp.blueprints.interfaces.specification import ISpecification
from lp.bugs.interfaces.bug import IBug
+ from lp.bugs.interfaces.vulnerability import IVulnerability
from lp.code.interfaces.branch import IBranch
from lp.code.interfaces.gitrepository import IGitRepository
from lp.oci.interfaces.ocirecipe import IOCIRecipe
@@ -163,17 +169,33 @@ class AccessArtifact(StormBase):
insert_values = []
for concrete in needed:
if IBug.providedBy(concrete):
- insert_values.append((concrete, None, None, None, None, None))
+ insert_values.append(
+ (concrete, None, None, None, None, None, None)
+ )
elif IBranch.providedBy(concrete):
- insert_values.append((None, concrete, None, None, None, None))
+ insert_values.append(
+ (None, concrete, None, None, None, None, None)
+ )
elif IGitRepository.providedBy(concrete):
- insert_values.append((None, None, concrete, None, None, None))
+ insert_values.append(
+ (None, None, concrete, None, None, None, None)
+ )
elif ISnap.providedBy(concrete):
- insert_values.append((None, None, None, concrete, None, None))
+ insert_values.append(
+ (None, None, None, concrete, None, None, None)
+ )
elif ISpecification.providedBy(concrete):
- insert_values.append((None, None, None, None, concrete, None))
+ insert_values.append(
+ (None, None, None, None, concrete, None, None)
+ )
elif IOCIRecipe.providedBy(concrete):
- insert_values.append((None, None, None, None, None, concrete))
+ insert_values.append(
+ (None, None, None, None, None, concrete, None)
+ )
+ elif IVulnerability.providedBy(concrete):
+ insert_values.append(
+ (None, None, None, None, None, None, concrete)
+ )
else:
raise ValueError("%r is not a supported artifact" % concrete)
columns = (
@@ -183,6 +205,7 @@ class AccessArtifact(StormBase):
cls.snap,
cls.specification,
cls.ocirecipe,
+ cls.vulnerability,
)
new = create(columns, insert_values, get_objects=True)
return list(existing) + new
diff --git a/lib/lp/registry/personmerge.py b/lib/lp/registry/personmerge.py
index 66b19b3..7e2dda3 100644
--- a/lib/lp/registry/personmerge.py
+++ b/lib/lp/registry/personmerge.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2021 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2022 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Person/team merger implementation."""
@@ -895,7 +895,12 @@ def _mergeOCIRecipeSubscription(cur, from_id, to_id):
def _mergeVulnerabilitySubscription(cur, from_id, to_id):
# Update only the VulnerabilitySubscription that will not conflict.
+<<<<<<< lib/lp/registry/personmerge.py
cur.execute('''
+=======
+ cur.execute(
+ """
+>>>>>>> lib/lp/registry/personmerge.py
UPDATE VulnerabilitySubscription
SET person=%(to_id)d
WHERE person=%(from_id)d AND vulnerability NOT IN
@@ -904,11 +909,24 @@ def _mergeVulnerabilitySubscription(cur, from_id, to_id):
FROM VulnerabilitySubscription
WHERE person = %(to_id)d
)
+<<<<<<< lib/lp/registry/personmerge.py
''' % vars())
# and delete those left over.
cur.execute('''
DELETE FROM VulnerabilitySubscription WHERE person=%(from_id)d
''' % vars())
+=======
+ """
+ % vars()
+ )
+ # and delete those left over.
+ cur.execute(
+ """
+ DELETE FROM VulnerabilitySubscription WHERE person=%(from_id)d
+ """
+ % vars()
+ )
+>>>>>>> lib/lp/registry/personmerge.py
def _mergeCharmRecipe(cur, from_person, to_person):
@@ -1181,7 +1199,11 @@ def merge_people(from_person, to_person, reviewer, delete=False):
skip.append(("charmrecipe", "owner"))
_mergeVulnerabilitySubscription(cur, from_id, to_id)
+<<<<<<< lib/lp/registry/personmerge.py
skip.append(('vulnerabilitysubscription', 'person'))
+=======
+ skip.append(("vulnerabilitysubscription", "person"))
+>>>>>>> lib/lp/registry/personmerge.py
# Sanity check. If we have a reference that participates in a
# UNIQUE index, it must have already been handled by this point.
diff --git a/lib/lp/registry/services/sharingservice.py b/lib/lp/registry/services/sharingservice.py
index 9dc9bdf..5548b4a 100644
--- a/lib/lp/registry/services/sharingservice.py
+++ b/lib/lp/registry/services/sharingservice.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2022 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Classes for pillar and artifact sharing service."""
@@ -35,6 +35,8 @@ from lp.app.enums import PRIVATE_INFORMATION_TYPES
from lp.blueprints.model.specification import Specification
from lp.bugs.interfaces.bugtask import IBugTaskSet
from lp.bugs.interfaces.bugtasksearch import BugTaskSearchParams
+from lp.bugs.interfaces.vulnerability import IVulnerabilitySet
+from lp.bugs.model.vulnerability import Vulnerability
from lp.code.interfaces.branchcollection import IAllBranches
from lp.code.interfaces.gitcollection import IAllGitRepositories
from lp.oci.interfaces.ocirecipe import IOCIRecipeSet
@@ -239,6 +241,7 @@ class SharingService:
include_snaps=True,
include_specifications=True,
include_ocirecipes=True,
+ include_vulnerabilities=True,
):
"""See `ISharingService`."""
bug_ids = set()
@@ -247,6 +250,7 @@ class SharingService:
snap_ids = set()
specification_ids = set()
ocirecipe_ids = set()
+ vulnerability_ids = set()
for artifact in self.getArtifactGrantsForPersonOnPillar(
pillar, person
):
@@ -262,6 +266,8 @@ class SharingService:
specification_ids.add(artifact.specification_id)
elif artifact.ocirecipe_id and include_ocirecipes:
ocirecipe_ids.add(artifact.ocirecipe_id)
+ elif artifact.vulnerability_id and include_vulnerabilities:
+ vulnerability_ids.add(artifact.vulnerability_id)
# Load the bugs.
bugtasks = []
@@ -295,6 +301,9 @@ class SharingService:
ocirecipes = []
if ocirecipe_ids:
ocirecipes = load(OCIRecipe, ocirecipe_ids)
+ vulnerabilities = []
+ if vulnerability_ids:
+ vulnerabilities = load(Vulnerability, vulnerability_ids)
return {
"bugtasks": bugtasks,
@@ -303,6 +312,7 @@ class SharingService:
"snaps": snaps,
"specifications": specifications,
"ocirecipes": ocirecipes,
+ "vulnerabilities": vulnerabilities,
}
@available_with_permission("launchpad.Driver", "pillar")
@@ -317,6 +327,7 @@ class SharingService:
include_specifications=False,
include_snaps=False,
include_ocirecipes=False,
+ include_vulnerabilities=False,
)
return artifacts["bugtasks"]
@@ -332,6 +343,7 @@ class SharingService:
include_specifications=False,
include_snaps=False,
include_ocirecipes=False,
+ include_vulnerabilities=False,
)
return artifacts["branches"]
@@ -347,6 +359,7 @@ class SharingService:
include_specifications=False,
include_snaps=False,
include_ocirecipes=False,
+ include_vulnerabilities=False,
)
return artifacts["gitrepositories"]
@@ -362,6 +375,7 @@ class SharingService:
include_gitrepositories=False,
include_specifications=False,
include_ocirecipes=False,
+ include_vulnerabilities=False,
)
return artifacts["snaps"]
@@ -377,6 +391,7 @@ class SharingService:
include_gitrepositories=False,
include_snaps=False,
include_ocirecipes=False,
+ include_vulnerabilities=False,
)
return artifacts["specifications"]
@@ -392,9 +407,26 @@ class SharingService:
include_gitrepositories=False,
include_snaps=False,
include_specifications=False,
+ include_vulnerabilities=False,
)
return artifacts["ocirecipes"]
+    @available_with_permission("launchpad.Driver", "pillar")
+    def getSharedVulnerabilities(self, pillar, person, user):
+        """See `ISharingService`."""
+        # Switch off every other artifact type; `include_vulnerabilities`
+        # is left at its default so only vulnerabilities are loaded.
+        other_artifact_types_off = dict(
+            include_bugs=False,
+            include_branches=False,
+            include_gitrepositories=False,
+            include_snaps=False,
+            include_specifications=False,
+            include_ocirecipes=False,
+        )
+        shared = self.getSharedArtifacts(
+            pillar, person, user, **other_artifact_types_off
+        )
+        return shared["vulnerabilities"]
+
def _getVisiblePrivateSpecificationIDs(self, person, specifications):
store = Store.of(specifications[0])
tables = (
@@ -447,6 +479,7 @@ class SharingService:
specifications=None,
ignore_permissions=False,
ocirecipes=None,
+ vulnerabilities=None,
):
"""See `ISharingService`."""
bug_ids = []
@@ -454,6 +487,7 @@ class SharingService:
gitrepository_ids = []
snap_ids = []
ocirecipes_ids = []
+ vulnerability_ids = []
for bug in bugs or []:
if not ignore_permissions and not check_permission(
"launchpad.View", bug
@@ -489,6 +523,12 @@ class SharingService:
):
raise Unauthorized
ocirecipes_ids.append(ocirecipe.id)
+ for vulnerability in vulnerabilities or []:
+ if not ignore_permissions and not check_permission(
+ "launchpad.View", vulnerability
+ ):
+ raise Unauthorized
+ vulnerability_ids.append(vulnerability.id)
# Load the bugs.
visible_bugs = []
@@ -547,6 +587,14 @@ class SharingService:
)
)
+ visible_vulnerabilities = []
+ if vulnerabilities:
+ visible_vulnerabilities = list(
+ getUtility(IVulnerabilitySet).findByIds(
+ vulnerability_ids, visible_by_user=person
+ )
+ )
+
return {
"bugs": visible_bugs,
"branches": visible_branches,
@@ -554,6 +602,7 @@ class SharingService:
"snaps": visible_snaps,
"specifications": visible_specs,
"ocirecipes": visible_ocirecipes,
+ "vulnerabilities": visible_vulnerabilities,
}
def getInvisibleArtifacts(
@@ -1056,6 +1105,7 @@ class SharingService:
snaps=None,
specifications=None,
ocirecipes=None,
+ vulnerabilities=None,
ignore_permissions=False,
):
"""See `ISharingService`."""
@@ -1073,6 +1123,8 @@ class SharingService:
artifacts.extend(specifications)
if ocirecipes:
artifacts.extend(ocirecipes)
+ if vulnerabilities:
+ artifacts.extend(vulnerabilities)
if not ignore_permissions:
# The user needs to have launchpad.Edit permission on all supplied
# bugs and branches or else we raise an Unauthorized exception.
diff --git a/lib/lp/registry/services/tests/test_sharingservice.py b/lib/lp/registry/services/tests/test_sharingservice.py
index 477db80..3b2cc0f 100644
--- a/lib/lp/registry/services/tests/test_sharingservice.py
+++ b/lib/lp/registry/services/tests/test_sharingservice.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2022 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
import six
@@ -45,7 +45,9 @@ from lp.registry.interfaces.person import TeamMembershipPolicy
from lp.registry.interfaces.product import IProduct
from lp.registry.interfaces.role import IPersonRoles
from lp.registry.interfaces.sourcepackage import ISourcePackage
+from lp.registry.model.accesspolicy import AccessArtifact
from lp.registry.services.sharingservice import SharingService
+from lp.services.database.interfaces import IStore
from lp.services.job.tests import block_on_job
from lp.services.webapp.interaction import ANONYMOUS
from lp.services.webapp.interfaces import ILaunchpadRoot
@@ -90,6 +92,10 @@ class PillarScenariosMixin(WithScenarios):
if self.pillar_factory_name != "makeProduct":
self.skipTest("Only relevant for Product.")
+    def _skipUnlessDistribution(self):
+        """Skip the running test unless the pillar scenario is Distribution."""
+        if self.pillar_factory_name == "makeDistribution":
+            return
+        self.skipTest("Only relevant for Distribution.")
+
def _makePillar(self, **kwargs):
return getattr(self.factory, self.pillar_factory_name)(**kwargs)
@@ -1761,8 +1767,38 @@ class TestSharingService(
getUtility(IService, "sharing").ensureAccessGrants(
[grantee], pillar.owner, ocirecipes=ocirecipes[:9]
)
+
return bug_tasks, branches, gitrepositories, snaps, specs, ocirecipes
+    def create_shared_vulnerabilities(self, pillar, grantee, user):
+        """Create ten USERDATA vulnerabilities and share the first nine.
+
+        :param pillar: the distribution the vulnerabilities are created
+            for; its owner creates them and grants the access.
+        :param grantee: the person who receives the access grants.
+        :param user: unused here; kept so existing callers keep working.
+        :return: the list of all ten vulnerabilities, in creation order.
+        """
+        vulnerabilities = [
+            self.factory.makeVulnerability(
+                distribution=pillar,
+                creator=pillar.owner,
+                information_type=InformationType.USERDATA,
+            )
+            for _ in range(10)
+        ]
+        # The last vulnerability is deliberately left unshared so callers
+        # can assert that only granted artifacts are reported.
+        getUtility(IService, "sharing").ensureAccessGrants(
+            [grantee], pillar.owner, vulnerabilities=vulnerabilities[:9]
+        )
+        return vulnerabilities
+
def test_getSharedArtifacts(self):
# Test the getSharedArtifacts method.
owner = self.factory.makePerson()
@@ -1800,6 +1836,26 @@ class TestSharingService(
self.assertContentEqual(specs[:9], shared_specs)
self.assertContentEqual(ocirecipes[:9], shared_ocirecipes)
+ def test_getSharedArtifacts_distribution_vulnerabilities(self):
+ # getSharedArtifacts reports, under the "vulnerabilities" key, the
+ # vulnerabilities shared with the grantee. Distribution scenario only.
+ self._skipUnlessDistribution()
+ owner = self.factory.makePerson()
+ pillar = self._makePillar(
+ owner=owner,
+ specification_sharing_policy=(
+ SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY
+ ),
+ )
+ login_person(owner)
+ grantee = self.factory.makePerson()
+ user = self.factory.makePerson()
+ # Ten vulnerabilities are created; access is granted on the first nine.
+ vulnerabilities = self.create_shared_vulnerabilities(
+ pillar, grantee, user
+ )
+ # Flush pending AccessArtifact changes before querying.
+ IStore(AccessArtifact).flush()
+ artifacts = self.service.getSharedArtifacts(pillar, grantee, user)
+ shared_vulnerabilities = artifacts["vulnerabilities"]
+ self.assertContentEqual(vulnerabilities[:9], shared_vulnerabilities)
+
def _assert_getSharedPillars(self, pillar, who=None):
# Test that 'who' can query the shared pillars for a grantee.
@@ -1999,9 +2055,14 @@ class TestSharingService(
login_person(owner)
grantee = self.factory.makePerson()
user = self.factory.makePerson()
- _, _, _, _, _, ocirecipes = self.create_shared_artifacts(
- pillar, grantee, user
- )
+ (
+ _,
+ _,
+ _,
+ _,
+ _,
+ ocirecipes,
+ ) = self.create_shared_artifacts(pillar, grantee, user)
# Check the results.
shared_ocirecipes = self.service.getSharedOCIRecipes(
@@ -2009,6 +2070,29 @@ class TestSharingService(
)
self.assertContentEqual(ocirecipes[:9], shared_ocirecipes)
+ def test_getSharedVulnerabilities(self):
+ # Test the getSharedVulnerabilities method.
+ # Only runs for the Distribution scenario.
+ self._skipUnlessDistribution()
+ owner = self.factory.makePerson()
+ pillar = self._makePillar(
+ owner=owner,
+ specification_sharing_policy=(
+ SpecificationSharingPolicy.PROPRIETARY
+ ),
+ )
+ login_person(owner)
+ grantee = self.factory.makePerson()
+ user = self.factory.makePerson()
+ # Ten vulnerabilities are created; access is granted on the first nine.
+ vulnerabilities = self.create_shared_vulnerabilities(
+ pillar, grantee, user
+ )
+
+ # Check the results.
+ shared_vulnerabilities = self.service.getSharedVulnerabilities(
+ pillar, grantee, user
+ )
+ # Only the nine shared vulnerabilities are expected back.
+ self.assertContentEqual(vulnerabilities[:9], shared_vulnerabilities)
+
def test_getPeopleWithAccessBugs(self):
# Test the getPeopleWithoutAccess method with bugs.
owner = self.factory.makePerson()
diff --git a/lib/lp/registry/tests/test_personmerge.py b/lib/lp/registry/tests/test_personmerge.py
index 024697f..f67cc7c 100644
--- a/lib/lp/registry/tests/test_personmerge.py
+++ b/lib/lp/registry/tests/test_personmerge.py
@@ -25,6 +25,7 @@ from lp.charms.interfaces.charmrecipe import (
)
from lp.code.interfaces.gitrepository import IGitRepositorySet
from lp.oci.interfaces.ocirecipe import OCI_RECIPE_ALLOW_CREATE, IOCIRecipeSet
+from lp.registry.enums import SpecificationSharingPolicy
from lp.registry.interfaces.accesspolicy import (
IAccessArtifactGrantSource,
IAccessPolicyGrantSource,
@@ -48,6 +49,7 @@ from lp.services.identity.interfaces.emailaddress import (
EmailAddressStatus,
IEmailAddressSet,
)
+from lp.services.webapp.authorization import check_permission
from lp.snappy.interfaces.snap import SNAP_TESTING_FLAGS, ISnapSet
from lp.soyuz.enums import ArchiveStatus
from lp.soyuz.interfaces.livefs import LIVEFS_FEATURE_FLAG, ILiveFSSet
@@ -750,6 +752,71 @@ class TestMergePeople(TestCaseWithFactory, KarmaTestMixin):
self.assertFalse(snap.visibleByUser(duplicate))
self.assertIsNone(snap.getSubscription(duplicate))
+ def test_merge_vulnerability_subscriptions(self):
+ # Checks that merging users moves subscriptions.
+ duplicate = self.factory.makePerson()
+ mergee = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ # Before the merge only the duplicate person is subscribed.
+ vulnerability.subscribe(duplicate, duplicate)
+ self.assertTrue(vulnerability.hasSubscription(duplicate))
+ self.assertFalse(vulnerability.hasSubscription(mergee))
+ self._do_premerge(duplicate, mergee)
+ login_person(mergee)
+ duplicate, mergee = self._do_merge(duplicate, mergee)
+ # After the merge the subscription belongs to the mergee.
+ self.assertFalse(vulnerability.hasSubscription(duplicate))
+ self.assertTrue(vulnerability.hasSubscription(mergee))
+
+ def test_merge_vulnerability_subscriptions_mergee_already_subscribed(self):
+ # When both people are subscribed, merging keeps the mergee's own
+ # subscription and leaves the duplicate without one.
+ duplicate = self.factory.makePerson()
+ mergee = self.factory.makePerson()
+ vulnerability = self.factory.makeVulnerability()
+ vulnerability.subscribe(duplicate, duplicate)
+ vulnerability.subscribe(mergee, mergee)
+ # Remember the mergee's subscription to compare after the merge.
+ mergee_subscription = vulnerability.getSubscription(mergee)
+ self.assertTrue(vulnerability.hasSubscription(duplicate))
+ self.assertTrue(vulnerability.hasSubscription(mergee))
+ self._do_premerge(duplicate, mergee)
+ login_person(mergee)
+ duplicate, mergee = self._do_merge(duplicate, mergee)
+ self.assertFalse(vulnerability.hasSubscription(duplicate))
+ self.assertTrue(vulnerability.hasSubscription(mergee))
+ # The surviving subscription is the mergee's original one.
+ self.assertEqual(
+ mergee_subscription, vulnerability.getSubscription(mergee)
+ )
+
+ def test_merge_vulnerability_subscriptions_non_public_vulnerability(self):
+ # Merging moves a subscription to a PROPRIETARY vulnerability, and
+ # with it the launchpad.View permission, from duplicate to mergee.
+ duplicate = self.factory.makePerson()
+ mergee = self.factory.makePerson()
+ distribution = self.factory.makeDistribution(
+ specification_sharing_policy=SpecificationSharingPolicy.PROPRIETARY
+ )
+ vulnerability = self.factory.makeVulnerability(
+ distribution=distribution,
+ information_type=InformationType.PROPRIETARY,
+ )
+ # The distribution owner subscribes the duplicate person.
+ with person_logged_in(distribution.owner):
+ vulnerability.subscribe(duplicate, distribution.owner)
+ self.assertTrue(vulnerability.hasSubscription(duplicate))
+ self.assertFalse(vulnerability.hasSubscription(mergee))
+
+ # Before the merge only the duplicate can view the vulnerability.
+ with person_logged_in(duplicate):
+ self.assertTrue(check_permission("launchpad.View", vulnerability))
+
+ with person_logged_in(mergee):
+ self.assertFalse(check_permission("launchpad.View", vulnerability))
+
+ self._do_premerge(duplicate, mergee)
+ login_person(mergee)
+ duplicate, mergee = self._do_merge(duplicate, mergee)
+ with person_logged_in(distribution.owner):
+ self.assertFalse(vulnerability.hasSubscription(duplicate))
+ self.assertTrue(vulnerability.hasSubscription(mergee))
+
+ # Cannot log in as the duplicate user any more to test that they do not
+ # have the permission.
+ with person_logged_in(mergee):
+ self.assertTrue(check_permission("launchpad.View", vulnerability))
+
def test_merge_moves_oci_recipes(self):
# When person/teams are merged, oci recipes owned by the from
# person are moved.
Follow ups