launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #07210
[Merge] lp:~abentley/launchpad/celery-everywhere-5 into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere-5 into lp:launchpad with lp:~abentley/launchpad/celery-everywhere-4 as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-5/+merge/103161
= Summary =
Support running ApportJobs via Celery
== Pre-implementation notes ==
None
== LOC Rationale ==
Part of a resourced arc that will reduce LOC.
== Implementation details ==
Enhance block_on_job to make test development easier: report oopses and worker-side tracebacks for exceptions by attaching them as details to the test case passed in.
Enhance LaunchpadObjectFactory.makeBlob to accept a blob_file argument naming a file under lib/lp/bugs/tests/testfiles, to simplify test code.
Update ApportJob, ApportJobDerived and ProcessApportBlobJob to support running via Celery.
== Tests ==
bin/test test_apportjob -t TestViaCelery
== Demo and Q/A ==
None
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/soyuz/model/distroseriesdifferencejob.py
lib/lp/soyuz/model/distributionjob.py
lib/lp/bugs/model/apportjob.py
lib/lp/soyuz/tests/test_initializedistroseriesjob.py
lib/lp/testing/factory.py
lib/lp/soyuz/model/initializedistroseriesjob.py
lib/lp/soyuz/tests/test_distroseriesdifferencejob.py
lib/lp/bugs/tests/test_apportjob.py
lib/lp/services/job/model/job.py
lib/lp/services/job/tests/__init__.py
--
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-5/+merge/103161
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere-5 into lp:launchpad.
=== modified file 'lib/lp/bugs/model/apportjob.py'
--- lib/lp/bugs/model/apportjob.py 2011-12-30 06:14:56 +0000
+++ lib/lp/bugs/model/apportjob.py 2012-04-23 19:36:28 +0000
@@ -37,10 +37,14 @@
FileBugData,
FileBugDataParser,
)
+from lp.services.config import config
from lp.services.database.enumcol import EnumCol
from lp.services.database.lpstorm import IStore
from lp.services.database.stormbase import StormBase
-from lp.services.job.model.job import Job
+from lp.services.job.model.job import (
+ EnumeratedSubclass,
+ Job,
+ )
from lp.services.job.runner import BaseRunnableJob
from lp.services.librarian.interfaces import ILibraryFileAliasSet
from lp.services.temporaryblobstorage.model import TemporaryBlobStorage
@@ -110,9 +114,13 @@
'No occurrence of %s has key %s' % (cls.__name__, key))
return instance
+ def makeDerived(self):
+ return ApportJobDerived.makeSubclass(self)
+
class ApportJobDerived(BaseRunnableJob):
"""Intermediate class for deriving from ApportJob."""
+ __metaclass__ = EnumeratedSubclass
delegates(IApportJob)
classProvides(IApportJobSource)
@@ -124,7 +132,9 @@
"""See `IApportJob`."""
# If there's already a job for the blob, don't create a new one.
job = ApportJob(blob, cls.class_job_type, {})
- return cls(job)
+ derived = cls(job)
+ derived.celeryRunOnCommit()
+ return derived
@classmethod
def get(cls, job_id):
@@ -173,6 +183,8 @@
class_job_type = ApportJobType.PROCESS_BLOB
classProvides(IProcessApportBlobJobSource)
+ config = config.process_apport_blobs
+
@classmethod
def create(cls, blob):
"""See `IProcessApportBlobJobSource`."""
=== modified file 'lib/lp/bugs/tests/test_apportjob.py'
--- lib/lp/bugs/tests/test_apportjob.py 2012-02-21 22:46:28 +0000
+++ lib/lp/bugs/tests/test_apportjob.py 2012-04-23 19:36:28 +0000
@@ -27,7 +27,9 @@
FileBugDataParser,
)
from lp.services.config import config
+from lp.services.features.testing import FeatureFixture
from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.tests import block_on_job
from lp.services.librarian.interfaces import ILibraryFileAliasSet
from lp.services.scripts.tests import run_script
from lp.services.temporaryblobstorage.interfaces import (
@@ -39,6 +41,7 @@
TestCaseWithFactory,
)
from lp.testing.layers import (
+ CeleryJobLayer,
LaunchpadFunctionalLayer,
LaunchpadZopelessLayer,
)
@@ -92,12 +95,8 @@
super(ProcessApportBlobJobTestCase, self).setUp()
# Create a BLOB using existing testing data.
- testfiles = os.path.join(config.root, 'lib/lp/bugs/tests/testfiles')
- blob_file = open(
- os.path.join(testfiles, 'extra_filebug_data.msg'))
- blob_data = blob_file.read()
- self.blob = self.factory.makeBlob(blob_data)
+ self.blob = self.factory.makeBlob(blob_file='extra_filebug_data.msg')
transaction.commit() # We need the blob available from the Librarian.
def _assertFileBugDataMatchesDict(self, filebug_data, data_dict):
@@ -310,6 +309,28 @@
self._assertFileBugDataMatchesDict(filebug_data, processed_data)
+class TestViaCelery(TestCaseWithFactory):
+
+ layer = CeleryJobLayer
+
+ def test_ProcessApportBlobJob(self):
+ # ProcessApportBlobJob runs under Celery.
+ blob = self.factory.makeBlob(blob_file='extra_filebug_data.msg')
+ self.useFixture(FeatureFixture(
+ {'jobs.celery.enabled_classes': 'ProcessApportBlobJob'}))
+ with block_on_job(self):
+ job = getUtility(IProcessApportBlobJobSource).create(blob)
+ transaction.commit()
+
+ # Once the job has been run, its metadata will contain a dict
+ # called processed_data, which will contain the data parsed from
+ # the BLOB.
+ processed_data = job.metadata.get('processed_data', None)
+ self.assertIsNot(
+ None, processed_data,
+ "processed_data should not be None after the job has run.")
+
+
class TestTemporaryBlobStorageAddView(TestCaseWithFactory):
"""Test case for the TemporaryBlobStorageAddView."""
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2012-04-23 19:36:28 +0000
+++ lib/lp/services/job/model/job.py 2012-04-23 19:36:28 +0000
@@ -269,6 +269,7 @@
def getUserAndBaseJob(cls, job_id):
"""Return the derived branch job associated with the job id."""
# Avoid circular imports.
+ from lp.bugs.model.apportjob import ApportJob
from lp.code.model.branchjob import (
BranchJob,
)
@@ -279,7 +280,8 @@
dbconfig.override(
dbuser=config.launchpad.dbuser, isolation_level='read_committed')
- for baseclass in [BranchJob, BranchMergeProposalJob, DistributionJob]:
+ for baseclass in [
+ ApportJob, BranchJob, BranchMergeProposalJob, DistributionJob]:
derived, base_class, store = cls._getDerived(job_id, baseclass)
if derived is not None:
cls.clearStore(store)
=== modified file 'lib/lp/services/job/tests/__init__.py'
--- lib/lp/services/job/tests/__init__.py 2012-04-13 18:31:35 +0000
+++ lib/lp/services/job/tests/__init__.py 2012-04-23 19:36:28 +0000
@@ -13,6 +13,9 @@
from contextlib import contextmanager
+from testtools.content import text_content
+
+from lp.testing.fixture import CaptureOops
from lp.services.job.runner import BaseRunnableJob
@@ -51,10 +54,21 @@
@contextmanager
-def block_on_job():
- with monitor_celery() as responses:
- yield
- responses[-1].wait(30)
+def block_on_job(test_case=None):
+ with CaptureOops() as capture:
+ with monitor_celery() as responses:
+ yield
+ try:
+ responses[-1].wait(30)
+ finally:
+ if test_case is not None and responses[-1].traceback is not None:
+ test_case.addDetail(
+ 'Worker traceback', text_content(responses[-1].traceback))
+ if test_case is not None:
+ capture.sync()
+ for oops in capture.oopses:
+ test_case.addDetail(
+ 'oops', text_content(str(oops)))
def pop_remote_notifications():
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2012-04-19 16:46:32 +0000
+++ lib/lp/testing/factory.py 2012-04-23 19:36:28 +0000
@@ -4232,8 +4232,13 @@
self.getUniqueString(), self.getUniqueString())
return getUtility(ISSHKeySet).new(person, public_key)
- def makeBlob(self, blob=None, expires=None):
+ def makeBlob(self, blob=None, expires=None, blob_file=None):
"""Create a new TemporaryFileStorage BLOB."""
+ if blob_file is not None:
+ blob_path = os.path.join(
+ config.root, 'lib/lp/bugs/tests/testfiles', blob_file)
+ blob_file = open(blob_path)
+ blob = blob_file.read()
if blob is None:
blob = self.getUniqueString()
new_uuid = getUtility(ITemporaryStorageManager).new(blob, expires)
Follow ups