
launchpad-reviewers team mailing list archive

[Merge] lp:~rharding/launchpad/nonpublic_1052659 into lp:launchpad

 

Richard Harding has proposed merging lp:~rharding/launchpad/nonpublic_1052659 into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~rharding/launchpad/nonpublic_1052659/+merge/136479

= Summary =

The specifications method had previously been changed to return only public
specifications. Making it privacy aware meant converting it to Storm, and in
the end most of the query-building tools needed for that already existed.


== Pre Implementation ==

A short chat with Aaron to sanity-check the Storm conversion.


== Implementation Notes ==

Moved some duplicated code out so that it can be reused to eager load the
various person fields.
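
As an illustration of the eager-loading idea, here is a minimal, self-contained
sketch of a pre-iteration hook that gathers every person id referenced by a
batch of rows and loads them in a single lookup. It does not use Storm or
Launchpad code: Spec, PersonCache, make_cache_people and HookedResults are
hypothetical stand-ins, and the real hook in the diff works with
DecoratedResultSet and Person columns.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Spec:
    """Stand-in for a specification row; only the person ids matter here."""
    name: str
    assignee_id: Optional[int] = None
    approver_id: Optional[int] = None
    drafter_id: Optional[int] = None


class PersonCache:
    """Hypothetical bulk loader that counts how many lookups it performs."""

    def __init__(self, people: dict):
        self._people = people
        self.loaded: dict = {}
        self.bulk_loads = 0

    def load_many(self, person_ids: set) -> None:
        # One lookup for the whole batch instead of one per attribute access.
        self.bulk_loads += 1
        for person_id in person_ids:
            self.loaded[person_id] = self._people[person_id]


def make_cache_people(cache: PersonCache) -> Callable[[list], None]:
    """Build a hook that preloads every person referenced by the rows."""

    def cache_people(rows: list) -> None:
        person_ids = set()
        for spec in rows:
            person_ids.update(
                (spec.assignee_id, spec.approver_id, spec.drafter_id))
        # Unset person columns contribute None; drop it before loading.
        person_ids.discard(None)
        if person_ids:
            cache.load_many(person_ids)

    return cache_people


class HookedResults:
    """Minimal imitation of a result set with a pre-iteration hook."""

    def __init__(self, rows, pre_iter_hook=None):
        self._rows = list(rows)
        self._pre_iter_hook = pre_iter_hook

    def __iter__(self):
        # Run the hook once over the whole batch before yielding any row.
        if self._pre_iter_hook is not None:
            self._pre_iter_hook(self._rows)
        return iter(self._rows)
```

Because the hook is built by a standalone factory, any query that returns
specification rows can attach it, which is roughly the reuse this change is
after.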

Updated the specifications method to build a Storm query, adding in the
privacy tables and clauses via the existing visible_specification_query
function.
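
The shape of that change can be sketched without Storm. The example below uses
the standard-library sqlite3 module and an invented two-table schema
(spec, spec_grant) to show a visibility clause being combined with other filter
clauses. The helper names and the schema are assumptions for illustration only;
the value 1 for "public" mirrors the information_type = 1 check in the removed
SQL, and the real visible_specification_query returns Storm tables and clauses
and covers more cases (policy grants, for instance).

```python
import sqlite3

PUBLIC = 1  # mirrors "information_type = 1" in the removed SQL


def visible_clause(user_id):
    """Return (sql_fragment, params) limiting specs to what the user may see.

    Hypothetical helper, not the Launchpad function of a similar name.
    """
    if user_id is None:
        # Anonymous users only ever see public specs.
        return "spec.information_type = ?", [PUBLIC]
    return (
        "(spec.information_type = ? OR EXISTS ("
        "SELECT 1 FROM spec_grant g "
        "WHERE g.spec_id = spec.id AND g.grantee_id = ?))",
        [PUBLIC, user_id],
    )


def specifications(conn, user_id, extra_clauses=()):
    """Combine the visibility clause with any additional filter clauses."""
    fragment, params = visible_clause(user_id)
    clauses = [fragment]
    for clause, clause_params in extra_clauses:
        clauses.append(clause)
        params.extend(clause_params)
    sql = (
        "SELECT spec.name FROM spec WHERE "
        + " AND ".join(clauses)
        + " ORDER BY spec.id"
    )
    return [row[0] for row in conn.execute(sql, params)]


def make_demo_db():
    """Create an in-memory database with one public and one private spec."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE spec (
            id INTEGER PRIMARY KEY, name TEXT, information_type INTEGER);
        CREATE TABLE spec_grant (spec_id INTEGER, grantee_id INTEGER);
        INSERT INTO spec VALUES (1, 'public-spec', 1);
        INSERT INTO spec VALUES (2, 'secret-spec', 2);  -- 2: any non-public
        INSERT INTO spec_grant VALUES (2, 42);
    """)
    return conn
```

With this demo data, a user without a grant and an anonymous user both get only
public-spec, while user 42 also gets secret-spec.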

Then used the existing get_specification_filters to filter as required.
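
For reference, the filter semantics that the removed inline SQL encoded can be
sketched in plain Python. This is modelled loosely on the code deleted by this
branch (default to INCOMPLETE, ALL overrides the status filters, strings are
text searches), not on get_specification_filters itself, whose body is not part
of this diff. SpecFilter, Spec, build_predicates and apply_filters are
hypothetical names, and the substring match is only a stand-in for full-text
search.

```python
from dataclasses import dataclass
from enum import Enum


class SpecFilter(Enum):
    ALL = "all"
    COMPLETE = "complete"
    INCOMPLETE = "incomplete"
    INFORMATIONAL = "informational"
    VALID = "valid"


@dataclass(frozen=True)
class Spec:
    title: str
    complete: bool = False
    informational: bool = False
    definition_status: str = "NEW"


def build_predicates(filter=None):
    """Translate a filter list into predicates over Spec objects."""
    # Copy so the caller's list is never mutated; default to INCOMPLETE.
    filter = list(filter) if filter else [SpecFilter.INCOMPLETE]
    if (SpecFilter.COMPLETE not in filter
            and SpecFilter.INCOMPLETE not in filter):
        filter.append(SpecFilter.INCOMPLETE)

    predicates = []
    # ALL is the trump card: it drops the status-based restrictions.
    if SpecFilter.ALL not in filter:
        if SpecFilter.INFORMATIONAL in filter:
            predicates.append(lambda spec: spec.informational)
        if SpecFilter.COMPLETE in filter:
            predicates.append(lambda spec: spec.complete)
        elif SpecFilter.INCOMPLETE in filter:
            predicates.append(lambda spec: not spec.complete)
        if SpecFilter.VALID in filter:
            predicates.append(
                lambda spec: spec.definition_status
                not in ("OBSOLETE", "SUPERSEDED"))

    # A string in the filter is a text search (substring match here).
    for constraint in filter:
        if isinstance(constraint, str):
            predicates.append(
                lambda spec, text=constraint:
                text.lower() in spec.title.lower())
    return predicates


def apply_filters(specs, filter=None):
    """Return the specs that satisfy every predicate for the filter."""
    predicates = build_predicates(filter)
    return [spec for spec in specs if all(p(spec) for p in predicates)]
```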

Finally, added a quick test to make sure the method returns the specs a user
may see and omits the ones they may not.

== QA ==

Per the bug: with both public and non-public specs present, the owner should
see the non-public ones they have a grant for, while a user who can only see
public specs should not see them listed on the home page.

== Tests ==

New test in lib/lp/blueprints/tests/test_specification.py

The actual code is also exercised by many other existing tests.

=== modified file 'lib/lp/blueprints/model/specification.py'
--- lib/lp/blueprints/model/specification.py	2012-11-26 08:33:03 +0000
+++ lib/lp/blueprints/model/specification.py	2012-11-29 15:36:27 +0000
@@ -108,7 +108,6 @@
 from lp.services.database.lpstorm import IStore
 from lp.services.database.sqlbase import (
     cursor,
-    quote,
     SQLBase,
     sqlvalues,
     )
@@ -996,11 +995,12 @@
         :return: A DecoratedResultSet with Person precaching setup.
         """
         # Circular import.
-        from lp.registry.model.person import Person
         if isinstance(clauses, basestring):
             clauses = [SQL(clauses)]
 
         def cache_people(rows):
+            """DecoratedResultSet pre_iter_hook to eager load Person attributes."""
+            from lp.registry.model.person import Person
             # Find the people we need:
             person_ids = set()
             for spec in rows:
@@ -1017,7 +1017,7 @@
             origin.extend(validity_info["joins"])
             columns.extend(validity_info["tables"])
             decorators = validity_info["decorators"]
-            personset = Store.of(self).using(*origin).find(
+            personset = IStore(Specification).using(*origin).find(
                 tuple(columns),
                 Person.id.is_in(person_ids),
                 )
@@ -1029,7 +1029,7 @@
                     index += 1
                     decorator(person, column)
 
-        results = Store.of(self).using(*tables).find(Specification, *clauses)
+        results = IStore(Specification).using(*tables).find(Specification, *clauses)
         return DecoratedResultSet(results, pre_iter_hook=cache_people)
 
     @property
@@ -1088,100 +1088,40 @@
 
     def specifications(self, user, sort=None, quantity=None, filter=None,
                        prejoin_people=True):
-        """See IHasSpecifications."""
-
-        # Make a new list of the filter, so that we do not mutate what we
-        # were passed as a filter
+        store = IStore(Specification)
+
+        # Take the visibility due to privacy into account.
+        privacy_tables, clauses = visible_specification_query(user)
+
         if not filter:
-            # When filter is None or [] then we decide the default
-            # which for a product is to show incomplete specs
+            # Default to showing incomplete specs
             filter = [SpecificationFilter.INCOMPLETE]
 
-        # now look at the filter and fill in the unsaid bits
-
-        # defaults for completeness: if nothing is said about completeness
-        # then we want to show INCOMPLETE
-        completeness = False
-        for option in [
-            SpecificationFilter.COMPLETE,
-            SpecificationFilter.INCOMPLETE]:
-            if option in filter:
-                completeness = True
-        if completeness is False:
-            filter.append(SpecificationFilter.INCOMPLETE)
-
-        # defaults for acceptance: in this case we have nothing to do
-        # because specs are not accepted/declined against a distro
-
-        # defaults for informationalness: we don't have to do anything
-        # because the default if nothing is said is ANY
+        spec_clauses = get_specification_filters(filter)
+        clauses.extend(spec_clauses)
 
         # sort by priority descending, by default
         if sort is None or sort == SpecificationSort.PRIORITY:
-            order = ['-priority', 'Specification.definition_status',
-                     'Specification.name']
+            order = [Desc(Specification.priority),
+                     Specification.definition_status,
+                     Specification.name]
+
         elif sort == SpecificationSort.DATE:
             if SpecificationFilter.COMPLETE in filter:
                 # if we are showing completed, we care about date completed
-                order = ['-Specification.date_completed', 'Specification.id']
+                order = [Desc(Specification.date_completed),
+                         Specification.id]
             else:
                 # if not specially looking for complete, we care about date
                 # registered
-                order = ['-Specification.datecreated', 'Specification.id']
-
-        # figure out what set of specifications we are interested in. for
-        # products, we need to be able to filter on the basis of:
-        #
-        #  - completeness.
-        #  - informational.
-        #
-
-        # filter out specs on inactive products
-        base = """((Specification.product IS NULL OR
-                   Specification.product NOT IN
-                    (SELECT Product.id FROM Product
-                     WHERE Product.active IS FALSE))
-                   AND Specification.information_type = 1)
-                """
-        query = base
-        # look for informational specs
-        if SpecificationFilter.INFORMATIONAL in filter:
-            query += (' AND Specification.implementation_status = %s ' %
-                quote(SpecificationImplementationStatus.INFORMATIONAL.value))
-
-        # filter based on completion. see the implementation of
-        # Specification.is_complete() for more details
-        completeness = Specification.completeness_clause
-
-        if SpecificationFilter.COMPLETE in filter:
-            query += ' AND ( %s ) ' % completeness
-        elif SpecificationFilter.INCOMPLETE in filter:
-            query += ' AND NOT ( %s ) ' % completeness
-
-        # Filter for validity. If we want valid specs only then we should
-        # exclude all OBSOLETE or SUPERSEDED specs
-        if SpecificationFilter.VALID in filter:
-            # XXX: kiko 2007-02-07: this is untested and was broken.
-            query += (
-                ' AND Specification.definition_status NOT IN ( %s, %s ) ' %
-                sqlvalues(SpecificationDefinitionStatus.OBSOLETE,
-                          SpecificationDefinitionStatus.SUPERSEDED))
-
-        # ALL is the trump card
-        if SpecificationFilter.ALL in filter:
-            query = base
-
-        # Filter for specification text
-        for constraint in filter:
-            if isinstance(constraint, basestring):
-                # a string in the filter is a text search filter
-                query += ' AND Specification.fti @@ ftq(%s) ' % quote(
-                    constraint)
-
-        results = Specification.select(query, orderBy=order, limit=quantity)
+                order = [Desc(Specification.datecreated), Specification.id]
+
         if prejoin_people:
-            results = results.prejoin(['assignee', 'approver', 'drafter'])
-        return results
+            results = self._preload_specifications_people(privacy_tables, clauses)
+        else:
+            results = store.using(*privacy_tables).find(
+                Specification, *clauses)
+        return results.order_by(*order)[:quantity]
 
     def getByURL(self, url):
         """See ISpecificationSet."""

=== modified file 'lib/lp/blueprints/tests/test_specification.py'
--- lib/lp/blueprints/tests/test_specification.py	2012-11-22 14:06:36 +0000
+++ lib/lp/blueprints/tests/test_specification.py	2012-11-29 15:36:27 +0000
@@ -5,7 +5,11 @@
 
 __metaclass__ = type
 
-
+from datetime import (
+    datetime,
+    timedelta,
+    )
+import pytz
 from storm.store import Store
 from zope.component import (
     getUtility,
@@ -28,7 +32,11 @@
 from lp.blueprints.enums import (
     NewSpecificationDefinitionStatus,
     SpecificationDefinitionStatus,
+    SpecificationFilter,
     SpecificationGoalStatus,
+    SpecificationImplementationStatus,
+    SpecificationPriority,
+    SpecificationSort,
     )
 from lp.blueprints.errors import TargetAlreadyHasSpecification
 from lp.blueprints.interfaces.specification import ISpecificationSet
@@ -40,6 +48,7 @@
     SharingPermission,
     SpecificationSharingPolicy,
     )
+from lp.registry.interfaces.accesspolicy import IAccessPolicySource
 from lp.security import (
     AdminSpecification,
     EditSpecificationByRelatedPeople,
@@ -533,3 +542,203 @@
             definition_status=SpecificationDefinitionStatus.OBSOLETE)
         self.assertRaises(
             AssertionError, self.specification_set.new, **args)
+
+
+def list_result(context, filter=None, user=None):
+    result = context.specifications(
+        user, SpecificationSort.DATE, filter=filter)
+    return list(result)
+
+
+class TestSpecifications(TestCaseWithFactory):
+
+    layer = DatabaseFunctionalLayer
+
+    def setUp(self):
+        super(TestSpecifications, self).setUp()
+        self.date_created = datetime.now(pytz.utc)
+
+    def makeSpec(self, product=None, date_created=0, title=None,
+                 status=NewSpecificationDefinitionStatus.NEW,
+                 name=None, priority=None, information_type=None):
+        blueprint = self.factory.makeSpecification(
+            title=title, status=status, name=name, priority=priority,
+            information_type=information_type, product=product,
+            )
+        removeSecurityProxy(blueprint).datecreated = (
+            self.date_created + timedelta(date_created))
+        return blueprint
+
+    def test_specifications_quantity(self):
+        # Ensure the quantity controls the maximum number of entries.
+        context = getUtility(ISpecificationSet)
+        product = self.factory.makeProduct()
+        for count in range(10):
+            self.factory.makeSpecification(product=product)
+        self.assertEqual(20, context.specifications(None).count())
+        result = context.specifications(None, quantity=None).count()
+        self.assertEqual(20, result)
+        self.assertEqual(8, context.specifications(None, quantity=8).count())
+        self.assertEqual(11, context.specifications(None, quantity=11).count())
+
+    def test_date_sort(self):
+        # Sort on date_created.
+        context = getUtility(ISpecificationSet)
+        product = self.factory.makeProduct()
+        blueprint1 = self.makeSpec(product, date_created=0)
+        blueprint2 = self.makeSpec(product, date_created=-1)
+        blueprint3 = self.makeSpec(product, date_created=1)
+        result = list_result(context)
+        self.assertEqual([blueprint3, blueprint1, blueprint2], result[0:3])
+
+    def test_date_sort_id(self):
+        # date-sorting when no date varies uses object id.
+        context = getUtility(ISpecificationSet)
+        product = self.factory.makeProduct()
+        blueprint1 = self.makeSpec(product)
+        blueprint2 = self.makeSpec(product)
+        blueprint3 = self.makeSpec(product)
+        result = list_result(context)
+        self.assertEqual([blueprint1, blueprint2, blueprint3], result[0:3])
+
+    def test_priority_sort(self):
+        # Sorting by priority works and is the default.
+        # When priority is supplied, status is ignored.
+        context = getUtility(ISpecificationSet)
+        blueprint1 = self.makeSpec(priority=SpecificationPriority.UNDEFINED,
+                                   status=SpecificationDefinitionStatus.NEW)
+        product = blueprint1.product
+        blueprint2 = self.makeSpec(
+            product, priority=SpecificationPriority.NOTFORUS,
+            status=SpecificationDefinitionStatus.APPROVED)
+        blueprint3 = self.makeSpec(
+            product, priority=SpecificationPriority.LOW,
+            status=SpecificationDefinitionStatus.NEW)
+        result = list(context.specifications(
+            None, sort=SpecificationSort.PRIORITY))
+        self.assertTrue(
+            result.index(blueprint3) <
+            result.index(blueprint1) <
+            result.index(blueprint2))
+
+    def test_priority_sort_fallback_is_priority(self):
+        # Sorting by default falls back to Priority
+        context = getUtility(ISpecificationSet)
+        blueprint1 = self.makeSpec(name='b')
+        product = blueprint1.product
+        self.makeSpec(product, name='c')
+        self.makeSpec(product, name='a')
+        base_result = context.specifications(None)
+        priority_result = context.specifications(
+            None, sort=SpecificationSort.PRIORITY)
+        self.assertEqual(list(base_result), list(priority_result))
+
+    def test_informational(self):
+        # INFORMATIONAL causes only informational specs to be shown.
+        context = getUtility(ISpecificationSet)
+        enum = SpecificationImplementationStatus
+        informational = self.factory.makeSpecification(
+            implementation_status=enum.INFORMATIONAL)
+        product = informational.product
+        plain = self.factory.makeSpecification(product=product)
+        result = context.specifications(None)
+        self.assertIn(informational, result)
+        self.assertIn(plain, result)
+        result = context.specifications(
+            None, filter=[SpecificationFilter.INFORMATIONAL])
+        self.assertIn(informational, result)
+        self.assertNotIn(plain, result)
+
+    def test_completeness(self):
+        # If COMPLETE is specified, completed specs are listed.  If INCOMPLETE
+        # is specified or neither is specified, only incomplete specs are
+        # listed.
+        context = getUtility(ISpecificationSet)
+        enum = SpecificationImplementationStatus
+        implemented = self.factory.makeSpecification(
+            implementation_status=enum.IMPLEMENTED)
+        product = implemented.product
+        non_implemented = self.factory.makeSpecification(product=product)
+        result = context.specifications(
+            None, filter=[SpecificationFilter.COMPLETE])
+        self.assertIn(implemented, result)
+        self.assertNotIn(non_implemented, result)
+
+        result = context.specifications(
+            None, filter=[SpecificationFilter.INCOMPLETE])
+        self.assertNotIn(implemented, result)
+        self.assertIn(non_implemented, result)
+        result = context.specifications(None)
+        self.assertNotIn(implemented, result)
+        self.assertIn(non_implemented, result)
+
+    def test_all(self):
+        # ALL causes both complete and incomplete to be listed.
+        context = getUtility(ISpecificationSet)
+        enum = SpecificationImplementationStatus
+        implemented = self.factory.makeSpecification(
+            implementation_status=enum.IMPLEMENTED)
+        product = implemented.product
+        non_implemented = self.factory.makeSpecification(product=product)
+        result = context.specifications(None, filter=[SpecificationFilter.ALL])
+        self.assertIn(implemented, result)
+        self.assertIn(non_implemented, result)
+
+    def test_valid(self):
+        # VALID adjusts COMPLETE to exclude OBSOLETE and SUPERSEDED specs.
+        # (INCOMPLETE already excludes OBSOLETE and SUPERSEDED.)
+        context = getUtility(ISpecificationSet)
+        i_enum = SpecificationImplementationStatus
+        d_enum = SpecificationDefinitionStatus
+        implemented = self.factory.makeSpecification(
+            implementation_status=i_enum.IMPLEMENTED)
+        product = implemented.product
+        superseded = self.factory.makeSpecification(product=product,
+                                                    status=d_enum.SUPERSEDED)
+        self.factory.makeSpecification(product=product, status=d_enum.OBSOLETE)
+        filter = [SpecificationFilter.VALID, SpecificationFilter.COMPLETE]
+        results = context.specifications(None, filter=filter)
+        self.assertIn(implemented, results)
+        self.assertNotIn(superseded, results)
+
+    def test_text_search(self):
+        # Text searches work.
+        context = getUtility(ISpecificationSet)
+        blueprint1 = self.makeSpec(title='abc')
+        product = blueprint1.product
+        blueprint2 = self.makeSpec(product, title='def')
+        result = list_result(context, ['abc'])
+        self.assertEqual([blueprint1], result)
+        result = list_result(product, ['def'])
+        self.assertEqual([blueprint2], result)
+
+    def test_proprietary_not_listed(self):
+        # Proprietary blueprints are not listed for random users
+        context = getUtility(ISpecificationSet)
+        private_spec = self.makeSpec(
+            information_type=InformationType.PROPRIETARY)
+        self.assertNotIn(private_spec, list(context.specifications(None)))
+
+    def test_proprietary_listed_for_artifact_grant(self):
+        # Proprietary blueprints are listed for users with an artifact grant.
+        context = getUtility(ISpecificationSet)
+        blueprint1 = self.makeSpec(
+            information_type=InformationType.PROPRIETARY)
+        grant = self.factory.makeAccessArtifactGrant(
+            concrete_artifact=blueprint1)
+        self.assertIn(
+            blueprint1,
+            list(context.specifications(grant.grantee)))
+
+    def test_proprietary_listed_for_policy_grant(self):
+        # Proprietary blueprints are listed for users with a policy grant.
+        context = getUtility(ISpecificationSet)
+        blueprint1 = self.makeSpec(
+            information_type=InformationType.PROPRIETARY)
+        policy_source = getUtility(IAccessPolicySource)
+        (policy,) = policy_source.find(
+            [(blueprint1.product, InformationType.PROPRIETARY)])
+        grant = self.factory.makeAccessPolicyGrant(policy)
+        self.assertIn(
+            blueprint1,
+            list(context.specifications(user=grant.grantee)))

