← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/snap-upload-better-retry into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/snap-upload-better-retry into lp:launchpad.

Commit message:
Make SnapStoreUploadJob retries go via celery and be much more responsive.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1689282 in Launchpad itself: "No-op snap takes 1.5 min to build, 6.5 minutes to publish"
  https://bugs.launchpad.net/launchpad/+bug/1689282

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/snap-upload-better-retry/+merge/326548

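There were several problems here.  The lifecycle method overrides in SnapStoreUploadJob were subtly wrong: they called the underlying Job's methods (self.job.start, self.job.queue, and so on) directly rather than going through the superclass implementations, and thus bypassed celeryRunOnCommit when retrying.  The lease was held for much longer than the retry delay, even after the job had finished an attempt and been requeued, which introduced several minutes of unnecessary delay; Job.queue now releases the lease.  Before making that change, I checked that all the iterReady implementations honour Job.ready_jobs, and updated the branch merge proposal job source's iterReady to also skip jobs whose lease is still held or whose scheduled start time is in the future.  Finally, we needed a shorter retry delay for the first few attempts to poll the status endpoint (15, 15, 30, and 30 seconds, then one minute thereafter; the attempt count is reset whenever the job makes progress, so these shorter delays apply from the start of polling).  That polling doesn't involve much work on the Launchpad side, and it may well succeed quite quickly for small packages.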
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/snap-upload-better-retry into lp:launchpad.
=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
--- lib/lp/code/model/branchmergeproposaljob.py	2016-09-06 15:34:38 +0000
+++ lib/lp/code/model/branchmergeproposaljob.py	2017-06-29 18:31:47 +0000
@@ -675,8 +675,13 @@
                 continue
             # We have now seen this merge proposal.
             seen_merge_proposals.add(bmp.id)
-            # If the job is running, then skip it
-            if job.status == JobStatus.RUNNING:
+            # If the job is running or can't currently be run due to its
+            # lease or its start time, then skip it.
+            if (job.status == JobStatus.RUNNING or
+                (job.lease_expires is not None and
+                 job.lease_expires >= datetime.now(pytz.UTC)) or
+                (job.scheduled_start is not None and
+                 job.scheduled_start > datetime.now(pytz.UTC))):
                 continue
             derived_job = bmp_job.makeDerived()
             # If the job is an update preview diff, then check that it is

=== modified file 'lib/lp/code/model/tests/test_branchmergeproposaljobs.py'
--- lib/lp/code/model/tests/test_branchmergeproposaljobs.py	2017-05-25 17:22:38 +0000
+++ lib/lp/code/model/tests/test_branchmergeproposaljobs.py	2017-06-29 18:31:47 +0000
@@ -559,6 +559,40 @@
         jobs = self.job_source.iterReady()
         self.assertEqual(0, len(jobs))
 
+    def test_iterReady_new_merge_proposal_update_diff_leased(self):
+        # If either the diff or the email job has an acquired lease, then
+        # iterReady skips it.
+        self.makeBranchMergeProposal(
+            set_state=BranchMergeProposalStatus.NEEDS_REVIEW)
+        [update_diff_job] = self.job_source.iterReady()
+        self.assertIsInstance(update_diff_job, UpdatePreviewDiffJob)
+        update_diff_job.acquireLease()
+        self.assertEqual(0, len(self.job_source.iterReady()))
+        update_diff_job.start()
+        update_diff_job.complete()
+        [email_job] = self.job_source.iterReady()
+        self.assertIsInstance(email_job, MergeProposalNeedsReviewEmailJob)
+        email_job.acquireLease()
+        self.assertEqual(0, len(self.job_source.iterReady()))
+
+    def test_iterReady_new_merge_proposal_update_diff_scheduled(self):
+        # If either the diff or the email job has a scheduled start time in
+        # the future, then iterReady skips it.
+        self.makeBranchMergeProposal(
+            set_state=BranchMergeProposalStatus.NEEDS_REVIEW)
+        [update_diff_job] = self.job_source.iterReady()
+        self.assertIsInstance(update_diff_job, UpdatePreviewDiffJob)
+        update_diff_job.start()
+        update_diff_job.queue()
+        self.assertEqual(0, len(self.job_source.iterReady()))
+        update_diff_job.start()
+        update_diff_job.complete()
+        [email_job] = self.job_source.iterReady()
+        self.assertIsInstance(email_job, MergeProposalNeedsReviewEmailJob)
+        email_job.start()
+        email_job.queue()
+        self.assertEqual(0, len(self.job_source.iterReady()))
+
     def makeBranchMergeProposal(self, set_state=None):
         # Make a merge proposal that would have a ready update diff job.
         bmp = self.factory.makeBranchMergeProposal(set_state=set_state)

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2015-10-14 15:22:01 +0000
+++ lib/lp/services/job/model/job.py	2017-06-29 18:31:47 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2013 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """ORM object representing jobs."""
@@ -206,6 +206,8 @@
         if self.status != JobStatus.WAITING:
             self._set_status(JobStatus.WAITING)
         self.date_finished = datetime.datetime.now(UTC)
+        # Release the lease to allow short retry delays to be effective.
+        self.lease_expires = None
         if add_commit_hook is not None:
             add_commit_hook()
         if manage_transaction:

=== modified file 'lib/lp/services/job/tests/test_celery.py'
--- lib/lp/services/job/tests/test_celery.py	2015-08-04 02:26:26 +0000
+++ lib/lp/services/job/tests/test_celery.py	2017-06-29 18:31:47 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# Copyright 2012-2017 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for running jobs via Celery."""
@@ -93,9 +93,10 @@
             self.job.lease_expires = datetime.now(UTC)
             raise RetryException
         elif self.job.attempt_count == 2:
-            # The retry delay is 5 seconds, but the lease is for nearly
-            # 10 seconds, so the job will be rescheduled 10 seconds in
-            # the future.
+            # The retry delay is 5 seconds, but the lease is for nearly 10
+            # seconds. However, the job releases the lease when it's
+            # requeued, so the job will again be rescheduled for 5 seconds
+            # (retry_delay) in the future.
             raise RetryException
 
 
@@ -185,10 +186,9 @@
             iso8601.parse_date(d)
             for d in job.job.base_json_data['dates_started']]
 
-        # The first attempt's lease is set to the end of the job, so
-        # the second attempt should start roughly 5 seconds after the
-        # first. The third attempt has to wait out the full 10 second
-        # lease, so it should start roughly 10 seconds after the second.
+        # The first attempt's lease is set to the end of the job, so the
+        # second attempt should start roughly 5 seconds after the first. The
+        # third attempt should start roughly 5 seconds after the second.
         self.assertThat(dates_started, HasLength(3))
         self.assertThat(dates_started,
             MatchesListwise([
@@ -197,8 +197,8 @@
                     GreaterThan(dates_started[0] + timedelta(seconds=4)),
                     LessThan(dates_started[0] + timedelta(seconds=8))),
                 MatchesAll(
-                    GreaterThan(dates_started[1] + timedelta(seconds=8)),
-                    LessThan(dates_started[1] + timedelta(seconds=13))),
+                    GreaterThan(dates_started[1] + timedelta(seconds=4)),
+                    LessThan(dates_started[1] + timedelta(seconds=8))),
                 ]))
         self.assertEqual(3, job.attempt_count)
         self.assertEqual(JobStatus.COMPLETED, job.status)

=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py	2015-07-30 01:50:11 +0000
+++ lib/lp/services/job/tests/test_job.py	2017-06-29 18:31:47 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -7,11 +7,11 @@
     datetime,
     timedelta,
     )
-from pytz import UTC
 import time
 
 from lazr.jobrunner.jobrunner import LeaseHeld
 import pytz
+from pytz import UTC
 from storm.locals import Store
 from testtools.matchers import Equals
 import transaction
@@ -186,6 +186,13 @@
         self.assertNotEqual(None, job.date_finished)
         self.assertEqual(job.status, JobStatus.WAITING)
 
+    def test_queue_clears_lease_expires(self):
+        """Queueing a job releases its lease."""
+        job = Job(_status=JobStatus.RUNNING)
+        job.lease_expires = UTC_NOW
+        job.queue()
+        self.assertIsNone(job.lease_expires)
+
     def test_suspend(self):
         """A job that is in the WAITING state can be suspended."""
         job = Job(_status=JobStatus.WAITING)
@@ -218,12 +225,12 @@
             job.status,
             JobStatus.WAITING)
 
-    def test_resume_clears_lease_expiry(self):
-        """A job that resumes should null out the lease_expiry."""
+    def test_resume_clears_lease_expires(self):
+        """A job that resumes should null out the lease_expires."""
         job = Job(_status=JobStatus.SUSPENDED)
         job.lease_expires = UTC_NOW
         job.resume()
-        self.assertIs(None, job.lease_expires)
+        self.assertIsNone(job.lease_expires)
 
     def test_resume_when_running(self):
         """When a job is running, attempting to resume is invalid."""

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py	2016-05-06 09:28:28 +0000
+++ lib/lp/services/job/tests/test_runner.py	2017-06-29 18:31:47 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2017 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for job-running facilities."""
@@ -349,6 +349,7 @@
             MatchesAll(
                 GreaterThan(expected_delay - timedelta(minutes=1)),
                 LessThan(expected_delay + timedelta(minutes=1))))
+        self.assertIsNone(job.lease_expires)
         self.assertNotIn(job, runner.completed_jobs)
         self.assertIn(job, runner.incomplete_jobs)
 

=== modified file 'lib/lp/snappy/model/snapbuildjob.py'
--- lib/lp/snappy/model/snapbuildjob.py	2017-06-14 10:25:23 +0000
+++ lib/lp/snappy/model/snapbuildjob.py	2017-06-29 18:31:47 +0000
@@ -197,7 +197,6 @@
         )
 
     retry_error_types = (UploadNotScannedYetResponse, RetryableSnapStoreError)
-    retry_delay = timedelta(minutes=1)
     max_retries = 20
 
     config = config.ISnapStoreUploadJobSource
@@ -255,29 +254,29 @@
     # Ideally we'd just override Job._set_status or similar, but
     # lazr.delegates makes that difficult, so we use this to override all
     # the individual Job lifecycle methods instead.
-    def _do_lifecycle(self, method, *args, **kwargs):
+    def _do_lifecycle(self, method_name, *args, **kwargs):
         old_store_upload_status = self.snapbuild.store_upload_status
-        method(*args, **kwargs)
+        getattr(super(SnapStoreUploadJob, self), method_name)(*args, **kwargs)
         if self.snapbuild.store_upload_status != old_store_upload_status:
             notify(SnapBuildStoreUploadStatusChangedEvent(self.snapbuild))
 
     def start(self, *args, **kwargs):
-        self._do_lifecycle(self.job.start, *args, **kwargs)
+        self._do_lifecycle("start", *args, **kwargs)
 
     def complete(self, *args, **kwargs):
-        self._do_lifecycle(self.job.complete, *args, **kwargs)
+        self._do_lifecycle("complete", *args, **kwargs)
 
     def fail(self, *args, **kwargs):
-        self._do_lifecycle(self.job.fail, *args, **kwargs)
+        self._do_lifecycle("fail", *args, **kwargs)
 
     def queue(self, *args, **kwargs):
-        self._do_lifecycle(self.job.queue, *args, **kwargs)
+        self._do_lifecycle("queue", *args, **kwargs)
 
     def suspend(self, *args, **kwargs):
-        self._do_lifecycle(self.job.suspend, *args, **kwargs)
+        self._do_lifecycle("suspend", *args, **kwargs)
 
     def resume(self, *args, **kwargs):
-        self._do_lifecycle(self.job.resume, *args, **kwargs)
+        self._do_lifecycle("resume", *args, **kwargs)
 
     def getOopsVars(self):
         """See `IRunnableJob`."""
@@ -285,15 +284,34 @@
         oops_vars.append(('error_detail', self.error_detail))
         return oops_vars
 
+    @property
+    def retry_delay(self):
+        """See `BaseRunnableJob`."""
+        if "status_url" in self.metadata and self.store_url is None:
+            # At the moment we have to poll the status endpoint to find out
+            # if the store has finished scanning.  Try to deal with easy
+            # cases quickly without hammering our job runners or the store
+            # too badly.
+            delays = (15, 15, 30, 30)
+            try:
+                return timedelta(seconds=delays[self.attempt_count - 1])
+            except IndexError:
+                pass
+        return timedelta(minutes=1)
+
     def run(self):
         """See `IRunnableJob`."""
         client = getUtility(ISnapStoreClient)
         try:
             if "status_url" not in self.metadata:
                 self.metadata["status_url"] = client.upload(self.snapbuild)
+                # We made progress, so reset attempt_count.
+                self.attempt_count = 1
             if self.store_url is None:
                 self.store_url, self.store_revision = (
                     client.checkStatus(self.metadata["status_url"]))
+                # We made progress, so reset attempt_count.
+                self.attempt_count = 1
             if self.snapbuild.snap.store_channels:
                 if self.store_revision is None:
                     raise ManualReview(

=== modified file 'lib/lp/snappy/tests/test_snapbuildjob.py'
--- lib/lp/snappy/tests/test_snapbuildjob.py	2017-06-14 10:25:23 +0000
+++ lib/lp/snappy/tests/test_snapbuildjob.py	2017-06-29 18:31:47 +0000
@@ -7,6 +7,8 @@
 
 __metaclass__ = type
 
+from datetime import timedelta
+
 from fixtures import FakeLogger
 from testtools.matchers import (
     Equals,
@@ -256,7 +258,6 @@
         self.assertWebhookDeliveries(snapbuild, ["Pending"])
         # Try again.  The upload part of the job is retried, and this time
         # it succeeds.
-        job.lease_expires = None
         job.scheduled_start = None
         client.upload.calls = []
         client.upload.failure = None
@@ -403,7 +404,6 @@
         self.assertWebhookDeliveries(snapbuild, ["Pending"])
         # Try again.  The upload part of the job is not retried, and this
         # time the scan completes.
-        job.lease_expires = None
         job.scheduled_start = None
         client.upload.calls = []
         client.checkStatus.calls = []
@@ -594,3 +594,39 @@
             snapbuild.id, footer)
         self.assertWebhookDeliveries(
             snapbuild, ["Pending", "Failed to release to channels"])
+
+    def test_retry_delay(self):
+        # The job is retried every minute, unless it just made one of its
+        # first four attempts to poll the status endpoint, in which case the
+        # delays are 15/15/30/30 seconds.
+        self.useFixture(FakeLogger())
+        snapbuild = self.makeSnapBuild()
+        job = SnapStoreUploadJob.create(snapbuild)
+        client = FakeSnapStoreClient()
+        client.upload.failure = UploadFailedResponse(
+            "Proxy error", can_retry=True)
+        self.useFixture(ZopeUtilityFixture(client, ISnapStoreClient))
+        with dbuser(config.ISnapStoreUploadJobSource.dbuser):
+            JobRunner([job]).runAll()
+        self.assertNotIn("status_url", job.metadata)
+        self.assertEqual(timedelta(seconds=60), job.retry_delay)
+        job.scheduled_start = None
+        client.upload.failure = None
+        client.upload.result = self.status_url
+        client.checkStatus.failure = UploadNotScannedYetResponse()
+        for expected_delay in (15, 15, 30, 30, 60):
+            with dbuser(config.ISnapStoreUploadJobSource.dbuser):
+                JobRunner([job]).runAll()
+            self.assertIn("status_url", job.metadata)
+            self.assertIsNone(job.store_url)
+            self.assertEqual(
+                timedelta(seconds=expected_delay), job.retry_delay)
+            job.scheduled_start = None
+        client.checkStatus.failure = None
+        client.checkStatus.result = (self.store_url, 1)
+        with dbuser(config.ISnapStoreUploadJobSource.dbuser):
+            JobRunner([job]).runAll()
+        self.assertEqual(self.store_url, job.store_url)
+        self.assertIsNone(job.error_message)
+        self.assertEqual([], pop_notifications())
+        self.assertEqual(JobStatus.COMPLETED, job.job.status)


Follow ups