← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/sync-pending-jobs into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/sync-pending-jobs into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/launchpad/sync-pending-jobs/+merge/105715

= Summary =
Implement a periodic Celery task that queues pending jobs which are ready to
run but are not currently in the Celery queue.

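== Proposed fix ==
Add a run_missing_ready Celery task, run by CeleryBeat every 10 minutes, that finds ready BranchScanJobs missing from the Celery queue and queues them, provided BranchScanJob is enabled for Celery.
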
== Pre-implementation notes ==
Discussed with deryck, abel

== LOC Rationale ==
This is part of a series of changes that will ultimately remove code, and I
currently have a credit of 2110 lines.

== Implementation details ==
To allow the periodic task to run in a correctly-initialized environment, extracted the initialization code from UniversalJobSource.get into a new task_init function.

Implemented run_missing_ready in terms of find_missing_ready.

Implemented find_missing_ready in terms of list_queued.

Updated lazr.jobrunner dependency to 0.6 so that list_queued is available.

Implemented CeleryRunJob.run so that each task performs its own initialization (via task_init) instead of relying on UniversalJobSource.get to do it.

Renamed UniversalJobSource.rawGet to get, since UniversalJobSource.get no longer needs to do initialization.


== Tests ==
bin/test test_celeryjob

== Demo and Q/A ==
Disable the scan_branches script.  Remove BranchScanJob from jobs.celery.enabled_classes.  Push a branch.  Restore BranchScanJob to jobs.celery.enabled_classes.  Run CeleryBeat.  It should schedule at least one job (probably two).  Those jobs should run via Celery.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/runner.py
  .bzrignore
  versions.cfg
  lib/lp/services/job/celeryjob.py
  lib/lp/services/job/tests/test_celeryjob.py
  lib/lp/services/job/tests/test_job.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/model/job.py
  lib/lp/services/job/tests/__init__.py
-- 
https://code.launchpad.net/~abentley/launchpad/sync-pending-jobs/+merge/105715
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/sync-pending-jobs into lp:launchpad.
=== modified file '.bzrignore'
--- .bzrignore	2012-03-14 12:01:42 +0000
+++ .bzrignore	2012-05-14 20:22:21 +0000
@@ -77,3 +77,4 @@
 .emacs.desktop
 callgrind.out.*
 scripts/mlist-sync.py
+./celerybeat-schedule

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2012-05-10 10:27:47 +0000
+++ lib/lp/services/job/celeryconfig.py	2012-05-14 20:22:21 +0000
@@ -2,7 +2,9 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 import argparse
+from datetime import timedelta
 import sys
+
 from lp.services.config import config
 
 
@@ -79,6 +81,7 @@
     result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
     result['CELERY_QUEUES'] = celery_queues
     result['CELERY_RESULT_BACKEND'] = 'amqp'
+<<<<<<< TREE
     # See http://ask.github.com/celery/userguide/optimizing.html:
     # The AMQP message of a job should stay in the RabbitMQ server
     # until the job has been finished. This allows to simply kill
@@ -87,6 +90,14 @@
     result['CELERYD_PREFETCH_MULTIPLIER'] = 1
     result['CELERY_ACKS_LATE'] = True
 
+=======
+    result['CELERYBEAT_SCHEDULE'] = {
+        'schedule-missing': {
+            'task': 'lp.services.job.celeryjob.run_missing_ready',
+            'schedule': timedelta(seconds=600)
+        }
+    }
+>>>>>>> MERGE-SOURCE
     return result
 
 try:

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py	2012-04-11 18:14:33 +0000
+++ lib/lp/services/job/celeryjob.py	2012-05-14 20:22:21 +0000
@@ -14,13 +14,32 @@
     'CeleryRunJobIgnoreResult',
     ]
 
+from logging import info
 import os
 
 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
+from celery.task import task
 from lazr.jobrunner.celerytask import RunJob
+from storm.zope.interfaces import IZStorm
+import transaction
+from zope.component import getUtility
 
-from lp.services.job.model.job import UniversalJobSource
-from lp.services.job.runner import BaseJobRunner
+from lp.code.model.branchjob import BranchScanJob
+from lp.services.config import dbconfig
+from lp.services.database.lpstorm import IStore
+from lp.services.features import (
+    install_feature_controller,
+    make_script_feature_controller,
+    )
+from lp.services.job.model.job import (
+    Job,
+    UniversalJobSource,
+    )
+from lp.services.job.runner import (
+    BaseJobRunner,
+    celery_enabled,
+    )
+from lp.services import scripts
 
 
 class CeleryRunJob(RunJob):
@@ -32,7 +51,73 @@
         """Return a BaseJobRunner, to support customization."""
         return BaseJobRunner()
 
+    def run(self, job_id, dbuser):
+        """Run the specified job.
+
+        :param job_id: The job to run, as expected by UniversalJobSource.
+            (Job.id, module_name, class_name)
+        :param dbuser: The database user to run under.  This should match the
+            dbuser specified by the job's config.
+        """
+        task_init(dbuser)
+        super(CeleryRunJob, self).run(job_id)
+
 
 class CeleryRunJobIgnoreResult(CeleryRunJob):
 
     ignore_result = True
+
+
+def find_missing_ready(job_source):
+    """Find ready jobs that are not queued."""
+    from lp.services.job.celeryjob import CeleryRunJob
+    from lazr.jobrunner.celerytask import list_queued
+    queued_job_ids = set(task[1][0][0] for task in list_queued(
+        CeleryRunJob.app, [job_source.task_queue]))
+    return [job for job in job_source.iterReady() if job.job_id not in
+            queued_job_ids]
+
+
+@task
+def run_missing_ready(_no_init=False):
+    """Task to run any jobs that are ready but not scheduled.
+
+    Currently supports only BranchScanJob, and runs as its dbuser.
+    :param _no_init: For tests.  If True, do not perform the initialization.
+    """
+    if not _no_init:
+        task_init(BranchScanJob.config.dbuser)
+    count = 0
+    for job in find_missing_ready(BranchScanJob):
+        if not celery_enabled(job.__class__.__name__):
+            continue
+        job.celeryCommitHook(True)
+        count += 1
+    info('Scheduled %d missing jobs.', count)
+
+
+needs_zcml = True
+
+
+def ensure_zcml():
+    """Ensure the zcml has been executed for the current process."""
+    global needs_zcml
+    if not needs_zcml:
+        return
+    transaction.abort()
+    scripts.execute_zcml_for_scripts(use_web_security=False)
+    needs_zcml = False
+
+
+def task_init(dbuser):
+    """Prepare to run a task.
+
+    :param dbuser: The database user to use for running the task.
+    """
+    ensure_zcml()
+    transaction.abort()
+    store = IStore(Job)
+    getUtility(IZStorm).remove(store)
+    store.close()
+    dbconfig.override(dbuser=dbuser, isolation_level='read_committed')
+    install_feature_controller(make_script_feature_controller('celery'))

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2012-04-26 19:45:32 +0000
+++ lib/lp/services/job/model/job.py	2012-05-14 20:22:21 +0000
@@ -33,12 +33,9 @@
     Int,
     Reference,
     )
-from storm.zope.interfaces import IZStorm
 import transaction
-from zope.component import getUtility
 from zope.interface import implements
 
-from lp.services.config import dbconfig
 from lp.services.database import bulk
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.datetimecol import UtcDateTimeCol
@@ -49,7 +46,6 @@
     IJob,
     JobStatus,
     )
-from lp.services import scripts
 
 
 UTC = pytz.timezone('UTC')
@@ -255,10 +251,15 @@
 
     memory_limit = 2 * (1024 ** 3)
 
-    needs_init = True
-
     @staticmethod
-    def rawGet(job_id, module_name, class_name):
+    def get(ujob_id):
+        """Return the named job database class.
+
+        :param ujob_id: A tuple of Job.id, module name, class name for the
+            class to retrieve.
+        Return derived job class.
+        """
+        job_id, module_name, class_name = ujob_id
         bc_module = __import__(module_name, fromlist=[class_name])
         db_class = getattr(bc_module, class_name)
         store = IStore(db_class)
@@ -266,16 +267,3 @@
         if db_job is None:
             return None
         return db_job.makeDerived()
-
-    @classmethod
-    def get(cls, ujob_id):
-        if cls.needs_init:
-            transaction.abort()
-            scripts.execute_zcml_for_scripts(use_web_security=False)
-            cls.needs_init = False
-        transaction.abort()
-        store = IStore(Job)
-        getUtility(IZStorm).remove(store)
-        store.close()
-        dbconfig.override(dbuser=ujob_id[3], isolation_level='read_committed')
-        return cls.rawGet(*ujob_id[:3])

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2012-05-01 23:29:49 +0000
+++ lib/lp/services/job/runner.py	2012-05-14 20:22:21 +0000
@@ -201,10 +201,9 @@
         else:
             cls = CeleryRunJob
         db_class = self.getDBClass()
-        ujob_id = (
-            self.job_id, db_class.__module__, db_class.__name__,
-            self.config.dbuser)
-        return cls.apply_async((ujob_id,), queue=self.task_queue)
+        ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
+        return cls.apply_async(
+            (ujob_id, self.config.dbuser), queue=self.task_queue)
 
     def getDBClass(self):
         return self.context.__class__

=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py	2012-04-23 20:41:31 +0000
+++ lib/lp/services/job/tests/__init__.py	2012-05-14 20:22:21 +0000
@@ -73,6 +73,12 @@
                     'oops', text_content(str(oops)))
 
 
+def drain_celery_queues():
+    from lazr.jobrunner.celerytask import drain_queues
+    from lp.services.job.celeryjob import CeleryRunJob
+    drain_queues(CeleryRunJob.app, CeleryRunJob.app.conf.CELERY_QUEUES.keys())
+
+
 def pop_remote_notifications():
     """Pop the notifications from a celeryd worker."""
     from lp.services.job.tests.celery_helpers import pop_notifications

=== added file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py	2012-05-14 20:22:21 +0000
@@ -0,0 +1,55 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from lp.code.model.branchjob import BranchScanJob
+from lp.services.features.testing import FeatureFixture
+from lp.services.job.tests import (
+    drain_celery_queues,
+    monitor_celery,
+    )
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import ZopelessAppServerLayer
+
+
+class TestRunMissingJobs(TestCaseWithFactory):
+
+    layer = ZopelessAppServerLayer
+
+    def setUp(self):
+        super(TestRunMissingJobs, self).setUp()
+        from lp.services.job.celeryjob import (
+            find_missing_ready,
+            run_missing_ready,
+        )
+        self.find_missing_ready = find_missing_ready
+        self.run_missing_ready = run_missing_ready
+
+    def createMissingJob(self):
+        job = BranchScanJob.create(self.factory.makeBranch())
+        self.addCleanup(drain_celery_queues)
+        return job
+
+    def test_find_missing_ready(self):
+        """A job which is ready but not queued is "missing"."""
+        job = self.createMissingJob()
+        self.assertEqual([job], self.find_missing_ready(BranchScanJob))
+        job.runViaCelery()
+        self.assertEqual([], self.find_missing_ready(BranchScanJob))
+        drain_celery_queues()
+        self.assertEqual([job], self.find_missing_ready(BranchScanJob))
+
+    def test_run_missing_ready_not_enabled(self):
+        """run_missing_ready does nothing if the class isn't enabled."""
+        self.createMissingJob()
+        with monitor_celery() as responses:
+            self.run_missing_ready(_no_init=True)
+        self.assertEqual([], responses)
+
+    def test_run_missing_ready(self):
+        """run_missing_ready requests the job to run if not scheduled."""
+        self.createMissingJob()
+        self.useFixture(
+            FeatureFixture({'jobs.celery.enabled_classes': 'BranchScanJob'}))
+        with monitor_celery() as responses:
+            self.run_missing_ready(_no_init=True)
+        self.assertEqual(1, len(responses))

=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py	2012-04-26 19:42:04 +0000
+++ lib/lp/services/job/tests/test_job.py	2012-05-14 20:22:21 +0000
@@ -462,14 +462,15 @@
 
     layer = ZopelessDatabaseLayer
 
-    def test_rawGet_with_merge_proposal_job(self):
+    def test_get_with_merge_proposal_job(self):
+        """Getting a MergeProposalJob works and is efficient."""
         comment = self.factory.makeCodeReviewComment()
         job = CodeReviewCommentEmailJob.create(comment)
         job_id = job.job_id
         transaction.commit()
         with StormStatementRecorder() as recorder:
-            got_job = UniversalJobSource.rawGet(
-                job_id, 'lp.code.model.branchmergeproposaljob',
-                 'BranchMergeProposalJob')
+            got_job = UniversalJobSource.get(
+                (job_id, 'lp.code.model.branchmergeproposaljob',
+                 'BranchMergeProposalJob'))
         self.assertThat(recorder, HasQueryCount(Equals(1)))
         self.assertEqual(got_job, job)

=== modified file 'versions.cfg'
--- versions.cfg	2012-05-11 14:39:57 +0000
+++ versions.cfg	2012-05-14 20:22:21 +0000
@@ -42,7 +42,11 @@
 lazr.config = 1.1.3
 lazr.delegates = 1.2.0
 lazr.enum = 1.1.3
+<<<<<<< TREE
 lazr.jobrunner = 0.5
+=======
+lazr.jobrunner = 0.6
+>>>>>>> MERGE-SOURCE
 lazr.lifecycle = 1.1
 lazr.restful = 0.19.6
 lazr.restfulclient = 0.12.2


Follow ups