launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #07108
[Merge] lp:~adeuring/launchpad/abort-transaction-in-job-queue into lp:launchpad
Abel Deuring has proposed merging lp:~adeuring/launchpad/abort-transaction-in-job-queue into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~adeuring/launchpad/abort-transaction-in-job-queue/+merge/101762
This branch adds a parameter "abort_transaction" to the method Job.queue().
Jobs are now controlled by the lazr.jobrunner module, where I recently
added a timeout mechanism: This Celery based job runner can define
more than one queue for the same jobs, with different timeout values.
If a job times out in a "fast" queue, it can be re-queued in another
queue with a longer timeout value.
This requires changing the status of the job back to JobStatus.WAITING.
This is done in Job.queue() -- but this method is also called when
a job raises an exception that is listed in Job.retry_error_types.
This retry mechanism assumes that the job left the database in a
consistent state when it raised the "retry exception", hence Job.queue()
calls transaction.commit(). This is most likely bad when the job
is interrupted by a timeout, so I added the parameter abort_transaction,
which is False by default.
I also noticed that the parameter manage_transaction of the methods
Job.start(), Job.complete(), Job.fail() etc. was not tested, so I added
these tests:
./bin/test -vvt lp.services.job.tests.test_job.TestJob.test_.*_manages_transactions
no lint
--
https://code.launchpad.net/~adeuring/launchpad/abort-transaction-in-job-queue/+merge/101762
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~adeuring/launchpad/abort-transaction-in-job-queue into lp:launchpad.
=== modified file 'lib/lp/services/job/interfaces/job.py'
--- lib/lp/services/job/interfaces/job.py 2012-04-06 17:28:25 +0000
+++ lib/lp/services/job/interfaces/job.py 2012-04-12 15:42:51 +0000
@@ -115,22 +115,22 @@
def getTimeout():
"""Determine how long this job can run before timing out."""
- def start():
+ def start(manage_transaction=False):
"""Mark the job as started."""
- def complete():
+ def complete(manage_transaction=False):
"""Mark the job as completed."""
- def fail():
+ def fail(manage_transaction=False):
"""Indicate that the job has failed permanently.
Only running jobs can fail.
"""
- def queue():
+ def queue(manage_transaction=False, abort_transaction=False):
"""Mark the job as queued for processing."""
- def suspend():
+ def suspend(manage_transaction=False):
"""Mark the job as suspended.
Only waiting jobs can be suspended."""
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2012-04-09 19:02:25 +0000
+++ lib/lp/services/job/model/job.py 2012-04-12 15:42:51 +0000
@@ -189,10 +189,12 @@
if manage_transaction:
transaction.commit()
- def queue(self, manage_transaction=False):
+ def queue(self, manage_transaction=False, abort_transaction=False):
"""See `IJob`."""
- # Commit the transaction to update the DB time.
if manage_transaction:
+ if abort_transaction:
+ transaction.abort()
+ # Commit the transaction to update the DB time.
transaction.commit()
self._set_status(JobStatus.WAITING)
self.date_finished = datetime.datetime.now(UTC)
=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py 2012-03-21 16:16:08 +0000
+++ lib/lp/services/job/tests/test_job.py 2012-04-12 15:42:51 +0000
@@ -9,6 +9,7 @@
import pytz
from lazr.jobrunner.jobrunner import LeaseHeld
from storm.locals import Store
+import transaction
from lp.services.database.constants import UTC_NOW
from lp.services.database.lpstorm import IStore
@@ -252,6 +253,119 @@
self.assertEqual(
status in Job.PENDING_STATUSES, job.is_pending)
+ def test_start_manages_transactions(self):
+ # Job.start() does not commit the transaction by default.
+ with TransactionRecorder() as recorder:
+ job = Job()
+ job.start()
+ self.assertEqual([], recorder.transaction_calls)
+
+ # If explicitly specified, Job.start() commits the
+ # transaction.
+ with TransactionRecorder() as recorder:
+ job = Job()
+ job.start(manage_transaction=True)
+ self.assertEqual(['commit'], recorder.transaction_calls)
+
+ def test_complete_manages_transactions(self):
+ # Job.complete() does not commit the transaction by default.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.complete()
+ self.assertEqual([], recorder.transaction_calls)
+
+ # If explicitly specified, Job.complete() commits the
+ # transaction.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.complete(manage_transaction=True)
+ self.assertEqual(['commit', 'commit'], recorder.transaction_calls)
+
+ def test_fail_manages_transactions(self):
+ # Job.fail() does not commit the transaction by default.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.fail()
+ self.assertEqual([], recorder.transaction_calls)
+
+ # If explicitly specified, Job.fail() commits the
+ # transaction. Note that there is an additional commit to
+ # update the job status.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.fail(manage_transaction=True)
+ self.assertEqual(['abort', 'commit'], recorder.transaction_calls)
+
+ def test_queue_manages_transactions(self):
+ # Job.queue() does not commit the transaction by default.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.queue()
+ self.assertEqual([], recorder.transaction_calls)
+
+ # If explicitly specified, Job.queue() commits the
+ # transaction. Note that there is an additional commit to
+ # update the job status.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.queue(manage_transaction=True)
+ self.assertEqual(['commit', 'commit'], recorder.transaction_calls)
+
+ # If abort_transaction=True is also passed to Job.queue()
+ # the transaction is first aborted, then two times committed.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.queue(manage_transaction=True, abort_transaction=True)
+ self.assertEqual(
+ ['abort', 'commit', 'commit'], recorder.transaction_calls)
+
+ def test_suspend_manages_transactions(self):
+ # Job.suspend() does not commit the transaction by default.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.suspend()
+ self.assertEqual([], recorder.transaction_calls)
+
+ # If explicitly specified, Job.suspend() commits the
+ # transaction.
+ job = Job()
+ job.start()
+ with TransactionRecorder() as recorder:
+ job.suspend(manage_transaction=True)
+ self.assertEqual(['commit'], recorder.transaction_calls)
+
+
+class TransactionRecorder:
+ def __init__(self):
+ self.transaction_calls = []
+
+ def __enter__(self):
+ self.real_commit = transaction.commit
+ self.real_abort = transaction.abort
+ transaction.commit = self.commit
+ transaction.abort = self.abort
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ transaction.commit = self.real_commit
+ transaction.abort = self.real_abort
+
+ def commit(self):
+ self.transaction_calls.append('commit')
+ self.real_commit()
+
+ def abort(self):
+ self.transaction_calls.append('abort')
+ self.real_abort()
+
class TestReadiness(TestCase):
"""Test the implementation of readiness."""
Follow ups