launchpad-reviewers team mailing list archive
Message #26852
[Merge] ~twom/launchpad:oci-try-the-lock-for-the-race into launchpad:master
Tom Wardill has proposed merging ~twom/launchpad:oci-try-the-lock-for-the-race into launchpad:master.
Commit message:
Use an AdvisoryLock to prevent simultaneous Registry Upload jobs
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~twom/launchpad/+git/launchpad/+merge/400820
The SELECT ... FOR UPDATE mechanism did not prevent a previous job from committing while holding an earlier version of the metadata.
Instead, take an advisory lock based on the OCI recipe, so that only one registry upload job for a recipe runs at a time.
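For reviewers who want the shape of the change without reading the whole diff, the pattern it introduces is condensed below. This is a sketch assembled from the diff, with the surrounding class body, imports, and error-summary handling abbreviated; it is not the exact code.

    from storm.locals import Store

    from lp.services.database.locking import (
        AdvisoryLockHeld,
        LockType,
        try_advisory_lock,
        )


    class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):

        # Losing the lock race is now a retryable condition, not a failure:
        # the job runner reschedules the job instead of recording an OOPS.
        retry_error_types = (ManifestListUploadError, AdvisoryLockHeld,)
        max_retries = 5

        def run(self):
            """See `IRunnableJob`."""
            client = getUtility(IOCIRegistryClient)
            # Only one upload job per recipe can hold this advisory lock at
            # a time; try_advisory_lock raises AdvisoryLockHeld if another
            # session already holds it, which triggers a retry.
            with try_advisory_lock(
                    LockType.REGISTRY_UPLOAD, self.build.recipe.id,
                    Store.of(self.build.recipe)):
                if not self.build_uploaded:
                    client.upload(self.build)
                    self.build_uploaded = True
                self.uploadManifestList(client)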
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~twom/launchpad:oci-try-the-lock-for-the-race into launchpad:master.
diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py
index 20337c1..c819652 100644
--- a/lib/lp/oci/model/ocirecipebuildjob.py
+++ b/lib/lp/oci/model/ocirecipebuildjob.py
@@ -24,6 +24,7 @@ from storm.databases.postgres import JSON
from storm.locals import (
Int,
Reference,
+ Store,
)
import transaction
from zope.component import getUtility
@@ -48,6 +49,11 @@ from lp.services.database.interfaces import (
IMasterStore,
IStore,
)
+from lp.services.database.locking import (
+ AdvisoryLockHeld,
+ LockType,
+ try_advisory_lock,
+ )
from lp.services.database.stormbase import StormBase
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.model.job import (
@@ -186,7 +192,7 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
class ManifestListUploadError(Exception):
pass
- retry_error_types = (ManifestListUploadError, )
+ retry_error_types = (ManifestListUploadError, AdvisoryLockHeld,)
max_retries = 5
config = config.IOCIRegistryUploadJobSource
@@ -290,32 +296,11 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
"""
builds = list(build_request.builds)
uploads_per_build = {i: list(i.registry_upload_jobs) for i in builds}
- upload_jobs = sum(uploads_per_build.values(), [])
-
- # Lock the Job rows, so no other job updates its status until the
- # end of this job's transaction. This is done to avoid race conditions,
- # where 2 upload jobs could be running simultaneously and end up
- # uploading an incomplete manifest list at the same time.
- # Note also that new upload jobs might be created between the
- # transaction begin and this lock takes place, but in this case the
- # new upload is either a retry from a failed upload, or the first
- # upload for one of the existing builds. Either way, we will succeed
- # to execute our manifest list upload, and the new job will wait
- # until this job finishes to upload their version of the manifest
- # list (which will override our version, including both manifests).
- store = IMasterStore(builds[0])
- placeholders = ', '.join('?' for _ in upload_jobs)
- sql = (
- "SELECT id, status FROM job WHERE id IN (%s) FOR UPDATE"
- % placeholders)
- job_status = {
- job_id: JobStatus.items[status] for job_id, status in
- store.execute(sql, [i.job_id for i in upload_jobs])}
builds = set()
for build, upload_jobs in uploads_per_build.items():
has_finished_upload = any(
- job_status[i.job_id] == JobStatus.COMPLETED
+ i.status == JobStatus.COMPLETED
or i.job_id == self.job_id
for i in upload_jobs)
if has_finished_upload:
@@ -342,24 +327,25 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
def run(self):
"""See `IRunnableJob`."""
client = getUtility(IOCIRegistryClient)
- # XXX twom 2020-04-16 This is taken from SnapStoreUploadJob
- # it will need to gain retry support.
try:
- try:
- if not self.build_uploaded:
- client.upload(self.build)
- self.build_uploaded = True
-
- self.uploadManifestList(client)
-
- except OCIRegistryError as e:
- self.error_summary = str(e)
- self.errors = e.errors
- raise
- except Exception as e:
- self.error_summary = str(e)
- self.errors = None
- raise
+ with try_advisory_lock(LockType.REGISTRY_UPLOAD,
+ self.build.recipe.id,
+ Store.of(self.build.recipe)):
+ try:
+ if not self.build_uploaded:
+ client.upload(self.build)
+ self.build_uploaded = True
+
+ self.uploadManifestList(client)
+
+ except OCIRegistryError as e:
+ self.error_summary = str(e)
+ self.errors = e.errors
+ raise
+ except Exception as e:
+ self.error_summary = str(e)
+ self.errors = None
+ raise
except Exception:
transaction.commit()
raise
diff --git a/lib/lp/oci/tests/test_ocirecipebuildjob.py b/lib/lp/oci/tests/test_ocirecipebuildjob.py
index 9388a68..0ff2938 100644
--- a/lib/lp/oci/tests/test_ocirecipebuildjob.py
+++ b/lib/lp/oci/tests/test_ocirecipebuildjob.py
@@ -11,10 +11,13 @@ from datetime import (
datetime,
timedelta,
)
+import os
+import signal
import threading
import time
from fixtures import FakeLogger
+from storm.locals import Store
from testtools.matchers import (
Equals,
MatchesDict,
@@ -50,6 +53,11 @@ from lp.oci.model.ocirecipebuildjob import (
)
from lp.services.compat import mock
from lp.services.config import config
+from lp.services.database.locking import (
+ AdvisoryLockHeld,
+ LockType,
+ try_advisory_lock,
+ )
from lp.services.features.testing import FeatureFixture
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.runner import JobRunner
@@ -64,7 +72,10 @@ from lp.testing import (
admin_logged_in,
TestCaseWithFactory,
)
-from lp.testing.dbuser import dbuser
+from lp.testing.dbuser import (
+ dbuser,
+ switch_dbuser,
+ )
from lp.testing.fakemethod import FakeMethod
from lp.testing.fixture import ZopeUtilityFixture
from lp.testing.layers import (
@@ -396,114 +407,30 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin,
self.assertEqual(JobStatus.COMPLETED, upload_job.status)
self.assertTrue(upload_job.build_uploaded)
- def test_getUploadedBuilds_lock_between_two_jobs(self):
- """Simple test to ensure that getUploadedBuilds method locks
- rows in the database and make concurrent calls wait for that.
-
- This is not a 100% reliable way to check that concurrent calls to
- getUploadedBuilds will queue up since it relies on the
- execution time, but it's a "good enough" approach: this test might
- pass if the machine running it is *really, really* slow, but a failure
- here will indicate that something is for sure wrong.
- """
-
- class AllBuildsUploadedChecker(threading.Thread):
- """Thread to run upload_job.getUploadedBuilds tracking the time."""
- def __init__(self, build_request):
- super(AllBuildsUploadedChecker, self).__init__()
- self.build_request = build_request
- self.upload_job = None
- # Locks the measurement start until we finished running the
- # bootstrap code. Parent thread should call waitBootstrap
- # after self.start().
- self.bootstrap_lock = threading.Lock()
- self.bootstrap_lock.acquire()
- self.result = None
- self.error = None
- self.start_date = None
- self.end_date = None
-
- @property
- def lock_duration(self):
- return self.end_date - self.start_date
-
- def waitBootstrap(self):
- """Wait until self.bootstrap finishes running."""
- self.bootstrap_lock.acquire()
- # We don't actually need the lock... just wanted to wait
- # for it. let's release it then.
- self.bootstrap_lock.release()
-
- def bootstrap(self):
- try:
- build = self.build_request.builds[1]
- self.upload_job = OCIRegistryUploadJob.create(build)
- finally:
- self.bootstrap_lock.release()
-
- def run(self):
- with admin_logged_in():
- self.bootstrap()
- self.start_date = datetime.now()
- try:
- self.result = self.upload_job.getUploadedBuilds(
- self.build_request)
- except Exception as e:
- self.error = e
- self.end_date = datetime.now()
-
- # Create a build request with 2 builds.
+ def test_getUploadedBuilds(self):
+ # Create a build request with 3 builds.
build_request = self.makeBuildRequest(
include_i386=True, include_amd64=True, include_hppa=True)
builds = build_request.builds
self.assertEqual(3, builds.count())
- # Fail one of the builds, to make sure we are ignoring it.
- removeSecurityProxy(builds[2]).status = BuildStatus.FAILEDTOBUILD
-
# Create the upload job for the first build.
upload_job1 = OCIRegistryUploadJob.create(builds[0])
upload_job1 = removeSecurityProxy(upload_job1)
- # How long the lock will be held by the first job, in seconds.
- # Adjust to minimize false positives: a number too small here might
- # make the test pass even if the lock is not correctly implemented.
- # A number too big will slow down the test execution...
- waiting_time = 2
- # Start a clean transaction and lock the rows at database level.
- transaction.commit()
- self.assertEqual(
- {builds[0]}, upload_job1.getUploadedBuilds(build_request))
-
- # Start, in parallel, another upload job to run `getUploadedBuilds`.
- concurrent_checker = AllBuildsUploadedChecker(build_request)
- concurrent_checker.start()
- # Wait until concurrent_checker is ready to measure the time waiting
- # for the database lock.
- concurrent_checker.waitBootstrap()
-
- # Wait a bit and release the database lock by committing current
- # transaction.
- time.sleep(waiting_time)
- # Let's force the first job to be finished, just to make sure the
- # second job will realise it's the last one running.
- upload_job1.start()
- upload_job1.complete()
- transaction.commit()
-
- # Now, the concurrent checker should have already finished running,
- # without any error and it should have taken at least the
- # waiting_time to finish running (since it was waiting).
- concurrent_checker.join()
- self.assertIsNone(concurrent_checker.error)
- self.assertGreaterEqual(
- concurrent_checker.lock_duration, timedelta(seconds=waiting_time))
- # Should have noticed that both builds are ready to upload.
- self.assertEqual(2, len(concurrent_checker.result))
- thread_build1, thread_build2 = concurrent_checker.result
- self.assertThat(set(builds[:2]), MatchesSetwise(
- MatchesStructure(id=Equals(thread_build1.id)),
- MatchesStructure(id=Equals(thread_build2.id))))
+ upload_job2 = OCIRegistryUploadJob.create(builds[1])
+ upload_job2 = removeSecurityProxy(upload_job2)
+
+ client = FakeRegistryClient()
+ self.useFixture(ZopeUtilityFixture(client, IOCIRegistryClient))
+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
+ run_isolated_jobs([upload_job1])
+
+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
+ run_isolated_jobs([upload_job2])
+
+ result = upload_job1.getUploadedBuilds(build_request)
+ self.assertEqual({builds[0], builds[1]}, result)
def test_run_failed_registry_error(self):
# A run that fails with a registry error sets the registry upload
@@ -586,6 +513,53 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin,
self.assertEqual(0, len(self.oopses))
+ def test_advisorylock_on_run(self):
+ # The job should take an advisory lock and any attempted
+ # simultaneous jobs should retry
+ logger = self.useFixture(FakeLogger())
+ build_request = self.makeBuildRequest(include_i386=False)
+ recipe = build_request.recipe
+
+ self.assertEqual(1, build_request.builds.count())
+ ocibuild = build_request.builds[0]
+ ocibuild.updateStatus(BuildStatus.FULLYBUILT)
+ self.makeWebhook(recipe)
+
+ self.assertContentEqual([], ocibuild.registry_upload_jobs)
+ job = OCIRegistryUploadJob.create(ocibuild)
+ client = FakeRegistryClient()
+ switch_dbuser(config.IOCIRegistryUploadJobSource.dbuser)
+ # Fork so that we can take an advisory lock from a different
+ # PostgreSQL session.
+ read, write = os.pipe()
+ pid = os.fork()
+ if pid == 0: # child
+ os.close(read)
+ with try_advisory_lock(
+ LockType.REGISTRY_UPLOAD,
+ ocibuild.recipe.id,
+ Store.of(ocibuild.recipe)):
+ os.write(write, b"1")
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ pass
+ os._exit(0)
+ else: # parent
+ try:
+ os.close(write)
+ os.read(read, 1)
+ runner = JobRunner([job])
+ runner.runAll()
+ self.assertEqual(JobStatus.WAITING, job.status)
+ self.assertEqual([], runner.oops_ids)
+ self.assertIn(
+ "Scheduling retry due to AdvisoryLockHeld", logger.output)
+ finally:
+ os.kill(pid, signal.SIGINT)
+
+
+
class TestOCIRegistryUploadJobViaCelery(TestCaseWithFactory,
MultiArchRecipeMixin):
diff --git a/lib/lp/services/database/locking.py b/lib/lp/services/database/locking.py
index 266f971..3f1c478 100644
--- a/lib/lp/services/database/locking.py
+++ b/lib/lp/services/database/locking.py
@@ -44,6 +44,12 @@ class LockType(DBEnumeratedType):
Package copy.
""")
+ REGISTRY_UPLOAD = DBItem(3, """OCI Registry upload.
+
+ OCI Registry upload.
+ """
+ )
+
@contextmanager
def try_advisory_lock(lock_type, lock_id, store):
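
The diff stops at the signature of try_advisory_lock. For context, a context manager with this shape typically wraps PostgreSQL's non-blocking advisory locks; the sketch below is an illustrative approximation of how such a helper can behave, not the actual implementation in lp.services.database.locking.

    from contextlib import contextmanager


    class AdvisoryLockHeld(Exception):
        """Raised when another session already holds the requested lock."""
        # The real exception lives in lp.services.database.locking.


    @contextmanager
    def try_advisory_lock(lock_type, lock_id, store):
        # Approximate behaviour: use the two-integer form of PostgreSQL
        # advisory locks, keyed on the lock type's DB value and the object
        # id. pg_try_advisory_lock returns immediately instead of blocking.
        acquired = store.execute(
            "SELECT pg_try_advisory_lock(?, ?)",
            (lock_type.value, lock_id)).get_one()[0]
        if not acquired:
            raise AdvisoryLockHeld()
        try:
            yield
        finally:
            store.execute(
                "SELECT pg_advisory_unlock(?, ?)",
                (lock_type.value, lock_id))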