launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #07846
[Merge] lp:~abentley/launchpad/sync-pending-jobs into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/sync-pending-jobs into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~abentley/launchpad/sync-pending-jobs/+merge/105715
= Summary =
Implement a periodic Celery task to schedule pending jobs that are ready but
not currently queued.
== Proposed fix ==
== Pre-implementation notes ==
Discussed with deryck, abel
== LOC Rationale ==
Part of an arc that will ultimately remove code. And I have a credit of 2110
lines at the moment.
== Implementation details ==
To allow the periodic task to run in a correctly-initialized environment, extracted task_init from UniversalJobSource.
Implemented run_missing_ready in terms of find_missing_ready.
Implemented find_missing_ready in terms of list_queued.
Updated lazr.jobrunner dependency to 0.6 so that list_queued is available.
Implemented CeleryRunJob.run so that tasks are initialized by the task, not by UniversalJobSource.
Renamed UniversalJobSource.rawGet to get, since UniversalJobSource.get no longer needs to do initialization.
== Tests ==
bin/test test_celeryjob
== Demo and Q/A ==
Disable the scan_branches script. Remove BranchScanJob from jobs.celery.enabled_classes. Push a branch. Restore BranchScanJob to jobs.celery.enabled_classes. Run CeleryBeat. It should schedule at least one job. (Probably two.) Those jobs should run via Celery.
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/services/job/runner.py
.bzrignore
versions.cfg
lib/lp/services/job/celeryjob.py
lib/lp/services/job/tests/test_celeryjob.py
lib/lp/services/job/tests/test_job.py
lib/lp/services/job/celeryconfig.py
lib/lp/services/job/model/job.py
lib/lp/services/job/tests/__init__.py
--
https://code.launchpad.net/~abentley/launchpad/sync-pending-jobs/+merge/105715
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/sync-pending-jobs into lp:launchpad.
=== modified file '.bzrignore'
--- .bzrignore 2012-03-14 12:01:42 +0000
+++ .bzrignore 2012-05-14 20:22:21 +0000
@@ -77,3 +77,4 @@
.emacs.desktop
callgrind.out.*
scripts/mlist-sync.py
+./celerybeat-schedule
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-05-10 10:27:47 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-05-14 20:22:21 +0000
@@ -2,7 +2,9 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
import argparse
+from datetime import timedelta
import sys
+
from lp.services.config import config
@@ -79,6 +81,7 @@
result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
result['CELERY_QUEUES'] = celery_queues
result['CELERY_RESULT_BACKEND'] = 'amqp'
+<<<<<<< TREE
# See http://ask.github.com/celery/userguide/optimizing.html:
# The AMQP message of a job should stay in the RabbitMQ server
# until the job has been finished. This allows to simply kill
@@ -87,6 +90,14 @@
result['CELERYD_PREFETCH_MULTIPLIER'] = 1
result['CELERY_ACKS_LATE'] = True
+=======
+ result['CELERYBEAT_SCHEDULE'] = {
+ 'schedule-missing': {
+ 'task': 'lp.services.job.celeryjob.run_missing_ready',
+ 'schedule': timedelta(seconds=600)
+ }
+ }
+>>>>>>> MERGE-SOURCE
return result
try:
=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2012-04-11 18:14:33 +0000
+++ lib/lp/services/job/celeryjob.py 2012-05-14 20:22:21 +0000
@@ -14,13 +14,32 @@
'CeleryRunJobIgnoreResult',
]
+from logging import info
import os
os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
+from celery.task import task
from lazr.jobrunner.celerytask import RunJob
+from storm.zope.interfaces import IZStorm
+import transaction
+from zope.component import getUtility
-from lp.services.job.model.job import UniversalJobSource
-from lp.services.job.runner import BaseJobRunner
+from lp.code.model.branchjob import BranchScanJob
+from lp.services.config import dbconfig
+from lp.services.database.lpstorm import IStore
+from lp.services.features import (
+ install_feature_controller,
+ make_script_feature_controller,
+ )
+from lp.services.job.model.job import (
+ Job,
+ UniversalJobSource,
+ )
+from lp.services.job.runner import (
+ BaseJobRunner,
+ celery_enabled,
+ )
+from lp.services import scripts
class CeleryRunJob(RunJob):
@@ -32,7 +51,73 @@
"""Return a BaseJobRunner, to support customization."""
return BaseJobRunner()
+ def run(self, job_id, dbuser):
+ """Run the specified job.
+
+ :param job_id: The job to run, as expected by UniversalJobSource.
+ (Job.id, module_name, class_name)
+ :param dbuser: The database user to run under. This should match the
+ dbuser specified by the job's config.
+ """
+ task_init(dbuser)
+ super(CeleryRunJob, self).run(job_id)
+
class CeleryRunJobIgnoreResult(CeleryRunJob):
ignore_result = True
+
+
+def find_missing_ready(job_source):
+ """Find ready jobs that are not queued."""
+ from lp.services.job.celeryjob import CeleryRunJob
+ from lazr.jobrunner.celerytask import list_queued
+ queued_job_ids = set(task[1][0][0] for task in list_queued(
+ CeleryRunJob.app, [job_source.task_queue]))
+ return [job for job in job_source.iterReady() if job.job_id not in
+ queued_job_ids]
+
+
+@task
+def run_missing_ready(_no_init=False):
+ """Task to run any jobs that are ready but not scheduled.
+
+ Currently supports only BranchScanJob.
+ :param _no_init: For tests. If True, do not perform the initialization.
+ """
+ if not _no_init:
+ task_init()
+ count = 0
+ for job in find_missing_ready(BranchScanJob):
+ if not celery_enabled(job.__class__.__name__):
+ continue
+ job.celeryCommitHook(True)
+ count += 1
+ info('Scheduled %d missing jobs.', count)
+
+
+needs_zcml = True
+
+
+def ensure_zcml():
+ """Ensure the zcml has been executed for the current process."""
+ global needs_zcml
+ if not needs_zcml:
+ return
+ transaction.abort()
+ scripts.execute_zcml_for_scripts(use_web_security=False)
+ needs_zcml = False
+
+
+def task_init(dbuser):
+ """Prepare to run a task.
+
+ :param dbuser: The database user to use for running the task.
+ """
+ ensure_zcml()
+ transaction.abort()
+ store = IStore(Job)
+ getUtility(IZStorm).remove(store)
+ store.close()
+ dbconfig.override(dbuser=dbuser, isolation_level='read_committed')
+ install_feature_controller(make_script_feature_controller('celery'))
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2012-04-26 19:45:32 +0000
+++ lib/lp/services/job/model/job.py 2012-05-14 20:22:21 +0000
@@ -33,12 +33,9 @@
Int,
Reference,
)
-from storm.zope.interfaces import IZStorm
import transaction
-from zope.component import getUtility
from zope.interface import implements
-from lp.services.config import dbconfig
from lp.services.database import bulk
from lp.services.database.constants import UTC_NOW
from lp.services.database.datetimecol import UtcDateTimeCol
@@ -49,7 +46,6 @@
IJob,
JobStatus,
)
-from lp.services import scripts
UTC = pytz.timezone('UTC')
@@ -255,10 +251,15 @@
memory_limit = 2 * (1024 ** 3)
- needs_init = True
-
@staticmethod
- def rawGet(job_id, module_name, class_name):
+ def get(ujob_id):
+ """Return the named job database class.
+
+ :param ujob_id: A tuple of Job.id, module name, class name for the
+ class to retrieve.
+ Return derived job class.
+ """
+ job_id, module_name, class_name = ujob_id
bc_module = __import__(module_name, fromlist=[class_name])
db_class = getattr(bc_module, class_name)
store = IStore(db_class)
@@ -266,16 +267,3 @@
if db_job is None:
return None
return db_job.makeDerived()
-
- @classmethod
- def get(cls, ujob_id):
- if cls.needs_init:
- transaction.abort()
- scripts.execute_zcml_for_scripts(use_web_security=False)
- cls.needs_init = False
- transaction.abort()
- store = IStore(Job)
- getUtility(IZStorm).remove(store)
- store.close()
- dbconfig.override(dbuser=ujob_id[3], isolation_level='read_committed')
- return cls.rawGet(*ujob_id[:3])
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-05-01 23:29:49 +0000
+++ lib/lp/services/job/runner.py 2012-05-14 20:22:21 +0000
@@ -201,10 +201,9 @@
else:
cls = CeleryRunJob
db_class = self.getDBClass()
- ujob_id = (
- self.job_id, db_class.__module__, db_class.__name__,
- self.config.dbuser)
- return cls.apply_async((ujob_id,), queue=self.task_queue)
+ ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
+ return cls.apply_async(
+ (ujob_id, self.config.dbuser), queue=self.task_queue)
def getDBClass(self):
return self.context.__class__
=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py 2012-04-23 20:41:31 +0000
+++ lib/lp/services/job/tests/__init__.py 2012-05-14 20:22:21 +0000
@@ -73,6 +73,12 @@
'oops', text_content(str(oops)))
+def drain_celery_queues():
+ from lazr.jobrunner.celerytask import drain_queues
+ from lp.services.job.celeryjob import CeleryRunJob
+ drain_queues(CeleryRunJob.app, CeleryRunJob.app.conf.CELERY_QUEUES.keys())
+
+
def pop_remote_notifications():
"""Pop the notifications from a celeryd worker."""
from lp.services.job.tests.celery_helpers import pop_notifications
=== added file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2012-05-14 20:22:21 +0000
@@ -0,0 +1,55 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from lp.code.model.branchjob import BranchScanJob
+from lp.services.features.testing import FeatureFixture
+from lp.services.job.tests import (
+ drain_celery_queues,
+ monitor_celery,
+ )
+from lp.testing import TestCaseWithFactory
+from lp.testing.layers import ZopelessAppServerLayer
+
+
+class TestRunMissingJobs(TestCaseWithFactory):
+
+ layer = ZopelessAppServerLayer
+
+ def setUp(self):
+ super(TestRunMissingJobs, self).setUp()
+ from lp.services.job.celeryjob import (
+ find_missing_ready,
+ run_missing_ready,
+ )
+ self.find_missing_ready = find_missing_ready
+ self.run_missing_ready = run_missing_ready
+
+ def createMissingJob(self):
+ job = BranchScanJob.create(self.factory.makeBranch())
+ self.addCleanup(drain_celery_queues)
+ return job
+
+ def test_find_missing_ready(self):
+ """A job which is ready but not queued is "missing"."""
+ job = self.createMissingJob()
+ self.assertEqual([job], self.find_missing_ready(BranchScanJob))
+ job.runViaCelery()
+ self.assertEqual([], self.find_missing_ready(BranchScanJob))
+ drain_celery_queues()
+ self.assertEqual([job], self.find_missing_ready(BranchScanJob))
+
+ def test_run_missing_ready_not_enabled(self):
+ """run_missing_ready does nothing if the class isn't enabled."""
+ self.createMissingJob()
+ with monitor_celery() as responses:
+ self.run_missing_ready(_no_init=True)
+ self.assertEqual([], responses)
+
+ def test_run_missing_ready(self):
+ """run_missing_ready requests the job to run if not scheduled."""
+ self.createMissingJob()
+ self.useFixture(
+ FeatureFixture({'jobs.celery.enabled_classes': 'BranchScanJob'}))
+ with monitor_celery() as responses:
+ self.run_missing_ready(_no_init=True)
+ self.assertEqual(1, len(responses))
=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py 2012-04-26 19:42:04 +0000
+++ lib/lp/services/job/tests/test_job.py 2012-05-14 20:22:21 +0000
@@ -462,14 +462,15 @@
layer = ZopelessDatabaseLayer
- def test_rawGet_with_merge_proposal_job(self):
+ def test_get_with_merge_proposal_job(self):
+ """Getting a MergeProposalJob works and is efficient."""
comment = self.factory.makeCodeReviewComment()
job = CodeReviewCommentEmailJob.create(comment)
job_id = job.job_id
transaction.commit()
with StormStatementRecorder() as recorder:
- got_job = UniversalJobSource.rawGet(
- job_id, 'lp.code.model.branchmergeproposaljob',
- 'BranchMergeProposalJob')
+ got_job = UniversalJobSource.get(
+ (job_id, 'lp.code.model.branchmergeproposaljob',
+ 'BranchMergeProposalJob'))
self.assertThat(recorder, HasQueryCount(Equals(1)))
self.assertEqual(got_job, job)
=== modified file 'versions.cfg'
--- versions.cfg 2012-05-11 14:39:57 +0000
+++ versions.cfg 2012-05-14 20:22:21 +0000
@@ -42,7 +42,11 @@
lazr.config = 1.1.3
lazr.delegates = 1.2.0
lazr.enum = 1.1.3
+<<<<<<< TREE
lazr.jobrunner = 0.5
+=======
+lazr.jobrunner = 0.6
+>>>>>>> MERGE-SOURCE
lazr.lifecycle = 1.1
lazr.restful = 0.19.6
lazr.restfulclient = 0.12.2
Follow ups