[Merge] lp:~wgrant/launchpad/job-scheduled_start-retries into lp:launchpad
William Grant has proposed merging lp:~wgrant/launchpad/job-scheduled_start-retries into lp:launchpad.
Commit message:
Teach Celery to respect Job.scheduled_start, and use it to delay retries for all runners.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~wgrant/launchpad/job-scheduled_start-retries/+merge/265793
More integration of Job.scheduled_start:
- Celery respects scheduled_start if it is set.
- Retries set scheduled_start = now + retry_delay, so retry_delay is now respected by every runner, not just Celery (see the sketch below).
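
A rough sketch of the rule, using simplified stand-in names rather than the real Launchpad classes (SketchJob, the bare status strings and the retry_delay value here are illustrative only):

from datetime import datetime, timedelta
from pytz import UTC

class SketchJob:
    """Illustrative stand-in for lp.services.job.model.job.Job."""

    retry_delay = timedelta(minutes=5)  # illustrative value only

    def __init__(self, scheduled_start=None):
        self.status = 'WAITING'
        self.scheduled_start = scheduled_start

    @property
    def is_runnable(self):
        # A WAITING job only becomes runnable once its scheduled_start
        # (if any) has passed.
        if self.status != 'WAITING':
            return False
        return (self.scheduled_start is None
                or self.scheduled_start <= datetime.now(UTC))

    def queue_retry(self):
        # A retry pushes scheduled_start forward by retry_delay, so any
        # runner that checks is_runnable waits out the delay, not just
        # Celery.
        self.scheduled_start = datetime.now(UTC) + self.retry_delay
        self.status = 'WAITING'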
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/job-scheduled_start-retries into lp:launchpad.
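
As a quick illustration of how the Celery eta is chosen in the runner.py hunk below (simplified; choose_eta is a hypothetical free function, whereas the real logic is inline in the method changed in that hunk):

def choose_eta(scheduled_start, lease_expires):
    # Prefer the job's scheduled_start, but never hand Celery an eta
    # earlier than the current lease expiry, or Celery would pick the
    # job up while the lease is still held and skip it.
    eta = scheduled_start
    if lease_expires is not None and (eta is None or eta < lease_expires):
        eta = lease_expires
    return eta

The resulting eta is passed to apply_async() as a timezone-aware datetime, which is presumably why CELERY_ENABLE_UTC is switched on in celeryconfig.py below.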
=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py 2015-04-22 12:03:05 +0000
+++ lib/lp/code/model/tests/test_branch.py 2015-07-24 09:55:29 +0000
@@ -10,7 +10,6 @@
datetime,
timedelta,
)
-import json
from bzrlib.branch import Branch
from bzrlib.bzrdir import BzrDir
@@ -73,6 +72,7 @@
from lp.code.interfaces.branchjob import (
IBranchScanJobSource,
IBranchUpgradeJobSource,
+ IReclaimBranchSpaceJobSource,
)
from lp.code.interfaces.branchlookup import IBranchLookup
from lp.code.interfaces.branchmergeproposal import (
@@ -371,8 +371,13 @@
db_branch, tree = self.create_branch_and_tree()
branch_path = get_real_branch_path(db_branch.id)
self.assertThat(branch_path, PathExists())
+ store = Store.of(db_branch)
with person_logged_in(db_branch.owner):
db_branch.destroySelf()
+ job = store.find(
+ BranchJob,
+ BranchJob.job_type == BranchJobType.RECLAIM_BRANCH_SPACE).one()
+ job.job.scheduled_start = datetime.now(UTC)
with block_on_job():
transaction.commit()
self.assertThat(branch_path, Not(PathExists()))
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2013-05-15 04:41:33 +0000
+++ lib/lp/services/job/celeryconfig.py 2015-07-24 09:55:29 +0000
@@ -80,6 +80,7 @@
result['CELERY_CREATE_MISSING_QUEUES'] = False
result['CELERY_DEFAULT_EXCHANGE'] = 'job'
result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'
+ result['CELERY_ENABLE_UTC'] = True
result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
result['CELERY_QUEUES'] = celery_queues
result['CELERY_RESULT_BACKEND'] = 'amqp'
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2015-07-08 16:05:11 +0000
+++ lib/lp/services/job/model/job.py 2015-07-24 09:55:29 +0000
@@ -129,7 +129,11 @@
@property
def is_runnable(self):
"""See `IJob`."""
- return self.status == JobStatus.WAITING
+ if self.status != JobStatus.WAITING:
+ return False
+ if self.scheduled_start is None:
+ return True
+ return self.scheduled_start <= datetime.datetime.now(UTC)
@classmethod
def createMultiple(self, store, num_jobs, requester=None):
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2015-07-09 20:06:17 +0000
+++ lib/lp/services/job/runner.py 2015-07-24 09:55:29 +0000
@@ -46,6 +46,7 @@
JobRunner as LazrJobRunner,
LeaseHeld,
)
+from pytz import utc
from storm.exceptions import LostObjectError
import transaction
from twisted.internet import reactor
@@ -226,10 +227,12 @@
cls = CeleryRunJob
db_class = self.getDBClass()
ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
- if self.job.lease_expires is not None:
- eta = datetime.now() + self.retry_delay
- else:
- eta = None
+ eta = self.job.scheduled_start
+ # Don't schedule the job while its lease is still held, or
+ # celery will skip it.
+ if (self.job.lease_expires is not None
+ and (eta is None or eta < self.job.lease_expires)):
+ eta = self.job.lease_expires
return cls.apply_async(
(ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta,
task_id=self.taskId())
@@ -254,6 +257,8 @@
def queue(self, manage_transaction=False, abort_transaction=False):
"""See `IJob`."""
+ if self.job.attempt_count > 0:
+ self.job.scheduled_start = datetime.now(utc) + self.retry_delay
self.job.queue(
manage_transaction, abort_transaction,
add_commit_hook=self.celeryRunOnCommit)
=== renamed file 'lib/lp/services/job/tests/test_retry_jobs_with_celery.py' => 'lib/lp/services/job/tests/test_celery.py'
--- lib/lp/services/job/tests/test_retry_jobs_with_celery.py 2015-07-09 20:06:17 +0000
+++ lib/lp/services/job/tests/test_celery.py 2015-07-24 09:55:29 +0000
@@ -4,10 +4,22 @@
"""Tests for running jobs via Celery."""
-from datetime import timedelta
+from datetime import (
+ datetime,
+ timedelta,
+ )
from time import sleep
+import iso8601
from lazr.delegates import delegate_to
+from pytz import UTC
+from testtools.matchers import (
+ GreaterThan,
+ HasLength,
+ LessThan,
+ MatchesAll,
+ MatchesListwise,
+ )
import transaction
from zope.interface import implementer
@@ -33,12 +45,12 @@
config = config.launchpad
- def __init__(self, job_id=None):
+ def __init__(self, job_id=None, scheduled_start=None):
if job_id is not None:
store = IStore(Job)
self.job = store.find(Job, id=job_id)[0]
else:
- self.job = Job(max_retries=2)
+ self.job = Job(max_retries=2, scheduled_start=scheduled_start)
def run(self):
pass
@@ -61,18 +73,33 @@
retry_error_types = (RetryException, )
- retry_delay = timedelta(seconds=1)
+ retry_delay = timedelta(seconds=5)
+
+ def acquireLease(self, duration=10):
+ return self.job.acquireLease(duration)
+
+ def storeDateStarted(self):
+ existing = self.job.base_json_data or {}
+ existing.setdefault('dates_started', [])
+ existing['dates_started'].append(self.job.date_started.isoformat())
+ self.job.base_json_data = existing
def run(self):
- """Raise a retry exception on the the first attempt to run the job."""
- if self.job.attempt_count < 2:
- # Reset the lease so that the next attempt to run the
- # job does not fail with a LeaseHeld error.
- self.job.lease_expires = None
- raise RetryException
-
-
-class TestRetryJobsViaCelery(TestCaseWithFactory):
+ """Concoct various retry scenarios."""
+ self.storeDateStarted()
+ if self.job.attempt_count == 1:
+ # First test without a conflicting lease. The job should be
+ # rescheduled for 5 seconds (retry_delay) in the future.
+ self.job.lease_expires = datetime.now(UTC)
+ raise RetryException
+ elif self.job.attempt_count == 2:
+ # The retry delay is 5 seconds, but the lease is for nearly
+ # 10 seconds, so the job will be rescheduled 10 seconds in
+ # the future.
+ raise RetryException
+
+
+class TestJobsViaCelery(TestCaseWithFactory):
"""Tests for running jobs via Celery."""
layer = CeleryJobLayer
@@ -91,37 +118,89 @@
dbjob = store.find(Job, id=job_id)[0]
self.assertEqual(JobStatus.COMPLETED, dbjob.status)
+ def test_scheduled_start(self):
+ # Submit four jobs: one in the past, one in the far future, one
+ # in 10 seconds, and one at any time. Wait up to a minute and
+ # ensure that the correct three have completed, and that they
+ # completed in the expected order.
+ self.useFixture(FeatureFixture({
+ 'jobs.celery.enabled_classes': 'TestJob'
+ }))
+ now = datetime.now(UTC)
+ job_past = TestJob(scheduled_start=now - timedelta(seconds=60))
+ job_past.celeryRunOnCommit()
+ self.assertTrue(job_past.is_runnable)
+ job_forever = TestJob(scheduled_start=now + timedelta(seconds=600))
+ job_forever.celeryRunOnCommit()
+ self.assertFalse(job_forever.is_runnable)
+ job_future = TestJob(scheduled_start=now + timedelta(seconds=10))
+ job_future.celeryRunOnCommit()
+ self.assertFalse(job_future.is_runnable)
+ job_whenever = TestJob(scheduled_start=None)
+ job_whenever.celeryRunOnCommit()
+ self.assertTrue(job_whenever.is_runnable)
+ transaction.commit()
+
+ count = 0
+ while (count < 300
+ and (job_past.is_pending or job_future.is_pending
+ or job_whenever.is_pending)):
+ sleep(0.2)
+ count += 1
+ transaction.abort()
+
+ self.assertEqual(JobStatus.COMPLETED, job_past.status)
+ self.assertEqual(JobStatus.COMPLETED, job_future.status)
+ self.assertEqual(JobStatus.COMPLETED, job_whenever.status)
+ self.assertEqual(JobStatus.WAITING, job_forever.status)
+ self.assertThat(
+ job_future.date_started, GreaterThan(job_past.date_started))
+ self.assertThat(
+ job_future.date_started, GreaterThan(job_whenever.date_started))
+
def test_jobs_with_retry_exceptions_are_queued_again(self):
# A job that raises a retry error is automatically queued
# and executed again.
self.useFixture(FeatureFixture({
'jobs.celery.enabled_classes': 'TestJobWithRetryError'
}))
- with block_on_job(self):
- job = TestJobWithRetryError()
- job.celeryRunOnCommit()
- job_id = job.job_id
- transaction.commit()
- store = IStore(Job)
-
- # block_on_job() is not aware of the Celery request
- # issued when the retry exception occurs, but we can
- # check the status of the job in the database.
- def job_finished():
- transaction.abort()
- dbjob = store.find(Job, id=job_id)[0]
- return (
- dbjob.status == JobStatus.COMPLETED and
- dbjob.attempt_count == 2)
- count = 0
- while count < 300 and not job_finished():
- # We have a maximum wait of one minute. We should not get
- # anywhere close to that on developer machines (10 seconds was
- # working fine), but when the test suite is run in parallel we
- # can need a lot more time (see bug 1007576).
- sleep(0.2)
- count += 1
-
- dbjob = store.find(Job, id=job_id)[0]
- self.assertEqual(2, dbjob.attempt_count)
- self.assertEqual(JobStatus.COMPLETED, dbjob.status)
+
+ # Set scheduled_start on the job to ensure that retry delays
+ # override it.
+ job = TestJobWithRetryError(
+ scheduled_start=datetime.now(UTC) + timedelta(seconds=1))
+ job.celeryRunOnCommit()
+ transaction.commit()
+
+ count = 0
+ while count < 300 and job.is_pending:
+ # We have a maximum wait of one minute. We should not get
+ # anywhere close to that on developer machines (10 seconds was
+ # working fine), but when the test suite is run in parallel we
+ # can need a lot more time (see bug 1007576).
+ sleep(0.2)
+ count += 1
+ transaction.abort()
+
+ # Collect the start times recorded by the job.
+ dates_started = [
+ iso8601.parse_date(d)
+ for d in job.job.base_json_data['dates_started']]
+
+ # The first attempt's lease is set to the end of the job, so
+ # the second attempt should start roughly 5 seconds after the
+ # first. The third attempt has to wait out the full 10 second
+ # lease, so it should start roughly 10 seconds after the second.
+ self.assertThat(dates_started, HasLength(3))
+ self.assertThat(dates_started,
+ MatchesListwise([
+ MatchesAll(),
+ MatchesAll(
+ GreaterThan(dates_started[0] + timedelta(seconds=4)),
+ LessThan(dates_started[0] + timedelta(seconds=8))),
+ MatchesAll(
+ GreaterThan(dates_started[1] + timedelta(seconds=8)),
+ LessThan(dates_started[1] + timedelta(seconds=12))),
+ ]))
+ self.assertEqual(3, job.attempt_count)
+ self.assertEqual(JobStatus.COMPLETED, job.status)
=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py 2013-06-20 05:50:00 +0000
+++ lib/lp/services/job/tests/test_job.py 2015-07-24 09:55:29 +0000
@@ -3,7 +3,11 @@
__metaclass__ = type
-from datetime import datetime
+from datetime import (
+ datetime,
+ timedelta,
+ )
+from pytz import UTC
import time
from lazr.jobrunner.jobrunner import LeaseHeld
@@ -258,6 +262,30 @@
self.assertEqual(
status in Job.PENDING_STATUSES, job.is_pending)
+ def test_is_runnable_when_failed(self):
+ """is_runnable is false when the job is not WAITING."""
+ job = Job(_status=JobStatus.FAILED)
+ self.assertFalse(job.is_runnable)
+
+ def test_is_runnable_when_scheduled_in_future(self):
+ """is_runnable is false when the job is scheduled in the future."""
+ job = Job(
+ _status=JobStatus.WAITING,
+ scheduled_start=datetime.now(UTC) + timedelta(seconds=60))
+ self.assertFalse(job.is_runnable)
+
+ def test_is_runnable_when_scheduled_in_past(self):
+ """is_runnable is true when the job is scheduled in the past."""
+ job = Job(
+ _status=JobStatus.WAITING,
+ scheduled_start=datetime.now(UTC) - timedelta(seconds=60))
+ self.assertTrue(job.is_runnable)
+
+ def test_is_runnable_when_not_scheduled(self):
+ """is_runnable is true when no explicit schedule has been requested."""
+ job = Job(_status=JobStatus.WAITING)
+ self.assertTrue(job.is_runnable)
+
def test_start_manages_transactions(self):
# Job.start() does not commit the transaction by default.
with TransactionRecorder() as recorder: