launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #19133
[Merge] lp:~wgrant/launchpad/webhook-delivery-tweaks into lp:launchpad
William Grant has proposed merging lp:~wgrant/launchpad/webhook-delivery-tweaks into lp:launchpad with lp:~wgrant/launchpad/job-configurable-lease-duration as a prerequisite.
Commit message:
Webhook delivery tweaks: 30 second timeout, User-Agent, payload signatures, and faster retries.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~wgrant/launchpad/webhook-delivery-tweaks/+merge/266703
Webhook delivery tweaks: 30 second timeout, User-Agent, payload signatures, and faster retries.
--
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/webhook-delivery-tweaks into lp:launchpad.
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2015-08-03 10:16:49 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2015-08-03 10:51:28 +0000
@@ -2,6 +2,7 @@
# GNU Affero General Public License version 3 (see the file LICENSE).
from contextlib import contextmanager
+
from testtools.matchers import MatchesRegex
from lp.services.config import config
=== modified file 'lib/lp/services/webhooks/client.py'
--- lib/lp/services/webhooks/client.py 2015-07-12 23:36:55 +0000
+++ lib/lp/services/webhooks/client.py 2015-08-03 10:51:28 +0000
@@ -8,16 +8,32 @@
'WebhookClient',
]
+import hashlib
+import hmac
+import json
+
import requests
from zope.interface import implementer
from lp.services.webhooks.interfaces import IWebhookClient
+def create_request(user_agent, secret, payload):
+ body = json.dumps(payload)
+ headers = {
+ 'User-Agent': user_agent,
+ 'Content-Type': 'application/json',
+ }
+ if secret is not None:
+ hexdigest = hmac.new(secret, body, digestmod=hashlib.sha1).hexdigest()
+ headers['X-Hub-Signature'] = 'sha1=%s' % hexdigest
+ return (body, headers)
+
+
@implementer(IWebhookClient)
class WebhookClient:
- def deliver(self, url, proxy, payload):
+ def deliver(self, url, proxy, user_agent, timeout, secret, payload):
"""See `IWebhookClient`."""
# We never want to execute a job if there's no proxy configured, as
# we'd then be sending near-arbitrary requests from a trusted
@@ -32,8 +48,11 @@
session = requests.Session()
session.trust_env = False
session.headers = {}
- preq = session.prepare_request(
- requests.Request('POST', url, json=payload))
+
+ body, headers = create_request(user_agent, secret, payload)
+ preq = session.prepare_request(requests.Request(
+ 'POST', url, data=body, headers=headers))
+
result = {
'request': {
'url': url,
@@ -43,7 +62,7 @@
},
}
try:
- resp = session.send(preq, proxies=proxies)
+ resp = session.send(preq, proxies=proxies, timeout=timeout)
result['response'] = {
'status_code': resp.status_code,
'headers': dict(resp.headers),
=== modified file 'lib/lp/services/webhooks/interfaces.py'
--- lib/lp/services/webhooks/interfaces.py 2015-07-29 08:37:51 +0000
+++ lib/lp/services/webhooks/interfaces.py 2015-08-03 10:51:28 +0000
@@ -249,7 +249,7 @@
class IWebhookClient(Interface):
- def deliver(self, url, proxy, payload):
+ def deliver(self, url, proxy, user_agent, timeout, secret, payload):
"""Deliver a payload to a webhook endpoint.
Returns a dict of request and response details. The 'request' key
@@ -260,6 +260,13 @@
cannot be the fault of the remote endpoint. For example, a 404 will
return a response, and a DNS error returns a connection_error, but
the proxy being offline will raise an exception.
+
+ The timeout is just given to the underlying requests library, so
+ it only provides connect and inter-read timeouts. A reliable
+ overall request timeout will require another mechanism.
+
+ If secret is not None, a PubSubHubbub-compatible X-Hub-Signature
+ header will be sent using HMAC-SHA1.
"""
patch_collection_property(IWebhook, 'deliveries', IWebhookDeliveryJob)
=== modified file 'lib/lp/services/webhooks/model.py'
--- lib/lp/services/webhooks/model.py 2015-07-29 08:31:06 +0000
+++ lib/lp/services/webhooks/model.py 2015-08-03 10:51:28 +0000
@@ -38,6 +38,7 @@
)
from zope.security.proxy import removeSecurityProxy
+import lp.app.versioninfo
from lp.registry.model.person import Person
from lp.services.config import config
from lp.services.database.bulk import load_related
@@ -296,6 +297,13 @@
retry_error_types = (WebhookDeliveryRetry,)
user_error_types = (WebhookDeliveryFailure,)
+ # The request timeout is 30 seconds, requests timeouts aren't
+ # totally reliable so we also have a relatively low celery timeout
+ # as a backup. The celery timeout and lease expiry have a bit of
+ # slack to cope with slow job start/finish without conflicts.
+ soft_time_limit = timedelta(seconds=45)
+ lease_duration = timedelta(seconds=60)
+
# Effectively infinite, as we give up by checking
# retry_automatically and raising a fatal exception instead.
max_retries = 1000
@@ -366,14 +374,20 @@
@property
def retry_delay(self):
- if self._time_since_first_attempt < timedelta(hours=1):
+ if self._time_since_first_attempt < timedelta(minutes=10):
+ return timedelta(minutes=1)
+ elif self._time_since_first_attempt < timedelta(hours=1):
return timedelta(minutes=5)
else:
return timedelta(hours=1)
def run(self):
+ user_agent = '%s-Webhooks/r%s' % (
+ config.vhost.mainsite.hostname, lp.app.versioninfo.revno)
+ secret = self.webhook.secret
result = getUtility(IWebhookClient).deliver(
self.webhook.delivery_url, config.webhooks.http_proxy,
+ user_agent, 30, secret.encode('utf-8') if secret else None,
self.payload)
# Request and response headers and body may be large, so don't
# store them in the frequently-used JSON. We could store them in
=== modified file 'lib/lp/services/webhooks/tests/test_webhookjob.py'
--- lib/lp/services/webhooks/tests/test_webhookjob.py 2015-07-29 08:31:06 +0000
+++ lib/lp/services/webhooks/tests/test_webhookjob.py 2015-08-03 10:51:28 +0000
@@ -5,12 +5,16 @@
__metaclass__ = type
-from datetime import timedelta
+from datetime import (
+ datetime,
+ timedelta,
+ )
from httmock import (
HTTMock,
urlmatch,
)
+from pytz import utc
import requests
from storm.store import Store
from testtools import TestCase
@@ -23,18 +27,24 @@
KeysEqual,
LessThan,
MatchesAll,
+ MatchesDict,
MatchesStructure,
Not,
)
import transaction
from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+from lp.app.versioninfo import revno
from lp.services.features.testing import FeatureFixture
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.runner import JobRunner
from lp.services.job.tests import block_on_job
from lp.services.scripts.tests import run_script
-from lp.services.webhooks.client import WebhookClient
+from lp.services.webhooks.client import (
+ create_request,
+ WebhookClient,
+ )
from lp.services.webhooks.interfaces import (
IWebhookClient,
IWebhookDeliveryJob,
@@ -140,56 +150,88 @@
result = WebhookClient().deliver(
'http://hookep.com/foo',
{'http': 'http://squid.example.com:3128'},
- {'foo': 'bar'})
+ 'TestWebhookClient', 30, 'sekrit', {'foo': 'bar'})
return reqs, result
+ @property
+ def request_matcher(self):
+ return MatchesDict({
+ 'url': Equals('http://hookep.com/foo'),
+ 'method': Equals('POST'),
+ 'headers': Equals(
+ {'Content-Type': 'application/json',
+ 'Content-Length': '14',
+ 'User-Agent': 'TestWebhookClient',
+ 'X-Hub-Signature':
+ 'sha1=de75f136c37d89f5eb24834468c1ecd602fa95dd',
+ }),
+ 'body': Equals('{"foo": "bar"}'),
+ })
+
def test_sends_request(self):
[request], result = self.sendToWebhook()
- self.assertEqual(
- {'Content-Type': 'application/json', 'Content-Length': '14'},
- result['request']['headers'])
- self.assertEqual('{"foo": "bar"}', result['request']['body'])
- self.assertEqual(200, result['response']['status_code'])
- self.assertEqual({}, result['response']['headers'])
- self.assertEqual('Content', result['response']['body'])
+ self.assertThat(
+ result,
+ MatchesDict({
+ 'request': self.request_matcher,
+ 'response': MatchesDict({
+ 'status_code': Equals(200),
+ 'headers': Equals({}),
+ 'body': Equals('Content'),
+ }),
+ }))
def test_accepts_404(self):
[request], result = self.sendToWebhook(response_status=404)
- self.assertEqual(
- {'Content-Type': 'application/json', 'Content-Length': '14'},
- result['request']['headers'])
- self.assertEqual('{"foo": "bar"}', result['request']['body'])
- self.assertEqual(404, result['response']['status_code'])
- self.assertEqual({}, result['response']['headers'])
- self.assertEqual('Content', result['response']['body'])
+ self.assertThat(
+ result,
+ MatchesDict({
+ 'request': self.request_matcher,
+ 'response': MatchesDict({
+ 'status_code': Equals(404),
+ 'headers': Equals({}),
+ 'body': Equals('Content'),
+ }),
+ }))
def test_connection_error(self):
# Attempts that fail to connect have a connection_error rather
# than a response.
reqs, result = self.sendToWebhook(
raises=requests.ConnectionError('Connection refused'))
- self.assertNotIn('response', result)
- self.assertEqual(
- 'Connection refused', result['connection_error'])
+ self.assertThat(
+ result,
+ MatchesDict({
+ 'request': self.request_matcher,
+ 'connection_error': Equals('Connection refused'),
+ }))
self.assertEqual([], reqs)
-class MockWebhookClient:
+class MockWebhookClient(WebhookClient):
def __init__(self, response_status=200, raises=None):
self.response_status = response_status
self.raises = raises
self.requests = []
- def deliver(self, url, proxy, payload):
- result = {'request': {}}
+ def deliver(self, url, proxy, user_agent, timeout, secret, payload):
+ body, headers = create_request(user_agent, secret, payload)
+ result = {
+ 'request': {
+ 'url': url,
+ 'method': 'POST',
+ 'headers': headers,
+ 'body': body,
+ },
+ }
if isinstance(self.raises, requests.ConnectionError):
result['connection_error'] = str(self.raises)
elif self.raises is not None:
raise self.raises
else:
- self.requests.append(('POST', url))
+ self.requests.append(('POST', url, result['request']['headers']))
result['response'] = {'status_code': self.response_status}
return result
@@ -199,8 +241,10 @@
layer = ZopelessDatabaseLayer
- def makeAndRunJob(self, response_status=200, raises=None, mock=True):
- hook = self.factory.makeWebhook(delivery_url=u'http://hookep.com/foo')
+ def makeAndRunJob(self, response_status=200, raises=None, mock=True,
+ secret=None):
+ hook = self.factory.makeWebhook(
+ delivery_url=u'http://hookep.com/foo', secret=secret)
job = WebhookDeliveryJob.create(hook, payload={'foo': 'bar'})
client = MockWebhookClient(
@@ -217,6 +261,21 @@
self.assertProvides(
WebhookDeliveryJob.create(hook, payload={}), IWebhookDeliveryJob)
+ def test_short_lease_and_timeout(self):
+ # Webhook jobs have a request timeout of 30 seconds, a celery
+ # timeout of 45 seconds, and a lease of 60 seconds, to give
+ # reasonable time for sluggish things to catch up.
+ hook = self.factory.makeWebhook()
+ job = hook.ping()
+ job.acquireLease()
+ self.assertThat(
+ job.lease_expires - datetime.now(utc),
+ MatchesAll(
+ GreaterThan(timedelta(seconds=50)),
+ LessThan(timedelta(seconds=60))))
+ self.assertEqual(
+ timedelta(seconds=45), removeSecurityProxy(job).soft_time_limit)
+
def test_run_200(self):
# A request that returns 200 is a success.
with CaptureOops() as oopses:
@@ -235,7 +294,26 @@
{'response': ContainsDict(
{'status_code': Equals(200)})}))})))
self.assertEqual(1, len(reqs))
- self.assertEqual([('POST', 'http://hookep.com/foo')], reqs)
+ self.assertEqual([
+ ('POST', 'http://hookep.com/foo',
+ {'Content-Type': 'application/json',
+ 'User-Agent': 'launchpad.dev-Webhooks/r%s' % revno}),
+ ], reqs)
+ self.assertEqual([], oopses.oopses)
+
+ def test_run_signature(self):
+ # If the webhook has a secret, the request is signed in a
+ # PubSubHubbub-compatible way.
+ with CaptureOops() as oopses:
+ job, reqs = self.makeAndRunJob(
+ response_status=200, secret=u'sekrit')
+ self.assertEqual([
+ ('POST', 'http://hookep.com/foo',
+ {'Content-Type': 'application/json',
+ 'User-Agent': 'launchpad.dev-Webhooks/r%s' % revno,
+ 'X-Hub-Signature':
+ 'sha1=de75f136c37d89f5eb24834468c1ecd602fa95dd'}),
+ ], reqs)
self.assertEqual([], oopses.oopses)
def test_run_404(self):
@@ -321,7 +399,7 @@
# Deliveries are retried every 5 minutes for the first hour, and
# every hour thereafter.
job, reqs = self.makeAndRunJob(response_status=404)
- self.assertEqual(timedelta(minutes=5), job.retry_delay)
+ self.assertEqual(timedelta(minutes=1), job.retry_delay)
job.json_data['date_first_sent'] = (
job.date_first_sent - timedelta(minutes=30)).isoformat()
self.assertEqual(timedelta(minutes=5), job.retry_delay)
=== modified file 'lib/lp/testing/factory.py'
--- lib/lp/testing/factory.py 2015-07-30 14:57:06 +0000
+++ lib/lp/testing/factory.py 2015-08-03 10:51:28 +0000
@@ -4525,14 +4525,13 @@
return ProxyFactory(
LiveFSFile(livefsbuild=livefsbuild, libraryfile=libraryfile))
- def makeWebhook(self, target=None, delivery_url=None):
+ def makeWebhook(self, target=None, delivery_url=None, secret=None):
if target is None:
target = self.makeGitRepository()
if delivery_url is None:
delivery_url = self.getUniqueURL().decode('utf-8')
return getUtility(IWebhookSource).new(
- target, self.makePerson(), delivery_url, [], True,
- self.getUniqueUnicode())
+ target, self.makePerson(), delivery_url, [], True, secret)
def makeSnap(self, registrant=None, owner=None, distroseries=None,
name=None, branch=None, git_ref=None,
Follow ups