← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] lp:~thomir/launchpad/devel-gpg-layer into lp:launchpad

 

Review: Approve code



Diff comments:

> 
> === modified file 'lib/lp/services/gpg/handler.py'
> --- lib/lp/services/gpg/handler.py	2015-09-26 02:55:36 +0000
> +++ lib/lp/services/gpg/handler.py	2016-03-14 02:02:05 +0000
> @@ -619,3 +618,120 @@
>          self.name = uid.name
>          self.email = uid.email
>          self.comment = uid.comment
> +
> +
> +def sanitize_fingerprint(fingerprint):
> +    """Sanitize a GPG fingerprint.
> +
> +    This is the ultimate implementation of IGPGHandler.sanitizeFingerprint, and
> +    is also used by the IGPGClient implementation.
> +    """
> +    # remove whitespaces, truncate to max of 40 (as per v4 keys) and
> +    # convert to upper case
> +    fingerprint = fingerprint.replace(' ', '')
> +    fingerprint = fingerprint[:40].upper()
> +
> +    if not valid_fingerprint(fingerprint):
> +        return None
> +
> +    return fingerprint
> +
> +
> +def sanitize_fingerprint_or_raise(fingerprint):
> +    """Check the sanity of 'fingerprint'.
> +
> +    If 'fingerprint' is a valid fingerprint, the sanitised version will be
> +    returned (see sanitize_fingerprint).
> +
> +    Otherwise, a ValueError will be raised.
> +    """
> +    sane_fingerprint = sanitize_fingerprint(fingerprint)
> +    if sane_fingerprint is None:
> +        raise ValueError("Invalid fingerprint: %r." % fingerprint)
> +    return sane_fingerprint
> +
> +
> +@implementer(IGPGClient)
> +class GPGClient:
> +    """See IGPGClient."""
> +
> +    def __init__(self):
> +        self.write_hooks = set()
> +
> +    def getKeysForOwner(self, owner_id):
> +        """See IGPGClient."""
> +        path = '/users/%s/keys' % self._encode_owner_id(owner_id)
> +        resp = self._request('get', path)
> +        if resp.status_code != http_codes['OK']:
> +            self.raise_for_error(resp)
> +        return resp.json()
> +
> +    def addKeyForOwner(self, owner_id, fingerprint):
> +        """See IGPGClient."""
> +        fingerprint = sanitize_fingerprint_or_raise(fingerprint)
> +        path = '/users/%s/keys' % self._encode_owner_id(owner_id)
> +        data = dict(fingerprint=fingerprint)
> +        resp = self._request('post', path, data)
> +        if resp.status_code == http_codes['CREATED']:
> +            self._notify_writes()
> +        else:
> +            self.raise_for_error(resp)
> +
> +    def disableKeyForOwner(self, owner_id, fingerprint):
> +        """See IGPGClient."""
> +        fingerprint = sanitize_fingerprint_or_raise(fingerprint)
> +        path = '/users/%s/keys/%s' % (self._encode_owner_id(owner_id), fingerprint)
> +        resp = self._request('delete', path)
> +        if resp.status_code == http_codes['OK']:
> +            self._notify_writes()
> +        else:
> +            self.raise_for_error(resp)
> +
> +    def registerWriteHook(self, hook_callable):
> +        """See IGPGClient."""
> +        if not callable(hook_callable):
> +            raise TypeError("'hook_callable' parameter must be a callable.")
> +        self.write_hooks.add(hook_callable)
> +
> +    def unregisterWriteHook(self, hook_callable):
> +        """See IGPGClient."""
> +        if hook_callable not in self.write_hooks:
> +            raise ValueError("%r not registered.")
> +        self.write_hooks.remove(hook_callable)
> +
> +    def _notify_writes(self):
> +        errors = []
> +        for hook in self.write_hooks:
> +            try:
> +                hook()
> +            except Exception as e:
> +                errors.append(str(e))
> +        if errors:
> +            raise Exception("The operation succeeded, but one or more write"
> +                            " hooks failed: %s" % ', '.join(errors))
> +
> +    def _encode_owner_id(self, owner_id):
> +        return base64.b64encode(owner_id, altchars='-_')
> +
> +    def raise_for_error(self, response):
> +        """Raise GPGServiceException based on what's in 'response'."""
> +        if response.headers['Content-Type'] == 'application/json':
> +            message = response.json()['status']
> +        else:
> +            message = "Unhandled service error. HTTP Status: %d HTTP Body: %s" % (
> +                response.status_code, response.content)
> +        raise GPGServiceException(message)
> +
> +    @property
> +    def timeout(self):
> +        # Perhaps this should be from config?
> +        return 30.0
> +
> +    @property
> +    def endpoint(self):
> +        return "http://{}".format(config.gpgservice.api_endpoint)

We should probably change config.gpgservice.api_endpoint to hold a full URL (scheme included), rather than prepending "http://" here, before we set anything in the production configs. That way it can carry any path, authentication, or other information that we might require.

> +
> +    def _request(self, method, path, data=None, **kwargs):
> +        response = getattr(requests, method)(
> +            urljoin(self.endpoint, path), json=data, timeout=self.timeout, **kwargs)
> +        return response


-- 
https://code.launchpad.net/~thomir/launchpad/devel-gpg-layer/+merge/286440
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.


References