← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:archive-translate-path into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:archive-translate-path into launchpad:master with ~cjwatson/launchpad:archive-get-pool-file-by-path as a prerequisite.

Commit message:
Add ArchiveAPI.translatePath

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/430333

This is available over internal XML-RPC, and will ultimately allow building a service to serve files from archives without needing access to a local file system where the archive's files have been published.  See https://docs.google.com/document/d/11etjE-PXDsYwFxVac4V9toNhRtCclDzZx1TfzcX8_nw.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:archive-translate-path into launchpad:master.
diff --git a/lib/lp/soyuz/interfaces/archiveapi.py b/lib/lp/soyuz/interfaces/archiveapi.py
index 6d18f04..b8acecf 100644
--- a/lib/lp/soyuz/interfaces/archiveapi.py
+++ b/lib/lp/soyuz/interfaces/archiveapi.py
@@ -41,3 +41,17 @@ class IArchiveAPI(Interface):
             is not equal to the selected token for this archive; otherwise
             None.
         """
+
+    def translatePath(archive_reference, path):
+        """Find the librarian URL for a relative path within an archive.
+
+        :param archive_reference: The reference form of the archive to check.
+        :param path: The relative path within the archive.  This should not
+            begin with a "/" character.
+
+        :return: A `NotFound` fault if `archive_reference` does not identify
+            an archive, or the archive's repository format is something
+            other than `ArchiveRepositoryFormat.DEBIAN`, or the path does
+            not identify a file that exists in this archive; otherwise a
+            librarian URL.
+        """
diff --git a/lib/lp/soyuz/interfaces/archivefile.py b/lib/lp/soyuz/interfaces/archivefile.py
index d104d9b..b72c54e 100644
--- a/lib/lp/soyuz/interfaces/archivefile.py
+++ b/lib/lp/soyuz/interfaces/archivefile.py
@@ -88,6 +88,7 @@ class IArchiveFileSet(Interface):
         archive,
         container=None,
         path=None,
+        sha256=None,
         only_condemned=False,
         eager_load=False,
     ):
@@ -96,6 +97,10 @@ class IArchiveFileSet(Interface):
         :param archive: Return files in this `IArchive`.
         :param container: Return only files with this container.
         :param path: Return only files with this path.
+        :param path_parent: Return only files whose immediate parent
+            directory is this path.
+        :param sha256: If not None, return only files with this SHA-256
+            checksum.
         :param only_condemned: If True, return only files with a
             scheduled_deletion_date set.
         :param eager_load: If True, preload related `LibraryFileAlias` and
diff --git a/lib/lp/soyuz/model/archivefile.py b/lib/lp/soyuz/model/archivefile.py
index 9e240b5..933e6d7 100644
--- a/lib/lp/soyuz/model/archivefile.py
+++ b/lib/lp/soyuz/model/archivefile.py
@@ -9,6 +9,7 @@ __all__ = [
 ]
 
 import os.path
+import re
 
 import pytz
 from storm.databases.postgres import Returning
@@ -21,7 +22,7 @@ from lp.services.database.constants import UTC_NOW
 from lp.services.database.decoratedresultset import DecoratedResultSet
 from lp.services.database.interfaces import IMasterStore, IStore
 from lp.services.database.sqlbase import convert_storm_clause_to_string
-from lp.services.database.stormexpr import BulkUpdate
+from lp.services.database.stormexpr import BulkUpdate, RegexpMatch
 from lp.services.librarian.interfaces import ILibraryFileAliasSet
 from lp.services.librarian.model import LibraryFileAlias, LibraryFileContent
 from lp.soyuz.interfaces.archivefile import IArchiveFile, IArchiveFileSet
@@ -98,6 +99,8 @@ class ArchiveFileSet:
         archive,
         container=None,
         path=None,
+        path_parent=None,
+        sha256=None,
         only_condemned=False,
         eager_load=False,
     ):
@@ -109,6 +112,20 @@ class ArchiveFileSet:
             clauses.append(ArchiveFile.container == container)
         if path is not None:
             clauses.append(ArchiveFile.path == path)
+        if path_parent is not None:
+            clauses.append(
+                RegexpMatch(
+                    ArchiveFile.path, "^%s/[^/]+$" % re.escape(path_parent)
+                )
+            )
+        if sha256 is not None:
+            clauses.extend(
+                [
+                    ArchiveFile.library_file == LibraryFileAlias.id,
+                    LibraryFileAlias.contentID == LibraryFileContent.id,
+                    LibraryFileContent.sha256 == sha256,
+                ]
+            )
         if only_condemned:
             clauses.append(ArchiveFile.scheduled_deletion_date != None)
         archive_files = IStore(ArchiveFile).find(ArchiveFile, *clauses)
diff --git a/lib/lp/soyuz/tests/test_archivefile.py b/lib/lp/soyuz/tests/test_archivefile.py
index e4457e2..ca295ad 100644
--- a/lib/lp/soyuz/tests/test_archivefile.py
+++ b/lib/lp/soyuz/tests/test_archivefile.py
@@ -117,6 +117,42 @@ class TestArchiveFile(TestCaseWithFactory):
             [archive_files[2]],
             archive_file_set.getByArchive(archives[1], only_condemned=True),
         )
+        self.assertContentEqual(
+            [archive_files[0]],
+            archive_file_set.getByArchive(
+                archives[0],
+                sha256=archive_files[0].library_file.content.sha256,
+            ),
+        )
+        self.assertContentEqual(
+            [], archive_file_set.getByArchive(archives[0], sha256="nonsense")
+        )
+
+    def test_getByArchive_path_parent(self):
+        archive = self.factory.makeArchive()
+        archive_files = [
+            self.factory.makeArchiveFile(archive=archive, path=path)
+            for path in (
+                "dists/jammy/InRelease",
+                "dists/jammy/Release",
+                "dists/jammy/main/binary-amd64/Release",
+            )
+        ]
+        archive_file_set = getUtility(IArchiveFileSet)
+        self.assertContentEqual(
+            archive_files[:2],
+            archive_file_set.getByArchive(archive, path_parent="dists/jammy"),
+        )
+        self.assertContentEqual(
+            [archive_files[2]],
+            archive_file_set.getByArchive(
+                archive, path_parent="dists/jammy/main/binary-amd64"
+            ),
+        )
+        self.assertContentEqual(
+            [],
+            archive_file_set.getByArchive(archive, path_parent="dists/xenial"),
+        )
 
     def test_scheduleDeletion(self):
         archive_files = [self.factory.makeArchiveFile() for _ in range(3)]
diff --git a/lib/lp/soyuz/xmlrpc/archive.py b/lib/lp/soyuz/xmlrpc/archive.py
index 18b2618..a23b7fa 100644
--- a/lib/lp/soyuz/xmlrpc/archive.py
+++ b/lib/lp/soyuz/xmlrpc/archive.py
@@ -8,6 +8,9 @@ __all__ = [
 ]
 
 import logging
+from pathlib import PurePath
+from typing import Optional, Union
+from xmlrpc.client import Fault
 
 from pymacaroons import Macaroon
 from zope.component import getUtility
@@ -17,9 +20,11 @@ from zope.security.proxy import removeSecurityProxy
 
 from lp.services.macaroons.interfaces import NO_USER, IMacaroonIssuer
 from lp.services.webapp import LaunchpadXMLRPCView
+from lp.soyuz.enums import ArchiveRepositoryFormat
 from lp.soyuz.interfaces.archive import IArchiveSet
 from lp.soyuz.interfaces.archiveapi import IArchiveAPI
 from lp.soyuz.interfaces.archiveauthtoken import IArchiveAuthTokenSet
+from lp.soyuz.interfaces.archivefile import IArchiveFileSet
 from lp.xmlrpc import faults
 from lp.xmlrpc.helpers import return_fault
 
@@ -108,3 +113,124 @@ class ArchiveAPI(LaunchpadXMLRPCView):
         return self._checkArchiveAuthToken(
             archive_reference, username, password
         )
+
+    def _translatePathByHash(
+        self, archive_reference: str, archive, path: PurePath
+    ) -> Optional[str]:
+        suite = path.parts[1]
+        checksum_type = path.parts[-2]
+        checksum = path.parts[-1]
+        # We only publish by-hash files for a single checksum type at
+        # present.  See `lp.archivepublisher.publishing`.
+        if checksum_type != "SHA256":
+            return None
+
+        archive_file = (
+            getUtility(IArchiveFileSet)
+            .getByArchive(
+                archive=archive,
+                container="release:%s" % suite,
+                path_parent="/".join(path.parts[:-3]),
+                sha256=checksum,
+            )
+            .any()
+        )
+        if archive_file is None:
+            return None
+
+        log.info(
+            "%s: %s (by-hash) -> LFA %d",
+            archive_reference,
+            path.as_posix(),
+            archive_file.library_file.id,
+        )
+        return archive_file.library_file.getURL()
+
+    def _translatePathNonPool(
+        self, archive_reference: str, archive, path: PurePath
+    ) -> Optional[str]:
+        archive_file = (
+            getUtility(IArchiveFileSet)
+            .getByArchive(archive=archive, path=path.as_posix())
+            .one()
+        )
+        if archive_file is None:
+            return None
+
+        log.info(
+            "%s: %s (non-pool) -> LFA %d",
+            archive_reference,
+            path.as_posix(),
+            archive_file.library_file.id,
+        )
+        return archive_file.library_file.getURL()
+
+    def _translatePathPool(
+        self, archive_reference: str, archive, path: PurePath
+    ) -> Optional[str]:
+        lfa = archive.getPoolFileByPath(path)
+        if lfa is None:
+            return None
+
+        log.info(
+            "%s: %s (pool) -> LFA %d",
+            archive_reference,
+            path.as_posix(),
+            lfa.id,
+        )
+        return lfa.getURL()
+
+    @return_fault
+    def _translatePath(self, archive_reference: str, path: PurePath) -> str:
+        archive = getUtility(IArchiveSet).getByReference(archive_reference)
+        if archive is None:
+            log.info("%s: No archive found", archive_reference)
+            raise faults.NotFound(
+                message="No archive found for '%s'." % archive_reference
+            )
+        archive = removeSecurityProxy(archive)
+        if archive.repository_format != ArchiveRepositoryFormat.DEBIAN:
+            log.info(
+                "%s: Repository format is %s",
+                archive_reference,
+                archive.repository_format,
+            )
+            raise faults.NotFound(
+                message="Can't translate paths in '%s' with format %s."
+                % (archive_reference, archive.repository_format)
+            )
+
+        # Consider by-hash index files.
+        if path.parts[0] == "dists" and path.parts[2:][-3:-2] == ("by-hash",):
+            url = self._translatePathByHash(archive_reference, archive, path)
+            if url is not None:
+                return url
+
+        # Consider other non-pool files.
+        if path.parts[0] != "pool":
+            url = self._translatePathNonPool(archive_reference, archive, path)
+            if url is not None:
+                return url
+            log.info("%s: %s not found", archive_reference, path.as_posix())
+            raise faults.NotFound(
+                message="'%s' not found in '%s'."
+                % (path.as_posix(), archive_reference)
+            )
+
+        # Consider pool files.
+        url = self._translatePathPool(archive_reference, archive, path)
+        if url is not None:
+            return url
+        log.info("%s: %s not found", archive_reference, path.as_posix())
+        raise faults.NotFound(
+            message="'%s' not found in '%s'."
+            % (path.as_posix(), archive_reference)
+        )
+
+    def translatePath(
+        self, archive_reference: str, path: str
+    ) -> Union[str, Fault]:
+        """See `IArchiveAPI`."""
+        # This thunk exists because you can't use a decorated function as
+        # the implementation of a method exported over XML-RPC.
+        return self._translatePath(archive_reference, PurePath(path))
diff --git a/lib/lp/soyuz/xmlrpc/tests/test_archive.py b/lib/lp/soyuz/xmlrpc/tests/test_archive.py
index dce28aa..cdb7d7d 100644
--- a/lib/lp/soyuz/xmlrpc/tests/test_archive.py
+++ b/lib/lp/soyuz/xmlrpc/tests/test_archive.py
@@ -8,8 +8,10 @@ from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
 from lp.buildmaster.enums import BuildStatus
+from lp.services.database.interfaces import IStore
 from lp.services.features.testing import FeatureFixture
 from lp.services.macaroons.interfaces import IMacaroonIssuer
+from lp.soyuz.enums import ArchiveRepositoryFormat, PackagePublishingStatus
 from lp.soyuz.interfaces.archive import NAMED_AUTH_TOKEN_FEATURE_FLAG
 from lp.soyuz.xmlrpc.archive import ArchiveAPI
 from lp.testing import TestCaseWithFactory
@@ -33,53 +35,48 @@ class TestArchiveAPI(TestCaseWithFactory):
     def assertLogs(self, message):
         self.assertEqual([message], self.logger.output.splitlines())
 
-    def assertNotFound(
-        self, archive_reference, username, password, message, log_message
-    ):
-        """Assert that an archive auth token check returns NotFound."""
-        fault = self.archive_api.checkArchiveAuthToken(
-            archive_reference, username, password
-        )
+    def assertNotFound(self, func_name, message, log_message, *args, **kwargs):
+        """Assert that a call returns NotFound."""
+        fault = getattr(self.archive_api, func_name)(*args, **kwargs)
         self.assertEqual(faults.NotFound(message), fault)
         self.assertLogs(log_message)
 
-    def assertUnauthorized(
-        self, archive_reference, username, password, log_message
-    ):
-        """Assert that an archive auth token check returns Unauthorized."""
-        fault = self.archive_api.checkArchiveAuthToken(
-            archive_reference, username, password
-        )
+    def assertUnauthorized(self, func_name, log_message, *args, **kwargs):
+        """Assert that a call returns Unauthorized."""
+        fault = getattr(self.archive_api, func_name)(*args, **kwargs)
         self.assertEqual(faults.Unauthorized("Authorisation required."), fault)
         self.assertLogs(log_message)
 
     def test_checkArchiveAuthToken_unknown_archive(self):
         self.assertNotFound(
+            "checkArchiveAuthToken",
+            "No archive found for '~nonexistent/unknown/bad'.",
+            "user@~nonexistent/unknown/bad: No archive found",
             "~nonexistent/unknown/bad",
             "user",
             "",
-            "No archive found for '~nonexistent/unknown/bad'.",
-            "user@~nonexistent/unknown/bad: No archive found",
         )
 
     def test_checkArchiveAuthToken_no_tokens(self):
         archive = removeSecurityProxy(self.factory.makeArchive(private=True))
         self.assertNotFound(
+            "checkArchiveAuthToken",
+            "No valid tokens for 'nobody' in '%s'." % archive.reference,
+            "nobody@%s: No valid tokens" % archive.reference,
             archive.reference,
             "nobody",
             "",
-            "No valid tokens for 'nobody' in '%s'." % archive.reference,
-            "nobody@%s: No valid tokens" % archive.reference,
         )
 
     def test_checkArchiveAuthToken_no_named_tokens(self):
         archive = removeSecurityProxy(self.factory.makeArchive(private=True))
         self.assertNotFound(
+            "checkArchiveAuthToken",
+            "No valid tokens for '+missing' in '%s'." % archive.reference,
+            "+missing@%s: No valid tokens" % archive.reference,
             archive.reference,
             "+missing",
             "",
-            "No valid tokens for '+missing' in '%s'." % archive.reference,
-            "+missing@%s: No valid tokens" % archive.reference,
         )
 
     def test_checkArchiveAuthToken_buildd_macaroon_wrong_archive(self):
@@ -94,11 +91,12 @@ class TestArchiveAPI(TestCaseWithFactory):
         )
         macaroon = issuer.issueMacaroon(build)
         self.assertUnauthorized(
+            "checkArchiveAuthToken",
+            "buildd@%s: Macaroon verification failed"
+            % other_archive.reference,
             other_archive.reference,
             "buildd",
             macaroon.serialize(),
-            "buildd@%s: Macaroon verification failed"
-            % other_archive.reference,
         )
 
     def test_checkArchiveAuthToken_buildd_macaroon_not_building(self):
@@ -109,10 +107,11 @@ class TestArchiveAPI(TestCaseWithFactory):
         )
         macaroon = issuer.issueMacaroon(build)
         self.assertUnauthorized(
+            "checkArchiveAuthToken",
+            "buildd@%s: Macaroon verification failed" % archive.reference,
             archive.reference,
             "buildd",
             macaroon.serialize(),
-            "buildd@%s: Macaroon verification failed" % archive.reference,
         )
 
     def test_checkArchiveAuthToken_buildd_macaroon_wrong_user(self):
@@ -124,11 +123,12 @@ class TestArchiveAPI(TestCaseWithFactory):
         )
         macaroon = issuer.issueMacaroon(build)
         self.assertNotFound(
+            "checkArchiveAuthToken",
+            "No valid tokens for 'another-user' in '%s'." % archive.reference,
+            "another-user@%s: No valid tokens" % archive.reference,
             archive.reference,
             "another-user",
             macaroon.serialize(),
-            "No valid tokens for 'another-user' in '%s'." % archive.reference,
-            "another-user@%s: No valid tokens" % archive.reference,
         )
 
     def test_checkArchiveAuthToken_buildd_macaroon_correct(self):
@@ -150,10 +150,11 @@ class TestArchiveAPI(TestCaseWithFactory):
         archive = removeSecurityProxy(self.factory.makeArchive(private=True))
         token = archive.newNamedAuthToken("special")
         self.assertUnauthorized(
+            "checkArchiveAuthToken",
+            "+special@%s: Password does not match" % archive.reference,
             archive.reference,
             "+special",
             token.token + "-bad",
-            "+special@%s: Password does not match" % archive.reference,
         )
 
     def test_checkArchiveAuthToken_named_token_deactivated(self):
@@ -161,11 +162,12 @@ class TestArchiveAPI(TestCaseWithFactory):
         token = archive.newNamedAuthToken("special")
         removeSecurityProxy(token).deactivate()
         self.assertNotFound(
+            "checkArchiveAuthToken",
+            "No valid tokens for '+special' in '%s'." % archive.reference,
+            "+special@%s: No valid tokens" % archive.reference,
             archive.reference,
             "+special",
             token.token,
-            "No valid tokens for '+special' in '%s'." % archive.reference,
-            "+special@%s: No valid tokens" % archive.reference,
         )
 
     def test_checkArchiveAuthToken_named_token_correct_password(self):
@@ -184,11 +186,12 @@ class TestArchiveAPI(TestCaseWithFactory):
         archive.newSubscription(subscriber, archive.owner)
         token = archive.newAuthToken(subscriber)
         self.assertUnauthorized(
+            "checkArchiveAuthToken",
+            "%s@%s: Password does not match"
+            % (subscriber.name, archive.reference),
             archive.reference,
             subscriber.name,
             token.token + "-bad",
-            "%s@%s: Password does not match"
-            % (subscriber.name, archive.reference),
         )
 
     def test_checkArchiveAuthToken_personal_token_deactivated(self):
@@ -198,12 +201,13 @@ class TestArchiveAPI(TestCaseWithFactory):
         token = archive.newAuthToken(subscriber)
         removeSecurityProxy(token).deactivate()
         self.assertNotFound(
-            archive.reference,
-            subscriber.name,
-            token.token,
+            "checkArchiveAuthToken",
             "No valid tokens for '%s' in '%s'."
             % (subscriber.name, archive.reference),
             "%s@%s: No valid tokens" % (subscriber.name, archive.reference),
+            archive.reference,
+            subscriber.name,
+            token.token,
         )
 
     def test_checkArchiveAuthToken_personal_token_cancelled(self):
@@ -213,12 +217,13 @@ class TestArchiveAPI(TestCaseWithFactory):
         token = archive.newAuthToken(subscriber)
         removeSecurityProxy(subscription).cancel(archive.owner)
         self.assertNotFound(
-            archive.reference,
-            subscriber.name,
-            token.token,
+            "checkArchiveAuthToken",
             "No valid tokens for '%s' in '%s'."
             % (subscriber.name, archive.reference),
             "%s@%s: No valid tokens" % (subscriber.name, archive.reference),
+            archive.reference,
+            subscriber.name,
+            token.token,
         )
 
     def test_checkArchiveAuthToken_personal_token_correct_password(self):
@@ -234,3 +239,230 @@ class TestArchiveAPI(TestCaseWithFactory):
         self.assertLogs(
             "%s@%s: Authorized" % (subscriber.name, archive.reference)
         )
+
+    def test_translatePath_unknown_archive(self):
+        self.assertNotFound(
+            "translatePath",
+            "No archive found for '~nonexistent/unknown/bad'.",
+            "~nonexistent/unknown/bad: No archive found",
+            "~nonexistent/unknown/bad",
+            "dists/jammy/InRelease",
+        )
+
+    def test_translatePath_non_debian_archive(self):
+        archive = removeSecurityProxy(
+            self.factory.makeArchive(
+                repository_format=ArchiveRepositoryFormat.PYTHON
+            )
+        )
+        self.assertNotFound(
+            "translatePath",
+            "Can't translate paths in '%s' with format Python."
+            % archive.reference,
+            "%s: Repository format is Python" % archive.reference,
+            archive.reference,
+            "dists/jammy/InRelease",
+        )
+
+    def test_translatePath_by_hash_unsupported_checksum(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        archive_file = self.factory.makeArchiveFile(
+            archive=archive,
+            container="release:jammy",
+            path="dists/jammy/InRelease",
+        )
+        path = (
+            "dists/jammy/by-hash/SHA1/%s"
+            % archive_file.library_file.content.sha1
+        )
+        self.assertNotFound(
+            "translatePath",
+            "'%s' not found in '%s'." % (path, archive.reference),
+            "%s: %s not found" % (archive.reference, path),
+            archive.reference,
+            path,
+        )
+
+    def test_translatePath_by_hash_checksum_not_found(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        self.factory.makeArchiveFile(
+            archive=archive,
+            container="release:jammy",
+            path="dists/jammy/InRelease",
+        )
+        path = "dists/jammy/by-hash/SHA256/nonexistent"
+        self.assertNotFound(
+            "translatePath",
+            "'%s' not found in '%s'." % (path, archive.reference),
+            "%s: %s not found" % (archive.reference, path),
+            archive.reference,
+            path,
+        )
+
+    def test_translatePath_by_hash_checksum_found(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        self.factory.makeArchiveFile(
+            archive=archive,
+            container="release:jammy",
+            path="dists/jammy/InRelease",
+        )
+        archive_file = self.factory.makeArchiveFile(
+            archive=archive,
+            container="release:jammy",
+            path="dists/jammy/InRelease",
+        )
+        path = (
+            "dists/jammy/by-hash/SHA256/%s"
+            % archive_file.library_file.content.sha256
+        )
+        self.assertEqual(
+            archive_file.library_file.getURL(),
+            self.archive_api.translatePath(archive.reference, path),
+        )
+        self.assertLogs(
+            "%s: %s (by-hash) -> LFA %d"
+            % (archive.reference, path, archive_file.library_file.id)
+        )
+
+    def test_translatePath_non_pool_not_found(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        self.factory.makeArchiveFile(archive=archive)
+        self.assertNotFound(
+            "translatePath",
+            "'nonexistent/path' not found in '%s'." % archive.reference,
+            "%s: nonexistent/path not found" % archive.reference,
+            archive.reference,
+            "nonexistent/path",
+        )
+
+    def test_translatePath_non_pool_found(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        self.factory.makeArchiveFile(archive=archive)
+        archive_file = self.factory.makeArchiveFile(archive=archive)
+        self.assertEqual(
+            archive_file.library_file.getURL(),
+            self.archive_api.translatePath(
+                archive.reference, archive_file.path
+            ),
+        )
+        self.assertLogs(
+            "%s: %s (non-pool) -> LFA %d"
+            % (
+                archive.reference,
+                archive_file.path,
+                archive_file.library_file.id,
+            )
+        )
+
+    def test_translatePath_pool_bad_file_name(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        path = "pool/nonexistent"
+        self.assertNotFound(
+            "translatePath",
+            "'%s' not found in '%s'." % (path, archive.reference),
+            "%s: %s not found" % (archive.reference, path),
+            archive.reference,
+            path,
+        )
+
+    def test_translatePath_pool_source_not_found(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        self.factory.makeSourcePackagePublishingHistory(
+            archive=archive,
+            status=PackagePublishingStatus.PUBLISHED,
+            sourcepackagename="test-package",
+            component="main",
+        )
+        path = "pool/main/t/test-package/test-package_1.dsc"
+        self.assertNotFound(
+            "translatePath",
+            "'%s' not found in '%s'." % (path, archive.reference),
+            "%s: %s not found" % (archive.reference, path),
+            archive.reference,
+            path,
+        )
+
+    def test_translatePath_pool_source_found(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        spph = self.factory.makeSourcePackagePublishingHistory(
+            archive=archive,
+            status=PackagePublishingStatus.PUBLISHED,
+            sourcepackagename="test-package",
+            component="main",
+        )
+        sprf = self.factory.makeSourcePackageReleaseFile(
+            sourcepackagerelease=spph.sourcepackagerelease,
+            library_file=self.factory.makeLibraryFileAlias(
+                filename="test-package_1.dsc", db_only=True
+            ),
+        )
+        self.factory.makeSourcePackageReleaseFile(
+            sourcepackagerelease=spph.sourcepackagerelease,
+            library_file=self.factory.makeLibraryFileAlias(
+                filename="test-package_1.tar.xz", db_only=True
+            ),
+        )
+        IStore(sprf).flush()
+        path = "pool/main/t/test-package/test-package_1.dsc"
+        self.assertEqual(
+            sprf.libraryfile.getURL(),
+            self.archive_api.translatePath(archive.reference, path),
+        )
+        self.assertLogs(
+            "%s: %s (pool) -> LFA %d"
+            % (archive.reference, path, sprf.libraryfile.id)
+        )
+
+    def test_translatePath_pool_binary_not_found(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        self.factory.makeBinaryPackagePublishingHistory(
+            archive=archive,
+            status=PackagePublishingStatus.PUBLISHED,
+            sourcepackagename="test-package",
+            component="main",
+        )
+        path = "pool/main/t/test-package/test-package_1_amd64.deb"
+        self.assertNotFound(
+            "translatePath",
+            "'%s' not found in '%s'." % (path, archive.reference),
+            "%s: %s not found" % (archive.reference, path),
+            archive.reference,
+            path,
+        )
+
+    def test_translatePath_pool_binary_found(self):
+        archive = removeSecurityProxy(self.factory.makeArchive())
+        bpph = self.factory.makeBinaryPackagePublishingHistory(
+            archive=archive,
+            status=PackagePublishingStatus.PUBLISHED,
+            sourcepackagename="test-package",
+            component="main",
+        )
+        bpf = self.factory.makeBinaryPackageFile(
+            binarypackagerelease=bpph.binarypackagerelease,
+            library_file=self.factory.makeLibraryFileAlias(
+                filename="test-package_1_amd64.deb", db_only=True
+            ),
+        )
+        bpph2 = self.factory.makeBinaryPackagePublishingHistory(
+            archive=archive,
+            status=PackagePublishingStatus.PUBLISHED,
+            sourcepackagename="test-package",
+            component="main",
+        )
+        self.factory.makeBinaryPackageFile(
+            binarypackagerelease=bpph2.binarypackagerelease,
+            library_file=self.factory.makeLibraryFileAlias(
+                filename="test-package_1_i386.deb", db_only=True
+            ),
+        )
+        IStore(bpf).flush()
+        path = "pool/main/t/test-package/test-package_1_amd64.deb"
+        self.assertEqual(
+            bpf.libraryfile.getURL(),
+            self.archive_api.translatePath(archive.reference, path),
+        )
+        self.assertLogs(
+            "%s: %s (pool) -> LFA %d"
+            % (archive.reference, path, bpf.libraryfile.id)
+        )