
launchpad-reviewers team mailing list archive

[Merge] lp:~edwin-grubbs/launchpad/bug-615654-jobqueue-cron-script into lp:launchpad

 

Edwin Grubbs has proposed merging lp:~edwin-grubbs/launchpad/bug-615654-jobqueue-cron-script into lp:launchpad with lp:~edwin-grubbs/launchpad/bug-615654-queue-addmember-emails as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  #615654 +editproposedmembers times out when working with many candidates
  https://bugs.launchpad.net/bugs/615654


Summary
-------

This branch adds a cron script to process MembershipNotificationJobs.
The cron script has been abstracted so that a new job source can be
processed just by adding config sections to schema-lazr.conf, as long as
that job source uses a crontab_group that already has a crontab entry
configured in production.
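As a rough illustration of how the config drives the cron script, the grouping done by `grouped_sources` in process-job-source-groups.py can be sketched with plain dicts standing in for the lazr config (an assumption for illustration). `IExampleJobSource` and its settings are hypothetical and exist only to show a second job source joining the MAIN group.

```python
# A minimal sketch, assuming the lazr config can be modelled as a plain
# dict of sections; the real script reads canonical.config.config.


def parse_job_sources(value):
    """Split the comma-separated job_sources setting into names."""
    # Empty entries (e.g. from a trailing comma) are ignored.
    return [name.strip() for name in value.split(',') if name.strip()]


def group_job_sources(config):
    """Map each crontab_group to the job sources that belong to it."""
    groups = {}
    job_sources = config['process-job-source-groups']['job_sources']
    for source in parse_job_sources(job_sources):
        # A listed job source without its own section is skipped,
        # like the "if source not in config: continue" check.
        if source not in config:
            continue
        group = config[source]['crontab_group']
        groups.setdefault(group, []).append(source)
    return groups


config = {
    'process-job-source-groups': {
        'job_sources': 'IMembershipNotificationJobSource, IExampleJobSource',
    },
    'IMembershipNotificationJobSource': {
        'module': 'lp.registry.interfaces.persontransferjob',
        'dbuser': 'person-transfer-job',
        'crontab_group': 'MAIN',
    },
    # Hypothetical second job source, for illustration only.
    'IExampleJobSource': {
        'module': 'lp.example.interfaces',
        'dbuser': 'example-job',
        'crontab_group': 'MAIN',
    },
}

print(group_job_sources(config))
# prints {'MAIN': ['IMembershipNotificationJobSource', 'IExampleJobSource']}
```

Because no crontab entry needs to change for an existing group, adding a job source amounts to appending its name to job_sources and giving it a section with module, dbuser and crontab_group.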


Implementation details
----------------------

New db users and configs for cronjobs:
    database/schema/security.cfg
    lib/canonical/config/schema-lazr.conf

Cron scripts and tests:
    cronscripts/process-job-source-groups.py
    cronscripts/process-job-source.py
    lib/lp/registry/tests/test_membership_notification_job_creation.py
    lib/lp/registry/tests/test_process_job_sources_cronjob.py

Clearer error messages when error_dir or oops_prefix is not configured:
    lib/canonical/launchpad/webapp/errorlog.py

Added get() to the IJobSource interface, since TwistedJobRunner needs it.
Moved the get() method into PersonTransferJobDerived, the class that
actually provides IJobSource:
    lib/lp/services/job/interfaces/job.py
    lib/lp/registry/model/persontransferjob.py

Removed cruft left over from the previous branch:
    lib/lp/registry/model/teammembership.py

Eliminated the need for TwistedJobRunner cron scripts to be run with
bin/py by importing _pythonpath in ampoule child processes, as normal
cron scripts do. Also eliminated the need to specify error_dir,
oops_prefix, and copy_to_zlog when the defaults in [error_reports] are
sufficient:
    lib/lp/services/job/runner.py
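The two runner.py behaviours described above can be sketched as pure functions. This is a minimal sketch, not the branch's code: `error_config_section` and `child_environment` are hypothetical helper names, and config sections and the process environment are modelled as plain dicts.

```python
# Hypothetical helpers modelling two runner.py changes; config sections
# and the environment are plain dicts here, not the real lazr objects.

ERROR_KEYS = ('error_dir', 'oops_prefix', 'copy_to_zlog')


def error_config_section(config, section_name):
    """Pick the config section the global error utility should use."""
    section = config.get(section_name, {})
    # Use the job source's own section only if all three keys are set;
    # otherwise keep the [error_reports] defaults.
    if all(section.get(key) is not None for key in ERROR_KEYS):
        return section_name
    return 'error_reports'


def child_environment(environ):
    """Build the environment handed to ampoule child processes."""
    env = {'PATH': environ['PATH']}
    # PYTHONPATH is optional now that children import _pythonpath;
    # LPCONFIG is forwarded only when it is set.
    for name in ('PYTHONPATH', 'LPCONFIG'):
        if environ.get(name) is not None:
            env[name] = environ[name]
    return env


config = {
    'IMembershipNotificationJobSource': {'dbuser': 'person-transfer-job'},
}
print(error_config_section(config, 'IMembershipNotificationJobSource'))
# prints error_reports
print(child_environment({'PATH': '/usr/bin', 'HOME': '/root'}))
# prints {'PATH': '/usr/bin'}
```

Checking `is not None` rather than truthiness matters for copy_to_zlog, which is a boolean: a section that sets it to false still counts as fully configured.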

Tests
-----

./bin/test -vv -t 'test_membership_notification_job_creation|test_process_job_sources_cronjob'

Demo and Q/A
------------

* Add a member to a team.
* Change the member status to ADMIN.
* Run ./cronscripts/process-job-source-groups.py -v MAIN
* Check email.
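For the Q/A run above, process-job-source-groups.py expands each selected group into one process-job-source.py command per job source. A minimal sketch of that expansion follows; `build_child_commands` is a hypothetical helper, and the group mapping is passed in as a plain dict rather than read from the config.

```python
import os


def build_child_commands(script_path, grouped_sources, groups,
                         excluded=(), verbose=False):
    """Return one process-job-source.py command line per job source."""
    selected = set()
    for group in groups:
        selected.update(grouped_sources[group])
    for source in excluded:
        # The real script logs a warning for sources not in the groups;
        # this sketch silently ignores them.
        selected.discard(source)
    # The child script is expected to live next to the groups script.
    command = os.path.join(
        os.path.dirname(script_path), 'process-job-source.py')
    base = [command] + (['-v'] if verbose else [])
    # Sorted for a stable order; the real script iterates over a set.
    return [base + [source] for source in sorted(selected)]


commands = build_child_commands(
    './cronscripts/process-job-source-groups.py',
    {'MAIN': ['IMembershipNotificationJobSource']},
    ['MAIN'], verbose=True)
print(commands)
```

The real script starts every command with subprocess.Popen before waiting on any of them, so the job sources in a group are processed in parallel, each under its own lockfile.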
-- 
https://code.launchpad.net/~edwin-grubbs/launchpad/bug-615654-jobqueue-cron-script/+merge/36910
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~edwin-grubbs/launchpad/bug-615654-jobqueue-cron-script into lp:launchpad.
=== added file 'cronscripts/process-job-source-groups.py'
--- cronscripts/process-job-source-groups.py	1970-01-01 00:00:00 +0000
+++ cronscripts/process-job-source-groups.py	2010-09-28 18:57:55 +0000
@@ -0,0 +1,106 @@
+#!/usr/bin/python -S
+#
+# Copyright 2009, 2010 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Handle jobs for multiple job source classes."""
+
+__metaclass__ = type
+
+from optparse import IndentedHelpFormatter
+import os
+import subprocess
+import sys
+import textwrap
+
+import _pythonpath
+
+from canonical.config import config
+from lp.services.propertycache import cachedproperty
+from lp.services.scripts.base import LaunchpadCronScript
+
+
+class LongEpilogHelpFormatter(IndentedHelpFormatter):
+    """Preserve newlines in epilog."""
+
+    def format_epilog(self, epilog):
+        if epilog:
+            return '\n%s\n' % epilog
+        else:
+            return ""
+
+
+class ProcessJobSourceGroups(LaunchpadCronScript):
+    """Handle each job source in a separate process with ProcessJobSource."""
+
+    def add_my_options(self):
+        self.parser.usage = "%prog [ -e JOB_SOURCE ] GROUP [GROUP]..."
+        self.parser.epilog = (
+            textwrap.fill(
+            "At least one group must be specified. Excluding job sources "
+            "is useful when you want to run all the other job sources in "
+            "a group.")
+            + "\n\n" + self.group_help)
+
+        self.parser.formatter = LongEpilogHelpFormatter()
+        self.parser.add_option(
+            '-e', '--exclude', dest='excluded_job_sources',
+            metavar="JOB_SOURCE", default=[], action='append',
+            help="Exclude specific job sources.")
+
+    def main(self):
+        selected_groups = self.args
+        if len(selected_groups) == 0:
+            self.parser.print_help()
+            sys.exit(1)
+
+        selected_job_sources = set()
+        # Include job sources from selected groups.
+        for group in selected_groups:
+            selected_job_sources.update(self.grouped_sources[group])
+        # Then, exclude job sources.
+        for source in self.options.excluded_job_sources:
+            if source not in selected_job_sources:
+                self.logger.warn('%r is not in job source groups %s'
+                                 % (source, selected_groups))
+            else:
+                selected_job_sources.remove(source)
+        # Process job sources.
+        command = os.path.join(
+            os.path.dirname(sys.argv[0]), 'process-job-source.py')
+        child_args = [command]
+        if self.options.verbose:
+            child_args.append('-v')
+        children = []
+        for job_source in selected_job_sources:
+            child = subprocess.Popen(child_args + [job_source])
+            children.append(child)
+        for child in children:
+            child.wait()
+
+    @cachedproperty
+    def all_job_sources(self):
+        job_sources = config['process-job-source-groups'].job_sources
+        return [job_source.strip() for job_source in job_sources.split(',')]
+
+    @cachedproperty
+    def grouped_sources(self):
+        groups = {}
+        for source in self.all_job_sources:
+            if source not in config:
+                continue
+            section = config[source]
+            group = groups.setdefault(section.crontab_group, [])
+            group.append(source)
+        return groups
+
+    @cachedproperty
+    def group_help(self):
+        return '\n\n'.join(
+            'Group: %s\n    %s' % (group, '\n    '.join(sources))
+            for group, sources in sorted(self.grouped_sources.items()))
+
+
+if __name__ == '__main__':
+    script = ProcessJobSourceGroups()
+    script.lock_and_run()

=== added file 'cronscripts/process-job-source.py'
--- cronscripts/process-job-source.py	1970-01-01 00:00:00 +0000
+++ cronscripts/process-job-source.py	2010-09-28 18:57:55 +0000
@@ -0,0 +1,66 @@
+#!/usr/bin/python -S
+#
+# Copyright 2009, 2010 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Handle jobs for a specified job source class."""
+
+__metaclass__ = type
+
+from optparse import OptionParser
+import sys
+
+import _pythonpath
+from twisted.python import log
+
+from canonical.config import config
+from lp.services.job import runner
+from lp.services.job.runner import (
+    JobCronScript,
+    JobRunner,
+    )
+
+
+class ProcessJobSource(JobCronScript):
+    """Run jobs."""
+
+    def __init__(self, job_source):
+        # The dbuser is grabbed from the section matching config_name.
+        self.config_name = job_source
+        section = getattr(config, self.config_name)
+        # The fromlist argument is necessary so that __import__()
+        # returns the bottom submodule instead of the top one.
+        module = __import__(section.module, fromlist=[job_source])
+        self.source_interface = getattr(module, job_source)
+        if getattr(section, 'runner_class', None) is None:
+            runner_class = JobRunner
+        else:
+            runner_class = getattr(runner, section.runner_class, JobRunner)
+        super(ProcessJobSource, self).__init__(
+            runner_class=runner_class,
+            script_name='process-job-source-%s' % job_source)
+
+    def main(self):
+        if self.options.verbose:
+            log.startLogging(sys.stdout)
+        super(ProcessJobSource, self).main()
+
+
+class NoErrorOptionParser(OptionParser):
+    """Allow any options that will be later handled by ProcessJobSource."""
+
+    def error(self, *args, **kw):
+        pass
+
+
+if __name__ == '__main__':
+    options, args = NoErrorOptionParser().parse_args()
+    if len(args) != 1:
+        print "Usage: %s [options] JOB_SOURCE" % sys.argv[0]
+        print
+        print "For help, run:"
+        print "    cronscripts/process-job-source-groups.py --help"
+        sys.exit(1)
+    job_source = args[0]
+    script = ProcessJobSource(job_source)
+    script.lock_and_run()

=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg	2010-09-28 18:57:50 +0000
+++ database/schema/security.cfg	2010-09-28 18:57:55 +0000
@@ -1872,6 +1872,20 @@
 public.product                          = SELECT, UPDATE
 public.bugtracker                       = SELECT
 
+[process-job-source-groups]
+# Does not need access to tables.
+type=user
+groups=script
+
+[person-transfer-job]
+type=user
+groups=script
+public.emailaddress                     = SELECT
+public.job                              = SELECT, UPDATE
+public.person                           = SELECT
+public.persontransferjob                = SELECT
+public.teammembership                   = SELECT
+
 [weblogstats]
 # For the script that parses our Apache/Squid logfiles and updates statistics
 type=user

=== modified file 'lib/canonical/config/schema-lazr.conf'
--- lib/canonical/config/schema-lazr.conf	2010-09-24 22:30:48 +0000
+++ lib/canonical/config/schema-lazr.conf	2010-09-28 18:57:55 +0000
@@ -2046,3 +2046,17 @@
 
 # datatype: boolean
 send_email: true
+
+[process-job-source-groups]
+# This section is used by cronscripts/process-job-source-groups.py.
+dbuser: process-job-source-groups
+# Each job source class also needs its own config section to specify the
+# dbuser, the crontab_group, and the module that the job source class
+# can be loaded from.
+job_sources: IMembershipNotificationJobSource
+
+[IMembershipNotificationJobSource]
+# This section is used by cronscripts/process-job-source.py.
+module: lp.registry.interfaces.persontransferjob
+dbuser: person-transfer-job
+crontab_group: MAIN

=== modified file 'lib/canonical/launchpad/webapp/errorlog.py'
--- lib/canonical/launchpad/webapp/errorlog.py	2010-09-22 09:57:56 +0000
+++ lib/canonical/launchpad/webapp/errorlog.py	2010-09-28 18:57:55 +0000
@@ -43,6 +43,7 @@
 from lp.services.log.uniquefileallocator import UniqueFileAllocator
 from lp.services.timeline.requesttimeline import get_request_timeline
 
+
 UTC = pytz.utc
 
 LAZR_OOPS_USER_REQUESTED_KEY = 'lazr.oops.user_requested'
@@ -272,6 +273,10 @@
             section_name = self._default_config_section
         self.copy_to_zlog = config[section_name].copy_to_zlog
         # Start a new UniqueFileAllocator to activate the new configuration.
+        if config[section_name].error_dir is None:
+            raise ValueError("[%s] error_dir config is None" % section_name)
+        if config[section_name].oops_prefix is None:
+            raise ValueError("[%s] oops_prefix config is None" % section_name)
         self.log_namer = UniqueFileAllocator(
             output_root=config[section_name].error_dir,
             log_type="OOPS",

=== modified file 'lib/lp/registry/model/persontransferjob.py'
--- lib/lp/registry/model/persontransferjob.py	2010-09-28 18:57:50 +0000
+++ lib/lp/registry/model/persontransferjob.py	2010-09-28 18:57:55 +0000
@@ -104,16 +104,6 @@
         # but the DB representation is unicode.
         self._json_data = json_data.decode('utf-8')
 
-    @classmethod
-    def get(cls, key):
-        """Return the instance of this class whose key is supplied."""
-        store = IMasterStore(PersonTransferJob)
-        instance = store.get(cls, key)
-        if instance is None:
-            raise SQLObjectNotFound(
-                'No occurrence of %s has key %s' % (cls.__name__, key))
-        return instance
-
 
 class PersonTransferJobDerived(BaseRunnableJob):
     """Intermediate class for deriving from PersonTransferJob.
@@ -177,6 +167,16 @@
             ])
         return vars
 
+    @classmethod
+    def get(cls, id):
+        """Return the instance of this class whose id is supplied."""
+        store = IMasterStore(PersonTransferJob)
+        instance = store.get(PersonTransferJob, id)
+        if instance is None:
+            raise SQLObjectNotFound(
+                'No occurrence of PersonTransferJob with id %s' % id)
+        return cls(instance)
+
 
 class MembershipNotificationJob(PersonTransferJobDerived):
     """A Job that sends notifications about team membership changes."""

=== modified file 'lib/lp/registry/model/teammembership.py'
--- lib/lp/registry/model/teammembership.py	2010-09-28 18:57:50 +0000
+++ lib/lp/registry/model/teammembership.py	2010-09-28 18:57:55 +0000
@@ -5,7 +5,6 @@
 
 __metaclass__ = type
 __all__ = [
-    'sendStatusChangeNotification',
     'TeamMembership',
     'TeamMembershipSet',
     'TeamParticipation',
@@ -480,7 +479,8 @@
             from lp.registry.model.person import Person
             conditions.append(TeamMembership.team == Person.id)
             conditions.append(
-                Person.renewal_policy != TeamMembershipRenewalPolicy.AUTOMATIC)
+                Person.renewal_policy
+                != TeamMembershipRenewalPolicy.AUTOMATIC)
         return IStore(TeamMembership).find(TeamMembership, *conditions)
 
 

=== added file 'lib/lp/registry/tests/test_membership_notification_job_creation.py'
--- lib/lp/registry/tests/test_membership_notification_job_creation.py	1970-01-01 00:00:00 +0000
+++ lib/lp/registry/tests/test_membership_notification_job_creation.py	2010-09-28 18:57:55 +0000
@@ -0,0 +1,60 @@
+# Copyright 2010 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test that MembershipNotificationJobs are created appropriately."""
+
+__metaclass__ = type
+
+import transaction
+from zope.component import getUtility
+
+from canonical.testing import LaunchpadFunctionalLayer
+from lp.registry.interfaces.person import IPersonSet
+from lp.registry.interfaces.persontransferjob import (
+    IMembershipNotificationJobSource,
+    )
+from lp.registry.interfaces.teammembership import (
+    ITeamMembershipSet,
+    TeamMembershipStatus,
+    )
+from lp.testing import (
+    login,
+    login_person,
+    TestCaseWithFactory,
+    )
+
+
+class CreateMembershipNotificationJobTest(TestCaseWithFactory):
+    """Test that MembershipNotificationJobs are created appropriately."""
+    layer = LaunchpadFunctionalLayer
+
+    def setUp(self):
+        super(CreateMembershipNotificationJobTest, self).setUp()
+        self.person = self.factory.makePerson(name='murdock')
+        self.team = self.factory.makeTeam(name='a-team')
+        self.job_source = getUtility(IMembershipNotificationJobSource)
+
+    def test_setstatus_admin(self):
+        login_person(self.team.teamowner)
+        self.team.addMember(self.person, self.team.teamowner)
+        membership_set = getUtility(ITeamMembershipSet)
+        tm = membership_set.getByPersonAndTeam(self.person, self.team)
+        tm.setStatus(TeamMembershipStatus.ADMIN, self.team.teamowner)
+        transaction.commit()
+        jobs = list(self.job_source.iterReady())
+        self.assertEqual(
+            ('[<MEMBERSHIP_NOTIFICATION branch job (1) '
+             'for murdock as part of a-team. status=Waiting>]'),
+            str(jobs))
+
+    def test_setstatus_silent(self):
+        login('admin@xxxxxxxxxxxxx')
+        person_set = getUtility(IPersonSet)
+        admin = person_set.getByEmail('admin@xxxxxxxxxxxxx')
+        self.team.addMember(self.person, self.team.teamowner)
+        membership_set = getUtility(ITeamMembershipSet)
+        tm = membership_set.getByPersonAndTeam(self.person, self.team)
+        tm.setStatus(
+            TeamMembershipStatus.ADMIN, admin, silent=True)
+        transaction.commit()
+        self.assertEqual([], list(self.job_source.iterReady()))

=== added file 'lib/lp/registry/tests/test_process_job_sources_cronjob.py'
--- lib/lp/registry/tests/test_process_job_sources_cronjob.py	1970-01-01 00:00:00 +0000
+++ lib/lp/registry/tests/test_process_job_sources_cronjob.py	2010-09-28 18:57:55 +0000
@@ -0,0 +1,121 @@
+# Copyright 2010 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test cron script for processing jobs from any job source class."""
+
+__metaclass__ = type
+
+import os
+import subprocess
+
+import transaction
+from zope.component import getUtility
+
+from canonical.config import config
+from canonical.testing import LaunchpadFunctionalLayer
+from lp.registry.interfaces.teammembership import (
+    ITeamMembershipSet,
+    TeamMembershipStatus,
+    )
+from lp.testing import (
+    login_person,
+    TestCaseWithFactory,
+    )
+
+
+class ProcessJobSourceTest(TestCaseWithFactory):
+    """Test the process-job-source.py script."""
+    layer = LaunchpadFunctionalLayer
+
+    def _run(self, *args):
+        script = os.path.join(
+            config.root, 'cronscripts', 'process-job-source.py')
+        cmd = [script] + list(args)
+        process = subprocess.Popen(
+            cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT)
+        (stdout, empty_stderr) = process.communicate()
+        return stdout
+
+    def test_missing_argument(self):
+        output = self._run()
+        self.assertIn('Usage:', output)
+        self.assertIn('process-job-source.py [options] JOB_SOURCE', output)
+
+    def test_empty_queue(self):
+        output = self._run('IMembershipNotificationJobSource')
+        expected = (
+            'INFO    Creating lockfile: /var/lock/launchpad-process-job-'
+            'source-IMembershipNotificationJobSource.lock\n'
+            'INFO    Running synchronously.\n')
+        self.assertEqual(expected, output)
+
+    def test_processed(self):
+        person = self.factory.makePerson(name='murdock')
+        team = self.factory.makeTeam(name='a-team')
+        login_person(team.teamowner)
+        team.addMember(person, team.teamowner)
+        membership_set = getUtility(ITeamMembershipSet)
+        tm = membership_set.getByPersonAndTeam(person, team)
+        tm.setStatus(TeamMembershipStatus.ADMIN, team.teamowner)
+        transaction.commit()
+        output = self._run('-v', 'IMembershipNotificationJobSource')
+        self.assertIn(
+            ('DEBUG   Running <MEMBERSHIP_NOTIFICATION branch job (1) '
+             'for murdock as part of a-team. status=Waiting>'),
+            output)
+        self.assertIn('DEBUG   MembershipNotificationJob sent email', output)
+        self.assertIn('Ran 1 MembershipNotificationJob jobs.', output)
+
+
+class ProcessJobSourceGroupsTest(TestCaseWithFactory):
+    """Test the process-job-source-groups.py script."""
+    layer = LaunchpadFunctionalLayer
+
+    def _run(self, *args):
+        script = os.path.join(
+            config.root, 'cronscripts', 'process-job-source-groups.py')
+        cmd = [script] + list(args)
+        process = subprocess.Popen(
+            cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT)
+        (stdout, empty_stderr) = process.communicate()
+        return stdout
+
+    def test_missing_argument(self):
+        output = self._run()
+        self.assertIn(
+            ('Usage: process-job-source-groups.py '
+             '[ -e JOB_SOURCE ] GROUP [GROUP]...'),
+            output)
+        self.assertIn('-e JOB_SOURCE, --exclude=JOB_SOURCE', output)
+        self.assertIn('At least one group must be specified.', output)
+        self.assertIn('Group: MAIN\n    IMembershipNotificationJobSource',
+                      output)
+
+    def test_empty_queue(self):
+        output = self._run('MAIN')
+        expected = (
+            'INFO    Creating lockfile: /var/lock/launchpad-'
+            'processjobsourcegroups.lock\n'
+            'INFO    Creating lockfile: /var/lock/launchpad-process-job-'
+            'source-IMembershipNotificationJobSource.lock\n'
+            'INFO    Running synchronously.\n')
+        self.assertEqual(expected, output)
+
+    def test_processed(self):
+        person = self.factory.makePerson(name='murdock')
+        team = self.factory.makeTeam(name='a-team')
+        login_person(team.teamowner)
+        team.addMember(person, team.teamowner)
+        membership_set = getUtility(ITeamMembershipSet)
+        tm = membership_set.getByPersonAndTeam(person, team)
+        tm.setStatus(TeamMembershipStatus.ADMIN, team.teamowner)
+        transaction.commit()
+        output = self._run('-v', 'MAIN')
+        self.assertIn(
+            ('DEBUG   Running <MEMBERSHIP_NOTIFICATION branch job (1) '
+             'for murdock as part of a-team. status=Waiting>'),
+            output)
+        self.assertIn('DEBUG   MembershipNotificationJob sent email', output)
+        self.assertIn('Ran 1 MembershipNotificationJob jobs.', output)

=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py	2010-08-20 20:31:18 +0000
+++ lib/lp/services/job/interfaces/job.py	2010-09-28 18:57:55 +0000
@@ -166,3 +166,9 @@
 
     def contextManager():
         """Get a context for running this kind of job in."""
+
+    def get(id):
+        """Get a job by its id.
+
+        This is currently only needed by the TwistedJobRunner.
+        """

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2010-09-20 09:48:58 +0000
+++ lib/lp/services/job/runner.py	2010-09-28 18:57:55 +0000
@@ -77,7 +77,6 @@
 
     # We redefine __eq__ and __ne__ here to prevent the security proxy
     # from mucking up our comparisons in tests and elsewhere.
-
     def __eq__(self, job):
         return (
             self.__class__ is removeSecurityProxy(job.__class__)
@@ -337,13 +336,14 @@
     """Run Jobs via twisted."""
 
     def __init__(self, job_source, dbuser, logger=None, error_utility=None):
-        env = {'PYTHONPATH': os.environ['PYTHONPATH'],
-               'PATH': os.environ['PATH']}
+        env = {'PATH': os.environ['PATH']}
+        if 'PYTHONPATH' in os.environ:
+            env['PYTHONPATH'] = os.environ['PYTHONPATH']
         lp_config = os.environ.get('LPCONFIG')
         if lp_config is not None:
             env['LPCONFIG'] = lp_config
         starter = main.ProcessStarter(
-            packages=('twisted', 'ampoule'), env=env)
+            packages=('_pythonpath', 'twisted', 'ampoule'), env=env)
         super(TwistedJobRunner, self).__init__(logger, error_utility)
         self.job_source = job_source
         import_name = '%s.%s' % (
@@ -367,7 +367,8 @@
         transaction.commit()
         job_id = job.id
         deadline = timegm(job.lease_expires.timetuple())
-        self.logger.debug('Running %r, lease expires %s', job, job.lease_expires)
+        self.logger.debug(
+            'Running %r, lease expires %s', job, job.lease_expires)
         deferred = self.pool.doWork(
             RunJobCommand, job_id = job_id, _deadline=deadline)
         def update(response):
@@ -459,7 +460,13 @@
         return sorted(counts.items())
 
     def main(self):
-        errorlog.globalErrorUtility.configure(self.config_name)
+        section = config[self.config_name]
+        if (getattr(section, 'error_dir', None) is not None
+            and getattr(section, 'oops_prefix', None) is not None
+            and getattr(section, 'copy_to_zlog', None) is not None):
+            # If any of the three variables is unset, we let the error
+            # utility default to using the [error_reports] config.
+            errorlog.globalErrorUtility.configure(self.config_name)
         job_source = getUtility(self.source_interface)
         runner = self.runner_class.runFromSource(
             job_source, self.dbuser, self.logger)