[Merge] ~pappacena/launchpad:partial-oci-manifest-list-upload into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:partial-oci-manifest-list-upload into launchpad:master.
Commit message:
Uploading intermediate OCI manifest list as archs are built
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #1900902 in Launchpad itself: "OCI: multiarch builds are not uploaded when at least one failed"
https://bugs.launchpad.net/launchpad/+bug/1900902
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/392635
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:partial-oci-manifest-list-upload into launchpad:master.
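As a quick orientation before the diff: the sketch below shows the behaviour this branch moves to, using simplified stand-in names rather than Launchpad's real classes. Instead of waiting for every architecture to finish, each registry upload job now regenerates the manifest list from whichever builds have already uploaded, so a failed architecture no longer blocks the others.

    # Illustrative sketch only: the names here are simplified stand-ins for
    # the Launchpad code in the diff below, not its real API.
    def make_partial_manifest_list(builds, uploaded_ids):
        """Build a manifest list from only the builds whose registry upload
        has already finished, mirroring the new
        makeMultiArchManifest(build_request, uploaded_builds) shape."""
        manifests = []
        for build in builds:
            if build["id"] not in uploaded_ids:
                # Not uploaded yet (or failed): a later upload job pushes a
                # fresh manifest list including it once it is available.
                continue
            manifests.append({
                "platform": {"os": "linux", "architecture": build["arch"]},
                "mediaType":
                    "application/vnd.docker.distribution.manifest.v2+json",
                "digest": build["digest"],
                "size": build["size"],
            })
        return {
            "schemaVersion": 2,
            "mediaType":
                "application/"
                "vnd.docker.distribution.manifest.list.v2+json",
            "manifests": manifests,
        }

    # Only amd64 has uploaded so far, so the intermediate list carries one
    # manifest; the 386 upload job later overwrites it with both entries.
    builds = [
        {"id": 1, "arch": "amd64", "digest": "sha256:aaa", "size": 123},
        {"id": 2, "arch": "386", "digest": "sha256:bbb", "size": 321},
    ]
    print(make_partial_manifest_list(builds, uploaded_ids={1}))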
diff --git a/lib/lp/oci/model/ocirecipebuild.py b/lib/lp/oci/model/ocirecipebuild.py
index 4b0682c..eda6fa3 100644
--- a/lib/lp/oci/model/ocirecipebuild.py
+++ b/lib/lp/oci/model/ocirecipebuild.py
@@ -30,6 +30,7 @@ from storm.locals import (
from storm.store import EmptyResultSet
from zope.component import getUtility
from zope.interface import implementer
+from zope.security.proxy import isinstance as zope_isinstance
from lp.app.errors import NotFoundError
from lp.buildmaster.enums import (
@@ -178,6 +179,11 @@ class OCIRecipeBuild(PackageBuildMixin, Storm):
if build_request is not None:
self.build_request_id = build_request.id
+ def __eq__(self, other):
+ if not zope_isinstance(other, self.__class__):
+ return False
+ return self.id == other.id
+
@property
def build_request(self):
"""See `IOCIRecipeBuild`."""
diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py
index 0db2514..1e4a7e8 100644
--- a/lib/lp/oci/model/ocirecipebuildjob.py
+++ b/lib/lp/oci/model/ocirecipebuildjob.py
@@ -278,13 +278,15 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
def build_uploaded(self, value):
self.json_data["build_uploaded"] = bool(value)
- def allBuildsUploaded(self, build_request):
- """Returns True if all builds of the given build_request already
- finished uploading. False otherwise.
+ def getUploadedBuilds(self, build_request):
+ """Returns the list of builds in the given build_request that
+ already finished uploading.
Note that this method locks all upload jobs at database level,
preventing them from updating their status until the end of the
- current transaction. Use it with caution.
+ current transaction. Use it with caution. Note also that self.build is
+ always included in the resulting set, as this method should only be
+ called *after* the untagged manifest is uploaded.
"""
builds = list(build_request.builds)
uploads_per_build = {i: list(i.registry_upload_jobs) for i in builds}
@@ -292,15 +294,15 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
# Lock the Job rows, so no other job updates its status until the
# end of this job's transaction. This is done to avoid race conditions,
- # where 2 upload jobs could be running simultaneously and none of them
- # realises that is the last upload.
+ # where 2 upload jobs could be running simultaneously and end up
+ # uploading an incomplete manifest list at the same time.
# Note also that new upload jobs might be created between the
# transaction begin and this lock takes place, but in this case the
# new upload is either a retry from a failed upload, or the first
- # upload for one of the existing builds. Either way, we would see that
- # build as "not uploaded yet", which is ok for this method, and the
- # new job will block until these locks are released, so we should be
- # safe.
+ upload for one of the existing builds. Either way, our manifest list
+ upload will still succeed, and the new job will wait until this job
+ finishes before uploading its version of the manifest list (which
+ will overwrite ours, including both manifests).
store = IMasterStore(builds[0])
placeholders = ', '.join('?' for _ in upload_jobs)
sql = (
@@ -310,28 +312,27 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
job_id: JobStatus.items[status] for job_id, status in
store.execute(sql, [i.job_id for i in upload_jobs])}
+ builds = set()
for build, upload_jobs in uploads_per_build.items():
has_finished_upload = any(
job_status[i.job_id] == JobStatus.COMPLETED
or i.job_id == self.job_id
for i in upload_jobs)
- if not has_finished_upload:
- return False
- return True
+ if has_finished_upload:
+ builds.add(build)
+ return builds
def uploadManifestList(self, client):
- """Uploads the aggregated manifest list for all builds in the
+ """Uploads the aggregated manifest list for all uploaded builds in the
current build request.
"""
- # The "allBuildsUploaded" call will lock, on the database,
- # all upload jobs for update until this transaction finishes.
- # So, make sure this is the last thing being done by this job.
build_request = self.build.build_request
if not build_request:
return
try:
- if self.allBuildsUploaded(build_request):
- client.uploadManifestList(build_request)
+ uploaded_builds = self.getUploadedBuilds(build_request)
+ if uploaded_builds:
+ client.uploadManifestList(build_request, uploaded_builds)
except OCIRegistryError:
# Do not retry automatically on known OCI registry errors.
raise
@@ -351,8 +352,8 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
self.uploadManifestList(client)
# Force this job status to COMPLETED in the same transaction we
- # called `allBuildsUpdated` (in the uploadManifestList call
- # above) to release the lock already including the new status.
+ # called `getUploadedBuilds` (in the uploadManifestList call
+ # above) to release the lock already including our new status.
# This way, any other transaction that was blocked waiting to
# get the info about the upload jobs will immediately have the
# new status of this job, avoiding race conditions. Keep the
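The locking comments above amount to a SELECT ... FOR UPDATE over the relevant Job rows. A stripped-down sketch of the same pattern with plain psycopg2 follows; the connection string, table and ids are assumptions for illustration, not the Storm code in this diff.

    import psycopg2

    conn = psycopg2.connect("dbname=example")  # assumed connection string
    try:
        with conn.cursor() as cur:
            # SELECT ... FOR UPDATE makes any concurrent transaction running
            # the same statement over these rows block until we commit, which
            # is what serialises competing manifest-list uploads.
            cur.execute(
                "SELECT id, status FROM job WHERE id IN (%s, %s) FOR UPDATE",
                (1, 2))
            statuses = dict(cur.fetchall())
            # ...decide which builds have uploaded, push the manifest list...
        conn.commit()  # releases the locks along with our own status change
    finally:
        conn.close()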
diff --git a/lib/lp/oci/model/ociregistryclient.py b/lib/lp/oci/model/ociregistryclient.py
index 7c67b57..a264ab7 100644
--- a/lib/lp/oci/model/ociregistryclient.py
+++ b/lib/lp/oci/model/ociregistryclient.py
@@ -350,12 +350,12 @@ class OCIRegistryClient:
raise MultipleOCIRegistryError(exceptions)
@classmethod
- def makeMultiArchManifest(cls, build_request):
- """Returns the multi-arch manifest content including all builds of
- the given build_request.
+ def makeMultiArchManifest(cls, build_request, uploaded_builds):
+ """Returns the multi-arch manifest content including all uploaded
+ builds of the given build_request.
"""
manifests = []
- for build in build_request.builds:
+ for build in uploaded_builds:
build_manifest = build_request.uploaded_manifests.get(build.id)
if not build_manifest:
continue
@@ -376,11 +376,12 @@ class OCIRegistryClient:
"manifests": manifests}
@classmethod
- def uploadManifestList(cls, build_request):
+ def uploadManifestList(cls, build_request, uploaded_builds):
"""Uploads to all build_request.recipe.push_rules the manifest list
for the builds in the given build_request.
"""
- multi_manifest_content = cls.makeMultiArchManifest(build_request)
+ multi_manifest_content = cls.makeMultiArchManifest(
+ build_request, uploaded_builds)
for push_rule in build_request.recipe.push_rules:
http_client = RegistryHTTPClient.getInstance(push_rule)
cls._uploadRegistryManifest(
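For context, the registry call that uploadManifestList ultimately performs for each push rule boils down to a PUT of the JSON document against the tag, using the manifest-list media type. A rough illustration with requests (registry URL, repository and tag are made-up placeholders; this is not Launchpad's RegistryHTTPClient):

    import json

    import requests

    manifest_list = {
        "schemaVersion": 2,
        "mediaType":
            "application/vnd.docker.distribution.manifest.list.v2+json",
        "manifests": [],  # filled from the uploaded builds only
    }
    response = requests.put(
        "https://registry.example.com/v2/example/image/manifests/edge",
        data=json.dumps(manifest_list),
        headers={
            "Content-Type":
                "application/vnd.docker.distribution.manifest.list.v2+json",
        })
    response.raise_for_status()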
diff --git a/lib/lp/oci/tests/test_ocirecipebuildjob.py b/lib/lp/oci/tests/test_ocirecipebuildjob.py
index 2b39f4e..598940f 100644
--- a/lib/lp/oci/tests/test_ocirecipebuildjob.py
+++ b/lib/lp/oci/tests/test_ocirecipebuildjob.py
@@ -19,6 +19,7 @@ from testtools.matchers import (
Equals,
MatchesDict,
MatchesListwise,
+ MatchesSetwise,
MatchesStructure,
)
import transaction
@@ -165,31 +166,32 @@ class TestOCIRecipeBuildJobDerived(TestCaseWithFactory):
class MultiArchRecipeMixin:
- def makeRecipe(self, include_i386=True, include_amd64=True):
- i386 = getUtility(IProcessorSet).getByName("386")
- amd64 = getUtility(IProcessorSet).getByName("amd64")
- recipe = self.factory.makeOCIRecipe()
- distroseries = self.factory.makeDistroSeries(
- distribution=recipe.oci_project.distribution)
- distro_i386 = self.factory.makeDistroArchSeries(
- distroseries=distroseries, architecturetag="i386",
- processor=i386)
- distro_i386.addOrUpdateChroot(self.factory.makeLibraryFileAlias())
- distro_amd64 = self.factory.makeDistroArchSeries(
- distroseries=distroseries, architecturetag="amd64",
- processor=amd64)
- distro_amd64.addOrUpdateChroot(self.factory.makeLibraryFileAlias())
-
- archs = []
+ def makeRecipe(self, include_i386=True, include_amd64=True,
+ include_hppa=False):
+ processors = []
if include_i386:
- archs.append(i386)
+ processors.append("386")
if include_amd64:
- archs.append(amd64)
+ processors.append("amd64")
+ if include_hppa:
+ processors.append("hppa")
+ archs = []
+ recipe = self.factory.makeOCIRecipe(require_virtualized=False)
+ distroseries = self.factory.makeDistroSeries(
+ distribution=recipe.oci_project.distribution)
+ for processor_name in processors:
+ proc = getUtility(IProcessorSet).getByName(processor_name)
+ distro_arch = self.factory.makeDistroArchSeries(
+ distroseries=distroseries, architecturetag=processor_name,
+ processor=proc)
+ distro_arch.addOrUpdateChroot(self.factory.makeLibraryFileAlias())
+ archs.append(proc)
recipe.setProcessors(archs)
return recipe
- def makeBuildRequest(self, include_i386=True, include_amd64=True):
- recipe = self.makeRecipe(include_i386, include_amd64)
+ def makeBuildRequest(self, include_i386=True, include_amd64=True,
+ include_hppa=False):
+ recipe = self.makeRecipe(include_i386, include_amd64, include_hppa)
# Creates a build request with a build in it.
build_request = recipe.requestBuilds(recipe.owner)
with admin_logged_in():
@@ -282,7 +284,7 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
run_isolated_jobs([job])
self.assertEqual([((ocibuild,), {})], client.upload.calls)
- self.assertEqual([((build_request,), {})],
+ self.assertEqual([((build_request, {ocibuild}), {})],
client.uploadManifestList.calls)
self.assertContentEqual([job], ocibuild.registry_upload_jobs)
self.assertIsNone(job.error_summary)
@@ -292,8 +294,8 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
def test_run_multiple_architectures(self):
build_request = self.makeBuildRequest()
- builds = build_request.builds
- self.assertEqual(2, builds.count())
+ builds = list(build_request.builds)
+ self.assertEqual(2, len(builds))
self.assertEqual(builds[0].build_request, builds[1].build_request)
upload_jobs = []
@@ -307,14 +309,20 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
JobRunner([upload_jobs[0]]).runAll()
self.assertEqual([((builds[0],), {})], client.upload.calls)
- self.assertEqual([], client.uploadManifestList.calls)
+ # Should have tried to upload the manifest list with only the first
+ # build.
+ self.assertEqual([((build_request, set(builds[:1])), {})],
+ client.uploadManifestList.calls)
with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
JobRunner([upload_jobs[1]]).runAll()
self.assertEqual(
[((builds[0],), {}), ((builds[1],), {})], client.upload.calls)
- self.assertEqual([((builds[1].build_request, ), {})],
- client.uploadManifestList.calls)
+ # Should have tried to upload the manifest list with both builds.
+ self.assertEqual(
+ [((build_request, set(builds[:1])), {}),
+ ((build_request, set(builds)), {})],
+ client.uploadManifestList.calls)
def test_failing_upload_does_not_retries_automatically(self):
build_request = self.makeBuildRequest(include_i386=False)
@@ -368,19 +376,19 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
self.assertEqual(JobStatus.COMPLETED, upload_job.status)
self.assertTrue(upload_job.build_uploaded)
- def test_allBuildsUploaded_lock_between_two_jobs(self):
- """Simple test to ensure that allBuildsUploaded method locks
+ def test_getUploadedBuilds_lock_between_two_jobs(self):
+ """Simple test to ensure that getUploadedBuilds method locks
rows in the database and make concurrent calls wait for that.
This is not a 100% reliable way to check that concurrent calls to
- allBuildsUploaded will queue up since it relies on the
+ getUploadedBuilds will queue up since it relies on the
execution time, but it's a "good enough" approach: this test might
pass if the machine running it is *really, really* slow, but a failure
here will indicate that something is for sure wrong.
"""
class AllBuildsUploadedChecker(threading.Thread):
- """Thread to run upload_job.allBuildsUploaded tracking the time."""
+ """Thread to run upload_job.getUploadedBuilds tracking the time."""
def __init__(self, build_request):
super(AllBuildsUploadedChecker, self).__init__()
self.build_request = build_request
@@ -418,16 +426,20 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
self.bootstrap()
self.start_date = datetime.now()
try:
- self.result = self.upload_job.allBuildsUploaded(
+ self.result = self.upload_job.getUploadedBuilds(
self.build_request)
except Exception as e:
self.error = e
self.end_date = datetime.now()
# Create a build request with 2 builds.
- build_request = self.makeBuildRequest()
+ build_request = self.makeBuildRequest(
+ include_i386=True, include_amd64=True, include_hppa=True)
builds = build_request.builds
- self.assertEqual(2, builds.count())
+ self.assertEqual(3, builds.count())
+
+ # Fail one of the builds, to make sure we are ignoring it.
+ removeSecurityProxy(builds[2]).status = BuildStatus.FAILEDTOBUILD
# Create the upload job for the first build.
upload_job1 = OCIRegistryUploadJob.create(builds[0])
@@ -440,9 +452,10 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
waiting_time = 2
# Start a clean transaction and lock the rows at database level.
transaction.commit()
- self.assertFalse(upload_job1.allBuildsUploaded(build_request))
+ self.assertEqual(
+ {builds[0]}, upload_job1.getUploadedBuilds(build_request))
- # Start, in parallel, another upload job to run `allBuildsUploaded`.
+ # Start, in parallel, another upload job to run `getUploadedBuilds`.
concurrent_checker = AllBuildsUploadedChecker(build_request)
concurrent_checker.start()
# Wait until concurrent_checker is ready to measure the time waiting
@@ -463,9 +476,14 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
# waiting_time to finish running (since it was waiting).
concurrent_checker.join()
self.assertIsNone(concurrent_checker.error)
- self.assertTrue(concurrent_checker.result)
self.assertGreaterEqual(
concurrent_checker.lock_duration, timedelta(seconds=waiting_time))
+ # Should have noticed that both non-failed builds count as uploaded.
+ self.assertEqual(2, len(concurrent_checker.result))
+ thread_build1, thread_build2 = concurrent_checker.result
+ self.assertThat(set(builds[:2]), MatchesSetwise(
+ MatchesStructure(id=Equals(thread_build1.id)),
+ MatchesStructure(id=Equals(thread_build2.id))))
def test_run_failed_registry_error(self):
# A run that fails with a registry error sets the registry upload
diff --git a/lib/lp/oci/tests/test_ociregistryclient.py b/lib/lp/oci/tests/test_ociregistryclient.py
index 8de88b4..9c3becd 100644
--- a/lib/lp/oci/tests/test_ociregistryclient.py
+++ b/lib/lp/oci/tests/test_ociregistryclient.py
@@ -548,18 +548,23 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
build1 = self.build
build2 = self.factory.makeOCIRecipeBuild(
recipe=recipe)
+ build3 = self.factory.makeOCIRecipeBuild(
+ recipe=recipe)
naked_build1 = removeSecurityProxy(build1)
- naked_build2 = removeSecurityProxy(build1)
+ naked_build2 = removeSecurityProxy(build2)
+ naked_build3 = removeSecurityProxy(build3)
naked_build1.processor = getUtility(IProcessorSet).getByName('386')
naked_build2.processor = getUtility(IProcessorSet).getByName('amd64')
+ naked_build3.processor = getUtility(IProcessorSet).getByName('hppa')
# Creates a mock IOCIRecipeRequestBuildsJobSource, as it was created
- # by the celery job and triggered the 2 registry uploads already.
+ # by the celery job and triggered the 3 registry uploads already.
job = mock.Mock()
- job.builds = [build1, build2]
+ job.builds = [build1, build2, build3]
job.uploaded_manifests = {
build1.id: {"digest": "build1digest", "size": 123},
build2.id: {"digest": "build2digest", "size": 321},
+ build3.id: {"digest": "build3digest", "size": 333},
}
job_source = mock.Mock()
job_source.getByOCIRecipeAndID.return_value = job
@@ -572,7 +577,8 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
"GET", "{}/v2/".format(push_rule.registry_url), status=200)
self.addManifestResponses(push_rule, status_code=201)
- self.client.uploadManifestList(build_request)
+ # Let's try to generate the manifest for just 2 of the 3 builds:
+ self.client.uploadManifestList(build_request, [build1, build2])
self.assertEqual(2, len(responses.calls))
auth_call, manifest_call = responses.calls
self.assertEndsWith(
@@ -583,13 +589,13 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
"mediaType": "application/"
"vnd.docker.distribution.manifest.list.v2+json",
"manifests": [{
- "platform": {"os": "linux", "architecture": "amd64"},
+ "platform": {"os": "linux", "architecture": "386"},
"mediaType": "application/"
"vnd.docker.distribution.manifest.v2+json",
"digest": "build1digest",
"size": 123
}, {
- "platform": {"os": "linux", "architecture": "386"},
+ "platform": {"os": "linux", "architecture": "amd64"},
"mediaType": "application/"
"vnd.docker.distribution.manifest.v2+json",
"digest": "build2digest",