launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #30446
[Merge] ~ines-almeida/launchpad:project-tokens/refactor-access-token-mixins into launchpad:master
Ines Almeida has proposed merging ~ines-almeida/launchpad:project-tokens/refactor-access-token-mixins into launchpad:master.
Commit message:
Move "issueAccessToken" logic to generic Access Token Target model and interface.
Logic was originally in Git Repository model/interface. Moving it to a generic place makes adding new access token targets much easier
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~ines-almeida/launchpad/+git/launchpad/+merge/450898
This will make adding project-scoped access tokens much easier and without duplication
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~ines-almeida/launchpad:project-tokens/refactor-access-token-mixins into launchpad:master.
diff --git a/lib/lp/code/interfaces/gitrepository.py b/lib/lp/code/interfaces/gitrepository.py
index 354d2df..fc8c0ae 100644
--- a/lib/lp/code/interfaces/gitrepository.py
+++ b/lib/lp/code/interfaces/gitrepository.py
@@ -83,7 +83,10 @@ from lp.registry.interfaces.personproduct import IPersonProductFactory
from lp.registry.interfaces.product import IProduct
from lp.registry.interfaces.role import IPersonRoles
from lp.services.auth.enums import AccessTokenScope
-from lp.services.auth.interfaces import IAccessTokenTarget
+from lp.services.auth.interfaces import (
+ IAccessTokenTarget,
+ IAccessTokenTargetEdit,
+)
from lp.services.fields import InlineObject, PersonChoice, PublicPersonChoice
from lp.services.webhooks.interfaces import IWebhookTarget
@@ -129,7 +132,7 @@ def git_repository_name_validator(name):
return True
-class IGitRepositoryView(IHasRecipes):
+class IGitRepositoryView(IHasRecipes, IAccessTokenTarget):
"""IGitRepository attributes that require launchpad.View permission."""
id = exported(Int(title=_("ID"), readonly=True, required=True))
@@ -877,48 +880,6 @@ class IGitRepositoryView(IHasRecipes):
:return: A `ResultSet` of `IGitActivity`.
"""
- # XXX cjwatson 2021-10-13: This should move to IAccessTokenTarget, but
- # currently has rather too much backward-compatibility code for that.
- @operation_parameters(
- description=TextLine(
- title=_("A short description of the token."), required=False
- ),
- scopes=List(
- title=_("A list of scopes to be granted by this token."),
- value_type=Choice(vocabulary=AccessTokenScope),
- required=False,
- ),
- date_expires=Datetime(
- title=_("When the token should expire."), required=False
- ),
- )
- @export_write_operation()
- @operation_for_version("devel")
- def issueAccessToken(description=None, scopes=None, date_expires=None):
- """Issue an access token for this repository.
-
- Access tokens can be used to push to this repository over HTTPS.
- They are only valid for a single repository, and have a short expiry
- period (currently fixed at one week), so at the moment they are only
- suitable in some limited situations. By default they are currently
- implemented as macaroons.
-
- If `description` and `scopes` are both given, then issue a personal
- access token instead, either non-expiring or with an expiry time
- given by `date_expires`. These may be used in webservice API
- requests for certain methods on this repository.
-
- This interface is experimental, and may be changed or removed
- without notice.
-
- :return: If `description` and `scopes` are both given, the secret
- for a new personal access token (Launchpad only records the hash
- of this secret and not the secret itself, so the caller must be
- careful to save this; personal access tokens are in development
- and may not entirely work yet). Otherwise, a serialised
- macaroon.
- """
-
@operation_parameters(
commit_sha1=copy_field(IRevisionStatusReport["commit_sha1"])
)
@@ -1054,7 +1015,7 @@ class IGitRepositoryExpensiveRequest(Interface):
that is not an admin or a registry expert."""
-class IGitRepositoryEdit(IWebhookTarget, IAccessTokenTarget):
+class IGitRepositoryEdit(IWebhookTarget, IAccessTokenTargetEdit):
"""IGitRepository methods that require launchpad.Edit permission."""
@mutator_for(IGitRepositoryView["name"])
diff --git a/lib/lp/code/model/gitrepository.py b/lib/lp/code/model/gitrepository.py
index 85240eb..633b508 100644
--- a/lib/lp/code/model/gitrepository.py
+++ b/lib/lp/code/model/gitrepository.py
@@ -126,9 +126,7 @@ from lp.registry.model.accesspolicy import (
)
from lp.registry.model.person import Person
from lp.registry.model.teammembership import TeamParticipation
-from lp.services.auth.interfaces import IAccessTokenSet
from lp.services.auth.model import AccessTokenTargetMixin
-from lp.services.auth.utils import create_access_token_secret
from lp.services.config import config
from lp.services.database import bulk
from lp.services.database.constants import DEFAULT, UTC_NOW
@@ -327,6 +325,9 @@ class GitRepository(
name="builder_constraints", allow_none=True
)
+ # Matches `GitRepositoryMacaroonIssuer.identifier`
+ macaroon_issuer_identifier = "git-repository"
+
def __init__(
self,
repository_type,
@@ -1880,56 +1881,6 @@ class GitRepository(
results, pre_iter_hook=preloadDataForActivities
)
- def _issuePersonalAccessToken(
- self, user, description, scopes, date_expires=None
- ):
- """Issue a personal access token for this repository."""
- if user is None:
- raise Unauthorized(
- "Personal access tokens may only be issued for a logged-in "
- "user."
- )
- secret = create_access_token_secret()
- getUtility(IAccessTokenSet).new(
- secret,
- owner=user,
- description=description,
- target=self,
- scopes=scopes,
- date_expires=date_expires,
- )
- return secret
-
- # XXX cjwatson 2021-10-13: Remove this once lp.code.xmlrpc.git accepts
- # pushes using personal access tokens.
- def _issueMacaroon(self, user):
- """Issue a macaroon for this repository."""
- issuer = getUtility(IMacaroonIssuer, "git-repository")
- # Our security adapter has already done the checks we need, apart
- # from forbidding anonymous users which is done by the issuer.
- return (
- removeSecurityProxy(issuer)
- .issueMacaroon(self, user=user)
- .serialize()
- )
-
- def issueAccessToken(
- self, owner=None, description=None, scopes=None, date_expires=None
- ):
- """See `IGitRepository`."""
- # It's more usual in model code to pass the user as an argument,
- # e.g. using @call_with(user=REQUEST_USER) in the webservice
- # interface. However, in this case that would allow anyone who
- # constructs a way to call this method not via the webservice to
- # issue a token for any user, which seems like a bad idea.
- user = getUtility(ILaunchBag).user
- if description is not None and scopes is not None:
- return self._issuePersonalAccessToken(
- user, description, scopes, date_expires=date_expires
- )
- else:
- return self._issueMacaroon(user)
-
def canBeDeleted(self):
"""See `IGitRepository`."""
# Can't delete if the repository is associated with anything.
diff --git a/lib/lp/code/model/tests/test_gitrepository.py b/lib/lp/code/model/tests/test_gitrepository.py
index 29e8d59..ead3153 100644
--- a/lib/lp/code/model/tests/test_gitrepository.py
+++ b/lib/lp/code/model/tests/test_gitrepository.py
@@ -30,7 +30,6 @@ from testtools.matchers import (
MatchesListwise,
MatchesSetwise,
MatchesStructure,
- StartsWith,
)
from testtools.testcase import ExpectedException
from zope.component import getUtility
@@ -150,7 +149,6 @@ from lp.registry.interfaces.personociproject import IPersonOCIProjectFactory
from lp.registry.interfaces.personproduct import IPersonProductFactory
from lp.registry.tests.test_accesspolicy import get_policies_for_artifact
from lp.services.auth.enums import AccessTokenScope
-from lp.services.auth.interfaces import IAccessTokenSet
from lp.services.authserver.xmlrpc import AuthServerAPIView
from lp.services.config import config
from lp.services.database.constants import UTC_NOW
@@ -6672,306 +6670,6 @@ class TestGitRepositoryWebservice(TestCaseWithFactory):
),
)
- def test_issueAccessToken(self):
- # A user can request an access token via the webservice API.
- self.pushConfig("codehosting", git_macaroon_secret_key="some-secret")
- repository = self.factory.makeGitRepository()
- # Write access to the repository isn't checked at this stage
- # (although the access token will only be useful if the user has
- # some kind of write access).
- requester = self.factory.makePerson()
- with person_logged_in(requester):
- repository_url = api_url(repository)
- webservice = webservice_for_person(
- requester,
- permission=OAuthPermission.WRITE_PUBLIC,
- default_api_version="devel",
- )
- response = webservice.named_post(repository_url, "issueAccessToken")
- self.assertEqual(200, response.status)
- macaroon = Macaroon.deserialize(response.jsonBody())
- with person_logged_in(ANONYMOUS):
- self.assertThat(
- macaroon,
- MatchesStructure(
- location=Equals(config.vhost.mainsite.hostname),
- identifier=Equals("git-repository"),
- caveats=MatchesListwise(
- [
- MatchesStructure.byEquality(
- caveat_id="lp.git-repository %s"
- % repository.id
- ),
- MatchesStructure(
- caveat_id=StartsWith(
- "lp.principal.openid-identifier "
- )
- ),
- MatchesStructure(
- caveat_id=StartsWith("lp.expires ")
- ),
- ]
- ),
- ),
- )
-
- def test_issueAccessToken_anonymous(self):
- # An anonymous user cannot request an access token via the
- # webservice API.
- repository = self.factory.makeGitRepository()
- with person_logged_in(repository.owner):
- repository_url = api_url(repository)
- webservice = webservice_for_person(None, default_api_version="devel")
- response = webservice.named_post(repository_url, "issueAccessToken")
- self.assertEqual(401, response.status)
- self.assertEqual(
- b"git-repository macaroons may only be issued for a logged-in "
- b"user.",
- response.body,
- )
-
- def test_issueAccessToken_personal(self):
- # A user can request a personal access token via the webservice API.
- repository = self.factory.makeGitRepository()
- # Write access to the repository isn't checked at this stage
- # (although the access token will only be useful if the user has
- # some kind of write access).
- requester = self.factory.makePerson()
- with person_logged_in(requester):
- repository_url = api_url(repository)
- webservice = webservice_for_person(
- requester,
- permission=OAuthPermission.WRITE_PUBLIC,
- default_api_version="devel",
- )
- response = webservice.named_post(
- repository_url,
- "issueAccessToken",
- description="Test token",
- scopes=["repository:build_status"],
- )
- self.assertEqual(200, response.status)
- secret = response.jsonBody()
- with person_logged_in(requester):
- token = getUtility(IAccessTokenSet).getBySecret(secret)
- self.assertThat(
- token,
- MatchesStructure(
- owner=Equals(requester),
- description=Equals("Test token"),
- target=Equals(repository),
- scopes=Equals([AccessTokenScope.REPOSITORY_BUILD_STATUS]),
- date_expires=Is(None),
- ),
- )
-
- def test_issueAccessToken_personal_with_expiry(self):
- # A user can set an expiry time when requesting a personal access
- # token via the webservice API.
- repository = self.factory.makeGitRepository()
- # Write access to the repository isn't checked at this stage
- # (although the access token will only be useful if the user has
- # some kind of write access).
- requester = self.factory.makePerson()
- with person_logged_in(requester):
- repository_url = api_url(repository)
- webservice = webservice_for_person(
- requester,
- permission=OAuthPermission.WRITE_PUBLIC,
- default_api_version="devel",
- )
- date_expires = datetime.now(timezone.utc) + timedelta(days=30)
- response = webservice.named_post(
- repository_url,
- "issueAccessToken",
- description="Test token",
- scopes=["repository:build_status"],
- date_expires=date_expires.isoformat(),
- )
- self.assertEqual(200, response.status)
- secret = response.jsonBody()
- with person_logged_in(requester):
- token = getUtility(IAccessTokenSet).getBySecret(secret)
- self.assertThat(
- token,
- MatchesStructure.byEquality(
- owner=requester,
- description="Test token",
- target=repository,
- scopes=[AccessTokenScope.REPOSITORY_BUILD_STATUS],
- date_expires=date_expires,
- ),
- )
-
- def test_issueAccessToken_personal_anonymous(self):
- # An anonymous user cannot request a personal access token via the
- # webservice API.
- repository = self.factory.makeGitRepository()
- with person_logged_in(repository.owner):
- repository_url = api_url(repository)
- webservice = webservice_for_person(None, default_api_version="devel")
- response = webservice.named_post(
- repository_url,
- "issueAccessToken",
- description="Test token",
- scopes=["repository:build_status"],
- )
- self.assertEqual(401, response.status)
- self.assertEqual(
- b"Personal access tokens may only be issued for a logged-in user.",
- response.body,
- )
-
- def test_builder_constraints_commercial_admin(self):
- # A commercial admin can change a repository's builder constraints.
- self.factory.makeBuilder(open_resources=["gpu", "large"])
- repository_db = self.factory.makeGitRepository()
- commercial_admin = getUtility(
- ILaunchpadCelebrities
- ).commercial_admin.teamowner
- webservice = webservice_for_person(
- commercial_admin, permission=OAuthPermission.WRITE_PUBLIC
- )
- webservice.default_api_version = "devel"
- with person_logged_in(ANONYMOUS):
- repository_url = api_url(repository_db)
- response = webservice.patch(
- repository_url,
- "application/json",
- json.dumps({"builder_constraints": ["gpu"]}),
- )
- self.assertEqual(209, response.status)
- with person_logged_in(ANONYMOUS):
- self.assertEqual(("gpu",), repository_db.builder_constraints)
-
- def test_builder_constraints_owner(self):
- # The owner of a repository cannot change its builder constraints
- # (unless they're also a (commercial) admin).
- self.factory.makeBuilder(open_resources=["gpu", "large"])
- repository_db = self.factory.makeGitRepository()
- webservice = webservice_for_person(
- repository_db.owner, permission=OAuthPermission.WRITE_PUBLIC
- )
- webservice.default_api_version = "devel"
- with person_logged_in(ANONYMOUS):
- repository_url = api_url(repository_db)
- response = webservice.patch(
- repository_url,
- "application/json",
- json.dumps({"builder_constraints": ["gpu"]}),
- )
- self.assertEqual(401, response.status)
- with person_logged_in(ANONYMOUS):
- self.assertIsNone(repository_db.builder_constraints)
-
- def test_builder_constraints_nonexistent(self):
- # Only known builder resources may be set as builder constraints.
- self.factory.makeBuilder(
- open_resources=["large"], restricted_resources=["gpu"]
- )
- repository_db = self.factory.makeGitRepository()
- commercial_admin = getUtility(
- ILaunchpadCelebrities
- ).commercial_admin.teamowner
- webservice = webservice_for_person(
- commercial_admin, permission=OAuthPermission.WRITE_PUBLIC
- )
- webservice.default_api_version = "devel"
- with person_logged_in(ANONYMOUS):
- repository_url = api_url(repository_db)
- response = webservice.patch(
- repository_url,
- "application/json",
- json.dumps({"builder_constraints": ["really-large"]}),
- )
- self.assertEqual(400, response.status)
- self.assertEqual(
- b"builder_constraints: 'really-large' isn't a valid token",
- response.body,
- )
-
- def test_fork_to_self(self):
- hosting_fixture = self.useFixture(GitHostingFixture())
- repository = self.factory.makeGitRepository()
- requester = self.factory.makePerson()
- repository_url = api_url(repository)
- requester_url = api_url(requester)
- webservice = webservice_for_person(
- requester,
- permission=OAuthPermission.WRITE_PUBLIC,
- default_api_version="devel",
- )
- response = webservice.named_post(
- repository_url, "fork", new_owner=requester_url
- )
- self.assertEqual(200, response.status)
- self.assertEndsWith(response.jsonBody()["owner_link"], requester_url)
- self.assertEqual(1, len(hosting_fixture.create.calls))
-
- def test_fork_to_team_as_member(self):
- hosting_fixture = self.useFixture(GitHostingFixture())
- repository = self.factory.makeGitRepository()
- requester = self.factory.makePerson()
- team = self.factory.makeTeam(members=[requester])
- repository_url = api_url(repository)
- team_url = api_url(team)
- webservice = webservice_for_person(
- requester,
- permission=OAuthPermission.WRITE_PUBLIC,
- default_api_version="devel",
- )
- response = webservice.named_post(
- repository_url, "fork", new_owner=team_url
- )
- self.assertEqual(200, response.status)
- self.assertEndsWith(response.jsonBody()["owner_link"], team_url)
- self.assertEqual(1, len(hosting_fixture.create.calls))
-
- def test_fork_to_team_as_non_member(self):
- hosting_fixture = self.useFixture(GitHostingFixture())
- repository = self.factory.makeGitRepository()
- requester = self.factory.makePerson()
- team = self.factory.makeTeam()
- repository_url = api_url(repository)
- team_url = api_url(team)
- webservice = webservice_for_person(
- requester,
- permission=OAuthPermission.WRITE_PUBLIC,
- default_api_version="devel",
- )
- response = webservice.named_post(
- repository_url, "fork", new_owner=team_url
- )
- self.assertEqual(401, response.status)
- self.assertEqual(
- b"The owner of the new repository must be you or a team of which "
- b"you are a member.",
- response.body,
- )
- self.assertEqual(0, len(hosting_fixture.create.calls))
-
- def test_fork_invisible(self):
- hosting_fixture = self.useFixture(GitHostingFixture())
- owner = self.factory.makePerson()
- repository = self.factory.makeGitRepository(
- owner=owner, information_type=InformationType.USERDATA
- )
- requester = self.factory.makePerson()
- with person_logged_in(owner):
- repository_url = api_url(repository)
- requester_url = api_url(requester)
- webservice = webservice_for_person(
- requester,
- permission=OAuthPermission.WRITE_PUBLIC,
- default_api_version="devel",
- )
- response = webservice.named_post(
- repository_url, "fork", new_owner=requester_url
- )
- self.assertEqual(401, response.status)
- self.assertIn(b"launchpad.View", response.body)
- self.assertEqual(0, len(hosting_fixture.create.calls))
-
class TestGitRepositoryMacaroonIssuer(MacaroonTestMixin, TestCaseWithFactory):
"""Test GitRepository macaroon issuing and verification."""
diff --git a/lib/lp/services/auth/interfaces.py b/lib/lp/services/auth/interfaces.py
index be90837..1250183 100644
--- a/lib/lp/services/auth/interfaces.py
+++ b/lib/lp/services/auth/interfaces.py
@@ -18,6 +18,7 @@ from lazr.restful.declarations import (
exported,
exported_as_webservice_entry,
operation_for_version,
+ operation_parameters,
operation_returns_collection_of,
)
from lazr.restful.fields import Reference
@@ -207,6 +208,46 @@ class IAccessTokenVerifiedRequest(Interface):
class IAccessTokenTarget(Interface):
"""An object that can be a target for access tokens."""
+ @operation_parameters(
+ description=TextLine(
+ title=_("A short description of the token."), required=False
+ ),
+ scopes=List(
+ title=_("A list of scopes to be granted by this token."),
+ value_type=Choice(vocabulary=AccessTokenScope),
+ required=False,
+ ),
+ date_expires=Datetime(
+ title=_("When the token should expire."), required=False
+ ),
+ )
+ @export_write_operation()
+ @operation_for_version("devel")
+ def issueAccessToken(description=None, scopes=None, date_expires=None):
+ """Issue an access token for this target.
+
+ Access tokens can be used to push to repositories over HTTPS.
+
+ If `description` and `scopes` are both given, then issue a personal
+ access token instead, either non-expiring or with an expiry time
+ given by `date_expires`. These may be used in webservice API
+ requests for certain methods in the target's repositories.
+
+ :return: If `description` and `scopes` are both given, the secret
+ for a new personal access token (Launchpad only records the hash
+ of this secret and not the secret itself, so the caller must be
+ careful to save this; personal access tokens are in development
+ and may not entirely work yet). Otherwise, a serialised
+ macaroon.
+ """
+
+
+@exported_as_webservice_entry(as_of="beta")
+class IAccessTokenTargetEdit(Interface):
+ """An object that can be a target for access tokens that requires
+ launchpad.Edit permission.
+ """
+
@call_with(visible_by_user=REQUEST_USER)
@operation_returns_collection_of(IAccessToken)
@export_read_operation()
diff --git a/lib/lp/services/auth/model.py b/lib/lp/services/auth/model.py
index 436c382..6b517e0 100644
--- a/lib/lp/services/auth/model.py
+++ b/lib/lp/services/auth/model.py
@@ -16,6 +16,7 @@ from storm.expr import SQL, And, Cast, Or, Select, Update
from storm.locals import DateTime, Int, Reference, Unicode
from zope.component import getUtility
from zope.interface import implementer
+from zope.security.interfaces import Unauthorized
from zope.security.proxy import removeSecurityProxy
from lp.code.interfaces.gitcollection import IAllGitRepositories
@@ -23,9 +24,12 @@ from lp.code.interfaces.gitrepository import IGitRepository
from lp.registry.model.teammembership import TeamParticipation
from lp.services.auth.enums import AccessTokenScope
from lp.services.auth.interfaces import IAccessToken, IAccessTokenSet
+from lp.services.auth.utils import create_access_token_secret
from lp.services.database.constants import UTC_NOW
from lp.services.database.interfaces import IPrimaryStore, IStore
from lp.services.database.stormbase import StormBase
+from lp.services.macaroons.interfaces import IMacaroonIssuer
+from lp.services.webapp.interfaces import ILaunchBag
@implementer(IAccessToken)
@@ -236,3 +240,70 @@ class AccessTokenTargetMixin:
visible_by_user=visible_by_user,
include_expired=include_expired,
)
+
+ def issueAccessToken(
+ self, description=None, scopes=None, date_expires=None
+ ):
+ # It's more usual in model code to pass the user as an argument,
+ # e.g. using @call_with(user=REQUEST_USER) in the webservice
+ # interface. However, in this case that would allow anyone who
+ # constructs a way to call this method not via the webservice to
+ # issue a token for any user, which seems like a bad idea.
+ user = getUtility(ILaunchBag).user
+ if description is not None and scopes is not None:
+ return self._issueAccessToken(
+ user, description, scopes, date_expires=date_expires
+ )
+ else:
+ return self._issueMacaroon(user)
+
+ def _issueAccessToken(self, user, description, scopes, date_expires=None):
+ """Issue an access token for this target."""
+ if user is None:
+ raise Unauthorized(
+ "Personal access tokens may only be issued for a logged-in "
+ "user."
+ )
+ secret = create_access_token_secret()
+ getUtility(IAccessTokenSet).new(
+ secret,
+ owner=user,
+ description=description,
+ target=self,
+ scopes=scopes,
+ date_expires=date_expires,
+ )
+ return secret
+
+ def _issuePersonalAccessToken(
+ self, user, description, scopes, date_expires=None
+ ):
+ """Issue a personal access token for this target."""
+ if user is None:
+ raise Unauthorized(
+ "Personal access tokens may only be issued for a logged-in "
+ "user."
+ )
+ secret = create_access_token_secret()
+ getUtility(IAccessTokenSet).new(
+ secret,
+ owner=user,
+ description=description,
+ target=self,
+ scopes=scopes,
+ date_expires=date_expires,
+ )
+ return secret
+
+ # XXX cjwatson 2021-10-13: Remove this once lp.code.xmlrpc.git accepts
+ # pushes using personal access tokens.
+ def _issueMacaroon(self, user):
+ """Issue a macaroon for this target."""
+ issuer = getUtility(IMacaroonIssuer, self.macaroon_issuer_identifier)
+ # Our security adapter has already done the checks we need, apart
+ # from forbidding anonymous users which is done by the issuer.
+ return (
+ removeSecurityProxy(issuer)
+ .issueMacaroon(self, user=user)
+ .serialize()
+ )
diff --git a/lib/lp/services/auth/tests/test_model.py b/lib/lp/services/auth/tests/test_model.py
index 9449f0c..50ad883 100644
--- a/lib/lp/services/auth/tests/test_model.py
+++ b/lib/lp/services/auth/tests/test_model.py
@@ -9,14 +9,22 @@ import signal
from datetime import datetime, timedelta, timezone
import transaction
+from pymacaroons import Macaroon
from storm.store import Store
-from testtools.matchers import Is, MatchesStructure
+from testtools.matchers import (
+ Equals,
+ Is,
+ MatchesListwise,
+ MatchesStructure,
+ StartsWith,
+)
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
from lp.services.auth.enums import AccessTokenScope
from lp.services.auth.interfaces import IAccessTokenSet
from lp.services.auth.utils import create_access_token_secret
+from lp.services.config import config
from lp.services.database.sqlbase import (
disconnect_stores,
get_transaction_timestamp,
@@ -24,6 +32,7 @@ from lp.services.database.sqlbase import (
from lp.services.webapp.authorization import check_permission
from lp.services.webapp.interfaces import OAuthPermission
from lp.testing import (
+ ANONYMOUS,
TestCaseWithFactory,
api_url,
login,
@@ -468,6 +477,154 @@ class TestAccessTokenTargetBase:
recorder1, recorder2 = record_two_runs(get_tokens, create_token, 2)
self.assertThat(recorder2, HasQueryCount.byEquality(recorder1))
+ def test_issueAccessToken(self):
+ # A user can request an access token via the webservice API.
+ self.pushConfig("codehosting", git_macaroon_secret_key="some-secret")
+ with person_logged_in(ANONYMOUS):
+ # Write access to the repositories isn't checked at this stage
+ # (although the access token will only be useful if the user has
+ # some kind of write access).
+ requester = self.factory.makePerson()
+ with person_logged_in(requester):
+ repository_url = api_url(self.target)
+ webservice = webservice_for_person(
+ requester,
+ permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel",
+ )
+ response = webservice.named_post(repository_url, "issueAccessToken")
+ self.assertEqual(200, response.status)
+ macaroon = Macaroon.deserialize(response.jsonBody())
+ with person_logged_in(ANONYMOUS):
+ self.assertThat(
+ macaroon,
+ MatchesStructure(
+ location=Equals(config.vhost.mainsite.hostname),
+ identifier=Equals("git-repository"),
+ caveats=MatchesListwise(
+ [
+ MatchesStructure.byEquality(
+ caveat_id="lp.git-repository %s"
+ % self.target.id
+ ),
+ MatchesStructure(
+ caveat_id=StartsWith(
+ "lp.principal.openid-identifier "
+ )
+ ),
+ MatchesStructure(
+ caveat_id=StartsWith("lp.expires ")
+ ),
+ ]
+ ),
+ ),
+ )
+
+ def test_issueAccessToken_anonymous(self):
+ # An anonymous user cannot request an access token via the
+ # webservice API.
+ with person_logged_in(self.owner):
+ repository_url = api_url(self.target)
+ webservice = webservice_for_person(None, default_api_version="devel")
+ response = webservice.named_post(repository_url, "issueAccessToken")
+ self.assertEqual(401, response.status)
+ self.assertEqual(
+ b"git-repository macaroons may only be issued for a logged-in "
+ b"user.",
+ response.body,
+ )
+
+ def test_issueAccessToken_personal(self):
+ # A user can request a personal access token via the webservice API.
+ with person_logged_in(ANONYMOUS):
+ # Write access to the repositories isn't checked at this stage
+ # (although the access token will only be useful if the user has
+ # some kind of write access).
+ requester = self.factory.makePerson()
+ with person_logged_in(requester):
+ repository_url = api_url(self.target)
+ webservice = webservice_for_person(
+ requester,
+ permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel",
+ )
+ response = webservice.named_post(
+ repository_url,
+ "issueAccessToken",
+ description="Test token",
+ scopes=["repository:build_status"],
+ )
+ self.assertEqual(200, response.status)
+ secret = response.jsonBody()
+ with person_logged_in(requester):
+ token = getUtility(IAccessTokenSet).getBySecret(secret)
+ self.assertThat(
+ token,
+ MatchesStructure(
+ owner=Equals(requester),
+ description=Equals("Test token"),
+ target=Equals(self.target),
+ scopes=Equals([AccessTokenScope.REPOSITORY_BUILD_STATUS]),
+ date_expires=Is(None),
+ ),
+ )
+
+ def test_issueAccessToken_personal_with_expiry(self):
+ # A user can set an expiry time when requesting a personal access
+ # token via the webservice API.
+ with person_logged_in(ANONYMOUS):
+ # Write access to the repositories isn't checked at this stage
+ # (although the access token will only be useful if the user has
+ # some kind of write access).
+ requester = self.factory.makePerson()
+ with person_logged_in(requester):
+ repository_url = api_url(self.target)
+ webservice = webservice_for_person(
+ requester,
+ permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel",
+ )
+ date_expires = datetime.now(timezone.utc) + timedelta(days=30)
+ response = webservice.named_post(
+ repository_url,
+ "issueAccessToken",
+ description="Test token",
+ scopes=["repository:build_status"],
+ date_expires=date_expires.isoformat(),
+ )
+ self.assertEqual(200, response.status)
+ secret = response.jsonBody()
+ with person_logged_in(requester):
+ token = getUtility(IAccessTokenSet).getBySecret(secret)
+ self.assertThat(
+ token,
+ MatchesStructure.byEquality(
+ owner=requester,
+ description="Test token",
+ target=self.target,
+ scopes=[AccessTokenScope.REPOSITORY_BUILD_STATUS],
+ date_expires=date_expires,
+ ),
+ )
+
+ def test_issueAccessToken_personal_anonymous(self):
+ # An anonymous user cannot request a personal access token via the
+ # webservice API.
+ with person_logged_in(self.owner):
+ target_url = api_url(self.target)
+ webservice = webservice_for_person(None, default_api_version="devel")
+ response = webservice.named_post(
+ target_url,
+ "issueAccessToken",
+ description="Test token",
+ scopes=["repository:build_status"],
+ )
+ self.assertEqual(401, response.status)
+ self.assertEqual(
+ b"Personal access tokens may only be issued for a logged-in user.",
+ response.body,
+ )
+
class TestAccessTokenTargetGitRepository(
TestAccessTokenTargetBase, TestCaseWithFactory