launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #27572
[Merge] ~cjwatson/launchpad:access-token-model into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:access-token-model into launchpad:master.
Commit message:
Add AccessToken model
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/409769
This will be used for personal access tokens for the webservice API.
We have to take some care to avoid bloat and locks when updating last-used dates.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:access-token-model into launchpad:master.
diff --git a/lib/lp/registry/scripts/closeaccount.py b/lib/lp/registry/scripts/closeaccount.py
index 9bdc3b0..419f99c 100644
--- a/lib/lp/registry/scripts/closeaccount.py
+++ b/lib/lp/registry/scripts/closeaccount.py
@@ -98,6 +98,8 @@ def close_account(username, log):
# we no longer identify the actor.
('accessartifactgrant', 'grantor'),
('accesspolicygrant', 'grantor'),
+ ('accesstoken', 'owner'),
+ ('accesstoken', 'revoked_by'),
('binarypackagepublishinghistory', 'removed_by'),
('branch', 'registrant'),
('branchmergeproposal', 'merge_reporter'),
diff --git a/lib/lp/security.py b/lib/lp/security.py
index d8876be..02f2816 100644
--- a/lib/lp/security.py
+++ b/lib/lp/security.py
@@ -189,6 +189,7 @@ from lp.registry.interfaces.teammembership import (
)
from lp.registry.interfaces.wikiname import IWikiName
from lp.registry.model.person import Person
+from lp.services.auth.interfaces import IAccessToken
from lp.services.config import config
from lp.services.database.interfaces import IStore
from lp.services.identity.interfaces.account import IAccount
@@ -496,6 +497,24 @@ class EditOAuthRequestToken(EditOAuthAccessToken):
usedfor = IOAuthRequestToken
+class EditAccessToken(AuthorizationBase):
+ permission = 'launchpad.Edit'
+ usedfor = IAccessToken
+
+ def checkAuthenticated(self, user):
+ if user.inTeam(self.obj.owner):
+ return True
+ # Being able to edit the token doesn't allow extracting the secret,
+ # so it's OK to allow the owner of the context to do so too. This
+ # allows context owners to exercise some control over access to
+ # their object.
+ adapter = queryAdapter(
+ self.obj.context, IAuthorization, 'launchpad.Edit')
+ if adapter is not None and adapter.checkAuthenticated(user):
+ return True
+ return False
+
+
class EditByOwnersOrAdmins(AuthorizationBase):
permission = 'launchpad.Edit'
usedfor = IHasOwner
diff --git a/lib/lp/services/auth/__init__.py b/lib/lp/services/auth/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/auth/__init__.py
diff --git a/lib/lp/services/auth/configure.zcml b/lib/lp/services/auth/configure.zcml
new file mode 100644
index 0000000..bd76aa6
--- /dev/null
+++ b/lib/lp/services/auth/configure.zcml
@@ -0,0 +1,26 @@
+<!-- Copyright 2021 Canonical Ltd. This software is licensed under the
+ GNU Affero General Public License version 3 (see the file LICENSE).
+-->
+
+<configure
+ xmlns="http://namespaces.zope.org/zope"
+ xmlns:i18n="http://namespaces.zope.org/i18n"
+ i18n_domain="launchpad">
+
+ <class class="lp.services.auth.model.AccessToken">
+ <require
+ permission="launchpad.Edit"
+ interface="lp.services.auth.interfaces.IAccessToken"
+ set_schema="lp.services.auth.interfaces.IAccessToken" />
+ </class>
+
+ <class class="lp.services.auth.model.AccessTokenSet">
+ <allow interface="lp.services.auth.interfaces.IAccessTokenSet" />
+ </class>
+
+ <securedutility
+ class="lp.services.auth.model.AccessTokenSet"
+ provides="lp.services.auth.interfaces.IAccessTokenSet">
+ <allow interface="lp.services.auth.interfaces.IAccessTokenSet" />
+ </securedutility>
+</configure>
diff --git a/lib/lp/services/auth/enums.py b/lib/lp/services/auth/enums.py
new file mode 100644
index 0000000..351f18e
--- /dev/null
+++ b/lib/lp/services/auth/enums.py
@@ -0,0 +1,30 @@
+# Copyright 2021 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Enumerations used in lp.services.auth."""
+
+__metaclass__ = type
+__all__ = [
+ "AccessTokenScope",
+ ]
+
+from lazr.enum import (
+ EnumeratedType,
+ Item,
+ )
+
+
+class AccessTokenScope(EnumeratedType):
+ """A scope specifying the capabilities of an access token."""
+
+ REPOSITORY_BUILD_STATUS = Item("""
+ repository:build_status
+
+ Can see and update the build status for all commits in a repository.
+ """)
+
+ REPOSITORY_PUSH = Item("""
+ repository:push
+
+ Can push to a repository.
+ """)
diff --git a/lib/lp/services/auth/interfaces.py b/lib/lp/services/auth/interfaces.py
new file mode 100644
index 0000000..9394cec
--- /dev/null
+++ b/lib/lp/services/auth/interfaces.py
@@ -0,0 +1,112 @@
+# Copyright 2021 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Personal access token interfaces."""
+
+__metaclass__ = type
+__all__ = [
+ "IAccessToken",
+ "IAccessTokenSet",
+ "IAccessTokenVerifiedRequest",
+ ]
+
+from lazr.restful.fields import Reference
+from zope.interface import Interface
+from zope.schema import (
+ Bool,
+ Choice,
+ Datetime,
+ Int,
+ List,
+ TextLine,
+ )
+
+from lp import _
+from lp.code.interfaces.gitrepository import IGitRepository
+from lp.services.auth.enums import AccessTokenScope
+from lp.services.fields import PublicPersonChoice
+
+
+class IAccessToken(Interface):
+ """A personal access token for the webservice API."""
+
+ id = Int(title=_("ID"), required=True, readonly=True)
+
+ date_created = Datetime(
+ title=_("When the token was created."), required=True, readonly=True)
+
+ owner = PublicPersonChoice(
+ title=_("The person who created the token."),
+ vocabulary="ValidPersonOrTeam", required=True, readonly=True)
+
+ description = TextLine(
+ title=_("A short description of the token."), required=True)
+
+ git_repository = Reference(
+ title=_("The Git repository for which the token was issued."),
+ schema=IGitRepository, required=True, readonly=True)
+
+ scopes = List(
+ value_type=Choice(vocabulary=AccessTokenScope),
+ title=_("A list of scopes granted by the token."),
+ required=True, readonly=True)
+
+ date_last_used = Datetime(
+ title=_("When the token was last used."),
+ required=False, readonly=False)
+
+ date_expires = Datetime(
+ title=_("When the token should expire or was revoked."),
+ required=False, readonly=False)
+
+ is_expired = Bool(
+ title=_("Whether this token has expired."),
+ required=False, readonly=True)
+
+ revoked_by = PublicPersonChoice(
+ title=_("The person who revoked the token, if any."),
+ vocabulary="ValidPersonOrTeam", required=False, readonly=False)
+
+ def updateLastUsed():
+ """Update this token's last-used date, if possible."""
+
+ def revoke(revoked_by):
+ """Revoke this token."""
+
+
+class IAccessTokenSet(Interface):
+ """The set of all personal access tokens."""
+
+ def new(secret, owner, description, context, scopes):
+ """Return a new access token with a given secret.
+
+ :param secret: A text string.
+ :param owner: An `IPerson` who is creating the token.
+ :param description: A short description of the token.
+ :param context: An `IGitRepository` for which the token is being
+ issued.
+ :param scopes: A list of `AccessTokenScope`s to be granted by the
+ token.
+ """
+
+ def getBySecret(secret):
+ """Return the access token with this secret, or None.
+
+ :param secret: A text string.
+ """
+
+ def findByOwner(owner):
+ """Return all access tokens for this owner.
+
+ :param owner: An `IPerson`.
+ """
+
+ def findByContext(context):
+ """Return all access tokens for this context.
+
+ :param context: An `IGitRepository`.
+ """
+
+
+class IAccessTokenVerifiedRequest(Interface):
+ """Marker interface for a request with a verified access token."""
diff --git a/lib/lp/services/auth/model.py b/lib/lp/services/auth/model.py
new file mode 100644
index 0000000..c1228d1
--- /dev/null
+++ b/lib/lp/services/auth/model.py
@@ -0,0 +1,166 @@
+# Copyright 2021 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Personal access tokens."""
+
+__metaclass__ = type
+__all__ = [
+ "AccessToken",
+ ]
+
+from datetime import (
+ datetime,
+ timedelta,
+ )
+import hashlib
+
+import pytz
+from storm.databases.postgres import JSON
+from storm.expr import (
+ And,
+ Cast,
+ Or,
+ SQL,
+ Update,
+ )
+from storm.locals import (
+ DateTime,
+ Int,
+ Reference,
+ Unicode,
+ )
+from zope.interface import implementer
+
+from lp.code.interfaces.gitrepository import IGitRepository
+from lp.services.auth.enums import AccessTokenScope
+from lp.services.auth.interfaces import (
+ IAccessToken,
+ IAccessTokenSet,
+ )
+from lp.services.database.constants import UTC_NOW
+from lp.services.database.interfaces import (
+ IMasterStore,
+ IStore,
+ )
+from lp.services.database.stormbase import StormBase
+
+
+@implementer(IAccessToken)
+class AccessToken(StormBase):
+ """See `IAccessToken`."""
+
+ __storm_table__ = "AccessToken"
+
+ id = Int(primary=True)
+
+ date_created = DateTime(
+ name="date_created", tzinfo=pytz.UTC, allow_none=False)
+
+ _token_sha256 = Unicode(name="token_sha256", allow_none=False)
+
+ owner_id = Int(name="owner", allow_none=False)
+ owner = Reference(owner_id, "Person.id")
+
+ description = Unicode(name="description", allow_none=False)
+
+ git_repository_id = Int(name="git_repository", allow_none=False)
+ git_repository = Reference(git_repository_id, "GitRepository.id")
+
+ _scopes = JSON(name="scopes", allow_none=False)
+
+ date_last_used = DateTime(
+ name="date_last_used", tzinfo=pytz.UTC, allow_none=True)
+ date_expires = DateTime(
+ name="date_expires", tzinfo=pytz.UTC, allow_none=True)
+
+ revoked_by_id = Int(name="revoked_by", allow_none=True)
+ revoked_by = Reference(revoked_by_id, "Person.id")
+
+ resolution = timedelta(minutes=10)
+
+ def __init__(self, secret, owner, description, context, scopes):
+ """Construct an `AccessToken`."""
+ self._token_sha256 = hashlib.sha256(secret.encode()).hexdigest()
+ self.owner = owner
+ self.description = description
+ if IGitRepository.providedBy(context):
+ self.git_repository = context
+ else:
+ raise TypeError("Unsupported context: {!r}".format(context))
+ self.scopes = scopes
+ self.date_created = UTC_NOW
+
+ @property
+ def context(self):
+ """See `IAccessToken`."""
+ return self.git_repository
+
+ @property
+ def scopes(self):
+ """See `IAccessToken`."""
+ return [
+ AccessTokenScope.getTermByToken(scope).value
+ for scope in self._scopes]
+
+ @scopes.setter
+ def scopes(self, scopes):
+ """See `IAccessToken`."""
+ self._scopes = [scope.title for scope in scopes]
+
+ def updateLastUsed(self):
+ """See `IAccessToken`."""
+ IMasterStore(AccessToken).execute(Update(
+ {AccessToken.date_last_used: UTC_NOW},
+ where=And(
+ # Skip the update if the AccessToken row is already locked,
+ # for example by a concurrent request using the same token.
+ AccessToken.id.is_in(SQL(
+ "SELECT id FROM AccessToken WHERE id = ? "
+ "FOR UPDATE SKIP LOCKED", params=(self.id,))),
+ # Only update the last-used date every so often, to avoid
+ # bloat.
+ Or(
+ AccessToken.date_last_used == None,
+ AccessToken.date_last_used <
+ UTC_NOW - Cast(self.resolution, 'interval'))),
+ table=AccessToken))
+
+ @property
+ def is_expired(self):
+ now = datetime.now(pytz.UTC)
+ return self.date_expires is not None and self.date_expires <= now
+
+ def revoke(self, revoked_by):
+ """See `IAccessToken`."""
+ self.date_expires = UTC_NOW
+ self.revoked_by = revoked_by
+
+
+@implementer(IAccessTokenSet)
+class AccessTokenSet:
+
+ def new(self, secret, owner, description, context, scopes):
+ """See `IAccessTokenSet`."""
+ store = IStore(AccessToken)
+ token = AccessToken(secret, owner, description, context, scopes)
+ store.add(token)
+ return token
+
+ def getBySecret(self, secret):
+ """See `IAccessTokenSet`."""
+ return IStore(AccessToken).find(
+ AccessToken,
+ _token_sha256=hashlib.sha256(secret.encode()).hexdigest()).one()
+
+ def findByOwner(self, owner):
+ """See `IAccessTokenSet`."""
+ return IStore(AccessToken).find(AccessToken, owner=owner)
+
+ def findByContext(self, context):
+ """See `IAccessTokenSet`."""
+ kwargs = {}
+ if IGitRepository.providedBy(context):
+ kwargs["git_repository"] = context
+ else:
+ raise TypeError("Unsupported context: {!r}".format(context))
+ return IStore(AccessToken).find(AccessToken, **kwargs)
diff --git a/lib/lp/services/auth/tests/__init__.py b/lib/lp/services/auth/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/auth/tests/__init__.py
diff --git a/lib/lp/services/auth/tests/test_model.py b/lib/lp/services/auth/tests/test_model.py
new file mode 100644
index 0000000..77ce073
--- /dev/null
+++ b/lib/lp/services/auth/tests/test_model.py
@@ -0,0 +1,222 @@
+# Copyright 2021 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Test personal access tokens."""
+
+__metaclass__ = type
+
+from datetime import (
+ datetime,
+ timedelta,
+ )
+import hashlib
+import os
+import signal
+
+import pytz
+from storm.store import Store
+from testtools.matchers import (
+ Is,
+ MatchesStructure,
+ )
+import transaction
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.services.auth.enums import AccessTokenScope
+from lp.services.auth.interfaces import IAccessTokenSet
+from lp.services.auth.utils import create_access_token_secret
+from lp.services.database.sqlbase import (
+ disconnect_stores,
+ get_transaction_timestamp,
+ )
+from lp.services.webapp.authorization import check_permission
+from lp.testing import (
+ login,
+ login_person,
+ TestCaseWithFactory,
+ )
+from lp.testing.layers import DatabaseFunctionalLayer
+
+
+class TestAccessToken(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def test_owner_can_edit(self):
+ owner = self.factory.makePerson()
+ _, token = self.factory.makeAccessToken(owner=owner)
+ login_person(owner)
+ self.assertTrue(check_permission("launchpad.Edit", token))
+
+ def test_context_owner_can_edit(self):
+ context_owner = self.factory.makePerson()
+ repository = self.factory.makeGitRepository(owner=context_owner)
+ _, token = self.factory.makeAccessToken(context=repository)
+ login_person(context_owner)
+ self.assertTrue(check_permission("launchpad.Edit", token))
+
+ def test_other_user_cannot_edit(self):
+ _, token = self.factory.makeAccessToken()
+ login_person(self.factory.makePerson())
+ self.assertFalse(check_permission("launchpad.Edit", token))
+
+ def test_updateLastUsed_never_used(self):
+ # If the token has never been used, we update its last-used date.
+ owner = self.factory.makePerson()
+ _, token = self.factory.makeAccessToken(owner=owner)
+ login_person(owner)
+ self.assertIsNone(token.date_last_used)
+ transaction.commit()
+ token.updateLastUsed()
+ now = get_transaction_timestamp(Store.of(token))
+ self.assertEqual(now, token.date_last_used)
+
+ def test_updateLastUsed_recent(self):
+ # If the token's last-used date was updated recently, we leave it
+ # alone.
+ owner = self.factory.makePerson()
+ _, token = self.factory.makeAccessToken(owner=owner)
+ login_person(owner)
+ recent = datetime.now(pytz.UTC) - timedelta(minutes=1)
+ token.date_last_used = recent
+ transaction.commit()
+ token.updateLastUsed()
+ self.assertEqual(recent, token.date_last_used)
+
+ def test_updateLastUsed_old(self):
+ # If the token's last-used date is outside our update resolution, we
+ # update it.
+ owner = self.factory.makePerson()
+ _, token = self.factory.makeAccessToken(owner=owner)
+ login_person(owner)
+ recent = datetime.now(pytz.UTC) - timedelta(hours=1)
+ token.date_last_used = recent
+ transaction.commit()
+ token.updateLastUsed()
+ now = get_transaction_timestamp(Store.of(token))
+ self.assertEqual(now, token.date_last_used)
+
+ def test_updateLastUsed_concurrent(self):
+ # If the token is locked by another transaction, we leave it alone.
+ owner = self.factory.makePerson()
+ owner_email = removeSecurityProxy(owner.preferredemail).email
+ secret, token = self.factory.makeAccessToken(owner=owner)
+ login_person(owner)
+ self.assertIsNone(token.date_last_used)
+ transaction.commit()
+ # Fork so that we can lock the token from a different PostgreSQL
+ # session. We must disconnect the Storm store before forking, as
+ # libpq connections are not safe for use across forks.
+ disconnect_stores()
+ read, write = os.pipe()
+ pid = os.fork()
+ if pid == 0: # child
+ os.close(read)
+ login(owner_email)
+ token = getUtility(IAccessTokenSet).getBySecret(secret)
+ token.updateLastUsed()
+ os.write(write, b"1")
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ pass
+ transaction.commit()
+ os._exit(0)
+ else: # parent
+ try:
+ os.close(write)
+ os.read(read, 1)
+ login(owner_email)
+ token = getUtility(IAccessTokenSet).getBySecret(secret)
+ token.updateLastUsed()
+ now = get_transaction_timestamp(Store.of(token))
+ # The last-used date is being updated by a different
+ # transaction, which hasn't been committed yet.
+ self.assertIsNone(token.date_last_used)
+ finally:
+ os.kill(pid, signal.SIGINT)
+ os.waitpid(pid, 0)
+ transaction.commit()
+ self.assertIsNotNone(token.date_last_used)
+ self.assertNotEqual(now, token.date_last_used)
+
+ def test_is_expired(self):
+ owner = self.factory.makePerson()
+ login_person(owner)
+ _, current_token = self.factory.makeAccessToken(owner=owner)
+ _, expired_token = self.factory.makeAccessToken(owner=owner)
+ expired_token.date_expires = (
+ datetime.now(pytz.UTC) - timedelta(minutes=1))
+ self.assertFalse(current_token.is_expired)
+ self.assertTrue(expired_token.is_expired)
+
+ def test_revoke(self):
+ owner = self.factory.makePerson()
+ _, token = self.factory.makeAccessToken(
+ owner=owner, scopes=[AccessTokenScope.REPOSITORY_BUILD_STATUS])
+ login_person(owner)
+ self.assertThat(token, MatchesStructure(
+ date_expires=Is(None), revoked_by=Is(None)))
+ token.revoke(token.owner)
+ now = get_transaction_timestamp(Store.of(token))
+ self.assertThat(token, MatchesStructure.byEquality(
+ date_expires=now, revoked_by=token.owner))
+
+
+class TestAccessTokenSet(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
+
+ def test_new(self):
+ secret = create_access_token_secret()
+ self.assertEqual(64, len(secret))
+ owner = self.factory.makePerson()
+ description = "Test token"
+ context = self.factory.makeGitRepository()
+ scopes = [AccessTokenScope.REPOSITORY_BUILD_STATUS]
+ _, token = self.factory.makeAccessToken(
+ secret=secret, owner=owner, description=description,
+ context=context, scopes=scopes)
+ self.assertThat(
+ removeSecurityProxy(token), MatchesStructure.byEquality(
+ _token_sha256=hashlib.sha256(secret.encode()).hexdigest(),
+ owner=owner, description=description, context=context,
+ scopes=scopes))
+
+ def test_getBySecret(self):
+ secret, token = self.factory.makeAccessToken()
+ self.assertEqual(
+ token, getUtility(IAccessTokenSet).getBySecret(secret))
+ self.assertIsNone(
+ getUtility(IAccessTokenSet).getBySecret(
+ create_access_token_secret()))
+
+ def test_findByOwner(self):
+ owners = [self.factory.makePerson() for _ in range(3)]
+ tokens = [
+ self.factory.makeAccessToken(owner=owners[0])[1],
+ self.factory.makeAccessToken(owner=owners[0])[1],
+ self.factory.makeAccessToken(owner=owners[1])[1],
+ ]
+ self.assertContentEqual(
+ tokens[:2], getUtility(IAccessTokenSet).findByOwner(owners[0]))
+ self.assertContentEqual(
+ [tokens[2]], getUtility(IAccessTokenSet).findByOwner(owners[1]))
+ self.assertContentEqual(
+ [], getUtility(IAccessTokenSet).findByOwner(owners[2]))
+
+ def test_findByContext(self):
+ contexts = [self.factory.makeGitRepository() for _ in range(3)]
+ tokens = [
+ self.factory.makeAccessToken(context=contexts[0])[1],
+ self.factory.makeAccessToken(context=contexts[0])[1],
+ self.factory.makeAccessToken(context=contexts[1])[1],
+ ]
+ self.assertContentEqual(
+ tokens[:2], getUtility(IAccessTokenSet).findByContext(contexts[0]))
+ self.assertContentEqual(
+ [tokens[2]],
+ getUtility(IAccessTokenSet).findByContext(contexts[1]))
+ self.assertContentEqual(
+ [], getUtility(IAccessTokenSet).findByContext(contexts[2]))
diff --git a/lib/lp/services/auth/utils.py b/lib/lp/services/auth/utils.py
new file mode 100644
index 0000000..99dd2ba
--- /dev/null
+++ b/lib/lp/services/auth/utils.py
@@ -0,0 +1,19 @@
+# Copyright 2021 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Personal access token utilities."""
+
+__metaclass__ = type
+__all__ = [
+ "create_access_token_secret",
+ ]
+
+import binascii
+import os
+
+
+# XXX cjwatson 2021-09-30: Replace this with secrets.token_hex(32) once we
+# can rely on Python 3.6 everywhere.
+def create_access_token_secret():
+ """Create a secret suitable for use in a personal access token."""
+ return binascii.hexlify(os.urandom(32)).decode("ASCII")
diff --git a/lib/lp/services/configure.zcml b/lib/lp/services/configure.zcml
index bdde7df..1461879 100644
--- a/lib/lp/services/configure.zcml
+++ b/lib/lp/services/configure.zcml
@@ -4,6 +4,7 @@
<configure xmlns="http://namespaces.zope.org/zope">
<include package=".webapp" />
+ <include package=".auth" />
<include package=".authserver" />
<include package=".comments" />
<include package=".database" />
diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py
index b8d4c22..3ba8333 100644
--- a/lib/lp/testing/factory.py
+++ b/lib/lp/testing/factory.py
@@ -242,6 +242,8 @@ from lp.registry.model.commercialsubscription import CommercialSubscription
from lp.registry.model.karma import KarmaTotalCache
from lp.registry.model.milestone import Milestone
from lp.registry.model.suitesourcepackage import SuiteSourcePackage
+from lp.services.auth.interfaces import IAccessTokenSet
+from lp.services.auth.utils import create_access_token_secret
from lp.services.compat import message_as_bytes
from lp.services.config import config
from lp.services.database.constants import (
@@ -4516,6 +4518,27 @@ class BareLaunchpadObjectFactory(ObjectFactory):
consumer, reviewed_by=owner, access_level=access_level)
return request_token.createAccessToken()
+ def makeAccessToken(self, secret=None, owner=None, description=None,
+ context=None, scopes=None):
+ """Create a personal access token.
+
+ :return: A tuple of the secret for the new token and the token
+ itself.
+ """
+ if secret is None:
+ secret = create_access_token_secret()
+ if owner is None:
+ owner = self.makePerson()
+ if description is None:
+ description = self.getUniqueUnicode()
+ if context is None:
+ context = self.makeGitRepository()
+ if scopes is None:
+ scopes = []
+ token = getUtility(IAccessTokenSet).new(
+ secret, owner, description, context, scopes)
+ return secret, token
+
def makeCVE(self, sequence, description=None,
cvestate=CveStatus.CANDIDATE):
"""Create a new CVE record."""