← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~adeuring/launchpad/abort-transaction-in-job-queue into lp:launchpad

 

Abel Deuring has proposed merging lp:~adeuring/launchpad/abort-transaction-in-job-queue into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~adeuring/launchpad/abort-transaction-in-job-queue/+merge/101762

This branch adds a parameter "abort_transaction" to the method Job.queue().

Jobs are now controlled by the lazr.jobrunner module, where I recently
added a timeout mechanism: this Celery-based job runner can define
more than one queue for the same jobs, with different timeout values.
If a job times out in a "fast" queue, it can be re-queued in another
queue with a longer timeout value.

This requires changing the status of the job back to JobStatus.WAITING.
This is done in Job.queue() -- but this method is also called when
a job raises an exception that is listed in Job.retry_error_types.

This retry mechanism assumes that the job left the database in a
consistent state when it raised the "retry exception", hence Job.queue()
calls transaction.commit(). This is most likely bad when the job
is interrupted by a timeout, so I added the parameter abort_transaction,
which is False by default.

I also noticed that the parameter manage_transaction of the methods
Job.start(), Job.complete(), Job.fail(), etc. was not tested, so I added
these tests:

./bin/test -vvt    lp.services.job.tests.test_job.TestJob.test_.*_manages_transactions

no lint

-- 
https://code.launchpad.net/~adeuring/launchpad/abort-transaction-in-job-queue/+merge/101762
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/launchpad/abort-transaction-in-job-queue into lp:launchpad.
=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py	2012-04-06 17:28:25 +0000
+++ lib/lp/services/job/interfaces/job.py	2012-04-12 15:42:51 +0000
@@ -115,22 +115,22 @@
     def getTimeout():
         """Determine how long this job can run before timing out."""
 
-    def start():
+    def start(manage_transaction=False):
         """Mark the job as started."""
 
-    def complete():
+    def complete(manage_transaction=False):
         """Mark the job as completed."""
 
-    def fail():
+    def fail(manage_transaction=False):
         """Indicate that the job has failed permanently.
 
         Only running jobs can fail.
         """
 
-    def queue():
+    def queue(manage_transaction=False, abort_transaction=False):
         """Mark the job as queued for processing."""
 
-    def suspend():
+    def suspend(manage_transaction=False):
         """Mark the job as suspended.
 
         Only waiting jobs can be suspended."""

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2012-04-09 19:02:25 +0000
+++ lib/lp/services/job/model/job.py	2012-04-12 15:42:51 +0000
@@ -189,10 +189,12 @@
         if manage_transaction:
             transaction.commit()
 
-    def queue(self, manage_transaction=False):
+    def queue(self, manage_transaction=False, abort_transaction=False):
         """See `IJob`."""
-        # Commit the transaction to update the DB time.
         if manage_transaction:
+            if abort_transaction:
+                transaction.abort()
+            # Commit the transaction to update the DB time.
             transaction.commit()
         self._set_status(JobStatus.WAITING)
         self.date_finished = datetime.datetime.now(UTC)

=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py	2012-03-21 16:16:08 +0000
+++ lib/lp/services/job/tests/test_job.py	2012-04-12 15:42:51 +0000
@@ -9,6 +9,7 @@
 import pytz
 from lazr.jobrunner.jobrunner import LeaseHeld
 from storm.locals import Store
+import transaction
 
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.lpstorm import IStore
@@ -252,6 +253,119 @@
             self.assertEqual(
                 status in Job.PENDING_STATUSES, job.is_pending)
 
+    def test_start_manages_transactions(self):
+        # Job.start() does not commit the transaction by default.
+        with TransactionRecorder() as recorder:
+            job = Job()
+            job.start()
+            self.assertEqual([], recorder.transaction_calls)
+
+        # If explicitly specified, Job.start() commits the
+        # transaction.
+        with TransactionRecorder() as recorder:
+            job = Job()
+            job.start(manage_transaction=True)
+            self.assertEqual(['commit'], recorder.transaction_calls)
+
+    def test_complete_manages_transactions(self):
+        # Job.complete() does not commit the transaction by default.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.complete()
+            self.assertEqual([], recorder.transaction_calls)
+
+        # If explicitly specified, Job.complete() commits the
+        # transaction.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.complete(manage_transaction=True)
+            self.assertEqual(['commit', 'commit'], recorder.transaction_calls)
+
+    def test_fail_manages_transactions(self):
+        # Job.fail() does not commit the transaction by default.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.fail()
+            self.assertEqual([], recorder.transaction_calls)
+
+        # If explicitly specified, Job.fail() commits the
+        # transaction. Note that there is an additional commit to
+        # update the job status.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.fail(manage_transaction=True)
+            self.assertEqual(['abort', 'commit'], recorder.transaction_calls)
+
+    def test_queue_manages_transactions(self):
+        # Job.queue() does not commit the transaction by default.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.queue()
+            self.assertEqual([], recorder.transaction_calls)
+
+        # If explicitly specified, Job.queue() commits the
+        # transaction. Note that there is an additional commit to
+        # update the job status.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.queue(manage_transaction=True)
+            self.assertEqual(['commit', 'commit'], recorder.transaction_calls)
+
+        # If abort_transaction=True is also passed to Job.queue()
+        # the transaction is first aborted, then two times committed.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.queue(manage_transaction=True, abort_transaction=True)
+            self.assertEqual(
+                ['abort', 'commit', 'commit'], recorder.transaction_calls)
+
+    def test_suspend_manages_transactions(self):
+        # Job.suspend() does not commit the transaction by default.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.suspend()
+            self.assertEqual([], recorder.transaction_calls)
+
+        # If explicitly specified, Job.suspend() commits the
+        # transaction.
+        job = Job()
+        job.start()
+        with TransactionRecorder() as recorder:
+            job.suspend(manage_transaction=True)
+            self.assertEqual(['commit'], recorder.transaction_calls)
+
+
+class TransactionRecorder:
+    def __init__(self):
+        self.transaction_calls = []
+
+    def __enter__(self):
+        self.real_commit = transaction.commit
+        self.real_abort = transaction.abort
+        transaction.commit = self.commit
+        transaction.abort = self.abort
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        transaction.commit = self.real_commit
+        transaction.abort = self.real_abort
+
+    def commit(self):
+        self.transaction_calls.append('commit')
+        self.real_commit()
+
+    def abort(self):
+        self.transaction_calls.append('abort')
+        self.real_abort()
+
 
 class TestReadiness(TestCase):
     """Test the implementation of readiness."""


Follow ups