← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~ines-almeida/launchpad:project-tokens/refactor-access-token-mixins into launchpad:master


Ines Almeida has proposed merging ~ines-almeida/launchpad:project-tokens/refactor-access-token-mixins into launchpad:master.

Commit message:
Move "issueAccessToken" logic to generic Access Token Target model and interface.

The logic was originally in the Git repository model and interface. Moving it to a generic place makes adding new access token targets much easier.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:

This will make adding project-scoped access tokens much easier, without duplicating the token-issuing code.
Your team Launchpad code reviewers is requested to review the proposed merge of ~ines-almeida/launchpad:project-tokens/refactor-access-token-mixins into launchpad:master.
diff --git a/lib/lp/code/interfaces/gitrepository.py b/lib/lp/code/interfaces/gitrepository.py
index 354d2df..fc8c0ae 100644
--- a/lib/lp/code/interfaces/gitrepository.py
+++ b/lib/lp/code/interfaces/gitrepository.py
@@ -83,7 +83,10 @@ from lp.registry.interfaces.personproduct import IPersonProductFactory
 from lp.registry.interfaces.product import IProduct
 from lp.registry.interfaces.role import IPersonRoles
 from lp.services.auth.enums import AccessTokenScope
-from lp.services.auth.interfaces import IAccessTokenTarget
+from lp.services.auth.interfaces import (
+    IAccessTokenTarget,
+    IAccessTokenTargetEdit,
 from lp.services.fields import InlineObject, PersonChoice, PublicPersonChoice
 from lp.services.webhooks.interfaces import IWebhookTarget
@@ -129,7 +132,7 @@ def git_repository_name_validator(name):
     return True
-class IGitRepositoryView(IHasRecipes):
+class IGitRepositoryView(IHasRecipes, IAccessTokenTarget):
     """IGitRepository attributes that require launchpad.View permission."""
     id = exported(Int(title=_("ID"), readonly=True, required=True))
@@ -877,48 +880,6 @@ class IGitRepositoryView(IHasRecipes):
         :return: A `ResultSet` of `IGitActivity`.
-    # XXX cjwatson 2021-10-13: This should move to IAccessTokenTarget, but
-    # currently has rather too much backward-compatibility code for that.
-    @operation_parameters(
-        description=TextLine(
-            title=_("A short description of the token."), required=False
-        ),
-        scopes=List(
-            title=_("A list of scopes to be granted by this token."),
-            value_type=Choice(vocabulary=AccessTokenScope),
-            required=False,
-        ),
-        date_expires=Datetime(
-            title=_("When the token should expire."), required=False
-        ),
-    )
-    @export_write_operation()
-    @operation_for_version("devel")
-    def issueAccessToken(description=None, scopes=None, date_expires=None):
-        """Issue an access token for this repository.
-        Access tokens can be used to push to this repository over HTTPS.
-        They are only valid for a single repository, and have a short expiry
-        period (currently fixed at one week), so at the moment they are only
-        suitable in some limited situations.  By default they are currently
-        implemented as macaroons.
-        If `description` and `scopes` are both given, then issue a personal
-        access token instead, either non-expiring or with an expiry time
-        given by `date_expires`.  These may be used in webservice API
-        requests for certain methods on this repository.
-        This interface is experimental, and may be changed or removed
-        without notice.
-        :return: If `description` and `scopes` are both given, the secret
-            for a new personal access token (Launchpad only records the hash
-            of this secret and not the secret itself, so the caller must be
-            careful to save this; personal access tokens are in development
-            and may not entirely work yet).  Otherwise, a serialised
-            macaroon.
-        """
@@ -1054,7 +1015,7 @@ class IGitRepositoryExpensiveRequest(Interface):
         that is not an admin or a registry expert."""
-class IGitRepositoryEdit(IWebhookTarget, IAccessTokenTarget):
+class IGitRepositoryEdit(IWebhookTarget, IAccessTokenTargetEdit):
     """IGitRepository methods that require launchpad.Edit permission."""
diff --git a/lib/lp/code/model/gitrepository.py b/lib/lp/code/model/gitrepository.py
index 85240eb..633b508 100644
--- a/lib/lp/code/model/gitrepository.py
+++ b/lib/lp/code/model/gitrepository.py
@@ -126,9 +126,7 @@ from lp.registry.model.accesspolicy import (
 from lp.registry.model.person import Person
 from lp.registry.model.teammembership import TeamParticipation
-from lp.services.auth.interfaces import IAccessTokenSet
 from lp.services.auth.model import AccessTokenTargetMixin
-from lp.services.auth.utils import create_access_token_secret
 from lp.services.config import config
 from lp.services.database import bulk
 from lp.services.database.constants import DEFAULT, UTC_NOW
@@ -327,6 +325,9 @@ class GitRepository(
         name="builder_constraints", allow_none=True
+    # Matches `GitRepositoryMacaroonIssuer.identifier`
+    macaroon_issuer_identifier = "git-repository"
     def __init__(
@@ -1880,56 +1881,6 @@ class GitRepository(
             results, pre_iter_hook=preloadDataForActivities
-    def _issuePersonalAccessToken(
-        self, user, description, scopes, date_expires=None
-    ):
-        """Issue a personal access token for this repository."""
-        if user is None:
-            raise Unauthorized(
-                "Personal access tokens may only be issued for a logged-in "
-                "user."
-            )
-        secret = create_access_token_secret()
-        getUtility(IAccessTokenSet).new(
-            secret,
-            owner=user,
-            description=description,
-            target=self,
-            scopes=scopes,
-            date_expires=date_expires,
-        )
-        return secret
-    # XXX cjwatson 2021-10-13: Remove this once lp.code.xmlrpc.git accepts
-    # pushes using personal access tokens.
-    def _issueMacaroon(self, user):
-        """Issue a macaroon for this repository."""
-        issuer = getUtility(IMacaroonIssuer, "git-repository")
-        # Our security adapter has already done the checks we need, apart
-        # from forbidding anonymous users which is done by the issuer.
-        return (
-            removeSecurityProxy(issuer)
-            .issueMacaroon(self, user=user)
-            .serialize()
-        )
-    def issueAccessToken(
-        self, owner=None, description=None, scopes=None, date_expires=None
-    ):
-        """See `IGitRepository`."""
-        # It's more usual in model code to pass the user as an argument,
-        # e.g. using @call_with(user=REQUEST_USER) in the webservice
-        # interface.  However, in this case that would allow anyone who
-        # constructs a way to call this method not via the webservice to
-        # issue a token for any user, which seems like a bad idea.
-        user = getUtility(ILaunchBag).user
-        if description is not None and scopes is not None:
-            return self._issuePersonalAccessToken(
-                user, description, scopes, date_expires=date_expires
-            )
-        else:
-            return self._issueMacaroon(user)
     def canBeDeleted(self):
         """See `IGitRepository`."""
         # Can't delete if the repository is associated with anything.
diff --git a/lib/lp/code/model/tests/test_gitrepository.py b/lib/lp/code/model/tests/test_gitrepository.py
index 29e8d59..ead3153 100644
--- a/lib/lp/code/model/tests/test_gitrepository.py
+++ b/lib/lp/code/model/tests/test_gitrepository.py
@@ -30,7 +30,6 @@ from testtools.matchers import (
-    StartsWith,
 from testtools.testcase import ExpectedException
 from zope.component import getUtility
@@ -150,7 +149,6 @@ from lp.registry.interfaces.personociproject import IPersonOCIProjectFactory
 from lp.registry.interfaces.personproduct import IPersonProductFactory
 from lp.registry.tests.test_accesspolicy import get_policies_for_artifact
 from lp.services.auth.enums import AccessTokenScope
-from lp.services.auth.interfaces import IAccessTokenSet
 from lp.services.authserver.xmlrpc import AuthServerAPIView
 from lp.services.config import config
 from lp.services.database.constants import UTC_NOW
@@ -6672,306 +6670,6 @@ class TestGitRepositoryWebservice(TestCaseWithFactory):
-    def test_issueAccessToken(self):
-        # A user can request an access token via the webservice API.
-        self.pushConfig("codehosting", git_macaroon_secret_key="some-secret")
-        repository = self.factory.makeGitRepository()
-        # Write access to the repository isn't checked at this stage
-        # (although the access token will only be useful if the user has
-        # some kind of write access).
-        requester = self.factory.makePerson()
-        with person_logged_in(requester):
-            repository_url = api_url(repository)
-        webservice = webservice_for_person(
-            requester,
-            permission=OAuthPermission.WRITE_PUBLIC,
-            default_api_version="devel",
-        )
-        response = webservice.named_post(repository_url, "issueAccessToken")
-        self.assertEqual(200, response.status)
-        macaroon = Macaroon.deserialize(response.jsonBody())
-        with person_logged_in(ANONYMOUS):
-            self.assertThat(
-                macaroon,
-                MatchesStructure(
-                    location=Equals(config.vhost.mainsite.hostname),
-                    identifier=Equals("git-repository"),
-                    caveats=MatchesListwise(
-                        [
-                            MatchesStructure.byEquality(
-                                caveat_id="lp.git-repository %s"
-                                % repository.id
-                            ),
-                            MatchesStructure(
-                                caveat_id=StartsWith(
-                                    "lp.principal.openid-identifier "
-                                )
-                            ),
-                            MatchesStructure(
-                                caveat_id=StartsWith("lp.expires ")
-                            ),
-                        ]
-                    ),
-                ),
-            )
-    def test_issueAccessToken_anonymous(self):
-        # An anonymous user cannot request an access token via the
-        # webservice API.
-        repository = self.factory.makeGitRepository()
-        with person_logged_in(repository.owner):
-            repository_url = api_url(repository)
-        webservice = webservice_for_person(None, default_api_version="devel")
-        response = webservice.named_post(repository_url, "issueAccessToken")
-        self.assertEqual(401, response.status)
-        self.assertEqual(
-            b"git-repository macaroons may only be issued for a logged-in "
-            b"user.",
-            response.body,
-        )
-    def test_issueAccessToken_personal(self):
-        # A user can request a personal access token via the webservice API.
-        repository = self.factory.makeGitRepository()
-        # Write access to the repository isn't checked at this stage
-        # (although the access token will only be useful if the user has
-        # some kind of write access).
-        requester = self.factory.makePerson()
-        with person_logged_in(requester):
-            repository_url = api_url(repository)
-        webservice = webservice_for_person(
-            requester,
-            permission=OAuthPermission.WRITE_PUBLIC,
-            default_api_version="devel",
-        )
-        response = webservice.named_post(
-            repository_url,
-            "issueAccessToken",
-            description="Test token",
-            scopes=["repository:build_status"],
-        )
-        self.assertEqual(200, response.status)
-        secret = response.jsonBody()
-        with person_logged_in(requester):
-            token = getUtility(IAccessTokenSet).getBySecret(secret)
-            self.assertThat(
-                token,
-                MatchesStructure(
-                    owner=Equals(requester),
-                    description=Equals("Test token"),
-                    target=Equals(repository),
-                    scopes=Equals([AccessTokenScope.REPOSITORY_BUILD_STATUS]),
-                    date_expires=Is(None),
-                ),
-            )
-    def test_issueAccessToken_personal_with_expiry(self):
-        # A user can set an expiry time when requesting a personal access
-        # token via the webservice API.
-        repository = self.factory.makeGitRepository()
-        # Write access to the repository isn't checked at this stage
-        # (although the access token will only be useful if the user has
-        # some kind of write access).
-        requester = self.factory.makePerson()
-        with person_logged_in(requester):
-            repository_url = api_url(repository)
-        webservice = webservice_for_person(
-            requester,
-            permission=OAuthPermission.WRITE_PUBLIC,
-            default_api_version="devel",
-        )
-        date_expires = datetime.now(timezone.utc) + timedelta(days=30)
-        response = webservice.named_post(
-            repository_url,
-            "issueAccessToken",
-            description="Test token",
-            scopes=["repository:build_status"],
-            date_expires=date_expires.isoformat(),
-        )
-        self.assertEqual(200, response.status)
-        secret = response.jsonBody()
-        with person_logged_in(requester):
-            token = getUtility(IAccessTokenSet).getBySecret(secret)
-            self.assertThat(
-                token,
-                MatchesStructure.byEquality(
-                    owner=requester,
-                    description="Test token",
-                    target=repository,
-                    scopes=[AccessTokenScope.REPOSITORY_BUILD_STATUS],
-                    date_expires=date_expires,
-                ),
-            )
-    def test_issueAccessToken_personal_anonymous(self):
-        # An anonymous user cannot request a personal access token via the
-        # webservice API.
-        repository = self.factory.makeGitRepository()
-        with person_logged_in(repository.owner):
-            repository_url = api_url(repository)
-        webservice = webservice_for_person(None, default_api_version="devel")
-        response = webservice.named_post(
-            repository_url,
-            "issueAccessToken",
-            description="Test token",
-            scopes=["repository:build_status"],
-        )
-        self.assertEqual(401, response.status)
-        self.assertEqual(
-            b"Personal access tokens may only be issued for a logged-in user.",
-            response.body,
-        )
-    def test_builder_constraints_commercial_admin(self):
-        # A commercial admin can change a repository's builder constraints.
-        self.factory.makeBuilder(open_resources=["gpu", "large"])
-        repository_db = self.factory.makeGitRepository()
-        commercial_admin = getUtility(
-            ILaunchpadCelebrities
-        ).commercial_admin.teamowner
-        webservice = webservice_for_person(
-            commercial_admin, permission=OAuthPermission.WRITE_PUBLIC
-        )
-        webservice.default_api_version = "devel"
-        with person_logged_in(ANONYMOUS):
-            repository_url = api_url(repository_db)
-        response = webservice.patch(
-            repository_url,
-            "application/json",
-            json.dumps({"builder_constraints": ["gpu"]}),
-        )
-        self.assertEqual(209, response.status)
-        with person_logged_in(ANONYMOUS):
-            self.assertEqual(("gpu",), repository_db.builder_constraints)
-    def test_builder_constraints_owner(self):
-        # The owner of a repository cannot change its builder constraints
-        # (unless they're also a (commercial) admin).
-        self.factory.makeBuilder(open_resources=["gpu", "large"])
-        repository_db = self.factory.makeGitRepository()
-        webservice = webservice_for_person(
-            repository_db.owner, permission=OAuthPermission.WRITE_PUBLIC
-        )
-        webservice.default_api_version = "devel"
-        with person_logged_in(ANONYMOUS):
-            repository_url = api_url(repository_db)
-        response = webservice.patch(
-            repository_url,
-            "application/json",
-            json.dumps({"builder_constraints": ["gpu"]}),
-        )
-        self.assertEqual(401, response.status)
-        with person_logged_in(ANONYMOUS):
-            self.assertIsNone(repository_db.builder_constraints)
-    def test_builder_constraints_nonexistent(self):
-        # Only known builder resources may be set as builder constraints.
-        self.factory.makeBuilder(
-            open_resources=["large"], restricted_resources=["gpu"]
-        )
-        repository_db = self.factory.makeGitRepository()
-        commercial_admin = getUtility(
-            ILaunchpadCelebrities
-        ).commercial_admin.teamowner
-        webservice = webservice_for_person(
-            commercial_admin, permission=OAuthPermission.WRITE_PUBLIC
-        )
-        webservice.default_api_version = "devel"
-        with person_logged_in(ANONYMOUS):
-            repository_url = api_url(repository_db)
-        response = webservice.patch(
-            repository_url,
-            "application/json",
-            json.dumps({"builder_constraints": ["really-large"]}),
-        )
-        self.assertEqual(400, response.status)
-        self.assertEqual(
-            b"builder_constraints: 'really-large' isn't a valid token",
-            response.body,
-        )
-    def test_fork_to_self(self):
-        hosting_fixture = self.useFixture(GitHostingFixture())
-        repository = self.factory.makeGitRepository()
-        requester = self.factory.makePerson()
-        repository_url = api_url(repository)
-        requester_url = api_url(requester)
-        webservice = webservice_for_person(
-            requester,
-            permission=OAuthPermission.WRITE_PUBLIC,
-            default_api_version="devel",
-        )
-        response = webservice.named_post(
-            repository_url, "fork", new_owner=requester_url
-        )
-        self.assertEqual(200, response.status)
-        self.assertEndsWith(response.jsonBody()["owner_link"], requester_url)
-        self.assertEqual(1, len(hosting_fixture.create.calls))
-    def test_fork_to_team_as_member(self):
-        hosting_fixture = self.useFixture(GitHostingFixture())
-        repository = self.factory.makeGitRepository()
-        requester = self.factory.makePerson()
-        team = self.factory.makeTeam(members=[requester])
-        repository_url = api_url(repository)
-        team_url = api_url(team)
-        webservice = webservice_for_person(
-            requester,
-            permission=OAuthPermission.WRITE_PUBLIC,
-            default_api_version="devel",
-        )
-        response = webservice.named_post(
-            repository_url, "fork", new_owner=team_url
-        )
-        self.assertEqual(200, response.status)
-        self.assertEndsWith(response.jsonBody()["owner_link"], team_url)
-        self.assertEqual(1, len(hosting_fixture.create.calls))
-    def test_fork_to_team_as_non_member(self):
-        hosting_fixture = self.useFixture(GitHostingFixture())
-        repository = self.factory.makeGitRepository()
-        requester = self.factory.makePerson()
-        team = self.factory.makeTeam()
-        repository_url = api_url(repository)
-        team_url = api_url(team)
-        webservice = webservice_for_person(
-            requester,
-            permission=OAuthPermission.WRITE_PUBLIC,
-            default_api_version="devel",
-        )
-        response = webservice.named_post(
-            repository_url, "fork", new_owner=team_url
-        )
-        self.assertEqual(401, response.status)
-        self.assertEqual(
-            b"The owner of the new repository must be you or a team of which "
-            b"you are a member.",
-            response.body,
-        )
-        self.assertEqual(0, len(hosting_fixture.create.calls))
-    def test_fork_invisible(self):
-        hosting_fixture = self.useFixture(GitHostingFixture())
-        owner = self.factory.makePerson()
-        repository = self.factory.makeGitRepository(
-            owner=owner, information_type=InformationType.USERDATA
-        )
-        requester = self.factory.makePerson()
-        with person_logged_in(owner):
-            repository_url = api_url(repository)
-            requester_url = api_url(requester)
-        webservice = webservice_for_person(
-            requester,
-            permission=OAuthPermission.WRITE_PUBLIC,
-            default_api_version="devel",
-        )
-        response = webservice.named_post(
-            repository_url, "fork", new_owner=requester_url
-        )
-        self.assertEqual(401, response.status)
-        self.assertIn(b"launchpad.View", response.body)
-        self.assertEqual(0, len(hosting_fixture.create.calls))
 class TestGitRepositoryMacaroonIssuer(MacaroonTestMixin, TestCaseWithFactory):
     """Test GitRepository macaroon issuing and verification."""
diff --git a/lib/lp/services/auth/interfaces.py b/lib/lp/services/auth/interfaces.py
index be90837..1250183 100644
--- a/lib/lp/services/auth/interfaces.py
+++ b/lib/lp/services/auth/interfaces.py
@@ -18,6 +18,7 @@ from lazr.restful.declarations import (
+    operation_parameters,
 from lazr.restful.fields import Reference
@@ -207,6 +208,46 @@ class IAccessTokenVerifiedRequest(Interface):
 class IAccessTokenTarget(Interface):
     """An object that can be a target for access tokens."""
+    @operation_parameters(
+        description=TextLine(
+            title=_("A short description of the token."), required=False
+        ),
+        scopes=List(
+            title=_("A list of scopes to be granted by this token."),
+            value_type=Choice(vocabulary=AccessTokenScope),
+            required=False,
+        ),
+        date_expires=Datetime(
+            title=_("When the token should expire."), required=False
+        ),
+    )
+    @export_write_operation()
+    @operation_for_version("devel")
+    def issueAccessToken(description=None, scopes=None, date_expires=None):
+        """Issue an access token for this target.
+        Access tokens can be used to push to repositories over HTTPS.
+        If `description` and `scopes` are both given, then issue a personal
+        access token instead, either non-expiring or with an expiry time
+        given by `date_expires`.  These may be used in webservice API
+        requests for certain methods in the target's repositories.
+        :return: If `description` and `scopes` are both given, the secret
+            for a new personal access token (Launchpad only records the hash
+            of this secret and not the secret itself, so the caller must be
+            careful to save this; personal access tokens are in development
+            and may not entirely work yet).  Otherwise, a serialised
+            macaroon.
+        """
+class IAccessTokenTargetEdit(Interface):
+    """An object that can be a target for access tokens that requires
+    launchpad.Edit permission.
+    """
diff --git a/lib/lp/services/auth/model.py b/lib/lp/services/auth/model.py
index 436c382..6b517e0 100644
--- a/lib/lp/services/auth/model.py
+++ b/lib/lp/services/auth/model.py
@@ -16,6 +16,7 @@ from storm.expr import SQL, And, Cast, Or, Select, Update
 from storm.locals import DateTime, Int, Reference, Unicode
 from zope.component import getUtility
 from zope.interface import implementer
+from zope.security.interfaces import Unauthorized
 from zope.security.proxy import removeSecurityProxy
 from lp.code.interfaces.gitcollection import IAllGitRepositories
@@ -23,9 +24,12 @@ from lp.code.interfaces.gitrepository import IGitRepository
 from lp.registry.model.teammembership import TeamParticipation
 from lp.services.auth.enums import AccessTokenScope
 from lp.services.auth.interfaces import IAccessToken, IAccessTokenSet
+from lp.services.auth.utils import create_access_token_secret
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.interfaces import IPrimaryStore, IStore
 from lp.services.database.stormbase import StormBase
+from lp.services.macaroons.interfaces import IMacaroonIssuer
+from lp.services.webapp.interfaces import ILaunchBag
@@ -236,3 +240,70 @@ class AccessTokenTargetMixin:
+    def issueAccessToken(
+        self, description=None, scopes=None, date_expires=None
+    ):
+        """See `IAccessTokenTarget`."""
+        # It's more usual in model code to pass the user as an argument,
+        # e.g. using @call_with(user=REQUEST_USER) in the webservice
+        # interface.  However, in this case that would allow anyone who
+        # constructs a way to call this method not via the webservice to
+        # issue a token for any user, which seems like a bad idea.
+        user = getUtility(ILaunchBag).user
+        # A personal access token is only issued when both `description`
+        # and `scopes` are supplied; otherwise fall back to a macaroon.
+        if description is not None and scopes is not None:
+            return self._issuePersonalAccessToken(
+                user, description, scopes, date_expires=date_expires
+            )
+        else:
+            return self._issueMacaroon(user)
+    def _issuePersonalAccessToken(
+        self, user, description, scopes, date_expires=None
+    ):
+        """Issue a personal access token for this target.
+        :param user: The person who will own the token; must not be None.
+        :param description: A short description of the token.
+        :param scopes: The scopes to be granted by the token.
+        :param date_expires: When the token should expire, or None.
+        :raises Unauthorized: if `user` is None.
+        :return: The newly-created secret for the token.
+        """
+        if user is None:
+            raise Unauthorized(
+                "Personal access tokens may only be issued for a logged-in "
+                "user."
+            )
+        secret = create_access_token_secret()
+        # The secret is handed to IAccessTokenSet.new and then returned to
+        # the caller; the created token object itself is not returned.
+        getUtility(IAccessTokenSet).new(
+            secret,
+            owner=user,
+            description=description,
+            target=self,
+            scopes=scopes,
+            date_expires=date_expires,
+        )
+        return secret
+    # XXX cjwatson 2021-10-13: Remove this once lp.code.xmlrpc.git accepts
+    # pushes using personal access tokens.
+    def _issueMacaroon(self, user):
+        """Issue a macaroon for this target.
+        The issuer is looked up by `self.macaroon_issuer_identifier`, so
+        the class using this mixin must define that attribute.
+        :return: The serialised macaroon.
+        """
+        issuer = getUtility(IMacaroonIssuer, self.macaroon_issuer_identifier)
+        # Our security adapter has already done the checks we need, apart
+        # from forbidding anonymous users which is done by the issuer.
+        return (
+            removeSecurityProxy(issuer)
+            .issueMacaroon(self, user=user)
+            .serialize()
+        )
diff --git a/lib/lp/services/auth/tests/test_model.py b/lib/lp/services/auth/tests/test_model.py
index 9449f0c..50ad883 100644
--- a/lib/lp/services/auth/tests/test_model.py
+++ b/lib/lp/services/auth/tests/test_model.py
@@ -9,14 +9,22 @@ import signal
 from datetime import datetime, timedelta, timezone
 import transaction
+from pymacaroons import Macaroon
 from storm.store import Store
-from testtools.matchers import Is, MatchesStructure
+from testtools.matchers import (
+    Equals,
+    Is,
+    MatchesListwise,
+    MatchesStructure,
+    StartsWith,
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 from lp.services.auth.enums import AccessTokenScope
 from lp.services.auth.interfaces import IAccessTokenSet
 from lp.services.auth.utils import create_access_token_secret
+from lp.services.config import config
 from lp.services.database.sqlbase import (
@@ -24,6 +32,7 @@ from lp.services.database.sqlbase import (
 from lp.services.webapp.authorization import check_permission
 from lp.services.webapp.interfaces import OAuthPermission
 from lp.testing import (
@@ -468,6 +477,154 @@ class TestAccessTokenTargetBase:
         recorder1, recorder2 = record_two_runs(get_tokens, create_token, 2)
         self.assertThat(recorder2, HasQueryCount.byEquality(recorder1))
+    def test_issueAccessToken(self):
+        # A user can request an access token via the webservice API.
+        self.pushConfig("codehosting", git_macaroon_secret_key="some-secret")
+        with person_logged_in(ANONYMOUS):
+            # Write access to the repositories isn't checked at this stage
+            # (although the access token will only be useful if the user has
+            # some kind of write access).
+            requester = self.factory.makePerson()
+        with person_logged_in(requester):
+            repository_url = api_url(self.target)
+        webservice = webservice_for_person(
+            requester,
+            permission=OAuthPermission.WRITE_PUBLIC,
+            default_api_version="devel",
+        )
+        response = webservice.named_post(repository_url, "issueAccessToken")
+        self.assertEqual(200, response.status)
+        macaroon = Macaroon.deserialize(response.jsonBody())
+        with person_logged_in(ANONYMOUS):
+            self.assertThat(
+                macaroon,
+                MatchesStructure(
+                    location=Equals(config.vhost.mainsite.hostname),
+                    identifier=Equals("git-repository"),
+                    caveats=MatchesListwise(
+                        [
+                            MatchesStructure.byEquality(
+                                caveat_id="lp.git-repository %s"
+                                % self.target.id
+                            ),
+                            MatchesStructure(
+                                caveat_id=StartsWith(
+                                    "lp.principal.openid-identifier "
+                                )
+                            ),
+                            MatchesStructure(
+                                caveat_id=StartsWith("lp.expires ")
+                            ),
+                        ]
+                    ),
+                ),
+            )
+    def test_issueAccessToken_anonymous(self):
+        # An anonymous user cannot request an access token via the
+        # webservice API.
+        with person_logged_in(self.owner):
+            repository_url = api_url(self.target)
+        webservice = webservice_for_person(None, default_api_version="devel")
+        response = webservice.named_post(repository_url, "issueAccessToken")
+        self.assertEqual(401, response.status)
+        self.assertEqual(
+            b"git-repository macaroons may only be issued for a logged-in "
+            b"user.",
+            response.body,
+        )
+    def test_issueAccessToken_personal(self):
+        # A user can request a personal access token via the webservice API.
+        with person_logged_in(ANONYMOUS):
+            # Write access to the repositories isn't checked at this stage
+            # (although the access token will only be useful if the user has
+            # some kind of write access).
+            requester = self.factory.makePerson()
+        with person_logged_in(requester):
+            repository_url = api_url(self.target)
+        webservice = webservice_for_person(
+            requester,
+            permission=OAuthPermission.WRITE_PUBLIC,
+            default_api_version="devel",
+        )
+        response = webservice.named_post(
+            repository_url,
+            "issueAccessToken",
+            description="Test token",
+            scopes=["repository:build_status"],
+        )
+        self.assertEqual(200, response.status)
+        secret = response.jsonBody()
+        with person_logged_in(requester):
+            token = getUtility(IAccessTokenSet).getBySecret(secret)
+            self.assertThat(
+                token,
+                MatchesStructure(
+                    owner=Equals(requester),
+                    description=Equals("Test token"),
+                    target=Equals(self.target),
+                    scopes=Equals([AccessTokenScope.REPOSITORY_BUILD_STATUS]),
+                    date_expires=Is(None),
+                ),
+            )
+    def test_issueAccessToken_personal_with_expiry(self):
+        # A user can set an expiry time when requesting a personal access
+        # token via the webservice API.
+        with person_logged_in(ANONYMOUS):
+            # Write access to the repositories isn't checked at this stage
+            # (although the access token will only be useful if the user has
+            # some kind of write access).
+            requester = self.factory.makePerson()
+        with person_logged_in(requester):
+            repository_url = api_url(self.target)
+        webservice = webservice_for_person(
+            requester,
+            permission=OAuthPermission.WRITE_PUBLIC,
+            default_api_version="devel",
+        )
+        date_expires = datetime.now(timezone.utc) + timedelta(days=30)
+        response = webservice.named_post(
+            repository_url,
+            "issueAccessToken",
+            description="Test token",
+            scopes=["repository:build_status"],
+            date_expires=date_expires.isoformat(),
+        )
+        self.assertEqual(200, response.status)
+        secret = response.jsonBody()
+        with person_logged_in(requester):
+            token = getUtility(IAccessTokenSet).getBySecret(secret)
+            self.assertThat(
+                token,
+                MatchesStructure.byEquality(
+                    owner=requester,
+                    description="Test token",
+                    target=self.target,
+                    scopes=[AccessTokenScope.REPOSITORY_BUILD_STATUS],
+                    date_expires=date_expires,
+                ),
+            )
+    def test_issueAccessToken_personal_anonymous(self):
+        # An anonymous user cannot request a personal access token via the
+        # webservice API.
+        with person_logged_in(self.owner):
+            target_url = api_url(self.target)
+        webservice = webservice_for_person(None, default_api_version="devel")
+        response = webservice.named_post(
+            target_url,
+            "issueAccessToken",
+            description="Test token",
+            scopes=["repository:build_status"],
+        )
+        self.assertEqual(401, response.status)
+        self.assertEqual(
+            b"Personal access tokens may only be issued for a logged-in user.",
+            response.body,
+        )
 class TestAccessTokenTargetGitRepository(
     TestAccessTokenTargetBase, TestCaseWithFactory