launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #01278
[Merge] lp:~edwin-grubbs/launchpad/bug-615654-jobqueue-cron-script into lp:launchpad
Edwin Grubbs has proposed merging lp:~edwin-grubbs/launchpad/bug-615654-jobqueue-cron-script into lp:launchpad with lp:~edwin-grubbs/launchpad/bug-615654-queue-addmember-emails as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
#615654 +editproposedmembers times out when working with many candidates
https://bugs.launchpad.net/bugs/615654
Summary
-------
This branch adds a cron script to process the
IMembershipNotificationJobs. The cronscript has been abstracted so that
new job sources can be processed by just adding configs to
schema-lazr.conf, as long as it uses a crontab_group that already has a
crontab entry configured in production.
Implementation details
----------------------
New db users and configs for cronjobs:
database/schema/security.cfg
lib/canonical/config/schema-lazr.conf
Cronscripts and tests.
cronscripts/process-job-source-groups.py
cronscripts/process-job-source.py
lib/lp/registry/tests/test_membership_notification_job_creation.py
lib/lp/registry/tests/test_process_job_sources_cronjob.py
Improved error message.
lib/canonical/launchpad/webapp/errorlog.py
Added get() to IJobSource definition, since TwistedJobRunner needs it.
Moved the get() method into the class that actually provides IJobSource.
lib/lp/services/job/interfaces/job.py
lib/lp/registry/model/persontransferjob.py
Cruft from previous branch.
lib/lp/registry/model/teammembership.py
Eliminate the need for TwistedJobRunner cron scripts to be run with
bin/py by importing _pythonpath in ampoule child processes like normal
cron scripts. Eliminated the need to specify the error_dir, oops_prefix,
and copy_to_zlog when the defaults in [error_reports] are fine.
lib/lp/services/job/runner.py
Tests
-----
./bin/test -vv -t 'test_membership_notification_job_creation|test_process_job_sources_cronjob'
Demo and Q/A
------------
* Add a member to a team.
* Change the member status to ADMIN.
* Run ./cronscripts/process-job-source-groups.py -v MAIN
* Check email.
--
https://code.launchpad.net/~edwin-grubbs/launchpad/bug-615654-jobqueue-cron-script/+merge/36910
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~edwin-grubbs/launchpad/bug-615654-jobqueue-cron-script into lp:launchpad.
=== added file 'cronscripts/process-job-source-groups.py'
--- cronscripts/process-job-source-groups.py 1970-01-01 00:00:00 +0000
+++ cronscripts/process-job-source-groups.py 2010-09-28 18:57:55 +0000
@@ -0,0 +1,106 @@
+#!/usr/bin/python -S
+#
+# Copyright 2009, 2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Handle jobs for multiple job source classes."""
+
+__metaclass__ = type
+
+from optparse import IndentedHelpFormatter
+import os
+import subprocess
+import sys
+import textwrap
+
+import _pythonpath
+
+from canonical.config import config
+from lp.services.propertycache import cachedproperty
+from lp.services.scripts.base import LaunchpadCronScript
+
+
+class LongEpilogHelpFormatter(IndentedHelpFormatter):
+ """Preserve newlines in epilog."""
+
+ def format_epilog(self, epilog):
+ if epilog:
+ return '\n%s\n' % epilog
+ else:
+ return ""
+
+
+class ProcessJobSourceGroups(LaunchpadCronScript):
+ """Handle each job source in a separate process with ProcessJobSource."""
+
+ def add_my_options(self):
+ self.parser.usage = "%prog [ -e JOB_SOURCE ] GROUP [GROUP]..."
+ self.parser.epilog = (
+ textwrap.fill(
+ "At least one group must be specified. Excluding job sources "
+ "is useful when you want to run all the other job sources in "
+ "a group.")
+ + "\n\n" + self.group_help)
+
+ self.parser.formatter = LongEpilogHelpFormatter()
+ self.parser.add_option(
+ '-e', '--exclude', dest='excluded_job_sources',
+ metavar="JOB_SOURCE", default=[], action='append',
+ help="Exclude specific job sources.")
+
+ def main(self):
+ selected_groups = self.args
+ if len(selected_groups) == 0:
+ self.parser.print_help()
+ sys.exit(1)
+
+ selected_job_sources = set()
+ # Include job sources from selected groups.
+ for group in selected_groups:
+ selected_job_sources.update(self.grouped_sources[group])
+ # Then, exclude job sources.
+ for source in self.options.excluded_job_sources:
+ if source not in selected_job_sources:
+ self.logger.warn('%r is not in job source groups %s'
+ % (source, self.options.groups))
+ else:
+ selected_job_sources.remove(source)
+ # Process job sources.
+ command = os.path.join(
+ os.path.dirname(sys.argv[0]), 'process-job-source.py')
+ child_args = [command]
+ if self.options.verbose:
+ child_args.append('-v')
+ children = []
+ for job_source in selected_job_sources:
+ child = subprocess.Popen(child_args + [job_source])
+ children.append(child)
+ for child in children:
+ child.wait()
+
+ @cachedproperty
+ def all_job_sources(self):
+ job_sources = config['process-job-source-groups'].job_sources
+ return [job_source.strip() for job_source in job_sources.split(',')]
+
+ @cachedproperty
+ def grouped_sources(self):
+ groups = {}
+ for source in self.all_job_sources:
+ if source not in config:
+ continue
+ section = config[source]
+ group = groups.setdefault(section.crontab_group, [])
+ group.append(source)
+ return groups
+
+ @cachedproperty
+ def group_help(self):
+ return '\n\n'.join(
+ 'Group: %s\n %s' % (group, '\n '.join(sources))
+ for group, sources in sorted(self.grouped_sources.items()))
+
+
+if __name__ == '__main__':
+ script = ProcessJobSourceGroups()
+ script.lock_and_run()
=== added file 'cronscripts/process-job-source.py'
--- cronscripts/process-job-source.py 1970-01-01 00:00:00 +0000
+++ cronscripts/process-job-source.py 2010-09-28 18:57:55 +0000
@@ -0,0 +1,66 @@
+#!/usr/bin/python -S
+#
+# Copyright 2009, 2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Handle jobs for a specified job source class."""
+
+__metaclass__ = type
+
+from optparse import OptionParser
+import sys
+
+import _pythonpath
+from twisted.python import log
+
+from canonical.config import config
+from lp.services.job import runner
+from lp.services.job.runner import (
+ JobCronScript,
+ JobRunner,
+ )
+
+
+class ProcessJobSource(JobCronScript):
+ """Run jobs."""
+
+ def __init__(self, job_source):
+ # The dbuser is grabbed from the section matching config_name.
+ self.config_name = job_source
+ section = getattr(config, self.config_name)
+ # The fromlist argument is necessary so that __import__()
+ # returns the bottom submodule instead of the top one.
+ module = __import__(section.module, fromlist=[job_source])
+ self.source_interface = getattr(module, job_source)
+ if getattr(section, 'runner_class', None) is None:
+ runner_class = JobRunner
+ else:
+ runner_class = getattr(runner, section.runner_class, JobRunner)
+ super(ProcessJobSource, self).__init__(
+ runner_class=runner_class,
+ script_name='process-job-source-%s' % job_source)
+
+ def main(self):
+ if self.options.verbose:
+ log.startLogging(sys.stdout)
+ super(ProcessJobSource, self).main()
+
+
+class NoErrorOptionParser(OptionParser):
+ """Allow any options that will be later handled by ProcessJobSource."""
+
+ def error(self, *args, **kw):
+ pass
+
+
+if __name__ == '__main__':
+ options, args = NoErrorOptionParser().parse_args()
+ if len(args) != 1:
+ print "Usage: %s [options] JOB_SOURCE" % sys.argv[0]
+ print
+ print "For help, run:"
+ print " cronscripts/process-job-source-groups.py --help"
+ sys.exit()
+ job_source = args[0]
+ script = ProcessJobSource(job_source)
+ script.lock_and_run()
=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg 2010-09-28 18:57:50 +0000
+++ database/schema/security.cfg 2010-09-28 18:57:55 +0000
@@ -1872,6 +1872,20 @@
public.product = SELECT, UPDATE
public.bugtracker = SELECT
+[process-job-source-groups]
+# Does not need access to tables.
+type=user
+groups=script
+
+[person-transfer-job]
+type=user
+groups=script
+public.emailaddress = SELECT
+public.job = SELECT, UPDATE
+public.person = SELECT
+public.persontransferjob = SELECT
+public.teammembership = SELECT
+
[weblogstats]
# For the script that parses our Apache/Squid logfiles and updates statistics
type=user
=== modified file 'lib/canonical/config/schema-lazr.conf'
--- lib/canonical/config/schema-lazr.conf 2010-09-24 22:30:48 +0000
+++ lib/canonical/config/schema-lazr.conf 2010-09-28 18:57:55 +0000
@@ -2046,3 +2046,17 @@
# datatype: boolean
send_email: true
+
+[process-job-source-groups]
+# This section is used by cronscripts/process-job-source-groups.py.
+dbuser: process-job-source-groups
+# Each job source class also needs its own config section to specify the
+# dbuser, the crontab_group, and the module that the job source class
+# can be loaded from.
+job_sources: IMembershipNotificationJobSource
+
+[IMembershipNotificationJobSource]
+# This section is used by cronscripts/process-job-source.py.
+module: lp.registry.interfaces.persontransferjob
+dbuser: person-transfer-job
+crontab_group: MAIN
=== modified file 'lib/canonical/launchpad/webapp/errorlog.py'
--- lib/canonical/launchpad/webapp/errorlog.py 2010-09-22 09:57:56 +0000
+++ lib/canonical/launchpad/webapp/errorlog.py 2010-09-28 18:57:55 +0000
@@ -43,6 +43,7 @@
from lp.services.log.uniquefileallocator import UniqueFileAllocator
from lp.services.timeline.requesttimeline import get_request_timeline
+
UTC = pytz.utc
LAZR_OOPS_USER_REQUESTED_KEY = 'lazr.oops.user_requested'
@@ -272,6 +273,10 @@
section_name = self._default_config_section
self.copy_to_zlog = config[section_name].copy_to_zlog
# Start a new UniqueFileAllocator to activate the new configuration.
+ if config[section_name].error_dir is None:
+ raise ValueError("[%s] error_dir config is None" % section_name)
+ if config[section_name].oops_prefix is None:
+ raise ValueError("[%s] oops_prefix config is None" % section_name)
self.log_namer = UniqueFileAllocator(
output_root=config[section_name].error_dir,
log_type="OOPS",
=== modified file 'lib/lp/registry/model/persontransferjob.py'
--- lib/lp/registry/model/persontransferjob.py 2010-09-28 18:57:50 +0000
+++ lib/lp/registry/model/persontransferjob.py 2010-09-28 18:57:55 +0000
@@ -104,16 +104,6 @@
# but the DB representation is unicode.
self._json_data = json_data.decode('utf-8')
- @classmethod
- def get(cls, key):
- """Return the instance of this class whose key is supplied."""
- store = IMasterStore(PersonTransferJob)
- instance = store.get(cls, key)
- if instance is None:
- raise SQLObjectNotFound(
- 'No occurrence of %s has key %s' % (cls.__name__, key))
- return instance
-
class PersonTransferJobDerived(BaseRunnableJob):
"""Intermediate class for deriving from PersonTransferJob.
@@ -177,6 +167,16 @@
])
return vars
+ @classmethod
+ def get(cls, id):
+ """Return the instance of this class whose id is supplied."""
+ store = IMasterStore(PersonTransferJob)
+ instance = store.get(PersonTransferJob, id)
+ if instance is None:
+ raise SQLObjectNotFound(
+ 'No occurrence of PersonTransferJob with id %s' % id)
+ return cls(instance)
+
class MembershipNotificationJob(PersonTransferJobDerived):
"""A Job that sends notifications about team membership changes."""
=== modified file 'lib/lp/registry/model/teammembership.py'
--- lib/lp/registry/model/teammembership.py 2010-09-28 18:57:50 +0000
+++ lib/lp/registry/model/teammembership.py 2010-09-28 18:57:55 +0000
@@ -5,7 +5,6 @@
__metaclass__ = type
__all__ = [
- 'sendStatusChangeNotification',
'TeamMembership',
'TeamMembershipSet',
'TeamParticipation',
@@ -480,7 +479,8 @@
from lp.registry.model.person import Person
conditions.append(TeamMembership.team == Person.id)
conditions.append(
- Person.renewal_policy != TeamMembershipRenewalPolicy.AUTOMATIC)
+ Person.renewal_policy
+ != TeamMembershipRenewalPolicy.AUTOMATIC)
return IStore(TeamMembership).find(TeamMembership, *conditions)
=== added file 'lib/lp/registry/tests/test_membership_notification_job_creation.py'
--- lib/lp/registry/tests/test_membership_notification_job_creation.py 1970-01-01 00:00:00 +0000
+++ lib/lp/registry/tests/test_membership_notification_job_creation.py 2010-09-28 18:57:55 +0000
@@ -0,0 +1,60 @@
+# Copyright 2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test that MembershipNotificationJobs are created appropriately."""
+
+__metaclass__ = type
+
+import transaction
+from zope.component import getUtility
+
+from canonical.testing import LaunchpadFunctionalLayer
+from lp.registry.interfaces.person import IPersonSet
+from lp.registry.interfaces.persontransferjob import (
+ IMembershipNotificationJobSource,
+ )
+from lp.registry.interfaces.teammembership import (
+ ITeamMembershipSet,
+ TeamMembershipStatus,
+ )
+from lp.testing import (
+ login,
+ login_person,
+ TestCaseWithFactory,
+ )
+
+
+class CreateMembershipNotificationJobTest(TestCaseWithFactory):
+ """Test that MembershipNotificationJobs are created appropriately."""
+ layer = LaunchpadFunctionalLayer
+
+ def setUp(self):
+ super(CreateMembershipNotificationJobTest, self).setUp()
+ self.person = self.factory.makePerson(name='murdock')
+ self.team = self.factory.makeTeam(name='a-team')
+ self.job_source = getUtility(IMembershipNotificationJobSource)
+
+ def test_setstatus_admin(self):
+ login_person(self.team.teamowner)
+ self.team.addMember(self.person, self.team.teamowner)
+ membership_set = getUtility(ITeamMembershipSet)
+ tm = membership_set.getByPersonAndTeam(self.person, self.team)
+ tm.setStatus(TeamMembershipStatus.ADMIN, self.team.teamowner)
+ transaction.commit()
+ jobs = list(self.job_source.iterReady())
+ self.assertEqual(
+ ('[<MEMBERSHIP_NOTIFICATION branch job (1) '
+ 'for murdock as part of a-team. status=Waiting>]'),
+ str(jobs))
+
+ def test_setstatus_silent(self):
+ login('admin@xxxxxxxxxxxxx')
+ person_set = getUtility(IPersonSet)
+ admin = person_set.getByEmail('admin@xxxxxxxxxxxxx')
+ self.team.addMember(self.person, self.team.teamowner)
+ membership_set = getUtility(ITeamMembershipSet)
+ tm = membership_set.getByPersonAndTeam(self.person, self.team)
+ tm.setStatus(
+ TeamMembershipStatus.ADMIN, admin, silent=True)
+ transaction.commit()
+ self.assertEqual([], list(self.job_source.iterReady()))
=== added file 'lib/lp/registry/tests/test_process_job_sources_cronjob.py'
--- lib/lp/registry/tests/test_process_job_sources_cronjob.py 1970-01-01 00:00:00 +0000
+++ lib/lp/registry/tests/test_process_job_sources_cronjob.py 2010-09-28 18:57:55 +0000
@@ -0,0 +1,121 @@
+# Copyright 2010 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test cron script for processing jobs from any job source class."""
+
+__metaclass__ = type
+
+import os
+import subprocess
+
+import transaction
+from zope.component import getUtility
+
+from canonical.config import config
+from canonical.testing import LaunchpadFunctionalLayer
+from lp.registry.interfaces.teammembership import (
+ ITeamMembershipSet,
+ TeamMembershipStatus,
+ )
+from lp.testing import (
+ login_person,
+ TestCaseWithFactory,
+ )
+
+
+class ProcessJobSourceTest(TestCaseWithFactory):
+ """Test the process-job-source.py script."""
+ layer = LaunchpadFunctionalLayer
+
+ def _run(self, *args):
+ script = os.path.join(
+ config.root, 'cronscripts', 'process-job-source.py')
+ cmd = [script] + list(args)
+ process = subprocess.Popen(
+ cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ (stdout, empty_stderr) = process.communicate()
+ return stdout
+
+ def test_missing_argument(self):
+ output = self._run()
+ self.assertIn('Usage:', output)
+ self.assertIn('process-job-source.py [options] JOB_SOURCE', output)
+
+ def test_empty_queue(self):
+ output = self._run('IMembershipNotificationJobSource')
+ expected = (
+ 'INFO Creating lockfile: /var/lock/launchpad-process-job-'
+ 'source-IMembershipNotificationJobSource.lock\n'
+ 'INFO Running synchronously.\n')
+ self.assertEqual(expected, output)
+
+ def test_processed(self):
+ person = self.factory.makePerson(name='murdock')
+ team = self.factory.makeTeam(name='a-team')
+ login_person(team.teamowner)
+ team.addMember(person, team.teamowner)
+ membership_set = getUtility(ITeamMembershipSet)
+ tm = membership_set.getByPersonAndTeam(person, team)
+ tm.setStatus(TeamMembershipStatus.ADMIN, team.teamowner)
+ transaction.commit()
+ output = self._run('-v', 'IMembershipNotificationJobSource')
+ self.assertIn(
+ ('DEBUG Running <MEMBERSHIP_NOTIFICATION branch job (1) '
+ 'for murdock as part of a-team. status=Waiting>'),
+ output)
+ self.assertIn('DEBUG MembershipNotificationJob sent email', output)
+ self.assertIn('Ran 1 MembershipNotificationJob jobs.', output)
+
+
+class ProcessJobSourceGroupsTest(TestCaseWithFactory):
+ """Test the process-job-source-groups.py script."""
+ layer = LaunchpadFunctionalLayer
+
+ def _run(self, *args):
+ script = os.path.join(
+ config.root, 'cronscripts', 'process-job-source-groups.py')
+ cmd = [script] + list(args)
+ process = subprocess.Popen(
+ cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ (stdout, empty_stderr) = process.communicate()
+ return stdout
+
+ def test_missing_argument(self):
+ output = self._run()
+ self.assertIn(
+ ('Usage: process-job-source-groups.py '
+ '[ -e JOB_SOURCE ] GROUP [GROUP]...'),
+ output)
+ self.assertIn('-e JOB_SOURCE, --exclude=JOB_SOURCE', output)
+ self.assertIn('At least one group must be specified.', output)
+ self.assertIn('Group: MAIN\n IMembershipNotificationJobSource',
+ output)
+
+ def test_empty_queue(self):
+ output = self._run('MAIN')
+ expected = (
+ 'INFO Creating lockfile: /var/lock/launchpad-'
+ 'processjobsourcegroups.lock\n'
+ 'INFO Creating lockfile: /var/lock/launchpad-process-job-'
+ 'source-IMembershipNotificationJobSource.lock\n'
+ 'INFO Running synchronously.\n')
+ self.assertEqual(expected, output)
+
+ def test_processed(self):
+ person = self.factory.makePerson(name='murdock')
+ team = self.factory.makeTeam(name='a-team')
+ login_person(team.teamowner)
+ team.addMember(person, team.teamowner)
+ membership_set = getUtility(ITeamMembershipSet)
+ tm = membership_set.getByPersonAndTeam(person, team)
+ tm.setStatus(TeamMembershipStatus.ADMIN, team.teamowner)
+ transaction.commit()
+ output = self._run('-v', 'MAIN')
+ self.assertIn(
+ ('DEBUG Running <MEMBERSHIP_NOTIFICATION branch job (1) '
+ 'for murdock as part of a-team. status=Waiting>'),
+ output)
+ self.assertIn('DEBUG MembershipNotificationJob sent email', output)
+ self.assertIn('Ran 1 MembershipNotificationJob jobs.', output)
=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py 2010-08-20 20:31:18 +0000
+++ lib/lp/services/job/interfaces/job.py 2010-09-28 18:57:55 +0000
@@ -166,3 +166,9 @@
def contextManager():
"""Get a context for running this kind of job in."""
+
+ def get(id):
+ """Get a job by its id.
+
+ This is currently only needed by the TwistedJobRunner.
+ """
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2010-09-20 09:48:58 +0000
+++ lib/lp/services/job/runner.py 2010-09-28 18:57:55 +0000
@@ -77,7 +77,6 @@
# We redefine __eq__ and __ne__ here to prevent the security proxy
# from mucking up our comparisons in tests and elsewhere.
-
def __eq__(self, job):
return (
self.__class__ is removeSecurityProxy(job.__class__)
@@ -337,13 +336,14 @@
"""Run Jobs via twisted."""
def __init__(self, job_source, dbuser, logger=None, error_utility=None):
- env = {'PYTHONPATH': os.environ['PYTHONPATH'],
- 'PATH': os.environ['PATH']}
+ env = {'PATH': os.environ['PATH']}
+ if 'PYTHONPATH' in os.environ:
+ env['PYTHONPATH'] = os.environ['PYTHONPATH']
lp_config = os.environ.get('LPCONFIG')
if lp_config is not None:
env['LPCONFIG'] = lp_config
starter = main.ProcessStarter(
- packages=('twisted', 'ampoule'), env=env)
+ packages=('_pythonpath', 'twisted', 'ampoule'), env=env)
super(TwistedJobRunner, self).__init__(logger, error_utility)
self.job_source = job_source
import_name = '%s.%s' % (
@@ -367,7 +367,8 @@
transaction.commit()
job_id = job.id
deadline = timegm(job.lease_expires.timetuple())
- self.logger.debug('Running %r, lease expires %s', job, job.lease_expires)
+ self.logger.debug(
+ 'Running %r, lease expires %s', job, job.lease_expires)
deferred = self.pool.doWork(
RunJobCommand, job_id = job_id, _deadline=deadline)
def update(response):
@@ -459,7 +460,13 @@
return sorted(counts.items())
def main(self):
- errorlog.globalErrorUtility.configure(self.config_name)
+ section = config[self.config_name]
+ if (getattr(section, 'error_dir', None) is not None
+ and getattr(section, 'oops_prefix', None) is not None
+ and getattr(section, 'copy_to_zlog', None) is not None):
+ # If the three variables are not set, we will let the error
+ # utility default to using the [error_reports] config.
+ errorlog.globalErrorUtility.configure(self.config_name)
job_source = getUtility(self.source_interface)
runner = self.runner_class.runFromSource(
job_source, self.dbuser, self.logger)