launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #20080
Re: [Merge] lp:~thomir/launchpad/devel-gpg-layer into lp:launchpad
Review: Approve code
Diff comments:
>
> === modified file 'lib/lp/services/gpg/handler.py'
> --- lib/lp/services/gpg/handler.py 2015-09-26 02:55:36 +0000
> +++ lib/lp/services/gpg/handler.py 2016-03-14 02:02:05 +0000
> @@ -619,3 +618,120 @@
> self.name = uid.name
> self.email = uid.email
> self.comment = uid.comment
> +
> +
> +def sanitize_fingerprint(fingerprint):
> + """Sanitize a GPG fingerprint.
> +
> + This is the ultimate implementation of IGPGHandler.sanitizeFingerprint, and
> + is also used by the IGPGClient implementation.
> + """
> + # remove whitespaces, truncate to max of 40 (as per v4 keys) and
> + # convert to upper case
> + fingerprint = fingerprint.replace(' ', '')
> + fingerprint = fingerprint[:40].upper()
> +
> + if not valid_fingerprint(fingerprint):
> + return None
> +
> + return fingerprint
> +
> +
> +def sanitize_fingerprint_or_raise(fingerprint):
> + """Check the sanity of 'fingerprint'.
> +
> + If 'fingerprint' is a valid fingerprint, the sanitised version will be
> + returned (see sanitize_fingerprint).
> +
> + Otherwise, a ValueError will be raised.
> + """
> + sane_fingerprint = sanitize_fingerprint(fingerprint)
> + if sane_fingerprint is None:
> + raise ValueError("Invalid fingerprint: %r." % fingerprint)
> + return sane_fingerprint
> +
> +
> +@implementer(IGPGClient)
> +class GPGClient:
> + """See IGPGClient."""
> +
> + def __init__(self):
> + self.write_hooks = set()
> +
> + def getKeysForOwner(self, owner_id):
> + """See IGPGClient."""
> + path = '/users/%s/keys' % self._encode_owner_id(owner_id)
> + resp = self._request('get', path)
> + if resp.status_code != http_codes['OK']:
> + self.raise_for_error(resp)
> + return resp.json()
> +
> + def addKeyForOwner(self, owner_id, fingerprint):
> + """See IGPGClient."""
> + fingerprint = sanitize_fingerprint_or_raise(fingerprint)
> + path = '/users/%s/keys' % self._encode_owner_id(owner_id)
> + data = dict(fingerprint=fingerprint)
> + resp = self._request('post', path, data)
> + if resp.status_code == http_codes['CREATED']:
> + self._notify_writes()
> + else:
> + self.raise_for_error(resp)
> +
> + def disableKeyForOwner(self, owner_id, fingerprint):
> + """See IGPGClient."""
> + fingerprint = sanitize_fingerprint_or_raise(fingerprint)
> + path = '/users/%s/keys/%s' % (self._encode_owner_id(owner_id), fingerprint)
> + resp = self._request('delete', path)
> + if resp.status_code == http_codes['OK']:
> + self._notify_writes()
> + else:
> + self.raise_for_error(resp)
> +
> + def registerWriteHook(self, hook_callable):
> + """See IGPGClient."""
> + if not callable(hook_callable):
> + raise TypeError("'hook_callable' parameter must be a callable.")
> + self.write_hooks.add(hook_callable)
> +
> + def unregisterWriteHook(self, hook_callable):
> + """See IGPGClient."""
> + if hook_callable not in self.write_hooks:
> + raise ValueError("%r not registered.")
> + self.write_hooks.remove(hook_callable)
> +
> + def _notify_writes(self):
> + errors = []
> + for hook in self.write_hooks:
> + try:
> + hook()
> + except Exception as e:
> + errors.append(str(e))
> + if errors:
> + raise Exception("The operation succeeded, but one or more write"
> + " hooks failed: %s" % ', '.join(errors))
> +
> + def _encode_owner_id(self, owner_id):
> + return base64.b64encode(owner_id, altchars='-_')
> +
> + def raise_for_error(self, response):
> + """Raise GPGServiceException based on what's in 'response'."""
> + if response.headers['Content-Type'] == 'application/json':
> + message = response.json()['status']
> + else:
> + message = "Unhandled service error. HTTP Status: %d HTTP Body: %s" % (
> + response.status_code, response.content)
> + raise GPGServiceException(message)
> +
> + @property
> + def timeout(self):
> + # Perhaps this should be from config?
> + return 30.0
> +
> + @property
> + def endpoint(self):
> + return "http://{}".format(config.gpgservice.api_endpoint)
We should probably revise config.gpgservice.api_endpoint to be a full URL before we set anything in the production configs. Then it can contain any path, authentication, or other information that we might require.
> +
> + def _request(self, method, path, data=None, **kwargs):
> + response = getattr(requests, method)(
> + urljoin(self.endpoint, path), json=data, timeout=self.timeout, **kwargs)
> + return response
--
https://code.launchpad.net/~thomir/launchpad/devel-gpg-layer/+merge/286440
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.
References