launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #05634
[Merge] lp:~wgrant/launchpad/observer-model into lp:launchpad
William Grant has proposed merging lp:~wgrant/launchpad/observer-model into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~wgrant/launchpad/observer-model/+merge/82852
This is the initial Python model work for Disclosure's access policies. It is likely to change substantially over the coming weeks, but it is a starting point and is reasonably ready to land.
See <https://code.launchpad.net/~wgrant/launchpad/observer-db/+merge/81104> for an overview of how things will probably work. This branch adds a fairly simple layer that exposes and manipulates the data in that schema from our Python code.
--
https://code.launchpad.net/~wgrant/launchpad/observer-model/+merge/82852
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/observer-model into lp:launchpad.
=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg 2011-11-09 20:40:58 +0000
+++ database/schema/security.cfg 2011-11-21 09:20:32 +0000
@@ -104,6 +104,9 @@
[launchpad_main]
groups=write,script
+public.accesspolicy = SELECT, INSERT, UPDATE, DELETE
+public.accesspolicyartifact = SELECT, INSERT, UPDATE, DELETE
+public.accesspolicygrant = SELECT, INSERT, UPDATE, DELETE
public.account = SELECT, INSERT, UPDATE, DELETE
public.accountpassword = SELECT, INSERT, UPDATE, DELETE
public.announcement = SELECT, INSERT, UPDATE, DELETE
=== modified file 'lib/lp/registry/configure.zcml'
--- lib/lp/registry/configure.zcml 2011-11-16 21:13:20 +0000
+++ lib/lp/registry/configure.zcml 2011-11-21 09:20:32 +0000
@@ -1938,4 +1938,41 @@
for="lp.registry.interfaces.person.IPerson"
provides="lp.registry.interfaces.role.IPersonRoles"
factory="lp.registry.model.personroles.PersonRoles" />
+
+ <class
+ class="lp.registry.model.accesspolicy.AccessPolicy">
+ <allow
+ interface="lp.registry.interfaces.accesspolicy.IAccessPolicy"/>
+ </class>
+ <securedutility
+ component="lp.registry.model.accesspolicy.AccessPolicy"
+ provides="lp.registry.interfaces.accesspolicy.IAccessPolicySource">
+ <allow
+ interface="lp.registry.interfaces.accesspolicy.IAccessPolicySource"/>
+ </securedutility>
+ <class
+ class="lp.registry.model.accesspolicy.AccessPolicyArtifact">
+ <allow
+ interface="lp.registry.interfaces.accesspolicy.IAccessPolicyArtifact"/>
+ <require
+ permission="zope.Public"
+ set_attributes="policy"/>
+ </class>
+ <securedutility
+ component="lp.registry.model.accesspolicy.AccessPolicyArtifact"
+ provides="lp.registry.interfaces.accesspolicy.IAccessPolicyArtifactSource">
+ <allow
+ interface="lp.registry.interfaces.accesspolicy.IAccessPolicyArtifactSource"/>
+ </securedutility>
+ <class
+ class="lp.registry.model.accesspolicy.AccessPolicyGrant">
+ <allow
+ interface="lp.registry.interfaces.accesspolicy.IAccessPolicyGrant"/>
+ </class>
+ <securedutility
+ component="lp.registry.model.accesspolicy.AccessPolicyGrant"
+ provides="lp.registry.interfaces.accesspolicy.IAccessPolicyGrantSource">
+ <allow
+ interface="lp.registry.interfaces.accesspolicy.IAccessPolicyGrantSource"/>
+ </securedutility>
</configure>
=== added file 'lib/lp/registry/interfaces/accesspolicy.py'
--- lib/lp/registry/interfaces/accesspolicy.py 1970-01-01 00:00:00 +0000
+++ lib/lp/registry/interfaces/accesspolicy.py 2011-11-21 09:20:32 +0000
@@ -0,0 +1,118 @@
+# Copyright 2011 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Interfaces for pillar and artifact access policies."""
+
+__metaclass__ = type
+
+__all__ = [
+ 'AccessPolicyType',
+ 'IAccessPolicy',
+ 'IAccessPolicyArtifact',
+ 'IAccessPolicyArtifactSource',
+ 'IAccessPolicyGrant',
+ 'IAccessPolicySource',
+ ]
+
+from lazr.enum import (
+ DBEnumeratedType,
+ DBItem,
+ )
+from zope.interface import (
+ Attribute,
+ Interface,
+ )
+
+
+class AccessPolicyType(DBEnumeratedType):
+ """Access policy type."""
+
+ PRIVATE = DBItem(1, """
+ Private
+
+ This policy covers general private information.
+ """)
+
+ SECURITY = DBItem(2, """
+ Security
+
+ This policy covers information relating to confidential security
+ vulnerabilities.
+ """)
+
+
+class IAccessPolicy(Interface):
+ id = Attribute("ID")
+ pillar = Attribute("Pillar")
+ type = Attribute("Type")
+
+
+class IAccessPolicyArtifact(Interface):
+ id = Attribute("ID")
+ concrete_artifact = Attribute("Concrete artifact")
+ policy = Attribute("Access policy")
+
+
+class IAccessPolicyGrant(Interface):
+ id = Attribute("ID")
+ grantee = Attribute("Grantee")
+ grantor = Attribute("Grantor")
+ date_created = Attribute("Date created")
+ policy = Attribute("Access policy")
+ abstract_artifact = Attribute("Abstract artifact")
+
+ concrete_artifact = Attribute("Concrete artifact")
+
+
+class IAccessPolicySource(Interface):
+
+ def create(pillar, display_name):
+ """Create an `IAccessPolicy` for the pillar with the given name."""
+
+ def getByID(id):
+ """Return the `IAccessPolicy` with the given ID."""
+
+ def getByPillarAndType(pillar, type):
+ """Return the pillar's `IAccessPolicy` with the given type."""
+
+ def findByPillar(pillar):
+ """Return a ResultSet of all `IAccessPolicy`s for the pillar."""
+
+
+class IAccessPolicyArtifactSource(Interface):
+
+ def ensure(concrete_artifact):
+ """Return the `IAccessPolicyArtifact` for a concrete artifact.
+
+ Creates the abstract artifact if it doesn't already exist.
+ """
+
+ def get(concrete_artifact):
+ """Return the `IAccessPolicyArtifact` for an artifact, if it exists.
+
+ Use ensure() if you want to create one if it doesn't yet exist.
+ """
+
+ def delete(concrete_artifact):
+ """Delete the `IAccessPolicyArtifact` for a concrete artifact.
+
+ Also removes any AccessPolicyGrants for the artifact.
+ """
+
+
+class IAccessPolicyGrantSource(Interface):
+
+ def grant(grantee, grantor, object):
+ """Create an `IAccessPolicyGrant`.
+
+ :param grantee: the `IPerson` to hold the access.
+ :param grantor: the `IPerson` that grants the access.
+ :param object: the `IAccessPolicy` or `IAccessPolicyArtifact` to
+ grant access to.
+ """
+
+ def getByID(id):
+ """Return the `IAccessPolicyGrant` with the given ID."""
+
+ def findByPolicy(policy):
+ """Return all `IAccessPolicyGrant` objects for the policy."""
=== added file 'lib/lp/registry/model/accesspolicy.py'
--- lib/lp/registry/model/accesspolicy.py 1970-01-01 00:00:00 +0000
+++ lib/lp/registry/model/accesspolicy.py 2011-11-21 09:20:32 +0000
@@ -0,0 +1,198 @@
+# Copyright 2011 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Model classes for pillar and artifact access policies."""
+
+__metaclass__ = type
+__all__ = [
+ 'AccessPolicy',
+ 'AccessPolicyArtifact',
+ 'AccessPolicyGrant',
+ ]
+
+from storm.properties import (
+ Int,
+ DateTime,
+ )
+from storm.references import Reference
+from zope.interface import implements
+
+from canonical.database.enumcol import DBEnum
+from canonical.launchpad.interfaces.lpstorm import IStore
+from lp.registry.interfaces.accesspolicy import (
+ AccessPolicyType,
+ IAccessPolicy,
+ IAccessPolicyArtifact,
+ IAccessPolicyGrant,
+ )
+from lp.services.database.stormbase import StormBase
+
+
+class AccessPolicy(StormBase):
+ implements(IAccessPolicy)
+
+ __storm_table__ = 'AccessPolicy'
+
+ id = Int(primary=True)
+ product_id = Int(name='product')
+ product = Reference(product_id, 'Product.id')
+ distribution_id = Int(name='distribution')
+ distribution = Reference(distribution_id, 'Distribution.id')
+ type = DBEnum(allow_none=True, enum=AccessPolicyType)
+
+ @property
+ def pillar(self):
+ return self.product or self.distribution
+
+ @classmethod
+ def create(cls, pillar, type):
+ from lp.registry.interfaces.distribution import IDistribution
+ from lp.registry.interfaces.product import IProduct
+ obj = cls()
+ if IProduct.providedBy(pillar):
+ obj.product = pillar
+ elif IDistribution.providedBy(pillar):
+ obj.distribution = pillar
+ else:
+ raise AssertionError("%r is not a supported pillar" % pillar)
+ obj.type = type
+ IStore(cls).add(obj)
+ return obj
+
+ @classmethod
+ def _constraintForPillar(cls, pillar):
+ from lp.registry.interfaces.distribution import IDistribution
+ from lp.registry.interfaces.product import IProduct
+ if IProduct.providedBy(pillar):
+ col = cls.product
+ elif IDistribution.providedBy(pillar):
+ col = cls.distribution
+ else:
+ raise AssertionError("%r is not a supported pillar" % pillar)
+ return col == pillar
+
+ @classmethod
+ def getByID(cls, id):
+ """See `IAccessPolicySource`."""
+ return IStore(cls).get(cls, id)
+
+ @classmethod
+ def findByPillar(cls, pillar):
+ """See `IAccessPolicySource`."""
+ return IStore(cls).find(cls, cls._constraintForPillar(pillar))
+
+ @classmethod
+ def getByPillarAndType(cls, pillar, type):
+ """See `IAccessPolicySource`."""
+ return cls.findByPillar(pillar).find(type=type).one()
+
+
+class AccessPolicyArtifact(StormBase):
+ implements(IAccessPolicyArtifact)
+
+ __storm_table__ = 'AccessPolicyArtifact'
+
+ id = Int(primary=True)
+ bug_id = Int(name='bug')
+ bug = Reference(bug_id, 'Bug.id')
+ branch_id = Int(name='branch')
+ branch = Reference(branch_id, 'Branch.id')
+ policy_id = Int(name='policy')
+ policy = Reference(policy_id, 'AccessPolicy.id')
+
+ @property
+ def concrete_artifact(self):
+ artifact = self.bug or self.branch
+ assert artifact is not None
+ return artifact
+
+ @staticmethod
+ def _getConcreteAttribute(concrete_artifact):
+ from lp.bugs.interfaces.bug import IBug
+ from lp.code.interfaces.branch import IBranch
+ if IBug.providedBy(concrete_artifact):
+ return 'bug'
+ elif IBranch.providedBy(concrete_artifact):
+ return 'branch'
+ else:
+ raise AssertionError(
+ "%r is not a valid artifact" % concrete_artifact)
+
+ @classmethod
+ def get(cls, concrete_artifact):
+ """See `IAccessPolicyArtifactSource`."""
+ constraints = {
+ cls._getConcreteAttribute(concrete_artifact): concrete_artifact}
+ return IStore(cls).find(cls, **constraints).one()
+
+ @classmethod
+ def ensure(cls, concrete_artifact):
+ """See `IAccessPolicyArtifactSource`."""
+ existing = cls.get(concrete_artifact)
+ if existing is not None:
+ return existing
+ # No existing object. Create a new one.
+ obj = cls()
+ setattr(
+ obj, cls._getConcreteAttribute(concrete_artifact),
+ concrete_artifact)
+ IStore(cls).add(obj)
+ return obj
+
+ @classmethod
+ def delete(cls, concrete_artifact):
+ """See `IAccessPolicyArtifactSource`."""
+ abstract = cls.get(concrete_artifact)
+ if abstract is None:
+ return
+ IStore(abstract).find(
+ AccessPolicyGrant, abstract_artifact=abstract).remove()
+ IStore(abstract).find(AccessPolicyArtifact, id=abstract.id).remove()
+
+
+class AccessPolicyGrant(StormBase):
+ implements(IAccessPolicyGrant)
+
+ __storm_table__ = 'AccessPolicyGrant'
+
+ id = Int(primary=True)
+ grantee_id = Int(name='grantee')
+ grantee = Reference(grantee_id, 'Person.id')
+ policy_id = Int(name='policy')
+ policy = Reference(policy_id, 'AccessPolicy.id')
+ abstract_artifact_id = Int(name='artifact')
+ abstract_artifact = Reference(
+ abstract_artifact_id, 'AccessPolicyArtifact.id')
+ grantor_id = Int(name='grantor')
+ grantor = Reference(grantor_id, 'Person.id')
+ date_created = DateTime()
+
+ @property
+ def concrete_artifact(self):
+ if self.abstract_artifact is not None:
+ return self.abstract_artifact.concrete_artifact
+
+ @classmethod
+ def grant(cls, grantee, grantor, object):
+ """See `IAccessPolicyGrantSource`."""
+ grant = cls()
+ grant.grantee = grantee
+ grant.grantor = grantor
+ if IAccessPolicy.providedBy(object):
+ grant.policy = object
+ elif IAccessPolicyArtifact.providedBy(object):
+ grant.abstract_artifact = object
+ else:
+ raise AssertionError("Unsupported object: %r" % object)
+ IStore(cls).add(grant)
+ return grant
+
+ @classmethod
+ def getByID(cls, id):
+ """See `IAccessPolicyGrantSource`."""
+ return IStore(cls).get(cls, id)
+
+ @classmethod
+ def findByPolicy(cls, policy):
+ """See `IAccessPolicyGrantSource`."""
+ return IStore(cls).find(cls, policy=policy)
=== added file 'lib/lp/registry/tests/test_accesspolicy.py'
--- lib/lp/registry/tests/test_accesspolicy.py 1970-01-01 00:00:00 +0000
+++ lib/lp/registry/tests/test_accesspolicy.py 2011-11-21 09:20:32 +0000
@@ -0,0 +1,287 @@
+# Copyright 2011 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+__metaclass__ = type
+
+from storm.exceptions import LostObjectError
+from storm.store import Store
+from testtools.matchers import MatchesStructure
+from zope.component import getUtility
+
+from canonical.launchpad.interfaces.lpstorm import IStore
+from canonical.testing.layers import DatabaseFunctionalLayer
+from lp.registry.interfaces.accesspolicy import (
+ AccessPolicyType,
+ IAccessPolicy,
+ IAccessPolicyArtifact,
+ IAccessPolicyArtifactSource,
+ IAccessPolicyGrant,
+ IAccessPolicyGrantSource,
+ IAccessPolicySource,
+ )
+from lp.testing import TestCaseWithFactory
+from lp.testing.matchers import Provides
+
+
+class TestAccessPolicy(TestCaseWithFactory):
+ layer = DatabaseFunctionalLayer
+
+ def test_provides_interface(self):
+ self.assertThat(
+ self.factory.makeAccessPolicy(), Provides(IAccessPolicy))
+
+ def test_pillar(self):
+ product = self.factory.makeProduct()
+ policy = self.factory.makeAccessPolicy(pillar=product)
+ self.assertEqual(product, policy.pillar)
+
+
+class TestAccessPolicySource(TestCaseWithFactory):
+ layer = DatabaseFunctionalLayer
+
+ def test_create_for_product(self):
+ product = self.factory.makeProduct()
+ type = AccessPolicyType.SECURITY
+ policy = getUtility(IAccessPolicySource).create(product, type)
+ self.assertThat(
+ policy,
+ MatchesStructure.byEquality(pillar=product, type=type))
+
+ def test_getByID(self):
+ # getByID finds the right policy.
+ policy = self.factory.makeAccessPolicy()
+ # Flush so we get an ID.
+ Store.of(policy).flush()
+ self.assertEqual(
+ policy, getUtility(IAccessPolicySource).getByID(policy.id))
+
+ def test_getByID_nonexistent(self):
+ # getByID returns None if the policy doesn't exist.
+ self.assertIs(
+ None,
+ getUtility(IAccessPolicySource).getByID(
+ self.factory.getUniqueInteger()))
+
+ def test_getByPillarAndType(self):
+ # getByPillarAndType finds the right policy.
+ product = self.factory.makeProduct()
+
+ private_policy = self.factory.makeAccessPolicy(
+ pillar=product, type=AccessPolicyType.PRIVATE)
+ security_policy = self.factory.makeAccessPolicy(
+ pillar=product, type=AccessPolicyType.SECURITY)
+ self.assertEqual(
+ private_policy,
+ getUtility(IAccessPolicySource).getByPillarAndType(
+ product, AccessPolicyType.PRIVATE))
+ self.assertEqual(
+ security_policy,
+ getUtility(IAccessPolicySource).getByPillarAndType(
+ product, AccessPolicyType.SECURITY))
+
+ def test_getByPillarAndType_nonexistent(self):
+ # getByPillarAndType returns None if the policy doesn't exist.
+ # Create policy identifiers, and an unrelated policy.
+ self.factory.makeAccessPolicy(type=AccessPolicyType.PRIVATE)
+ product = self.factory.makeProduct()
+ self.assertIs(
+ None,
+ getUtility(IAccessPolicySource).getByPillarAndType(
+ product, AccessPolicyType.PRIVATE))
+
+ def test_findByPillar(self):
+ # findByPillar finds only the relevant policies.
+ product = self.factory.makeProduct()
+ policies = [
+ self.factory.makeAccessPolicy(pillar=product, type=type)
+ for type in AccessPolicyType.items]
+ self.factory.makeAccessPolicy()
+ self.assertContentEqual(
+ policies,
+ getUtility(IAccessPolicySource).findByPillar(product))
+
+
+class TestAccessPolicyArtifact(TestCaseWithFactory):
+ layer = DatabaseFunctionalLayer
+
+ def test_provides_interface(self):
+ self.assertThat(
+ self.factory.makeAccessPolicyArtifact(),
+ Provides(IAccessPolicyArtifact))
+
+ def test_policy(self):
+ policy = self.factory.makeAccessPolicy()
+ self.assertEqual(
+ policy,
+ self.factory.makeAccessPolicyArtifact(policy=policy).policy)
+
+
+class TestAccessPolicyArtifactSourceOnce(TestCaseWithFactory):
+ layer = DatabaseFunctionalLayer
+
+ def test_ensure_other_fails(self):
+ # ensure() rejects unsupported objects.
+ self.assertRaises(
+ AssertionError,
+ getUtility(IAccessPolicyArtifactSource).ensure,
+ self.factory.makeProduct())
+
+
+class BaseAccessPolicyArtifactTests:
+ layer = DatabaseFunctionalLayer
+
+ def getConcreteArtifact(self):
+ raise NotImplementedError()
+
+ def test_ensure(self):
+ # ensure() creates an abstract artifact which maps to the
+ # concrete one.
+ concrete = self.getConcreteArtifact()
+ abstract = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
+ Store.of(abstract).flush()
+ self.assertEqual(concrete, abstract.concrete_artifact)
+
+ def test_get(self):
+ # get() finds an abstract artifact which maps to the concrete
+ # one.
+ concrete = self.getConcreteArtifact()
+ abstract = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
+ self.assertEqual(
+ abstract, getUtility(IAccessPolicyArtifactSource).get(concrete))
+
+ def test_ensure_twice(self):
+ # ensure() will reuse an existing matching abstract artifact if
+ # it exists.
+ concrete = self.getConcreteArtifact()
+ abstract = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
+ Store.of(abstract).flush()
+ self.assertEqual(
+ abstract.id,
+ getUtility(IAccessPolicyArtifactSource).ensure(concrete).id)
+
+ def test_delete(self):
+ # delete() removes the abstract artifact and any associated
+ # grants.
+ concrete = self.getConcreteArtifact()
+ abstract = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
+ grant = self.factory.makeAccessPolicyGrant(object=abstract)
+
+ # Make some other grants to ensure they're unaffected.
+ other_grants = [
+ self.factory.makeAccessPolicyGrant(
+ object=self.factory.makeAccessPolicyArtifact()),
+ self.factory.makeAccessPolicyGrant(
+ object=self.factory.makeAccessPolicy()),
+ ]
+
+ getUtility(IAccessPolicyArtifactSource).delete(concrete)
+ IStore(grant).invalidate()
+ self.assertRaises(LostObjectError, getattr, grant, 'policy')
+ self.assertRaises(
+ LostObjectError, getattr, abstract, 'concrete_artifact')
+
+ for other_grant in other_grants:
+ self.assertEqual(
+ other_grant,
+ getUtility(IAccessPolicyGrantSource).getByID(other_grant.id))
+
+ def test_delete_noop(self):
+ # delete() works even if there's no abstract artifact.
+ concrete = self.getConcreteArtifact()
+ getUtility(IAccessPolicyArtifactSource).delete(concrete)
+
+
+class TestAccessPolicyArtifactBranch(BaseAccessPolicyArtifactTests,
+ TestCaseWithFactory):
+
+ def getConcreteArtifact(self):
+ return self.factory.makeBranch()
+
+
+class TestAccessPolicyArtifactBug(BaseAccessPolicyArtifactTests,
+ TestCaseWithFactory):
+
+ def getConcreteArtifact(self):
+ return self.factory.makeBug()
+
+
+class TestAccessPolicyGrant(TestCaseWithFactory):
+ layer = DatabaseFunctionalLayer
+
+ def test_provides_interface(self):
+ self.assertThat(
+ self.factory.makeAccessPolicyGrant(),
+ Provides(IAccessPolicyGrant))
+
+ def test_concrete_artifact(self):
+ bug = self.factory.makeBug()
+ abstract = self.factory.makeAccessPolicyArtifact(bug)
+ grant = self.factory.makeAccessPolicyGrant(
+ object=abstract)
+ self.assertEqual(bug, grant.concrete_artifact)
+
+ def test_no_concrete_artifact(self):
+ grant = self.factory.makeAccessPolicyGrant(
+ object=self.factory.makeAccessPolicy())
+ self.assertIs(None, grant.concrete_artifact)
+
+
+class TestAccessPolicyGrantSource(TestCaseWithFactory):
+ layer = DatabaseFunctionalLayer
+
+ def test_grant_for_policy(self):
+ policy = self.factory.makeAccessPolicy()
+ grantee = self.factory.makePerson()
+ grantor = self.factory.makePerson()
+ grant = getUtility(IAccessPolicyGrantSource).grant(
+ grantee, grantor, policy)
+ self.assertThat(
+ grant,
+ MatchesStructure.byEquality(
+ grantee=grantee,
+ grantor=grantor,
+ policy=policy,
+ abstract_artifact=None,
+ concrete_artifact=None,))
+
+ def test_grant_with_artifact(self):
+ artifact = self.factory.makeAccessPolicyArtifact()
+ grantee = self.factory.makePerson()
+ grantor = self.factory.makePerson()
+ grant = getUtility(IAccessPolicyGrantSource).grant(
+ grantee, grantor, artifact)
+ self.assertThat(
+ grant,
+ MatchesStructure.byEquality(
+ grantee=grantee,
+ grantor=grantor,
+ policy=None,
+ abstract_artifact=artifact,
+ concrete_artifact=artifact.concrete_artifact))
+
+ def test_getByID(self):
+ # getByID finds the right grant.
+ grant = self.factory.makeAccessPolicyGrant()
+ # Flush so we get an ID.
+ Store.of(grant).flush()
+ self.assertEqual(
+ grant,
+ getUtility(IAccessPolicyGrantSource).getByID(grant.id))
+
+ def test_getByID_nonexistent(self):
+ # getByID returns None if the grant doesn't exist.
+ self.assertIs(
+ None,
+ getUtility(IAccessPolicyGrantSource).getByID(
+ self.factory.getUniqueInteger()))
+
+ def test_findByPolicy(self):
+ # findByPolicy finds only the relevant grants.
+ policy = self.factory.makeAccessPolicy()
+ grants = [
+ self.factory.makeAccessPolicyGrant(object=policy)
+ for i in range(3)]
+ self.factory.makeAccessPolicyGrant()
+ self.assertContentEqual(
+ grants,
+ getUtility(IAccessPolicyGrantSource).findByPolicy(policy))
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2011-11-20 23:37:23 +0000
+++ lib/lp/testing/factory.py 2011-11-21 09:20:32 +0000
@@ -174,6 +174,12 @@
DistroSeriesDifferenceStatus,
DistroSeriesDifferenceType,
)
+from lp.registry.interfaces.accesspolicy import (
+ AccessPolicyType,
+ IAccessPolicyArtifactSource,
+ IAccessPolicyGrantSource,
+ IAccessPolicySource,
+ )
from lp.registry.interfaces.distribution import (
IDistribution,
IDistributionSet,
@@ -4318,6 +4324,33 @@
target_distroseries, target_pocket,
package_version=package_version, requester=requester)
+ def makeAccessPolicy(self, pillar=None, type=AccessPolicyType.PRIVATE):
+ if pillar is None:
+ pillar = self.makeProduct()
+ policy = getUtility(IAccessPolicySource).create(pillar, type)
+ IStore(policy).flush()
+ return policy
+
+ def makeAccessPolicyArtifact(self, concrete=None, policy=None):
+ if concrete is None:
+ concrete = self.makeBranch()
+ artifact = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
+ artifact.policy = policy
+ IStore(artifact).flush()
+ return artifact
+
+ def makeAccessPolicyGrant(self, grantee=None, object=None, grantor=None):
+ if grantee is None:
+ grantee = self.makePerson()
+ if grantor is None:
+ grantor = self.makePerson()
+ if object is None:
+ object = self.makeAccessPolicy()
+ grant = getUtility(IAccessPolicyGrantSource).grant(
+ grantee, grantor, object)
+ IStore(grant).flush()
+ return grant
+
# Some factory methods return simple Python types. We don't add
# security wrappers for them, as well as for objects created by