← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~twom/launchpad:oci-try-the-lock-for-the-race into launchpad:master

 

Tom Wardill has proposed merging ~twom/launchpad:oci-try-the-lock-for-the-race into launchpad:master.

Commit message:
Use an AdvisoryLock to prevent simultaneous Registry Upload jobs

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~twom/launchpad/+git/launchpad/+merge/400820

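The SELECT ... FOR UPDATE mechanism did not prevent a previous job from committing while holding an earlier version of the metadata.
Instead, take an advisory lock keyed on the OCI recipe, so that only one registry upload job for a given recipe runs at a time; a job that finds the lock already held is retried.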
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~twom/launchpad:oci-try-the-lock-for-the-race into launchpad:master.
diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py
index 20337c1..c819652 100644
--- a/lib/lp/oci/model/ocirecipebuildjob.py
+++ b/lib/lp/oci/model/ocirecipebuildjob.py
@@ -24,6 +24,7 @@ from storm.databases.postgres import JSON
 from storm.locals import (
     Int,
     Reference,
+    Store,
     )
 import transaction
 from zope.component import getUtility
@@ -48,6 +49,11 @@ from lp.services.database.interfaces import (
     IMasterStore,
     IStore,
     )
+from lp.services.database.locking import (
+    AdvisoryLockHeld,
+    LockType,
+    try_advisory_lock,
+    )
 from lp.services.database.stormbase import StormBase
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.model.job import (
@@ -186,7 +192,7 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
     class ManifestListUploadError(Exception):
         pass
 
-    retry_error_types = (ManifestListUploadError, )
+    retry_error_types = (ManifestListUploadError, AdvisoryLockHeld,)
     max_retries = 5
 
     config = config.IOCIRegistryUploadJobSource
@@ -290,32 +296,11 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
         """
         builds = list(build_request.builds)
         uploads_per_build = {i: list(i.registry_upload_jobs) for i in builds}
-        upload_jobs = sum(uploads_per_build.values(), [])
-
-        # Lock the Job rows, so no other job updates its status until the
-        # end of this job's transaction. This is done to avoid race conditions,
-        # where 2 upload jobs could be running simultaneously and end up
-        # uploading an incomplete manifest list at the same time.
-        # Note also that new upload jobs might be created between the
-        # transaction begin and this lock takes place, but in this case the
-        # new upload is either a retry from a failed upload, or the first
-        # upload for one of the existing builds. Either way, we will succeed
-        # to execute our manifest list upload, and the new job will wait
-        # until this job finishes to upload their version of the manifest
-        # list (which will override our version, including both manifests).
-        store = IMasterStore(builds[0])
-        placeholders = ', '.join('?' for _ in upload_jobs)
-        sql = (
-            "SELECT id, status FROM job WHERE id IN (%s) FOR UPDATE"
-            % placeholders)
-        job_status = {
-            job_id: JobStatus.items[status] for job_id, status in
-            store.execute(sql, [i.job_id for i in upload_jobs])}
 
         builds = set()
         for build, upload_jobs in uploads_per_build.items():
             has_finished_upload = any(
-                job_status[i.job_id] == JobStatus.COMPLETED
+                i.status == JobStatus.COMPLETED
                 or i.job_id == self.job_id
                 for i in upload_jobs)
             if has_finished_upload:
@@ -342,24 +327,25 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
     def run(self):
         """See `IRunnableJob`."""
         client = getUtility(IOCIRegistryClient)
-        # XXX twom 2020-04-16 This is taken from SnapStoreUploadJob
-        # it will need to gain retry support.
         try:
-            try:
-                if not self.build_uploaded:
-                    client.upload(self.build)
-                    self.build_uploaded = True
-
-                self.uploadManifestList(client)
-
-            except OCIRegistryError as e:
-                self.error_summary = str(e)
-                self.errors = e.errors
-                raise
-            except Exception as e:
-                self.error_summary = str(e)
-                self.errors = None
-                raise
+            with try_advisory_lock(LockType.REGISTRY_UPLOAD,
+                                   self.build.recipe.id,
+                                   Store.of(self.build.recipe)):
+                try:
+                    if not self.build_uploaded:
+                        client.upload(self.build)
+                        self.build_uploaded = True
+
+                    self.uploadManifestList(client)
+
+                except OCIRegistryError as e:
+                    self.error_summary = str(e)
+                    self.errors = e.errors
+                    raise
+                except Exception as e:
+                    self.error_summary = str(e)
+                    self.errors = None
+                    raise
         except Exception:
             transaction.commit()
             raise
diff --git a/lib/lp/oci/tests/test_ocirecipebuildjob.py b/lib/lp/oci/tests/test_ocirecipebuildjob.py
index 9388a68..0ff2938 100644
--- a/lib/lp/oci/tests/test_ocirecipebuildjob.py
+++ b/lib/lp/oci/tests/test_ocirecipebuildjob.py
@@ -11,10 +11,13 @@ from datetime import (
     datetime,
     timedelta,
     )
+import os
+import signal
 import threading
 import time
 
 from fixtures import FakeLogger
+from storm.locals import Store
 from testtools.matchers import (
     Equals,
     MatchesDict,
@@ -50,6 +53,11 @@ from lp.oci.model.ocirecipebuildjob import (
     )
 from lp.services.compat import mock
 from lp.services.config import config
+from lp.services.database.locking import (
+    AdvisoryLockHeld,
+    LockType,
+    try_advisory_lock,
+    )
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.runner import JobRunner
@@ -64,7 +72,10 @@ from lp.testing import (
     admin_logged_in,
     TestCaseWithFactory,
     )
-from lp.testing.dbuser import dbuser
+from lp.testing.dbuser import (
+    dbuser,
+    switch_dbuser,
+    )
 from lp.testing.fakemethod import FakeMethod
 from lp.testing.fixture import ZopeUtilityFixture
 from lp.testing.layers import (
@@ -396,114 +407,30 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin,
         self.assertEqual(JobStatus.COMPLETED, upload_job.status)
         self.assertTrue(upload_job.build_uploaded)
 
-    def test_getUploadedBuilds_lock_between_two_jobs(self):
-        """Simple test to ensure that getUploadedBuilds method locks
-        rows in the database and make concurrent calls wait for that.
-
-        This is not a 100% reliable way to check that concurrent calls to
-        getUploadedBuilds will queue up since it relies on the
-        execution time, but it's a "good enough" approach: this test might
-        pass if the machine running it is *really, really* slow, but a failure
-        here will indicate that something is for sure wrong.
-        """
-
-        class AllBuildsUploadedChecker(threading.Thread):
-            """Thread to run upload_job.getUploadedBuilds tracking the time."""
-            def __init__(self, build_request):
-                super(AllBuildsUploadedChecker, self).__init__()
-                self.build_request = build_request
-                self.upload_job = None
-                # Locks the measurement start until we finished running the
-                # bootstrap code. Parent thread should call waitBootstrap
-                # after self.start().
-                self.bootstrap_lock = threading.Lock()
-                self.bootstrap_lock.acquire()
-                self.result = None
-                self.error = None
-                self.start_date = None
-                self.end_date = None
-
-            @property
-            def lock_duration(self):
-                return self.end_date - self.start_date
-
-            def waitBootstrap(self):
-                """Wait until self.bootstrap finishes running."""
-                self.bootstrap_lock.acquire()
-                # We don't actually need the lock... just wanted to wait
-                # for it. let's release it then.
-                self.bootstrap_lock.release()
-
-            def bootstrap(self):
-                try:
-                    build = self.build_request.builds[1]
-                    self.upload_job = OCIRegistryUploadJob.create(build)
-                finally:
-                    self.bootstrap_lock.release()
-
-            def run(self):
-                with admin_logged_in():
-                    self.bootstrap()
-                    self.start_date = datetime.now()
-                    try:
-                        self.result = self.upload_job.getUploadedBuilds(
-                            self.build_request)
-                    except Exception as e:
-                        self.error = e
-                    self.end_date = datetime.now()
-
-        # Create a build request with 2 builds.
+    def test_getUploadedBuilds(self):
+        # Create a build request with 3 builds.
         build_request = self.makeBuildRequest(
             include_i386=True, include_amd64=True, include_hppa=True)
         builds = build_request.builds
         self.assertEqual(3, builds.count())
 
-        # Fail one of the builds, to make sure we are ignoring it.
-        removeSecurityProxy(builds[2]).status = BuildStatus.FAILEDTOBUILD
-
         # Create the upload job for the first build.
         upload_job1 = OCIRegistryUploadJob.create(builds[0])
         upload_job1 = removeSecurityProxy(upload_job1)
 
-        # How long the lock will be held by the first job, in seconds.
-        # Adjust to minimize false positives: a number too small here might
-        # make the test pass even if the lock is not correctly implemented.
-        # A number too big will slow down the test execution...
-        waiting_time = 2
-        # Start a clean transaction and lock the rows at database level.
-        transaction.commit()
-        self.assertEqual(
-            {builds[0]}, upload_job1.getUploadedBuilds(build_request))
-
-        # Start, in parallel, another upload job to run `getUploadedBuilds`.
-        concurrent_checker = AllBuildsUploadedChecker(build_request)
-        concurrent_checker.start()
-        # Wait until concurrent_checker is ready to measure the time waiting
-        # for the database lock.
-        concurrent_checker.waitBootstrap()
-
-        # Wait a bit and release the database lock by committing current
-        # transaction.
-        time.sleep(waiting_time)
-        # Let's force the first job to be finished, just to make sure the
-        # second job will realise it's the last one running.
-        upload_job1.start()
-        upload_job1.complete()
-        transaction.commit()
-
-        # Now, the concurrent checker should have already finished running,
-        # without any error and it should have taken at least the
-        # waiting_time to finish running (since it was waiting).
-        concurrent_checker.join()
-        self.assertIsNone(concurrent_checker.error)
-        self.assertGreaterEqual(
-            concurrent_checker.lock_duration, timedelta(seconds=waiting_time))
-        # Should have noticed that both builds are ready to upload.
-        self.assertEqual(2, len(concurrent_checker.result))
-        thread_build1, thread_build2 = concurrent_checker.result
-        self.assertThat(set(builds[:2]), MatchesSetwise(
-            MatchesStructure(id=Equals(thread_build1.id)),
-            MatchesStructure(id=Equals(thread_build2.id))))
+        upload_job2 = OCIRegistryUploadJob.create(builds[1])
+        upload_job2 = removeSecurityProxy(upload_job2)
+
+        client = FakeRegistryClient()
+        self.useFixture(ZopeUtilityFixture(client, IOCIRegistryClient))
+        with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
+            run_isolated_jobs([upload_job1])
+
+        with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
+            run_isolated_jobs([upload_job2])
+
+        result = upload_job1.getUploadedBuilds(build_request)
+        self.assertEqual({builds[0], builds[1]}, result)
 
     def test_run_failed_registry_error(self):
         # A run that fails with a registry error sets the registry upload
@@ -586,6 +513,53 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin,
 
         self.assertEqual(0, len(self.oopses))
 
+    def test_advisorylock_on_run(self):
+        # The job should take an advisory lock and any attempted
+        # simultaneous jobs should retry
+        logger = self.useFixture(FakeLogger())
+        build_request = self.makeBuildRequest(include_i386=False)
+        recipe = build_request.recipe
+
+        self.assertEqual(1, build_request.builds.count())
+        ocibuild = build_request.builds[0]
+        ocibuild.updateStatus(BuildStatus.FULLYBUILT)
+        self.makeWebhook(recipe)
+
+        self.assertContentEqual([], ocibuild.registry_upload_jobs)
+        job = OCIRegistryUploadJob.create(ocibuild)
+        client = FakeRegistryClient()
+        switch_dbuser(config.IOCIRegistryUploadJobSource.dbuser)
+        # Fork so that we can take an advisory lock from a different
+        # PostgreSQL session.
+        read, write = os.pipe()
+        pid = os.fork()
+        if pid == 0:  # child
+            os.close(read)
+            with try_advisory_lock(
+                    LockType.REGISTRY_UPLOAD,
+                    ocibuild.recipe.id,
+                    Store.of(ocibuild.recipe)):
+                os.write(write, b"1")
+                try:
+                    signal.pause()
+                except KeyboardInterrupt:
+                    pass
+            os._exit(0)
+        else:  # parent
+            try:
+                os.close(write)
+                os.read(read, 1)
+                runner = JobRunner([job])
+                runner.runAll()
+                self.assertEqual(JobStatus.WAITING, job.status)
+                self.assertEqual([], runner.oops_ids)
+                self.assertIn(
+                    "Scheduling retry due to AdvisoryLockHeld", logger.output)
+            finally:
+                os.kill(pid, signal.SIGINT)
+
+
+
 
 class TestOCIRegistryUploadJobViaCelery(TestCaseWithFactory,
                                         MultiArchRecipeMixin):
diff --git a/lib/lp/services/database/locking.py b/lib/lp/services/database/locking.py
index 266f971..3f1c478 100644
--- a/lib/lp/services/database/locking.py
+++ b/lib/lp/services/database/locking.py
@@ -44,6 +44,12 @@ class LockType(DBEnumeratedType):
         Package copy.
         """)
 
+    REGISTRY_UPLOAD = DBItem(3, """OCI Registry upload.
+
+        OCI Registry upload.
+        """
+    )
+
 
 @contextmanager
 def try_advisory_lock(lock_type, lock_id, store):