← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:extract-job-state-flush into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:extract-job-state-flush into launchpad:master.

Commit message:
Flush store before extracting job state

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/449126

Since converting `Job` to Storm in c69d6205ab, the store is no longer automatically flushed when the `id` column is fetched, so we have to be a little more careful to ensure that it's flushed at appropriate times.  Some Celery tasks are currently showing up with their job IDs as `None`, indicating that this isn't always being done properly.  This appears to be because the before-commit hook that extracts the job state runs before the Storm hook that flushes the store at the start of a commit.  If the statement that created the `Job` row hasn't already been flushed for some other reason, then we might not know the job ID yet at the point where the job state is extracted.

The test suite evaded this problem because Celery-based job tests typically use `FeatureFixture`, which causes `BaseRunnableJob.celeryRunOnCommit` to make another database query after creating the job, thus causing an implicit flush.

Flushing the store before extracting the job state should be a complete fix for this class of problem.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:extract-job-state-flush into launchpad:master.
diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
index 9b8f1cf..f66d5eb 100644
--- a/lib/lp/services/job/runner.py
+++ b/lib/lp/services/job/runner.py
@@ -41,6 +41,7 @@ from zope.security.proxy import removeSecurityProxy
 
 from lp.services import scripts
 from lp.services.config import config, dbconfig
+from lp.services.database.interfaces import IStore
 from lp.services.database.policy import DatabaseBlockedPolicy
 from lp.services.features import getFeatureFlag
 from lp.services.job.interfaces.job import IJob, IRunnableJob
@@ -266,6 +267,11 @@ class BaseRunnableJob(BaseRunnableJobSource):
 
     def extractJobState(self):
         """Hook function to call before starting a commit."""
+        # Before-commit hooks are called before the hook in
+        # storm.zope.zstorm.StoreDataManager.tpc_begin that flushes the
+        # store, so we have to flush the store here because we might
+        # otherwise not know the job ID yet.
+        IStore(self.job).flush()
         self.job_state = JobState(self)
 
     def celeryCommitHook(self, succeeded):
diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
index 70bf2b6..970f9f5 100644
--- a/lib/lp/services/job/tests/test_celery.py
+++ b/lib/lp/services/job/tests/test_celery.py
@@ -26,7 +26,7 @@ from lp.services.database.interfaces import IStore
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import IJob, IRunnableJob, JobStatus
 from lp.services.job.model.job import Job
-from lp.services.job.runner import BaseRunnableJob
+from lp.services.job.runner import BaseRunnableJob, celery_enabled
 from lp.services.job.tests import block_on_job, monitor_celery
 from lp.testing import TestCaseWithFactory
 from lp.testing.layers import CeleryJobLayer
@@ -45,7 +45,6 @@ class TestJob(BaseRunnableJob):
             self.job = store.find(Job, id=job_id)[0]
         else:
             self.job = Job(max_retries=2, scheduled_start=scheduled_start)
-            IStore(Job).flush()
 
     def run(self):
         pass
@@ -100,16 +99,25 @@ class TestJobsViaCelery(TestCaseWithFactory):
 
     layer = CeleryJobLayer
 
-    def test_TestJob(self):
-        # TestJob can be run via Celery.
+    def enableCeleryClass(self, job_class_name):
+        """Enable running jobs with a given class name via Celery."""
         self.useFixture(
-            FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
+            FeatureFixture({"jobs.celery.enabled_classes": job_class_name})
         )
+        # Prime the feature flag cache so that
+        # BaseRunnableJob.celeryRunOnCommit doesn't make a database query.
+        # This lets us more carefully test the flush behaviour in
+        # BaseRunnableJob.extractJobState.
+        self.assertTrue(celery_enabled(job_class_name))
+
+    def test_TestJob(self):
+        # TestJob can be run via Celery.
+        self.enableCeleryClass("TestJob")
         with block_on_job(self):
             job = TestJob()
             job.celeryRunOnCommit()
-            job_id = job.job_id
             transaction.commit()
+            job_id = job.job_id
         store = IStore(Job)
         dbjob = store.find(Job, id=job_id)[0]
         self.assertEqual(JobStatus.COMPLETED, dbjob.status)
@@ -119,9 +127,7 @@ class TestJobsViaCelery(TestCaseWithFactory):
         # in 10 seconds, and one at any time.  Wait up to a minute and
         # ensure that the correct three have completed, and that they
         # completed in the expected order.
-        self.useFixture(
-            FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
-        )
+        self.enableCeleryClass("TestJob")
         now = datetime.now(timezone.utc)
         job_past = TestJob(scheduled_start=now - timedelta(seconds=60))
         job_past.celeryRunOnCommit()
@@ -158,11 +164,7 @@ class TestJobsViaCelery(TestCaseWithFactory):
     def test_jobs_with_retry_exceptions_are_queued_again(self):
         # A job that raises a retry error is automatically queued
         # and executed again.
-        self.useFixture(
-            FeatureFixture(
-                {"jobs.celery.enabled_classes": "TestJobWithRetryError"}
-            )
-        )
+        self.enableCeleryClass("TestJobWithRetryError")
 
         # Set scheduled_start on the job to ensure that retry delays
         # override it.
@@ -214,14 +216,12 @@ class TestJobsViaCelery(TestCaseWithFactory):
     def test_without_rabbitmq(self):
         # If no RabbitMQ broker is configured, the job is not run via Celery.
         self.pushConfig("rabbitmq", broker_urls="none")
-        self.useFixture(
-            FeatureFixture({"jobs.celery.enabled_classes": "TestJob"})
-        )
+        self.enableCeleryClass("TestJob")
         with monitor_celery() as responses:
             job = TestJob()
             job.celeryRunOnCommit()
-            job_id = job.job_id
             transaction.commit()
+            job_id = job.job_id
         self.assertEqual([], responses)
         store = IStore(Job)
         dbjob = store.find(Job, id=job_id)[0]