← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~twom/launchpad:oci-push-rule-models into launchpad:master

 

Tom Wardill has proposed merging ~twom/launchpad:oci-push-rule-models into launchpad:master.

Commit message:
Add OCIRegistryCredentials with encrypted credential storage

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~twom/launchpad/+git/launchpad/+merge/380947

We need somewhere to store credentials for pushing to an OCI registry.
Add a model for that, using the NaCl-based encrypted storage.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~twom/launchpad:oci-push-rule-models into launchpad:master.
diff --git a/lib/lp/oci/configure.zcml b/lib/lp/oci/configure.zcml
index da8d695..fe785ac 100644
--- a/lib/lp/oci/configure.zcml
+++ b/lib/lp/oci/configure.zcml
@@ -77,4 +77,31 @@
         factory="lp.oci.model.ocirecipebuildbehaviour.OCIRecipeBuildBehaviour"
         permission="zope.Public" />
 
+    <!-- OCIRegistryCredentials -->
+    <class class="lp.oci.model.ociregistrycredentials.OCIRegistryCredentials">
+        <require
+            permission="launchpad.View"
+            interface="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsView
+                       lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsEditableAttributes" />
+        <require
+            permission="launchpad.Edit"
+            interface="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsEdit"
+            set_schema="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsEditableAttributes" />
+    </class>
+
+    <securedutility
+        class="lp.oci.model.ociregistrycredentials.OCIRegistryCredentialsSet"
+        provides="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsSet">
+        <allow
+            interface="lp.oci.interfaces.ociregistrycredentials.IOCIRegistryCredentialsSet"/>
+    </securedutility>
+
+    <!-- OCIRegistrySecretsEncryptedContainer -->
+    <securedutility
+        class="lp.oci.model.ociregistrycredentials.OCIRegistrySecretsEncryptedContainer"
+        provides="lp.services.crypto.interfaces.IEncryptedContainer"
+        name="oci-registry-secrets">
+        <allow interface="lp.services.crypto.interfaces.IEncryptedContainer"/>
+    </securedutility>
+
 </configure>
diff --git a/lib/lp/oci/interfaces/ociregistrycredentials.py b/lib/lp/oci/interfaces/ociregistrycredentials.py
new file mode 100644
index 0000000..411f0bd
--- /dev/null
+++ b/lib/lp/oci/interfaces/ociregistrycredentials.py
@@ -0,0 +1,75 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Interfaces for handling credentials for OCI registry actions."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+    'IOCIRegistryCredentials',
+    'IOCIRegistryCredentialsSet',
+    ]
+
+from zope.interface import Interface
+from zope.schema import (
+    Int,
+    Text,
+    TextLine,
+    )
+
+from lp import _
+from lp.services.fields import PersonChoice
+
+
+class IOCIRegistryCredentialsView(Interface):
+    """`IOCIRegistryCredentials` attributes that require launchpad.View."""
+    id = Int(title=_("ID"), required=True, readonly=True)
+
+
+class IOCIRegistryCredentialsEditableAttributes(Interface):
+    """`IOCIRegistryCredentials` attributes settable with launchpad.Edit."""
+    owner = PersonChoice(
+        title=_("Owner"),
+        required=True,
+        vocabulary="AllUserTeamsParticipationPlusSelf",
+        description=_("The owner of these credentials. "
+                      "Only the owner is entitled to make use of them."),
+        readonly=False)
+
+    url = TextLine(
+        title=_("URL"),
+        description=_("The registry URL."),
+        required=True,
+        readonly=False)
+
+    credentials = Text(
+        title=_("Credentials"),
+        description=_("Encrypted credentials for pushing to the registry."),
+        required=False,
+        readonly=False)
+
+
+class IOCIRegistryCredentialsEdit(Interface):
+    """`IOCIRegistryCredentials` methods that require launchpad.Edit
+    permission.
+    """
+
+    def destroySelf():
+        """Delete these credentials from the database."""
+
+
+class IOCIRegistryCredentials(IOCIRegistryCredentialsEdit,
+                              IOCIRegistryCredentialsEditableAttributes,
+                              IOCIRegistryCredentialsView):
+    """Owner-scoped, encrypted credentials for pushing to an OCI registry."""
+
+
+class IOCIRegistryCredentialsSet(Interface):
+    """A utility to create and access OCI Registry Credentials."""
+
+    def new(owner, url, credentials):
+        """Create and return an `IOCIRegistryCredentials` for `owner`."""
+
+    def findByOwner(owner):
+        """Return the `IOCIRegistryCredentials` whose owner is `owner`."""
diff --git a/lib/lp/oci/model/ociregistrycredentials.py b/lib/lp/oci/model/ociregistrycredentials.py
new file mode 100644
index 0000000..331659f
--- /dev/null
+++ b/lib/lp/oci/model/ociregistrycredentials.py
@@ -0,0 +1,120 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Registry credentials for use by an `OCIPushRule`."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+    'OCIRegistryCredentials',
+    'OCIRegistryCredentialsSet',
+    ]
+
+import base64
+import json
+
+from storm.locals import (
+    Int,
+    JSON,
+    Reference,
+    Storm,
+    Unicode,
+    )
+from zope.component import getUtility
+from zope.interface import implementer
+
+from lp.oci.interfaces.ociregistrycredentials import (
+    IOCIRegistryCredentials,
+    IOCIRegistryCredentialsSet,
+    )
+from lp.services.config import config
+from lp.services.crypto.interfaces import (
+    CryptoError,
+    IEncryptedContainer,
+    )
+from lp.services.crypto.model import NaClEncryptedContainerBase
+from lp.services.database.interfaces import IStore
+
+
+@implementer(IEncryptedContainer)
+class OCIRegistrySecretsEncryptedContainer(NaClEncryptedContainerBase):
+
+    @property
+    def public_key_bytes(self):
+        """The configured public key as raw bytes, or None if unset."""
+        encoded_key = config.oci.registry_secrets_public_key
+        if encoded_key is None:
+            return None
+        return base64.b64decode(encoded_key.encode('UTF-8'))
+
+    @property
+    def private_key_bytes(self):
+        """The configured private key as raw bytes, or None if unset."""
+        encoded_key = config.oci.registry_secrets_private_key
+        if encoded_key is None:
+            return None
+        return base64.b64decode(encoded_key.encode('UTF-8'))
+
+
+@implementer(IOCIRegistryCredentials)
+class OCIRegistryCredentials(Storm):
+    """See `IOCIRegistryCredentials`."""
+
+    __storm_table__ = 'OCIRegistryCredentials'
+
+    id = Int(primary=True)
+
+    owner_id = Int(name='owner', allow_none=False)
+    owner = Reference(owner_id, 'Person.id')
+
+    url = Unicode(name="url", allow_none=False)
+
+    _credentials = JSON(name="credentials", allow_none=True)
+
+    def __init__(self, owner, url, credentials):
+        self.owner = owner
+        self.url = url
+        self.credentials = credentials
+
+    @property
+    def credentials(self):
+        """Decrypt the stored credentials and return them JSON-decoded."""
+        container = getUtility(IEncryptedContainer, "oci-registry-secrets")
+        # XXX twom 2020-03-18 A CryptoError raised here needs a better
+        # error; see SnapStoreClient.UnauthorizedUploadResponse.
+        # Waiting on OCIRegistryClient.
+        return json.loads(container.decrypt((
+            self._credentials["public_key"],
+            self._credentials['credentials_encrypted'])).decode("UTF-8"))
+
+    @credentials.setter
+    def credentials(self, value):
+        """JSON-encode and encrypt `value` before storing it."""
+        container = getUtility(IEncryptedContainer, "oci-registry-secrets")
+        public_key, encrypted_value = container.encrypt(
+            json.dumps(value).encode('UTF-8'))
+        self._credentials = {
+            "credentials_encrypted": encrypted_value,
+            "public_key": public_key}
+
+    def destroySelf(self):
+        """See `IOCIRegistryCredentials`."""
+        IStore(OCIRegistryCredentials).find(
+            OCIRegistryCredentials,
+            OCIRegistryCredentials.id == self.id).remove()
+
+
+@implementer(IOCIRegistryCredentialsSet)
+class OCIRegistryCredentialsSet:
+
+    def new(self, owner, url, credentials):
+        """See `IOCIRegistryCredentialsSet`."""
+        return OCIRegistryCredentials(owner, url, credentials)
+
+    def findByOwner(self, owner):
+        """See `IOCIRegistryCredentialsSet`."""
+        store = IStore(OCIRegistryCredentials)
+        matches_owner = OCIRegistryCredentials.owner == owner
+        credentials = store.find(OCIRegistryCredentials, matches_owner)
+        return credentials
diff --git a/lib/lp/oci/tests/test_ociregistrycredentials.py b/lib/lp/oci/tests/test_ociregistrycredentials.py
new file mode 100644
index 0000000..54246e3
--- /dev/null
+++ b/lib/lp/oci/tests/test_ociregistrycredentials.py
@@ -0,0 +1,116 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for OCI image registry credential storage."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import base64
+
+from nacl.public import PrivateKey
+from testtools.matchers import (
+    Equals,
+    MatchesDict,
+    )
+from zope.component import getUtility
+from zope.security.proxy import removeSecurityProxy
+
+from lp.oci.interfaces.ociregistrycredentials import (
+    IOCIRegistryCredentials,
+    IOCIRegistryCredentialsSet,
+    )
+from lp.testing import (
+    person_logged_in,
+    TestCaseWithFactory,
+    )
+from lp.testing.layers import LaunchpadZopelessLayer
+
+
+class TestOCIRegistryCredentials(TestCaseWithFactory):
+    """Tests for encrypted storage on `OCIRegistryCredentials`."""
+
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super(TestOCIRegistryCredentials, self).setUp()
+        self.private_key = PrivateKey.generate()
+        # The model .encode()s both config values, so push them as text.
+        self.pushConfig(
+            "oci",
+            registry_secrets_public_key=base64.b64encode(
+                bytes(self.private_key.public_key)).decode("UTF-8"))
+        self.pushConfig(
+            "oci",
+            registry_secrets_private_key=base64.b64encode(
+                bytes(self.private_key)).decode("UTF-8"))
+
+    def test_implements_interface(self):
+        target = getUtility(IOCIRegistryCredentialsSet).new(
+            owner=self.factory.makePerson(), url='http://example.org',
+            credentials={'username': 'foo', 'password': 'bar'})
+        self.assertProvides(target, IOCIRegistryCredentials)
+
+    def test_retrieve_encrypted_credentials(self):
+        owner = self.factory.makePerson()
+        target = self.factory.makeOCIRegistryCredentials(
+            owner=owner,
+            url='http://example.org',
+            credentials={'username': 'foo', 'password': 'bar'})
+
+        with person_logged_in(owner):
+            self.assertThat(target.credentials, MatchesDict({
+                "username": Equals("foo"),
+                "password": Equals("bar")}))
+
+    def test_credentials_are_encrypted(self):
+        credentials = {'username': 'foo', 'password': 'bar'}
+        target = removeSecurityProxy(
+            self.factory.makeOCIRegistryCredentials(credentials=credentials))
+        self.assertIn('credentials_encrypted', target._credentials)
+        self.assertIn('public_key', target._credentials)
+
+
+class TestOCIRegistryCredentialsSet(TestCaseWithFactory):
+    """Tests for the `IOCIRegistryCredentialsSet` utility."""
+
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super(TestOCIRegistryCredentialsSet, self).setUp()
+        self.private_key = PrivateKey.generate()
+        # The model .encode()s both config values, so push them as text.
+        self.pushConfig(
+            "oci",
+            registry_secrets_public_key=base64.b64encode(
+                bytes(self.private_key.public_key)).decode("UTF-8"))
+        self.pushConfig(
+            "oci",
+            registry_secrets_private_key=base64.b64encode(
+                bytes(self.private_key)).decode("UTF-8"))
+
+    def test_implements_interface(self):
+        target_set = getUtility(IOCIRegistryCredentialsSet)
+        self.assertProvides(target_set, IOCIRegistryCredentialsSet)
+
+    def test_new(self):
+        owner = self.factory.makePerson()
+        url = unicode(self.factory.getUniqueURL())
+        credentials = {'username': 'foo', 'password': 'bar'}
+        target = getUtility(IOCIRegistryCredentialsSet).new(
+            owner=owner, url=url, credentials=credentials)
+        self.assertEqual(target.owner, owner)
+        self.assertEqual(target.url, url)
+        self.assertEqual(target.credentials, credentials)
+
+    def test_findByOwner(self):
+        owner = self.factory.makePerson()
+        for _ in range(3):
+            self.factory.makeOCIRegistryCredentials(owner=owner)
+        # Make some that have a different owner.
+        for _ in range(5):
+            self.factory.makeOCIRegistryCredentials()
+
+        found = getUtility(IOCIRegistryCredentialsSet).findByOwner(owner)
+        self.assertEqual(found.count(), 3)
+        for target in found:
+            self.assertEqual(target.owner, owner)
diff --git a/lib/lp/security.py b/lib/lp/security.py
index e31c65c..d4f74f4 100644
--- a/lib/lp/security.py
+++ b/lib/lp/security.py
@@ -114,6 +114,7 @@ from lp.hardwaredb.interfaces.hwdb import (
     )
 from lp.oci.interfaces.ocirecipe import IOCIRecipe
 from lp.oci.interfaces.ocirecipebuild import IOCIRecipeBuild
+from lp.oci.interfaces.ociregistrycredentials import IOCIRegistryCredentials
 from lp.registry.enums import PersonVisibility
 from lp.registry.interfaces.announcement import IAnnouncement
 from lp.registry.interfaces.distribution import IDistribution
@@ -3510,3 +3511,20 @@ class AdminOCIRecipe(AuthorizationBase):
 class ViewOCIRecipeBuild(AnonymousAuthorization):
     """Anyone can view an `IOCIRecipe`."""
     usedfor = IOCIRecipeBuild
+
+
+class ViewOCIRegistryCredentials(AuthorizationBase):
+    permission = 'launchpad.View'
+    usedfor = IOCIRegistryCredentials
+
+    def checkAuthenticated(self, user):
+        """Allow access only if `user.isOwner` accepts the credentials."""
+        return user.isOwner(self.obj)
+
+
+class EditOCIRegistryCredentials(AuthorizationBase):
+    permission = 'launchpad.Edit'
+    usedfor = IOCIRegistryCredentials
+
+    def checkAuthenticated(self, user):
+        return user.isOwner(self.obj)  # same check as launchpad.View
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index ad3d36a..64c8473 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -1792,6 +1792,17 @@ store_secrets_private_key: none
 # datatype: string
 store_secrets_public_key: none
 
+[oci]
+# Base64-encoded NaCl public key for encrypting registry upload credentials.
+# datatype: string
+registry_secrets_public_key: none
+
+# Base64-encoded NaCl private key for decrypting registry upload credentials.
+# This should only be set in secret overlays on systems that need to perform
+# registry uploads on behalf of users.
+# datatype: string
+registry_secrets_private_key: none
+
 [process-job-source-groups]
 # This section is used by cronscripts/process-job-source-groups.py.
 dbuser: process-job-source-groups
diff --git a/lib/lp/testing/factory.py b/lib/lp/testing/factory.py
index 92e7825..30baf33 100644
--- a/lib/lp/testing/factory.py
+++ b/lib/lp/testing/factory.py
@@ -159,6 +159,7 @@ from lp.hardwaredb.interfaces.hwdb import (
     )
 from lp.oci.interfaces.ocirecipe import IOCIRecipeSet
 from lp.oci.interfaces.ocirecipebuild import IOCIRecipeBuildSet
+from lp.oci.interfaces.ociregistrycredentials import IOCIRegistryCredentialsSet
 from lp.oci.model.ocirecipe import OCIRecipeArch
 from lp.oci.model.ocirecipebuild import OCIFile
 from lp.registry.enums import (
@@ -5032,6 +5033,20 @@ class BareLaunchpadObjectFactory(ObjectFactory):
         return OCIFile(build=build, library_file=library_file,
                        layer_file_digest=layer_file_digest)
 
+    def makeOCIRegistryCredentials(self, owner=None, url=None,
+                                   credentials=DEFAULT):
+        """Make an OCIRegistryCredentials, defaulting unset arguments."""
+        if owner is None:
+            owner = self.makePerson()
+        if url is None:
+            url = unicode(self.getUniqueURL())
+        if credentials is DEFAULT:
+            credentials = None
+        return getUtility(IOCIRegistryCredentialsSet).new(
+            owner=owner,
+            url=url,
+            credentials=credentials)
+
 
 # Some factory methods return simple Python types. We don't add
 # security wrappers for them, as well as for objects created by