launchpad-reviewers team mailing list archive

lp:~wallyworld/launchpad/revoke-access-delete-subscriptions-job into lp:launchpad

 

Ian Booth has proposed merging lp:~wallyworld/launchpad/revoke-access-delete-subscriptions-job into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #992315 in Launchpad itself: "When revoking access to a project or artifact, user needs to be unsubscribed also"
  https://bugs.launchpad.net/launchpad/+bug/992315

For more details, see:
https://code.launchpad.net/~wallyworld/launchpad/revoke-access-delete-subscriptions-job/+merge/104198

== Implementation ==

This is the first branch to introduce the job infrastructure needed to run sharing/disclosure jobs. The first job removes subscriptions to artifacts for which access has been revoked. There are three scenarios:

1. Revoke access to an individual artifact
2. Revoke access to artifacts of a given information type
3. Revoke access to an entire project/distro

This branch covers the first case above. The job is not wired up yet.
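
A minimal sketch of how the first scenario might be described to the job, assuming the job only needs the identifiers of the artifacts to unsubscribe from. Bug, Branch, Person and build_metadata() are hypothetical stand-ins used for illustration, not Launchpad classes; the dictionary keys follow those used by RemoveSubscriptionsJob.create() in the diff below.

```python
from collections import namedtuple

# Hypothetical stand-ins for Launchpad's bug, branch and person objects.
Bug = namedtuple("Bug", "id")
Branch = namedtuple("Branch", "unique_name")
Person = namedtuple("Person", "id name")


def build_metadata(requestor, bugs=None, branches=None):
    """Return a JSON-compatible dict naming the artifacts to unsubscribe from."""
    return {
        "bug_ids": [bug.id for bug in bugs or []],
        "branch_names": [branch.unique_name for branch in branches or []],
        "requestor.id": requestor.id,
    }


# Scenario 1: access to individual artifacts is revoked.
metadata = build_metadata(
    Person(7, "requestor"),
    bugs=[Bug(1), Bug(2)],
    branches=[Branch("~fred/prod/trunk")],
)
print(metadata)
```

Passing neither bugs nor branches yields empty lists. The interface docstring reserves that case for removing all of the grantee's subscriptions, which this branch does not implement yet.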

This branch requires that a db-devel branch land first in order to provide the necessary database table: lp:~wallyworld/launchpad/sharingjob-table

The job is implemented to use the new celery framework. Tests are also provided to check that the job runs standalone (without celery). I'm not sure whether these are needed; they can be removed if not.

The job needs to unsubscribe a user from a bug or branch. From what I can tell, unsubscribe() on IBug requires "Edit", while for branches it requires "View". removeSecurityProxy() is used to allow the job to work for bugs, since there is no zope request which can be used to grant permission.
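
To illustrate the idea behind stripping the proxy, here is a toy model in plain Python. It is not the zope.security implementation: GuardedProxy, Unauthorized, remove_proxy() and FakeBug are hypothetical names invented for this sketch, and the permission check is reduced to a fixed set of allowed attribute names.

```python
class Unauthorized(Exception):
    """Raised when a guarded attribute is accessed without permission."""


class GuardedProxy:
    """Toy stand-in for a security proxy (not zope.security)."""

    def __init__(self, target, allowed):
        self._target = target
        self._allowed = frozenset(allowed)

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        if name not in self._allowed:
            raise Unauthorized(name)
        return getattr(self._target, name)


def remove_proxy(obj):
    """Return the bare object, similar in spirit to removeSecurityProxy()."""
    if isinstance(obj, GuardedProxy):
        return obj._target
    return obj


class FakeBug:
    """Hypothetical bug with a set of subscriber names."""

    def __init__(self):
        self.subscribers = {"owner", "grantee"}

    def unsubscribe(self, person):
        self.subscribers.discard(person)


bug = FakeBug()
proxied = GuardedProxy(bug, allowed=["subscribers"])
try:
    proxied.unsubscribe("grantee")  # no "Edit"-style permission here
except Unauthorized:
    # A job with no request to grant permission works on the bare object.
    remove_proxy(proxied).unsubscribe("grantee")
```

The trade-off is the same as in the branch: bypassing the check is only reasonable because the decision to revoke access was already authorised when the job was created.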

On error, the job emails the requestor and the pillar maintainer.
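
A rough sketch of how the recipient list could be assembled, assuming each person is represented as a (display name, email) tuple. error_recipients() is a hypothetical helper, and email.utils.formataddr from the standard library stands in for Launchpad's format_address_for_person(). As in the diff below, the requestor is always included and the pillar owner only when an email address is known.

```python
from email.utils import formataddr


def error_recipients(requestor, pillar_owner):
    """Return de-duplicated, sorted addresses for the error email.

    Each argument is a (display_name, email) tuple; the owner's
    email may be None if no preferred address is set.
    """
    recipients = {formataddr(requestor)}
    if pillar_owner[1]:
        recipients.add(formataddr(pillar_owner))
    return sorted(recipients)


print(error_recipients(
    ("Fred", "fred@example.com"), ("Owner", "owner@example.com")))
```

Using a set means a requestor who is also the pillar maintainer receives a single email.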

== Tests ==

A number of newly written tests for the job are provided.
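
One behaviour the tests rely on is that job metadata is stored as JSON, so tuples come back as lists. The sketch below shows that round trip using the standard library json module as a stand-in for simplejson, which is what the branch itself imports; the sample values mirror those in test_metadata.

```python
import json

metadata = {
    "a_list": ("some", "arbitrary", "metadata"),
    "a_number": 1,
    "a_string": "string",
}

# What would be written to the json_data column.
stored = json.dumps(metadata)

# What the metadata property would return when read back.
loaded = json.loads(stored)

# JSON has no tuple type, so the tuple is returned as a list.
assert loaded["a_list"] == ["some", "arbitrary", "metadata"]
assert loaded["a_number"] == 1
```

This is why test_metadata converts the tuple to a list before comparing.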

== Lint ==

Checking for conflicts and issues in changed files.

Linting changed files:
  configs/development/launchpad-lazr.conf
  configs/testrunner/launchpad-lazr.conf
  lib/lp/registry/configure.zcml
  lib/lp/registry/interfaces/sharingjob.py
  lib/lp/registry/model/sharingjob.py
  lib/lp/registry/tests/test_sharingjob.py
  lib/lp/services/config/schema-lazr.conf

./configs/development/launchpad-lazr.conf
      81: Line exceeds 80 characters.
     114: Line exceeds 80 characters.
./configs/testrunner/launchpad-lazr.conf
     126: Line exceeds 80 characters.
./lib/lp/services/config/schema-lazr.conf
     489: Line exceeds 80 characters.
    1100: Line exceeds 80 characters.
    1107: Line exceeds 80 characters.
    1710: Line exceeds 80 characters.
-- 
https://code.launchpad.net/~wallyworld/launchpad/revoke-access-delete-subscriptions-job/+merge/104198
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wallyworld/launchpad/revoke-access-delete-subscriptions-job into lp:launchpad.
=== modified file 'configs/development/launchpad-lazr.conf'
--- configs/development/launchpad-lazr.conf	2012-03-20 20:50:47 +0000
+++ configs/development/launchpad-lazr.conf	2012-05-01 03:50:25 +0000
@@ -233,6 +233,10 @@
 password: guest
 virtual_host: /
 
+[sharing_jobs]
+error_dir: /var/tmp/sharing.test
+oops_prefix: DSJ
+
 [txlongpoll]
 launch: True
 frontend_port: 22435

=== modified file 'configs/testrunner/launchpad-lazr.conf'
--- configs/testrunner/launchpad-lazr.conf	2012-04-27 16:22:05 +0000
+++ configs/testrunner/launchpad-lazr.conf	2012-05-01 03:50:25 +0000
@@ -167,6 +167,10 @@
 oops_prefix: TPT
 error_dir: /var/tmp/lperr.test
 
+[sharing_jobs]
+oops_prefix: DSJ
+error_dir: /var/tmp/sharing.test
+
 [upgrade_branches]
 oops_prefix: TUB
 error_dir: /var/tmp/codehosting.test

=== modified file 'lib/lp/registry/configure.zcml'
--- lib/lp/registry/configure.zcml	2012-04-12 04:39:58 +0000
+++ lib/lp/registry/configure.zcml	2012-05-01 03:50:25 +0000
@@ -1983,4 +1983,19 @@
         <allow
             interface="lp.registry.interfaces.accesspolicy.IAccessPolicyGrantFlatSource"/>
     </securedutility>
+
+    <!-- Sharing jobs -->
+    <class class=".model.sharingjob.RemoveSubscriptionsJob">
+      <allow interface=".interfaces.sharingjob.IRemoveSubscriptionsJob"/>
+      <allow attributes="
+         context
+         log_name"/>
+    </class>
+
+    <securedutility
+        component=".model.sharingjob.RemoveSubscriptionsJob"
+        provides=".interfaces.sharingjob.IRemoveSubscriptionsJobSource">
+      <allow interface=".interfaces.sharingjob.IRemoveSubscriptionsJobSource"/>
+    </securedutility>
+
 </configure>

=== added file 'lib/lp/registry/interfaces/sharingjob.py'
--- lib/lp/registry/interfaces/sharingjob.py	1970-01-01 00:00:00 +0000
+++ lib/lp/registry/interfaces/sharingjob.py	2012-05-01 03:50:25 +0000
@@ -0,0 +1,88 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Interfaces for sharing jobs."""
+
+__metaclass__ = type
+
+__all__ = [
+    'IRemoveSubscriptionsJob',
+    'IRemoveSubscriptionsJobSource',
+    'ISharingJob',
+    'ISharingJobSource',
+    ]
+
+from zope.interface import Attribute
+from zope.schema import (
+    Int,
+    Object,
+    )
+
+from lp import _
+from lp.registry.interfaces.distribution import IDistribution
+from lp.registry.interfaces.person import IPerson
+from lp.registry.interfaces.product import IProduct
+from lp.services.job.interfaces.job import (
+    IJob,
+    IJobSource,
+    IRunnableJob,
+    )
+
+
+class ISharingJob(IRunnableJob):
+    """A Job for sharing related tasks."""
+
+    id = Int(
+        title=_('DB ID'), required=True, readonly=True,
+        description=_("The tracking number for this job."))
+
+    job = Object(title=_('The common Job attributes'), schema=IJob,
+        required=True)
+
+    product = Object(
+        title=_('The product the job is for'),
+        schema=IProduct)
+
+    distro = Object(
+        title=_('The distribution the job is for'),
+        schema=IDistribution)
+
+    grantee = Object(
+        title=_('The grantee the job is for'),
+        schema=IPerson)
+
+    metadata = Attribute('A dict of data about the job.')
+
+    def destroySelf():
+        """Destroy this object."""
+
+    def getErrorRecipients():
+        """See `BaseRunnableJob`."""
+
+    def pillar():
+        """Either product or distro, whichever is not None."""
+
+    def requestor():
+        """The person who initiated the job."""
+
+
+class IRemoveSubscriptionsJob(ISharingJob):
+    """Job to remove subscriptions to artifacts for which access is revoked."""
+
+
+class ISharingJobSource(IJobSource):
+    """Base interface for acquiring ISharingJobs."""
+
+    def create(pillar, grantee, metadata):
+        """Create a new ISharingJob."""
+
+
+class IRemoveSubscriptionsJobSource(ISharingJobSource):
+    """An interface for acquiring IRemoveSubscriptionsJobs."""
+
+    def create(pillar, grantee, requestor, bugs=None, branches=None):
+        """Create a new job to revoke access to the specified artifacts.
+
+        If bugs and branches are both None, then all subscriptions the grantee
+        may have to any pillar artifacts are removed.
+        """

=== added file 'lib/lp/registry/model/sharingjob.py'
--- lib/lp/registry/model/sharingjob.py	1970-01-01 00:00:00 +0000
+++ lib/lp/registry/model/sharingjob.py	2012-05-01 03:50:25 +0000
@@ -0,0 +1,297 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+
+"""Job classes related to the sharing feature are in here."""
+
+__metaclass__ = type
+
+
+__all__ = [
+    'RemoveSubscriptionsJob',
+    ]
+
+import contextlib
+import logging
+
+from lazr.delegates import delegates
+from lazr.enum import (
+    DBEnumeratedType,
+    DBItem,
+    )
+import simplejson
+from sqlobject import SQLObjectNotFound
+from storm.expr import And
+from storm.locals import (
+    Int,
+    Reference,
+    Unicode,
+    )
+from storm.store import Store
+from zope.component import getUtility
+from zope.interface import (
+    classProvides,
+    implements,
+    )
+from zope.security.proxy import removeSecurityProxy
+
+from lp.bugs.interfaces.bug import IBugSet
+from lp.code.interfaces.branchlookup import IBranchLookup
+from lp.registry.interfaces.person import IPersonSet
+from lp.registry.interfaces.product import IProduct
+from lp.registry.interfaces.sharingjob import (
+    IRemoveSubscriptionsJob,
+    IRemoveSubscriptionsJobSource,
+    ISharingJob,
+    ISharingJobSource,
+    )
+from lp.registry.model.distribution import Distribution
+from lp.registry.model.person import Person
+from lp.registry.model.product import Product
+from lp.services.config import config
+from lp.services.database.enumcol import EnumCol
+from lp.services.database.lpstorm import IStore
+from lp.services.database.stormbase import StormBase
+from lp.services.job.model.job import (
+    EnumeratedSubclass,
+    Job,
+    )
+from lp.services.job.runner import (
+    BaseRunnableJob,
+    )
+from lp.services.mail.sendmail import format_address_for_person
+from lp.services.webapp import errorlog
+
+
+class SharingJobType(DBEnumeratedType):
+    """Values that ISharingJob.job_type can take."""
+
+    REMOVE_SUBSCRIPTIONS = DBItem(0, """
+        Remove subscriptions when access is revoked.
+
+        This job removes subscriptions to artifacts when access is
+        revoked for a particular information type or artifact.
+        """)
+
+
+class SharingJob(StormBase):
+    """Base class for jobs related to the sharing feature."""
+
+    implements(ISharingJob)
+
+    __storm_table__ = 'SharingJob'
+
+    id = Int(primary=True)
+
+    job_id = Int('job')
+    job = Reference(job_id, Job.id)
+
+    product_id = Int(name='product')
+    product = Reference(product_id, Product.id)
+
+    distro_id = Int(name='distro')
+    distro = Reference(distro_id, Distribution.id)
+
+    grantee_id = Int(name='grantee')
+    grantee = Reference(grantee_id, Person.id)
+
+    job_type = EnumCol(enum=SharingJobType, notNull=True)
+
+    _json_data = Unicode('json_data')
+
+    @property
+    def metadata(self):
+        return simplejson.loads(self._json_data)
+
+    def __init__(self, job_type, pillar, grantee, metadata):
+        """Constructor.
+
+        :param job_type: The SharingJobType of this job.
+        :param metadata: The type-specific variables, as a JSON-compatible
+            dict.
+        """
+        super(SharingJob, self).__init__()
+        json_data = simplejson.dumps(metadata)
+        self.job = Job()
+        self.job_type = job_type
+        self.grantee = grantee
+        self.product = self.distro = None
+        if IProduct.providedBy(pillar):
+            self.product = pillar
+        else:
+            self.distro = pillar
+        # XXX AaronBentley 2009-01-29 bug=322819: This should be a bytestring,
+        # but the DB representation is unicode.
+        self._json_data = json_data.decode('utf-8')
+
+    def destroySelf(self):
+        Store.of(self).remove(self)
+
+    def makeDerived(self):
+        return SharingJobDerived.makeSubclass(self)
+
+
+class SharingJobDerived(BaseRunnableJob):
+    """Intermediate class for deriving from SharingJob."""
+
+    __metaclass__ = EnumeratedSubclass
+
+    delegates(ISharingJob)
+    classProvides(ISharingJobSource)
+
+    @staticmethod
+    @contextlib.contextmanager
+    def contextManager():
+        """See `IJobSource`."""
+        errorlog.globalErrorUtility.configure('sharing_jobs')
+        yield
+
+    def __init__(self, job):
+        self.context = job
+
+    def __repr__(self):
+        return '<%(job_type)s job for %(grantee)s and %(pillar)s>' % {
+            'job_type': self.context.job_type.name,
+            'grantee': self.grantee.displayname,
+            'pillar': self.pillar.displayname,
+            }
+
+    @property
+    def pillar(self):
+        if self.product:
+            return self.product
+        else:
+            return self.distro
+
+    @property
+    def log_name(self):
+        return self.__class__.__name__
+
+    @classmethod
+    def create(cls, pillar, grantee, metadata):
+        base_job = SharingJob(cls.class_job_type, pillar, grantee, metadata)
+        job = cls(base_job)
+        job.celeryRunOnCommit()
+        return job
+
+    @classmethod
+    def get(cls, job_id):
+        """Get a job by id.
+
+        :return: the SharingJob with the specified id, as the
+            current SharingJobDerived subclass.
+        :raises: SQLObjectNotFound if there is no job with the specified id,
+            or its job_type does not match the desired subclass.
+        """
+        job = SharingJob.get(job_id)
+        if job.job_type != cls.class_job_type:
+            raise SQLObjectNotFound(
+                'No object found with id %d and type %s' % (job_id,
+                cls.class_job_type.title))
+        return cls(job)
+
+    @classmethod
+    def iterReady(cls):
+        """See `IJobSource`.
+
+        This version will emit any ready job based on SharingJob.
+        """
+        store = IStore(SharingJob)
+        jobs = store.find(
+            SharingJob,
+            And(SharingJob.job_type == cls.class_job_type,
+                SharingJob.job_id.is_in(Job.ready_jobs)))
+        return (cls.makeSubclass(job) for job in jobs)
+
+    def getOopsVars(self):
+        """See `IRunnableJob`."""
+        vars = BaseRunnableJob.getOopsVars(self)
+        vars.extend([
+            ('sharing_job_id', self.context.id),
+            ('sharing_job_type', self.context.job_type.title),
+            ('grantee', self.grantee.name)])
+        if self.product:
+            vars.append(('product', self.product.name))
+        if self.distro:
+            vars.append(('distro', self.distro.name))
+        return vars
+
+
+class RemoveSubscriptionsJob(SharingJobDerived):
+    """See `IRemoveSubscriptionsJob`."""
+
+    implements(IRemoveSubscriptionsJob)
+    classProvides(IRemoveSubscriptionsJobSource)
+    class_job_type = SharingJobType.REMOVE_SUBSCRIPTIONS
+
+    config = config.sharing_jobs
+
+    @classmethod
+    def create(cls, pillar, grantee, requestor, bugs=None, branches=None):
+        """See `IRemoveSubscriptionsJobSource`."""
+
+        bug_ids = [
+            bug.id for bug in bugs or []
+        ]
+        branch_names = [
+            branch.unique_name for branch in branches or []
+        ]
+        metadata = {
+            'bug_ids': bug_ids,
+            'branch_names': branch_names,
+            'requestor.id': requestor.id
+        }
+        return super(RemoveSubscriptionsJob, cls).create(
+            pillar, grantee, metadata)
+
+    @property
+    def requestor_id(self):
+        return self.metadata['requestor.id']
+
+    @property
+    def requestor(self):
+        return getUtility(IPersonSet).get(self.requestor_id)
+
+    @property
+    def bug_ids(self):
+        return self.metadata['bug_ids']
+
+    @property
+    def branch_names(self):
+        return self.metadata['branch_names']
+
+    def getErrorRecipients(self):
+        # If something goes wrong we want to let the requestor know as well
+        # as the pillar maintainer.
+        result = set()
+        result.add(format_address_for_person(self.requestor))
+        if self.pillar.owner.preferredemail:
+            result.add(format_address_for_person(self.pillar.owner))
+        return list(result)
+
+    def getOperationDescription(self):
+        return ('removing subscriptions for artifacts '
+            'for %s on %s' %
+            (self.grantee.displayname,
+             self.pillar.displayname))
+
+    def run(self):
+        """See `IRemoveSubscriptionsJob`."""
+
+        logger = logging.getLogger()
+        logger.info(self.getOperationDescription())
+
+        # Unsubscribe grantee from the specified bugs.
+        if self.bug_ids:
+            bugs = getUtility(IBugSet).getByNumbers(self.bug_ids)
+            for bug in bugs:
+                removeSecurityProxy(bug).unsubscribe(
+                    self.grantee, self.requestor)
+
+        # Unsubscribe grantee from the specified branches.
+        if self.branch_names:
+            branches = [
+                getUtility(IBranchLookup).getByUniqueName(branch_name)
+                for branch_name in self.branch_names]
+            for branch in branches:
+                branch.unsubscribe(self.grantee, self.requestor)

=== added file 'lib/lp/registry/tests/test_sharingjob.py'
--- lib/lp/registry/tests/test_sharingjob.py	1970-01-01 00:00:00 +0000
+++ lib/lp/registry/tests/test_sharingjob.py	2012-05-01 03:50:25 +0000
@@ -0,0 +1,231 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for SharingJobs."""
+
+__metaclass__ = type
+
+import transaction
+
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.code.enums import (
+    BranchSubscriptionNotificationLevel,
+    CodeReviewNotificationLevel,
+    )
+from lp.registry.interfaces.sharingjob import (
+    IRemoveSubscriptionsJobSource,
+    ISharingJob,
+    ISharingJobSource,
+    )
+from lp.registry.model.sharingjob import (
+    RemoveSubscriptionsJob,
+    SharingJob,
+    SharingJobDerived,
+    SharingJobType,
+    )
+from lp.services.features.testing import FeatureFixture
+from lp.services.job.tests import block_on_job
+from lp.services.mail.sendmail import format_address_for_person
+from lp.testing import (
+    person_logged_in,
+    TestCaseWithFactory,
+    )
+from lp.testing.layers import (
+    CeleryJobLayer,
+    DatabaseFunctionalLayer,
+    LaunchpadZopelessLayer,
+    )
+
+
+class SharingJobTestCase(TestCaseWithFactory):
+    """Test case for basic SharingJob class."""
+
+    layer = LaunchpadZopelessLayer
+
+    def test_init(self):
+        pillar = self.factory.makeProduct()
+        grantee = self.factory.makePerson()
+        metadata = ('some', 'arbitrary', 'metadata')
+        sharing_job = SharingJob(
+            SharingJobType.REMOVE_SUBSCRIPTIONS, pillar, grantee, metadata)
+        self.assertEqual(
+            SharingJobType.REMOVE_SUBSCRIPTIONS, sharing_job.job_type)
+        self.assertEqual(pillar, sharing_job.product)
+        self.assertEqual(grantee, sharing_job.grantee)
+        expected_json_data = '["some", "arbitrary", "metadata"]'
+        self.assertEqual(expected_json_data, sharing_job._json_data)
+
+    def test_metadata(self):
+        # The python structure stored as json is returned as python.
+        metadata = {
+            'a_list': ('some', 'arbitrary', 'metadata'),
+            'a_number': 1,
+            'a_string': 'string',
+            }
+        pillar = self.factory.makeProduct()
+        grantee = self.factory.makePerson()
+        sharing_job = SharingJob(
+            SharingJobType.REMOVE_SUBSCRIPTIONS, pillar, grantee, metadata)
+        metadata['a_list'] = list(metadata['a_list'])
+        self.assertEqual(metadata, sharing_job.metadata)
+
+
+class SharingJobDerivedTestCase(TestCaseWithFactory):
+    """Test case for the SharingJobDerived class."""
+
+    layer = DatabaseFunctionalLayer
+
+    def _makeJob(self, prod_name=None, grantee_name=None):
+        pillar = self.factory.makeProduct(name=prod_name)
+        grantee = self.factory.makePerson(name=grantee_name)
+        requestor = self.factory.makePerson()
+        job = getUtility(IRemoveSubscriptionsJobSource).create(
+            pillar, grantee, requestor)
+        return job
+
+    def test_repr(self):
+        job = self._makeJob('prod', 'fred')
+        self.assertEqual(
+            '<REMOVE_SUBSCRIPTIONS job for Fred and Prod>', repr(job))
+
+    def test_create_success(self):
+        # Create an instance of SharingJobDerived that delegates to SharingJob.
+        self.assertIs(True, ISharingJobSource.providedBy(SharingJobDerived))
+        job = self._makeJob()
+        self.assertIsInstance(job, SharingJobDerived)
+        self.assertIs(True, ISharingJob.providedBy(job))
+        self.assertIs(True, ISharingJob.providedBy(job.context))
+
+    def test_create_raises_error(self):
+        # SharingJobDerived.create() raises an error because it
+        # needs to be subclassed to work properly.
+        pillar = self.factory.makeProduct()
+        grantee = self.factory.makePerson()
+        self.assertRaises(
+            AttributeError, SharingJobDerived.create, pillar, grantee, {})
+
+    def test_iterReady(self):
+        # iterReady finds jobs in the READY status that are of the same type.
+        job_1 = self._makeJob()
+        job_2 = self._makeJob()
+        job_2.start()
+        jobs = list(RemoveSubscriptionsJob.iterReady())
+        self.assertEqual(1, len(jobs))
+        self.assertEqual(job_1, jobs[0])
+
+    def test_log_name(self):
+        # The log_name is the name of the implementing class.
+        job = self._makeJob()
+        self.assertEqual('RemoveSubscriptionsJob', job.log_name)
+
+    def test_getOopsVars(self):
+        # The pillar and grantee name are added to the oops vars.
+        pillar = self.factory.makeDistribution()
+        grantee = self.factory.makePerson()
+        requestor = self.factory.makePerson()
+        job = getUtility(IRemoveSubscriptionsJobSource).create(
+            pillar, grantee, requestor)
+        oops_vars = job.getOopsVars()
+        self.assertIs(True, len(oops_vars) > 4)
+        self.assertIn(('distro', pillar.name), oops_vars)
+        self.assertIn(('grantee', grantee.name), oops_vars)
+
+    def test_getErrorRecipients(self):
+        # The pillar owner and job requestor are the error recipients.
+        pillar = self.factory.makeDistribution()
+        grantee = self.factory.makePerson()
+        requestor = self.factory.makePerson()
+        job = getUtility(IRemoveSubscriptionsJobSource).create(
+            pillar, grantee, requestor)
+        expected_emails = [
+            format_address_for_person(person)
+            for person in (pillar.owner, requestor)]
+        self.assertContentEqual(
+            expected_emails, job.getErrorRecipients())
+
+
+class RemoveSubscriptionsJobTestCase(TestCaseWithFactory):
+    """Test case for the RemoveSubscriptionsJob class."""
+
+    layer = CeleryJobLayer
+
+    def setUp(self):
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'RemoveSubscriptionsJob',
+        }))
+        super(RemoveSubscriptionsJobTestCase, self).setUp()
+
+    def test_create(self):
+        # Create an instance of RemoveSubscriptionsJob that stores
+        # the bugs and branches to unsubscribe the grantee from.
+        self.assertIs(
+            True,
+            IRemoveSubscriptionsJobSource.providedBy(RemoveSubscriptionsJob))
+        self.assertEqual(
+            SharingJobType.REMOVE_SUBSCRIPTIONS,
+            RemoveSubscriptionsJob.class_job_type)
+        pillar = self.factory.makeProduct()
+        grantee = self.factory.makePerson()
+        requestor = self.factory.makePerson()
+        bug = self.factory.makeBug(product=pillar)
+        branch = self.factory.makeBranch(product=pillar)
+        job = getUtility(IRemoveSubscriptionsJobSource).create(
+            pillar, grantee, requestor, [bug], [branch])
+        naked_job = removeSecurityProxy(job)
+        self.assertIsInstance(job, RemoveSubscriptionsJob)
+        self.assertEqual(pillar, job.pillar)
+        self.assertEqual(grantee, job.grantee)
+        self.assertEqual(requestor.id, naked_job.requestor_id)
+        self.assertContentEqual([bug.id], naked_job.bug_ids)
+        self.assertContentEqual([branch.unique_name], naked_job.branch_names)
+
+    def _run_job(self, job):
+        job.run()
+
+    def _run_job_celery(self, job):
+        with block_on_job(self):
+            transaction.commit()
+
+    def _assert_unsubscribe_bugs(self, run_job_callback):
+        # The requested bug subscriptions are removed.
+        pillar = self.factory.makeDistribution()
+        grantee = self.factory.makePerson()
+        owner = self.factory.makePerson()
+        bug = self.factory.makeBug(owner=owner, distribution=pillar)
+        with person_logged_in(owner):
+            bug.subscribe(grantee, owner)
+        self.assertContentEqual([owner, grantee], bug.getDirectSubscribers())
+        job = getUtility(IRemoveSubscriptionsJobSource).create(
+            pillar, grantee, owner, [bug])
+        run_job_callback(job)
+        self.assertContentEqual([owner], bug.getDirectSubscribers())
+
+    def test_unsubscribe_bugs(self):
+        self._assert_unsubscribe_bugs(self._run_job)
+
+    def test_unsubscribe_bugs_celery(self):
+        self._assert_unsubscribe_bugs(self._run_job_celery)
+
+    def _assert_unsubscribe_branches(self, run_job_callback):
+        # The requested branch subscriptions are removed.
+        pillar = self.factory.makeProduct()
+        grantee = self.factory.makePerson()
+        owner = self.factory.makePerson()
+        branch = self.factory.makeBranch(owner=owner, product=pillar)
+        with person_logged_in(owner):
+            branch.subscribe(grantee,
+                BranchSubscriptionNotificationLevel.NOEMAIL, None,
+                CodeReviewNotificationLevel.NOEMAIL, owner)
+        self.assertContentEqual([owner, grantee], list(branch.subscribers))
+        job = getUtility(IRemoveSubscriptionsJobSource).create(
+            pillar, grantee, owner, branches=[branch])
+        run_job_callback(job)
+        self.assertContentEqual([owner], list(branch.subscribers))
+
+    def test_unsubscribe_branches(self):
+        self._assert_unsubscribe_branches(self._run_job)
+
+    def test_unsubscribe_branches_celery(self):
+        self._assert_unsubscribe_branches(self._run_job_celery)

=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf	2012-04-27 16:22:05 +0000
+++ lib/lp/services/config/schema-lazr.conf	2012-05-01 03:50:25 +0000
@@ -1601,6 +1601,17 @@
 # datatype: string
 virtual_host: none
 
+[sharing_jobs]
+# The database user which will be used by this process.
+# datatype: string
+dbuser: sharing-jobs
+
+# See [error_reports].
+error_dir: none
+
+# See [error_reports].
+oops_prefix: none
+
 [txlongpoll]
 # Should TxLongPoll be launched by default?
 # datatype: boolean

