← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:pjsg-serial into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:pjsg-serial into launchpad:master.

Commit message:
Stop process-job-source-groups running job sources in parallel

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/383329

In theory it makes sense to run these in parallel, since they're largely independent.  In practice, however, most of the process-job-source children spend quite a while starting up only to find that they have nothing to do; furthermore, we now have enough different job sources that starting all of them at once causes extremely spiky memory usage and occasionally makes the machine running them run out of memory.

Instead, teach process-job-source to run multiple job sources one after another, reconnecting to the database as a different user for each job source as needed.  process-job-source-groups now just runs a single process-job-source child with all of the selected job sources.  (It could be modified further to do this entirely in-process, but that can be done later.)
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:pjsg-serial into launchpad:master.
diff --git a/cronscripts/process-job-source-groups.py b/cronscripts/process-job-source-groups.py
index c233d0c..01ab9a7 100755
--- a/cronscripts/process-job-source-groups.py
+++ b/cronscripts/process-job-source-groups.py
@@ -19,6 +19,7 @@ from lp.services.config import config
 from lp.services.helpers import english_list
 from lp.services.propertycache import cachedproperty
 from lp.services.scripts.base import LaunchpadCronScript
+from lp.services.job.scripts.process_job_source import ProcessJobSource
 
 
 class LongEpilogHelpFormatter(IndentedHelpFormatter):
@@ -48,13 +49,6 @@ class ProcessJobSourceGroups(LaunchpadCronScript):
             '-e', '--exclude', dest='excluded_job_sources',
             metavar="JOB_SOURCE", default=[], action='append',
             help="Exclude specific job sources.")
-        self.parser.add_option(
-            '--wait', dest='do_wait', default=False, action='store_true',
-            help="Wait for the child processes to finish. This is useful "
-                 "for testing, but it shouldn't be used in a cronjob, since "
-                 "it would prevent the cronjob from processing new jobs "
-                 "if just one of the child processes is still processing, "
-                 "and each process only handles a single job source class.")
 
     def main(self):
         selected_groups = self.args
@@ -74,19 +68,16 @@ class ProcessJobSourceGroups(LaunchpadCronScript):
                         source, english_list(selected_groups, "or")))
             else:
                 selected_job_sources.remove(source)
+        if not selected_job_sources:
+            return
         # Process job sources.
         command = os.path.join(
             os.path.dirname(sys.argv[0]), 'process-job-source.py')
         child_args = [command]
         if self.options.verbose:
             child_args.append('-v')
-        children = []
-        for job_source in selected_job_sources:
-            child = subprocess.Popen(child_args + [job_source])
-            children.append(child)
-        if self.options.do_wait:
-            for child in children:
-                child.wait()
+        child_args.extend(sorted(selected_job_sources))
+        subprocess.check_call(child_args)
 
     @cachedproperty
     def all_job_sources(self):
diff --git a/cronscripts/process-job-source.py b/cronscripts/process-job-source.py
index e14c39d..98ff31f 100755
--- a/cronscripts/process-job-source.py
+++ b/cronscripts/process-job-source.py
@@ -9,4 +9,5 @@ from lp.services.job.scripts.process_job_source import ProcessJobSource
 
 if __name__ == '__main__':
     script = ProcessJobSource()
-    script.lock_and_run()
+    # ProcessJobSource handles its own locking.
+    script.run()
diff --git a/lib/lp/services/database/sqlbase.py b/lib/lp/services/database/sqlbase.py
index 9b0a6df..26e3e7a 100644
--- a/lib/lp/services/database/sqlbase.py
+++ b/lib/lp/services/database/sqlbase.py
@@ -8,6 +8,7 @@ __all__ = [
     'connect',
     'convert_storm_clause_to_string',
     'cursor',
+    'disconnect_stores',
     'flush_database_caches',
     'flush_database_updates',
     'get_transaction_timestamp',
@@ -50,6 +51,7 @@ from storm.locals import (
     Storm,
     )
 from storm.zope.interfaces import IZStorm
+import transaction
 from twisted.python.util import mergeFunctionMetadata
 from zope.component import getUtility
 from zope.interface import implementer
@@ -631,3 +633,24 @@ class cursor:
 def session_store():
     """Return a store connected to the session DB."""
     return getUtility(IZStorm).get('session', 'launchpad-session:')
+
+
+def disconnect_stores():
+    """Disconnect Storm stores.
+
+    Note that any existing Storm objects will be broken, so this should only
+    be used in situations where we can guarantee that we have no such object
+    references in hand (other than in Storm caches, which will be dropped as
+    a process of removing stores anyway).
+    """
+    zstorm = getUtility(IZStorm)
+    stores = [
+        store for name, store in zstorm.iterstores() if name != 'session']
+
+    # If we have any stores, abort the transaction and close them.
+    if stores:
+        for store in stores:
+            zstorm.remove(store)
+        transaction.abort()
+        for store in stores:
+            store.close()
diff --git a/lib/lp/services/database/tests/script_isolation.py b/lib/lp/services/database/tests/script_isolation.py
index e65b4e7..887c1b4 100644
--- a/lib/lp/services/database/tests/script_isolation.py
+++ b/lib/lp/services/database/tests/script_isolation.py
@@ -13,9 +13,11 @@ import warnings
 import transaction
 
 from lp.services.config import dbconfig
-from lp.services.database.sqlbase import cursor
+from lp.services.database.sqlbase import (
+    cursor,
+    disconnect_stores,
+    )
 from lp.services.scripts import execute_zcml_for_scripts
-from lp.testing.layers import disconnect_stores
 
 execute_zcml_for_scripts()
 
diff --git a/lib/lp/services/database/tests/test_isolation_changes.py b/lib/lp/services/database/tests/test_isolation_changes.py
index 02ae087..3104776 100644
--- a/lib/lp/services/database/tests/test_isolation_changes.py
+++ b/lib/lp/services/database/tests/test_isolation_changes.py
@@ -22,12 +22,10 @@ from lp.services.config import dbconfig
 from lp.services.database.sqlbase import (
     connect,
     cursor,
-    ISOLATION_LEVEL_SERIALIZABLE,
-    )
-from lp.testing.layers import (
     disconnect_stores,
-    LaunchpadZopelessLayer,
+    ISOLATION_LEVEL_SERIALIZABLE,
     )
+from lp.testing.layers import LaunchpadZopelessLayer
 
 
 def set_isolation_level(isolation):
diff --git a/lib/lp/services/job/celeryjob.py b/lib/lp/services/job/celeryjob.py
index ee08371..793330a 100644
--- a/lib/lp/services/job/celeryjob.py
+++ b/lib/lp/services/job/celeryjob.py
@@ -28,23 +28,18 @@ from celery import (
     Task,
     )
 from lazr.jobrunner.celerytask import RunJob
-from storm.zope.interfaces import IZStorm
 import transaction
-from zope.component import getUtility
 
 from lp.code.model.branchjob import BranchScanJob
 from lp.scripts.helpers import TransactionFreeOperation
 from lp.services.config import dbconfig
-from lp.services.database.interfaces import IStore
+from lp.services.database.sqlbase import disconnect_stores
 from lp.services.features import (
     install_feature_controller,
     make_script_feature_controller,
     )
 from lp.services.mail.sendmail import set_immediate_mail_delivery
-from lp.services.job.model.job import (
-    Job,
-    UniversalJobSource,
-    )
+from lp.services.job.model.job import UniversalJobSource
 from lp.services.job.runner import (
     BaseJobRunner,
     celery_enabled,
@@ -168,9 +163,6 @@ def task_init(dbuser):
     :param dbuser: The database user to use for running the task.
     """
     ensure_zcml()
-    transaction.abort()
-    store = IStore(Job)
-    getUtility(IZStorm).remove(store)
-    store.close()
+    disconnect_stores()
     dbconfig.override(dbuser=dbuser, isolation_level='read_committed')
     install_feature_controller(make_script_feature_controller('celery'))
diff --git a/lib/lp/services/job/scripts/process_job_source.py b/lib/lp/services/job/scripts/process_job_source.py
index 2bb34c2..ad909f0 100644
--- a/lib/lp/services/job/scripts/process_job_source.py
+++ b/lib/lp/services/job/scripts/process_job_source.py
@@ -6,29 +6,32 @@
 __metaclass__ = type
 
 from collections import defaultdict
+import logging
 import sys
 
 from twisted.python import log
 from zope.component import getUtility
 
 from lp.services.config import config
+from lp.services.database.sqlbase import disconnect_stores
 from lp.services.job import runner
-from lp.services.scripts.base import LaunchpadCronScript
+from lp.services.scripts.base import (
+    LaunchpadCronScript,
+    LaunchpadScript,
+    SilentLaunchpadScriptFailure,
+    )
+from lp.services.scripts.logger import OopsHandler
 from lp.services.webapp import errorlog
 
 
-class ProcessJobSource(LaunchpadCronScript):
-    """Run jobs for a specified job source class."""
-    usage = (
-        "Usage: %prog [options] JOB_SOURCE\n\n"
-        "For more help, run:\n"
-        "    cronscripts/process-job-source.py --help")
+class ProcessSingleJobSource(LaunchpadCronScript):
+    """Run jobs for a single specified job source class.
 
-    description = (
-        "Takes pending jobs of the given type off the queue and runs them.")
+    This is for internal use by the L{ProcessJobSource} wrapper.
+    """
 
-    def __init__(self, test_args=None):
-        super(ProcessJobSource, self).__init__(test_args=test_args)
+    def __init__(self, *args, **kwargs):
+        super(ProcessSingleJobSource, self).__init__(*args, **kwargs)
         # The fromlist argument is necessary so that __import__()
         # returns the bottom submodule instead of the top one.
         module = __import__(self.config_section.module,
@@ -64,6 +67,7 @@ class ProcessJobSource(LaunchpadCronScript):
         # Override attributes that are normally set in __init__().
         return getattr(runner, runner_class_name)
 
+    # Keep this in sync with ProcessJobSource.add_my_options.
     def add_my_options(self):
         self.parser.add_option(
             '--log-twisted', action='store_true', default=False,
@@ -74,7 +78,7 @@ class ProcessJobSource(LaunchpadCronScript):
             self.parser.print_help()
             sys.exit(1)
         self.job_source_name = self.args[0]
-        super(ProcessJobSource, self).handle_options()
+        super(ProcessSingleJobSource, self).handle_options()
 
     def job_counts(self, jobs):
         """Return a list of tuples containing the job name and counts."""
@@ -83,9 +87,23 @@ class ProcessJobSource(LaunchpadCronScript):
             counts[job.__class__.__name__] += 1
         return sorted(counts.items())
 
+    def _init_zca(self, use_web_security):
+        """Do nothing; already done by ProcessJobSource."""
+        pass
+
+    def _init_db(self, isolation):
+        """Switch to the appropriate database user.
+
+        We may be running jobs from multiple different job sources
+        consecutively, so we need to disconnect existing Storm stores which
+        may be using a connection with a different user.  Any existing Storm
+        objects will be broken, but this script never holds object
+        references across calls to this method.
+        """
+        disconnect_stores()
+        super(ProcessSingleJobSource, self)._init_db(isolation)
+
     def main(self):
-        if self.options.verbose:
-            log.startLogging(sys.stdout)
         errorlog.globalErrorUtility.configure(self.config_name)
         job_source = getUtility(self.source_interface)
         kwargs = {}
@@ -97,3 +115,61 @@ class ProcessJobSource(LaunchpadCronScript):
             self.logger.info('Ran %d %s jobs.', count, name)
         for name, count in self.job_counts(runner.incomplete_jobs):
             self.logger.info('%d %s jobs did not complete.', count, name)
+
+
+class ProcessJobSource(LaunchpadScript):
+    """Run jobs for specified job source classes."""
+    usage = (
+        "Usage: %prog [options] JOB_SOURCE [JOB_SOURCE ...]\n\n"
+        "For more help, run:\n"
+        "    cronscripts/process-job-source.py --help")
+
+    description = (
+        "Takes pending jobs of the given type(s) off the queue and runs them.")
+
+    name = 'process-job-source'
+
+    # Keep this in sync with ProcessSingleJobSource.add_my_options.
+    def add_my_options(self):
+        self.parser.add_option(
+            '--log-twisted', action='store_true', default=False,
+            help='Enable extra Twisted logging.')
+
+    def handle_options(self):
+        if len(self.args) < 1:
+            self.parser.print_help()
+            sys.exit(1)
+        self.job_source_names = self.args
+        super(ProcessJobSource, self).handle_options()
+
+    def main(self):
+        if self.options.verbose:
+            log.startLogging(sys.stdout)
+        failure_count = 0
+        for job_source_name in self.job_source_names:
+            self.logger.info(
+                'Calling ProcessSingleJobSource(%s)', job_source_name)
+            script = ProcessSingleJobSource(
+                test_args=[job_source_name], logger=self.logger)
+            # This is easier than unparsing all the possible options.
+            script.options = self.options
+            try:
+                script.lock_and_run()
+            except SystemExit:
+                # We're acting somewhat as though each job source were being
+                # handled by a separate process.  If the lock file for a
+                # given job source is locked, or if one of them raises
+                # LaunchpadScriptFailure and so causes LaunchpadScript to
+                # call sys.exit, carry on to the next job source.  Other
+                # ordinarily-fatal exceptions are left alone.
+                failure_count += 1
+            # Disable the OOPS handler added by this script, as otherwise
+            # we'll get duplicate OOPSes if anything goes wrong in second or
+            # subsequent job sources.
+            root_logger = logging.getLogger()
+            for handler in list(root_logger.handlers):
+                if isinstance(handler, OopsHandler):
+                    root_logger.removeHandler(handler)
+        if failure_count:
+            self.logger.error('%d job sources failed.' % failure_count)
+            raise SilentLaunchpadScriptFailure(failure_count)
diff --git a/lib/lp/services/job/scripts/tests/__init__.py b/lib/lp/services/job/scripts/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/job/scripts/tests/__init__.py
diff --git a/lib/lp/registry/tests/test_process_job_sources_cronjob.py b/lib/lp/services/job/scripts/tests/test_process_job_source.py
similarity index 88%
rename from lib/lp/registry/tests/test_process_job_sources_cronjob.py
rename to lib/lp/services/job/scripts/tests/test_process_job_source.py
index 331546e..52c7499 100644
--- a/lib/lp/registry/tests/test_process_job_sources_cronjob.py
+++ b/lib/lp/services/job/scripts/tests/test_process_job_source.py
@@ -24,10 +24,10 @@ from lp.testing.layers import LaunchpadScriptLayer
 from lp.testing.matchers import DocTestMatches
 
 
-class ProcessJobSourceConfigTest(TestCase):
+class ProcessSingleJobSourceConfigTest(TestCase):
     """
-    This test case is specific for unit testing ProcessJobSource's usage of
-    config.
+    This test case is specific for unit testing ProcessSingleJobSource's
+    usage of config.
     """
     def test_config_section_link(self):
         module_name = "lp.code.interfaces.branchmergeproposal"
@@ -35,7 +35,7 @@ class ProcessJobSourceConfigTest(TestCase):
         self.pushConfig("IUpdatePreviewDiffJobSource",
                         link="IBranchMergeProposalJobSource")
 
-        proc = process_job_source.ProcessJobSource(
+        proc = process_job_source.ProcessSingleJobSource(
             test_args=['IUpdatePreviewDiffJobSource'])
         self.assertEqual(proc.config_section.module, module_name)
 
@@ -120,9 +120,9 @@ class ProcessJobSourceGroupsTest(TestCaseWithFactory):
         self.assertIn('Group: MAIN\n    I', output)
 
     def test_empty_queue(self):
-        # The script should just launch a child for each job source class,
-        # and then exit if no jobs are in the queue.  It should not create
-        # its own lockfile.
+        # The script should just run over each job source class, and then
+        # exit if no jobs are in the queue.  It should not create its own
+        # lockfile.
         returncode, output, error = run_script(self.script, ['MAIN'])
         expected = (
             '.*Creating lockfile:.*launchpad-process-job-'
@@ -132,7 +132,7 @@ class ProcessJobSourceGroupsTest(TestCaseWithFactory):
 
     def test_processed(self):
         # The script should output the number of jobs that have been
-        # processed by its child processes.
+        # processed.
         person = self.factory.makePerson(name='murdock')
         team = self.factory.makeTeam(name='a-team')
         login_person(team.teamowner)
@@ -141,8 +141,7 @@ class ProcessJobSourceGroupsTest(TestCaseWithFactory):
         tm = membership_set.getByPersonAndTeam(person, team)
         tm.setStatus(TeamMembershipStatus.ADMIN, team.teamowner)
         transaction.commit()
-        returncode, output, error = run_script(
-            self.script, ['-v', '--wait', 'MAIN'])
+        returncode, output, error = run_script(self.script, ['-v', 'MAIN'])
         self.assertTextMatchesExpressionIgnoreWhitespace(
             ('INFO Running <MembershipNotificationJob '
              'about ~murdock in ~a-team; status=Waiting>'),
diff --git a/lib/lp/testing/layers.py b/lib/lp/testing/layers.py
index c12718f..fa89018 100644
--- a/lib/lp/testing/layers.py
+++ b/lib/lp/testing/layers.py
@@ -47,7 +47,6 @@ __all__ = [
     'ZopelessAppServerLayer',
     'ZopelessDatabaseLayer',
     'ZopelessLayer',
-    'disconnect_stores',
     'reconnect_stores',
     ]
 
@@ -83,7 +82,6 @@ from six.moves.urllib.parse import (
     urlparse,
     )
 from six.moves.urllib.request import urlopen
-from storm.zope.interfaces import IZStorm
 import transaction
 from webob.request import environ_from_url as orig_environ_from_url
 import wsgi_intercept
@@ -119,7 +117,10 @@ from lp.services.config.fixture import (
     ConfigUseFixture,
     )
 from lp.services.database.interfaces import IStore
-from lp.services.database.sqlbase import session_store
+from lp.services.database.sqlbase import (
+    disconnect_stores,
+    session_store,
+    )
 from lp.services.encoding import wsgi_native_string
 from lp.services.job.tests import celery_worker
 from lp.services.librarian.model import LibraryFileAlias
@@ -197,21 +198,6 @@ def is_ca_available():
         return True
 
 
-def disconnect_stores():
-    """Disconnect Storm stores."""
-    zstorm = getUtility(IZStorm)
-    stores = [
-        store for name, store in zstorm.iterstores() if name != 'session']
-
-    # If we have any stores, abort the transaction and close them.
-    if stores:
-        for store in stores:
-            zstorm.remove(store)
-        transaction.abort()
-        for store in stores:
-            store.close()
-
-
 def reconnect_stores(reset=False):
     """Reconnect Storm stores, resetting the dbconfig to its defaults.