← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:copy-advisory-lock into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:copy-advisory-lock into launchpad:master.

Commit message:
Take an advisory lock when copying packages

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/399646

Take an advisory lock on a hash of the target archive, source package name, and source package version when copying packages; otherwise, multiple copies of the same package into different series of the same archive can race each other past the conflict checker and create conflicting builds.

There are still some possible races (since different sources may produce the same binaries), but they're much less common.

This is similar to https://code.launchpad.net/~cjwatson/launchpad/copy-lock-archive/+merge/279275, converted to git and rebased on master, except that I made the lock key more fine-grained (archive + name + version rather than just archive) to avoid a thundering-herd problem.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:copy-advisory-lock into launchpad:master.
diff --git a/lib/lp/services/database/locking.py b/lib/lp/services/database/locking.py
index e8046a4..266f971 100644
--- a/lib/lp/services/database/locking.py
+++ b/lib/lp/services/database/locking.py
@@ -1,4 +1,4 @@
-# Copyright 2011-2015 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2021 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -39,6 +39,11 @@ class LockType(DBEnumeratedType):
         Git repository reference scan.
         """)
 
+    PACKAGE_COPY = DBItem(2, """Package copy.
+
+        Package copy.
+        """)
+
 
 @contextmanager
 def try_advisory_lock(lock_type, lock_id, store):
diff --git a/lib/lp/soyuz/model/packagecopyjob.py b/lib/lp/soyuz/model/packagecopyjob.py
index b07b88d..83c8b1f 100644
--- a/lib/lp/soyuz/model/packagecopyjob.py
+++ b/lib/lp/soyuz/model/packagecopyjob.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2021 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -8,7 +8,9 @@ __all__ = [
     "PlainPackageCopyJob",
     ]
 
+from datetime import timedelta
 import logging
+import random
 
 from lazr.delegates import delegate_to
 from lazr.jobrunner.jobrunner import SuspendJobException
@@ -50,6 +52,11 @@ from lp.services.database.interfaces import (
     IMasterStore,
     IStore,
     )
+from lp.services.database.locking import (
+    AdvisoryLockHeld,
+    LockType,
+    try_advisory_lock,
+    )
 from lp.services.database.stormbase import StormBase
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.model.job import (
@@ -267,7 +274,7 @@ class PlainPackageCopyJob(PackageCopyJobDerived):
     user_error_types = (CannotCopy,)
     # Raised when closing bugs ends up hitting another process and
     # deadlocking.
-    retry_error_types = (TransactionRollbackError,)
+    retry_error_types = (TransactionRollbackError, AdvisoryLockHeld)
     max_retries = 5
 
     @classmethod
@@ -579,10 +586,41 @@ class PlainPackageCopyJob(PackageCopyJobDerived):
             transaction.commit()
         super(PlainPackageCopyJob, self).notifyOops(oops)
 
+    @property
+    def _advisory_lock_id(self):
+        """An ID for use in advisory locks for this job."""
+        # Keep the low 31 bits so this fits PostgreSQL's integer type (the
+        # second argument to two-argument pg_try_advisory_lock).  NOTE(review):
+        # str hash() is salted per process on Python 3 -- confirm stability.
+        return hash((
+            self.target_archive_id,
+            self.package_name,
+            self.package_version)) & 0x7FFFFFFF
+
+    @property
+    def retry_delay(self):
+        """See `BaseRunnableJob`."""
+        # Retry after a random 6-8 minute delay.  This is longer than
+        # the lease duration and the soft time limit, and the randomness
+        # makes it less likely that N-1 of a set of jobs that are all
+        # competing for the same advisory lock will collide the next time
+        # round.  There's already no particular ordering guarantee among
+        # jobs with the same copy policy (see
+        # `PackageCopyJobDerived.iterReady`).
+        return timedelta(minutes=random.uniform(6, 8))
+
     def run(self):
         """See `IRunnableJob`."""
         try:
-            self.attemptCopy()
+            # Take an advisory lock to fend off by far the most common case
+            # of races between multiple instances of the copier's conflict
+            # checker, namely copies of the same source name and version
+            # from the same archive into multiple destination series.  Other
+            # races are still possible, but much rarer.
+            with try_advisory_lock(
+                    LockType.PACKAGE_COPY, self._advisory_lock_id,
+                    IStore(Archive)):
+                self.attemptCopy()
         except CannotCopy as e:
             # Remember the target archive purpose, as otherwise aborting the
             # transaction will forget it.
@@ -607,7 +645,7 @@ class PlainPackageCopyJob(PackageCopyJobDerived):
                 # the job.  We will normally have a DistroSeriesDifference
                 # in this case.
                 pass
-        except SuspendJobException:
+        except (SuspendJobException, AdvisoryLockHeld):
             raise
         except:
             # Abort work done so far, but make sure that we commit the
diff --git a/lib/lp/soyuz/tests/test_packagecopyjob.py b/lib/lp/soyuz/tests/test_packagecopyjob.py
index 6817c10..7c2ef99 100644
--- a/lib/lp/soyuz/tests/test_packagecopyjob.py
+++ b/lib/lp/soyuz/tests/test_packagecopyjob.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2019 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2021 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for sync package jobs."""
@@ -6,8 +6,11 @@
 from __future__ import absolute_import, print_function, unicode_literals
 
 import operator
+import os
+import signal
 from textwrap import dedent
 
+from fixtures import FakeLogger
 from storm.store import Store
 from testtools.content import text_content
 from testtools.matchers import (
@@ -28,6 +31,10 @@ from lp.registry.model.distroseriesdifferencecomment import (
     )
 from lp.services.config import config
 from lp.services.database.interfaces import IStore
+from lp.services.database.locking import (
+    LockType,
+    try_advisory_lock,
+    )
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.runner import JobRunner
@@ -58,6 +65,7 @@ from lp.soyuz.interfaces.section import ISectionSet
 from lp.soyuz.interfaces.sourcepackageformat import (
     ISourcePackageFormatSelectionSet,
     )
+from lp.soyuz.model.archive import Archive
 from lp.soyuz.model.packagecopyjob import PackageCopyJob
 from lp.soyuz.model.queue import PackageUpload
 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
@@ -329,6 +337,45 @@ class PlainPackageCopyJobTests(TestCaseWithFactory, LocalTestHelper):
 
         self.assertRaises(Boom, job.run)
 
+    def test_run_tries_advisory_lock(self):
+        # A job is retried if an advisory lock for the same archive, package
+        # name, and version is held.
+        logger = self.useFixture(FakeLogger())
+        job = create_proper_job(self.factory)
+        advisory_lock_id = hash((
+            job.target_archive_id,
+            job.package_name,
+            job.package_version)) & 0x7FFFFFFF
+        self.assertEqual(
+            advisory_lock_id, removeSecurityProxy(job)._advisory_lock_id)
+        switch_dbuser(self.dbuser)
+        # Fork so that we can take an advisory lock from a different
+        # PostgreSQL session (the child inherits this process's hash seed).
+        read, write = os.pipe()
+        pid = os.fork()
+        if pid == 0:  # child
+            os.close(read)
+            with try_advisory_lock(
+                    LockType.PACKAGE_COPY, advisory_lock_id, IStore(Archive)):
+                os.write(write, b"1")
+                try:
+                    signal.pause()  # Until the parent sends SIGINT.
+                except KeyboardInterrupt:
+                    pass
+            os._exit(0)
+        else:  # parent
+            try:
+                os.close(write)
+                os.read(read, 1)  # Wait until the child holds the lock.
+                runner = JobRunner([job])
+                runner.runAll()
+                self.assertEqual(JobStatus.WAITING, job.status)
+                self.assertEqual([], runner.oops_ids)
+                self.assertIn(
+                    "Scheduling retry due to AdvisoryLockHeld", logger.output)
+            finally:
+                os.kill(pid, signal.SIGINT)
+
     def test_run_posts_copy_failure_as_comment(self):
         # If the job fails with a CannotCopy exception, it swallows the
         # exception and posts a DistroSeriesDifferenceComment with the