launchpad-reviewers team mailing list archive
  
  - 
     launchpad-reviewers team launchpad-reviewers team
- 
    Mailing list archive
  
- 
    Message #19072
  
 [Merge] lp:~wgrant/launchpad/job-scheduled_start-retries into lp:launchpad
  
William Grant has proposed merging lp:~wgrant/launchpad/job-scheduled_start-retries into lp:launchpad.
Commit message:
Teach Celery to respect Job.scheduled_start, and use it to delay retries for all runners.
Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~wgrant/launchpad/job-scheduled_start-retries/+merge/265793
More integration of Job.scheduled_start:
 - Celery respects scheduled_start if it is set.
 - Retries set scheduled_start = now + retry_delay, so more than just celery respects retry_delay.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/job-scheduled_start-retries into lp:launchpad.
=== modified file 'lib/lp/code/model/tests/test_branch.py'
--- lib/lp/code/model/tests/test_branch.py	2015-04-22 12:03:05 +0000
+++ lib/lp/code/model/tests/test_branch.py	2015-07-24 09:55:29 +0000
@@ -10,7 +10,6 @@
     datetime,
     timedelta,
     )
-import json
 
 from bzrlib.branch import Branch
 from bzrlib.bzrdir import BzrDir
@@ -73,6 +72,7 @@
 from lp.code.interfaces.branchjob import (
     IBranchScanJobSource,
     IBranchUpgradeJobSource,
+    IReclaimBranchSpaceJobSource,
     )
 from lp.code.interfaces.branchlookup import IBranchLookup
 from lp.code.interfaces.branchmergeproposal import (
@@ -371,8 +371,13 @@
         db_branch, tree = self.create_branch_and_tree()
         branch_path = get_real_branch_path(db_branch.id)
         self.assertThat(branch_path, PathExists())
+        store = Store.of(db_branch)
         with person_logged_in(db_branch.owner):
             db_branch.destroySelf()
+        job = store.find(
+            BranchJob,
+            BranchJob.job_type == BranchJobType.RECLAIM_BRANCH_SPACE).one()
+        job.job.scheduled_start = datetime.now(UTC)
         with block_on_job():
             transaction.commit()
         self.assertThat(branch_path, Not(PathExists()))
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py	2013-05-15 04:41:33 +0000
+++ lib/lp/services/job/celeryconfig.py	2015-07-24 09:55:29 +0000
@@ -80,6 +80,7 @@
     result['CELERY_CREATE_MISSING_QUEUES'] = False
     result['CELERY_DEFAULT_EXCHANGE'] = 'job'
     result['CELERY_DEFAULT_QUEUE'] = 'launchpad_job'
+    result['CELERY_ENABLE_UTC'] = True
     result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
     result['CELERY_QUEUES'] = celery_queues
     result['CELERY_RESULT_BACKEND'] = 'amqp'
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2015-07-08 16:05:11 +0000
+++ lib/lp/services/job/model/job.py	2015-07-24 09:55:29 +0000
@@ -129,7 +129,11 @@
     @property
     def is_runnable(self):
         """See `IJob`."""
-        return self.status == JobStatus.WAITING
+        if self.status != JobStatus.WAITING:
+            return False
+        if self.scheduled_start is None:
+            return True
+        return self.scheduled_start <= datetime.datetime.now(UTC)
 
     @classmethod
     def createMultiple(self, store, num_jobs, requester=None):
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py	2015-07-09 20:06:17 +0000
+++ lib/lp/services/job/runner.py	2015-07-24 09:55:29 +0000
@@ -46,6 +46,7 @@
     JobRunner as LazrJobRunner,
     LeaseHeld,
     )
+from pytz import utc
 from storm.exceptions import LostObjectError
 import transaction
 from twisted.internet import reactor
@@ -226,10 +227,12 @@
             cls = CeleryRunJob
         db_class = self.getDBClass()
         ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
-        if self.job.lease_expires is not None:
-            eta = datetime.now() + self.retry_delay
-        else:
-            eta = None
+        eta = self.job.scheduled_start
+        # Don't schedule the job while its lease is still held, or
+        # celery will skip it.
+        if (self.job.lease_expires is not None
+                and (eta is None or eta < self.job.lease_expires)):
+            eta = self.job.lease_expires
         return cls.apply_async(
             (ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta,
             task_id=self.taskId())
@@ -254,6 +257,8 @@
 
     def queue(self, manage_transaction=False, abort_transaction=False):
         """See `IJob`."""
+        if self.job.attempt_count > 0:
+            self.job.scheduled_start = datetime.now(utc) + self.retry_delay
         self.job.queue(
             manage_transaction, abort_transaction,
             add_commit_hook=self.celeryRunOnCommit)
=== renamed file 'lib/lp/services/job/tests/test_retry_jobs_with_celery.py' => 'lib/lp/services/job/tests/test_celery.py'
--- lib/lp/services/job/tests/test_retry_jobs_with_celery.py	2015-07-09 20:06:17 +0000
+++ lib/lp/services/job/tests/test_celery.py	2015-07-24 09:55:29 +0000
@@ -4,10 +4,22 @@
 """Tests for running jobs via Celery."""
 
 
-from datetime import timedelta
+from datetime import (
+    datetime,
+    timedelta,
+    )
 from time import sleep
 
+import iso8601
 from lazr.delegates import delegate_to
+from pytz import UTC
+from testtools.matchers import (
+    GreaterThan,
+    HasLength,
+    LessThan,
+    MatchesAll,
+    MatchesListwise,
+    )
 import transaction
 from zope.interface import implementer
 
@@ -33,12 +45,12 @@
 
     config = config.launchpad
 
-    def __init__(self, job_id=None):
+    def __init__(self, job_id=None, scheduled_start=None):
         if job_id is not None:
             store = IStore(Job)
             self.job = store.find(Job, id=job_id)[0]
         else:
-            self.job = Job(max_retries=2)
+            self.job = Job(max_retries=2, scheduled_start=scheduled_start)
 
     def run(self):
         pass
@@ -61,18 +73,33 @@
 
     retry_error_types = (RetryException, )
 
-    retry_delay = timedelta(seconds=1)
+    retry_delay = timedelta(seconds=5)
+
+    def acquireLease(self, duration=10):
+        return self.job.acquireLease(duration)
+
+    def storeDateStarted(self):
+        existing = self.job.base_json_data or {}
+        existing.setdefault('dates_started', [])
+        existing['dates_started'].append(self.job.date_started.isoformat())
+        self.job.base_json_data = existing
 
     def run(self):
-        """Raise a retry exception on the the first attempt to run the job."""
-        if self.job.attempt_count < 2:
-            # Reset the lease so that the next attempt to run the
-            # job does not fail with a LeaseHeld error.
-            self.job.lease_expires = None
-            raise RetryException
-
-
-class TestRetryJobsViaCelery(TestCaseWithFactory):
+        """Concoct various retry scenarios."""
+        self.storeDateStarted()
+        if self.job.attempt_count == 1:
+            # First test without a conflicting lease. The job should be
+            # rescheduled for 5 seconds (retry_delay) in the future.
+            self.job.lease_expires = datetime.now(UTC)
+            raise RetryException
+        elif self.job.attempt_count == 2:
+            # The retry delay is 5 seconds, but the lease is for nearly
+            # 10 seconds, so the job will be rescheduled 10 seconds in
+            # the future.
+            raise RetryException
+
+
+class TestJobsViaCelery(TestCaseWithFactory):
     """Tests for running jobs via Celery."""
 
     layer = CeleryJobLayer
@@ -91,37 +118,89 @@
         dbjob = store.find(Job, id=job_id)[0]
         self.assertEqual(JobStatus.COMPLETED, dbjob.status)
 
+    def test_scheduled_start(self):
+        # Submit four jobs: one in the past, one in the far future, one
+        # in 10 seconds, and one at any time.  Wait up to a minute and
+        # ensure that the correct three have completed, and that they
+        # completed in the expected order.
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'TestJob'
+        }))
+        now = datetime.now(UTC)
+        job_past = TestJob(scheduled_start=now - timedelta(seconds=60))
+        job_past.celeryRunOnCommit()
+        self.assertTrue(job_past.is_runnable)
+        job_forever = TestJob(scheduled_start=now + timedelta(seconds=600))
+        job_forever.celeryRunOnCommit()
+        self.assertFalse(job_forever.is_runnable)
+        job_future = TestJob(scheduled_start=now + timedelta(seconds=10))
+        job_future.celeryRunOnCommit()
+        self.assertFalse(job_future.is_runnable)
+        job_whenever = TestJob(scheduled_start=None)
+        job_whenever.celeryRunOnCommit()
+        self.assertTrue(job_whenever.is_runnable)
+        transaction.commit()
+
+        count = 0
+        while (count < 300
+                and (job_past.is_pending or job_future.is_pending
+                     or job_whenever.is_pending)):
+            sleep(0.2)
+            count += 1
+            transaction.abort()
+
+        self.assertEqual(JobStatus.COMPLETED, job_past.status)
+        self.assertEqual(JobStatus.COMPLETED, job_future.status)
+        self.assertEqual(JobStatus.COMPLETED, job_whenever.status)
+        self.assertEqual(JobStatus.WAITING, job_forever.status)
+        self.assertThat(
+            job_future.date_started, GreaterThan(job_past.date_started))
+        self.assertThat(
+            job_future.date_started, GreaterThan(job_whenever.date_started))
+
     def test_jobs_with_retry_exceptions_are_queued_again(self):
         # A job that raises a retry error is automatically queued
         # and executed again.
         self.useFixture(FeatureFixture({
             'jobs.celery.enabled_classes': 'TestJobWithRetryError'
         }))
-        with block_on_job(self):
-            job = TestJobWithRetryError()
-            job.celeryRunOnCommit()
-            job_id = job.job_id
-            transaction.commit()
-            store = IStore(Job)
-
-            # block_on_job() is not aware of the Celery request
-            # issued when the retry exception occurs, but we can
-            # check the status of the job in the database.
-            def job_finished():
-                transaction.abort()
-                dbjob = store.find(Job, id=job_id)[0]
-                return (
-                    dbjob.status == JobStatus.COMPLETED and
-                    dbjob.attempt_count == 2)
-            count = 0
-            while count < 300 and not job_finished():
-                # We have a maximum wait of one minute.  We should not get
-                # anywhere close to that on developer machines (10 seconds was
-                # working fine), but when the test suite is run in parallel we
-                # can need a lot more time (see bug 1007576).
-                sleep(0.2)
-                count += 1
-
-        dbjob = store.find(Job, id=job_id)[0]
-        self.assertEqual(2, dbjob.attempt_count)
-        self.assertEqual(JobStatus.COMPLETED, dbjob.status)
+
+        # Set scheduled_start on the job to ensure that retry delays
+        # override it.
+        job = TestJobWithRetryError(
+            scheduled_start=datetime.now(UTC) + timedelta(seconds=1))
+        job.celeryRunOnCommit()
+        transaction.commit()
+
+        count = 0
+        while count < 300 and job.is_pending:
+            # We have a maximum wait of one minute.  We should not get
+            # anywhere close to that on developer machines (10 seconds was
+            # working fine), but when the test suite is run in parallel we
+            # can need a lot more time (see bug 1007576).
+            sleep(0.2)
+            count += 1
+            transaction.abort()
+
+        # Collect the start times recorded by the job.
+        dates_started = [
+            iso8601.parse_date(d)
+            for d in job.job.base_json_data['dates_started']]
+
+        # The first attempt's lease is set to the end of the job, so
+        # the second attempt should start roughly 5 seconds after the
+        # first. The third attempt has to wait out the full 10 second
+        # lease, so it should start roughly 10 seconds after the second.
+        self.assertThat(dates_started, HasLength(3))
+        self.assertThat(dates_started,
+            MatchesListwise([
+                MatchesAll(),
+                MatchesAll(
+                    GreaterThan(dates_started[0] + timedelta(seconds=4)),
+                    LessThan(dates_started[0] + timedelta(seconds=8))),
+                MatchesAll(
+                    GreaterThan(dates_started[1] + timedelta(seconds=8)),
+                    LessThan(dates_started[1] + timedelta(seconds=12))),
+                ]))
+        self.assertEqual(3, job.attempt_count)
+        self.assertEqual(JobStatus.COMPLETED, job.status)
=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py	2013-06-20 05:50:00 +0000
+++ lib/lp/services/job/tests/test_job.py	2015-07-24 09:55:29 +0000
@@ -3,7 +3,11 @@
 
 __metaclass__ = type
 
-from datetime import datetime
+from datetime import (
+    datetime,
+    timedelta,
+    )
+from pytz import UTC
 import time
 
 from lazr.jobrunner.jobrunner import LeaseHeld
@@ -258,6 +262,30 @@
             self.assertEqual(
                 status in Job.PENDING_STATUSES, job.is_pending)
 
+    def test_is_runnable_when_failed(self):
+        """is_runnable is false when the job is not WAITING."""
+        job = Job(_status=JobStatus.FAILED)
+        self.assertFalse(job.is_runnable)
+
+    def test_is_runnable_when_scheduled_in_future(self):
+        """is_runnable is false when the job is scheduled in the future."""
+        job = Job(
+            _status=JobStatus.WAITING,
+            scheduled_start=datetime.now(UTC) + timedelta(seconds=60))
+        self.assertFalse(job.is_runnable)
+
+    def test_is_runnable_when_scheduled_in_past(self):
+        """is_runnable is true when the job is scheduled in the past."""
+        job = Job(
+            _status=JobStatus.WAITING,
+            scheduled_start=datetime.now(UTC) - timedelta(seconds=60))
+        self.assertTrue(job.is_runnable)
+
+    def test_is_runnable_when_not_scheduled(self):
+        """is_runnable is true when no explicit schedule has been requested."""
+        job = Job(_status=JobStatus.WAITING)
+        self.assertTrue(job.is_runnable)
+
     def test_start_manages_transactions(self):
         # Job.start() does not commit the transaction by default.
         with TransactionRecorder() as recorder:
Follow ups