launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #29236
[Merge] ~cjwatson/launchpad:archive-translate-path into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:archive-translate-path into launchpad:master with ~cjwatson/launchpad:archive-get-pool-file-by-path as a prerequisite.
Commit message:
Add ArchiveAPI.translatePath
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/430333
ArchiveAPI.translatePath is available over internal XML-RPC, and will ultimately allow building a service to serve files from archives without needing access to a local file system where the archive's files have been published. See https://docs.google.com/document/d/11etjE-PXDsYwFxVac4V9toNhRtCclDzZx1TfzcX8_nw.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:archive-translate-path into launchpad:master.
diff --git a/lib/lp/soyuz/interfaces/archiveapi.py b/lib/lp/soyuz/interfaces/archiveapi.py
index 6d18f04..b8acecf 100644
--- a/lib/lp/soyuz/interfaces/archiveapi.py
+++ b/lib/lp/soyuz/interfaces/archiveapi.py
@@ -41,3 +41,17 @@ class IArchiveAPI(Interface):
is not equal to the selected token for this archive; otherwise
None.
"""
+
+ def translatePath(archive_reference, path):
+ """Find the librarian URL for a relative path within an archive.
+
+ :param archive_reference: The reference form of the archive to check.
+ :param path: The relative path within the archive. This should not
+ begin with a "/" character.
+
+ :return: A `NotFound` fault if `archive_reference` does not identify
+ an archive, or the archive's repository format is something
+ other than `ArchiveRepositoryFormat.DEBIAN`, or the path does
+ not identify a file that exists in this archive; otherwise a
+ librarian URL.
+ """
diff --git a/lib/lp/soyuz/interfaces/archivefile.py b/lib/lp/soyuz/interfaces/archivefile.py
index d104d9b..b72c54e 100644
--- a/lib/lp/soyuz/interfaces/archivefile.py
+++ b/lib/lp/soyuz/interfaces/archivefile.py
@@ -88,6 +88,7 @@ class IArchiveFileSet(Interface):
archive,
container=None,
path=None,
+ sha256=None,
only_condemned=False,
eager_load=False,
):
@@ -96,6 +97,10 @@ class IArchiveFileSet(Interface):
:param archive: Return files in this `IArchive`.
:param container: Return only files with this container.
:param path: Return only files with this path.
+ :param path_parent: Return only files whose immediate parent
+ directory is this path.
+ :param sha256: If not None, return only files with this SHA-256
+ checksum.
:param only_condemned: If True, return only files with a
scheduled_deletion_date set.
:param eager_load: If True, preload related `LibraryFileAlias` and
diff --git a/lib/lp/soyuz/model/archivefile.py b/lib/lp/soyuz/model/archivefile.py
index 9e240b5..933e6d7 100644
--- a/lib/lp/soyuz/model/archivefile.py
+++ b/lib/lp/soyuz/model/archivefile.py
@@ -9,6 +9,7 @@ __all__ = [
]
import os.path
+import re
import pytz
from storm.databases.postgres import Returning
@@ -21,7 +22,7 @@ from lp.services.database.constants import UTC_NOW
from lp.services.database.decoratedresultset import DecoratedResultSet
from lp.services.database.interfaces import IMasterStore, IStore
from lp.services.database.sqlbase import convert_storm_clause_to_string
-from lp.services.database.stormexpr import BulkUpdate
+from lp.services.database.stormexpr import BulkUpdate, RegexpMatch
from lp.services.librarian.interfaces import ILibraryFileAliasSet
from lp.services.librarian.model import LibraryFileAlias, LibraryFileContent
from lp.soyuz.interfaces.archivefile import IArchiveFile, IArchiveFileSet
@@ -98,6 +99,8 @@ class ArchiveFileSet:
archive,
container=None,
path=None,
+ path_parent=None,
+ sha256=None,
only_condemned=False,
eager_load=False,
):
@@ -109,6 +112,20 @@ class ArchiveFileSet:
clauses.append(ArchiveFile.container == container)
if path is not None:
clauses.append(ArchiveFile.path == path)
+ if path_parent is not None:
+ clauses.append(
+ RegexpMatch(
+ ArchiveFile.path, "^%s/[^/]+$" % re.escape(path_parent)
+ )
+ )
+ if sha256 is not None:
+ clauses.extend(
+ [
+ ArchiveFile.library_file == LibraryFileAlias.id,
+ LibraryFileAlias.contentID == LibraryFileContent.id,
+ LibraryFileContent.sha256 == sha256,
+ ]
+ )
if only_condemned:
clauses.append(ArchiveFile.scheduled_deletion_date != None)
archive_files = IStore(ArchiveFile).find(ArchiveFile, *clauses)
diff --git a/lib/lp/soyuz/tests/test_archivefile.py b/lib/lp/soyuz/tests/test_archivefile.py
index e4457e2..ca295ad 100644
--- a/lib/lp/soyuz/tests/test_archivefile.py
+++ b/lib/lp/soyuz/tests/test_archivefile.py
@@ -117,6 +117,42 @@ class TestArchiveFile(TestCaseWithFactory):
[archive_files[2]],
archive_file_set.getByArchive(archives[1], only_condemned=True),
)
+ self.assertContentEqual(
+ [archive_files[0]],
+ archive_file_set.getByArchive(
+ archives[0],
+ sha256=archive_files[0].library_file.content.sha256,
+ ),
+ )
+ self.assertContentEqual(
+ [], archive_file_set.getByArchive(archives[0], sha256="nonsense")
+ )
+
+ def test_getByArchive_path_parent(self):
+ archive = self.factory.makeArchive()
+ archive_files = [
+ self.factory.makeArchiveFile(archive=archive, path=path)
+ for path in (
+ "dists/jammy/InRelease",
+ "dists/jammy/Release",
+ "dists/jammy/main/binary-amd64/Release",
+ )
+ ]
+ archive_file_set = getUtility(IArchiveFileSet)
+ self.assertContentEqual(
+ archive_files[:2],
+ archive_file_set.getByArchive(archive, path_parent="dists/jammy"),
+ )
+ self.assertContentEqual(
+ [archive_files[2]],
+ archive_file_set.getByArchive(
+ archive, path_parent="dists/jammy/main/binary-amd64"
+ ),
+ )
+ self.assertContentEqual(
+ [],
+ archive_file_set.getByArchive(archive, path_parent="dists/xenial"),
+ )
def test_scheduleDeletion(self):
archive_files = [self.factory.makeArchiveFile() for _ in range(3)]
diff --git a/lib/lp/soyuz/xmlrpc/archive.py b/lib/lp/soyuz/xmlrpc/archive.py
index 18b2618..a23b7fa 100644
--- a/lib/lp/soyuz/xmlrpc/archive.py
+++ b/lib/lp/soyuz/xmlrpc/archive.py
@@ -8,6 +8,9 @@ __all__ = [
]
import logging
+from pathlib import PurePath
+from typing import Optional, Union
+from xmlrpc.client import Fault
from pymacaroons import Macaroon
from zope.component import getUtility
@@ -17,9 +20,11 @@ from zope.security.proxy import removeSecurityProxy
from lp.services.macaroons.interfaces import NO_USER, IMacaroonIssuer
from lp.services.webapp import LaunchpadXMLRPCView
+from lp.soyuz.enums import ArchiveRepositoryFormat
from lp.soyuz.interfaces.archive import IArchiveSet
from lp.soyuz.interfaces.archiveapi import IArchiveAPI
from lp.soyuz.interfaces.archiveauthtoken import IArchiveAuthTokenSet
+from lp.soyuz.interfaces.archivefile import IArchiveFileSet
from lp.xmlrpc import faults
from lp.xmlrpc.helpers import return_fault
@@ -108,3 +113,124 @@ class ArchiveAPI(LaunchpadXMLRPCView):
return self._checkArchiveAuthToken(
archive_reference, username, password
)
+
+ def _translatePathByHash(
+ self, archive_reference: str, archive, path: PurePath
+ ) -> Optional[str]:
+ suite = path.parts[1]
+ checksum_type = path.parts[-2]
+ checksum = path.parts[-1]
+ # We only publish by-hash files for a single checksum type at
+ # present. See `lp.archivepublisher.publishing`.
+ if checksum_type != "SHA256":
+ return None
+
+ archive_file = (
+ getUtility(IArchiveFileSet)
+ .getByArchive(
+ archive=archive,
+ container="release:%s" % suite,
+ path_parent="/".join(path.parts[:-3]),
+ sha256=checksum,
+ )
+ .any()
+ )
+ if archive_file is None:
+ return None
+
+ log.info(
+ "%s: %s (by-hash) -> LFA %d",
+ archive_reference,
+ path.as_posix(),
+ archive_file.library_file.id,
+ )
+ return archive_file.library_file.getURL()
+
+ def _translatePathNonPool(
+ self, archive_reference: str, archive, path: PurePath
+ ) -> Optional[str]:
+ archive_file = (
+ getUtility(IArchiveFileSet)
+ .getByArchive(archive=archive, path=path.as_posix())
+ .one()
+ )
+ if archive_file is None:
+ return None
+
+ log.info(
+ "%s: %s (non-pool) -> LFA %d",
+ archive_reference,
+ path.as_posix(),
+ archive_file.library_file.id,
+ )
+ return archive_file.library_file.getURL()
+
+ def _translatePathPool(
+ self, archive_reference: str, archive, path: PurePath
+ ) -> Optional[str]:
+ lfa = archive.getPoolFileByPath(path)
+ if lfa is None:
+ return None
+
+ log.info(
+ "%s: %s (pool) -> LFA %d",
+ archive_reference,
+ path.as_posix(),
+ lfa.id,
+ )
+ return lfa.getURL()
+
+ @return_fault
+ def _translatePath(self, archive_reference: str, path: PurePath) -> str:
+ archive = getUtility(IArchiveSet).getByReference(archive_reference)
+ if archive is None:
+ log.info("%s: No archive found", archive_reference)
+ raise faults.NotFound(
+ message="No archive found for '%s'." % archive_reference
+ )
+ archive = removeSecurityProxy(archive)
+ if archive.repository_format != ArchiveRepositoryFormat.DEBIAN:
+ log.info(
+ "%s: Repository format is %s",
+ archive_reference,
+ archive.repository_format,
+ )
+ raise faults.NotFound(
+ message="Can't translate paths in '%s' with format %s."
+ % (archive_reference, archive.repository_format)
+ )
+
+ # Consider by-hash index files.
+ if path.parts[0] == "dists" and path.parts[2:][-3:-2] == ("by-hash",):
+ url = self._translatePathByHash(archive_reference, archive, path)
+ if url is not None:
+ return url
+
+ # Consider other non-pool files.
+ if path.parts[0] != "pool":
+ url = self._translatePathNonPool(archive_reference, archive, path)
+ if url is not None:
+ return url
+ log.info("%s: %s not found", archive_reference, path.as_posix())
+ raise faults.NotFound(
+ message="'%s' not found in '%s'."
+ % (path.as_posix(), archive_reference)
+ )
+
+ # Consider pool files.
+ url = self._translatePathPool(archive_reference, archive, path)
+ if url is not None:
+ return url
+ log.info("%s: %s not found", archive_reference, path.as_posix())
+ raise faults.NotFound(
+ message="'%s' not found in '%s'."
+ % (path.as_posix(), archive_reference)
+ )
+
+ def translatePath(
+ self, archive_reference: str, path: str
+ ) -> Union[str, Fault]:
+ """See `IArchiveAPI`."""
+ # This thunk exists because you can't use a decorated function as
+ # the implementation of a method exported over XML-RPC.
+ return self._translatePath(archive_reference, PurePath(path))
diff --git a/lib/lp/soyuz/xmlrpc/tests/test_archive.py b/lib/lp/soyuz/xmlrpc/tests/test_archive.py
index dce28aa..cdb7d7d 100644
--- a/lib/lp/soyuz/xmlrpc/tests/test_archive.py
+++ b/lib/lp/soyuz/xmlrpc/tests/test_archive.py
@@ -8,8 +8,10 @@ from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
from lp.buildmaster.enums import BuildStatus
+from lp.services.database.interfaces import IStore
from lp.services.features.testing import FeatureFixture
from lp.services.macaroons.interfaces import IMacaroonIssuer
+from lp.soyuz.enums import ArchiveRepositoryFormat, PackagePublishingStatus
from lp.soyuz.interfaces.archive import NAMED_AUTH_TOKEN_FEATURE_FLAG
from lp.soyuz.xmlrpc.archive import ArchiveAPI
from lp.testing import TestCaseWithFactory
@@ -33,53 +35,48 @@ class TestArchiveAPI(TestCaseWithFactory):
def assertLogs(self, message):
self.assertEqual([message], self.logger.output.splitlines())
- def assertNotFound(
- self, archive_reference, username, password, message, log_message
- ):
- """Assert that an archive auth token check returns NotFound."""
- fault = self.archive_api.checkArchiveAuthToken(
- archive_reference, username, password
- )
+ def assertNotFound(self, func_name, message, log_message, *args, **kwargs):
+ """Assert that a call returns NotFound."""
+ fault = getattr(self.archive_api, func_name)(*args, **kwargs)
self.assertEqual(faults.NotFound(message), fault)
self.assertLogs(log_message)
- def assertUnauthorized(
- self, archive_reference, username, password, log_message
- ):
- """Assert that an archive auth token check returns Unauthorized."""
- fault = self.archive_api.checkArchiveAuthToken(
- archive_reference, username, password
- )
+ def assertUnauthorized(self, func_name, log_message, *args, **kwargs):
+ """Assert that a call returns Unauthorized."""
+ fault = getattr(self.archive_api, func_name)(*args, **kwargs)
self.assertEqual(faults.Unauthorized("Authorisation required."), fault)
self.assertLogs(log_message)
def test_checkArchiveAuthToken_unknown_archive(self):
self.assertNotFound(
+ "checkArchiveAuthToken",
+ "No archive found for '~nonexistent/unknown/bad'.",
+ "user@~nonexistent/unknown/bad: No archive found",
"~nonexistent/unknown/bad",
"user",
"",
- "No archive found for '~nonexistent/unknown/bad'.",
- "user@~nonexistent/unknown/bad: No archive found",
)
def test_checkArchiveAuthToken_no_tokens(self):
archive = removeSecurityProxy(self.factory.makeArchive(private=True))
self.assertNotFound(
+ "checkArchiveAuthToken",
+ "No valid tokens for 'nobody' in '%s'." % archive.reference,
+ "nobody@%s: No valid tokens" % archive.reference,
archive.reference,
"nobody",
"",
- "No valid tokens for 'nobody' in '%s'." % archive.reference,
- "nobody@%s: No valid tokens" % archive.reference,
)
def test_checkArchiveAuthToken_no_named_tokens(self):
archive = removeSecurityProxy(self.factory.makeArchive(private=True))
self.assertNotFound(
+ "checkArchiveAuthToken",
+ "No valid tokens for '+missing' in '%s'." % archive.reference,
+ "+missing@%s: No valid tokens" % archive.reference,
archive.reference,
"+missing",
"",
- "No valid tokens for '+missing' in '%s'." % archive.reference,
- "+missing@%s: No valid tokens" % archive.reference,
)
def test_checkArchiveAuthToken_buildd_macaroon_wrong_archive(self):
@@ -94,11 +91,12 @@ class TestArchiveAPI(TestCaseWithFactory):
)
macaroon = issuer.issueMacaroon(build)
self.assertUnauthorized(
+ "checkArchiveAuthToken",
+ "buildd@%s: Macaroon verification failed"
+ % other_archive.reference,
other_archive.reference,
"buildd",
macaroon.serialize(),
- "buildd@%s: Macaroon verification failed"
- % other_archive.reference,
)
def test_checkArchiveAuthToken_buildd_macaroon_not_building(self):
@@ -109,10 +107,11 @@ class TestArchiveAPI(TestCaseWithFactory):
)
macaroon = issuer.issueMacaroon(build)
self.assertUnauthorized(
+ "checkArchiveAuthToken",
+ "buildd@%s: Macaroon verification failed" % archive.reference,
archive.reference,
"buildd",
macaroon.serialize(),
- "buildd@%s: Macaroon verification failed" % archive.reference,
)
def test_checkArchiveAuthToken_buildd_macaroon_wrong_user(self):
@@ -124,11 +123,12 @@ class TestArchiveAPI(TestCaseWithFactory):
)
macaroon = issuer.issueMacaroon(build)
self.assertNotFound(
+ "checkArchiveAuthToken",
+ "No valid tokens for 'another-user' in '%s'." % archive.reference,
+ "another-user@%s: No valid tokens" % archive.reference,
archive.reference,
"another-user",
macaroon.serialize(),
- "No valid tokens for 'another-user' in '%s'." % archive.reference,
- "another-user@%s: No valid tokens" % archive.reference,
)
def test_checkArchiveAuthToken_buildd_macaroon_correct(self):
@@ -150,10 +150,11 @@ class TestArchiveAPI(TestCaseWithFactory):
archive = removeSecurityProxy(self.factory.makeArchive(private=True))
token = archive.newNamedAuthToken("special")
self.assertUnauthorized(
+ "checkArchiveAuthToken",
+ "+special@%s: Password does not match" % archive.reference,
archive.reference,
"+special",
token.token + "-bad",
- "+special@%s: Password does not match" % archive.reference,
)
def test_checkArchiveAuthToken_named_token_deactivated(self):
@@ -161,11 +162,12 @@ class TestArchiveAPI(TestCaseWithFactory):
token = archive.newNamedAuthToken("special")
removeSecurityProxy(token).deactivate()
self.assertNotFound(
+ "checkArchiveAuthToken",
+ "No valid tokens for '+special' in '%s'." % archive.reference,
+ "+special@%s: No valid tokens" % archive.reference,
archive.reference,
"+special",
token.token,
- "No valid tokens for '+special' in '%s'." % archive.reference,
- "+special@%s: No valid tokens" % archive.reference,
)
def test_checkArchiveAuthToken_named_token_correct_password(self):
@@ -184,11 +186,12 @@ class TestArchiveAPI(TestCaseWithFactory):
archive.newSubscription(subscriber, archive.owner)
token = archive.newAuthToken(subscriber)
self.assertUnauthorized(
+ "checkArchiveAuthToken",
+ "%s@%s: Password does not match"
+ % (subscriber.name, archive.reference),
archive.reference,
subscriber.name,
token.token + "-bad",
- "%s@%s: Password does not match"
- % (subscriber.name, archive.reference),
)
def test_checkArchiveAuthToken_personal_token_deactivated(self):
@@ -198,12 +201,13 @@ class TestArchiveAPI(TestCaseWithFactory):
token = archive.newAuthToken(subscriber)
removeSecurityProxy(token).deactivate()
self.assertNotFound(
- archive.reference,
- subscriber.name,
- token.token,
+ "checkArchiveAuthToken",
"No valid tokens for '%s' in '%s'."
% (subscriber.name, archive.reference),
"%s@%s: No valid tokens" % (subscriber.name, archive.reference),
+ archive.reference,
+ subscriber.name,
+ token.token,
)
def test_checkArchiveAuthToken_personal_token_cancelled(self):
@@ -213,12 +217,13 @@ class TestArchiveAPI(TestCaseWithFactory):
token = archive.newAuthToken(subscriber)
removeSecurityProxy(subscription).cancel(archive.owner)
self.assertNotFound(
- archive.reference,
- subscriber.name,
- token.token,
+ "checkArchiveAuthToken",
"No valid tokens for '%s' in '%s'."
% (subscriber.name, archive.reference),
"%s@%s: No valid tokens" % (subscriber.name, archive.reference),
+ archive.reference,
+ subscriber.name,
+ token.token,
)
def test_checkArchiveAuthToken_personal_token_correct_password(self):
@@ -234,3 +239,230 @@ class TestArchiveAPI(TestCaseWithFactory):
self.assertLogs(
"%s@%s: Authorized" % (subscriber.name, archive.reference)
)
+
+ def test_translatePath_unknown_archive(self):
+ self.assertNotFound(
+ "translatePath",
+ "No archive found for '~nonexistent/unknown/bad'.",
+ "~nonexistent/unknown/bad: No archive found",
+ "~nonexistent/unknown/bad",
+ "dists/jammy/InRelease",
+ )
+
+ def test_translatePath_non_debian_archive(self):
+ archive = removeSecurityProxy(
+ self.factory.makeArchive(
+ repository_format=ArchiveRepositoryFormat.PYTHON
+ )
+ )
+ self.assertNotFound(
+ "translatePath",
+ "Can't translate paths in '%s' with format Python."
+ % archive.reference,
+ "%s: Repository format is Python" % archive.reference,
+ archive.reference,
+ "dists/jammy/InRelease",
+ )
+
+ def test_translatePath_by_hash_unsupported_checksum(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ archive_file = self.factory.makeArchiveFile(
+ archive=archive,
+ container="release:jammy",
+ path="dists/jammy/InRelease",
+ )
+ path = (
+ "dists/jammy/by-hash/SHA1/%s"
+ % archive_file.library_file.content.sha1
+ )
+ self.assertNotFound(
+ "translatePath",
+ "'%s' not found in '%s'." % (path, archive.reference),
+ "%s: %s not found" % (archive.reference, path),
+ archive.reference,
+ path,
+ )
+
+ def test_translatePath_by_hash_checksum_not_found(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ self.factory.makeArchiveFile(
+ archive=archive,
+ container="release:jammy",
+ path="dists/jammy/InRelease",
+ )
+ path = "dists/jammy/by-hash/SHA256/nonexistent"
+ self.assertNotFound(
+ "translatePath",
+ "'%s' not found in '%s'." % (path, archive.reference),
+ "%s: %s not found" % (archive.reference, path),
+ archive.reference,
+ path,
+ )
+
+ def test_translatePath_by_hash_checksum_found(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ self.factory.makeArchiveFile(
+ archive=archive,
+ container="release:jammy",
+ path="dists/jammy/InRelease",
+ )
+ archive_file = self.factory.makeArchiveFile(
+ archive=archive,
+ container="release:jammy",
+ path="dists/jammy/InRelease",
+ )
+ path = (
+ "dists/jammy/by-hash/SHA256/%s"
+ % archive_file.library_file.content.sha256
+ )
+ self.assertEqual(
+ archive_file.library_file.getURL(),
+ self.archive_api.translatePath(archive.reference, path),
+ )
+ self.assertLogs(
+ "%s: %s (by-hash) -> LFA %d"
+ % (archive.reference, path, archive_file.library_file.id)
+ )
+
+ def test_translatePath_non_pool_not_found(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ self.factory.makeArchiveFile(archive=archive)
+ self.assertNotFound(
+ "translatePath",
+ "'nonexistent/path' not found in '%s'." % archive.reference,
+ "%s: nonexistent/path not found" % archive.reference,
+ archive.reference,
+ "nonexistent/path",
+ )
+
+ def test_translatePath_non_pool_found(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ self.factory.makeArchiveFile(archive=archive)
+ archive_file = self.factory.makeArchiveFile(archive=archive)
+ self.assertEqual(
+ archive_file.library_file.getURL(),
+ self.archive_api.translatePath(
+ archive.reference, archive_file.path
+ ),
+ )
+ self.assertLogs(
+ "%s: %s (non-pool) -> LFA %d"
+ % (
+ archive.reference,
+ archive_file.path,
+ archive_file.library_file.id,
+ )
+ )
+
+ def test_translatePath_pool_bad_file_name(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ path = "pool/nonexistent"
+ self.assertNotFound(
+ "translatePath",
+ "'%s' not found in '%s'." % (path, archive.reference),
+ "%s: %s not found" % (archive.reference, path),
+ archive.reference,
+ path,
+ )
+
+ def test_translatePath_pool_source_not_found(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ self.factory.makeSourcePackagePublishingHistory(
+ archive=archive,
+ status=PackagePublishingStatus.PUBLISHED,
+ sourcepackagename="test-package",
+ component="main",
+ )
+ path = "pool/main/t/test-package/test-package_1.dsc"
+ self.assertNotFound(
+ "translatePath",
+ "'%s' not found in '%s'." % (path, archive.reference),
+ "%s: %s not found" % (archive.reference, path),
+ archive.reference,
+ path,
+ )
+
+ def test_translatePath_pool_source_found(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ spph = self.factory.makeSourcePackagePublishingHistory(
+ archive=archive,
+ status=PackagePublishingStatus.PUBLISHED,
+ sourcepackagename="test-package",
+ component="main",
+ )
+ sprf = self.factory.makeSourcePackageReleaseFile(
+ sourcepackagerelease=spph.sourcepackagerelease,
+ library_file=self.factory.makeLibraryFileAlias(
+ filename="test-package_1.dsc", db_only=True
+ ),
+ )
+ self.factory.makeSourcePackageReleaseFile(
+ sourcepackagerelease=spph.sourcepackagerelease,
+ library_file=self.factory.makeLibraryFileAlias(
+ filename="test-package_1.tar.xz", db_only=True
+ ),
+ )
+ IStore(sprf).flush()
+ path = "pool/main/t/test-package/test-package_1.dsc"
+ self.assertEqual(
+ sprf.libraryfile.getURL(),
+ self.archive_api.translatePath(archive.reference, path),
+ )
+ self.assertLogs(
+ "%s: %s (pool) -> LFA %d"
+ % (archive.reference, path, sprf.libraryfile.id)
+ )
+
+ def test_translatePath_pool_binary_not_found(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ self.factory.makeBinaryPackagePublishingHistory(
+ archive=archive,
+ status=PackagePublishingStatus.PUBLISHED,
+ sourcepackagename="test-package",
+ component="main",
+ )
+ path = "pool/main/t/test-package/test-package_1_amd64.deb"
+ self.assertNotFound(
+ "translatePath",
+ "'%s' not found in '%s'." % (path, archive.reference),
+ "%s: %s not found" % (archive.reference, path),
+ archive.reference,
+ path,
+ )
+
+ def test_translatePath_pool_binary_found(self):
+ archive = removeSecurityProxy(self.factory.makeArchive())
+ bpph = self.factory.makeBinaryPackagePublishingHistory(
+ archive=archive,
+ status=PackagePublishingStatus.PUBLISHED,
+ sourcepackagename="test-package",
+ component="main",
+ )
+ bpf = self.factory.makeBinaryPackageFile(
+ binarypackagerelease=bpph.binarypackagerelease,
+ library_file=self.factory.makeLibraryFileAlias(
+ filename="test-package_1_amd64.deb", db_only=True
+ ),
+ )
+ bpph2 = self.factory.makeBinaryPackagePublishingHistory(
+ archive=archive,
+ status=PackagePublishingStatus.PUBLISHED,
+ sourcepackagename="test-package",
+ component="main",
+ )
+ self.factory.makeBinaryPackageFile(
+ binarypackagerelease=bpph2.binarypackagerelease,
+ library_file=self.factory.makeLibraryFileAlias(
+ filename="test-package_1_i386.deb", db_only=True
+ ),
+ )
+ IStore(bpf).flush()
+ path = "pool/main/t/test-package/test-package_1_amd64.deb"
+ self.assertEqual(
+ bpf.libraryfile.getURL(),
+ self.archive_api.translatePath(archive.reference, path),
+ )
+ self.assertLogs(
+ "%s: %s (pool) -> LFA %d"
+ % (archive.reference, path, bpf.libraryfile.id)
+ )