← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:access-token-model into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:access-token-model into launchpad:master.

Commit message:
Add AccessToken model

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/409769

This will be used for personal access tokens for the webservice API.

We have to take some care to avoid bloat and locks when updating last-used dates.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:access-token-model into launchpad:master.
diff --git a/lib/lp/registry/scripts/closeaccount.py b/lib/lp/registry/scripts/closeaccount.py
index 9bdc3b0..419f99c 100644
--- a/lib/lp/registry/scripts/closeaccount.py
+++ b/lib/lp/registry/scripts/closeaccount.py
@@ -98,6 +98,8 @@ def close_account(username, log):
         # we no longer identify the actor.
         ('accessartifactgrant', 'grantor'),
         ('accesspolicygrant', 'grantor'),
+        ('accesstoken', 'owner'),
+        ('accesstoken', 'revoked_by'),
         ('binarypackagepublishinghistory', 'removed_by'),
         ('branch', 'registrant'),
         ('branchmergeproposal', 'merge_reporter'),
diff --git a/lib/lp/security.py b/lib/lp/security.py
index d8876be..02f2816 100644
--- a/lib/lp/security.py
+++ b/lib/lp/security.py
@@ -189,6 +189,7 @@ from lp.registry.interfaces.teammembership import (
     )
 from lp.registry.interfaces.wikiname import IWikiName
 from lp.registry.model.person import Person
+from lp.services.auth.interfaces import IAccessToken
 from lp.services.config import config
 from lp.services.database.interfaces import IStore
 from lp.services.identity.interfaces.account import IAccount
@@ -496,6 +497,24 @@ class EditOAuthRequestToken(EditOAuthAccessToken):
     usedfor = IOAuthRequestToken
 
 
+class EditAccessToken(AuthorizationBase):
+    permission = 'launchpad.Edit'
+    usedfor = IAccessToken
+
+    def checkAuthenticated(self, user):
+        if user.inTeam(self.obj.owner):
+            return True
+        # Being able to edit the token doesn't allow extracting the secret,
+        # so it's OK to allow the owner of the context to do so too.  This
+        # allows context owners to exercise some control over access to
+        # their object.
+        adapter = queryAdapter(
+            self.obj.context, IAuthorization, 'launchpad.Edit')
+        if adapter is not None and adapter.checkAuthenticated(user):
+            return True
+        return False
+
+
 class EditByOwnersOrAdmins(AuthorizationBase):
     permission = 'launchpad.Edit'
     usedfor = IHasOwner
diff --git a/lib/lp/services/auth/__init__.py b/lib/lp/services/auth/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/auth/__init__.py
diff --git a/lib/lp/services/auth/configure.zcml b/lib/lp/services/auth/configure.zcml
new file mode 100644
index 0000000..bd76aa6
--- /dev/null
+++ b/lib/lp/services/auth/configure.zcml
@@ -0,0 +1,26 @@
+<!-- Copyright 2021 Canonical Ltd.  This software is licensed under the
+     GNU Affero General Public License version 3 (see the file LICENSE).
+-->
+
+<configure
+    xmlns="http://namespaces.zope.org/zope";
+    xmlns:i18n="http://namespaces.zope.org/i18n";
+    i18n_domain="launchpad">
+
+    <class class="lp.services.auth.model.AccessToken">
+        <require
+            permission="launchpad.Edit"
+            interface="lp.services.auth.interfaces.IAccessToken"
+            set_schema="lp.services.auth.interfaces.IAccessToken" />
+    </class>
+
+    <class class="lp.services.auth.model.AccessTokenSet">
+        <allow interface="lp.services.auth.interfaces.IAccessTokenSet" />
+    </class>
+
+    <securedutility
+        class="lp.services.auth.model.AccessTokenSet"
+        provides="lp.services.auth.interfaces.IAccessTokenSet">
+        <allow interface="lp.services.auth.interfaces.IAccessTokenSet" />
+    </securedutility>
+</configure>
diff --git a/lib/lp/services/auth/enums.py b/lib/lp/services/auth/enums.py
new file mode 100644
index 0000000..351f18e
--- /dev/null
+++ b/lib/lp/services/auth/enums.py
@@ -0,0 +1,30 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Enumerations used in lp.services.auth."""
+
+__metaclass__ = type
+__all__ = [
+    "AccessTokenScope",
+    ]
+
+from lazr.enum import (
+    EnumeratedType,
+    Item,
+    )
+
+
+class AccessTokenScope(EnumeratedType):
+    """A scope specifying the capabilities of an access token."""
+
+    REPOSITORY_BUILD_STATUS = Item("""
+        repository:build_status
+
+        Can see and update the build status for all commits in a repository.
+        """)
+
+    REPOSITORY_PUSH = Item("""
+        repository:push
+
+        Can push to a repository.
+        """)
diff --git a/lib/lp/services/auth/interfaces.py b/lib/lp/services/auth/interfaces.py
new file mode 100644
index 0000000..9394cec
--- /dev/null
+++ b/lib/lp/services/auth/interfaces.py
@@ -0,0 +1,112 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Personal access token interfaces."""
+
+__metaclass__ = type
+__all__ = [
+    "IAccessToken",
+    "IAccessTokenSet",
+    "IAccessTokenVerifiedRequest",
+    ]
+
+from lazr.restful.fields import Reference
+from zope.interface import Interface
+from zope.schema import (
+    Bool,
+    Choice,
+    Datetime,
+    Int,
+    List,
+    TextLine,
+    )
+
+from lp import _
+from lp.code.interfaces.gitrepository import IGitRepository
+from lp.services.auth.enums import AccessTokenScope
+from lp.services.fields import PublicPersonChoice
+
+
+class IAccessToken(Interface):
+    """A personal access token for the webservice API."""
+
+    id = Int(title=_("ID"), required=True, readonly=True)
+
+    date_created = Datetime(
+        title=_("When the token was created."), required=True, readonly=True)
+
+    owner = PublicPersonChoice(
+        title=_("The person who created the token."),
+        vocabulary="ValidPersonOrTeam", required=True, readonly=True)
+
+    description = TextLine(
+        title=_("A short description of the token."), required=True)
+
+    git_repository = Reference(
+        title=_("The Git repository for which the token was issued."),
+        schema=IGitRepository, required=True, readonly=True)
+
+    scopes = List(
+        value_type=Choice(vocabulary=AccessTokenScope),
+        title=_("A list of scopes granted by the token."),
+        required=True, readonly=True)
+
+    date_last_used = Datetime(
+        title=_("When the token was last used."),
+        required=False, readonly=False)
+
+    date_expires = Datetime(
+        title=_("When the token should expire or was revoked."),
+        required=False, readonly=False)
+
+    is_expired = Bool(
+        title=_("Whether this token has expired."),
+        required=False, readonly=True)
+
+    revoked_by = PublicPersonChoice(
+        title=_("The person who revoked the token, if any."),
+        vocabulary="ValidPersonOrTeam", required=False, readonly=False)
+
+    def updateLastUsed():
+        """Update this token's last-used date, if possible."""
+
+    def revoke(revoked_by):
+        """Revoke this token."""
+
+
+class IAccessTokenSet(Interface):
+    """The set of all personal access tokens."""
+
+    def new(secret, owner, description, context, scopes):
+        """Return a new access token with a given secret.
+
+        :param secret: A text string.
+        :param owner: An `IPerson` who is creating the token.
+        :param description: A short description of the token.
+        :param context: An `IGitRepository` for which the token is being
+            issued.
+        :param scopes: A list of `AccessTokenScope`s to be granted by the
+            token.
+        """
+
+    def getBySecret(secret):
+        """Return the access token with this secret, or None.
+
+        :param secret: A text string.
+        """
+
+    def findByOwner(owner):
+        """Return all access tokens for this owner.
+
+        :param owner: An `IPerson`.
+        """
+
+    def findByContext(context):
+        """Return all access tokens for this context.
+
+        :param context: An `IGitRepository`.
+        """
+
+
+class IAccessTokenVerifiedRequest(Interface):
+    """Marker interface for a request with a verified access token."""
diff --git a/lib/lp/services/auth/model.py b/lib/lp/services/auth/model.py
new file mode 100644
index 0000000..c1228d1
--- /dev/null
+++ b/lib/lp/services/auth/model.py
@@ -0,0 +1,166 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Personal access tokens."""
+
+__metaclass__ = type
+__all__ = [
+    "AccessToken",
+    ]
+
+from datetime import (
+    datetime,
+    timedelta,
+    )
+import hashlib
+
+import pytz
+from storm.databases.postgres import JSON
+from storm.expr import (
+    And,
+    Cast,
+    Or,
+    SQL,
+    Update,
+    )
+from storm.locals import (
+    DateTime,
+    Int,
+    Reference,
+    Unicode,
+    )
+from zope.interface import implementer
+
+from lp.code.interfaces.gitrepository import IGitRepository
+from lp.services.auth.enums import AccessTokenScope
+from lp.services.auth.interfaces import (
+    IAccessToken,
+    IAccessTokenSet,
+    )
+from lp.services.database.constants import UTC_NOW
+from lp.services.database.interfaces import (
+    IMasterStore,
+    IStore,
+    )
+from lp.services.database.stormbase import StormBase
+
+
+@implementer(IAccessToken)
+class AccessToken(StormBase):
+    """See `IAccessToken`."""
+
+    __storm_table__ = "AccessToken"
+
+    id = Int(primary=True)
+
+    date_created = DateTime(
+        name="date_created", tzinfo=pytz.UTC, allow_none=False)
+
+    _token_sha256 = Unicode(name="token_sha256", allow_none=False)
+
+    owner_id = Int(name="owner", allow_none=False)
+    owner = Reference(owner_id, "Person.id")
+
+    description = Unicode(name="description", allow_none=False)
+
+    git_repository_id = Int(name="git_repository", allow_none=False)
+    git_repository = Reference(git_repository_id, "GitRepository.id")
+
+    _scopes = JSON(name="scopes", allow_none=False)
+
+    date_last_used = DateTime(
+        name="date_last_used", tzinfo=pytz.UTC, allow_none=True)
+    date_expires = DateTime(
+        name="date_expires", tzinfo=pytz.UTC, allow_none=True)
+
+    revoked_by_id = Int(name="revoked_by", allow_none=True)
+    revoked_by = Reference(revoked_by_id, "Person.id")
+
+    resolution = timedelta(minutes=10)
+
+    def __init__(self, secret, owner, description, context, scopes):
+        """Construct an `AccessToken`."""
+        self._token_sha256 = hashlib.sha256(secret.encode()).hexdigest()
+        self.owner = owner
+        self.description = description
+        if IGitRepository.providedBy(context):
+            self.git_repository = context
+        else:
+            raise TypeError("Unsupported context: {!r}".format(context))
+        self.scopes = scopes
+        self.date_created = UTC_NOW
+
+    @property
+    def context(self):
+        """See `IAccessToken`."""
+        return self.git_repository
+
+    @property
+    def scopes(self):
+        """See `IAccessToken`."""
+        return [
+            AccessTokenScope.getTermByToken(scope).value
+            for scope in self._scopes]
+
+    @scopes.setter
+    def scopes(self, scopes):
+        """See `IAccessToken`."""
+        self._scopes = [scope.title for scope in scopes]
+
+    def updateLastUsed(self):
+        """See `IAccessToken`."""
+        IMasterStore(AccessToken).execute(Update(
+            {AccessToken.date_last_used: UTC_NOW},
+            where=And(
+                # Skip the update if the AccessToken row is already locked,
+                # for example by a concurrent request using the same token.
+                AccessToken.id.is_in(SQL(
+                    "SELECT id FROM AccessToken WHERE id = ? "
+                    "FOR UPDATE SKIP LOCKED", params=(self.id,))),
+                # Only update the last-used date every so often, to avoid
+                # bloat.
+                Or(
+                    AccessToken.date_last_used == None,
+                    AccessToken.date_last_used <
+                        UTC_NOW - Cast(self.resolution, 'interval'))),
+            table=AccessToken))
+
+    @property
+    def is_expired(self):
+        now = datetime.now(pytz.UTC)
+        return self.date_expires is not None and self.date_expires <= now
+
+    def revoke(self, revoked_by):
+        """See `IAccessToken`."""
+        self.date_expires = UTC_NOW
+        self.revoked_by = revoked_by
+
+
+@implementer(IAccessTokenSet)
+class AccessTokenSet:
+
+    def new(self, secret, owner, description, context, scopes):
+        """See `IAccessTokenSet`."""
+        store = IStore(AccessToken)
+        token = AccessToken(secret, owner, description, context, scopes)
+        store.add(token)
+        return token
+
+    def getBySecret(self, secret):
+        """See `IAccessTokenSet`."""
+        return IStore(AccessToken).find(
+            AccessToken,
+            _token_sha256=hashlib.sha256(secret.encode()).hexdigest()).one()
+
+    def findByOwner(self, owner):
+        """See `IAccessTokenSet`."""
+        return IStore(AccessToken).find(AccessToken, owner=owner)
+
+    def findByContext(self, context):
+        """See `IAccessTokenSet`."""
+        kwargs = {}
+        if IGitRepository.providedBy(context):
+            kwargs["git_repository"] = context
+        else:
+            raise TypeError("Unsupported context: {!r}".format(context))
+        return IStore(AccessToken).find(AccessToken, **kwargs)
diff --git a/lib/lp/services/auth/tests/__init__.py b/lib/lp/services/auth/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/auth/tests/__init__.py
diff --git a/lib/lp/services/auth/tests/test_model.py b/lib/lp/services/auth/tests/test_model.py
new file mode 100644
index 0000000..77ce073
--- /dev/null
+++ b/lib/lp/services/auth/tests/test_model.py
@@ -0,0 +1,222 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test personal access tokens."""
+
+__metaclass__ = type
+
+from datetime import (
+    datetime,
+    timedelta,
+    )
+import hashlib
+import os
+import signal
+
+import pytz
+from storm.store import Store
+from testtools.matchers import (
+    Is,
+    MatchesStructure,
+    )
+import transaction
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.services.auth.enums import AccessTokenScope
+from lp.services.auth.interfaces import IAccessTokenSet
+from lp.services.auth.utils import create_access_token_secret
+from lp.services.database.sqlbase import (
+    disconnect_stores,
+    get_transaction_timestamp,
+    )
+from lp.services.webapp.authorization import check_permission
+from lp.testing import (
+    login,
+    login_person,
+    TestCaseWithFactory,
+    )
+from lp.testing.layers import DatabaseFunctionalLayer
+
+
+class TestAccessToken(TestCaseWithFactory):
+
+    layer = DatabaseFunctionalLayer
+
+    def test_owner_can_edit(self):
+        owner = self.factory.makePerson()
+        _, token = self.factory.makeAccessToken(owner=owner)
+        login_person(owner)
+        self.assertTrue(check_permission("launchpad.Edit", token))
+
+    def test_context_owner_can_edit(self):
+        context_owner = self.factory.makePerson()
+        repository = self.factory.makeGitRepository(owner=context_owner)
+        _, token = self.factory.makeAccessToken(context=repository)
+        login_person(context_owner)
+        self.assertTrue(check_permission("launchpad.Edit", token))
+
+    def test_other_user_cannot_edit(self):
+        _, token = self.factory.makeAccessToken()
+        login_person(self.factory.makePerson())
+        self.assertFalse(check_permission("launchpad.Edit", token))
+
+    def test_updateLastUsed_never_used(self):
+        # If the token has never been used, we update its last-used date.
+        owner = self.factory.makePerson()
+        _, token = self.factory.makeAccessToken(owner=owner)
+        login_person(owner)
+        self.assertIsNone(token.date_last_used)
+        transaction.commit()
+        token.updateLastUsed()
+        now = get_transaction_timestamp(Store.of(token))
+        self.assertEqual(now, token.date_last_used)
+
+    def test_updateLastUsed_recent(self):
+        # If the token's last-used date was updated recently, we leave it
+        # alone.
+        owner = self.factory.makePerson()
+        _, token = self.factory.makeAccessToken(owner=owner)
+        login_person(owner)
+        recent = datetime.now(pytz.UTC) - timedelta(minutes=1)
+        token.date_last_used = recent
+        transaction.commit()
+        token.updateLastUsed()
+        self.assertEqual(recent, token.date_last_used)
+
+    def test_updateLastUsed_old(self):
+        # If the token's last-used date is outside our update resolution, we
+        # update it.
+        owner = self.factory.makePerson()
+        _, token = self.factory.makeAccessToken(owner=owner)
+        login_person(owner)
+        recent = datetime.now(pytz.UTC) - timedelta(hours=1)
+        token.date_last_used = recent
+        transaction.commit()
+        token.updateLastUsed()
+        now = get_transaction_timestamp(Store.of(token))
+        self.assertEqual(now, token.date_last_used)
+
+    def test_updateLastUsed_concurrent(self):
+        # If the token is locked by another transaction, we leave it alone.
+        owner = self.factory.makePerson()
+        owner_email = removeSecurityProxy(owner.preferredemail).email
+        secret, token = self.factory.makeAccessToken(owner=owner)
+        login_person(owner)
+        self.assertIsNone(token.date_last_used)
+        transaction.commit()
+        # Fork so that we can lock the token from a different PostgreSQL
+        # session.  We must disconnect the Storm store before forking, as
+        # libpq connections are not safe for use across forks.
+        disconnect_stores()
+        read, write = os.pipe()
+        pid = os.fork()
+        if pid == 0:  # child
+            os.close(read)
+            login(owner_email)
+            token = getUtility(IAccessTokenSet).getBySecret(secret)
+            token.updateLastUsed()
+            os.write(write, b"1")
+            try:
+                signal.pause()
+            except KeyboardInterrupt:
+                pass
+            transaction.commit()
+            os._exit(0)
+        else:  # parent
+            try:
+                os.close(write)
+                os.read(read, 1)
+                login(owner_email)
+                token = getUtility(IAccessTokenSet).getBySecret(secret)
+                token.updateLastUsed()
+                now = get_transaction_timestamp(Store.of(token))
+                # The last-used date is being updated by a different
+                # transaction, which hasn't been committed yet.
+                self.assertIsNone(token.date_last_used)
+            finally:
+                os.kill(pid, signal.SIGINT)
+                os.waitpid(pid, 0)
+            transaction.commit()
+            self.assertIsNotNone(token.date_last_used)
+            self.assertNotEqual(now, token.date_last_used)
+
+    def test_is_expired(self):
+        owner = self.factory.makePerson()
+        login_person(owner)
+        _, current_token = self.factory.makeAccessToken(owner=owner)
+        _, expired_token = self.factory.makeAccessToken(owner=owner)
+        expired_token.date_expires = (
+            datetime.now(pytz.UTC) - timedelta(minutes=1))
+        self.assertFalse(current_token.is_expired)
+        self.assertTrue(expired_token.is_expired)
+
+    def test_revoke(self):
+        owner = self.factory.makePerson()
+        _, token = self.factory.makeAccessToken(
+            owner=owner, scopes=[AccessTokenScope.REPOSITORY_BUILD_STATUS])
+        login_person(owner)
+        self.assertThat(token, MatchesStructure(
+            date_expires=Is(None), revoked_by=Is(None)))
+        token.revoke(token.owner)
+        now = get_transaction_timestamp(Store.of(token))
+        self.assertThat(token, MatchesStructure.byEquality(
+            date_expires=now, revoked_by=token.owner))
+
+
+class TestAccessTokenSet(TestCaseWithFactory):
+
+    layer = DatabaseFunctionalLayer
+
+    def test_new(self):
+        secret = create_access_token_secret()
+        self.assertEqual(64, len(secret))
+        owner = self.factory.makePerson()
+        description = "Test token"
+        context = self.factory.makeGitRepository()
+        scopes = [AccessTokenScope.REPOSITORY_BUILD_STATUS]
+        _, token = self.factory.makeAccessToken(
+            secret=secret, owner=owner, description=description,
+            context=context, scopes=scopes)
+        self.assertThat(
+            removeSecurityProxy(token), MatchesStructure.byEquality(
+                _token_sha256=hashlib.sha256(secret.encode()).hexdigest(),
+                owner=owner, description=description, context=context,
+                scopes=scopes))
+
+    def test_getBySecret(self):
+        secret, token = self.factory.makeAccessToken()
+        self.assertEqual(
+            token, getUtility(IAccessTokenSet).getBySecret(secret))
+        self.assertIsNone(
+            getUtility(IAccessTokenSet).getBySecret(
+                create_access_token_secret()))
+
+    def test_findByOwner(self):
+        owners = [self.factory.makePerson() for _ in range(3)]
+        tokens = [
+            self.factory.makeAccessToken(owner=owners[0])[1],
+            self.factory.makeAccessToken(owner=owners[0])[1],
+            self.factory.makeAccessToken(owner=owners[1])[1],
+            ]
+        self.assertContentEqual(
+            tokens[:2], getUtility(IAccessTokenSet).findByOwner(owners[0]))
+        self.assertContentEqual(
+            [tokens[2]], getUtility(IAccessTokenSet).findByOwner(owners[1]))
+        self.assertContentEqual(
+            [], getUtility(IAccessTokenSet).findByOwner(owners[2]))
+
+    def test_findByContext(self):
+        contexts = [self.factory.makeGitRepository() for _ in range(3)]
+        tokens = [
+            self.factory.makeAccessToken(context=contexts[0])[1],
+            self.factory.makeAccessToken(context=contexts[0])[1],
+            self.factory.makeAccessToken(context=contexts[1])[1],
+            ]
+        self.assertContentEqual(
+            tokens[:2], getUtility(IAccessTokenSet).findByContext(contexts[0]))
+        self.assertContentEqual(
+            [tokens[2]],
+            getUtility(IAccessTokenSet).findByContext(contexts[1]))
+        self.assertContentEqual(
+            [], getUtility(IAccessTokenSet).findByContext(contexts[2]))
diff --git a/lib/lp/services/auth/utils.py b/lib/lp/services/auth/utils.py
new file mode 100644
index 0000000..99dd2ba
--- /dev/null
+++ b/lib/lp/services/auth/utils.py
@@ -0,0 +1,19 @@
+# Copyright 2021 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Personal access token utilities."""
+
+__metaclass__ = type
+__all__ = [
+    "create_access_token_secret",
+    ]
+
+import binascii
+import os
+
+
+# XXX cjwatson 2021-09-30: Replace this with secrets.token_hex(32) once we
+# can rely on Python 3.6 everywhere.
+def create_access_token_secret():
+    """Create a secret suitable for use in a personal access token."""
+    return binascii.hexlify(os.urandom(32)).decode("ASCII")
diff --git a/lib/lp/services/configure.zcml b/lib/lp/services/configure.zcml
index bdde7df..1461879 100644
--- a/lib/lp/services/configure.zcml
+++ b/lib/lp/services/configure.zcml
@@ -4,6 +4,7 @@
 
 <configure xmlns="http://namespaces.zope.org/zope";>
   <include package=".webapp" />
+  <include package=".auth" />
   <include package=".authserver" />
   <include package=".comments" />
   <include package=".database" />
diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py
index b8d4c22..3ba8333 100644
--- a/lib/lp/testing/factory.py
+++ b/lib/lp/testing/factory.py
@@ -242,6 +242,8 @@ from lp.registry.model.commercialsubscription import CommercialSubscription
 from lp.registry.model.karma import KarmaTotalCache
 from lp.registry.model.milestone import Milestone
 from lp.registry.model.suitesourcepackage import SuiteSourcePackage
+from lp.services.auth.interfaces import IAccessTokenSet
+from lp.services.auth.utils import create_access_token_secret
 from lp.services.compat import message_as_bytes
 from lp.services.config import config
 from lp.services.database.constants import (
@@ -4516,6 +4518,27 @@ class BareLaunchpadObjectFactory(ObjectFactory):
             consumer, reviewed_by=owner, access_level=access_level)
         return request_token.createAccessToken()
 
+    def makeAccessToken(self, secret=None, owner=None, description=None,
+                        context=None, scopes=None):
+        """Create a personal access token.
+
+        :return: A tuple of the secret for the new token and the token
+            itself.
+        """
+        if secret is None:
+            secret = create_access_token_secret()
+        if owner is None:
+            owner = self.makePerson()
+        if description is None:
+            description = self.getUniqueUnicode()
+        if context is None:
+            context = self.makeGitRepository()
+        if scopes is None:
+            scopes = []
+        token = getUtility(IAccessTokenSet).new(
+            secret, owner, description, context, scopes)
+        return secret, token
+
     def makeCVE(self, sequence, description=None,
                 cvestate=CveStatus.CANDIDATE):
         """Create a new CVE record."""