← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~wgrant/launchpad/webhook-retries into lp:launchpad

 

William Grant has proposed merging lp:~wgrant/launchpad/webhook-retries into lp:launchpad with lp:~wgrant/launchpad/job-scheduled_start-retries as a prerequisite.

Commit message:
Automatically retry webhook deliveries for 24 hours after they are first attempted.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #342729 in Launchpad itself: "Should support post-commit webhooks"
  https://bugs.launchpad.net/launchpad/+bug/342729

For more details, see:
https://code.launchpad.net/~wgrant/launchpad/webhook-retries/+merge/266193

Automatically retry webhook deliveries for 24 hours after they are first attempted.

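Rather than relying on the normal count-based retry limit, we keep retrying — every 5 minutes for the first hour after the first attempt, then hourly — until at least 24 hours have elapsed, after which the delivery is marked as permanently failed.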
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/webhook-retries into lp:launchpad.
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2015-07-29 08:38:11 +0000
+++ lib/lp/services/job/model/job.py	2015-07-29 08:38:11 +0000
@@ -102,8 +102,8 @@
              JobStatus.FAILED,
              JobStatus.SUSPENDED,
              JobStatus.WAITING),
-        JobStatus.FAILED: (),
-        JobStatus.COMPLETED: (),
+        JobStatus.FAILED: (JobStatus.WAITING,),
+        JobStatus.COMPLETED: (JobStatus.WAITING,),
         JobStatus.SUSPENDED:
             (JobStatus.WAITING,),
         }

=== modified file 'lib/lp/services/webhooks/interfaces.py'
--- lib/lp/services/webhooks/interfaces.py	2015-07-20 07:01:40 +0000
+++ lib/lp/services/webhooks/interfaces.py	2015-07-29 08:38:11 +0000
@@ -14,6 +14,8 @@
     'IWebhookJobSource',
     'IWebhookSource',
     'IWebhookTarget',
+    'WebhookDeliveryFailure',
+    'WebhookDeliveryRetry',
     'WebhookFeatureDisabled',
     ]
 
@@ -70,6 +72,16 @@
             self, "This webhook feature is not available yet.")
 
 
+class WebhookDeliveryFailure(Exception):
+    """A webhook delivery failed and should not be retried."""
+    pass
+
+
+class WebhookDeliveryRetry(Exception):
+    """A webhook delivery failed and should be retried."""
+    pass
+
+
 class IWebhook(Interface):
 
     export_as_webservice_entry(as_of='beta')
@@ -200,6 +212,11 @@
     date_created = exported(Datetime(
         title=_("Date created"), required=True, readonly=True))
 
+    date_first_sent = exported(Datetime(
+        title=_("Date first sent"),
+        description=_("Timestamp of the first delivery attempt."),
+        required=False, readonly=True))
+
     date_sent = exported(Datetime(
         title=_("Date sent"),
         description=_("Timestamp of the last delivery attempt."),

=== modified file 'lib/lp/services/webhooks/model.py'
--- lib/lp/services/webhooks/model.py	2015-07-20 07:01:40 +0000
+++ lib/lp/services/webhooks/model.py	2015-07-29 08:38:11 +0000
@@ -10,7 +10,10 @@
     'WebhookTargetMixin',
     ]
 
-import datetime
+from datetime import (
+    datetime,
+    timedelta,
+    )
 
 import iso8601
 from lazr.delegates import delegate_to
@@ -18,7 +21,7 @@
     DBEnumeratedType,
     DBItem,
     )
-import pytz
+from pytz import utc
 from storm.properties import (
     Bool,
     DateTime,
@@ -60,6 +63,8 @@
     IWebhookJob,
     IWebhookJobSource,
     IWebhookSource,
+    WebhookDeliveryFailure,
+    WebhookDeliveryRetry,
     WebhookFeatureDisabled,
     )
 
@@ -87,8 +92,8 @@
 
     registrant_id = Int(name='registrant', allow_none=False)
     registrant = Reference(registrant_id, 'Person.id')
-    date_created = DateTime(tzinfo=pytz.UTC, allow_none=False)
-    date_last_modified = DateTime(tzinfo=pytz.UTC, allow_none=False)
+    date_created = DateTime(tzinfo=utc, allow_none=False)
+    date_last_modified = DateTime(tzinfo=utc, allow_none=False)
 
     delivery_url = Unicode(allow_none=False)
     active = Bool(default=True, allow_none=False)
@@ -247,7 +252,7 @@
     def deleteByIDs(webhookjob_ids):
         """See `IWebhookJobSource`."""
         # Assumes that Webhook's PK is its FK to Job.id.
-        webookjob_ids = list(webhookjob_ids)
+        webhookjob_ids = list(webhookjob_ids)
         IStore(WebhookJob).find(
             WebhookJob, WebhookJob.job_id.is_in(webhookjob_ids)).remove()
         IStore(Job).find(Job, Job.id.is_in(webhookjob_ids)).remove()
@@ -288,6 +293,12 @@
 
     class_job_type = WebhookJobType.DELIVERY
 
+    retry_error_types = (WebhookDeliveryRetry,)
+
+    # Effectively infinite, as we give up by checking
+    # retry_automatically and raising a fatal exception instead.
+    max_retries = 1000
+
     config = config.IWebhookDeliveryJobSource
 
     @classmethod
@@ -306,10 +317,25 @@
     def successful(self):
         if 'result' not in self.json_data:
             return None
-        if 'connection_error' in self.json_data['result']:
-            return False
+        return self.failure_detail is None
+
+    @property
+    def failure_detail(self):
+        if 'result' not in self.json_data:
+            return None
+        connection_error = self.json_data['result'].get('connection_error')
+        if connection_error is not None:
+            return 'Connection error: %s' % connection_error
         status_code = self.json_data['result']['response']['status_code']
-        return 200 <= status_code <= 299
+        if 200 <= status_code <= 299:
+            return None
+        return 'Bad HTTP response: %d' % status_code
+
+    @property
+    def date_first_sent(self):
+        if 'date_first_sent' not in self.json_data:
+            return None
+        return iso8601.parse_date(self.json_data['date_first_sent'])
 
     @property
     def date_sent(self):
@@ -321,6 +347,21 @@
     def payload(self):
         return self.json_data['payload']
 
+    @property
+    def _time_since_first_attempt(self):
+        return datetime.now(utc) - (self.date_first_sent or self.date_created)
+
+    @property
+    def retry_automatically(self):
+        return self._time_since_first_attempt < timedelta(days=1)
+
+    @property
+    def retry_delay(self):
+        if self._time_since_first_attempt < timedelta(hours=1):
+            return timedelta(minutes=5)
+        else:
+            return timedelta(hours=1)
+
     def run(self):
         result = getUtility(IWebhookClient).deliver(
             self.webhook.delivery_url, config.webhooks.http_proxy,
@@ -334,5 +375,13 @@
                     del result[direction][attr]
         updated_data = self.json_data
         updated_data['result'] = result
-        updated_data['date_sent'] = datetime.datetime.now(pytz.UTC).isoformat()
+        updated_data['date_sent'] = datetime.now(utc).isoformat()
+        if 'date_first_sent' not in updated_data:
+            updated_data['date_first_sent'] = updated_data['date_sent']
         self.json_data = updated_data
+
+        if not self.successful:
+            if self.retry_automatically:
+                raise WebhookDeliveryRetry()
+            else:
+                raise WebhookDeliveryFailure(self.failure_detail)

=== modified file 'lib/lp/services/webhooks/tests/test_webhookjob.py'
--- lib/lp/services/webhooks/tests/test_webhookjob.py	2015-07-17 01:17:05 +0000
+++ lib/lp/services/webhooks/tests/test_webhookjob.py	2015-07-29 08:38:11 +0000
@@ -5,6 +5,8 @@
 
 __metaclass__ = type
 
+from datetime import timedelta
+
 from httmock import (
     HTTMock,
     urlmatch,
@@ -16,6 +18,7 @@
     Contains,
     ContainsDict,
     Equals,
+    GreaterThan,
     Is,
     KeysEqual,
     MatchesAll,
@@ -235,15 +238,15 @@
         self.assertEqual([], oopses.oopses)
 
     def test_run_404(self):
-        # The job succeeds even if the response is an error. A job only
-        # fails if it was definitely a problem on our end.
+        # A request that returns a non-2xx response is a failure and
+        # gets retried.
         with CaptureOops() as oopses:
             job, reqs = self.makeAndRunJob(response_status=404)
         self.assertThat(
             job,
             MatchesStructure(
-                status=Equals(JobStatus.COMPLETED),
-                pending=Equals(False),
+                status=Equals(JobStatus.WAITING),
+                pending=Equals(True),
                 successful=Equals(False),
                 date_sent=Not(Is(None)),
                 json_data=ContainsDict(
@@ -256,16 +259,16 @@
         self.assertEqual([], oopses.oopses)
 
     def test_run_connection_error(self):
-        # Jobs that fail to connecthave a connection_error rather than a
-        # response.
+        # Jobs that fail to connect have a connection_error rather than a
+        # response. They too trigger a retry.
         with CaptureOops() as oopses:
             job, reqs = self.makeAndRunJob(
                 raises=requests.ConnectionError('Connection refused'))
         self.assertThat(
             job,
             MatchesStructure(
-                status=Equals(JobStatus.COMPLETED),
-                pending=Equals(False),
+                status=Equals(JobStatus.WAITING),
+                pending=Equals(True),
                 successful=Equals(False),
                 date_sent=Not(Is(None)),
                 json_data=ContainsDict(
@@ -298,6 +301,90 @@
         self.assertEqual(
             'No webhook proxy configured.', oopses.oopses[0]['value'])
 
+    def test_date_first_sent(self):
+        job, reqs = self.makeAndRunJob(response_status=404)
+        self.assertEqual(job.date_first_sent, job.date_sent)
+        orig_first_sent = job.date_first_sent
+        self.assertEqual(JobStatus.WAITING, job.status)
+        self.assertEqual(1, job.attempt_count)
+        job.lease_expires = None
+        job.scheduled_start = None
+        with dbuser("webhookrunner"):
+            JobRunner([job]).runAll()
+        self.assertEqual(JobStatus.WAITING, job.status)
+        self.assertEqual(2, job.attempt_count)
+        self.assertNotEqual(job.date_first_sent, job.date_sent)
+        self.assertEqual(orig_first_sent, job.date_first_sent)
+
+    def test_retry_delay(self):
+        # Deliveries are retried every 5 minutes for the first hour, and
+        # every hour thereafter.
+        job, reqs = self.makeAndRunJob(response_status=404)
+        self.assertEqual(timedelta(minutes=5), job.retry_delay)
+        job.json_data['date_first_sent'] = (
+            job.date_first_sent - timedelta(minutes=30)).isoformat()
+        self.assertEqual(timedelta(minutes=5), job.retry_delay)
+        job.json_data['date_first_sent'] = (
+            job.date_first_sent - timedelta(minutes=30)).isoformat()
+        self.assertEqual(timedelta(hours=1), job.retry_delay)
+
+    def test_retry_automatically(self):
+        # Deliveries are automatically retried until 24 hours after the
+        # initial attempt.
+        job, reqs = self.makeAndRunJob(response_status=404)
+        self.assertTrue(job.retry_automatically)
+        job.json_data['date_first_sent'] = (
+            job.date_first_sent - timedelta(hours=24)).isoformat()
+        self.assertFalse(job.retry_automatically)
+
+    def test_automatic_retries(self):
+        hook = self.factory.makeWebhook()
+        job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'})
+
+        client = MockWebhookClient(response_status=404)
+        self.useFixture(ZopeUtilityFixture(client, IWebhookClient))
+
+        def run_job(job):
+            with dbuser("webhookrunner"):
+                runner = JobRunner([job])
+                runner.runAll()
+            if len(runner.completed_jobs) == 1 and not runner.incomplete_jobs:
+                return True
+            if len(runner.incomplete_jobs) == 1 and not runner.completed_jobs:
+                job.lease_expires = None
+                return False
+            if not runner.incomplete_jobs and not runner.completed_jobs:
+                return None
+            raise Exception("Unexpected jobs.")
+
+        # The first attempt fails but schedules a retry five minutes later.
+        self.assertEqual(False, run_job(job))
+        self.assertEqual(JobStatus.WAITING, job.status)
+        self.assertEqual(False, job.successful)
+        self.assertTrue(job.pending)
+        self.assertIsNot(None, job.date_sent)
+        last_date_sent = job.date_sent
+
+        # Pretend we're five minutes in the future and try again. The
+        # job will be retried again.
+        job.json_data['date_first_sent'] = (
+            job.date_first_sent - timedelta(minutes=5)).isoformat()
+        job.scheduled_start -= timedelta(minutes=5)
+        self.assertEqual(False, run_job(job))
+        self.assertEqual(JobStatus.WAITING, job.status)
+        self.assertEqual(False, job.successful)
+        self.assertTrue(job.pending)
+        self.assertThat(job.date_sent, GreaterThan(last_date_sent))
+
+        # If the job was first tried a day ago, the next attempt gives up.
+        job.json_data['date_first_sent'] = (
+            job.date_first_sent - timedelta(hours=24)).isoformat()
+        job.scheduled_start -= timedelta(hours=24)
+        self.assertEqual(False, run_job(job))
+        self.assertEqual(JobStatus.FAILED, job.status)
+        self.assertEqual(False, job.successful)
+        self.assertFalse(job.pending)
+
 
 class TestViaCronscript(TestCaseWithFactory):
 
@@ -313,9 +400,12 @@
             'cronscripts/process-job-source.py', ['IWebhookDeliveryJobSource'],
             expect_returncode=0)
         self.assertEqual('', stdout)
-        self.assertIn('INFO    Ran 1 WebhookDeliveryJob jobs.\n', stderr)
+        self.assertIn(
+            'WARNING Scheduling retry due to WebhookDeliveryRetry', stderr)
+        self.assertIn(
+            'INFO    1 WebhookDeliveryJob jobs did not complete.\n', stderr)
 
-        self.assertEqual(JobStatus.COMPLETED, job.status)
+        self.assertEqual(JobStatus.WAITING, job.status)
         self.assertIn(
             'Cannot connect to proxy',
             job.json_data['result']['connection_error'])
@@ -335,7 +425,7 @@
             job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'})
             transaction.commit()
 
-        self.assertEqual(JobStatus.COMPLETED, job.status)
+        self.assertEqual(JobStatus.WAITING, job.status)
         self.assertIn(
             'Cannot connect to proxy',
             job.json_data['result']['connection_error'])

=== modified file 'lib/lp/services/webhooks/tests/test_webservice.py'
--- lib/lp/services/webhooks/tests/test_webservice.py	2015-07-20 07:01:40 +0000
+++ lib/lp/services/webhooks/tests/test_webservice.py	2015-07-29 08:38:11 +0000
@@ -160,9 +160,9 @@
             representation,
             MatchesAll(
                 KeysEqual(
-                    'date_created', 'date_sent', 'http_etag', 'payload',
-                    'pending', 'resource_type_link', 'self_link',
-                    'successful', 'web_link', 'webhook_link'),
+                    'date_created', 'date_first_sent', 'date_sent',
+                    'http_etag', 'payload', 'pending', 'resource_type_link',
+                    'self_link', 'successful', 'web_link', 'webhook_link'),
                 ContainsDict(
                     {'payload': Equals({'ping': True}),
                     'pending': Equals(True),


Follow ups