← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:stormify-job into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:stormify-job into launchpad:master.

Commit message:
Convert Job to Storm

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/447422

I had to be a bit more careful about flushing newly created rows to the database in order to get their IDs back. This required updating a couple of query-count tests that previously passed only by accident, because they did not notice some unflushed queries at the end of the test.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:stormify-job into launchpad:master.
diff --git a/lib/lp/charms/model/charmrecipebuildjob.py b/lib/lp/charms/model/charmrecipebuildjob.py
index d923485..3534797 100644
--- a/lib/lp/charms/model/charmrecipebuildjob.py
+++ b/lib/lp/charms/model/charmrecipebuildjob.py
@@ -193,6 +193,7 @@ class CharmhubUploadJob(CharmRecipeBuildJobDerived):
             )
             job = cls(charm_recipe_build_job)
             job.celeryRunOnCommit()
+            IStore(CharmRecipeBuildJob).flush()
             del get_property_cache(build).last_store_upload_job
             upload_status = build.store_upload_status
             if upload_status != before_modification.store_upload_status:
diff --git a/lib/lp/charms/model/charmrecipejob.py b/lib/lp/charms/model/charmrecipejob.py
index 6971b4d..56efa07 100644
--- a/lib/lp/charms/model/charmrecipejob.py
+++ b/lib/lp/charms/model/charmrecipejob.py
@@ -185,6 +185,7 @@ class CharmRecipeRequestBuildsJob(CharmRecipeJobDerived):
         recipe_job = CharmRecipeJob(recipe, cls.class_job_type, metadata)
         job = cls(recipe_job)
         job.celeryRunOnCommit()
+        IStore(CharmRecipeJob).flush()
         return job
 
     @classmethod
diff --git a/lib/lp/code/model/branchjob.py b/lib/lp/code/model/branchjob.py
index e593b2b..c08a8c1 100644
--- a/lib/lp/code/model/branchjob.py
+++ b/lib/lp/code/model/branchjob.py
@@ -867,6 +867,7 @@ class RosettaUploadJob(BranchJobDerived):
             )
             job = cls(branch_job)
             job.celeryRunOnCommit()
+            IStore(BranchJob).flush()
             return job
         else:
             return None
@@ -1136,6 +1137,7 @@ class BranchModifiedMailJob(BranchJobDerived):
         branch_job = BranchJob(branch, cls.class_job_type, metadata)
         job = cls(branch_job)
         job.celeryRunOnCommit()
+        IStore(BranchJob).flush()
         return job
 
     @property
diff --git a/lib/lp/code/model/branchmergeproposal.py b/lib/lp/code/model/branchmergeproposal.py
index 12df826..04b28b3 100644
--- a/lib/lp/code/model/branchmergeproposal.py
+++ b/lib/lp/code/model/branchmergeproposal.py
@@ -1105,10 +1105,9 @@ class BranchMergeProposal(StormBase, BugLinkTargetMixin):
         # or not they have completed.
         from lp.code.model.branchmergeproposaljob import BranchMergeProposalJob
 
-        for job in BranchMergeProposalJob.selectBy(
-            branch_merge_proposal=self.id
-        ):
-            job.destroySelf()
+        IStore(BranchMergeProposalJob).find(
+            BranchMergeProposalJob, branch_merge_proposal=self
+        ).remove()
         self._preview_diffs.remove()
         Store.of(self).remove(self)
 
diff --git a/lib/lp/code/model/branchmergeproposaljob.py b/lib/lp/code/model/branchmergeproposaljob.py
index e173f29..c4fb588 100644
--- a/lib/lp/code/model/branchmergeproposaljob.py
+++ b/lib/lp/code/model/branchmergeproposaljob.py
@@ -32,6 +32,7 @@ from storm.store import Store
 from zope.component import getUtility
 from zope.interface import implementer, provider
 
+from lp.app.errors import NotFoundError
 from lp.code.adapters.branch import BranchMergeProposalDelta
 from lp.code.enums import BranchType
 from lp.code.errors import BranchHasPendingWrites, UpdatePreviewDiffNotReady
@@ -63,7 +64,6 @@ from lp.registry.interfaces.person import IPersonSet
 from lp.services.config import config
 from lp.services.database.enumcol import DBEnum
 from lp.services.database.interfaces import IPrimaryStore, IStore
-from lp.services.database.sqlobject import SQLObjectNotFound
 from lp.services.database.stormbase import StormBase
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.model.job import EnumeratedSubclass, Job
@@ -182,25 +182,14 @@ class BranchMergeProposalJob(StormBase):
         Store.of(self).remove(self)
 
     @classmethod
-    def selectBy(klass, **kwargs):
-        """Return selected instances of this class.
-
-        At least one pair of keyword arguments must be supplied.
-        foo=bar is interpreted as 'select all instances of
-        BranchMergeProposalJob whose property "foo" is equal to "bar"'.
-        """
-        assert len(kwargs) > 0
-        return IStore(klass).find(klass, **kwargs)
-
-    @classmethod
     def get(klass, key):
         """Return the instance of this class whose key is supplied.
 
-        :raises: SQLObjectNotFound
+        :raises: NotFoundError
         """
         instance = IStore(klass).get(klass, key)
         if instance is None:
-            raise SQLObjectNotFound(
+            raise NotFoundError(
                 "No occurrence of %s has key %s" % (klass.__name__, key)
             )
         return instance
@@ -236,6 +225,7 @@ class BranchMergeProposalJobDerived(
         base_job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata)
         job = cls(base_job)
         job.celeryRunOnCommit()
+        IStore(BranchMergeProposalJob).flush()
         return job
 
     @classmethod
@@ -244,12 +234,12 @@ class BranchMergeProposalJobDerived(
 
         :return: the BranchMergeProposalJob with the specified id, as the
             current BranchMergeProposalJobDereived subclass.
-        :raises: SQLObjectNotFound if there is no job with the specified id,
+        :raises: NotFoundError if there is no job with the specified id,
             or its job_type does not match the desired subclass.
         """
         job = BranchMergeProposalJob.get(job_id)
         if job.job_type != cls.class_job_type:
-            raise SQLObjectNotFound(
+            raise NotFoundError(
                 "No object found with id %d and type %s"
                 % (job_id, cls.class_job_type.title)
             )
@@ -659,7 +649,7 @@ class BranchMergeProposalJobSource(BaseRunnableJobSource):
 
         :return: the BranchMergeProposalJob with the specified id, as the
             current BranchMergeProposalJobDereived subclass.
-        :raises: SQLObjectNotFound if there is no job with the specified id,
+        :raises: NotFoundError if there is no job with the specified id,
             or its job_type does not match the desired subclass.
         """
         job = BranchMergeProposalJob.get(job_id)
diff --git a/lib/lp/code/model/gitjob.py b/lib/lp/code/model/gitjob.py
index 1c51b94..f1e9fce 100644
--- a/lib/lp/code/model/gitjob.py
+++ b/lib/lp/code/model/gitjob.py
@@ -207,6 +207,7 @@ class GitRefScanJob(GitJobDerived):
         git_job = GitJob(repository, cls.class_job_type, {})
         job = cls(git_job)
         job.celeryRunOnCommit()
+        IStore(GitJob).flush()
         return job
 
     @staticmethod
@@ -292,6 +293,7 @@ class ReclaimGitRepositorySpaceJob(GitJobDerived):
         )
         job = cls(git_job)
         job.celeryRunOnCommit()
+        IStore(GitJob).flush()
         return job
 
     @property
@@ -393,6 +395,7 @@ class GitRepositoryModifiedMailJob(GitJobDerived):
         git_job = GitJob(repository, cls.class_job_type, metadata)
         job = cls(git_job)
         job.celeryRunOnCommit()
+        IStore(GitJob).flush()
         return job
 
     @property
diff --git a/lib/lp/code/model/tests/test_branchjob.py b/lib/lp/code/model/tests/test_branchjob.py
index da70a91..a9610b3 100644
--- a/lib/lp/code/model/tests/test_branchjob.py
+++ b/lib/lp/code/model/tests/test_branchjob.py
@@ -1022,7 +1022,7 @@ class TestRosettaUploadJob(TestCaseWithFactory):
         # the two out of any accidental sync by advancing the Job.id
         # sequence.
         dummy = Job()
-        dummy.sync()
+        IStore(Job).flush()
         dummy.destroySelf()
 
         # Now create the RosettaUploadJob.
diff --git a/lib/lp/code/model/tests/test_branchmergeproposal.py b/lib/lp/code/model/tests/test_branchmergeproposal.py
index 4181b6e..26780a7 100644
--- a/lib/lp/code/model/tests/test_branchmergeproposal.py
+++ b/lib/lp/code/model/tests/test_branchmergeproposal.py
@@ -26,6 +26,7 @@ from zope.security.interfaces import Unauthorized
 from zope.security.proxy import removeSecurityProxy
 
 from lp.app.enums import InformationType
+from lp.app.errors import NotFoundError
 from lp.app.interfaces.launchpad import IPrivacy
 from lp.code.adapters.branch import BranchMergeProposalNoPreviewDiffDelta
 from lp.code.enums import (
@@ -77,7 +78,6 @@ from lp.registry.interfaces.person import IPersonSet
 from lp.registry.interfaces.product import IProductSet
 from lp.services.config import config
 from lp.services.database.constants import UTC_NOW
-from lp.services.database.sqlobject import SQLObjectNotFound
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.webapp import canonical_url
 from lp.services.webhooks.testing import LogsScheduledWebhooks
@@ -1862,14 +1862,14 @@ class TestBranchMergeProposalDeletion(TestCaseWithFactory):
     def test_deleteProposal_deletes_job(self):
         """Deleting a branch merge proposal deletes all related jobs."""
         proposal = self.factory.makeBranchMergeProposal()
+        store = Store.of(proposal)
         job = MergeProposalNeedsReviewEmailJob.create(proposal)
-        job.context.sync()
         job_id = job.context.id
         login_person(proposal.registrant)
         proposal.deleteProposal()
-        self.assertRaises(
-            SQLObjectNotFound, BranchMergeProposalJob.get, job_id
-        )
+        store.flush()
+        store.invalidate()
+        self.assertRaises(NotFoundError, BranchMergeProposalJob.get, job_id)
 
 
 class TestBranchMergeProposalBugs(WithVCSScenarios, TestCaseWithFactory):
diff --git a/lib/lp/code/model/tests/test_branchmergeproposaljobs.py b/lib/lp/code/model/tests/test_branchmergeproposaljobs.py
index f8f676a..f1d7889 100644
--- a/lib/lp/code/model/tests/test_branchmergeproposaljobs.py
+++ b/lib/lp/code/model/tests/test_branchmergeproposaljobs.py
@@ -22,6 +22,7 @@ from testtools.matchers import (
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
+from lp.app.errors import NotFoundError
 from lp.code.adapters.branch import BranchMergeProposalNoPreviewDiffDelta
 from lp.code.enums import BranchMergeProposalStatus
 from lp.code.interfaces.branchmergeproposal import (
@@ -53,7 +54,6 @@ from lp.code.model.branchmergeproposaljob import (
 from lp.code.model.tests.test_diff import DiffTestCase
 from lp.code.subscribers.branchmergeproposal import merge_proposal_modified
 from lp.services.config import config
-from lp.services.database.sqlobject import SQLObjectNotFound
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import JobStatus
 from lp.services.job.model.job import Job
@@ -118,7 +118,7 @@ class TestBranchMergeProposalJobDerived(TestCaseWithFactory):
         It's an error to call get on BranchMergeProposalJobDerived-- it must
         be called on a subclass.  An object is returned only if the job id
         and job type match the request.  If no suitable object can be found,
-        SQLObjectNotFound is raised.
+        NotFoundError is raised.
         """
         bmp = self.factory.makeBranchMergeProposal()
         job = MergeProposalNeedsReviewEmailJob.create(bmp)
@@ -126,9 +126,9 @@ class TestBranchMergeProposalJobDerived(TestCaseWithFactory):
         self.assertRaises(
             AttributeError, BranchMergeProposalJobDerived.get, job.id
         )
-        self.assertRaises(SQLObjectNotFound, UpdatePreviewDiffJob.get, job.id)
+        self.assertRaises(NotFoundError, UpdatePreviewDiffJob.get, job.id)
         self.assertRaises(
-            SQLObjectNotFound, MergeProposalNeedsReviewEmailJob.get, job.id + 1
+            NotFoundError, MergeProposalNeedsReviewEmailJob.get, job.id + 1
         )
         self.assertEqual(job, MergeProposalNeedsReviewEmailJob.get(job.id))
 
diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py
index f8d4f3c..2d0ccac 100644
--- a/lib/lp/oci/model/ocirecipebuildjob.py
+++ b/lib/lp/oci/model/ocirecipebuildjob.py
@@ -213,6 +213,7 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
             )
             job = cls(oci_build_job)
             job.celeryRunOnCommit()
+            IStore(OCIRecipeBuildJob).flush()
             del get_property_cache(build).last_registry_upload_job
             upload_status = build.registry_upload_status
             if upload_status != before_modification.registry_upload_status:
diff --git a/lib/lp/oci/model/ocirecipejob.py b/lib/lp/oci/model/ocirecipejob.py
index 735e516..30e9d28 100644
--- a/lib/lp/oci/model/ocirecipejob.py
+++ b/lib/lp/oci/model/ocirecipejob.py
@@ -174,6 +174,7 @@ class OCIRecipeRequestBuildsJob(OCIRecipeJobDerived):
         recipe_job = OCIRecipeJob(recipe, cls.class_job_type, metadata)
         job = cls(recipe_job)
         job.celeryRunOnCommit()
+        IStore(OCIRecipeJob).flush()
         return job
 
     @classmethod
diff --git a/lib/lp/registry/browser/tests/test_teammembership.py b/lib/lp/registry/browser/tests/test_teammembership.py
index 9856bdc..f76f72c 100644
--- a/lib/lp/registry/browser/tests/test_teammembership.py
+++ b/lib/lp/registry/browser/tests/test_teammembership.py
@@ -32,22 +32,21 @@ class TestTeamMenu(TestCaseWithFactory):
         # Only these queries should be run, no matter what the
         # membership tree looks like, although the number of queries
         # could change slightly if a different user is logged in.
-        #   1. Check whether the user is the team owner.
-        #   2. Deactivate the membership in the TeamMembership table.
-        #   3. Delete from TeamParticipation table.
-        #   (Queries #4, #5, #6, #7, and #10 are run because the storm
-        #    objects have been invalidated.)
-        #   4. Get the TeamMembership entry.
-        #   5. Verify that the member exists in the db, but don't load
-        #   the refresh the rest of its data, since we just need the id.
-        #   6. Verify that the user exists in the db.
-        #   7. Verify that the team exists in the db.
-        #   8. Insert into Job table.
-        #   9. Insert into PersonTransferJob table to schedule sending
-        #      email. (This requires the data from queries #5, #6, and
-        #      #7.)
-        #   10.Query the rest of the team data for the invalidated
-        #      object in order to generate the canonical url.
+        #   1.  Check whether the user is the team owner.
+        #   2.  Deactivate the membership in the TeamMembership table.
+        #   3.  Delete from TeamParticipation table.
+        #       (Queries #4, #5, #8, and #9 are run because the storm
+        #       objects have been invalidated.)
+        #   4.  Get the TeamMembership entry.
+        #   5.  Verify that the member exists in the db.
+        #   6.  Insert into Job table.
+        #   7.  Insert into SharingJob table to schedule removal of
+        #       subscriptions to artifacts shared with the team.
+        #   8.  Verify that the user exists in the db.
+        #   9.  Verify that the team exists in the db.
+        #   10. Insert into Job table.
+        #   11. Insert into PersonTransferJob table to schedule sending
+        #       email. (This requires the data from queries #5, #8, and #9.)
         self.team.addMember(
             self.member, self.team.teamowner, force_team_add=True
         )
@@ -64,4 +63,4 @@ class TestTeamMenu(TestCaseWithFactory):
             view.processForm()
         self.assertEqual("", view.errormessage)
         self.assertEqual(TeamMembershipStatus.DEACTIVATED, membership.status)
-        self.assertThat(recorder, HasQueryCount(LessThan(11)))
+        self.assertThat(recorder, HasQueryCount(LessThan(12)))
diff --git a/lib/lp/registry/model/persontransferjob.py b/lib/lp/registry/model/persontransferjob.py
index 292f256..7c842e7 100644
--- a/lib/lp/registry/model/persontransferjob.py
+++ b/lib/lp/registry/model/persontransferjob.py
@@ -148,6 +148,7 @@ class PersonTransferJobDerived(BaseRunnableJob, metaclass=EnumeratedSubclass):
         )
         derived = cls(job)
         derived.celeryRunOnCommit()
+        IStore(PersonTransferJob).flush()
         return derived
 
     @classmethod
diff --git a/lib/lp/registry/model/sharingjob.py b/lib/lp/registry/model/sharingjob.py
index 3dc412d..9f61dd1 100644
--- a/lib/lp/registry/model/sharingjob.py
+++ b/lib/lp/registry/model/sharingjob.py
@@ -184,6 +184,7 @@ class SharingJobDerived(BaseRunnableJob, metaclass=EnumeratedSubclass):
         base_job = SharingJob(cls.class_job_type, pillar, grantee, metadata)
         job = cls(base_job)
         job.celeryRunOnCommit()
+        IStore(SharingJob).flush()
         return job
 
     @classmethod
diff --git a/lib/lp/registry/tests/test_teammembership.py b/lib/lp/registry/tests/test_teammembership.py
index a76ae2f..ff2792a 100644
--- a/lib/lp/registry/tests/test_teammembership.py
+++ b/lib/lp/registry/tests/test_teammembership.py
@@ -620,7 +620,7 @@ class TestParticipationCleanup(TeamParticipationTestCase):
         The number of db queries should be constant not O(depth).
         """
         self.assertStatementCount(
-            9,
+            11,
             self.team5.setMembershipData,
             self.no_priv,
             TeamMembershipStatus.DEACTIVATED,
diff --git a/lib/lp/services/job/model/job.py b/lib/lp/services/job/model/job.py
index b5ef078..896defc 100644
--- a/lib/lp/services/job/model/job.py
+++ b/lib/lp/services/job/model/job.py
@@ -18,16 +18,14 @@ from datetime import datetime, timezone
 import transaction
 from lazr.jobrunner.jobrunner import LeaseHeld
 from storm.expr import And, Or, Select
-from storm.locals import JSON, Int, Reference
+from storm.locals import JSON, DateTime, Int, Reference, Unicode
 from zope.interface import implementer
 
 from lp.services.database import bulk
-from lp.services.database.constants import UTC_NOW
-from lp.services.database.datetimecol import UtcDateTimeCol
+from lp.services.database.constants import DEFAULT, UTC_NOW
 from lp.services.database.enumcol import DBEnum
 from lp.services.database.interfaces import IStore
-from lp.services.database.sqlbase import SQLBase
-from lp.services.database.sqlobject import StringCol
+from lp.services.database.stormbase import StormBase
 from lp.services.job.interfaces.job import IJob, JobStatus, JobType
 
 
@@ -43,24 +41,28 @@ class InvalidTransition(Exception):
 
 
 @implementer(IJob)
-class Job(SQLBase):
+class Job(StormBase):
     """See `IJob`."""
 
+    __storm_table__ = "Job"
+
     @property
     def job_id(self):
         return self.id
 
-    scheduled_start = UtcDateTimeCol()
+    id = Int(primary=True)
+
+    scheduled_start = DateTime(tzinfo=timezone.utc)
 
-    date_created = UtcDateTimeCol()
+    date_created = DateTime(tzinfo=timezone.utc)
 
-    date_started = UtcDateTimeCol()
+    date_started = DateTime(tzinfo=timezone.utc)
 
-    date_finished = UtcDateTimeCol()
+    date_finished = DateTime(tzinfo=timezone.utc)
 
-    lease_expires = UtcDateTimeCol()
+    lease_expires = DateTime(tzinfo=timezone.utc)
 
-    log = StringCol()
+    log = Unicode()
 
     _status = DBEnum(
         enum=JobStatus,
@@ -106,6 +108,28 @@ class Job(SQLBase):
 
     status = property(lambda x: x._status)
 
+    def __init__(
+        self,
+        scheduled_start=None,
+        date_finished=None,
+        lease_expires=None,
+        max_retries=DEFAULT,
+        requester=None,
+        base_json_data=None,
+        base_job_type=None,
+        status=JobStatus.WAITING,
+    ):
+        super().__init__()
+        self.scheduled_start = scheduled_start
+        self.date_finished = date_finished
+        self.lease_expires = lease_expires
+        self.max_retries = max_retries
+        self.requester = requester
+        self.base_json_data = base_json_data
+        self.base_job_type = base_job_type
+        self._status = status
+        IStore(Job).add(self)
+
     @property
     def is_pending(self):
         """See `IJob`."""
@@ -217,6 +241,9 @@ class Job(SQLBase):
         self._set_status(JobStatus.WAITING)
         self.lease_expires = None
 
+    def destroySelf(self):
+        IStore(Job).remove(self)
+
 
 class EnumeratedSubclass(type):
     """Metaclass for when subclasses are assigned enums."""
diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
index 2490962..70bf2b6 100644
--- a/lib/lp/services/job/tests/test_celery.py
+++ b/lib/lp/services/job/tests/test_celery.py
@@ -45,6 +45,7 @@ class TestJob(BaseRunnableJob):
             self.job = store.find(Job, id=job_id)[0]
         else:
             self.job = Job(max_retries=2, scheduled_start=scheduled_start)
+            IStore(Job).flush()
 
     def run(self):
         pass
diff --git a/lib/lp/services/job/tests/test_job.py b/lib/lp/services/job/tests/test_job.py
index 799ad98..886e5ad 100644
--- a/lib/lp/services/job/tests/test_job.py
+++ b/lib/lp/services/job/tests/test_job.py
@@ -100,17 +100,17 @@ class TestJob(TestCaseWithFactory):
 
     def test_start_when_completed_is_invalid(self):
         """When a job is completed, attempting to start is invalid."""
-        job = Job(_status=JobStatus.COMPLETED)
+        job = Job(status=JobStatus.COMPLETED)
         self.assertRaises(InvalidTransition, job.start)
 
     def test_start_when_failed_is_invalid(self):
         """When a job is failed, attempting to start is invalid."""
-        job = Job(_status=JobStatus.FAILED)
+        job = Job(status=JobStatus.FAILED)
         self.assertRaises(InvalidTransition, job.start)
 
     def test_start_when_running_is_invalid(self):
         """When a job is running, attempting to start is invalid."""
-        job = Job(_status=JobStatus.FAILED)
+        job = Job(status=JobStatus.FAILED)
         self.assertRaises(InvalidTransition, job.start)
 
     def test_complete(self):
@@ -118,7 +118,7 @@ class TestJob(TestCaseWithFactory):
 
         It should set date_finished and set the job status to COMPLETED.
         """
-        job = Job(_status=JobStatus.RUNNING)
+        job = Job(status=JobStatus.RUNNING)
         self.assertEqual(None, job.date_finished)
         job.complete()
         self.assertNotEqual(None, job.date_finished)
@@ -126,17 +126,17 @@ class TestJob(TestCaseWithFactory):
 
     def test_complete_when_waiting_is_invalid(self):
         """When a job is waiting, attempting to complete is invalid."""
-        job = Job(_status=JobStatus.WAITING)
+        job = Job(status=JobStatus.WAITING)
         self.assertRaises(InvalidTransition, job.complete)
 
     def test_complete_when_completed_is_invalid(self):
         """When a job is completed, attempting to complete is invalid."""
-        job = Job(_status=JobStatus.COMPLETED)
+        job = Job(status=JobStatus.COMPLETED)
         self.assertRaises(InvalidTransition, job.complete)
 
     def test_complete_when_failed_is_invalid(self):
         """When a job is failed, attempting to complete is invalid."""
-        job = Job(_status=JobStatus.FAILED)
+        job = Job(status=JobStatus.FAILED)
         self.assertRaises(InvalidTransition, job.complete)
 
     def test_fail(self):
@@ -144,7 +144,7 @@ class TestJob(TestCaseWithFactory):
 
         It should set date_finished and set the job status to FAILED.
         """
-        job = Job(_status=JobStatus.RUNNING)
+        job = Job(status=JobStatus.RUNNING)
         self.assertEqual(None, job.date_finished)
         job.fail()
         self.assertNotEqual(None, job.date_finished)
@@ -152,17 +152,17 @@ class TestJob(TestCaseWithFactory):
 
     def test_fail_when_waiting_is_invalid(self):
         """When a job is waiting, attempting to fail is invalid."""
-        job = Job(_status=JobStatus.WAITING)
+        job = Job(status=JobStatus.WAITING)
         self.assertRaises(InvalidTransition, job.fail)
 
     def test_fail_when_completed_is_invalid(self):
         """When a job is completed, attempting to fail is invalid."""
-        job = Job(_status=JobStatus.COMPLETED)
+        job = Job(status=JobStatus.COMPLETED)
         self.assertRaises(InvalidTransition, job.fail)
 
     def test_fail_when_failed_is_invalid(self):
         """When a job is failed, attempting to fail is invalid."""
-        job = Job(_status=JobStatus.FAILED)
+        job = Job(status=JobStatus.FAILED)
         self.assertRaises(InvalidTransition, job.fail)
 
     def test_queue(self):
@@ -170,7 +170,7 @@ class TestJob(TestCaseWithFactory):
 
         It should set date_finished, and set status to WAITING.
         """
-        job = Job(_status=JobStatus.RUNNING)
+        job = Job(status=JobStatus.RUNNING)
         self.assertEqual(None, job.date_finished)
         job.queue()
         self.assertNotEqual(None, job.date_finished)
@@ -178,76 +178,76 @@ class TestJob(TestCaseWithFactory):
 
     def test_queue_clears_lease_expires(self):
         """Queueing a job releases its lease."""
-        job = Job(_status=JobStatus.RUNNING)
+        job = Job(status=JobStatus.RUNNING)
         job.lease_expires = UTC_NOW
         job.queue()
         self.assertIsNone(job.lease_expires)
 
     def test_suspend(self):
         """A job that is in the WAITING state can be suspended."""
-        job = Job(_status=JobStatus.WAITING)
+        job = Job(status=JobStatus.WAITING)
         job.suspend()
         self.assertEqual(job.status, JobStatus.SUSPENDED)
 
     def test_suspend_when_running(self):
         """When a job is running, attempting to suspend is valid."""
-        job = Job(_status=JobStatus.RUNNING)
+        job = Job(status=JobStatus.RUNNING)
         job.suspend()
         self.assertEqual(JobStatus.SUSPENDED, job.status)
 
     def test_suspend_when_completed(self):
         """When a job is completed, attempting to suspend is invalid."""
-        job = Job(_status=JobStatus.COMPLETED)
+        job = Job(status=JobStatus.COMPLETED)
         self.assertRaises(InvalidTransition, job.suspend)
 
     def test_suspend_when_failed(self):
         """When a job is failed, attempting to suspend is invalid."""
-        job = Job(_status=JobStatus.FAILED)
+        job = Job(status=JobStatus.FAILED)
         self.assertRaises(InvalidTransition, job.suspend)
 
     def test_resume(self):
         """A job that is suspended can be resumed."""
-        job = Job(_status=JobStatus.SUSPENDED)
+        job = Job(status=JobStatus.SUSPENDED)
         job.resume()
         self.assertEqual(job.status, JobStatus.WAITING)
 
     def test_resume_clears_lease_expires(self):
         """A job that resumes should null out the lease_expires."""
-        job = Job(_status=JobStatus.SUSPENDED)
+        job = Job(status=JobStatus.SUSPENDED)
         job.lease_expires = UTC_NOW
         job.resume()
         self.assertIsNone(job.lease_expires)
 
     def test_resume_when_running(self):
         """When a job is running, attempting to resume is invalid."""
-        job = Job(_status=JobStatus.RUNNING)
+        job = Job(status=JobStatus.RUNNING)
         self.assertRaises(InvalidTransition, job.resume)
 
     def test_resume_when_completed(self):
         """When a job is completed, attempting to resume is invalid."""
-        job = Job(_status=JobStatus.COMPLETED)
+        job = Job(status=JobStatus.COMPLETED)
         self.assertRaises(InvalidTransition, job.resume)
 
     def test_resume_when_failed(self):
         """When a job is failed, attempting to resume is invalid."""
-        job = Job(_status=JobStatus.FAILED)
+        job = Job(status=JobStatus.FAILED)
         self.assertRaises(InvalidTransition, job.resume)
 
     def test_is_pending(self):
         """is_pending is True when the job can possibly complete."""
         for status in JobStatus.items:
-            job = Job(_status=status)
+            job = Job(status=status)
             self.assertEqual(status in Job.PENDING_STATUSES, job.is_pending)
 
     def test_is_runnable_when_failed(self):
         """is_runnable is false when the job is not WAITING."""
-        job = Job(_status=JobStatus.FAILED)
+        job = Job(status=JobStatus.FAILED)
         self.assertFalse(job.is_runnable)
 
     def test_is_runnable_when_scheduled_in_future(self):
         """is_runnable is false when the job is scheduled in the future."""
         job = Job(
-            _status=JobStatus.WAITING,
+            status=JobStatus.WAITING,
             scheduled_start=datetime.now(timezone.utc) + timedelta(seconds=60),
         )
         self.assertFalse(job.is_runnable)
@@ -255,14 +255,14 @@ class TestJob(TestCaseWithFactory):
     def test_is_runnable_when_scheduled_in_past(self):
         """is_runnable is true when the job is scheduled in the past."""
         job = Job(
-            _status=JobStatus.WAITING,
+            status=JobStatus.WAITING,
             scheduled_start=datetime.now(timezone.utc) - timedelta(seconds=60),
         )
         self.assertTrue(job.is_runnable)
 
     def test_is_runnable_when_not_scheduled(self):
         """is_runnable is true when no explicit schedule has been requested."""
-        job = Job(_status=JobStatus.WAITING)
+        job = Job(status=JobStatus.WAITING)
         self.assertTrue(job.is_runnable)
 
     def test_start_manages_transactions(self):
@@ -387,6 +387,7 @@ class TestReadiness(TestCase):
         """Job.ready_jobs should include new jobs."""
         preexisting = self._sampleData()
         job = Job()
+        Store.of(job).flush()
         self.assertEqual(
             preexisting + [(job.id,)],
             list(Store.of(job).execute(Job.ready_jobs)),
@@ -395,7 +396,7 @@ class TestReadiness(TestCase):
     def test_ready_jobs_started(self):
         """Job.ready_jobs should not jobs that have been started."""
         preexisting = self._sampleData()
-        job = Job(_status=JobStatus.RUNNING)
+        job = Job(status=JobStatus.RUNNING)
         self.assertEqual(
             preexisting, list(Store.of(job).execute(Job.ready_jobs))
         )
@@ -405,6 +406,7 @@ class TestReadiness(TestCase):
         preexisting = self._sampleData()
         UNIX_EPOCH = datetime.fromtimestamp(0, timezone.utc)
         job = Job(lease_expires=UNIX_EPOCH)
+        Store.of(job).flush()
         self.assertEqual(
             preexisting + [(job.id,)],
             list(Store.of(job).execute(Job.ready_jobs)),
diff --git a/lib/lp/services/job/tests/test_runner.py b/lib/lp/services/job/tests/test_runner.py
index f443435..889f941 100644
--- a/lib/lp/services/job/tests/test_runner.py
+++ b/lib/lp/services/job/tests/test_runner.py
@@ -57,6 +57,7 @@ class NullJob(BaseRunnableJob):
     ):
         self.message = completion_message
         self.job = Job()
+        IStore(Job).flush()
         self.oops_recipients = oops_recipients
         if self.oops_recipients is None:
             self.oops_recipients = []
@@ -512,6 +513,7 @@ class DerivedJob(BaseRunnableJob, StormBase):
         super().__init__()
         self.job = Job()
         self.should_succeed = should_succeed
+        IStore(Job).flush()
 
     def run(self):
         if not self.should_succeed:
@@ -619,6 +621,7 @@ class StuckJob(StaticJobSource):
         self.lease_length = lease_length
         self.delay = delay
         self.job = Job()
+        IStore(Job).flush()
 
     def __repr__(self):
         return "<%s(%r, lease_length=%s, delay=%s)>" % (
@@ -655,6 +658,7 @@ class InitialFailureJob(StaticJobSource):
     def __init__(self, id, fail):
         self.id = id
         self.job = Job()
+        IStore(Job).flush()
         self.fail = fail
 
     def run(self):
@@ -677,6 +681,7 @@ class ProcessSharingJob(StaticJobSource):
     def __init__(self, id, first):
         self.id = id
         self.job = Job()
+        IStore(Job).flush()
         self.first = first
 
     def run(self):
@@ -697,6 +702,7 @@ class MemoryHogJob(StaticJobSource):
 
     def __init__(self, id):
         self.job = Job()
+        IStore(Job).flush()
         self.id = id
 
     def run(self):
@@ -717,6 +723,7 @@ class LeaseHeldJob(StaticJobSource):
 
     def __init__(self, id):
         self.job = Job()
+        IStore(Job).flush()
         self.id = id
 
     def acquireLease(self):
diff --git a/lib/lp/services/webhooks/model.py b/lib/lp/services/webhooks/model.py
index 902b46e..5db81e2 100644
--- a/lib/lp/services/webhooks/model.py
+++ b/lib/lp/services/webhooks/model.py
@@ -573,6 +573,7 @@ class WebhookDeliveryJob(WebhookJobDerived):
         )
         job = cls(webhook_job)
         job.celeryRunOnCommit()
+        IStore(WebhookJob).flush()
         log.info(
             "Scheduled %r (%s): %s"
             % (job, event_type, _redact_payload(event_type, payload))
diff --git a/lib/lp/snappy/model/snapbuildjob.py b/lib/lp/snappy/model/snapbuildjob.py
index b16d2c8..539fb1c 100644
--- a/lib/lp/snappy/model/snapbuildjob.py
+++ b/lib/lp/snappy/model/snapbuildjob.py
@@ -188,6 +188,7 @@ class SnapStoreUploadJob(SnapBuildJobDerived):
         snap_build_job = SnapBuildJob(snapbuild, cls.class_job_type, {})
         job = cls(snap_build_job)
         job.celeryRunOnCommit()
+        IStore(SnapBuildJob).flush()
         del get_property_cache(snapbuild).last_store_upload_job
         notify(SnapBuildStoreUploadStatusChangedEvent(snapbuild))
         return job
diff --git a/lib/lp/snappy/model/snapjob.py b/lib/lp/snappy/model/snapjob.py
index 5ec3be9..69f4bb5 100644
--- a/lib/lp/snappy/model/snapjob.py
+++ b/lib/lp/snappy/model/snapjob.py
@@ -191,6 +191,7 @@ class SnapRequestBuildsJob(SnapJobDerived):
         snap_job = SnapJob(snap, cls.class_job_type, metadata)
         job = cls(snap_job)
         job.celeryRunOnCommit()
+        IStore(SnapJob).flush()
         return job
 
     @classmethod
diff --git a/lib/lp/soyuz/model/initializedistroseriesjob.py b/lib/lp/soyuz/model/initializedistroseriesjob.py
index 8c7775e..48a30eb 100644
--- a/lib/lp/soyuz/model/initializedistroseriesjob.py
+++ b/lib/lp/soyuz/model/initializedistroseriesjob.py
@@ -113,6 +113,7 @@ class InitializeDistroSeriesJob(DistributionJobDerived):
         store.add(distribution_job)
         derived_job = cls(distribution_job)
         derived_job.celeryRunOnCommit()
+        IStore(DistributionJob).flush()
         return derived_job
 
     @classmethod
diff --git a/lib/lp/soyuz/model/processacceptedbugsjob.py b/lib/lp/soyuz/model/processacceptedbugsjob.py
index c4a1515..e368155 100644
--- a/lib/lp/soyuz/model/processacceptedbugsjob.py
+++ b/lib/lp/soyuz/model/processacceptedbugsjob.py
@@ -260,6 +260,7 @@ class ProcessAcceptedBugsJob(StormBase, BaseRunnableJob):
             distroseries, sourcepackagerelease, bug_ids
         )
         IPrimaryStore(ProcessAcceptedBugsJob).add(job)
+        IPrimaryStore(ProcessAcceptedBugsJob).flush()
         job.celeryRunOnCommit()
         return job
 
diff --git a/lib/lp/translations/model/translationsharingjob.py b/lib/lp/translations/model/translationsharingjob.py
index 950143b..d317d2a 100644
--- a/lib/lp/translations/model/translationsharingjob.py
+++ b/lib/lp/translations/model/translationsharingjob.py
@@ -173,6 +173,7 @@ class TranslationSharingJobDerived(metaclass=EnumeratedSubclass):
         )
         derived = cls(context)
         derived.celeryRunOnCommit()
+        IStore(TranslationSharingJob).flush()
         return derived
 
     @classmethod
diff --git a/lib/lp/translations/tests/test_pofilestatsjob.py b/lib/lp/translations/tests/test_pofilestatsjob.py
index 517949f..c805674 100644
--- a/lib/lp/translations/tests/test_pofilestatsjob.py
+++ b/lib/lp/translations/tests/test_pofilestatsjob.py
@@ -23,7 +23,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
 
     def test_job_interface(self):
         # Instances of POFileStatsJob are runnable jobs.
-        verifyObject(IRunnableJob, POFileStatsJob(0))
+        verifyObject(IRunnableJob, POFileStatsJob(self.factory.makePOFile()))
 
     def test_source_interface(self):
         # The POFileStatsJob class is a source of POFileStatsJobs.
@@ -38,7 +38,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
         self.factory.makePOTMsgSet(pofile.potemplate, singular)
         # The statistics start at 0.
         self.assertEqual(pofile.potemplate.messageCount(), 0)
-        job = pofilestatsjob.schedule(pofile.id)
+        job = pofilestatsjob.schedule(pofile)
         # Just scheduling the job doesn't update the statistics.
         self.assertEqual(pofile.potemplate.messageCount(), 0)
         with dbuser("pofilestats"):
@@ -58,7 +58,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
         self.factory.makePOTMsgSet(pofile.potemplate, singular)
         # The statistics are still at 0, even though there is a message.
         self.assertEqual(potemplate.messageCount(), 0)
-        job = pofilestatsjob.schedule(pofile.id)
+        job = pofilestatsjob.schedule(pofile)
         # Just scheduling the job doesn't update the statistics.
         self.assertEqual(pofile.potemplate.messageCount(), 0)
         with dbuser("pofilestats"):
@@ -73,7 +73,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
         # We need a POFile to update.
         pofile = self.factory.makePOFile(side=TranslationSide.UPSTREAM)
         # If we schedule a job, then we'll get it back.
-        job = pofilestatsjob.schedule(pofile.id)
+        job = pofilestatsjob.schedule(pofile)
         self.assertIs(list(POFileStatsJob.iterReady())[0], job)
 
     def test_second_job_is_scheduled(self):
@@ -83,11 +83,11 @@ class TestPOFileStatsJob(TestCaseWithFactory):
         # We need a POFile to update.
         pofile = self.factory.makePOFile(side=TranslationSide.UPSTREAM)
         # If we schedule a job, then there will be one scheduled.
-        pofilestatsjob.schedule(pofile.id)
+        pofilestatsjob.schedule(pofile)
         self.assertIs(len(list(POFileStatsJob.iterReady())), 1)
         # If we attempt to schedule another job for the same POFile, a new job
         # is added.
-        pofilestatsjob.schedule(pofile.id)
+        pofilestatsjob.schedule(pofile)
         self.assertIs(len(list(POFileStatsJob.iterReady())), 2)
 
     def assertJobUpdatesStats(self, pofile1, pofile2):
@@ -97,7 +97,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
         # The statistics start at 0.
         self.assertEqual(pofile1.getStatistics(), (0, 0, 0, 0))
         self.assertEqual(pofile2.getStatistics(), (0, 0, 0, 0))
-        job = pofilestatsjob.schedule(pofile1.id)
+        job = pofilestatsjob.schedule(pofile1)
         # Just scheduling the job doesn't update the statistics.
         self.assertEqual(pofile1.getStatistics(), (0, 0, 0, 0))
         self.assertEqual(pofile2.getStatistics(), (0, 0, 0, 0))
@@ -207,7 +207,7 @@ class TestViaCelery(TestCaseWithFactory):
         self.factory.makePOTMsgSet(pofile.potemplate, singular)
         # The statistics start at 0.
         self.assertEqual(pofile.potemplate.messageCount(), 0)
-        pofilestatsjob.schedule(pofile.id)
+        pofilestatsjob.schedule(pofile)
         with block_on_job():
             transaction.commit()
         # Now that the job ran, the statistics have been updated.