launchpad-reviewers team mailing list archive

[Merge] lp:~jtv/launchpad/db-bug-793382 into lp:launchpad/db-devel

 

Jeroen T. Vermeulen has proposed merging lp:~jtv/launchpad/db-bug-793382 into lp:launchpad/db-devel.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #793382 in Launchpad itself: "Get requestUpdate back to constant queries"
  https://bugs.launchpad.net/launchpad/+bug/793382

For more details, see:
https://code.launchpad.net/~jtv/launchpad/db-bug-793382/+merge/63545

= Summary =

Now that we no longer bundle multiple packages into one PackageCopyJob, the requestUpgrades method of the DistroSeries:+localpackagediffs view issues extra queries in proportion to the number of copies requested: creating each Job costs one query, and creating its associated PackageCopyJob costs another.  This request needs to scale, so we'd like its query count to remain constant.
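To make the scaling problem concrete, here is a minimal sketch that uses Python's sqlite3 module in place of Launchpad's PostgreSQL store and Storm.  `CountingConnection`, `create_one_by_one` and `create_batched` are hypothetical names; the counter plays the role that StormStatementRecorder plays in the performance test.

```python
import sqlite3


class CountingConnection:
    """Wrap an in-memory sqlite3 database and count statements sent through it.

    Hypothetical stand-in for Launchpad's StormStatementRecorder.
    """

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute(
            "CREATE TABLE job (id INTEGER PRIMARY KEY, status INTEGER)")
        self.count = 0  # the CREATE TABLE above is not counted

    def execute(self, sql, params=()):
        self.count += 1
        return self.conn.execute(sql, params)

    def rows(self):
        # Inspection helper; bypasses the counter on purpose.
        return self.conn.execute("SELECT COUNT(*) FROM job").fetchone()[0]


def create_one_by_one(db, num_jobs, status=0):
    # One INSERT per job: the statement count grows with num_jobs.
    for _ in range(num_jobs):
        db.execute("INSERT INTO job (status) VALUES (?)", (status,))


def create_batched(db, num_jobs, status=0):
    # One multi-row INSERT for the whole batch: the count stays at 1.
    if num_jobs == 0:
        return  # "VALUES" followed by no rows would be invalid SQL
    values = ", ".join(["(?)"] * num_jobs)
    db.execute("INSERT INTO job (status) VALUES " + values,
               (status,) * num_jobs)


for n in (1, 3, 10):
    slow, fast = CountingConnection(), CountingConnection()
    create_one_by_one(slow, n)
    create_batched(fast, n)
    print(n, slow.count, fast.count)  # prints "1 1 1", "3 3 1", "10 10 1"
```

Both functions leave the same rows in the table; only the number of round trips differs, which is what the query-count assertion in the test measures.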


== Proposed fix ==

Provide interfaces for creating multiple Jobs and PlainPackageCopyJobs with one INSERT query per table.  That makes it difficult to get the new objects into memory, but we don't need them: all we need (and really only for Job) is the ids they have been assigned.
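The shape of such a single-query insert can be sketched as a pure function that composes one multi-row INSERT with a RETURNING clause.  This is an illustration, not the code in this branch: `build_multirow_insert` is a hypothetical helper, and it uses psycopg2-style `%s` parameter placeholders, whereas the branch interpolates values already escaped with `quote()`/`sqlvalues()`.

```python
def build_multirow_insert(table, columns, rows, returning="id"):
    """Compose one INSERT statement that adds every row in `rows`.

    Returns (sql, params) using psycopg2-style %s placeholders, or None
    for an empty batch, since "VALUES" with no rows is invalid SQL.
    `table`, `columns` and `returning` are pasted in verbatim, so they
    must be trusted identifiers, never user input.
    """
    rows = [tuple(row) for row in rows]
    if not rows:
        return None
    width = len(columns)
    for row in rows:
        if len(row) != width:
            raise ValueError("expected %d values, got %r" % (width, row))
    # One "(%s, %s, ...)" group per row; the %s markers stay literal.
    row_template = "(%s)" % ", ".join(["%s"] * width)
    sql = "INSERT INTO %s (%s) VALUES %s RETURNING %s" % (
        table, ", ".join(columns),
        ", ".join([row_template] * len(rows)), returning)
    # Flatten the rows so the parameters line up with the placeholders.
    params = [value for row in rows for value in row]
    return sql, params


sql, params = build_multirow_insert("Job", ["status"], [(0,), (0,), (0,)])
# sql == "INSERT INTO Job (status) VALUES (%s), (%s), (%s) RETURNING id"
# params == [0, 0, 0]
```

With psycopg2, `cursor.execute(sql, params)` followed by `cursor.fetchall()` would then yield one `(id,)` row per inserted row, which is all the caller needs.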

In principle Storm itself could bundle the INSERTs this way, but that would take considerable work to get right, and it could be fragile: one INSERT might be flushed to the database before the next object is added to the store, breaking up the batch.

In the IPlainPackageCopyJobSource interface you provide two different kinds of data when creating multiple jobs at once: some parameters are specified once and repeated for all jobs in the batch, others need to be specified for each job separately.  The latter are represented as a list of tuples.
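The split between batch-wide parameters and per-job tuples can be sketched as follows.  `CopyTask` and `expand_copy_tasks` are hypothetical names, and plain strings stand in for the real `IDistroSeries`, `IArchive`, pocket and policy objects; the field order follows the `copy_tasks` tuples documented for `createMultiple` in this branch.

```python
from collections import namedtuple

# Field order follows the copy_tasks tuples documented for
# IPlainPackageCopyJobSource.createMultiple in this branch.
CopyTask = namedtuple(
    "CopyTask",
    ["package_name", "package_version", "source_archive",
     "target_archive", "target_pocket"])


def expand_copy_tasks(target_distroseries, copy_tasks, copy_policy,
                      include_binaries=False):
    """Merge the batch-wide parameters into one dict per copy task."""
    jobs = []
    for task in copy_tasks:
        # Per-job values; a tuple of the wrong length raises TypeError.
        job = dict(CopyTask(*task)._asdict())
        # Values given once and repeated for every job in the batch.
        job.update(
            target_distroseries=target_distroseries,
            copy_policy=copy_policy,
            include_binaries=bool(include_binaries))
        jobs.append(job)
    return jobs


tasks = [
    ("foo", "1.5-1", "parent-main", "derived-main", "UPDATES"),
    ("bar", "0.9-2", "parent-main", "derived-main", "UPDATES"),
]
jobs = expand_copy_tasks("derived-series", tasks, "MASS_SYNC")
```

This roughly corresponds to what `_composeJobInsertionTuple` in the diff does for each task before the values are escaped into the INSERT.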


== Pre-implementation notes ==

Gavin came up with the idea of a "create multiple jobs" interface in a utility class.  No other interesting suggestions came up when we discussed it on IRC.


== Implementation details ==

The Job multi-create method went into the Job class itself.  There is an IJobSource interface, but it was not suitable here for several reasons.  First, it's implemented in many places that may have no need for a multi-create method; requiring one would cause unneeded hardship.  Second, IJobSource is not a utility for Jobs.  Rather, it is the generic base interface for classes built around Job, which may or may not live in separate tables, so it would be ambiguous what kind of ids the method returned.  Finally, each job type's creation method has its own signature, which doesn't fit well with defining them all in one shared interface.

You'll notice in the performance test that even though the query count no longer increases by 2 per copy, the query count for the base case went up by 2 (the limit is now 12 instead of 10).  I haven't tracked down where those 2 queries came from.  My guess is an accounting glitch: in the old code, the two INSERTs may have been deferred until after the point where the query count is collected.  The new code executes its INSERTs immediately, so that deferral can no longer happen, which could explain the difference.
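The suspected accounting glitch can be illustrated with a toy model.  This only shows how a deferred INSERT can escape a statement counter; it does not prove that this is what happened.  `StatementRecorder` and `ToyStore` are hypothetical classes, not Storm's or Launchpad's.

```python
class StatementRecorder:
    """Count statements executed while the recorder is active.

    Hypothetical stand-in for Launchpad's StormStatementRecorder.
    """

    def __init__(self):
        self.active = False
        self.count = 0

    def __enter__(self):
        self.active = True
        return self

    def __exit__(self, *exc_info):
        self.active = False
        return False

    def record(self, sql):
        if self.active:
            self.count += 1


class ToyStore:
    """Toy ORM store: add() defers an INSERT, execute() runs SQL at once."""

    def __init__(self, recorder):
        self.recorder = recorder
        self.pending = []

    def add(self, sql):
        # Deferred: nothing reaches the database until flush().
        self.pending.append(sql)

    def flush(self):
        for sql in self.pending:
            self.recorder.record(sql)
        self.pending = []

    def execute(self, sql):
        # In this toy model, pending changes are flushed before raw SQL runs.
        self.flush()
        self.recorder.record(sql)


# Old style: ORM objects whose INSERTs are flushed too late to be counted.
old = StatementRecorder()
old_store = ToyStore(old)
with old:
    old_store.add("INSERT INTO Job ...")
    old_store.add("INSERT INTO PackageCopyJob ...")
old_store.flush()  # e.g. at commit, after recording has stopped
print(old.count)  # prints 0

# New style: the INSERTs run immediately, inside the recorded block.
new = StatementRecorder()
new_store = ToyStore(new)
with new:
    new_store.execute("INSERT INTO Job (status) VALUES ...")
    new_store.execute("INSERT INTO PackageCopyJob (...) VALUES ...")
print(new.count)  # prints 2
```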


== Tests ==

From low-level to high-level:
{{{
./bin/test -vvc lp.services.job.tests.test_job -t createMultiple
./bin/test -vvc lp.soyuz.tests.test_packagecopyjob -t createMultiple
./bin/test -vvc lp.registry.browser.tests.test_distroseries -t requestUpgrades_is_efficient
}}}


== Demo and Q/A ==

I'm not sure the "upgrade packages" button is visible on DistroSeries:+localpackagediffs yet.  If it is, pressing it should make the upgradable entries on that page (and on any other batches for the same distroseries) show up as "synchronizing...".

Upgradable entries are ones where the checkbox on the left is active and the parent version is greater than the version in the distroseries you're looking at.


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/soyuz/model/packagecopyjob.py
  lib/lp/soyuz/interfaces/packagecopyjob.py
  lib/lp/soyuz/tests/test_packagecopyjob.py
  lib/lp/registry/browser/tests/test_distroseries.py
  lib/lp/services/job/tests/test_job.py
  lib/lp/services/job/model/job.py
  lib/lp/registry/browser/distroseries.py
-- 
https://code.launchpad.net/~jtv/launchpad/db-bug-793382/+merge/63545
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jtv/launchpad/db-bug-793382 into lp:launchpad/db-devel.
=== modified file 'lib/lp/registry/browser/distroseries.py'
--- lib/lp/registry/browser/distroseries.py	2011-06-03 00:13:41 +0000
+++ lib/lp/registry/browser/distroseries.py	2011-06-06 12:04:29 +0000
@@ -1063,16 +1063,18 @@
     def requestUpgrades(self):
         """Request sync of packages that can be easily upgraded."""
         target_distroseries = self.context
-        target_archive = target_distroseries.main_archive
-        job_source = getUtility(IPlainPackageCopyJobSource)
-
-        for dsd in self.getUpgrades():
-            job_source.create(
+        copies = [
+            (
                 dsd.source_package_name.name,
-                dsd.parent_series.main_archive, target_archive,
-                target_distroseries, PackagePublishingPocket.UPDATES,
-                package_version=dsd.parent_source_version,
-                copy_policy=PackageCopyPolicy.MASS_SYNC)
+                dsd.parent_source_version,
+                dsd.parent_series.main_archive,
+                target_distroseries.main_archive,
+                PackagePublishingPocket.UPDATES,
+            )
+            for dsd in self.getUpgrades()]
+        getUtility(IPlainPackageCopyJobSource).createMultiple(
+            target_distroseries, copies,
+            copy_policy=PackageCopyPolicy.MASS_SYNC)
 
         self.request.response.addInfoNotification(
             (u"Upgrades of {context.displayname} packages have been "

=== modified file 'lib/lp/registry/browser/tests/test_distroseries.py'
--- lib/lp/registry/browser/tests/test_distroseries.py	2011-06-02 18:36:43 +0000
+++ lib/lp/registry/browser/tests/test_distroseries.py	2011-06-06 12:04:29 +0000
@@ -1077,7 +1077,7 @@
         observed = map(vars, view.request.response.notifications)
         self.assertEqual([expected], observed)
 
-    def test_requestUpgrade_is_efficient(self):
+    def test_requestUpgrades_is_efficient(self):
         # A single web request may need to schedule large numbers of
         # package upgrades.  It must do so without issuing large numbers
         # of database queries.
@@ -1087,18 +1087,17 @@
         flush_database_caches()
         with StormStatementRecorder() as recorder1:
             self.makeView(derived_series).requestUpgrades()
-        self.assertThat(recorder1, HasQueryCount(LessThan(10)))
-        # Creating Jobs and DistributionJobs takes 2 extra queries per
-        # requested sync.
-        requested_syncs = 3
-        for index in xrange(requested_syncs):
+        self.assertThat(recorder1, HasQueryCount(LessThan(12)))
+
+        # The query count does not increase with the number of upgrades.
+        for index in xrange(3):
             self.makePackageUpgrade(derived_series=derived_series)
         flush_database_caches()
         with StormStatementRecorder() as recorder2:
             self.makeView(derived_series).requestUpgrades()
         self.assertThat(
             recorder2,
-            HasQueryCount(Equals(recorder1.count + 2 * requested_syncs)))
+            HasQueryCount(Equals(recorder1.count)))
 
 
 class TestDistroSeriesLocalDifferencesFunctional(TestCaseWithFactory,

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2011-05-26 14:29:34 +0000
+++ lib/lp/services/job/model/job.py	2011-06-06 12:04:29 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """ORM object representing jobs."""
@@ -26,7 +26,10 @@
 from canonical.database.constants import UTC_NOW
 from canonical.database.datetimecol import UtcDateTimeCol
 from canonical.database.enumcol import EnumCol
-from canonical.database.sqlbase import SQLBase
+from canonical.database.sqlbase import (
+    quote,
+    SQLBase,
+    )
 from lp.services.job.interfaces.job import (
     IJob,
     JobStatus,
@@ -99,6 +102,22 @@
 
     status = property(lambda x: x._status)
 
+    @classmethod
+    def createMultiple(cls, store, num_jobs):
+        """Create multiple `Job`s at once.
+
+        :param store: `Store` to create the jobs in.
+        :param num_jobs: Number of `Job`s to create.
+        :return: An iterable of `Job.id` values for the new jobs.
+        """
+        job_contents = ["(%s)" % quote(JobStatus.WAITING)] * num_jobs
+        result = store.execute("""
+            INSERT INTO Job (status)
+            VALUES %s
+            RETURNING id
+            """ % ", ".join(job_contents))
+        return [job_id for job_id, in result]
+
     def acquireLease(self, duration=300):
         """See `IJob`."""
         if (self.lease_expires is not None

=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py	2011-05-27 08:18:07 +0000
+++ lib/lp/services/job/tests/test_job.py	2011-06-06 12:04:29 +0000
@@ -8,14 +8,9 @@
 
 import pytz
 from storm.locals import Store
-from zope.component import getUtility
 
 from canonical.database.constants import UTC_NOW
-from canonical.launchpad.webapp.interfaces import (
-    DEFAULT_FLAVOR,
-    IStoreSelector,
-    MAIN_STORE,
-    )
+from canonical.launchpad.interfaces.lpstorm import IStore
 from canonical.launchpad.webapp.testing import verifyObject
 from canonical.testing.layers import ZopelessDatabaseLayer
 from lp.services.job.interfaces.job import (
@@ -44,6 +39,22 @@
         job = Job()
         self.assertEqual(job.status, JobStatus.WAITING)
 
+    def test_createMultiple_creates_requested_number_of_jobs(self):
+        job_ids = list(Job.createMultiple(IStore(Job), 3))
+        self.assertEqual(3, len(job_ids))
+        self.assertEqual(3, len(set(job_ids)))
+
+    def test_createMultiple_returns_valid_job_ids(self):
+        job_ids = list(Job.createMultiple(IStore(Job), 3))
+        store = IStore(Job)
+        for job_id in job_ids:
+            self.assertIsNot(None, store.get(Job, job_id))
+
+    def test_createMultiple_sets_status_to_WAITING(self):
+        store = IStore(Job)
+        job = store.get(Job, Job.createMultiple(store, 1)[0])
+        self.assertEqual(JobStatus.WAITING, job.status)
+
     def test_start(self):
         """Job.start should update the object appropriately.
 
@@ -214,8 +225,7 @@
     layer = ZopelessDatabaseLayer
 
     def _sampleData(self):
-        store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
-        return list(store.execute(Job.ready_jobs))
+        return list(IStore(Job).execute(Job.ready_jobs))
 
     def test_ready_jobs(self):
         """Job.ready_jobs should include new jobs."""

=== modified file 'lib/lp/soyuz/interfaces/packagecopyjob.py'
--- lib/lp/soyuz/interfaces/packagecopyjob.py	2011-06-03 08:53:14 +0000
+++ lib/lp/soyuz/interfaces/packagecopyjob.py	2011-06-06 12:04:29 +0000
@@ -90,7 +90,7 @@
                target_archive, target_distroseries, target_pocket,
                include_binaries=False, package_version=None,
                copy_policy=PackageCopyPolicy.INSECURE):
-        """Create a new `IPackageCopyJob`.
+        """Create a new `IPlainPackageCopyJob`.
 
         :param package_name: The name of the source package to copy.
         :param source_archive: The `IArchive` in which `source_packages` are
@@ -106,6 +106,21 @@
         :param copy_policy: Applicable `PackageCopyPolicy`.
         """
 
+    def createMultiple(target_distroseries, copy_tasks,
+                       copy_policy=PackageCopyPolicy.INSECURE,
+                       include_binaries=False):
+        """Create multiple new `IPlainPackageCopyJob`s at once.
+
+        :param target_distroseries: The `IDistroSeries` to which to copy the
+            packages.
+        :param copy_tasks: A list of tuples describing the copies to be
+            performed: (package name, package version, source archive,
+            target archive, target pocket).
+        :param copy_policy: Applicable `PackageCopyPolicy`.
+        :param include_binaries: As in `do_copy`.
+        :return: An iterable of `PackageCopyJob` ids.
+        """
+
     def getActiveJobs(target_archive):
         """Retrieve all active sync jobs for an archive."""
 

=== modified file 'lib/lp/soyuz/model/packagecopyjob.py'
--- lib/lp/soyuz/model/packagecopyjob.py	2011-06-03 09:18:34 +0000
+++ lib/lp/soyuz/model/packagecopyjob.py	2011-06-06 12:04:29 +0000
@@ -24,6 +24,7 @@
     )
 
 from canonical.database.enumcol import EnumCol
+from canonical.database.sqlbase import sqlvalues
 from canonical.launchpad.components.decoratedresultset import (
     DecoratedResultSet,
     )
@@ -184,17 +185,23 @@
     classProvides(IPlainPackageCopyJobSource)
 
     @classmethod
+    def _makeMetadata(cls, target_pocket, package_version, include_binaries):
+        """Build the JSON metadata dict for a `PlainPackageCopyJob`."""
+        return {
+            'target_pocket': target_pocket.value,
+            'package_version': package_version,
+            'include_binaries': bool(include_binaries),
+            }
+
+    @classmethod
     def create(cls, package_name, source_archive,
                target_archive, target_distroseries, target_pocket,
                include_binaries=False, package_version=None,
                copy_policy=PackageCopyPolicy.INSECURE):
         """See `IPlainPackageCopyJobSource`."""
         assert package_version is not None, "No package version specified."
-        metadata = {
-            'target_pocket': target_pocket.value,
-            'package_version': package_version,
-            'include_binaries': bool(include_binaries),
-            }
+        metadata = cls._makeMetadata(
+            target_pocket, package_version, include_binaries)
         job = PackageCopyJob(
             job_type=cls.class_job_type,
             source_archive=source_archive,
@@ -207,6 +214,58 @@
         return cls(job)
 
     @classmethod
+    def _composeJobInsertionTuple(cls, target_distroseries, copy_policy,
+                                  include_binaries, job_id, copy_task):
+        """Create an SQL fragment for inserting a job into the database.
+
+        :return: A string representing an SQL tuple containing initializers
+            for a `PackageCopyJob` in the database (minus `id`, which is
+            assigned automatically).  Contents are escaped for use in SQL.
+        """
+        (
+            package_name,
+            package_version,
+            source_archive,
+            target_archive,
+            target_pocket,
+        ) = copy_task
+        metadata = cls._makeMetadata(
+            target_pocket, package_version, include_binaries)
+        data = (
+            cls.class_job_type, target_distroseries, copy_policy,
+            source_archive, target_archive, package_name, job_id,
+            PackageCopyJob.serializeMetadata(metadata))
+        format_string = "(%s)" % ", ".join(["%s"] * len(data))
+        return format_string % sqlvalues(*data)
+
+    @classmethod
+    def createMultiple(cls, target_distroseries, copy_tasks,
+                       copy_policy=PackageCopyPolicy.INSECURE,
+                       include_binaries=False):
+        """See `IPlainPackageCopyJobSource`."""
+        store = IMasterStore(Job)
+        job_ids = Job.createMultiple(store, len(copy_tasks))
+        job_contents = [
+            cls._composeJobInsertionTuple(
+                target_distroseries, copy_policy, include_binaries, job_id,
+                task)
+            for job_id, task in zip(job_ids, copy_tasks)]
+        result = store.execute("""
+            INSERT INTO PackageCopyJob (
+                job_type,
+                target_distroseries,
+                copy_policy,
+                source_archive,
+                target_archive,
+                package_name,
+                job,
+                json_data)
+            VALUES %s
+            RETURNING id
+            """ % ", ".join(job_contents))
+        return [job_id for job_id, in result]
+
+    @classmethod
     def getActiveJobs(cls, target_archive):
         """See `IPlainPackageCopyJobSource`."""
         jobs = IStore(PackageCopyJob).find(

=== modified file 'lib/lp/soyuz/tests/test_packagecopyjob.py'
--- lib/lp/soyuz/tests/test_packagecopyjob.py	2011-06-03 08:53:14 +0000
+++ lib/lp/soyuz/tests/test_packagecopyjob.py	2011-06-06 12:04:29 +0000
@@ -118,6 +118,50 @@
         self.assertEquals(False, job.include_binaries)
         self.assertEquals(PackageCopyPolicy.MASS_SYNC, job.copy_policy)
 
+    def test_createMultiple_creates_one_job_per_copy(self):
+        mother = self.factory.makeDistroSeriesParent()
+        derived_series = mother.derived_series
+        father = self.factory.makeDistroSeriesParent(
+            derived_series=derived_series)
+        mother_package = self.factory.makeSourcePackageName()
+        father_package = self.factory.makeSourcePackageName()
+        job_source = getUtility(IPlainPackageCopyJobSource)
+        copy_tasks = [
+            (
+                mother_package.name,
+                "1.5mother1",
+                mother.parent_series.main_archive,
+                derived_series.main_archive,
+                PackagePublishingPocket.RELEASE,
+                ),
+            (
+                father_package.name,
+                "0.9father1",
+                father.parent_series.main_archive,
+                derived_series.main_archive,
+                PackagePublishingPocket.UPDATES,
+                ),
+            ]
+        job_ids = list(
+            job_source.createMultiple(mother.derived_series, copy_tasks))
+        jobs = list(job_source.getActiveJobs(derived_series.main_archive))
+        self.assertContentEqual(job_ids, [job.id for job in jobs])
+        self.assertEqual(len(copy_tasks), len(set([job.job for job in jobs])))
+        # Get jobs into the same order as copy_tasks, for ease of
+        # comparison.
+        if jobs[0].package_name != mother_package.name:
+            jobs = reversed(jobs)
+        requested_copies = [
+            (
+                job.package_name,
+                job.package_version,
+                job.source_archive,
+                job.target_archive,
+                job.target_pocket,
+                )
+            for job in jobs]
+        self.assertEqual(copy_tasks, requested_copies)
+
     def test_getActiveJobs(self):
         # getActiveJobs() can retrieve all active jobs for an archive.
         distroseries = self.factory.makeDistroSeries()
@@ -449,7 +493,7 @@
         # metadata.
         name = self.factory.makeSourcePackageName()
         component = self.factory.makeComponent()
-        section=self.factory.makeSection()
+        section = self.factory.makeSection()
         pcj = self.factory.makePlainPackageCopyJob()
         self.layer.txn.commit()
         self.layer.switchDbUser('sync_packages')
@@ -474,7 +518,7 @@
         # the metadata.
         name = self.factory.makeSourcePackageName()
         component = self.factory.makeComponent()
-        section=self.factory.makeSection()
+        section = self.factory.makeSection()
         pcj = self.factory.makePlainPackageCopyJob(
             package_name=name.name, package_version="1.0")
         self.layer.txn.commit()