[Merge] ~cjwatson/launchpad:refactor-update-by-hash into launchpad:master

Colin Watson has proposed merging ~cjwatson/launchpad:refactor-update-by-hash into launchpad:master.

Commit message:
Refactor _updateByHash to clarify logic

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/434573

William Grant pointed out in https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/373971 that the flow of `Publisher._updateByHash` was quite difficult to understand, and proposed an alternative structure in https://code.launchpad.net/~wgrant/launchpad/+git/launchpad/+merge/390811.  Over two years later, I've finally got round to digesting and polishing this.

The basic idea is that, rather than interleaving database/disk queries and actions, we now gather all the information we need from the database and from the archive on disk at the start, and then separately take all the actions needed to reconcile them and to keep `by-hash` directories up to date.
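
To make that shape concrete, here is a minimal, self-contained sketch of the gather-then-reconcile flow. All of the names in it are invented for illustration and are not Launchpad APIs; the real logic, including the by-hash bookkeeping, is in `Publisher._updateByHash` in the diff below.

```python
# Rough sketch only: plain dicts and sets stand in for ArchiveFile rows and
# the on-disk archive; none of these names are real Launchpad APIs.

def plan_actions(db_files, current_files):
    """Decide what to do with each file, given pre-gathered state.

    db_files: {(path, sha256): {"live": bool, "expired": bool}} from the DB,
        where "live" means no scheduled deletion date and "expired" means the
        stay of execution has passed.
    current_files: set of (path, sha256) pairs referenced by the new Release
        file and present on disk.
    """
    schedule, unschedule, create, delete = [], [], [], []

    for key, state in db_files.items():
        if key in current_files:
            if not state["live"]:
                # Previously condemned but current again: keep it.
                unschedule.append(key)
        elif state["live"]:
            # No longer current in the archive: start its stay of execution.
            schedule.append(key)
        elif state["expired"]:
            # Condemned long enough ago: remove it outright.
            delete.append(key)

    for key in current_files:
        if key not in db_files:
            # Current on disk but unknown to the DB: needs a new row (and a
            # librarian upload, in the real code).
            create.append(key)

    return schedule, unschedule, create, delete


if __name__ == "__main__":
    db = {
        ("dists/breezy/Release", "aaa"): {"live": True, "expired": False},
        ("dists/breezy/Packages", "bbb"): {"live": False, "expired": True},
    }
    disk = {("dists/breezy/Release", "ccc"), ("dists/breezy/Packages", "bbb")}
    print(plan_actions(db, disk))
```

In the real method, the "gather" phase is `_getCurrentFiles` plus a single `getByArchive` query, and the resulting actions map onto `scheduleDeletion`, `unscheduleDeletion`, `newFromFile`, and the new `delete`.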

No functional change is intended here.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:refactor-update-by-hash into launchpad:master.
diff --git a/lib/lp/archivepublisher/publishing.py b/lib/lp/archivepublisher/publishing.py
index 58d948a..b8cea18 100644
--- a/lib/lp/archivepublisher/publishing.py
+++ b/lib/lp/archivepublisher/publishing.py
@@ -48,6 +48,7 @@ from lp.registry.model.distroseries import DistroSeries
 from lp.services.database.bulk import load
 from lp.services.database.constants import UTC_NOW
 from lp.services.database.interfaces import IStore
+from lp.services.database.sqlbase import get_transaction_timestamp
 from lp.services.helpers import filenameToContentType
 from lp.services.librarian.client import LibrarianClient
 from lp.services.osutils import ensure_directory_exists, open_for_writing
@@ -65,6 +66,7 @@ from lp.soyuz.interfaces.publishing import (
     IPublishingSet,
     active_publishing_status,
 )
+from lp.soyuz.model.archivefile import ArchiveFile
 from lp.soyuz.model.binarypackagerelease import BinaryPackageRelease
 from lp.soyuz.model.distroarchseries import DistroArchSeries
 from lp.soyuz.model.publishing import (
@@ -1168,15 +1170,14 @@ class Publisher:
             return self.distro.displayname
         return "LP-PPA-%s" % get_ppa_reference(self.archive)
 
-    def _updateByHash(self, suite, release_file_name, extra_files):
-        """Update by-hash files for a suite.
+    def _getCurrentFiles(self, suite, release_file_name, extra_files):
+        # Gather information on entries in the current Release file.
+        release_path = os.path.join(
+            self._config.distsroot, suite, release_file_name
+        )
+        with open(release_path) as release_file:
+            release_data = Release(release_file)
 
-        This takes Release file data which references a set of on-disk
-        files, injects any newly-modified files from that set into the
-        librarian and the ArchiveFile table, and updates the on-disk by-hash
-        directories to be in sync with ArchiveFile.  Any on-disk by-hash
-        entries that ceased to be current sufficiently long ago are removed.
-        """
         extra_data = {}
         for filename, real_filename in extra_files.items():
             hashes = self._readIndexFileHashes(
@@ -1189,24 +1190,9 @@ class Publisher:
                     hashes[archive_hash.deb822_name]
                 )
 
-        release_path = os.path.join(
-            self._config.distsroot, suite, release_file_name
-        )
-        with open(release_path) as release_file:
-            release_data = Release(release_file)
-        archive_file_set = getUtility(IArchiveFileSet)
-        by_hashes = ByHashes(self._config.distsroot, self.log)
         suite_dir = os.path.relpath(
             os.path.join(self._config.distsroot, suite), self._config.distsroot
         )
-        container = "release:%s" % suite
-
-        def strip_dists(path):
-            assert path.startswith("dists/")
-            return path[len("dists/") :]
-
-        # Gather information on entries in the current Release file, and
-        # make sure nothing there is condemned.
         current_files = {}
         for current_entry in release_data["SHA256"] + extra_data.get(
             "SHA256", []
@@ -1214,66 +1200,93 @@ class Publisher:
             path = os.path.join(suite_dir, current_entry["name"])
             real_name = current_entry.get("real_name", current_entry["name"])
             real_path = os.path.join(suite_dir, real_name)
-            current_files[path] = (
-                int(current_entry["size"]),
-                current_entry["sha256"],
-                real_path,
-            )
-        uncondemned_files = set()
-        for db_file in archive_file_set.getByArchive(
-            self.archive, container=container, condemned=True, eager_load=True
-        ):
-            stripped_path = strip_dists(db_file.path)
-            if stripped_path in current_files:
-                current_sha256 = current_files[stripped_path][1]
-                if db_file.library_file.content.sha256 == current_sha256:
-                    uncondemned_files.add(db_file)
-        if uncondemned_files:
-            for container, path, sha256 in archive_file_set.unscheduleDeletion(
-                uncondemned_files
-            ):
-                self.log.debug(
-                    "by-hash: Unscheduled %s for %s in %s for deletion"
-                    % (sha256, path, container)
+            full_path = os.path.join(self._config.distsroot, real_path)
+            if os.path.exists(full_path):
+                current_files[path] = (
+                    int(current_entry["size"]),
+                    current_entry["sha256"],
+                    real_path,
+                )
+            else:
+                self.log.warning(
+                    "%s contains %s, but %s does not exist!"
+                    % (release_path, path, full_path)
                 )
+        return current_files
 
-        # Remove any condemned files from the database whose stay of
-        # execution has elapsed.  We ensure that we know about all the
-        # relevant by-hash directory trees before doing any removals so that
-        # we can prune them properly later.
+    def _updateByHash(self, suite, release_file_name, extra_files):
+        """Update by-hash files for a suite.
+
+        This takes Release file data which references a set of on-disk
+        files, injects any newly-modified files from that set into the
+        librarian and the ArchiveFile table, and updates the on-disk by-hash
+        directories to be in sync with ArchiveFile.  Any on-disk by-hash
+        entries that ceased to be current sufficiently long ago are removed.
+        """
+        archive_file_set = getUtility(IArchiveFileSet)
+        container = "release:%s" % suite
+
+        by_hashes = ByHashes(self._config.distsroot, self.log)
+        existing_live_files = {}
+        existing_nonlive_files = {}
+        keep_files = set()
+        reapable_files = set()
+
+        def strip_dists(path):
+            assert path.startswith("dists/")
+            return path[len("dists/") :]
+
+        # Record all files from the database.
+        db_now = get_transaction_timestamp(IStore(ArchiveFile))
         for db_file in archive_file_set.getByArchive(
-            self.archive, container=container
-        ):
-            by_hashes.registerChild(os.path.dirname(strip_dists(db_file.path)))
-        for container, path, sha256 in archive_file_set.reap(
-            self.archive, container=container
+            self.archive, container=container, eager_load=True
         ):
-            self.log.debug(
-                "by-hash: Deleted %s for %s in %s" % (sha256, path, container)
+            file_key = (
+                strip_dists(db_file.path),
+                db_file.library_file.content.sha256,
             )
+            # Ensure any subdirectories are registered early on, in case we're
+            # about to delete the only file and need to know to prune it.
+            by_hashes.registerChild(os.path.dirname(strip_dists(db_file.path)))
 
-        # Ensure that all files recorded in the database are in by-hash.
-        db_files = archive_file_set.getByArchive(
-            self.archive, container=container, eager_load=True
+            if db_file.scheduled_deletion_date is None:
+                # XXX wgrant 2020-09-16: Once we have
+                # ArchiveFile.date_superseded in place, this should be a DB
+                # constraint - i.e. there should only be a single
+                # non-superseded row for each path/content pair.
+                assert file_key not in existing_live_files
+                existing_live_files[file_key] = db_file
+            else:
+                existing_nonlive_files[file_key] = db_file
+
+            if (
+                db_file.scheduled_deletion_date is not None
+                and db_file.scheduled_deletion_date < db_now
+            ):
+                # File has expired. Mark it for reaping.
+                reapable_files.add(db_file)
+            else:
+                # File should still be on disk.
+                by_hashes.add(strip_dists(db_file.path), db_file.library_file)
+
+        # Record all files from the archive on disk.
+        current_files = self._getCurrentFiles(
+            suite, release_file_name, extra_files
         )
-        for db_file in db_files:
-            by_hashes.add(strip_dists(db_file.path), db_file.library_file)
+        new_live_files = {
+            (path, sha256) for path, (_, sha256, _) in current_files.items()
+        }
 
-        # Condemn any database records that do not correspond to current
-        # index files.
-        condemned_files = set()
-        for db_file in db_files:
-            if db_file.scheduled_deletion_date is None:
-                stripped_path = strip_dists(db_file.path)
-                if stripped_path in current_files:
-                    current_sha256 = current_files[stripped_path][1]
-                else:
-                    current_sha256 = None
-                if db_file.library_file.content.sha256 != current_sha256:
-                    condemned_files.add(db_file)
-        if condemned_files:
+        # Schedule the deletion of any ArchiveFiles which are current in the
+        # DB but weren't current in the archive this round.
+        old_files = [
+            af
+            for key, af in existing_live_files.items()
+            if key not in new_live_files
+        ]
+        if old_files:
             for container, path, sha256 in archive_file_set.scheduleDeletion(
-                condemned_files, timedelta(days=BY_HASH_STAY_OF_EXECUTION)
+                old_files, timedelta(days=BY_HASH_STAY_OF_EXECUTION)
             ):
                 self.log.debug(
                     "by-hash: Scheduled %s for %s in %s for deletion"
@@ -1281,32 +1294,64 @@ class Publisher:
                 )
 
         # Ensure that all the current index files are in by-hash and have
-        # corresponding database entries.
+        # corresponding ArchiveFiles.
         # XXX cjwatson 2016-03-15: This should possibly use bulk creation,
         # although we can only avoid about a third of the queries since the
         # librarian client has no bulk upload methods.
         for path, (size, sha256, real_path) in current_files.items():
+            file_key = (path, sha256)
             full_path = os.path.join(self._config.distsroot, real_path)
-            if os.path.exists(full_path) and not by_hashes.known(
-                path, "SHA256", sha256
-            ):
-                with open(full_path, "rb") as fileobj:
-                    db_file = archive_file_set.newFromFile(
-                        self.archive,
-                        container,
-                        os.path.join("dists", path),
-                        fileobj,
-                        size,
-                        filenameToContentType(path),
-                    )
+            assert os.path.exists(full_path)  # guaranteed by _getCurrentFiles
+            # Ensure there's a current ArchiveFile row, either by finding a
+            # matching non-live file and marking it live again, or by
+            # creating a new one based on the file on disk.
+            if file_key not in existing_live_files:
+                if file_key in existing_nonlive_files:
+                    db_file = existing_nonlive_files[file_key]
+                    keep_files.add(db_file)
+                else:
+                    with open(full_path, "rb") as fileobj:
+                        db_file = archive_file_set.newFromFile(
+                            self.archive,
+                            container,
+                            os.path.join("dists", path),
+                            fileobj,
+                            size,
+                            filenameToContentType(path),
+                        )
+            # And ensure the by-hash links exist on disk.
+            if not by_hashes.known(path, "SHA256", sha256):
                 by_hashes.add(
                     path, db_file.library_file, copy_from_path=real_path
                 )
 
-        # Finally, remove any files from disk that aren't recorded in the
-        # database and aren't active.
+        # Unschedule the deletion of any ArchiveFiles which are current in
+        # the archive this round but that were previously scheduled for
+        # deletion in the DB.
+        if keep_files:
+            for container, path, sha256 in archive_file_set.unscheduleDeletion(
+                keep_files
+            ):
+                self.log.debug(
+                    "by-hash: Unscheduled %s for %s in %s for deletion"
+                    % (sha256, path, container)
+                )
+
+        # Remove any files from disk that aren't recorded in the database.
         by_hashes.prune()
 
+        # And remove expired ArchiveFiles from the DB now that we've pruned
+        # them and their directories from disk.
+        delete_files = reapable_files - keep_files
+        if delete_files:
+            for container, path, sha256 in archive_file_set.delete(
+                delete_files
+            ):
+                self.log.debug(
+                    "by-hash: Deleted %s for %s in %s"
+                    % (sha256, path, container)
+                )
+
     def _writeReleaseFile(self, suite, release_data):
         """Write a Release file to the archive (as Release.new).
 
diff --git a/lib/lp/archivepublisher/tests/test_publisher.py b/lib/lp/archivepublisher/tests/test_publisher.py
index e52d5a3..2a35cf3 100644
--- a/lib/lp/archivepublisher/tests/test_publisher.py
+++ b/lib/lp/archivepublisher/tests/test_publisher.py
@@ -3029,6 +3029,12 @@ class TestUpdateByHash(TestPublisherBase):
                 "lp.soyuz.model.archivefile._now", lambda: self.times[-1]
             )
         )
+        self.useFixture(
+            MonkeyPatch(
+                "lp.archivepublisher.publishing.get_transaction_timestamp",
+                lambda _: self.times[-1],
+            )
+        )
 
     def advanceTime(self, delta=None, absolute=None):
         if delta is not None:
diff --git a/lib/lp/soyuz/interfaces/archivefile.py b/lib/lp/soyuz/interfaces/archivefile.py
index e41548b..1078039 100644
--- a/lib/lp/soyuz/interfaces/archivefile.py
+++ b/lib/lp/soyuz/interfaces/archivefile.py
@@ -140,11 +140,10 @@ class IArchiveFileSet(Interface):
         :return: An iterable of matched container names.
         """
 
-    def reap(archive, container=None):
-        """Delete archive files that are past their scheduled deletion date.
+    def delete(archive_files):
+        """Delete these archive files.
 
-        :param archive: Delete files from this `IArchive`.
-        :param container: Delete only files with this container.
+        :param archive_files: The `IArchiveFile`s to delete.
         :return: An iterable of (container, path, sha256) for files that
             were deleted.
         """
diff --git a/lib/lp/soyuz/model/archivefile.py b/lib/lp/soyuz/model/archivefile.py
index 670cc96..3aecb98 100644
--- a/lib/lp/soyuz/model/archivefile.py
+++ b/lib/lp/soyuz/model/archivefile.py
@@ -216,18 +216,17 @@ class ArchiveFileSet:
         )
 
     @staticmethod
-    def reap(archive, container=None):
+    def delete(archive_files):
         """See `IArchiveFileSet`."""
         # XXX cjwatson 2016-03-30 bug=322972: Requires manual SQL due to
         # lack of support for DELETE FROM ... USING ... in Storm.
         clauses = [
-            ArchiveFile.archive == archive,
-            ArchiveFile.scheduled_deletion_date < _now(),
+            ArchiveFile.id.is_in(
+                {archive_file.id for archive_file in archive_files}
+            ),
             ArchiveFile.library_file_id == LibraryFileAlias.id,
             LibraryFileAlias.contentID == LibraryFileContent.id,
         ]
-        if container is not None:
-            clauses.append(ArchiveFile.container == container)
         where = convert_storm_clause_to_string(And(*clauses))
         return list(
             IPrimaryStore(ArchiveFile).execute(
diff --git a/lib/lp/soyuz/tests/test_archivefile.py b/lib/lp/soyuz/tests/test_archivefile.py
index 6e8f819..f3febf1 100644
--- a/lib/lp/soyuz/tests/test_archivefile.py
+++ b/lib/lp/soyuz/tests/test_archivefile.py
@@ -256,40 +256,22 @@ class TestArchiveFile(TestCaseWithFactory):
             ),
         )
 
-    def test_reap(self):
+    def test_delete(self):
         archive = self.factory.makeArchive()
         archive_files = [
-            self.factory.makeArchiveFile(archive=archive, container="foo")
-            for _ in range(3)
+            self.factory.makeArchiveFile(archive=archive) for _ in range(4)
         ]
-        archive_files.append(self.factory.makeArchiveFile(archive=archive))
-        other_archive = self.factory.makeArchive()
-        archive_files.append(
-            self.factory.makeArchiveFile(archive=other_archive)
-        )
-        now = get_transaction_timestamp(Store.of(archive_files[0]))
-        removeSecurityProxy(
-            archive_files[0]
-        ).scheduled_deletion_date = now - timedelta(days=1)
-        removeSecurityProxy(
-            archive_files[1]
-        ).scheduled_deletion_date = now + timedelta(days=1)
-        removeSecurityProxy(
-            archive_files[3]
-        ).scheduled_deletion_date = now - timedelta(days=1)
-        removeSecurityProxy(
-            archive_files[4]
-        ).scheduled_deletion_date = now - timedelta(days=1)
-        archive_file_set = getUtility(IArchiveFileSet)
         expected_rows = [
             (
-                "foo",
-                archive_files[0].path,
-                archive_files[0].library_file.content.sha256,
-            ),
+                archive_file.container,
+                archive_file.path,
+                archive_file.library_file.content.sha256,
+            )
+            for archive_file in archive_files[:2]
         ]
-        rows = archive_file_set.reap(archive, container="foo")
+        archive_file_set = getUtility(IArchiveFileSet)
+        rows = archive_file_set.delete(archive_files[:2])
         self.assertContentEqual(expected_rows, rows)
         self.assertContentEqual(
-            archive_files[1:4], archive_file_set.getByArchive(archive)
+            archive_files[2:], archive_file_set.getByArchive(archive)
         )