launchpad-reviewers team mailing list archive
launchpad-reviewers team > Mailing list archive > Message #28118
[Merge] ~cjwatson/launchpad:distribution-sharing-policies into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:distribution-sharing-policies into launchpad:master with ~cjwatson/launchpad:distribution-information-type as a prerequisite.
Commit message:
Add distribution sharing policies
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/415623
This is heavily based on the equivalent code for projects.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:distribution-sharing-policies into launchpad:master.
diff --git a/lib/lp/code/model/branch.py b/lib/lp/code/model/branch.py
index 98952be..8ec4819 100644
--- a/lib/lp/code/model/branch.py
+++ b/lib/lp/code/model/branch.py
@@ -253,10 +253,10 @@ class Branch(SQLBase, WebhookTargetMixin, BzrIdentityMixin):
(abstract_artifact, policy) for policy in
getUtility(IAccessPolicySource).findByTeam([self.owner])}
else:
- # We haven't yet quite worked out how distribution privacy
- # works, so only work for products for now.
if self.product is not None:
pillars = [self.product]
+ elif self.distribution is not None:
+ pillars = [self.distribution]
reconcile_access_for_artifacts(
[self], self.information_type, pillars, wanted_links)
diff --git a/lib/lp/code/model/branchnamespace.py b/lib/lp/code/model/branchnamespace.py
index 326d1d4..b007d99 100644
--- a/lib/lp/code/model/branchnamespace.py
+++ b/lib/lp/code/model/branchnamespace.py
@@ -26,7 +26,6 @@ from lp.app.enums import (
FREE_INFORMATION_TYPES,
InformationType,
NON_EMBARGOED_INFORMATION_TYPES,
- PUBLIC_INFORMATION_TYPES,
)
from lp.app.interfaces.services import IService
from lp.code.enums import (
@@ -420,11 +419,32 @@ class PackageBranchNamespace(_BaseBranchNamespace):
def getAllowedInformationTypes(self, who=None):
"""See `IBranchNamespace`."""
- return PUBLIC_INFORMATION_TYPES
+ # The distribution uses the new simplified branch_sharing_policy
+ # rules, so check them.
+
+ # Some policies require that the branch owner or current user have
+ # full access to an information type. If it's required and the user
+ # doesn't hold it, no information types are legal.
+ distribution = self.sourcepackage.distribution
+ required_grant = BRANCH_POLICY_REQUIRED_GRANTS[
+ distribution.branch_sharing_policy]
+ if (required_grant is not None
+ and not getUtility(IService, 'sharing').checkPillarAccess(
+ [distribution], required_grant, self.owner)
+ and (who is None
+ or not getUtility(IService, 'sharing').checkPillarAccess(
+ [distribution], required_grant, who))):
+ return []
+
+ return BRANCH_POLICY_ALLOWED_TYPES[distribution.branch_sharing_policy]
def getDefaultInformationType(self, who=None):
"""See `IBranchNamespace`."""
- return InformationType.PUBLIC
+ default_type = BRANCH_POLICY_DEFAULT_TYPES[
+ self.sourcepackage.distribution.branch_sharing_policy]
+ if default_type not in self.getAllowedInformationTypes(who):
+ return None
+ return default_type
class BranchNamespaceSet:
diff --git a/lib/lp/code/model/gitnamespace.py b/lib/lp/code/model/gitnamespace.py
index 7ba9a13..e7b2c92 100644
--- a/lib/lp/code/model/gitnamespace.py
+++ b/lib/lp/code/model/gitnamespace.py
@@ -510,11 +510,29 @@ class PackageGitNamespace(_BaseGitNamespace):
def getAllowedInformationTypes(self, who=None):
"""See `IGitNamespace`."""
- return PUBLIC_INFORMATION_TYPES
+ # Some policies require that the repository owner or current user
+ # have full access to an information type. If it's required and the
+ # user doesn't hold it, no information types are legal.
+ distribution = self.distro_source_package.distribution
+ required_grant = BRANCH_POLICY_REQUIRED_GRANTS[
+ distribution.branch_sharing_policy]
+ if (required_grant is not None
+ and not getUtility(IService, 'sharing').checkPillarAccess(
+ [distribution], required_grant, self.owner)
+ and (who is None
+ or not getUtility(IService, 'sharing').checkPillarAccess(
+ [distribution], required_grant, who))):
+ return []
+
+ return BRANCH_POLICY_ALLOWED_TYPES[distribution.branch_sharing_policy]
def getDefaultInformationType(self, who=None):
"""See `IGitNamespace`."""
- return InformationType.PUBLIC
+ default_type = BRANCH_POLICY_DEFAULT_TYPES[
+ self.distro_source_package.distribution.branch_sharing_policy]
+ if default_type not in self.getAllowedInformationTypes(who):
+ return None
+ return default_type
def areRepositoriesMergeable(self, this, other):
"""See `IGitNamespacePolicy`."""
diff --git a/lib/lp/code/model/gitrepository.py b/lib/lp/code/model/gitrepository.py
index 1677d9e..0dd205b 100644
--- a/lib/lp/code/model/gitrepository.py
+++ b/lib/lp/code/model/gitrepository.py
@@ -659,10 +659,10 @@ class GitRepository(StormBase, WebhookTargetMixin, AccessTokenTargetMixin,
(abstract_artifact, policy) for policy in
getUtility(IAccessPolicySource).findByTeam([self.owner])}
else:
- # We haven't yet quite worked out how distribution privacy
- # works, so only work for projects for now.
if self.project is not None:
pillars = [self.project]
+ elif self.distribution is not None:
+ pillars = [self.distribution]
reconcile_access_for_artifacts(
[self], self.information_type, pillars, wanted_links)
diff --git a/lib/lp/code/model/tests/test_branch.py b/lib/lp/code/model/tests/test_branch.py
index 4419b12..9692104 100644
--- a/lib/lp/code/model/tests/test_branch.py
+++ b/lib/lp/code/model/tests/test_branch.py
@@ -17,6 +17,10 @@ from pytz import UTC
import six
from storm.exceptions import LostObjectError
from storm.locals import Store
+from testscenarios import (
+ load_tests_apply_scenarios,
+ WithScenarios,
+ )
from testtools import ExpectedException
from testtools.matchers import (
Not,
@@ -2531,13 +2535,17 @@ class TestBranchPrivacy(TestCaseWithFactory):
[(branch.product, InformationType.USERDATA)]),
get_policies_for_artifact(branch))
- def test__reconcileAccess_for_distro_branch(self):
- # Branch privacy isn't yet supported for distributions, so no
- # AccessPolicyArtifact is created for a distro branch.
+ def test__reconcileAccess_for_package_branch(self):
+ # _reconcileAccess uses a distribution policy for a package branch.
branch = self.factory.makePackageBranch(
information_type=InformationType.USERDATA)
+ [artifact] = getUtility(IAccessArtifactSource).ensure([branch])
+ getUtility(IAccessPolicyArtifactSource).deleteByArtifact([artifact])
removeSecurityProxy(branch)._reconcileAccess()
- self.assertEqual([], get_policies_for_artifact(branch))
+ self.assertContentEqual(
+ getUtility(IAccessPolicySource).find(
+ [(branch.distribution, InformationType.USERDATA)]),
+ get_policies_for_artifact(branch))
def test__reconcileAccess_for_personal_branch(self):
# _reconcileAccess uses a person policy for a personal branch.
@@ -2701,15 +2709,26 @@ class TestBranchSetPrivate(TestCaseWithFactory):
InformationType.PRIVATESECURITY, branch.information_type)
-class BranchModerateTestCase(TestCaseWithFactory):
- """Test that product owners and commercial admins can moderate branches."""
+class BranchModerateTestCase(WithScenarios, TestCaseWithFactory):
+ """Test that pillar owners and commercial admins can moderate branches."""
layer = DatabaseFunctionalLayer
+ scenarios = [
+ ("project", {"branch_factory_name": "makeProductBranch"}),
+ ("distribution", {"branch_factory_name": "makePackageBranch"}),
+ ]
+
+ def _makeBranch(self, **kwargs):
+ return getattr(self.factory, self.branch_factory_name)(**kwargs)
+
+ def _getPillar(self, branch):
+ return branch.product or branch.distribution
def test_moderate_permission(self):
# Test the ModerateBranch security checker.
- branch = self.factory.makeProductBranch()
- with person_logged_in(branch.product.owner):
+ branch = self._makeBranch()
+ pillar = self._getPillar(branch)
+ with person_logged_in(pillar.owner):
self.assertTrue(
check_permission('launchpad.Moderate', branch))
with celebrity_logged_in('commercial_admin'):
@@ -2718,25 +2737,27 @@ class BranchModerateTestCase(TestCaseWithFactory):
def test_methods_smoketest(self):
# Users with launchpad.Moderate can call transitionToInformationType.
- branch = self.factory.makeProductBranch()
- with person_logged_in(branch.product.owner):
- branch.product.setBranchSharingPolicy(BranchSharingPolicy.PUBLIC)
+ branch = self._makeBranch()
+ pillar = self._getPillar(branch)
+ with person_logged_in(pillar.owner):
+ pillar.setBranchSharingPolicy(BranchSharingPolicy.PUBLIC)
branch.transitionToInformationType(
- InformationType.PRIVATESECURITY, branch.product.owner)
+ InformationType.PRIVATESECURITY, pillar.owner)
self.assertEqual(
InformationType.PRIVATESECURITY, branch.information_type)
def test_attribute_smoketest(self):
# Users with launchpad.Moderate can set attrs.
- branch = self.factory.makeProductBranch()
- with person_logged_in(branch.product.owner):
+ branch = self._makeBranch()
+ pillar = self._getPillar(branch)
+ with person_logged_in(pillar.owner):
branch.name = 'not-secret'
branch.description = 'redacted'
- branch.reviewer = branch.product.owner
+ branch.reviewer = pillar.owner
branch.lifecycle_status = BranchLifecycleStatus.EXPERIMENTAL
self.assertEqual('not-secret', branch.name)
self.assertEqual('redacted', branch.description)
- self.assertEqual(branch.product.owner, branch.reviewer)
+ self.assertEqual(pillar.owner, branch.reviewer)
self.assertEqual(
BranchLifecycleStatus.EXPERIMENTAL, branch.lifecycle_status)
@@ -3574,3 +3595,6 @@ class TestWebservice(TestCaseWithFactory):
with admin_logged_in():
self.assertEqual(
1, len(list(getUtility(IBranchScanJobSource).iterReady())))
+
+
+load_tests = load_tests_apply_scenarios
diff --git a/lib/lp/code/model/tests/test_branchnamespace.py b/lib/lp/code/model/tests/test_branchnamespace.py
index aff13b1..7a4ff61 100644
--- a/lib/lp/code/model/tests/test_branchnamespace.py
+++ b/lib/lp/code/model/tests/test_branchnamespace.py
@@ -10,7 +10,6 @@ from lp.app.enums import (
FREE_INFORMATION_TYPES,
InformationType,
NON_EMBARGOED_INFORMATION_TYPES,
- PUBLIC_INFORMATION_TYPES,
)
from lp.app.interfaces.services import IService
from lp.app.validators import LaunchpadValidationError
@@ -594,6 +593,186 @@ class TestPackageBranchNamespace(TestCaseWithFactory, NamespaceMixin):
self.assertEqual(IBranchTarget(package), namespace.target)
+class TestPackageBranchNamespacePrivacyWithInformationType(
+ TestCaseWithFactory):
+ """Tests for the privacy aspects of `PackageBranchNamespace`.
+
+ This tests the behaviour for a package in a distribution using the new
+ branch_sharing_policy rules.
+ """
+
+ layer = DatabaseFunctionalLayer
+
+ def makePackageBranchNamespace(self, sharing_policy, person=None):
+ if person is None:
+ person = self.factory.makePerson()
+ package = self.factory.makeSourcePackage()
+ self.factory.makeCommercialSubscription(pillar=package.distribution)
+ with person_logged_in(package.distribution.owner):
+ package.distribution.setBranchSharingPolicy(sharing_policy)
+ namespace = PackageBranchNamespace(person, package)
+ return namespace
+
+ def test_public_anyone(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PUBLIC)
+ self.assertContentEqual(
+ FREE_INFORMATION_TYPES, namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC, namespace.getDefaultInformationType())
+
+ def test_forbidden_anyone(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.FORBIDDEN)
+ self.assertContentEqual([], namespace.getAllowedInformationTypes())
+ self.assertEqual(None, namespace.getDefaultInformationType())
+
+ def test_public_or_proprietary_anyone(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PUBLIC_OR_PROPRIETARY)
+ self.assertContentEqual(
+ NON_EMBARGOED_INFORMATION_TYPES,
+ namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC, namespace.getDefaultInformationType())
+
+ def test_proprietary_or_public_anyone(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PROPRIETARY_OR_PUBLIC)
+ self.assertContentEqual([], namespace.getAllowedInformationTypes())
+ self.assertIs(None, namespace.getDefaultInformationType())
+
+ def test_proprietary_or_public_owner_grantee(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PROPRIETARY_OR_PUBLIC)
+ distribution = namespace.sourcepackage.distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, 'sharing').sharePillarInformation(
+ distribution, namespace.owner, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ NON_EMBARGOED_INFORMATION_TYPES,
+ namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ namespace.getDefaultInformationType())
+
+ def test_proprietary_or_public_caller_grantee(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PROPRIETARY_OR_PUBLIC)
+ distribution = namespace.sourcepackage.distribution
+ grantee = self.factory.makePerson()
+ with person_logged_in(distribution.owner):
+ getUtility(IService, 'sharing').sharePillarInformation(
+ distribution, grantee, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ NON_EMBARGOED_INFORMATION_TYPES,
+ namespace.getAllowedInformationTypes(grantee))
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ namespace.getDefaultInformationType(grantee))
+
+ def test_proprietary_anyone(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PROPRIETARY)
+ self.assertContentEqual([], namespace.getAllowedInformationTypes())
+ self.assertIs(None, namespace.getDefaultInformationType())
+
+ def test_proprietary_branch_owner_grantee(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PROPRIETARY)
+ distribution = namespace.sourcepackage.distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, 'sharing').sharePillarInformation(
+ distribution, namespace.owner, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY],
+ namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ namespace.getDefaultInformationType())
+
+ def test_proprietary_caller_grantee(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PROPRIETARY)
+ distribution = namespace.sourcepackage.distribution
+ grantee = self.factory.makePerson()
+ with person_logged_in(distribution.owner):
+ getUtility(IService, 'sharing').sharePillarInformation(
+ distribution, grantee, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY],
+ namespace.getAllowedInformationTypes(grantee))
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ namespace.getDefaultInformationType(grantee))
+
+ def test_embargoed_or_proprietary_anyone(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY)
+ self.assertContentEqual([], namespace.getAllowedInformationTypes())
+ self.assertIs(None, namespace.getDefaultInformationType())
+
+ def test_embargoed_or_proprietary_owner_grantee(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY)
+ distribution = namespace.sourcepackage.distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, 'sharing').sharePillarInformation(
+ distribution, namespace.owner, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY, InformationType.EMBARGOED],
+ namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.EMBARGOED,
+ namespace.getDefaultInformationType())
+
+ def test_embargoed_or_proprietary_caller_grantee(self):
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY)
+ distribution = namespace.sourcepackage.distribution
+ grantee = self.factory.makePerson()
+ with person_logged_in(distribution.owner):
+ getUtility(IService, 'sharing').sharePillarInformation(
+ distribution, grantee, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY, InformationType.EMBARGOED],
+ namespace.getAllowedInformationTypes(grantee))
+ self.assertEqual(
+ InformationType.EMBARGOED,
+ namespace.getDefaultInformationType(grantee))
+
+ def test_grantee_has_no_artifact_grant(self):
+ # The owner of a new branch in a distribution whose default
+ # information type is non-public does not have an artifact grant
+ # specifically for the new branch, because their existing policy
+ # grant is sufficient.
+ person = self.factory.makePerson()
+ team = self.factory.makeTeam(members=[person])
+ namespace = self.makePackageBranchNamespace(
+ BranchSharingPolicy.PROPRIETARY, person=person)
+ distribution = namespace.sourcepackage.distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, 'sharing').sharePillarInformation(
+ distribution, team, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ branch = namespace.createBranch(
+ BranchType.HOSTED, self.factory.getUniqueString(), person)
+ [policy] = getUtility(IAccessPolicySource).find(
+ [(distribution, InformationType.PROPRIETARY)])
+ apgfs = getUtility(IAccessPolicyGrantFlatSource)
+ self.assertContentEqual(
+ [(distribution.owner, {policy: SharingPermission.ALL}, []),
+ (team, {policy: SharingPermission.ALL}, [])],
+ apgfs.findGranteePermissionsByPolicy([policy]))
+ self.assertTrue(removeSecurityProxy(branch).visibleByUser(person))
+
+
class TestNamespaceSet(TestCaseWithFactory):
"""Tests for `get_namespace`."""
@@ -1041,21 +1220,6 @@ class TestPersonalBranchNamespaceAllowedInformationTypes(TestCaseWithFactory):
namespace.getAllowedInformationTypes())
-class TestPackageBranchNamespaceAllowedInformationTypes(TestCaseWithFactory):
- """Tests for PackageBranchNamespace.getAllowedInformationTypes."""
-
- layer = DatabaseFunctionalLayer
-
- def test_anyone(self):
- # Source package branches are always public.
- source_package = self.factory.makeSourcePackage()
- person = self.factory.makePerson()
- namespace = PackageBranchNamespace(person, source_package)
- self.assertContentEqual(
- PUBLIC_INFORMATION_TYPES,
- namespace.getAllowedInformationTypes())
-
-
class BaseValidateNewBranchMixin:
layer = DatabaseFunctionalLayer
diff --git a/lib/lp/code/model/tests/test_gitnamespace.py b/lib/lp/code/model/tests/test_gitnamespace.py
index ac469e9..1ca8b5f 100644
--- a/lib/lp/code/model/tests/test_gitnamespace.py
+++ b/lib/lp/code/model/tests/test_gitnamespace.py
@@ -885,6 +885,183 @@ class TestPackageGitNamespace(TestCaseWithFactory, NamespaceMixin):
repositories[0].namespace.collection.getRepositories())
+class TestPackageGitNamespacePrivacyWithInformationType(TestCaseWithFactory):
+ """Tests for the privacy aspects of `PackageGitNamespace`.
+
+ This tests the behaviour for a package in a distribution using the new
+ branch_sharing_policy rules.
+ """
+
+ layer = DatabaseFunctionalLayer
+
+ def makePackageGitNamespace(self, sharing_policy, person=None):
+ if person is None:
+ person = self.factory.makePerson()
+ dsp = self.factory.makeDistributionSourcePackage()
+ self.factory.makeCommercialSubscription(pillar=dsp.distribution)
+ with person_logged_in(dsp.distribution.owner):
+ dsp.distribution.setBranchSharingPolicy(sharing_policy)
+ namespace = PackageGitNamespace(person, dsp)
+ return namespace
+
+ def test_public_anyone(self):
+ namespace = self.makePackageGitNamespace(BranchSharingPolicy.PUBLIC)
+ self.assertContentEqual(
+ FREE_INFORMATION_TYPES, namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC, namespace.getDefaultInformationType())
+
+ def test_forbidden_anyone(self):
+ namespace = self.makePackageGitNamespace(BranchSharingPolicy.FORBIDDEN)
+ self.assertEqual([], namespace.getAllowedInformationTypes())
+ self.assertIsNone(namespace.getDefaultInformationType())
+
+ def test_public_or_proprietary_anyone(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.PUBLIC_OR_PROPRIETARY)
+ self.assertContentEqual(
+ NON_EMBARGOED_INFORMATION_TYPES,
+ namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC, namespace.getDefaultInformationType())
+
+ def test_proprietary_or_public_anyone(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.PROPRIETARY_OR_PUBLIC)
+ self.assertEqual([], namespace.getAllowedInformationTypes())
+ self.assertIsNone(namespace.getDefaultInformationType())
+
+ def test_proprietary_or_public_owner_grantee(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.PROPRIETARY_OR_PUBLIC)
+ distribution = namespace.distro_source_package.distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, "sharing").sharePillarInformation(
+ distribution, namespace.owner, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ NON_EMBARGOED_INFORMATION_TYPES,
+ namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ namespace.getDefaultInformationType())
+
+ def test_proprietary_or_public_caller_grantee(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.PROPRIETARY_OR_PUBLIC)
+ distribution = namespace.distro_source_package.distribution
+ grantee = self.factory.makePerson()
+ with person_logged_in(distribution.owner):
+ getUtility(IService, "sharing").sharePillarInformation(
+ distribution, grantee, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ NON_EMBARGOED_INFORMATION_TYPES,
+ namespace.getAllowedInformationTypes(grantee))
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ namespace.getDefaultInformationType(grantee))
+
+ def test_proprietary_anyone(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.PROPRIETARY)
+ self.assertEqual([], namespace.getAllowedInformationTypes())
+ self.assertIsNone(namespace.getDefaultInformationType())
+
+ def test_proprietary_repository_owner_grantee(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.PROPRIETARY)
+ distribution = namespace.distro_source_package.distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, "sharing").sharePillarInformation(
+ distribution, namespace.owner, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY],
+ namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ namespace.getDefaultInformationType())
+
+ def test_proprietary_caller_grantee(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.PROPRIETARY)
+ distribution = namespace.distro_source_package.distribution
+ grantee = self.factory.makePerson()
+ with person_logged_in(distribution.owner):
+ getUtility(IService, "sharing").sharePillarInformation(
+ distribution, grantee, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY],
+ namespace.getAllowedInformationTypes(grantee))
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ namespace.getDefaultInformationType(grantee))
+
+ def test_embargoed_or_proprietary_anyone(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY)
+ self.assertEqual([], namespace.getAllowedInformationTypes())
+ self.assertIsNone(namespace.getDefaultInformationType())
+
+ def test_embargoed_or_proprietary_owner_grantee(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY)
+ distribution = namespace.distro_source_package.distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, "sharing").sharePillarInformation(
+ distribution, namespace.owner, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY, InformationType.EMBARGOED],
+ namespace.getAllowedInformationTypes())
+ self.assertEqual(
+ InformationType.EMBARGOED,
+ namespace.getDefaultInformationType())
+
+ def test_embargoed_or_proprietary_caller_grantee(self):
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY)
+ distribution = namespace.distro_source_package.distribution
+ grantee = self.factory.makePerson()
+ with person_logged_in(distribution.owner):
+ getUtility(IService, "sharing").sharePillarInformation(
+ distribution, grantee, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY, InformationType.EMBARGOED],
+ namespace.getAllowedInformationTypes(grantee))
+ self.assertEqual(
+ InformationType.EMBARGOED,
+ namespace.getDefaultInformationType(grantee))
+
+ def test_grantee_has_no_artifact_grant(self):
+ # The owner of a new repository in a distribution whose default
+ # information type is non-public does not have an artifact grant
+ # specifically for the new repository, because their existing policy
+ # grant is sufficient.
+ person = self.factory.makePerson()
+ team = self.factory.makeTeam(members=[person])
+ namespace = self.makePackageGitNamespace(
+ BranchSharingPolicy.PROPRIETARY, person=person)
+ distribution = namespace.distro_source_package.distribution
+ with person_logged_in(distribution.owner):
+ getUtility(IService, 'sharing').sharePillarInformation(
+ distribution, team, distribution.owner,
+ {InformationType.PROPRIETARY: SharingPermission.ALL})
+ repository = namespace.createRepository(
+ GitRepositoryType.HOSTED, person, self.factory.getUniqueUnicode())
+ [policy] = getUtility(IAccessPolicySource).find(
+ [(distribution, InformationType.PROPRIETARY)])
+ apgfs = getUtility(IAccessPolicyGrantFlatSource)
+ self.assertContentEqual(
+ [(distribution.owner, {policy: SharingPermission.ALL}, []),
+ (team, {policy: SharingPermission.ALL}, [])],
+ apgfs.findGranteePermissionsByPolicy([policy]))
+ self.assertTrue(removeSecurityProxy(repository).visibleByUser(person))
+
+
class BaseCanCreateRepositoriesMixin:
"""Common tests for all namespaces."""
@@ -1040,20 +1217,6 @@ class TestPersonalGitNamespaceAllowedInformationTypes(TestCaseWithFactory):
namespace.getAllowedInformationTypes())
-class TestPackageGitNamespaceAllowedInformationTypes(TestCaseWithFactory):
- """Tests for PackageGitNamespace.getAllowedInformationTypes."""
-
- layer = DatabaseFunctionalLayer
-
- def test_anyone(self):
- # Package repositories are always public.
- dsp = self.factory.makeDistributionSourcePackage()
- person = self.factory.makePerson()
- namespace = PackageGitNamespace(person, dsp)
- self.assertContentEqual(
- PUBLIC_INFORMATION_TYPES, namespace.getAllowedInformationTypes())
-
-
class TestOCIProjectGitNamespaceAllowedInformationTypes(TestCaseWithFactory):
"""Tests for OCIProjectGitNamespace.getAllowedInformationTypes."""
diff --git a/lib/lp/code/model/tests/test_gitrepository.py b/lib/lp/code/model/tests/test_gitrepository.py
index 3ad7c8e..90bf08e 100644
--- a/lib/lp/code/model/tests/test_gitrepository.py
+++ b/lib/lp/code/model/tests/test_gitrepository.py
@@ -20,6 +20,10 @@ import pytz
import six
from storm.exceptions import LostObjectError
from storm.store import Store
+from testscenarios import (
+ load_tests_apply_scenarios,
+ WithScenarios,
+ )
from testtools.matchers import (
AnyMatch,
ContainsDict,
@@ -141,6 +145,10 @@ from lp.registry.interfaces.accesspolicy import (
IAccessPolicyArtifactSource,
IAccessPolicySource,
)
+from lp.registry.interfaces.distributionsourcepackage import (
+ IDistributionSourcePackage,
+ )
+from lp.registry.interfaces.ociproject import IOCIProject
from lp.registry.interfaces.person import IPerson
from lp.registry.interfaces.persondistributionsourcepackage import (
IPersonDistributionSourcePackageFactory,
@@ -1746,13 +1754,18 @@ class TestGitRepositoryPrivacy(TestCaseWithFactory):
get_policies_for_artifact(repository))
def test__reconcileAccess_for_package_repository(self):
- # Git repository privacy isn't yet supported for distributions, so
- # no AccessPolicyArtifact is created for a package repository.
+ # _reconcileAccess uses a distribution policy for a package
+ # repository.
repository = self.factory.makeGitRepository(
target=self.factory.makeDistributionSourcePackage(),
information_type=InformationType.USERDATA)
+ [artifact] = getUtility(IAccessArtifactSource).ensure([repository])
+ getUtility(IAccessPolicyArtifactSource).deleteByArtifact([artifact])
removeSecurityProxy(repository)._reconcileAccess()
- self.assertEqual([], get_policies_for_artifact(repository))
+ self.assertContentEqual(
+ getUtility(IAccessPolicySource).find(
+ [(repository.target.distribution, InformationType.USERDATA)]),
+ get_policies_for_artifact(repository))
def test__reconcileAccess_for_oci_project_repository(self):
# Git repository privacy isn't yet supported for OCI projects, so no
@@ -2356,17 +2369,36 @@ class TestGitRepositoryGetAllowedInformationTypes(TestCaseWithFactory):
repository.getAllowedInformationTypes(admin))
-class TestGitRepositoryModerate(TestCaseWithFactory):
+class TestGitRepositoryModerate(WithScenarios, TestCaseWithFactory):
"""Test that project owners and commercial admins can moderate Git
repositories."""
layer = DatabaseFunctionalLayer
+ scenarios = [
+ ("project", {"target_factory_name": "makeProduct"}),
+ ("distribution",
+ {"target_factory_name": "makeDistributionSourcePackage"}),
+ ("OCI project", {"target_factory_name": "makeOCIProject"}),
+ ]
+
+ def _makeGitRepository(self, **kwargs):
+ target = getattr(self.factory, self.target_factory_name)()
+ return self.factory.makeGitRepository(target=target, **kwargs)
+
+ def _getPillar(self, repository):
+ target = repository.target
+ if IDistributionSourcePackage.providedBy(target):
+ return target.distribution
+ elif IOCIProject.providedBy(target):
+ return target.pillar
+ else:
+ return target
def test_moderate_permission(self):
# Test the ModerateGitRepository security checker.
- project = self.factory.makeProduct()
- repository = self.factory.makeGitRepository(target=project)
- with person_logged_in(project.owner):
+ repository = self._makeGitRepository()
+ pillar = self._getPillar(repository)
+ with person_logged_in(pillar.owner):
self.assertTrue(check_permission("launchpad.Moderate", repository))
with celebrity_logged_in("commercial_admin"):
self.assertTrue(check_permission("launchpad.Moderate", repository))
@@ -2376,24 +2408,26 @@ class TestGitRepositoryModerate(TestCaseWithFactory):
def test_methods_smoketest(self):
# Users with launchpad.Moderate can call transitionToInformationType.
- project = self.factory.makeProduct()
- repository = self.factory.makeGitRepository(target=project)
- with person_logged_in(project.owner):
- project.setBranchSharingPolicy(BranchSharingPolicy.PUBLIC)
+ if self.target_factory_name == "makeOCIProject":
+ self.skipTest("Not implemented for OCI projects yet.")
+ repository = self._makeGitRepository()
+ pillar = self._getPillar(repository)
+ with person_logged_in(pillar.owner):
+ pillar.setBranchSharingPolicy(BranchSharingPolicy.PUBLIC)
repository.transitionToInformationType(
- InformationType.PRIVATESECURITY, project.owner)
+ InformationType.PRIVATESECURITY, pillar.owner)
self.assertEqual(
InformationType.PRIVATESECURITY, repository.information_type)
def test_attribute_smoketest(self):
# Users with launchpad.Moderate can set attributes.
- project = self.factory.makeProduct()
- repository = self.factory.makeGitRepository(target=project)
- with person_logged_in(project.owner):
+ repository = self._makeGitRepository()
+ pillar = self._getPillar(repository)
+ with person_logged_in(pillar.owner):
repository.description = "something"
- repository.reviewer = project.owner
+ repository.reviewer = pillar.owner
self.assertEqual("something", repository.description)
- self.assertEqual(project.owner, repository.reviewer)
+ self.assertEqual(pillar.owner, repository.reviewer)
class TestGitRepositoryIsPersonTrustedReviewer(TestCaseWithFactory):
@@ -5454,3 +5488,6 @@ class TestGitRepositoryMacaroonIssuer(MacaroonTestMixin, TestCaseWithFactory):
["Caveat check for '%s' failed." %
find_caveats_by_name(macaroon2, "lp.expires")[0].caveat_id],
issuer, macaroon2, repository, user=repository.owner)
+
+
+load_tests = load_tests_apply_scenarios
diff --git a/lib/lp/registry/interfaces/distribution.py b/lib/lp/registry/interfaces/distribution.py
index 7e943c1..2e3a406 100644
--- a/lib/lp/registry/interfaces/distribution.py
+++ b/lib/lp/registry/interfaces/distribution.py
@@ -29,6 +29,7 @@ from lazr.restful.declarations import (
exported,
exported_as_webservice_collection,
exported_as_webservice_entry,
+ mutator_for,
operation_for_version,
operation_parameters,
operation_returns_collection_of,
@@ -780,6 +781,41 @@ class IDistributionView(
class IDistributionEditRestricted(IOfficialBugTagTargetRestricted):
"""IDistribution properties requiring launchpad.Edit permission."""
+ @mutator_for(IDistributionView['bug_sharing_policy'])
+ @operation_parameters(bug_sharing_policy=copy_field(
+ IDistributionView['bug_sharing_policy']))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def setBugSharingPolicy(bug_sharing_policy):
+ """Mutator for bug_sharing_policy.
+
+ Checks authorization and entitlement.
+ """
+
+ @mutator_for(IDistributionView['branch_sharing_policy'])
+ @operation_parameters(
+ branch_sharing_policy=copy_field(
+ IDistributionView['branch_sharing_policy']))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def setBranchSharingPolicy(branch_sharing_policy):
+ """Mutator for branch_sharing_policy.
+
+ Checks authorization and entitlement.
+ """
+
+ @mutator_for(IDistributionView['specification_sharing_policy'])
+ @operation_parameters(
+ specification_sharing_policy=copy_field(
+ IDistributionView['specification_sharing_policy']))
+ @export_write_operation()
+ @operation_for_version("devel")
+ def setSpecificationSharingPolicy(specification_sharing_policy):
+ """Mutator for specification_sharing_policy.
+
+ Checks authorization and entitlement.
+ """
+
def checkInformationType(value):
"""Check whether the information type change should be permitted.
diff --git a/lib/lp/registry/model/distribution.py b/lib/lp/registry/model/distribution.py
index 94558ca..82f062d 100644
--- a/lib/lp/registry/model/distribution.py
+++ b/lib/lp/registry/model/distribution.py
@@ -83,11 +83,17 @@ from lp.blueprints.enums import SpecificationFilter
from lp.blueprints.model.specification import (
HasSpecificationsMixin,
Specification,
+ SPECIFICATION_POLICY_ALLOWED_TYPES,
+ SPECIFICATION_POLICY_DEFAULT_TYPES,
)
from lp.blueprints.model.specificationsearch import search_specifications
from lp.blueprints.model.sprint import HasSprintsMixin
from lp.bugs.interfaces.bugsummary import IBugSummaryDimension
from lp.bugs.interfaces.bugsupervisor import IHasBugSupervisor
+from lp.bugs.interfaces.bugtarget import (
+ BUG_POLICY_ALLOWED_TYPES,
+ BUG_POLICY_DEFAULT_TYPES,
+ )
from lp.bugs.interfaces.bugtaskfilter import OrderedBugTask
from lp.bugs.model.bugtarget import (
BugTargetBase,
@@ -101,6 +107,7 @@ from lp.code.interfaces.seriessourcepackagebranch import (
IFindOfficialBranchLinks,
)
from lp.code.model.branch import Branch
+from lp.code.model.branchnamespace import BRANCH_POLICY_ALLOWED_TYPES
from lp.oci.interfaces.ociregistrycredentials import (
IOCIRegistryCredentialsSet,
)
@@ -119,6 +126,7 @@ from lp.registry.errors import (
ProprietaryPillar,
)
from lp.registry.interfaces.accesspolicy import (
+ IAccessPolicyArtifactSource,
IAccessPolicyGrantSource,
IAccessPolicySource,
)
@@ -240,6 +248,24 @@ from lp.translations.model.potemplate import POTemplate
from lp.translations.model.translationpolicy import TranslationPolicyMixin
+bug_policy_default = {
+ InformationType.PUBLIC: BugSharingPolicy.PUBLIC,
+ InformationType.PROPRIETARY: BugSharingPolicy.PROPRIETARY,
+ }
+
+
+branch_policy_default = {
+ InformationType.PUBLIC: BranchSharingPolicy.PUBLIC,
+ InformationType.PROPRIETARY: BranchSharingPolicy.PROPRIETARY,
+ }
+
+
+specification_policy_default = {
+ InformationType.PUBLIC: SpecificationSharingPolicy.PUBLIC,
+ InformationType.PROPRIETARY: SpecificationSharingPolicy.PROPRIETARY,
+ }
+
+
@implementer(
IBugSummaryDimension, IDistribution, IHasBugSupervisor,
IHasBuildRecords, IHasIcon, IHasLogo, IHasMugshot,
@@ -462,6 +488,10 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements,
if (old_info_type == InformationType.PUBLIC and
value != InformationType.PUBLIC):
self._ensure_complimentary_subscription()
+ self.setBranchSharingPolicy(branch_policy_default[value])
+ self.setBugSharingPolicy(bug_policy_default[value])
+ self.setSpecificationSharingPolicy(
+ specification_policy_default[value])
self._ensurePolicies([value])
@property
@@ -469,23 +499,51 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements,
"""See `IPillar`."""
return "Distribution"
- @property
- def branch_sharing_policy(self):
- """See `IHasSharingPolicies."""
- # Sharing policy for distributions is always PUBLIC.
- return BranchSharingPolicy.PUBLIC
-
- @property
- def bug_sharing_policy(self):
- """See `IHasSharingPolicies."""
- # Sharing policy for distributions is always PUBLIC.
- return BugSharingPolicy.PUBLIC
-
- @property
- def specification_sharing_policy(self):
- """See `IHasSharingPolicies."""
- # Sharing policy for distributions is always PUBLIC.
- return SpecificationSharingPolicy.PUBLIC
+ bug_sharing_policy = DBEnum(
+ enum=BugSharingPolicy, allow_none=True,
+ default=BugSharingPolicy.PUBLIC)
+ branch_sharing_policy = DBEnum(
+ enum=BranchSharingPolicy, allow_none=True,
+ default=BranchSharingPolicy.PUBLIC)
+ specification_sharing_policy = DBEnum(
+ enum=SpecificationSharingPolicy, allow_none=True,
+ default=SpecificationSharingPolicy.PUBLIC)
+
+ def _prepare_to_set_sharing_policy(self, var, enum, kind, allowed_types):
+ if (var not in {enum.PUBLIC, enum.FORBIDDEN} and
+ not self.has_current_commercial_subscription):
+ raise CommercialSubscribersOnly(
+ "A current commercial subscription is required to use "
+ "proprietary %s." % kind)
+ if self.information_type != InformationType.PUBLIC:
+ if InformationType.PUBLIC in allowed_types[var]:
+ raise ProprietaryPillar(
+ "The distribution is %s." % self.information_type.title)
+ self._ensurePolicies(allowed_types[var])
+
+ def setBranchSharingPolicy(self, branch_sharing_policy):
+ """See `IDistributionEditRestricted`."""
+ self._prepare_to_set_sharing_policy(
+ branch_sharing_policy, BranchSharingPolicy, 'branches',
+ BRANCH_POLICY_ALLOWED_TYPES)
+ self.branch_sharing_policy = branch_sharing_policy
+ self._pruneUnusedPolicies()
+
+ def setBugSharingPolicy(self, bug_sharing_policy):
+ """See `IDistributionEditRestricted`."""
+ self._prepare_to_set_sharing_policy(
+ bug_sharing_policy, BugSharingPolicy, 'bugs',
+ BUG_POLICY_ALLOWED_TYPES)
+ self.bug_sharing_policy = bug_sharing_policy
+ self._pruneUnusedPolicies()
+
+ def setSpecificationSharingPolicy(self, specification_sharing_policy):
+ """See `IDistributionEditRestricted`."""
+ self._prepare_to_set_sharing_policy(
+ specification_sharing_policy, SpecificationSharingPolicy,
+ 'specifications', SPECIFICATION_POLICY_ALLOWED_TYPES)
+ self.specification_sharing_policy = specification_sharing_policy
+ self._pruneUnusedPolicies()
# Cache of AccessPolicy.ids that convey launchpad.LimitedView.
# Unlike artifacts' cached access_policies, an AccessArtifactGrant
@@ -530,6 +588,33 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements,
else:
self.access_policies = None
+ def _pruneUnusedPolicies(self):
+ allowed_bug_types = set(
+ BUG_POLICY_ALLOWED_TYPES.get(
+ self.bug_sharing_policy, FREE_INFORMATION_TYPES))
+ allowed_branch_types = set(
+ BRANCH_POLICY_ALLOWED_TYPES.get(
+ self.branch_sharing_policy, FREE_INFORMATION_TYPES))
+ allowed_spec_types = set(
+ SPECIFICATION_POLICY_ALLOWED_TYPES.get(
+ self.specification_sharing_policy, [InformationType.PUBLIC]))
+ allowed_types = (
+ allowed_bug_types | allowed_branch_types | allowed_spec_types)
+ allowed_types.add(self.information_type)
+ # Fetch all APs.  Those whose type is no longer allowed by the bug,
+ # branch, and specification policies (or the pillar's own information
+ # type) and that have no APAs are unused and can be deleted.
+ ap_source = getUtility(IAccessPolicySource)
+ access_policies = set(ap_source.findByPillar([self]))
+ apa_source = getUtility(IAccessPolicyArtifactSource)
+ unused_aps = [
+ ap for ap in access_policies
+ if ap.type not in allowed_types
+ and apa_source.findByPolicy([ap]).is_empty()]
+ getUtility(IAccessPolicyGrantSource).revokeByPolicy(unused_aps)
+ ap_source.delete([(ap.pillar, ap.type) for ap in unused_aps])
+ self._cacheAccessPolicies()
+
@cachedproperty
def commercial_subscription(self):
return IStore(CommercialSubscription).find(
@@ -1157,11 +1242,13 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements,
def getAllowedSpecificationInformationTypes(self):
"""See `ISpecificationTarget`."""
- return (InformationType.PUBLIC,)
+ return SPECIFICATION_POLICY_ALLOWED_TYPES[
+ self.specification_sharing_policy]
def getDefaultSpecificationInformationType(self):
"""See `ISpecificationTarget`."""
- return InformationType.PUBLIC
+ return SPECIFICATION_POLICY_DEFAULT_TYPES[
+ self.specification_sharing_policy]
def searchQuestions(self, search_text=None,
status=QUESTION_STATUS_DEFAULT_SEARCH,
@@ -1639,11 +1726,11 @@ class Distribution(SQLBase, BugTargetBase, MakesAnnouncements,
def getAllowedBugInformationTypes(self):
"""See `IDistribution.`"""
- return FREE_INFORMATION_TYPES
+ return BUG_POLICY_ALLOWED_TYPES[self.bug_sharing_policy]
def getDefaultBugInformationType(self):
"""See `IDistribution.`"""
- return InformationType.PUBLIC
+ return BUG_POLICY_DEFAULT_TYPES[self.bug_sharing_policy]
def userCanEdit(self, user):
"""See `IDistribution`."""
@@ -1922,12 +2009,11 @@ class DistributionSet:
IStore(distro).add(distro)
if information_type != InformationType.PUBLIC:
distro._ensure_complimentary_subscription()
- # XXX cjwatson 2022-02-10: Replace this with sharing policies once
- # those are defined here.
- distro._ensurePolicies(
- [information_type]
- if information_type == InformationType.PROPRIETARY
- else FREE_INFORMATION_TYPES)
+ distro.setBugSharingPolicy(bug_policy_default[information_type])
+ distro.setBranchSharingPolicy(
+ branch_policy_default[information_type])
+ distro.setSpecificationSharingPolicy(
+ specification_policy_default[information_type])
if information_type == InformationType.PUBLIC:
getUtility(IArchiveSet).new(
distribution=distro, owner=owner,
diff --git a/lib/lp/registry/services/sharingservice.py b/lib/lp/registry/services/sharingservice.py
index 9480512..9713061 100644
--- a/lib/lp/registry/services/sharingservice.py
+++ b/lib/lp/registry/services/sharingservice.py
@@ -549,15 +549,9 @@ class SharingService:
def getBranchSharingPolicies(self, pillar):
"""See `ISharingService`."""
- # Only Products have branch sharing policies. Distributions just
- # default to Public.
- # If the branch sharing policy is EMBARGOED_OR_PROPRIETARY, then we
- # do not allow any other policies.
allowed_policies = [BranchSharingPolicy.PUBLIC]
- # Commercial projects also allow proprietary branches.
- if (IProduct.providedBy(pillar)
- and pillar.has_current_commercial_subscription):
-
+ # Commercial pillars also allow proprietary branches.
+ if pillar.has_current_commercial_subscription:
if pillar.private:
allowed_policies = [
BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY,
@@ -579,13 +573,9 @@ class SharingService:
def getBugSharingPolicies(self, pillar):
"""See `ISharingService`."""
- # Only Products have bug sharing policies. Distributions just
- # default to Public.
allowed_policies = [BugSharingPolicy.PUBLIC]
- # Commercial projects also allow proprietary bugs.
- if (IProduct.providedBy(pillar)
- and pillar.has_current_commercial_subscription):
-
+ # Commercial pillars also allow proprietary bugs.
+ if pillar.has_current_commercial_subscription:
if pillar.private:
allowed_policies = [
BugSharingPolicy.EMBARGOED_OR_PROPRIETARY,
@@ -607,13 +597,8 @@ class SharingService:
def getSpecificationSharingPolicies(self, pillar):
"""See `ISharingService`."""
- # Only Products have specification sharing policies. Distributions just
- # default to Public.
allowed_policies = [SpecificationSharingPolicy.PUBLIC]
- # Commercial projects also allow proprietary specifications.
- if (IProduct.providedBy(pillar)
- and pillar.has_current_commercial_subscription):
-
+ if pillar.has_current_commercial_subscription:
if pillar.private:
allowed_policies = [
SpecificationSharingPolicy.EMBARGOED_OR_PROPRIETARY,
@@ -910,10 +895,6 @@ class SharingService:
if (not branch_sharing_policy and not bug_sharing_policy and not
specification_sharing_policy):
return None
- # Only Products have sharing policies.
- if not IProduct.providedBy(pillar):
- raise ValueError(
- "Sharing policies are only supported for products.")
if branch_sharing_policy:
pillar.setBranchSharingPolicy(branch_sharing_policy)
if bug_sharing_policy:
diff --git a/lib/lp/registry/services/tests/test_sharingservice.py b/lib/lp/registry/services/tests/test_sharingservice.py
index 8ce3be1..8c557f8 100644
--- a/lib/lp/registry/services/tests/test_sharingservice.py
+++ b/lib/lp/registry/services/tests/test_sharingservice.py
@@ -91,10 +91,6 @@ class PillarScenariosMixin(WithScenarios):
self.skipTest("Only relevant for Product.")
def _makePillar(self, **kwargs):
- if ("bug_sharing_policy" in kwargs or
- "branch_sharing_policy" in kwargs or
- "specification_sharing_policy" in kwargs):
- self._skipUnlessProduct()
return getattr(self.factory, self.pillar_factory_name)(**kwargs)
def _makeBranch(self, pillar, **kwargs):
@@ -246,14 +242,12 @@ class TestSharingService(
pillar, [BranchSharingPolicy.PUBLIC])
def test_getBranchSharingPolicies_expired_commercial(self):
- self._skipUnlessProduct()
pillar = self._makePillar()
self.factory.makeCommercialSubscription(pillar, expired=True)
self._assert_getBranchSharingPolicies(
pillar, [BranchSharingPolicy.PUBLIC])
def test_getBranchSharingPolicies_commercial(self):
- self._skipUnlessProduct()
pillar = self._makePillar()
self.factory.makeCommercialSubscription(pillar)
self._assert_getBranchSharingPolicies(
@@ -266,7 +260,6 @@ class TestSharingService(
def test_getBranchSharingPolicies_non_public(self):
# When the pillar is non-public the policy options are limited to
# only proprietary or embargoed/proprietary.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(
information_type=InformationType.PROPRIETARY,
@@ -280,7 +273,6 @@ class TestSharingService(
def test_getBranchSharingPolicies_disallowed_policy(self):
# getBranchSharingPolicies includes a pillar's current policy even if
# it is nominally not allowed.
- self._skipUnlessProduct()
pillar = self._makePillar()
self.factory.makeCommercialSubscription(pillar, expired=True)
with person_logged_in(pillar.owner):
@@ -313,14 +305,12 @@ class TestSharingService(
pillar, [SpecificationSharingPolicy.PUBLIC])
def test_getSpecificationSharingPolicies_expired_commercial(self):
- self._skipUnlessProduct()
pillar = self._makePillar()
self.factory.makeCommercialSubscription(pillar, expired=True)
self._assert_getSpecificationSharingPolicies(
pillar, [SpecificationSharingPolicy.PUBLIC])
def test_getSpecificationSharingPolicies_commercial(self):
- self._skipUnlessProduct()
pillar = self._makePillar()
self.factory.makeCommercialSubscription(pillar)
self._assert_getSpecificationSharingPolicies(
@@ -333,7 +323,6 @@ class TestSharingService(
def test_getSpecificationSharingPolicies_non_public(self):
# When the pillar is non-public the policy options are limited to
# only proprietary or embargoed/proprietary.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(
information_type=InformationType.PROPRIETARY,
@@ -367,13 +356,11 @@ class TestSharingService(
self._assert_getBugSharingPolicies(pillar, [BugSharingPolicy.PUBLIC])
def test_getBugSharingPolicies_expired_commercial(self):
- self._skipUnlessProduct()
pillar = self._makePillar()
self.factory.makeCommercialSubscription(pillar, expired=True)
self._assert_getBugSharingPolicies(pillar, [BugSharingPolicy.PUBLIC])
def test_getBugSharingPolicies_commercial(self):
- self._skipUnlessProduct()
pillar = self._makePillar()
self.factory.makeCommercialSubscription(pillar)
self._assert_getBugSharingPolicies(
@@ -386,7 +373,6 @@ class TestSharingService(
def test_getBugSharingPolicies_non_public(self):
# When the pillar is non-public the policy options are limited to
# only proprietary or embargoed/proprietary.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(
information_type=InformationType.PROPRIETARY,
@@ -400,7 +386,6 @@ class TestSharingService(
def test_getBugSharingPolicies_disallowed_policy(self):
# getBugSharingPolicies includes a pillar's current policy even if it
# is nominally not allowed.
- self._skipUnlessProduct()
pillar = self._makePillar()
self.factory.makeCommercialSubscription(pillar, expired=True)
with person_logged_in(pillar.owner):
@@ -1294,7 +1279,6 @@ class TestSharingService(
def test_ensureAccessGrantsBranches(self):
# Access grants can be created for branches.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(owner=owner)
login_person(owner)
@@ -1305,7 +1289,6 @@ class TestSharingService(
def test_ensureAccessGrantsGitRepositories(self):
# Access grants can be created for Git repositories.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(owner=owner)
login_person(owner)
@@ -1375,7 +1358,6 @@ class TestSharingService(
def test_updatePillarBugSharingPolicy(self):
# updatePillarSharingPolicies works for bugs.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(owner=owner)
self.factory.makeCommercialSubscription(pillar)
@@ -1388,7 +1370,6 @@ class TestSharingService(
def test_updatePillarBranchSharingPolicy(self):
# updatePillarSharingPolicies works for branches.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(owner=owner)
self.factory.makeCommercialSubscription(pillar)
@@ -1401,7 +1382,6 @@ class TestSharingService(
def test_updatePillarSpecificationSharingPolicy(self):
# updatePillarSharingPolicies works for specifications.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(owner=owner)
self.factory.makeCommercialSubscription(pillar)
@@ -1730,7 +1710,6 @@ class TestSharingService(
def test_getPeopleWithAccessBranches(self):
# Test the getPeopleWithoutAccess method with branches.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(owner=owner)
branch = self._makeBranch(
@@ -1741,7 +1720,6 @@ class TestSharingService(
def test_getPeopleWithAccessGitRepositories(self):
# Test the getPeopleWithoutAccess method with Git repositories.
- self._skipUnlessProduct()
owner = self.factory.makePerson()
pillar = self._makePillar(owner=owner)
gitrepository = self._makeGitRepository(
diff --git a/lib/lp/registry/tests/test_distribution.py b/lib/lp/registry/tests/test_distribution.py
index 49f55df..481ff56 100644
--- a/lib/lp/registry/tests/test_distribution.py
+++ b/lib/lp/registry/tests/test_distribution.py
@@ -31,6 +31,12 @@ from lp.app.errors import (
ServiceUsageForbidden,
)
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.app.interfaces.services import IService
+from lp.blueprints.model.specification import (
+ SPECIFICATION_POLICY_ALLOWED_TYPES,
+ )
+from lp.bugs.interfaces.bugtarget import BUG_POLICY_ALLOWED_TYPES
+from lp.code.model.branchnamespace import BRANCH_POLICY_ALLOWED_TYPES
from lp.oci.tests.helpers import OCIConfigHelperMixin
from lp.registry.enums import (
BranchSharingPolicy,
@@ -38,6 +44,7 @@ from lp.registry.enums import (
DistributionDefaultTraversalPolicy,
EXCLUSIVE_TEAM_POLICY,
INCLUSIVE_TEAM_POLICY,
+ SpecificationSharingPolicy,
TeamMembershipPolicy,
)
from lp.registry.errors import (
@@ -69,6 +76,7 @@ from lp.soyuz.interfaces.distributionsourcepackagerelease import (
IDistributionSourcePackageRelease,
)
from lp.testing import (
+ admin_logged_in,
api_url,
celebrity_logged_in,
login_person,
@@ -398,13 +406,52 @@ class TestDistribution(TestCaseWithFactory):
grantees = {grant.grantee for grant in grants}
self.assertEqual(expected_grantess, grantees)
+ def test_open_creation_sharing_policies(self):
+ # Creating a new open (non-proprietary) distribution sets the bug,
+ # branch, and specification sharing policies to public, and creates
+ # access policies if required.
+ owner = self.factory.makePerson()
+ with person_logged_in(owner):
+ distribution = self.factory.makeDistribution(owner=owner)
+ self.assertEqual(
+ BugSharingPolicy.PUBLIC, distribution.bug_sharing_policy)
+ self.assertEqual(
+ BranchSharingPolicy.PUBLIC, distribution.branch_sharing_policy)
+ self.assertEqual(
+ SpecificationSharingPolicy.PUBLIC,
+ distribution.specification_sharing_policy)
+ aps = getUtility(IAccessPolicySource).findByPillar([distribution])
+ expected = [
+ InformationType.USERDATA, InformationType.PRIVATESECURITY]
+ self.assertContentEqual(expected, [policy.type for policy in aps])
+
+ def test_proprietary_creation_sharing_policies(self):
+ # Creating a new proprietary distribution sets the bug, branch, and
+ # specification sharing policies to proprietary.
+ owner = self.factory.makePerson()
+ with person_logged_in(owner):
+ distribution = self.factory.makeDistribution(
+ owner=owner, information_type=InformationType.PROPRIETARY)
+ self.assertEqual(
+ BugSharingPolicy.PROPRIETARY, distribution.bug_sharing_policy)
+ self.assertEqual(
+ BranchSharingPolicy.PROPRIETARY,
+ distribution.branch_sharing_policy)
+ self.assertEqual(
+ SpecificationSharingPolicy.PROPRIETARY,
+ distribution.specification_sharing_policy)
+ aps = getUtility(IAccessPolicySource).findByPillar([distribution])
+ expected = [InformationType.PROPRIETARY]
+ self.assertContentEqual(expected, [policy.type for policy in aps])
+
def test_change_info_type_proprietary_check_artifacts(self):
# Cannot change distribution information_type if any artifacts are
# public.
- # XXX cjwatson 2022-02-11: Make this use
- # artifact.transitionToInformationType once sharing policies are in
- # place.
- distribution = self.factory.makeDistribution()
+ distribution = self.factory.makeDistribution(
+ specification_sharing_policy=(
+ SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY),
+ bug_sharing_policy=BugSharingPolicy.PUBLIC_OR_PROPRIETARY,
+ branch_sharing_policy=BranchSharingPolicy.PUBLIC_OR_PROPRIETARY)
self.useContext(person_logged_in(distribution.owner))
spec = self.factory.makeSpecification(distribution=distribution)
for info_type in PRIVATE_DISTRIBUTION_TYPES:
@@ -412,32 +459,34 @@ class TestDistribution(TestCaseWithFactory):
CannotChangeInformationType,
"Some blueprints are public."):
distribution.information_type = info_type
- removeSecurityProxy(spec).information_type = (
- InformationType.PROPRIETARY)
+ spec.transitionToInformationType(
+ InformationType.PROPRIETARY, distribution.owner)
dsp = self.factory.makeDistributionSourcePackage(
distribution=distribution)
bug = self.factory.makeBug(target=dsp)
for bug_info_type in FREE_INFORMATION_TYPES:
- removeSecurityProxy(bug).information_type = bug_info_type
+ bug.transitionToInformationType(bug_info_type, distribution.owner)
for info_type in PRIVATE_DISTRIBUTION_TYPES:
with ExpectedException(
CannotChangeInformationType,
"Some bugs are neither proprietary nor embargoed."):
distribution.information_type = info_type
- removeSecurityProxy(bug).information_type = InformationType.PROPRIETARY
+ bug.transitionToInformationType(
+ InformationType.PROPRIETARY, distribution.owner)
distroseries = self.factory.makeDistroSeries(distribution=distribution)
sp = self.factory.makeSourcePackage(distroseries=distroseries)
branch = self.factory.makeBranch(sourcepackage=sp)
for branch_info_type in FREE_INFORMATION_TYPES:
- removeSecurityProxy(branch).information_type = branch_info_type
+ branch.transitionToInformationType(
+ branch_info_type, distribution.owner)
for info_type in PRIVATE_DISTRIBUTION_TYPES:
with ExpectedException(
CannotChangeInformationType,
"Some branches are neither proprietary nor "
"embargoed."):
distribution.information_type = info_type
- removeSecurityProxy(branch).information_type = (
- InformationType.PROPRIETARY)
+ branch.transitionToInformationType(
+ InformationType.PROPRIETARY, distribution.owner)
for info_type in PRIVATE_DISTRIBUTION_TYPES:
distribution.information_type = info_type
@@ -457,6 +506,40 @@ class TestDistribution(TestCaseWithFactory):
else:
distribution.information_type = info_type
+ def test_change_info_type_proprietary_sets_policies(self):
+ # Changing information type from public to proprietary sets the
+ # appropriate policies.
+ distribution = self.factory.makeDistribution()
+ with person_logged_in(distribution.owner):
+ distribution.information_type = InformationType.PROPRIETARY
+ self.assertEqual(
+ BranchSharingPolicy.PROPRIETARY,
+ distribution.branch_sharing_policy)
+ self.assertEqual(
+ BugSharingPolicy.PROPRIETARY, distribution.bug_sharing_policy)
+ self.assertEqual(
+ SpecificationSharingPolicy.PROPRIETARY,
+ distribution.specification_sharing_policy)
+
+ def test_proprietary_to_public_leaves_policies(self):
+ # Changing information type from proprietary to public leaves sharing
+ # policies unchanged.
+ owner = self.factory.makePerson()
+ distribution = self.factory.makeDistribution(
+ information_type=InformationType.PROPRIETARY, owner=owner)
+ with person_logged_in(owner):
+ distribution.information_type = InformationType.PUBLIC
+ # Setting information type to the current type should be a no-op.
+ distribution.information_type = InformationType.PUBLIC
+ self.assertEqual(
+ BranchSharingPolicy.PROPRIETARY,
+ distribution.branch_sharing_policy)
+ self.assertEqual(
+ BugSharingPolicy.PROPRIETARY, distribution.bug_sharing_policy)
+ self.assertEqual(
+ SpecificationSharingPolicy.PROPRIETARY,
+ distribution.specification_sharing_policy)
+
def test_cacheAccessPolicies(self):
# Distribution.access_policies is a list caching AccessPolicy.ids
# for which an AccessPolicyGrant or AccessArtifactGrant gives a
@@ -483,6 +566,18 @@ class TestDistribution(TestCaseWithFactory):
naked_distribution.information_type = InformationType.PUBLIC
self.assertIsNone(naked_distribution.access_policies)
+ # Proprietary distributions can have both Proprietary and Embargoed
+ # artifacts, and someone who can see either needs LimitedView on the
+ # pillar they're on. So both policies are permissible if they
+ # exist.
+ naked_distribution.information_type = InformationType.PROPRIETARY
+ naked_distribution.setBugSharingPolicy(
+ BugSharingPolicy.EMBARGOED_OR_PROPRIETARY)
+ [emb_policy] = aps.find([(distribution, InformationType.EMBARGOED)])
+ self.assertContentEqual(
+ [prop_policy.id, emb_policy.id],
+ naked_distribution.access_policies)
+
def test_checkInformationType_bug_supervisor(self):
# Bug supervisors of proprietary distributions must not have
# inclusive membership policies.
@@ -744,6 +839,378 @@ class TestDistribution(TestCaseWithFactory):
distribution.information_type = InformationType.PROPRIETARY
+class TestDistributionBugInformationTypes(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def makeDistributionWithPolicy(self, bug_sharing_policy):
+ distribution = self.factory.makeDistribution()
+ self.factory.makeCommercialSubscription(pillar=distribution)
+ with person_logged_in(distribution.owner):
+ distribution.setBugSharingPolicy(bug_sharing_policy)
+ return distribution
+
+ def test_no_policy(self):
+ # New distributions can only use the non-proprietary information
+ # types.
+ distribution = self.factory.makeDistribution()
+ self.assertContentEqual(
+ FREE_INFORMATION_TYPES,
+ distribution.getAllowedBugInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC,
+ distribution.getDefaultBugInformationType())
+
+ def test_sharing_policy_public_or_proprietary(self):
+ # bug_sharing_policy can enable Proprietary.
+ distribution = self.makeDistributionWithPolicy(
+ BugSharingPolicy.PUBLIC_OR_PROPRIETARY)
+ self.assertContentEqual(
+ FREE_INFORMATION_TYPES + (InformationType.PROPRIETARY,),
+ distribution.getAllowedBugInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC,
+ distribution.getDefaultBugInformationType())
+
+ def test_sharing_policy_proprietary_or_public(self):
+ # bug_sharing_policy can enable and default to Proprietary.
+ distribution = self.makeDistributionWithPolicy(
+ BugSharingPolicy.PROPRIETARY_OR_PUBLIC)
+ self.assertContentEqual(
+ FREE_INFORMATION_TYPES + (InformationType.PROPRIETARY,),
+ distribution.getAllowedBugInformationTypes())
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ distribution.getDefaultBugInformationType())
+
+ def test_sharing_policy_proprietary(self):
+ # bug_sharing_policy can enable only Proprietary.
+ distribution = self.makeDistributionWithPolicy(
+ BugSharingPolicy.PROPRIETARY)
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY],
+ distribution.getAllowedBugInformationTypes())
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ distribution.getDefaultBugInformationType())
+
+
+class TestDistributionSpecificationPolicyAndInformationTypes(
+ TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def makeDistributionWithPolicy(self, specification_sharing_policy):
+ distribution = self.factory.makeDistribution()
+ self.factory.makeCommercialSubscription(pillar=distribution)
+ with person_logged_in(distribution.owner):
+ distribution.setSpecificationSharingPolicy(
+ specification_sharing_policy)
+ return distribution
+
+ def test_no_policy(self):
+ # Distributions that have not specified a policy can use the PUBLIC
+ # information type.
+ distribution = self.factory.makeDistribution()
+ self.assertContentEqual(
+ [InformationType.PUBLIC],
+ distribution.getAllowedSpecificationInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC,
+ distribution.getDefaultSpecificationInformationType())
+
+ def test_sharing_policy_public(self):
+ # Distributions with a purely public policy should use PUBLIC
+ # information type.
+ distribution = self.makeDistributionWithPolicy(
+ SpecificationSharingPolicy.PUBLIC)
+ self.assertContentEqual(
+ [InformationType.PUBLIC],
+ distribution.getAllowedSpecificationInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC,
+ distribution.getDefaultSpecificationInformationType())
+
+ def test_sharing_policy_public_or_proprietary(self):
+ # specification_sharing_policy can enable Proprietary.
+ distribution = self.makeDistributionWithPolicy(
+ SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY)
+ self.assertContentEqual(
+ [InformationType.PUBLIC, InformationType.PROPRIETARY],
+ distribution.getAllowedSpecificationInformationTypes())
+ self.assertEqual(
+ InformationType.PUBLIC,
+ distribution.getDefaultSpecificationInformationType())
+
+ def test_sharing_policy_proprietary_or_public(self):
+ # specification_sharing_policy can enable and default to Proprietary.
+ distribution = self.makeDistributionWithPolicy(
+ SpecificationSharingPolicy.PROPRIETARY_OR_PUBLIC)
+ self.assertContentEqual(
+ [InformationType.PUBLIC, InformationType.PROPRIETARY],
+ distribution.getAllowedSpecificationInformationTypes())
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ distribution.getDefaultSpecificationInformationType())
+
+ def test_sharing_policy_proprietary(self):
+ # specification_sharing_policy can enable only Proprietary.
+ distribution = self.makeDistributionWithPolicy(
+ SpecificationSharingPolicy.PROPRIETARY)
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY],
+ distribution.getAllowedSpecificationInformationTypes())
+ self.assertEqual(
+ InformationType.PROPRIETARY,
+ distribution.getDefaultSpecificationInformationType())
+
+ def test_sharing_policy_embargoed_or_proprietary(self):
+ # specification_sharing_policy can be embargoed and then proprietary.
+ distribution = self.makeDistributionWithPolicy(
+ SpecificationSharingPolicy.EMBARGOED_OR_PROPRIETARY)
+ self.assertContentEqual(
+ [InformationType.PROPRIETARY, InformationType.EMBARGOED],
+ distribution.getAllowedSpecificationInformationTypes())
+ self.assertEqual(
+ InformationType.EMBARGOED,
+ distribution.getDefaultSpecificationInformationType())
+
+
+class BaseSharingPolicyTests:
+ """Common tests for distribution sharing policies."""
+
+ layer = DatabaseFunctionalLayer
+
+ def setSharingPolicy(self, policy, user):
+ raise NotImplementedError
+
+ def getSharingPolicy(self):
+ raise NotImplementedError
+
+ def setUp(self):
+ super().setUp()
+ self.distribution = self.factory.makeDistribution()
+ self.commercial_admin = self.factory.makeCommercialAdmin()
+
+ def test_owner_can_set_policy(self):
+ # Distribution maintainers can set sharing policies.
+ self.setSharingPolicy(self.public_policy, self.distribution.owner)
+ self.assertEqual(self.public_policy, self.getSharingPolicy())
+
+ def test_commercial_admin_can_set_policy(self):
+ # Commercial admins can set sharing policies for commercial
+ # distributions.
+ self.factory.makeCommercialSubscription(pillar=self.distribution)
+ self.setSharingPolicy(self.public_policy, self.commercial_admin)
+ self.assertEqual(self.public_policy, self.getSharingPolicy())
+
+ def test_random_cannot_set_policy(self):
+ # An unrelated user can't set sharing policies.
+ person = self.factory.makePerson()
+ self.assertRaises(
+ Unauthorized, self.setSharingPolicy, self.public_policy, person)
+
+ def test_anonymous_cannot_set_policy(self):
+ # An anonymous user can't set sharing policies.
+ self.assertRaises(
+ Unauthorized, self.setSharingPolicy, self.public_policy, None)
+
+ def test_proprietary_forbidden_without_commercial_sub(self):
+ # No policy that allows Proprietary can be configured without a
+ # commercial subscription.
+ self.setSharingPolicy(self.public_policy, self.distribution.owner)
+ self.assertEqual(self.public_policy, self.getSharingPolicy())
+ for policy in self.commercial_policies:
+ self.assertRaises(
+ CommercialSubscribersOnly,
+ self.setSharingPolicy, policy, self.distribution.owner)
+
+ def test_proprietary_allowed_with_commercial_sub(self):
+ # All policies are valid when there's a current commercial
+ # subscription.
+ self.factory.makeCommercialSubscription(pillar=self.distribution)
+ for policy in self.enum.items:
+ self.setSharingPolicy(policy, self.commercial_admin)
+ self.assertEqual(policy, self.getSharingPolicy())
+
+ def test_setting_proprietary_creates_access_policy(self):
+ # Setting a policy that allows Proprietary creates a
+ # corresponding access policy and shares it with the
+ # maintainer.
+ self.factory.makeCommercialSubscription(pillar=self.distribution)
+ self.assertEqual(
+ [InformationType.PRIVATESECURITY, InformationType.USERDATA],
+ [policy.type for policy in
+ getUtility(IAccessPolicySource).findByPillar(
+ [self.distribution])])
+ self.setSharingPolicy(
+ self.commercial_policies[0], self.commercial_admin)
+ self.assertEqual(
+ [InformationType.PRIVATESECURITY, InformationType.USERDATA,
+ InformationType.PROPRIETARY],
+ [policy.type for policy in
+ getUtility(IAccessPolicySource).findByPillar(
+ [self.distribution])])
+ self.assertTrue(
+ getUtility(IService, 'sharing').checkPillarAccess(
+ [self.distribution], InformationType.PROPRIETARY,
+ self.distribution.owner))
+
+ def test_unused_policies_are_pruned(self):
+ # When a sharing policy is changed, the allowed information types may
+ # become more restricted. In this case, any existing access policies
+ # for the now defunct information type(s) should be removed so long as
+ # there are no corresponding policy artifacts.
+
+ # We create a distribution and ensure there's an APA.
+ ap_source = getUtility(IAccessPolicySource)
+ distribution = self.factory.makeDistribution()
+ [ap] = ap_source.find(
+ [(distribution, InformationType.PRIVATESECURITY)])
+ self.factory.makeAccessPolicyArtifact(policy=ap)
+
+ def getAccessPolicyTypes(pillar):
+ return [
+ ap.type
+ for ap in ap_source.findByPillar([pillar])]
+
+ # Now change the sharing policies to PROPRIETARY
+ self.factory.makeCommercialSubscription(pillar=distribution)
+ with person_logged_in(distribution.owner):
+ distribution.setBugSharingPolicy(BugSharingPolicy.PROPRIETARY)
+ # Just bug sharing policy has been changed so all previous policy
+ # types are still valid.
+ self.assertContentEqual(
+ [InformationType.PRIVATESECURITY, InformationType.USERDATA,
+ InformationType.PROPRIETARY],
+ getAccessPolicyTypes(distribution))
+
+ distribution.setBranchSharingPolicy(
+ BranchSharingPolicy.PROPRIETARY)
+ # Proprietary is permitted by the sharing policy, and there's a
+ # Private Security artifact. But Private isn't in use or allowed
+ # by a sharing policy, so it's now gone.
+ self.assertContentEqual(
+ [InformationType.PRIVATESECURITY, InformationType.PROPRIETARY],
+ getAccessPolicyTypes(distribution))
+
+ def test_proprietary_distributions_forbid_public_policies(self):
+ # A proprietary distribution forbids any sharing policy that would
+ # permit public artifacts.
+ owner = self.distribution.owner
+ with admin_logged_in():
+ self.distribution.information_type = InformationType.PROPRIETARY
+ policies_permitting_public = [self.public_policy]
+ policies_permitting_public.extend(
+ policy for policy in self.commercial_policies if
+ InformationType.PUBLIC in self.allowed_types[policy])
+ for policy in policies_permitting_public:
+ with ExpectedException(
+ ProprietaryPillar, "The distribution is Proprietary."):
+ self.setSharingPolicy(policy, owner)
+
+
+class TestDistributionBugSharingPolicy(
+ BaseSharingPolicyTests, TestCaseWithFactory):
+ """Test Distribution.bug_sharing_policy."""
+
+ layer = DatabaseFunctionalLayer
+
+ enum = BugSharingPolicy
+ public_policy = BugSharingPolicy.PUBLIC
+ commercial_policies = (
+ BugSharingPolicy.PUBLIC_OR_PROPRIETARY,
+ BugSharingPolicy.PROPRIETARY_OR_PUBLIC,
+ BugSharingPolicy.PROPRIETARY,
+ )
+ allowed_types = BUG_POLICY_ALLOWED_TYPES
+
+ def setSharingPolicy(self, policy, user):
+ with person_logged_in(user):
+ result = self.distribution.setBugSharingPolicy(policy)
+ return result
+
+ def getSharingPolicy(self):
+ return self.distribution.bug_sharing_policy
+
+
+class TestDistributionBranchSharingPolicy(
+ BaseSharingPolicyTests, TestCaseWithFactory):
+ """Test Distribution.branch_sharing_policy."""
+
+ layer = DatabaseFunctionalLayer
+
+ enum = BranchSharingPolicy
+ public_policy = BranchSharingPolicy.PUBLIC
+ commercial_policies = (
+ BranchSharingPolicy.PUBLIC_OR_PROPRIETARY,
+ BranchSharingPolicy.PROPRIETARY_OR_PUBLIC,
+ BranchSharingPolicy.PROPRIETARY,
+ BranchSharingPolicy.EMBARGOED_OR_PROPRIETARY,
+ )
+ allowed_types = BRANCH_POLICY_ALLOWED_TYPES
+
+ def setSharingPolicy(self, policy, user):
+ with person_logged_in(user):
+ result = self.distribution.setBranchSharingPolicy(policy)
+ return result
+
+ def getSharingPolicy(self):
+ return self.distribution.branch_sharing_policy
+
+ def test_setting_embargoed_creates_access_policy(self):
+ # Setting a policy that allows Embargoed creates a corresponding
+ # access policy and shares it with the maintainer.
+ self.factory.makeCommercialSubscription(pillar=self.distribution)
+ self.assertEqual(
+ [InformationType.PRIVATESECURITY, InformationType.USERDATA],
+ [policy.type for policy in
+ getUtility(IAccessPolicySource).findByPillar(
+ [self.distribution])])
+ self.setSharingPolicy(
+ self.enum.EMBARGOED_OR_PROPRIETARY,
+ self.commercial_admin)
+ self.assertEqual(
+ [InformationType.PRIVATESECURITY, InformationType.USERDATA,
+ InformationType.PROPRIETARY, InformationType.EMBARGOED],
+ [policy.type for policy in
+ getUtility(IAccessPolicySource).findByPillar(
+ [self.distribution])])
+ self.assertTrue(
+ getUtility(IService, 'sharing').checkPillarAccess(
+ [self.distribution], InformationType.PROPRIETARY,
+ self.distribution.owner))
+ self.assertTrue(
+ getUtility(IService, 'sharing').checkPillarAccess(
+ [self.distribution], InformationType.EMBARGOED,
+ self.distribution.owner))
+
+
+class TestDistributionSpecificationSharingPolicy(
+ BaseSharingPolicyTests, TestCaseWithFactory):
+ """Test Distribution.specification_sharing_policy."""
+
+ layer = DatabaseFunctionalLayer
+
+ enum = SpecificationSharingPolicy
+ public_policy = SpecificationSharingPolicy.PUBLIC
+ commercial_policies = (
+ SpecificationSharingPolicy.PUBLIC_OR_PROPRIETARY,
+ SpecificationSharingPolicy.PROPRIETARY_OR_PUBLIC,
+ SpecificationSharingPolicy.PROPRIETARY,
+ SpecificationSharingPolicy.EMBARGOED_OR_PROPRIETARY,
+ )
+ allowed_types = SPECIFICATION_POLICY_ALLOWED_TYPES
+
+ def setSharingPolicy(self, policy, user):
+ with person_logged_in(user):
+ result = self.distribution.setSpecificationSharingPolicy(policy)
+ return result
+
+ def getSharingPolicy(self):
+ return self.distribution.specification_sharing_policy
+
+
class TestDistributionCurrentSourceReleases(
CurrentSourceReleasesMixin, TestCase):
"""Test for Distribution.getCurrentSourceReleases().
diff --git a/lib/lp/scripts/garbo.py b/lib/lp/scripts/garbo.py
index f1c75ad..24c4d61 100644
--- a/lib/lp/scripts/garbo.py
+++ b/lib/lp/scripts/garbo.py
@@ -79,6 +79,7 @@ from lp.code.model.revision import (
from lp.code.model.revisionstatus import RevisionStatusArtifact
from lp.oci.model.ocirecipebuild import OCIFile
from lp.registry.interfaces.person import IPersonSet
+from lp.registry.model.distribution import Distribution
from lp.registry.model.person import Person
from lp.registry.model.product import Product
from lp.registry.model.sourcepackagename import SourcePackageName
@@ -1426,7 +1427,7 @@ class UnusedPOTMsgSetPruner(TunableLoop):
transaction.commit()
-class UnusedAccessPolicyPruner(TunableLoop):
+class UnusedProductAccessPolicyPruner(TunableLoop):
"""Deletes unused AccessPolicy and AccessPolicyGrants for products."""
maximum_chunk_size = 5000
@@ -1451,6 +1452,32 @@ class UnusedAccessPolicyPruner(TunableLoop):
transaction.commit()
+class UnusedDistributionAccessPolicyPruner(TunableLoop):
+ """Deletes unused AccessPolicy and AccessPolicyGrants for distributions."""
+
+ maximum_chunk_size = 5000
+
+ def __init__(self, log, abort_time=None):
+ super().__init__(log, abort_time)
+ self.start_at = 1
+ self.store = IMasterStore(Distribution)
+
+ def findDistributions(self):
+ return self.store.find(
+ Distribution,
+ Distribution.id >= self.start_at).order_by(Distribution.id)
+
+ def isDone(self):
+ return self.findDistributions().is_empty()
+
+ def __call__(self, chunk_size):
+ distributions = list(self.findDistributions()[:chunk_size])
+ for distribution in distributions:
+ distribution._pruneUnusedPolicies()
+ self.start_at = distributions[-1].id + 1
+ transaction.commit()
+
+
class ProductVCSPopulator(TunableLoop):
"""Populates product.vcs from product.inferred_vcs if not set."""
@@ -2087,8 +2114,9 @@ class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
SuggestiveTemplatesCacheUpdater,
TeamMembershipPruner,
UnlinkedAccountPruner,
- UnusedAccessPolicyPruner,
+ UnusedDistributionAccessPolicyPruner,
UnusedPOTMsgSetPruner,
+ UnusedProductAccessPolicyPruner,
WebhookJobPruner,
]
experimental_tunable_loops = [
diff --git a/lib/lp/scripts/tests/test_garbo.py b/lib/lp/scripts/tests/test_garbo.py
index 7f5844a..3df3b58 100644
--- a/lib/lp/scripts/tests/test_garbo.py
+++ b/lib/lp/scripts/tests/test_garbo.py
@@ -1355,9 +1355,10 @@ class TestGarbo(FakeAdapterMixin, TestCaseWithFactory):
ap.type
for ap in getUtility(IAccessPolicySource).findByPillar([pillar])]
- def test_UnusedAccessPolicyPruner(self):
- # UnusedAccessPolicyPruner removes access policies that aren't
- # in use by artifacts or allowed by the project sharing policy.
+ def test_UnusedProductAccessPolicyPruner(self):
+ # UnusedProductAccessPolicyPruner removes access policies that
+ # aren't in use by artifacts or allowed by the project sharing
+ # policy.
switch_dbuser('testadmin')
product = self.factory.makeProduct()
self.factory.makeCommercialSubscription(pillar=product)
@@ -1385,6 +1386,39 @@ class TestGarbo(FakeAdapterMixin, TestCaseWithFactory):
[InformationType.PRIVATESECURITY, InformationType.PROPRIETARY],
self.getAccessPolicyTypes(product))
+ def test_UnusedDistributionAccessPolicyPruner(self):
+ # UnusedDistributionAccessPolicyPruner removes access policies that
+ # aren't in use by artifacts or allowed by the distribution sharing
+ # policy.
+ switch_dbuser('testadmin')
+ distribution = self.factory.makeDistribution()
+ self.factory.makeCommercialSubscription(pillar=distribution)
+ self.factory.makeAccessPolicy(
+ distribution, InformationType.PROPRIETARY)
+ naked_distribution = removeSecurityProxy(distribution)
+ naked_distribution.bug_sharing_policy = BugSharingPolicy.PROPRIETARY
+ naked_distribution.branch_sharing_policy = (
+ BranchSharingPolicy.PROPRIETARY)
+ [ap] = getUtility(IAccessPolicySource).find(
+ [(distribution, InformationType.PRIVATESECURITY)])
+ self.factory.makeAccessPolicyArtifact(policy=ap)
+
+ # Private and Private Security were created with the distribution.
+ # Proprietary was created when the branch sharing policy was set.
+ self.assertContentEqual(
+ [InformationType.PRIVATESECURITY, InformationType.USERDATA,
+ InformationType.PROPRIETARY],
+ self.getAccessPolicyTypes(distribution))
+
+ self.runDaily()
+
+ # Proprietary is permitted by the sharing policy, and there's a
+ # Private Security artifact. But Private isn't in use or allowed
+ # by a sharing policy, so garbo deleted it.
+ self.assertContentEqual(
+ [InformationType.PRIVATESECURITY, InformationType.PROPRIETARY],
+ self.getAccessPolicyTypes(distribution))
+
def test_ProductVCSPopulator(self):
switch_dbuser('testadmin')
product = self.factory.makeProduct()
diff --git a/lib/lp/security.py b/lib/lp/security.py
index 95d7a45..fbada1b 100644
--- a/lib/lp/security.py
+++ b/lib/lp/security.py
@@ -2402,14 +2402,15 @@ class EditBranch(AuthorizationBase):
class ModerateBranch(EditBranch):
- """The owners, product owners, and admins can moderate branches."""
+ """The owners, pillar owners, and admins can moderate branches."""
permission = 'launchpad.Moderate'
def checkAuthenticated(self, user):
if super().checkAuthenticated(user):
return True
branch = self.obj
- if branch.product is not None and user.inTeam(branch.product.owner):
+ pillar = branch.product or branch.distribution
+ if pillar is not None and user.inTeam(pillar.owner):
return True
return user.in_commercial_admin
@@ -2477,15 +2478,22 @@ class EditGitRepository(AuthorizationBase):
class ModerateGitRepository(EditGitRepository):
- """The owners, project owners, and admins can moderate Git repositories."""
+ """The owners, pillar owners, and admins can moderate Git repositories."""
permission = 'launchpad.Moderate'
def checkAuthenticated(self, user):
if super().checkAuthenticated(user):
return True
target = self.obj.target
- if (target is not None and IProduct.providedBy(target) and
- user.inTeam(target.owner)):
+ if IProduct.providedBy(target):
+ pillar = target
+ elif IDistributionSourcePackage.providedBy(target):
+ pillar = target.distribution
+ elif IOCIProject.providedBy(target):
+ pillar = target.pillar
+ else:
+ raise AssertionError("Unknown target: %r" % target)
+ if pillar is not None and user.inTeam(pillar.owner):
return True
return user.in_commercial_admin
diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py
index d4b9f20..f6a7917 100644
--- a/lib/lp/testing/factory.py
+++ b/lib/lp/testing/factory.py
@@ -2709,7 +2709,10 @@ class BareLaunchpadObjectFactory(ObjectFactory):
publish_root_dir=None, publish_base_url=None,
publish_copy_base_url=None, no_pubconf=False,
icon=None, summary=None, vcs=None,
- oci_project_admin=None, information_type=None):
+ oci_project_admin=None, bug_sharing_policy=None,
+ branch_sharing_policy=None,
+ specification_sharing_policy=None,
+ information_type=None):
"""Make a new distribution."""
if name is None:
name = self.getUniqueString(prefix="distribution")
@@ -2740,6 +2743,26 @@ class BareLaunchpadObjectFactory(ObjectFactory):
naked_distro.bug_supervisor = bug_supervisor
if oci_project_admin is not None:
naked_distro.oci_project_admin = oci_project_admin
+ # makeProduct defaults licenses to [License.OTHER_PROPRIETARY] if
+ # any non-public sharing policy is set, which ensures a
+ # complimentary commercial subscription. However, Distribution
+ # doesn't have a licenses field, so deal with the commercial
+ # subscription directly here instead.
+ if ((bug_sharing_policy is not None and
+ bug_sharing_policy != BugSharingPolicy.PUBLIC) or
+ (branch_sharing_policy is not None and
+ branch_sharing_policy != BranchSharingPolicy.PUBLIC) or
+ (specification_sharing_policy is not None and
+ specification_sharing_policy !=
+ SpecificationSharingPolicy.PUBLIC)):
+ naked_distro._ensure_complimentary_subscription()
+ if branch_sharing_policy:
+ naked_distro.setBranchSharingPolicy(branch_sharing_policy)
+ if bug_sharing_policy:
+ naked_distro.setBugSharingPolicy(bug_sharing_policy)
+ if specification_sharing_policy:
+ naked_distro.setSpecificationSharingPolicy(
+ specification_sharing_policy)
if not no_pubconf:
self.makePublisherConfig(
distro, publish_root_dir, publish_base_url,