← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~apw/launchpad/signing-gpg-sign-checksum-files into lp:launchpad

 

Andy Whitcroft has proposed merging lp:~apw/launchpad/signing-gpg-sign-checksum-files into lp:launchpad with lp:~cjwatson/launchpad/refactor-custom-archive-config as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1285919 in Launchpad itself: "uefi archive files don't have signed checksums"
  https://bugs.launchpad.net/launchpad/+bug/1285919
  Bug #1577736 in Launchpad itself: "Expand archive signing to kernel modules"
  https://bugs.launchpad.net/launchpad/+bug/1577736

For more details, see:
https://code.launchpad.net/~apw/launchpad/signing-gpg-sign-checksum-files/+merge/297177
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~apw/launchpad/signing-gpg-sign-checksum-files into lp:launchpad.
=== modified file 'lib/lp/archivepublisher/archivesigningkey.py'
--- lib/lp/archivepublisher/archivesigningkey.py	2016-03-23 17:55:39 +0000
+++ lib/lp/archivepublisher/archivesigningkey.py	2016-06-13 10:56:49 +0000
@@ -144,3 +144,20 @@
             os.path.join(suite_path, 'InRelease'), 'w')
         inline_release_file.write(inline_release)
         inline_release_file.close()
+
+    def signFile(self, path):
+        """See `IArchiveSigningKey`."""
+        assert self.archive.signing_key is not None, (
+            "No signing key available for %s" % self.archive.displayname)
+
+        secret_key_export = open(
+            self.getPathForSecretKey(self.archive.signing_key)).read()
+        gpghandler = getUtility(IGPGHandler)
+        secret_key = gpghandler.importSecretKey(secret_key_export)
+
+        file_content = open(path).read()
+        signature = gpghandler.signContent(
+            file_content, secret_key, mode=gpgme.SIG_MODE_DETACH)
+
+        with open(os.path.join(path + '.gpg'), 'w') as signature_file:
+            signature_file.write(signature)

=== modified file 'lib/lp/archivepublisher/interfaces/archivesigningkey.py'
--- lib/lp/archivepublisher/interfaces/archivesigningkey.py	2015-10-21 09:37:08 +0000
+++ lib/lp/archivepublisher/interfaces/archivesigningkey.py	2016-06-13 10:56:49 +0000
@@ -85,3 +85,10 @@
         :raises AssertionError: if the context archive has no `signing_key`
             or there is no Release file in the given suite.
         """
+
+    def signFile(path):
+        """Sign the corresponding file.
+
+        :param path: path within dists to sign with the archive key.
+        :raises AssertionError: if the context archive has no `signing_key`.
+        """

=== modified file 'lib/lp/archivepublisher/publishing.py'
--- lib/lp/archivepublisher/publishing.py	2016-06-13 10:56:49 +0000
+++ lib/lp/archivepublisher/publishing.py	2016-06-13 10:56:49 +0000
@@ -1478,16 +1478,17 @@
 class DirectoryHash:
     """Represents a directory hierarchy for hashing."""
 
-    def __init__(self, root, tmpdir, log):
+    def __init__(self, root, tmpdir, signer=None, logger=None):
         self.root = root
         self.tmpdir = tmpdir
-        self.log = log
+        self.logger = logger
+        self.signer = signer
         self.checksum_hash = []
 
         for usable in self._usable_archive_hashes:
-            checksum_file = os.path.join(self.root, usable.dh_name)
-            self.checksum_hash.append(
-                (RepositoryIndexFile(checksum_file, self.tmpdir), usable))
+            csum_path = os.path.join(self.root, usable.dh_name)
+            self.checksum_hash.append((csum_path,
+                RepositoryIndexFile(csum_path, self.tmpdir), usable))
 
     def __enter__(self):
         return self
@@ -1505,7 +1506,7 @@
         """Add a path to be checksummed."""
         hashes = [
             (checksum_file, archive_hash.hash_factory())
-            for (checksum_file, archive_hash) in self.checksum_hash]
+            for (_, checksum_file, archive_hash) in self.checksum_hash]
         with open(path, 'rb') as in_file:
             for chunk in iter(lambda: in_file.read(256 * 1024), ""):
                 for (checksum_file, hashobj) in hashes:
@@ -1522,5 +1523,7 @@
                 self.add(os.path.join(dirpath, filename))
 
     def close(self):
-        for (checksum_file, archive_hash) in self.checksum_hash:
+        for (checksum_path, checksum_file, archive_hash) in self.checksum_hash:
             checksum_file.close()
+            if self.signer:
+                self.signer.signFile(checksum_path)

=== modified file 'lib/lp/archivepublisher/signing.py'
--- lib/lp/archivepublisher/signing.py	2016-06-13 10:56:49 +0000
+++ lib/lp/archivepublisher/signing.py	2016-06-13 10:56:49 +0000
@@ -28,6 +28,12 @@
 
 from lp.archivepublisher.config import getPubConfig
 from lp.archivepublisher.customupload import CustomUpload
+<<<<<<< TREE
+=======
+from lp.archivepublisher.interfaces.archivesigningkey import (
+    IArchiveSigningKey,
+    )
+>>>>>>> MERGE-SOURCE
 from lp.services.osutils import remove_if_exists
 from lp.soyuz.interfaces.queue import CustomUploadError
 
@@ -328,7 +334,12 @@
             self.convertToTarball()
 
         versiondir = os.path.join(self.tmpdir, self.version)
-        with DirectoryHash(versiondir, self.tmpdir, self.logger) as hasher:
+
+        signer = None
+        if self.archive.signing_key:
+            signer = IArchiveSigningKey(self.archive)
+        with DirectoryHash(versiondir, self.tmpdir, signer,
+                           self.logger) as hasher:
             hasher.add_dir(versiondir)
 
     def shouldInstall(self, filename):

=== modified file 'lib/lp/archivepublisher/tests/test_publisher.py'
--- lib/lp/archivepublisher/tests/test_publisher.py	2016-06-06 17:13:42 +0000
+++ lib/lp/archivepublisher/tests/test_publisher.py	2016-06-13 10:56:49 +0000
@@ -94,10 +94,7 @@
 from lp.soyuz.interfaces.archive import IArchiveSet
 from lp.soyuz.interfaces.archivefile import IArchiveFileSet
 from lp.soyuz.tests.test_publishing import TestNativePublishingBase
-from lp.testing import (
-    TestCase,
-    TestCaseWithFactory,
-    )
+from lp.testing import TestCaseWithFactory
 from lp.testing.fakemethod import FakeMethod
 from lp.testing.gpgkeys import gpgkeysdir
 from lp.testing.keyserver import KeyServerTac
@@ -105,6 +102,7 @@
     LaunchpadZopelessLayer,
     ZopelessDatabaseLayer,
     )
+from lp.archivepublisher.interfaces.publisherconfig import IPublisherConfigSet
 
 
 RELEASE = PackagePublishingPocket.RELEASE
@@ -3167,8 +3165,8 @@
         self.assertEqual([], self.makePublisher(partner).subcomponents)
 
 
-class TestDirectoryHash(TestCase):
-    """Unit tests for DirectoryHash object."""
+class TestDirectoryHashHelpers(TestCaseWithFactory):
+    """Helper functions for DirectoryHash testing."""
 
     def createTestFile(self, path, content):
         with open(path, "w") as tfd:
@@ -3194,6 +3192,23 @@
                         file_list.append(line.strip().split(' '))
         return result
 
+    def fetchSigs(self, rootdir):
+        result = {}
+        for dh_file in self.all_hash_files:
+            checksum_sig = os.path.join(rootdir, dh_file) + '.gpg'
+            if os.path.exists(checksum_sig):
+                with open(checksum_sig, "r") as sfd:
+                    for line in sfd:
+                        sig_lines = result.setdefault(dh_file, [])
+                        sig_lines.append(line)
+        return result
+
+
+class TestDirectoryHash(TestDirectoryHashHelpers):
+    """Unit tests for DirectoryHash object."""
+
+    layer = ZopelessDatabaseLayer
+
     def test_checksum_files_created(self):
         tmpdir = unicode(self.makeTemporaryDirectory())
         rootdir = unicode(self.makeTemporaryDirectory())
@@ -3202,7 +3217,7 @@
             checksum_file = os.path.join(rootdir, dh_file)
             self.assertFalse(os.path.exists(checksum_file))
 
-        with DirectoryHash(rootdir, tmpdir, None) as dh:
+        with DirectoryHash(rootdir, tmpdir, None):
             pass
 
         for dh_file in self.all_hash_files:
@@ -3226,7 +3241,7 @@
         test3_file = os.path.join(rootdir, "subdir1", "test3")
         test3_hash = self.createTestFile(test3_file, "test3")
 
-        with DirectoryHash(rootdir, tmpdir, None) as dh:
+        with DirectoryHash(rootdir, tmpdir) as dh:
             dh.add(test1_file)
             dh.add(test2_file)
             dh.add(test3_file)
@@ -3254,14 +3269,71 @@
         test3_file = os.path.join(rootdir, "subdir1", "test3")
         test3_hash = self.createTestFile(test3_file, "test3 dir")
 
-        with DirectoryHash(rootdir, tmpdir, None) as dh:
-            dh.add_dir(rootdir)
-
-        expected = {
-            'SHA256SUMS': MatchesSetwise(
-                Equals([test1_hash, "*test1"]),
-                Equals([test2_hash, "*test2"]),
-                Equals([test3_hash, "*subdir1/test3"]),
-            ),
-        }
-        self.assertThat(self.fetchSums(rootdir), MatchesDict(expected))
+        with DirectoryHash(rootdir, tmpdir) as dh:
+            dh.add_dir(rootdir)
+
+        expected = {
+            'SHA256SUMS': MatchesSetwise(
+                Equals([test1_hash, "*test1"]),
+                Equals([test2_hash, "*test2"]),
+                Equals([test3_hash, "*subdir1/test3"]),
+            ),
+        }
+        self.assertThat(self.fetchSums(rootdir), MatchesDict(expected))
+
+
+class TestDirectoryHashSigning(TestDirectoryHashHelpers):
+    """Unit tests for DirectoryHash object, signing functionality."""
+
+    layer = ZopelessDatabaseLayer
+
+    def setUp(self):
+        super(TestDirectoryHashSigning, self).setUp()
+        self.temp_dir = self.makeTemporaryDirectory()
+        self.distro = self.factory.makeDistribution()
+        db_pubconf = getUtility(IPublisherConfigSet).getByDistribution(
+            self.distro)
+        db_pubconf.root_dir = unicode(self.temp_dir)
+        self.archive = self.factory.makeArchive(
+            distribution=self.distro, purpose=ArchivePurpose.PRIMARY)
+        self.suite = "distroseries"
+
+        # Setup a keyserver so we can install the archive key.
+        tac = KeyServerTac()
+        tac.setUp()
+
+        key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
+        IArchiveSigningKey(self.archive).setSigningKey(key_path)
+
+        tac.tearDown()
+
+    def test_basic_directory_add_signed(self):
+        tmpdir = unicode(self.makeTemporaryDirectory())
+        rootdir = unicode(self.makeTemporaryDirectory())
+        test1_file = os.path.join(rootdir, "test1")
+        test1_hash = self.createTestFile(test1_file, "test1 dir")
+
+        test2_file = os.path.join(rootdir, "test2")
+        test2_hash = self.createTestFile(test2_file, "test2 dir")
+
+        os.mkdir(os.path.join(rootdir, "subdir1"))
+
+        test3_file = os.path.join(rootdir, "subdir1", "test3")
+        test3_hash = self.createTestFile(test3_file, "test3 dir")
+
+        signer = IArchiveSigningKey(self.archive)
+        with DirectoryHash(rootdir, tmpdir, signer=signer) as dh:
+            dh.add_dir(rootdir)
+
+        expected = {
+            'SHA256SUMS': MatchesSetwise(
+                Equals([test1_hash, "*test1"]),
+                Equals([test2_hash, "*test2"]),
+                Equals([test3_hash, "*subdir1/test3"]),
+            ),
+        }
+        self.assertThat(self.fetchSums(rootdir), MatchesDict(expected))
+        sig_content = self.fetchSigs(rootdir)
+        for dh_file in sig_content:
+            self.assertEqual(
+                sig_content[dh_file][0], '-----BEGIN PGP SIGNATURE-----\n')

=== modified file 'lib/lp/archivepublisher/tests/test_signing.py'
--- lib/lp/archivepublisher/tests/test_signing.py	2016-06-13 10:56:49 +0000
+++ lib/lp/archivepublisher/tests/test_signing.py	2016-06-13 10:56:49 +0000
@@ -17,6 +17,9 @@
     CustomUploadAlreadyExists,
     CustomUploadBadUmask,
     )
+from lp.archivepublisher.interfaces.archivesigningkey import (
+    IArchiveSigningKey,
+    )
 from lp.archivepublisher.interfaces.publisherconfig import IPublisherConfigSet
 from lp.archivepublisher.signing import (
     SigningUpload,
@@ -27,6 +30,8 @@
 from lp.soyuz.enums import ArchivePurpose
 from lp.testing import TestCaseWithFactory
 from lp.testing.fakemethod import FakeMethod
+from lp.testing.gpgkeys import gpgkeysdir
+from lp.testing.keyserver import KeyServerTac
 from lp.testing.layers import ZopelessDatabaseLayer
 
 
@@ -111,6 +116,15 @@
             self.temp_dir, "signing", "signing-owner", "testing")
         self.testcase_cn = '/CN=PPA signing-owner testing/'
 
+    def setUpArchiveKey(self):
+        tac = KeyServerTac()
+        tac.setUp()
+
+        key_path = os.path.join(gpgkeysdir, 'ppa-sample@xxxxxxxxxxxxxxxxx')
+        IArchiveSigningKey(self.archive).setSigningKey(key_path)
+
+        tac.tearDown()
+
     def setUpUefiKeys(self, create=True):
         self.key = os.path.join(self.signing_dir, "uefi.key")
         self.cert = os.path.join(self.signing_dir, "uefi.crt")
@@ -657,6 +671,22 @@
              "1.0", "SHA256SUMS")
         self.assertTrue(os.path.exists(sha256file))
 
+    def test_checksumming_tree_signed(self):
+        # Specifying no options should leave us with an open tree,
+        # confirm it is checksummed.  Supply an archive signing key
+        # which should trigger signing of the checksum file.
+        self.setUpArchiveKey()
+        self.setUpUefiKeys()
+        self.setUpKmodKeys()
+        self.openArchive("test", "1.0", "amd64")
+        self.tarfile.add_file("1.0/empty.efi", "")
+        self.tarfile.add_file("1.0/empty.ko", "")
+        self.process_emulate()
+        sha256file = os.path.join(self.getSignedPath("test", "amd64"),
+             "1.0", "SHA256SUMS")
+        self.assertTrue(os.path.exists(sha256file))
+        self.assertTrue(os.path.exists(sha256file + '.gpg'))
+
 
 class TestUefi(TestSigningHelpers):
 


Follow ups