← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:job-failure-txn-management into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:job-failure-txn-management into launchpad:master.

Commit message:
Fix transaction management error when failing a job

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/425613

Many of Launchpad's jobs are structured using a separate table (e.g. `ArchiveJob`) that has a reference to a `Job` row, so evaluating `self.job` may involve an SQL query if that reference is not in the Storm cache (as is the case after any commit or abort, since both of those invalidate the cache).  As a result, if a job executes an SQL statement that raises an error, the job may then crash as follows when trying to mark itself as failed:

    storm.database.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block

To fix this, take slightly more control of transaction management.  `self.job.fail(manage_transaction=True)` aborts the transaction at the start of the method and commits it at the end.  However, merely evaluating `self.job` in order to call that method may issue an SQL query, and that query fails in an already-aborted transaction unless `self.job` happens to be in the Storm cache.  Instead, lift the abort/commit out by one layer so that `self.job` is evaluated in a fresh transaction.
-- 
Your team, Launchpad code reviewers, has been requested to review the proposed merge of ~cjwatson/launchpad:job-failure-txn-management into launchpad:master.
diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
index 9410069..77e78ac 100644
--- a/lib/lp/services/job/runner.py
+++ b/lib/lp/services/job/runner.py
@@ -345,7 +345,17 @@ class BaseRunnableJob(BaseRunnableJobSource):
 
     def fail(self, manage_transaction=False):
         """See `IJob`."""
-        self.job.fail(manage_transaction=manage_transaction)
+        # The job may have failed due to an error in an SQL statement, and
+        # `self.job` may not have been loaded since it was invalidated by
+        # the commit in `BaseRunnableJob.start`.  To avoid hitting an
+        # `InFailedSqlTransaction` exception here, we manage the transaction
+        # manually so that the rollback happens before trying to load
+        # `self.job`.
+        if manage_transaction:
+            transaction.abort()
+        self.job.fail()
+        if manage_transaction:
+            transaction.commit()
         statsd = getUtility(IStatsdClient)
         statsd.incr('job.fail_count', labels={'type': self.__class__.__name__})
 
diff --git a/lib/lp/services/job/tests/test_runner.py b/lib/lp/services/job/tests/test_runner.py
index 34cee43..34d7bb8 100644
--- a/lib/lp/services/job/tests/test_runner.py
+++ b/lib/lp/services/job/tests/test_runner.py
@@ -19,6 +19,11 @@ from lazr.jobrunner.jobrunner import (
     )
 from lazr.restful.utils import get_current_browser_request
 from pytz import UTC
+from storm.locals import (
+    Bool,
+    Int,
+    Reference,
+    )
 from testtools.matchers import (
     GreaterThan,
     LessThan,
@@ -30,7 +35,12 @@ import transaction
 from zope.interface import implementer
 
 from lp.services.config import config
-from lp.services.database.sqlbase import flush_database_updates
+from lp.services.database.interfaces import IStore
+from lp.services.database.sqlbase import (
+    connect,
+    flush_database_updates,
+    )
+from lp.services.database.stormbase import StormBase
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import (
     IRunnableJob,
@@ -493,6 +503,87 @@ class TestJobRunner(StatsMixin, TestCaseWithFactory):
         self.assertIsNot(None, mo)
 
 
+@implementer(IRunnableJob)
+class DerivedJob(BaseRunnableJob, StormBase):
+    """A job using a separate database table with a reference to Job."""
+
+    __storm_table__ = "DerivedJob"
+
+    id = Int(primary=True)
+
+    job_id = Int(name="job", allow_none=False)
+    job = Reference(job_id, Job.id)
+
+    should_succeed = Bool(name="should_succeed", allow_none=False)
+
+    def __init__(self, should_succeed):
+        super().__init__()
+        self.job = Job()
+        self.should_succeed = should_succeed
+
+    def run(self):
+        if not self.should_succeed:
+            IStore(self).execute("SELECT 1/0")
+
+
+class TestJobRunnerDerivedJob(StatsMixin, TestCaseWithFactory):
+    """Test JobRunner's behaviour with a job using a separate DB table."""
+
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super().setUp()
+        con = connect()
+        cur = con.cursor()
+        cur.execute(dedent("""
+            CREATE TABLE DerivedJob (
+                id serial PRIMARY KEY,
+                job integer NOT NULL REFERENCES Job,
+                should_succeed boolean NOT NULL
+            )
+            """))
+        cur.execute("GRANT ALL ON DerivedJob TO launchpad_main")
+        cur.execute("GRANT ALL ON derivedjob_id_seq TO launchpad_main")
+        con.commit()
+        self.setUpStats()
+
+    def test_runJob(self):
+        """Status is set to completed when a job runs to completion."""
+        job = DerivedJob(should_succeed=True)
+        flush_database_updates()
+        runner = JobRunner([job])
+        runner.runJob(job, None)
+        self.assertEqual(JobStatus.COMPLETED, job.job.status)
+        self.assertEqual([job], runner.completed_jobs)
+        self.assertEqual(
+            self.stats_client.incr.call_args_list[0][0],
+            ("job.start_count,env=test,type=DerivedJob",))
+        self.assertEqual(
+            self.stats_client.incr.call_args_list[1][0],
+            ("job.complete_count,env=test,type=DerivedJob",))
+
+    def test_runAll_reports_oopses(self):
+        """When an error is encountered, report an oops and continue."""
+        job_1 = DerivedJob(should_succeed=False)
+        job_2 = DerivedJob(should_succeed=True)
+        flush_database_updates()
+        runner = JobRunner([job_1, job_2])
+        runner.runAll()
+        self.assertEqual([], pop_notifications())
+        self.assertEqual([job_2], runner.completed_jobs)
+        self.assertEqual([job_1], runner.incomplete_jobs)
+        self.assertEqual(JobStatus.FAILED, job_1.job.status)
+        self.assertEqual(JobStatus.COMPLETED, job_2.job.status)
+        oops = self.oopses[-1]
+        self.assertIn("division by zero", oops["tb_text"])
+        self.assertEqual(
+            self.stats_client.incr.call_args_list[0][0],
+            ("job.start_count,env=test,type=DerivedJob",))
+        self.assertEqual(
+            self.stats_client.incr.call_args_list[1][0],
+            ("job.fail_count,env=test,type=DerivedJob",))
+
+
 class StaticJobSource(BaseRunnableJob):
 
     @classmethod