launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24483
[Merge] ~twom/launchpad:oci-push-rule-models into launchpad:master
Tom Wardill has proposed merging ~twom/launchpad:oci-push-rule-models into launchpad:master.
Commit message:
Add OCIRegistryCredentials with encrypted credential storage
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~twom/launchpad/+git/launchpad/+merge/380947
We need somewhere to store credentials for pushing to an OCI registry.
Add a model for that, using the NaCl-based encrypted storage.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~twom/launchpad:oci-push-rule-models into launchpad:master.
diff --git a/lib/lp/oci/configure.zcml b/lib/lp/oci/configure.zcml
index da8d695..fe785ac 100644
--- a/lib/lp/oci/configure.zcml
+++ b/lib/lp/oci/configure.zcml
@@ -77,4 +77,31 @@
factory="lp.oci.model.ocirecipebuildbehaviour.OCIRecipeBuildBehaviour"
permission="zope.Public" />
+ <!-- OCIRegistryCredentials -->
+ <class class="lp.oci.model.ociregistrycredentials.OCIRegistryCredentials">
+ <!-- launchpad.View protects reading the names declared by the view and
+ editable-attribute interfaces. -->
+ <require
+ permission="launchpad.View"
+ interface="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsView
+ lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsEditableAttributes" />
+ <!-- launchpad.Edit protects the edit interface, and (via set_schema)
+ assignment to the editable attributes. -->
+ <require
+ permission="launchpad.Edit"
+ interface="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsEdit"
+ set_schema="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsEditableAttributes" />
+ </class>
+
+ <!-- OCIRegistryCredentialsSet -->
+ <securedutility
+ class="lp.oci.model.ociregistrycredentials.OCIRegistryCredentialsSet"
+ provides="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsSet">
+ <allow
+ interface="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsSet"/>
+ </securedutility>
+
+ <!-- OCIRegistrySecretsEncryptedContainer -->
+ <!-- Looked up by the model as the "oci-registry-secrets" named
+ IEncryptedContainer utility. -->
+ <securedutility
+ class="lp.oci.model.ociregistrycredentials.OCIRegistrySecretsEncryptedContainer"
+ provides="lp.services.crypto.interfaces.IEncryptedContainer"
+ name="oci-registry-secrets">
+ <allow interface="lp.services.crypto.interfaces.IEncryptedContainer"/>
+ </securedutility>
+
</configure>
diff --git a/lib/lp/oci/interfaces/ociregistrycredentials.py b/lib/lp/oci/interfaces/ociregistrycredentials.py
new file mode 100644
index 0000000..411f0bd
--- /dev/null
+++ b/lib/lp/oci/interfaces/ociregistrycredentials.py
@@ -0,0 +1,75 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Interfaces for handling credentials for OCI registry actions."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+ 'IOCIRegistryCredentials',
+ 'IOCIRegistryCredentialsSet',
+ ]
+
+from zope.interface import Interface
+from zope.schema import (
+ Int,
+ Text,
+ TextLine,
+ )
+
+from lp import _
+from lp.services.fields import PersonChoice
+
+
+class IOCIRegistryCredentialsView(Interface):
+ """`IOCIRegistryCredentials` attributes that require launchpad.View
+ permission.
+ """
+
+ id = Int(title=_("ID"), required=True, readonly=True)
+
+
+class IOCIRegistryCredentialsEditableAttributes(Interface):
+ """`IOCIRegistryCredentials` attributes that can be read with
+ launchpad.View permission and set with launchpad.Edit permission.
+ """
+
+ owner = PersonChoice(
+ title=_("Owner"),
+ required=True,
+ vocabulary="AllUserTeamsParticipationPlusSelf",
+ description=_("The owner of these credentials. "
+ "Only the owner is entitled to make use of them."),
+ readonly=False)
+
+ url = TextLine(
+ title=_("URL"),
+ description=_("The registry URL."),
+ required=True,
+ readonly=False)
+
+ # NOTE(review): the model's `credentials` property returns the
+ # decrypted, JSON-decoded value, so the `Text` field type and the
+ # "Encrypted" wording may not match what callers see - confirm.
+ credentials = Text(
+ title=_("Credentials"),
+ description=_("Encrypted credentials for pushing to the registry."),
+ required=False,
+ readonly=False)
+
+
+class IOCIRegistryCredentialsEdit(Interface):
+ """`IOCIRegistryCredentials` methods that require launchpad.Edit
+ permission.
+ """
+
+ def destroySelf():
+ """Delete these credentials.
+
+ The model implementation removes the matching row from the store.
+ """
+
+
+class IOCIRegistryCredentials(IOCIRegistryCredentialsEdit,
+ IOCIRegistryCredentialsEditableAttributes,
+ IOCIRegistryCredentialsView):
+ """Credentials for pushing to an OCI registry.
+
+ Combines the view, editable-attribute and edit interfaces into the
+ single interface that the model class implements.
+ """
+
+
+class IOCIRegistryCredentialsSet(Interface):
+ """A utility to create and access OCI Registry Credentials."""
+
+ def new(owner, url, credentials):
+ """Create an `IOCIRegistryCredentials`.
+
+ :param owner: The person who will own the credentials.
+ :param url: The registry URL.
+ :param credentials: The value to store; the model JSON-encodes and
+ encrypts it before storing it.
+ :return: The new `IOCIRegistryCredentials`.
+ """
+
+ def findByOwner(owner):
+ """Find matching `IOCIRegistryCredentials` by owner.
+
+ :param owner: The owner to match.
+ :return: The `IOCIRegistryCredentials` whose owner is `owner`.
+ """
diff --git a/lib/lp/oci/model/ociregistrycredentials.py b/lib/lp/oci/model/ociregistrycredentials.py
new file mode 100644
index 0000000..331659f
--- /dev/null
+++ b/lib/lp/oci/model/ociregistrycredentials.py
@@ -0,0 +1,120 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Registry credentials for use by an `OCIPushRule`."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+ 'OCIRegistryCredentials',
+ 'OCIRegistryCredentialsSet',
+ ]
+
+import base64
+import json
+
+from storm.locals import (
+ Int,
+ JSON,
+ Reference,
+ Storm,
+ Unicode,
+ )
+from zope.component import getUtility
+from zope.interface import implementer
+
+from lp.oci.interfaces.ociregistrycredentials import (
+ IOCIRegistryCredentials,
+ IOCIRegistryCredentialsSet,
+ )
+from lp.services.config import config
+from lp.services.crypto.interfaces import (
+ CryptoError,
+ IEncryptedContainer,
+ )
+from lp.services.crypto.model import NaClEncryptedContainerBase
+from lp.services.database.interfaces import IStore
+
+
+@implementer(IEncryptedContainer)
+class OCIRegistrySecretsEncryptedContainer(NaClEncryptedContainerBase):
+ """An encrypted container whose keys come from the `oci` config section."""
+
+ @property
+ def public_key_bytes(self):
+ """The base64-decoded `registry_secrets_public_key`, or None if the
+ key is not configured.
+ """
+ if config.oci.registry_secrets_public_key is not None:
+ return base64.b64decode(
+ config.oci.registry_secrets_public_key.encode('UTF-8'))
+ else:
+ return None
+
+ @property
+ def private_key_bytes(self):
+ """The base64-decoded `registry_secrets_private_key`, or None if the
+ key is not configured.
+ """
+ if config.oci.registry_secrets_private_key is not None:
+ return base64.b64decode(
+ config.oci.registry_secrets_private_key.encode('UTF-8'))
+ else:
+ return None
+
+
+@implementer(IOCIRegistryCredentials)
+class OCIRegistryCredentials(Storm):
+    """See `IOCIRegistryCredentials`.
+
+    The credentials are not stored in the clear: the `credentials`
+    property encrypts on assignment and decrypts on access, using the
+    "oci-registry-secrets" `IEncryptedContainer` utility.
+    """
+
+    __storm_table__ = 'OCIRegistryCredentials'
+
+    id = Int(primary=True)
+
+    owner_id = Int(name='owner', allow_none=False)
+    owner = Reference(owner_id, 'Person.id')
+
+    url = Unicode(name="url", allow_none=False)
+
+    # Raw stored form: a dict with "public_key" and "credentials_encrypted"
+    # keys, as written by the `credentials` setter below.
+    _credentials = JSON(name="credentials", allow_none=True)
+
+    def __init__(self, owner, url, credentials):
+        self.owner = owner
+        self.url = url
+        # Goes through the property setter, so the value is encrypted.
+        self.credentials = credentials
+
+    @property
+    def credentials(self):
+        """The decrypted, JSON-decoded credentials.
+
+        :raises CryptoError: if the stored value cannot be decrypted.
+        """
+        container = getUtility(IEncryptedContainer, "oci-registry-secrets")
+        try:
+            return json.loads(container.decrypt((
+                self._credentials["public_key"],
+                self._credentials['credentials_encrypted'])).decode("UTF-8"))
+        except CryptoError:
+            # XXX twom 2020-03-18 This needs a better error
+            # see SnapStoreClient.UnauthorizedUploadResponse
+            # Waiting on OCIRegistryClient.
+            # Bare raise keeps the original traceback intact.
+            raise
+
+    @credentials.setter
+    def credentials(self, value):
+        # JSON-encode and encrypt `value`; only the encrypted form and the
+        # public key returned by the container are stored.
+        container = getUtility(IEncryptedContainer, "oci-registry-secrets")
+        public_key, encrypted_value = container.encrypt(
+            json.dumps(value).encode('UTF-8'))
+        self._credentials = {
+            "credentials_encrypted": encrypted_value,
+            "public_key": public_key}
+
+    def destroySelf(self):
+        """See `IOCIRegistryCredentials`."""
+        store = IStore(OCIRegistryCredentials)
+        # Match on this row's integer id; `id` is an Int column, so it
+        # must be compared with `self.id` rather than with the object.
+        store.find(
+            OCIRegistryCredentials,
+            OCIRegistryCredentials.id == self.id).remove()
+
+
+@implementer(IOCIRegistryCredentialsSet)
+class OCIRegistryCredentialsSet:
+ """See `IOCIRegistryCredentialsSet`."""
+
+ def new(self, owner, url, credentials):
+ """See `IOCIRegistryCredentialsSet`."""
+ # The constructor encrypts `credentials` via the model's property.
+ return OCIRegistryCredentials(owner, url, credentials)
+
+ def findByOwner(self, owner):
+ """See `IOCIRegistryCredentialsSet`."""
+ store = IStore(OCIRegistryCredentials)
+ # The store's find() result is returned as-is, not turned into a list.
+ return store.find(
+ OCIRegistryCredentials,
+ OCIRegistryCredentials.owner == owner)
diff --git a/lib/lp/oci/tests/test_ociregistrycredentials.py b/lib/lp/oci/tests/test_ociregistrycredentials.py
new file mode 100644
index 0000000..54246e3
--- /dev/null
+++ b/lib/lp/oci/tests/test_ociregistrycredentials.py
@@ -0,0 +1,116 @@
+# Copyright 2020 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for OCI image registry credential storage."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import base64
+
+from nacl.public import PrivateKey
+from testtools.matchers import (
+ Equals,
+ MatchesDict,
+ )
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.oci.interfaces.ociregistrycredentials import (
+ IOCIRegistryCredentials,
+ IOCIRegistryCredentialsSet,
+ )
+from lp.testing import (
+ person_logged_in,
+ TestCaseWithFactory,
+ )
+from lp.testing.layers import LaunchpadZopelessLayer
+
+
+class TestOCIRegistryCredentials(TestCaseWithFactory):
+    """Tests for the `OCIRegistryCredentials` model."""
+
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super(TestOCIRegistryCredentials, self).setUp()
+        # Configure a throwaway NaCl key pair.  Both halves are pushed as
+        # base64 *text*: the encrypted container calls .encode('UTF-8') on
+        # the configured values before base64-decoding them.
+        self.private_key = PrivateKey.generate()
+        self.pushConfig(
+            "oci",
+            registry_secrets_public_key=base64.b64encode(
+                bytes(self.private_key.public_key)).decode("UTF-8"))
+        self.pushConfig(
+            "oci",
+            registry_secrets_private_key=base64.b64encode(
+                bytes(self.private_key)).decode("UTF-8"))
+
+    def test_implements_interface(self):
+        target = getUtility(IOCIRegistryCredentialsSet).new(
+            owner=self.factory.makePerson(),
+            url='http://example.org',
+            credentials={'username': 'foo', 'password': 'bar'})
+        self.assertProvides(target, IOCIRegistryCredentials)
+
+    def test_retrieve_encrypted_credentials(self):
+        # Reading `credentials` as the owner yields the original dict.
+        owner = self.factory.makePerson()
+        target = self.factory.makeOCIRegistryCredentials(
+            owner=owner,
+            url='http://example.org',
+            credentials={'username': 'foo', 'password': 'bar'})
+
+        with person_logged_in(owner):
+            self.assertThat(target.credentials, MatchesDict({
+                "username": Equals("foo"),
+                "password": Equals("bar")}))
+
+    def test_credentials_are_encrypted(self):
+        # The raw stored value holds the encrypted payload and public key.
+        credentials = {'username': 'foo', 'password': 'bar'}
+        target = removeSecurityProxy(
+            self.factory.makeOCIRegistryCredentials(
+                credentials=credentials))
+        self.assertIn('credentials_encrypted', target._credentials)
+        self.assertIn('public_key', target._credentials)
+
+
+class TestOCIRegistryCredentialsSet(TestCaseWithFactory):
+    """Tests for the `IOCIRegistryCredentialsSet` utility."""
+
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super(TestOCIRegistryCredentialsSet, self).setUp()
+        # Configure a throwaway NaCl key pair.  Both halves are pushed as
+        # base64 *text*: the encrypted container calls .encode('UTF-8') on
+        # the configured values before base64-decoding them.
+        self.private_key = PrivateKey.generate()
+        self.pushConfig(
+            "oci",
+            registry_secrets_public_key=base64.b64encode(
+                bytes(self.private_key.public_key)).decode("UTF-8"))
+        self.pushConfig(
+            "oci",
+            registry_secrets_private_key=base64.b64encode(
+                bytes(self.private_key)).decode("UTF-8"))
+
+    def test_implements_interface(self):
+        target_set = getUtility(IOCIRegistryCredentialsSet)
+        self.assertProvides(target_set, IOCIRegistryCredentialsSet)
+
+    def test_new(self):
+        owner = self.factory.makePerson()
+        url = unicode(self.factory.getUniqueURL())
+        credentials = {'username': 'foo', 'password': 'bar'}
+        target = getUtility(IOCIRegistryCredentialsSet).new(
+            owner=owner,
+            url=url,
+            credentials=credentials)
+        self.assertEqual(target.owner, owner)
+        self.assertEqual(target.url, url)
+        self.assertEqual(target.credentials, credentials)
+
+    def test_findByOwner(self):
+        owner = self.factory.makePerson()
+        for _ in range(3):
+            self.factory.makeOCIRegistryCredentials(owner=owner)
+        # make some that have a different owner
+        for _ in range(5):
+            self.factory.makeOCIRegistryCredentials()
+
+        found = getUtility(IOCIRegistryCredentialsSet).findByOwner(owner)
+        self.assertEqual(found.count(), 3)
+        for target in found:
+            self.assertEqual(target.owner, owner)
diff --git a/lib/lp/security.py b/lib/lp/security.py
index e31c65c..d4f74f4 100644
--- a/lib/lp/security.py
+++ b/lib/lp/security.py
@@ -114,6 +114,7 @@ from lp.hardwaredb.interfaces.hwdb import (
)
from lp.oci.interfaces.ocirecipe import IOCIRecipe
from lp.oci.interfaces.ocirecipebuild import IOCIRecipeBuild
+from lp.oci.interfaces.ociregistrycredentials import IOCIRegistryCredentials
from lp.registry.enums import PersonVisibility
from lp.registry.interfaces.announcement import IAnnouncement
from lp.registry.interfaces.distribution import IDistribution
@@ -3510,3 +3511,20 @@ class AdminOCIRecipe(AuthorizationBase):
class ViewOCIRecipeBuild(AnonymousAuthorization):
"""Anyone can view an `IOCIRecipe`."""
usedfor = IOCIRecipeBuild
+
+
+class ViewOCIRegistryCredentials(AuthorizationBase):
+    """Grant launchpad.View when `user.isOwner` holds for the credentials."""
+    permission = 'launchpad.View'
+    usedfor = IOCIRegistryCredentials
+
+    def checkAuthenticated(self, user):
+        return user.isOwner(self.obj)
+
+
+class EditOCIRegistryCredentials(AuthorizationBase):
+ """Grant launchpad.Edit when `user.isOwner` holds for the credentials."""
+ permission = 'launchpad.Edit'
+ usedfor = IOCIRegistryCredentials
+
+ def checkAuthenticated(self, user):
+ return user.isOwner(self.obj)
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index ad3d36a..64c8473 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -1792,6 +1792,17 @@ store_secrets_private_key: none
# datatype: string
store_secrets_public_key: none
+[oci]
+# Base64-encoded NaCl public key for encrypting registry upload credentials.
+# datatype: string
+registry_secrets_public_key: none
+
+# Base64-encoded NaCl private key for decrypting registry upload credentials.
+# This should only be set in secret overlays on systems that need to perform
+# registry uploads on behalf of users.
+# datatype: string
+registry_secrets_private_key: none
+
[process-job-source-groups]
# This section is used by cronscripts/process-job-source-groups.py.
dbuser: process-job-source-groups
diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py
index 92e7825..30baf33 100644
--- a/lib/lp/testing/factory.py
+++ b/lib/lp/testing/factory.py
@@ -159,6 +159,7 @@ from lp.hardwaredb.interfaces.hwdb import (
)
from lp.oci.interfaces.ocirecipe import IOCIRecipeSet
from lp.oci.interfaces.ocirecipebuild import IOCIRecipeBuildSet
+from lp.oci.interfaces.ociregistrycredentials import IOCIRegistryCredentialsSet
from lp.oci.model.ocirecipe import OCIRecipeArch
from lp.oci.model.ocirecipebuild import OCIFile
from lp.registry.enums import (
@@ -5032,6 +5033,20 @@ class BareLaunchpadObjectFactory(ObjectFactory):
return OCIFile(build=build, library_file=library_file,
layer_file_digest=layer_file_digest)
+ def makeOCIRegistryCredentials(self, owner=None, url=None,
+ credentials=DEFAULT):
+ """Make a new OCIRegistryCredentials.
+
+ :param owner: The owner; a new person is made if None.
+ :param url: The registry URL; a unique URL is generated if None.
+ :param credentials: The credentials to store; None is stored when
+ the argument is not supplied.
+ :return: The object created by `IOCIRegistryCredentialsSet.new`.
+ """
+ if owner is None:
+ owner = self.makePerson()
+ if url is None:
+ url = unicode(self.getUniqueURL())
+ if credentials is DEFAULT:
+ credentials = None
+ return getUtility(IOCIRegistryCredentialsSet).new(
+ owner=owner,
+ url=url,
+ credentials=credentials)
+
# Some factory methods return simple Python types. We don't add
# security wrappers for them, as well as for objects created by