← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/launchpad:partial-oci-manifest-list-upload into launchpad:master

 

Thiago F. Pappacena has proposed merging ~pappacena/launchpad:partial-oci-manifest-list-upload into launchpad:master.

Commit message:
Uploading intermediate OCI manifest list as archs are built

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1900902 in Launchpad itself: "OCI: multiarch builds are not uploaded when at least one failed"
  https://bugs.launchpad.net/launchpad/+bug/1900902

For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/392635
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:partial-oci-manifest-list-upload into launchpad:master.
diff --git a/lib/lp/oci/model/ocirecipebuild.py b/lib/lp/oci/model/ocirecipebuild.py
index 4b0682c..eda6fa3 100644
--- a/lib/lp/oci/model/ocirecipebuild.py
+++ b/lib/lp/oci/model/ocirecipebuild.py
@@ -30,6 +30,7 @@ from storm.locals import (
 from storm.store import EmptyResultSet
 from zope.component import getUtility
 from zope.interface import implementer
+from zope.security.proxy import isinstance as zope_isinstance
 
 from lp.app.errors import NotFoundError
 from lp.buildmaster.enums import (
@@ -178,6 +179,11 @@ class OCIRecipeBuild(PackageBuildMixin, Storm):
         if build_request is not None:
             self.build_request_id = build_request.id
 
+    def __eq__(self, other):
+        if not zope_isinstance(other, self.__class__):
+            return False
+        return self.id == other.id
+
     @property
     def build_request(self):
         """See `IOCIRecipeBuild`."""
diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py
index 0db2514..1e4a7e8 100644
--- a/lib/lp/oci/model/ocirecipebuildjob.py
+++ b/lib/lp/oci/model/ocirecipebuildjob.py
@@ -278,13 +278,15 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
     def build_uploaded(self, value):
         self.json_data["build_uploaded"] = bool(value)
 
-    def allBuildsUploaded(self, build_request):
-        """Returns True if all builds of the given build_request already
-        finished uploading. False otherwise.
+    def getUploadedBuilds(self, build_request):
+        """Returns the set of builds in the given build_request that
+        have already finished uploading.
 
         Note that this method locks all upload jobs at database level,
         preventing them from updating their status until the end of the
-        current transaction. Use it with caution.
+        current transaction. Use it with caution. Note also that self.build is
+        always included in the resulting set, as this method should only be
+        called *after* the untagged manifest is uploaded.
         """
         builds = list(build_request.builds)
         uploads_per_build = {i: list(i.registry_upload_jobs) for i in builds}
@@ -292,15 +294,15 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
 
         # Lock the Job rows, so no other job updates its status until the
         # end of this job's transaction. This is done to avoid race conditions,
-        # where 2 upload jobs could be running simultaneously and none of them
-        # realises that is the last upload.
+        # where 2 upload jobs could be running simultaneously and end up
+        # uploading an incomplete manifest list at the same time.
         # Note also that new upload jobs might be created between the
         # transaction begin and this lock takes place, but in this case the
         # new upload is either a retry from a failed upload, or the first
-        # upload for one of the existing builds. Either way, we would see that
-        # build as "not uploaded yet", which is ok for this method, and the
-        # new job will block until these locks are released, so we should be
-        # safe.
+        # upload for one of the existing builds. Either way, we will succeed
+        # in uploading our manifest list, and the new job will wait until
+        # this job finishes before uploading its own version of the manifest
+        # list (which will override ours and include both manifests).
         store = IMasterStore(builds[0])
         placeholders = ', '.join('?' for _ in upload_jobs)
         sql = (
@@ -310,28 +312,27 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
             job_id: JobStatus.items[status] for job_id, status in
             store.execute(sql, [i.job_id for i in upload_jobs])}
 
+        builds = set()
         for build, upload_jobs in uploads_per_build.items():
             has_finished_upload = any(
                 job_status[i.job_id] == JobStatus.COMPLETED
                 or i.job_id == self.job_id
                 for i in upload_jobs)
-            if not has_finished_upload:
-                return False
-        return True
+            if has_finished_upload:
+                builds.add(build)
+        return builds
 
     def uploadManifestList(self, client):
-        """Uploads the aggregated manifest list for all builds in the
+        """Uploads the aggregated manifest list for all uploaded builds in the
         current build request.
         """
-        # The "allBuildsUploaded" call will lock, on the database,
-        # all upload jobs for update until this transaction finishes.
-        # So, make sure this is the last thing being done by this job.
         build_request = self.build.build_request
         if not build_request:
             return
         try:
-            if self.allBuildsUploaded(build_request):
-                client.uploadManifestList(build_request)
+            uploaded_builds = self.getUploadedBuilds(build_request)
+            if uploaded_builds:
+                client.uploadManifestList(build_request, uploaded_builds)
         except OCIRegistryError:
             # Do not retry automatically on known OCI registry errors.
             raise
@@ -351,8 +352,8 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
 
                 self.uploadManifestList(client)
                 # Force this job status to COMPLETED in the same transaction we
-                # called `allBuildsUpdated` (in the uploadManifestList call
-                # above) to release the lock already including the new status.
+                # called `getUploadedBuilds` (in the uploadManifestList call
+                # above) to release the lock already including our new status.
                 # This way, any other transaction that was blocked waiting to
                 # get the info about the upload jobs will immediately have the
                 # new status of this job, avoiding race conditions. Keep the
diff --git a/lib/lp/oci/model/ociregistryclient.py b/lib/lp/oci/model/ociregistryclient.py
index 7c67b57..a264ab7 100644
--- a/lib/lp/oci/model/ociregistryclient.py
+++ b/lib/lp/oci/model/ociregistryclient.py
@@ -350,12 +350,12 @@ class OCIRegistryClient:
             raise MultipleOCIRegistryError(exceptions)
 
     @classmethod
-    def makeMultiArchManifest(cls, build_request):
-        """Returns the multi-arch manifest content including all builds of
-        the given build_request.
+    def makeMultiArchManifest(cls, build_request, uploaded_builds):
+        """Returns the multi-arch manifest content including all uploaded
+        builds of the given build_request.
         """
         manifests = []
-        for build in build_request.builds:
+        for build in uploaded_builds:
             build_manifest = build_request.uploaded_manifests.get(build.id)
             if not build_manifest:
                 continue
@@ -376,11 +376,12 @@ class OCIRegistryClient:
           "manifests": manifests}
 
     @classmethod
-    def uploadManifestList(cls, build_request):
+    def uploadManifestList(cls, build_request, uploaded_builds):
         """Uploads to all build_request.recipe.push_rules the manifest list
         for the builds in the given build_request.
         """
-        multi_manifest_content = cls.makeMultiArchManifest(build_request)
+        multi_manifest_content = cls.makeMultiArchManifest(
+            build_request, uploaded_builds)
         for push_rule in build_request.recipe.push_rules:
             http_client = RegistryHTTPClient.getInstance(push_rule)
             cls._uploadRegistryManifest(
diff --git a/lib/lp/oci/tests/test_ocirecipebuildjob.py b/lib/lp/oci/tests/test_ocirecipebuildjob.py
index 2b39f4e..598940f 100644
--- a/lib/lp/oci/tests/test_ocirecipebuildjob.py
+++ b/lib/lp/oci/tests/test_ocirecipebuildjob.py
@@ -19,6 +19,7 @@ from testtools.matchers import (
     Equals,
     MatchesDict,
     MatchesListwise,
+    MatchesSetwise,
     MatchesStructure,
     )
 import transaction
@@ -165,31 +166,32 @@ class TestOCIRecipeBuildJobDerived(TestCaseWithFactory):
 
 
 class MultiArchRecipeMixin:
-    def makeRecipe(self, include_i386=True, include_amd64=True):
-        i386 = getUtility(IProcessorSet).getByName("386")
-        amd64 = getUtility(IProcessorSet).getByName("amd64")
-        recipe = self.factory.makeOCIRecipe()
-        distroseries = self.factory.makeDistroSeries(
-            distribution=recipe.oci_project.distribution)
-        distro_i386 = self.factory.makeDistroArchSeries(
-            distroseries=distroseries, architecturetag="i386",
-            processor=i386)
-        distro_i386.addOrUpdateChroot(self.factory.makeLibraryFileAlias())
-        distro_amd64 = self.factory.makeDistroArchSeries(
-            distroseries=distroseries, architecturetag="amd64",
-            processor=amd64)
-        distro_amd64.addOrUpdateChroot(self.factory.makeLibraryFileAlias())
-
-        archs = []
+    def makeRecipe(self, include_i386=True, include_amd64=True,
+                   include_hppa=False):
+        processors = []
         if include_i386:
-            archs.append(i386)
+            processors.append("386")
         if include_amd64:
-            archs.append(amd64)
+            processors.append("amd64")
+        if include_hppa:
+            processors.append("hppa")
+        archs = []
+        recipe = self.factory.makeOCIRecipe(require_virtualized=False)
+        distroseries = self.factory.makeDistroSeries(
+            distribution=recipe.oci_project.distribution)
+        for processor_name in processors:
+            proc = getUtility(IProcessorSet).getByName(processor_name)
+            distro_arch = self.factory.makeDistroArchSeries(
+                distroseries=distroseries, architecturetag=processor_name,
+                processor=proc)
+            distro_arch.addOrUpdateChroot(self.factory.makeLibraryFileAlias())
+            archs.append(proc)
         recipe.setProcessors(archs)
         return recipe
 
-    def makeBuildRequest(self, include_i386=True, include_amd64=True):
-        recipe = self.makeRecipe(include_i386, include_amd64)
+    def makeBuildRequest(self, include_i386=True, include_amd64=True,
+                         include_hppa=False):
+        recipe = self.makeRecipe(include_i386, include_amd64, include_hppa)
         # Creates a build request with a build in it.
         build_request = recipe.requestBuilds(recipe.owner)
         with admin_logged_in():
@@ -282,7 +284,7 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
             run_isolated_jobs([job])
 
         self.assertEqual([((ocibuild,), {})], client.upload.calls)
-        self.assertEqual([((build_request,), {})],
+        self.assertEqual([((build_request, {ocibuild}), {})],
                          client.uploadManifestList.calls)
         self.assertContentEqual([job], ocibuild.registry_upload_jobs)
         self.assertIsNone(job.error_summary)
@@ -292,8 +294,8 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
 
     def test_run_multiple_architectures(self):
         build_request = self.makeBuildRequest()
-        builds = build_request.builds
-        self.assertEqual(2, builds.count())
+        builds = list(build_request.builds)
+        self.assertEqual(2, len(builds))
         self.assertEqual(builds[0].build_request, builds[1].build_request)
 
         upload_jobs = []
@@ -307,14 +309,20 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
         with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
             JobRunner([upload_jobs[0]]).runAll()
         self.assertEqual([((builds[0],), {})], client.upload.calls)
-        self.assertEqual([], client.uploadManifestList.calls)
+        # Should have tried to upload the manifest list with only the first
+        # build.
+        self.assertEqual([((build_request, set(builds[:1])), {})],
+                         client.uploadManifestList.calls)
 
         with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
             JobRunner([upload_jobs[1]]).runAll()
         self.assertEqual(
             [((builds[0],), {}), ((builds[1],), {})], client.upload.calls)
-        self.assertEqual([((builds[1].build_request, ), {})],
-                         client.uploadManifestList.calls)
+        # Should have tried to upload the manifest list with both builds.
+        self.assertEqual(
+            [((build_request, set(builds[:1])), {}),
+             ((build_request, set(builds)), {})],
+            client.uploadManifestList.calls)
 
     def test_failing_upload_does_not_retries_automatically(self):
         build_request = self.makeBuildRequest(include_i386=False)
@@ -368,19 +376,19 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
         self.assertEqual(JobStatus.COMPLETED, upload_job.status)
         self.assertTrue(upload_job.build_uploaded)
 
-    def test_allBuildsUploaded_lock_between_two_jobs(self):
-        """Simple test to ensure that allBuildsUploaded method locks
+    def test_getUploadedBuilds_lock_between_two_jobs(self):
+        """Simple test to ensure that getUploadedBuilds method locks
         rows in the database and make concurrent calls wait for that.
 
         This is not a 100% reliable way to check that concurrent calls to
-        allBuildsUploaded will queue up since it relies on the
+        getUploadedBuilds will queue up since it relies on the
         execution time, but it's a "good enough" approach: this test might
         pass if the machine running it is *really, really* slow, but a failure
         here will indicate that something is for sure wrong.
         """
 
         class AllBuildsUploadedChecker(threading.Thread):
-            """Thread to run upload_job.allBuildsUploaded tracking the time."""
+            """Thread to run upload_job.getUploadedBuilds tracking the time."""
             def __init__(self, build_request):
                 super(AllBuildsUploadedChecker, self).__init__()
                 self.build_request = build_request
@@ -418,16 +426,20 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
                     self.bootstrap()
                     self.start_date = datetime.now()
                     try:
-                        self.result = self.upload_job.allBuildsUploaded(
+                        self.result = self.upload_job.getUploadedBuilds(
                             self.build_request)
                     except Exception as e:
                         self.error = e
                     self.end_date = datetime.now()
 
         # Create a build request with 2 builds.
-        build_request = self.makeBuildRequest()
+        build_request = self.makeBuildRequest(
+            include_i386=True, include_amd64=True, include_hppa=True)
         builds = build_request.builds
-        self.assertEqual(2, builds.count())
+        self.assertEqual(3, builds.count())
+
+        # Fail one of the builds, to make sure we are ignoring it.
+        removeSecurityProxy(builds[2]).status = BuildStatus.FAILEDTOBUILD
 
         # Create the upload job for the first build.
         upload_job1 = OCIRegistryUploadJob.create(builds[0])
@@ -440,9 +452,10 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
         waiting_time = 2
         # Start a clean transaction and lock the rows at database level.
         transaction.commit()
-        self.assertFalse(upload_job1.allBuildsUploaded(build_request))
+        self.assertEqual(
+            {builds[0]}, upload_job1.getUploadedBuilds(build_request))
 
-        # Start, in parallel, another upload job to run `allBuildsUploaded`.
+        # Start, in parallel, another upload job to run `getUploadedBuilds`.
         concurrent_checker = AllBuildsUploadedChecker(build_request)
         concurrent_checker.start()
         # Wait until concurrent_checker is ready to measure the time waiting
@@ -463,9 +476,14 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin):
         # waiting_time to finish running (since it was waiting).
         concurrent_checker.join()
         self.assertIsNone(concurrent_checker.error)
-        self.assertTrue(concurrent_checker.result)
         self.assertGreaterEqual(
             concurrent_checker.lock_duration, timedelta(seconds=waiting_time))
+        # Should have noticed that both builds are ready to upload.
+        self.assertEqual(2, len(concurrent_checker.result))
+        thread_build1, thread_build2 = concurrent_checker.result
+        self.assertThat(set(builds[:2]), MatchesSetwise(
+            MatchesStructure(id=Equals(thread_build1.id)),
+            MatchesStructure(id=Equals(thread_build2.id))))
 
     def test_run_failed_registry_error(self):
         # A run that fails with a registry error sets the registry upload
diff --git a/lib/lp/oci/tests/test_ociregistryclient.py b/lib/lp/oci/tests/test_ociregistryclient.py
index 8de88b4..9c3becd 100644
--- a/lib/lp/oci/tests/test_ociregistryclient.py
+++ b/lib/lp/oci/tests/test_ociregistryclient.py
@@ -548,18 +548,23 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
         build1 = self.build
         build2 = self.factory.makeOCIRecipeBuild(
             recipe=recipe)
+        build3 = self.factory.makeOCIRecipeBuild(
+            recipe=recipe)
         naked_build1 = removeSecurityProxy(build1)
-        naked_build2 = removeSecurityProxy(build1)
+        naked_build2 = removeSecurityProxy(build2)
+        naked_build3 = removeSecurityProxy(build3)
         naked_build1.processor = getUtility(IProcessorSet).getByName('386')
         naked_build2.processor = getUtility(IProcessorSet).getByName('amd64')
+        naked_build3.processor = getUtility(IProcessorSet).getByName('hppa')
 
         # Creates a mock IOCIRecipeRequestBuildsJobSource, as it was created
-        # by the celery job and triggered the 2 registry uploads already.
+        # by the celery job and triggered the 3 registry uploads already.
         job = mock.Mock()
-        job.builds = [build1, build2]
+        job.builds = [build1, build2, build3]
         job.uploaded_manifests = {
             build1.id: {"digest": "build1digest", "size": 123},
             build2.id: {"digest": "build2digest", "size": 321},
+            build3.id: {"digest": "build2digest", "size": 333},
         }
         job_source = mock.Mock()
         job_source.getByOCIRecipeAndID.return_value = job
@@ -572,7 +577,8 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
             "GET", "{}/v2/".format(push_rule.registry_url), status=200)
         self.addManifestResponses(push_rule, status_code=201)
 
-        self.client.uploadManifestList(build_request)
+        # Let's try to generate the manifest for just 2 of the 3 builds:
+        self.client.uploadManifestList(build_request, [build1, build2])
         self.assertEqual(2, len(responses.calls))
         auth_call, manifest_call = responses.calls
         self.assertEndsWith(
@@ -583,13 +589,13 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
             "mediaType": "application/"
                          "vnd.docker.distribution.manifest.list.v2+json",
             "manifests": [{
-                "platform": {"os": "linux", "architecture": "amd64"},
+                "platform": {"os": "linux", "architecture": "386"},
                 "mediaType": "application/"
                              "vnd.docker.distribution.manifest.v2+json",
                 "digest": "build1digest",
                 "size": 123
             }, {
-                "platform": {"os": "linux", "architecture": "386"},
+                "platform": {"os": "linux", "architecture": "amd64"},
                 "mediaType": "application/"
                              "vnd.docker.distribution.manifest.v2+json",
                 "digest": "build2digest",