← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/celery-everywhere-4 into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere-4 into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-4/+merge/102918

= Summary =
Support DistributionJob via Celery

== Pre-implementation notes ==
None

== Implementation details ==
Handle missing Jobs more cleanly
Refactor retrieving jobs and changing db user to avoid db permission errors
Add DistributionJob to the list of base classes, add makeDerived method.
Make DistributionJobDerived an EnumeratedSubclass
Add config to InitializeDistroSeriesJob and dispatch via Celery
Add config to DistroSeriesDifferenceJob and dispatch via Celery
Extract create_child from InitializeDistroSeriesJobTestsWithPackages to permit reuse in non-subclasses.


== Tests ==
bin/test --layer=CeleryJobLayer -m 'test_distroseriesdifferencejob|test_initializedistroseriesjob'

== Demo and Q/A ==
None


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/soyuz/model/distroseriesdifferencejob.py
  lib/lp/soyuz/model/distributionjob.py
  lib/lp/soyuz/tests/test_initializedistroseriesjob.py
  lib/lp/soyuz/model/initializedistroseriesjob.py
  lib/lp/soyuz/tests/test_distroseriesdifferencejob.py
  lib/lp/services/job/model/job.py
-- 
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-4/+merge/102918
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere-4 into lp:launchpad.
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2012-04-13 23:56:24 +0000
+++ lib/lp/services/job/model/job.py	2012-04-20 19:12:21 +0000
@@ -38,7 +38,7 @@
 from zope.component import getUtility
 from zope.interface import implements
 
-from lp.services.config import dbconfig
+from lp.services.config import config, dbconfig
 from lp.services.database import bulk
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.datetimecol import UtcDateTimeCol
@@ -258,7 +258,15 @@
     needs_init = True
 
     @staticmethod
-    def getDerived(job_id):
+    def _getDerived(job_id, base_class):
+        store = IStore(base_class)
+        base_job = store.find(base_class, base_class.job == job_id).one()
+        if base_job is None:
+            return None, None, None
+        return base_job.makeDerived(), base_job.__class__, store
+
+    @classmethod
+    def getUserAndBaseJob(cls, job_id):
         """Return the derived branch job associated with the job id."""
         # Avoid circular imports.
         from lp.code.model.branchjob import (
@@ -267,15 +275,16 @@
         from lp.code.model.branchmergeproposaljob import (
             BranchMergeProposalJob,
             )
-        store = IStore(Job)
-        for cls in [BranchJob, BranchMergeProposalJob]:
-            base_job = store.find(cls, cls.job == job_id).one()
-            if base_job is not None:
-                break
-        if base_job is None:
-            raise ValueError('No BranchJob with job=%s.' % job_id)
+        from lp.soyuz.model.distributionjob import DistributionJob
+        dbconfig.override(
+            dbuser=config.launchpad.dbuser, isolation_level='read_committed')
 
-        return base_job.makeDerived(), store
+        for baseclass in [BranchJob, BranchMergeProposalJob, DistributionJob]:
+            derived, base_class, store = cls._getDerived(job_id, baseclass)
+            if derived is not None:
+                cls.clearStore(store)
+                return derived.config.dbuser, base_class
+        raise ValueError('No Job with job=%s.' % job_id)
 
     @staticmethod
     def clearStore(store):
@@ -284,19 +293,12 @@
         store.close()
 
     @classmethod
-    def switchDBUser(cls, job_id):
-        """Switch to the DB user associated with this Job ID."""
-        cls.clearStore(IStore(Job))
-        derived, store = cls.getDerived(job_id)
-        dbconfig.override(
-            dbuser=derived.config.dbuser, isolation_level='read_committed')
-        cls.clearStore(store)
-
-    @classmethod
     def get(cls, job_id):
         transaction.abort()
         if cls.needs_init:
             scripts.execute_zcml_for_scripts(use_web_security=False)
             cls.needs_init = False
-        cls.switchDBUser(job_id)
-        return cls.getDerived(job_id)[0]
+        cls.clearStore(IStore(Job))
+        dbuser, base_class = cls.getUserAndBaseJob(job_id)
+        dbconfig.override(dbuser=dbuser, isolation_level='read_committed')
+        return cls._getDerived(job_id, base_class)[0]

=== modified file 'lib/lp/soyuz/model/distributionjob.py'
--- lib/lp/soyuz/model/distributionjob.py	2011-12-30 06:14:56 +0000
+++ lib/lp/soyuz/model/distributionjob.py	2012-04-20 19:12:21 +0000
@@ -23,7 +23,10 @@
 from lp.services.database.enumcol import EnumCol
 from lp.services.database.lpstorm import IStore
 from lp.services.database.stormbase import StormBase
-from lp.services.job.model.job import Job
+from lp.services.job.model.job import (
+    EnumeratedSubclass,
+    Job,
+    )
 from lp.services.job.runner import BaseRunnableJob
 from lp.soyuz.interfaces.distributionjob import (
     DistributionJobType,
@@ -61,9 +64,15 @@
         self.job_type = job_type
         self.metadata = metadata
 
+    def makeDerived(self):
+        return DistributionJobDerived.makeSubclass(self)
+
 
 class DistributionJobDerived(BaseRunnableJob):
     """Abstract class for deriving from DistributionJob."""
+
+    __metaclass__ = EnumeratedSubclass
+
     delegates(IDistributionJob)
 
     def __init__(self, job):

=== modified file 'lib/lp/soyuz/model/distroseriesdifferencejob.py'
--- lib/lp/soyuz/model/distroseriesdifferencejob.py	2012-02-24 03:47:44 +0000
+++ lib/lp/soyuz/model/distroseriesdifferencejob.py	2012-04-20 19:12:21 +0000
@@ -21,6 +21,7 @@
 from lp.registry.model.distroseries import DistroSeries
 from lp.registry.model.distroseriesdifference import DistroSeriesDifference
 from lp.registry.model.sourcepackagename import SourcePackageName
+from lp.services.config import config
 from lp.services.database import bulk
 from lp.services.database.lpstorm import (
     IMasterStore,
@@ -65,12 +66,14 @@
         `derived_series`.  The difference is between the versions of
         `sourcepackagename` in `parent_series` and `derived_series`.
     """
-    job = DistributionJob(
+    db_job = DistributionJob(
         distribution=derived_series.distribution, distroseries=derived_series,
         job_type=DistributionJobType.DISTROSERIESDIFFERENCE,
         metadata=make_metadata(sourcepackagename.id, parent_series.id))
-    IMasterStore(DistributionJob).add(job)
-    return DistroSeriesDifferenceJob(job)
+    IMasterStore(DistributionJob).add(db_job)
+    job = DistroSeriesDifferenceJob(db_job)
+    job.celeryRunOnCommit()
+    return job
 
 
 def create_multiple_jobs(derived_series, parent_series):
@@ -165,6 +168,8 @@
 
     class_job_type = DistributionJobType.DISTROSERIESDIFFERENCE
 
+    config = config.distroseriesdifferencejob
+
     @classmethod
     def createForPackagePublication(cls, derived_series, sourcepackagename,
                                     pocket):

=== modified file 'lib/lp/soyuz/model/initializedistroseriesjob.py'
--- lib/lp/soyuz/model/initializedistroseriesjob.py	2012-01-01 02:58:52 +0000
+++ lib/lp/soyuz/model/initializedistroseriesjob.py	2012-04-20 19:12:21 +0000
@@ -13,6 +13,7 @@
     )
 
 from lp.registry.model.distroseries import DistroSeries
+from lp.services.config import config
 from lp.services.database.lpstorm import (
     IMasterStore,
     IStore,
@@ -46,6 +47,8 @@
 
     user_error_types = (InitializationError,)
 
+    config = config.initializedistroseries
+
     @classmethod
     def create(cls, child, parents, arches=(), archindep_archtag=None,
                packagesets=(), rebuild=False, overlays=(),
@@ -106,7 +109,9 @@
         distribution_job = DistributionJob(
             child.distribution, child, cls.class_job_type, metadata)
         store.add(distribution_job)
-        return cls(distribution_job)
+        derived_job = cls(distribution_job)
+        derived_job.celeryRunOnCommit()
+        return derived_job
 
     @classmethod
     def get(cls, distroseries):

=== modified file 'lib/lp/soyuz/tests/test_distroseriesdifferencejob.py'
--- lib/lp/soyuz/tests/test_distroseriesdifferencejob.py	2012-01-24 17:27:44 +0000
+++ lib/lp/soyuz/tests/test_distroseriesdifferencejob.py	2012-04-20 19:12:21 +0000
@@ -22,6 +22,9 @@
 from lp.services.database.lpstorm import IMasterStore
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.tests import (
+    block_on_job,
+    )
 from lp.services.scripts.tests import run_script
 from lp.soyuz.enums import (
     ArchivePurpose,
@@ -44,6 +47,7 @@
 from lp.testing import TestCaseWithFactory
 from lp.testing.dbuser import switch_dbuser
 from lp.testing.layers import (
+    CeleryJobLayer,
     LaunchpadZopelessLayer,
     ZopelessDatabaseLayer,
     )
@@ -977,3 +981,20 @@
 
         # The test is that we get here without exceptions.
         pass
+
+
+class TestViaCelery(TestCaseWithFactory):
+
+    layer = CeleryJobLayer
+
+    def test_DerivedDistroseriesDifferenceJob(self):
+        self.useFixture(FeatureFixture({
+            FEATURE_FLAG_ENABLE_MODULE: u'on',
+            'jobs.celery.enabled_classes': 'DistroSeriesDifferenceJob',
+            }))
+        dsp = self.factory.makeDistroSeriesParent()
+        package = self.factory.makeSourcePackageName()
+        with block_on_job():
+            job = create_job(dsp.derived_series, package, dsp.parent_series)
+            transaction.commit()
+        self.assertEqual(JobStatus.COMPLETED, job.status)

=== modified file 'lib/lp/soyuz/tests/test_initializedistroseriesjob.py'
--- lib/lp/soyuz/tests/test_initializedistroseriesjob.py	2012-01-20 15:42:44 +0000
+++ lib/lp/soyuz/tests/test_initializedistroseriesjob.py	2012-04-20 19:12:21 +0000
@@ -10,6 +10,8 @@
 from lp.buildmaster.enums import BuildStatus
 from lp.registry.interfaces.distroseriesparent import IDistroSeriesParentSet
 from lp.registry.interfaces.pocket import PackagePublishingPocket
+from lp.services.features.testing import FeatureFixture
+from lp.services.job.tests import block_on_job
 from lp.services.scripts.tests import run_script
 from lp.soyuz.enums import SourcePackageFormat
 from lp.soyuz.interfaces.distributionjob import (
@@ -26,9 +28,13 @@
 from lp.soyuz.model.initializedistroseriesjob import InitializeDistroSeriesJob
 from lp.soyuz.scripts.initialize_distroseries import InitializationError
 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
-from lp.testing import TestCaseWithFactory
+from lp.testing import (
+    celebrity_logged_in,
+    TestCaseWithFactory,
+    )
 from lp.testing.dbuser import switch_dbuser
 from lp.testing.layers import (
+    CeleryJobLayer,
     DatabaseFunctionalLayer,
     LaunchpadZopelessLayer,
     )
@@ -221,36 +227,17 @@
         self.assertEqual(message, removeSecurityProxy(job).error_description)
 
 
-class InitializeDistroSeriesJobTestsWithPackages(TestCaseWithFactory):
-    """Test case for InitializeDistroSeriesJob."""
-
-    layer = LaunchpadZopelessLayer
-
-    @property
-    def job_source(self):
-        return getUtility(IInitializeDistroSeriesJobSource)
-
-    def setupDas(self, parent, proc, arch_tag):
-        pf = getUtility(IProcessorFamilySet).getByName(proc)
-        parent_das = self.factory.makeDistroArchSeries(
-            distroseries=parent, processorfamily=pf,
-            architecturetag=arch_tag)
-        lf = self.factory.makeLibraryFileAlias()
-        transaction.commit()
-        parent_das.addOrUpdateChroot(lf)
-        parent_das.supports_virtualized = True
-        return parent_das
-
-    def _create_child(self):
-        pf = self.factory.makeProcessorFamily()
-        pf.addProcessor('x86', '', '')
-        parent = self.factory.makeDistroSeries()
-        parent_das = self.factory.makeDistroArchSeries(
-            distroseries=parent, processorfamily=pf)
-        lf = self.factory.makeLibraryFileAlias()
-        # Since the LFA needs to be in the librarian, commit.
-        transaction.commit()
-        parent_das.addOrUpdateChroot(lf)
+def create_child(factory):
+    pf = factory.makeProcessorFamily()
+    pf.addProcessor('x86', '', '')
+    parent = factory.makeDistroSeries()
+    parent_das = factory.makeDistroArchSeries(
+        distroseries=parent, processorfamily=pf)
+    lf = factory.makeLibraryFileAlias()
+    # Since the LFA needs to be in the librarian, commit.
+    transaction.commit()
+    parent_das.addOrUpdateChroot(lf)
+    with celebrity_logged_in('admin'):
         parent_das.supports_virtualized = True
         parent.nominatedarchindep = parent_das
         publisher = SoyuzTestPublisher()
@@ -264,18 +251,39 @@
         test1 = getUtility(IPackagesetSet).new(
             u'test1', u'test 1 packageset', parent.owner,
             distroseries=parent)
-        self.test1_packageset_id = str(test1.id)
+        test1_packageset_id = str(test1.id)
         test1.addSources('udev')
-        parent.updatePackageCount()
-        child = self.factory.makeDistroSeries()
-        getUtility(ISourcePackageFormatSelectionSet).add(
-            child, SourcePackageFormat.FORMAT_1_0)
-        # Make sure everything hits the database, switching db users aborts.
+    parent.updatePackageCount()
+    child = factory.makeDistroSeries()
+    getUtility(ISourcePackageFormatSelectionSet).add(
+        child, SourcePackageFormat.FORMAT_1_0)
+    # Make sure everything hits the database, switching db users aborts.
+    transaction.commit()
+    return parent, child, test1_packageset_id
+
+
+class InitializeDistroSeriesJobTestsWithPackages(TestCaseWithFactory):
+    """Test case for InitializeDistroSeriesJob."""
+
+    layer = LaunchpadZopelessLayer
+
+    @property
+    def job_source(self):
+        return getUtility(IInitializeDistroSeriesJobSource)
+
+    def setupDas(self, parent, proc, arch_tag):
+        pf = getUtility(IProcessorFamilySet).getByName(proc)
+        parent_das = self.factory.makeDistroArchSeries(
+            distroseries=parent, processorfamily=pf,
+            architecturetag=arch_tag)
+        lf = self.factory.makeLibraryFileAlias()
         transaction.commit()
-        return parent, child
+        parent_das.addOrUpdateChroot(lf)
+        parent_das.supports_virtualized = True
+        return parent_das
 
     def test_job(self):
-        parent, child = self._create_child()
+        parent, child, test1_packageset_id = create_child(self.factory)
         job = self.job_source.create(child, [parent.id])
         switch_dbuser('initializedistroseries')
 
@@ -285,10 +293,10 @@
         self.assertEqual(parent.binarycount, child.binarycount)
 
     def test_job_with_arguments(self):
-        parent, child = self._create_child()
+        parent, child, test1_packageset_id = create_child(self.factory)
         arch = parent.nominatedarchindep.architecturetag
         job = self.job_source.create(
-            child, [parent.id], packagesets=(self.test1_packageset_id,),
+            child, [parent.id], packagesets=(test1_packageset_id,),
             arches=(arch,), rebuild=True)
         switch_dbuser('initializedistroseries')
 
@@ -302,7 +310,7 @@
         self.assertEqual(builds.count(), 1)
 
     def test_job_with_none_arguments(self):
-        parent, child = self._create_child()
+        parent, child, test1_packageset_id = create_child(self.factory)
         job = self.job_source.create(
             child, [parent.id], archindep_archtag=None, packagesets=None,
             arches=None, overlays=None, overlay_pockets=None,
@@ -314,7 +322,7 @@
         self.assertEqual(parent.sourcecount, child.sourcecount)
 
     def test_job_with_none_archindep_archtag_argument(self):
-        parent, child = self._create_child()
+        parent, child, test1_packageset_id = create_child(self.factory)
         job = self.job_source.create(
             child, [parent.id], archindep_archtag=None, packagesets=None,
             arches=None, overlays=None, overlay_pockets=None,
@@ -327,7 +335,7 @@
             child.nominatedarchindep.architecturetag)
 
     def test_job_with_archindep_archtag_argument(self):
-        parent, child = self._create_child()
+        parent, child, test1_packageset_id = create_child(self.factory)
         self.setupDas(parent, 'amd64', 'amd64')
         self.setupDas(parent, 'powerpc', 'hppa')
         job = self.job_source.create(
@@ -344,3 +352,23 @@
     def test_cronscript(self):
         run_script(
             'cronscripts/run_jobs.py', ['-v', 'initializedistroseries'])
+
+
+class TestViaCelery(TestCaseWithFactory):
+
+    layer = CeleryJobLayer
+
+    def test_job(self):
+        """Job runs successfully via Celery."""
+        fixture = FeatureFixture({
+            'jobs.celery.enabled_classes': 'InitializeDistroSeriesJob',
+        })
+        self.useFixture(fixture)
+        parent, child, test1 = create_child(self.factory)
+        job_source = getUtility(IInitializeDistroSeriesJobSource)
+        with block_on_job():
+            job_source.create(child, [parent.id])
+            transaction.commit()
+        child.updatePackageCount()
+        self.assertEqual(parent.sourcecount, child.sourcecount)
+        self.assertEqual(parent.binarycount, child.binarycount)


Follow ups