← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~wgrant/launchpad/webhook-delivery-tweaks into lp:launchpad

 

William Grant has proposed merging lp:~wgrant/launchpad/webhook-delivery-tweaks into lp:launchpad with lp:~wgrant/launchpad/job-configurable-lease-duration as a prerequisite.

Commit message:
Webhook delivery tweaks: 30 second timeout, User-Agent, payload signatures, and faster retries.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~wgrant/launchpad/webhook-delivery-tweaks/+merge/266703

Webhook delivery tweaks: 30 second timeout, User-Agent, payload signatures, and faster retries.
-- 
Your team, Launchpad code reviewers, has been requested to review the proposed merge of lp:~wgrant/launchpad/webhook-delivery-tweaks into lp:launchpad.
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py	2015-08-03 10:16:49 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py	2015-08-03 10:51:28 +0000
@@ -2,6 +2,7 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 from contextlib import contextmanager
+
 from testtools.matchers import MatchesRegex
 
 from lp.services.config import config

=== modified file 'lib/lp/services/webhooks/client.py'
--- lib/lp/services/webhooks/client.py	2015-07-12 23:36:55 +0000
+++ lib/lp/services/webhooks/client.py	2015-08-03 10:51:28 +0000
@@ -8,16 +8,32 @@
     'WebhookClient',
     ]
 
+import hashlib
+import hmac
+import json
+
 import requests
 from zope.interface import implementer
 
 from lp.services.webhooks.interfaces import IWebhookClient
 
 
+def create_request(user_agent, secret, payload):
+    body = json.dumps(payload)
+    headers = {
+        'User-Agent': user_agent,
+        'Content-Type': 'application/json',
+        }
+    if secret is not None:
+        hexdigest = hmac.new(secret, body, digestmod=hashlib.sha1).hexdigest()
+        headers['X-Hub-Signature'] = 'sha1=%s' % hexdigest
+    return (body, headers)
+
+
 @implementer(IWebhookClient)
 class WebhookClient:
 
-    def deliver(self, url, proxy, payload):
+    def deliver(self, url, proxy, user_agent, timeout, secret, payload):
         """See `IWebhookClient`."""
         # We never want to execute a job if there's no proxy configured, as
         # we'd then be sending near-arbitrary requests from a trusted
@@ -32,8 +48,11 @@
         session = requests.Session()
         session.trust_env = False
         session.headers = {}
-        preq = session.prepare_request(
-            requests.Request('POST', url, json=payload))
+
+        body, headers = create_request(user_agent, secret, payload)
+        preq = session.prepare_request(requests.Request(
+            'POST', url, data=body, headers=headers))
+
         result = {
             'request': {
                 'url': url,
@@ -43,7 +62,7 @@
                 },
             }
         try:
-            resp = session.send(preq, proxies=proxies)
+            resp = session.send(preq, proxies=proxies, timeout=timeout)
             result['response'] = {
                 'status_code': resp.status_code,
                 'headers': dict(resp.headers),

=== modified file 'lib/lp/services/webhooks/interfaces.py'
--- lib/lp/services/webhooks/interfaces.py	2015-07-29 08:37:51 +0000
+++ lib/lp/services/webhooks/interfaces.py	2015-08-03 10:51:28 +0000
@@ -249,7 +249,7 @@
 
 class IWebhookClient(Interface):
 
-    def deliver(self, url, proxy, payload):
+    def deliver(self, url, proxy, user_agent, timeout, secret, payload):
         """Deliver a payload to a webhook endpoint.
 
         Returns a dict of request and response details. The 'request' key
@@ -260,6 +260,13 @@
         cannot be the fault of the remote endpoint. For example, a 404 will
         return a response, and a DNS error returns a connection_error, but
         the proxy being offline will raise an exception.
+
+        The timeout is just given to the underlying requests library, so
+        it only provides connect and inter-read timeouts. A reliable
+        overall request timeout will require another mechanism.
+
+        If secret is not None, a PubSubHubbub-compatible X-Hub-Signature
+        header will be sent using HMAC-SHA1.
         """
 
 patch_collection_property(IWebhook, 'deliveries', IWebhookDeliveryJob)

=== modified file 'lib/lp/services/webhooks/model.py'
--- lib/lp/services/webhooks/model.py	2015-07-29 08:31:06 +0000
+++ lib/lp/services/webhooks/model.py	2015-08-03 10:51:28 +0000
@@ -38,6 +38,7 @@
     )
 from zope.security.proxy import removeSecurityProxy
 
+import lp.app.versioninfo
 from lp.registry.model.person import Person
 from lp.services.config import config
 from lp.services.database.bulk import load_related
@@ -296,6 +297,13 @@
     retry_error_types = (WebhookDeliveryRetry,)
     user_error_types = (WebhookDeliveryFailure,)
 
+    # The request timeout is 30 seconds, but requests timeouts aren't
+    # totally reliable, so we also have a relatively low celery timeout
+    # as a backup. The celery timeout and lease expiry have a bit of
+    # slack to cope with slow job start/finish without conflicts.
+    soft_time_limit = timedelta(seconds=45)
+    lease_duration = timedelta(seconds=60)
+
     # Effectively infinite, as we give up by checking
     # retry_automatically and raising a fatal exception instead.
     max_retries = 1000
@@ -366,14 +374,20 @@
 
     @property
     def retry_delay(self):
-        if self._time_since_first_attempt < timedelta(hours=1):
+        if self._time_since_first_attempt < timedelta(minutes=10):
+            return timedelta(minutes=1)
+        elif self._time_since_first_attempt < timedelta(hours=1):
             return timedelta(minutes=5)
         else:
             return timedelta(hours=1)
 
     def run(self):
+        user_agent = '%s-Webhooks/r%s' % (
+            config.vhost.mainsite.hostname, lp.app.versioninfo.revno)
+        secret = self.webhook.secret
         result = getUtility(IWebhookClient).deliver(
             self.webhook.delivery_url, config.webhooks.http_proxy,
+            user_agent, 30, secret.encode('utf-8') if secret else None,
             self.payload)
         # Request and response headers and body may be large, so don't
         # store them in the frequently-used JSON. We could store them in

=== modified file 'lib/lp/services/webhooks/tests/test_webhookjob.py'
--- lib/lp/services/webhooks/tests/test_webhookjob.py	2015-07-29 08:31:06 +0000
+++ lib/lp/services/webhooks/tests/test_webhookjob.py	2015-08-03 10:51:28 +0000
@@ -5,12 +5,16 @@
 
 __metaclass__ = type
 
-from datetime import timedelta
+from datetime import (
+    datetime,
+    timedelta,
+    )
 
 from httmock import (
     HTTMock,
     urlmatch,
     )
+from pytz import utc
 import requests
 from storm.store import Store
 from testtools import TestCase
@@ -23,18 +27,24 @@
     KeysEqual,
     LessThan,
     MatchesAll,
+    MatchesDict,
     MatchesStructure,
     Not,
     )
 import transaction
 from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
 
+from lp.app.versioninfo import revno
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.runner import JobRunner
 from lp.services.job.tests import block_on_job
 from lp.services.scripts.tests import run_script
-from lp.services.webhooks.client import WebhookClient
+from lp.services.webhooks.client import (
+    create_request,
+    WebhookClient,
+    )
 from lp.services.webhooks.interfaces import (
     IWebhookClient,
     IWebhookDeliveryJob,
@@ -140,56 +150,88 @@
             result = WebhookClient().deliver(
                 'http://hookep.com/foo',
                 {'http': 'http://squid.example.com:3128'},
-                {'foo': 'bar'})
+                'TestWebhookClient', 30, 'sekrit', {'foo': 'bar'})
 
         return reqs, result
 
+    @property
+    def request_matcher(self):
+        return MatchesDict({
+            'url': Equals('http://hookep.com/foo'),
+            'method': Equals('POST'),
+            'headers': Equals(
+                {'Content-Type': 'application/json',
+                 'Content-Length': '14',
+                 'User-Agent': 'TestWebhookClient',
+                 'X-Hub-Signature':
+                    'sha1=de75f136c37d89f5eb24834468c1ecd602fa95dd',
+                 }),
+            'body': Equals('{"foo": "bar"}'),
+            })
+
     def test_sends_request(self):
         [request], result = self.sendToWebhook()
-        self.assertEqual(
-            {'Content-Type': 'application/json', 'Content-Length': '14'},
-            result['request']['headers'])
-        self.assertEqual('{"foo": "bar"}', result['request']['body'])
-        self.assertEqual(200, result['response']['status_code'])
-        self.assertEqual({}, result['response']['headers'])
-        self.assertEqual('Content', result['response']['body'])
+        self.assertThat(
+            result,
+            MatchesDict({
+                'request': self.request_matcher,
+                'response': MatchesDict({
+                    'status_code': Equals(200),
+                    'headers': Equals({}),
+                    'body': Equals('Content'),
+                    }),
+                }))
 
     def test_accepts_404(self):
         [request], result = self.sendToWebhook(response_status=404)
-        self.assertEqual(
-            {'Content-Type': 'application/json', 'Content-Length': '14'},
-            result['request']['headers'])
-        self.assertEqual('{"foo": "bar"}', result['request']['body'])
-        self.assertEqual(404, result['response']['status_code'])
-        self.assertEqual({}, result['response']['headers'])
-        self.assertEqual('Content', result['response']['body'])
+        self.assertThat(
+            result,
+            MatchesDict({
+                'request': self.request_matcher,
+                'response': MatchesDict({
+                    'status_code': Equals(404),
+                    'headers': Equals({}),
+                    'body': Equals('Content'),
+                    }),
+                }))
 
     def test_connection_error(self):
         # Attempts that fail to connect have a connection_error rather
         # than a response.
         reqs, result = self.sendToWebhook(
             raises=requests.ConnectionError('Connection refused'))
-        self.assertNotIn('response', result)
-        self.assertEqual(
-            'Connection refused', result['connection_error'])
+        self.assertThat(
+            result,
+            MatchesDict({
+                'request': self.request_matcher,
+                'connection_error': Equals('Connection refused'),
+                }))
         self.assertEqual([], reqs)
 
 
-class MockWebhookClient:
+class MockWebhookClient(WebhookClient):
 
     def __init__(self, response_status=200, raises=None):
         self.response_status = response_status
         self.raises = raises
         self.requests = []
 
-    def deliver(self, url, proxy, payload):
-        result = {'request': {}}
+    def deliver(self, url, proxy, user_agent, timeout, secret, payload):
+        body, headers = create_request(user_agent, secret, payload)
+        result = {
+            'request': {
+                'url': url,
+                'method': 'POST',
+                'headers': headers,
+                'body': body,
+                },
+            }
         if isinstance(self.raises, requests.ConnectionError):
             result['connection_error'] = str(self.raises)
         elif self.raises is not None:
             raise self.raises
         else:
-            self.requests.append(('POST', url))
+            self.requests.append(('POST', url, result['request']['headers']))
             result['response'] = {'status_code': self.response_status}
         return result
 
@@ -199,8 +241,10 @@
 
     layer = ZopelessDatabaseLayer
 
-    def makeAndRunJob(self, response_status=200, raises=None, mock=True):
-        hook = self.factory.makeWebhook(delivery_url=u'http://hookep.com/foo')
+    def makeAndRunJob(self, response_status=200, raises=None, mock=True,
+                      secret=None):
+        hook = self.factory.makeWebhook(
+            delivery_url=u'http://hookep.com/foo', secret=secret)
         job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'})
 
         client = MockWebhookClient(
@@ -217,6 +261,21 @@
         self.assertProvides(
             WebhookDeliveryJob.create(hook, payload={}), IWebhookDeliveryJob)
 
+    def test_short_lease_and_timeout(self):
+        # Webhook jobs have a request timeout of 30 seconds, a celery
+        # timeout of 45 seconds, and a lease of 60 seconds, to give
+        # reasonable time for sluggish things to catch up.
+        hook = self.factory.makeWebhook()
+        job = hook.ping()
+        job.acquireLease()
+        self.assertThat(
+            job.lease_expires - datetime.now(utc),
+            MatchesAll(
+                GreaterThan(timedelta(seconds=50)),
+                LessThan(timedelta(seconds=60))))
+        self.assertEqual(
+            timedelta(seconds=45), removeSecurityProxy(job).soft_time_limit)
+
     def test_run_200(self):
         # A request that returns 200 is a success.
         with CaptureOops() as oopses:
@@ -235,7 +294,26 @@
                             {'response': ContainsDict(
                                 {'status_code': Equals(200)})}))})))
         self.assertEqual(1, len(reqs))
-        self.assertEqual([('POST', 'http://hookep.com/foo')], reqs)
+        self.assertEqual([
+            ('POST', 'http://hookep.com/foo',
+             {'Content-Type': 'application/json',
+              'User-Agent': 'launchpad.dev-Webhooks/r%s' % revno}),
+            ], reqs)
+        self.assertEqual([], oopses.oopses)
+
+    def test_run_signature(self):
+        # If the webhook has a secret, the request is signed in a
+        # PubSubHubbub-compatible way.
+        with CaptureOops() as oopses:
+            job, reqs = self.makeAndRunJob(
+                response_status=200, secret=u'sekrit')
+        self.assertEqual([
+            ('POST', 'http://hookep.com/foo',
+             {'Content-Type': 'application/json',
+              'User-Agent': 'launchpad.dev-Webhooks/r%s' % revno,
+              'X-Hub-Signature':
+                'sha1=de75f136c37d89f5eb24834468c1ecd602fa95dd'}),
+            ], reqs)
         self.assertEqual([], oopses.oopses)
 
     def test_run_404(self):
@@ -321,7 +399,7 @@
         # Deliveries are retried every 5 minutes for the first hour, and
         # every hour thereafter.
         job, reqs = self.makeAndRunJob(response_status=404)
-        self.assertEqual(timedelta(minutes=5), job.retry_delay)
+        self.assertEqual(timedelta(minutes=1), job.retry_delay)
         job.json_data['date_first_sent'] = (
             job.date_first_sent - timedelta(minutes=30)).isoformat()
         self.assertEqual(timedelta(minutes=5), job.retry_delay)

=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py	2015-07-30 14:57:06 +0000
+++ lib/lp/testing/factory.py	2015-08-03 10:51:28 +0000
@@ -4525,14 +4525,13 @@
         return ProxyFactory(
             LiveFSFile(livefsbuild=livefsbuild, libraryfile=libraryfile))
 
-    def makeWebhook(self, target=None, delivery_url=None):
+    def makeWebhook(self, target=None, delivery_url=None, secret=None):
         if target is None:
             target = self.makeGitRepository()
         if delivery_url is None:
             delivery_url = self.getUniqueURL().decode('utf-8')
         return getUtility(IWebhookSource).new(
-            target, self.makePerson(), delivery_url, [], True,
-            self.getUniqueUnicode())
+            target, self.makePerson(), delivery_url, [], True, secret)
 
     def makeSnap(self, registrant=None, owner=None, distroseries=None,
                  name=None, branch=None, git_ref=None,


Follow ups