← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:fix-after-commit-hooks into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:fix-after-commit-hooks into launchpad:master with ~cjwatson/launchpad:fix-translation-sharing-job-derived as a prerequisite.

Commit message:
Don't make database queries in after-commit hooks

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/382818

BaseRunnableJob's use of after-commit hooks was always a bit dodgy, because the hook itself uses the database.  The semantics of this are unclear at best, and the transaction package doesn't document what happens in this case; it has already bitten us once, requiring the insertion of an extra transaction.abort() to stop the transaction begun in runViaCelery from leaking into other code.  In hindsight, that should probably have been a clue: the very fact that the after-commit hook opened another transaction suggests the possibility of race conditions.

With transaction 3.0.0, the situation gets worse: executing any query in an after-commit hook raises an exception, because the database access attempts to join a transaction that has already been committed.  It's possible that this is in fact a bug, but this approach has enough strikes against it by now that it seems wiser to avoid it altogether.  Instead, extract the relatively small amount of state needed by runViaCelery in a before-commit hook, and block database access in the corresponding after-commit hook.

Similarly, TestProcessAccepted.test_commits_after_each_item tried to execute a database query in the afterCompletion hook of a synchronizer.  An analogous fix works there as well: move the database work to the beforeCompletion hook.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:fix-after-commit-hooks into launchpad:master.
diff --git a/lib/lp/archivepublisher/tests/test_processaccepted.py b/lib/lp/archivepublisher/tests/test_processaccepted.py
index 814bcf1..90210ce 100644
--- a/lib/lp/archivepublisher/tests/test_processaccepted.py
+++ b/lib/lp/archivepublisher/tests/test_processaccepted.py
@@ -144,21 +144,23 @@ class TestProcessAccepted(TestCaseWithFactory):
             commit_count = 0
 
             def beforeCompletion(inner_self, txn):
-                pass
-
-            def afterCompletion(inner_self, txn):
-                if txn.status != 'Committed':
-                    return
-                inner_self.commit_count += 1
-                done_count = len([
+                self.done_count = len([
                     upload for upload in uploads
                     if upload.package_upload.status ==
                         PackageUploadStatus.DONE])
-                # We actually commit twice for each upload: once for the
-                # queue item itself, and again to close its bugs.
-                self.assertIn(
-                    min(len(uploads) * 2, inner_self.commit_count),
-                    (done_count * 2, (done_count * 2) - 1))
+
+            def afterCompletion(inner_self, txn):
+                try:
+                    if txn.status != 'Committed':
+                        return
+                    inner_self.commit_count += 1
+                    # We actually commit twice for each upload: once for the
+                    # queue item itself, and again to close its bugs.
+                    self.assertIn(
+                        min(len(uploads) * 2, inner_self.commit_count),
+                        (self.done_count * 2, (self.done_count * 2) - 1))
+                finally:
+                    self.done_count = None
 
         script = self.getScript([])
         switch_dbuser(self.dbuser)
diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
index cdcf604..9fe5101 100644
--- a/lib/lp/services/job/runner.py
+++ b/lib/lp/services/job/runner.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Facilities for running Jobs."""
@@ -66,6 +66,7 @@ from lp.services.config import (
     config,
     dbconfig,
     )
+from lp.services.database.policy import DatabaseBlockedPolicy
 from lp.services.features import getFeatureFlag
 from lp.services.job.interfaces.job import (
     IJob,
@@ -98,6 +99,20 @@ class BaseRunnableJobSource:
         yield
 
 
+class JobState:
+    """Extract some state from a job.
+
+    This is just enough to allow `BaseRunnableJob.runViaCelery` to run
+    without database access.
+    """
+
+    def __init__(self, runnable_job):
+        self.job_id = runnable_job.job.id
+        self.task_id = runnable_job.taskId()
+        self.scheduled_start = runnable_job.scheduled_start
+        self.lease_expires = runnable_job.lease_expires
+
+
 @delegate_to(IJob, context='job')
 class BaseRunnableJob(BaseRunnableJobSource):
     """Base class for jobs to be run via JobRunner.
@@ -245,39 +260,41 @@ class BaseRunnableJob(BaseRunnableJobSource):
         else:
             task = celery_run_job
         db_class = self.getDBClass()
-        ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
-        eta = self.job.scheduled_start
+        ujob_id = (
+            self.job_state.job_id, db_class.__module__, db_class.__name__)
+        eta = self.job_state.scheduled_start
         # Don't schedule the job while its lease is still held, or
         # celery will skip it.
-        if (self.job.lease_expires is not None
-                and (eta is None or eta < self.job.lease_expires)):
-            eta = self.job.lease_expires
+        if (self.job_state.lease_expires is not None
+                and (eta is None or eta < self.job_state.lease_expires)):
+            eta = self.job_state.lease_expires
         return task.apply_async(
             (ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta,
             soft_time_limit=self.soft_time_limit.total_seconds(),
-            task_id=self.taskId())
+            task_id=self.job_state.task_id)
 
     def getDBClass(self):
         return self.context.__class__
 
+    def extractJobState(self):
+        """Hook function to call before starting a commit."""
+        self.job_state = JobState(self)
+
     def celeryCommitHook(self, succeeded):
-        """Hook function to call when a commit completes."""
-        if succeeded:
-            ignore_result = bool(BaseRunnableJob.celery_responses is None)
-            response = self.runViaCelery(ignore_result)
-            if not ignore_result:
-                BaseRunnableJob.celery_responses.append(response)
-            # transaction >= 1.6.0 removes data managers from the
-            # transaction before calling after-commit hooks; this means that
-            # the transaction begun in runViaCelery to look up details of
-            # the job doesn't get committed or rolled back, and so
-            # subsequent SQL statements executed by the caller confusingly
-            # see a database snapshot from before the Celery job was run,
-            # even if they use the block_on_job test helper.  Since
-            # runViaCelery never issues any data-modifying statements
-            # itself, the least confusing thing to do here is to just roll
-            # back its transaction.
-            transaction.abort()
+        """Hook function to call when a commit completes.
+
+        extractJobState must have been run first.
+        """
+        try:
+            with DatabaseBlockedPolicy():
+                if succeeded:
+                    ignore_result = bool(
+                        BaseRunnableJob.celery_responses is None)
+                    response = self.runViaCelery(ignore_result)
+                    if not ignore_result:
+                        BaseRunnableJob.celery_responses.append(response)
+        finally:
+            self.job_state = None
 
     def celeryRunOnCommit(self):
         """Configure transaction so that commit runs this job via Celery."""
@@ -285,6 +302,7 @@ class BaseRunnableJob(BaseRunnableJobSource):
                 not celery_enabled(self.__class__.__name__)):
             return
         current = transaction.get()
+        current.addBeforeCommitHook(self.extractJobState)
         current.addAfterCommitHook(self.celeryCommitHook)
 
     def queue(self, manage_transaction=False, abort_transaction=False):
diff --git a/lib/lp/services/job/tests/test_celeryjob.py b/lib/lp/services/job/tests/test_celeryjob.py
index d1991c1..85f6252 100644
--- a/lib/lp/services/job/tests/test_celeryjob.py
+++ b/lib/lp/services/job/tests/test_celeryjob.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2020 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from cStringIO import StringIO
@@ -82,6 +82,7 @@ class TestRunMissingJobs(TestCaseWithFactory):
             'job_info', 'job.id: %d, job.job_id: %d' % (job.id, job.job_id))
         find_missing_ready_obj = self.getFMR(BranchScanJob, 0)
         self.assertEqual([job], find_missing_ready_obj.find_missing_ready())
+        job.extractJobState()
         job.runViaCelery()
         find_missing_ready_obj = self.getFMR(BranchScanJob, 1)
         missing_ready = find_missing_ready_obj.find_missing_ready()