launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #30287
[Merge] ~cjwatson/launchpad:stormify-job into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:stormify-job into launchpad:master.
Commit message:
Convert Job to Storm
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/447422
I had to be a bit more careful about flushing row creations to the database in order to get their IDs back. This required updates to a couple of query count tests, which previously only happened to pass because they didn't notice some unflushed queries at the end of the test.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:stormify-job into launchpad:master.
diff --git a/lib/lp/charms/model/charmrecipebuildjob.py b/lib/lp/charms/model/charmrecipebuildjob.py
index d923485..3534797 100644
--- a/lib/lp/charms/model/charmrecipebuildjob.py
+++ b/lib/lp/charms/model/charmrecipebuildjob.py
@@ -193,6 +193,7 @@ class CharmhubUploadJob(CharmRecipeBuildJobDerived):
)
job = cls(charm_recipe_build_job)
job.celeryRunOnCommit()
+ IStore(CharmRecipeBuildJob).flush()
del get_property_cache(build).last_store_upload_job
upload_status = build.store_upload_status
if upload_status != before_modification.store_upload_status:
diff --git a/lib/lp/charms/model/charmrecipejob.py b/lib/lp/charms/model/charmrecipejob.py
index 6971b4d..56efa07 100644
--- a/lib/lp/charms/model/charmrecipejob.py
+++ b/lib/lp/charms/model/charmrecipejob.py
@@ -185,6 +185,7 @@ class CharmRecipeRequestBuildsJob(CharmRecipeJobDerived):
recipe_job = CharmRecipeJob(recipe, cls.class_job_type, metadata)
job = cls(recipe_job)
job.celeryRunOnCommit()
+ IStore(CharmRecipeJob).flush()
return job
@classmethod
diff --git a/lib/lp/code/model/branchjob.py b/lib/lp/code/model/branchjob.py
index e593b2b..c08a8c1 100644
--- a/lib/lp/code/model/branchjob.py
+++ b/lib/lp/code/model/branchjob.py
@@ -867,6 +867,7 @@ class RosettaUploadJob(BranchJobDerived):
)
job = cls(branch_job)
job.celeryRunOnCommit()
+ IStore(BranchJob).flush()
return job
else:
return None
@@ -1136,6 +1137,7 @@ class BranchModifiedMailJob(BranchJobDerived):
branch_job = BranchJob(branch, cls.class_job_type, metadata)
job = cls(branch_job)
job.celeryRunOnCommit()
+ IStore(BranchJob).flush()
return job
@property
diff --git a/lib/lp/code/model/branchmergeproposal.py b/lib/lp/code/model/branchmergeproposal.py
index 12df826..04b28b3 100644
--- a/lib/lp/code/model/branchmergeproposal.py
+++ b/lib/lp/code/model/branchmergeproposal.py
@@ -1105,10 +1105,9 @@ class BranchMergeProposal(StormBase, BugLinkTargetMixin):
# or not they have completed.
from lp.code.model.branchmergeproposaljob import BranchMergeProposalJob
- for job in BranchMergeProposalJob.selectBy(
- branch_merge_proposal=self.id
- ):
- job.destroySelf()
+ IStore(BranchMergeProposalJob).find(
+ BranchMergeProposalJob, branch_merge_proposal=self
+ ).remove()
self._preview_diffs.remove()
Store.of(self).remove(self)
diff --git a/lib/lp/code/model/branchmergeproposaljob.py b/lib/lp/code/model/branchmergeproposaljob.py
index e173f29..c4fb588 100644
--- a/lib/lp/code/model/branchmergeproposaljob.py
+++ b/lib/lp/code/model/branchmergeproposaljob.py
@@ -32,6 +32,7 @@ from storm.store import Store
from zope.component import getUtility
from zope.interface import implementer, provider
+from lp.app.errors import NotFoundError
from lp.code.adapters.branch import BranchMergeProposalDelta
from lp.code.enums import BranchType
from lp.code.errors import BranchHasPendingWrites, UpdatePreviewDiffNotReady
@@ -63,7 +64,6 @@ from lp.registry.interfaces.person import IPersonSet
from lp.services.config import config
from lp.services.database.enumcol import DBEnum
from lp.services.database.interfaces import IPrimaryStore, IStore
-from lp.services.database.sqlobject import SQLObjectNotFound
from lp.services.database.stormbase import StormBase
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.model.job import EnumeratedSubclass, Job
@@ -182,25 +182,14 @@ class BranchMergeProposalJob(StormBase):
Store.of(self).remove(self)
@classmethod
- def selectBy(klass, **kwargs):
- """Return selected instances of this class.
-
- At least one pair of keyword arguments must be supplied.
- foo=bar is interpreted as 'select all instances of
- BranchMergeProposalJob whose property "foo" is equal to "bar"'.
- """
- assert len(kwargs) > 0
- return IStore(klass).find(klass, **kwargs)
-
- @classmethod
def get(klass, key):
"""Return the instance of this class whose key is supplied.
- :raises: SQLObjectNotFound
+ :raises: NotFoundError
"""
instance = IStore(klass).get(klass, key)
if instance is None:
- raise SQLObjectNotFound(
+ raise NotFoundError(
"No occurrence of %s has key %s" % (klass.__name__, key)
)
return instance
@@ -236,6 +225,7 @@ class BranchMergeProposalJobDerived(
base_job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata)
job = cls(base_job)
job.celeryRunOnCommit()
+ IStore(BranchMergeProposalJob).flush()
return job
@classmethod
@@ -244,12 +234,12 @@ class BranchMergeProposalJobDerived(
:return: the BranchMergeProposalJob with the specified id, as the
current BranchMergeProposalJobDereived subclass.
- :raises: SQLObjectNotFound if there is no job with the specified id,
+ :raises: NotFoundError if there is no job with the specified id,
or its job_type does not match the desired subclass.
"""
job = BranchMergeProposalJob.get(job_id)
if job.job_type != cls.class_job_type:
- raise SQLObjectNotFound(
+ raise NotFoundError(
"No object found with id %d and type %s"
% (job_id, cls.class_job_type.title)
)
@@ -659,7 +649,7 @@ class BranchMergeProposalJobSource(BaseRunnableJobSource):
:return: the BranchMergeProposalJob with the specified id, as the
current BranchMergeProposalJobDereived subclass.
- :raises: SQLObjectNotFound if there is no job with the specified id,
+ :raises: NotFoundError if there is no job with the specified id,
or its job_type does not match the desired subclass.
"""
job = BranchMergeProposalJob.get(job_id)
diff --git a/lib/lp/code/model/gitjob.py b/lib/lp/code/model/gitjob.py
index 1c51b94..f1e9fce 100644
--- a/lib/lp/code/model/gitjob.py
+++ b/lib/lp/code/model/gitjob.py
@@ -207,6 +207,7 @@ class GitRefScanJob(GitJobDerived):
git_job = GitJob(repository, cls.class_job_type, {})
job = cls(git_job)
job.celeryRunOnCommit()
+ IStore(GitJob).flush()
return job
@staticmethod
@@ -292,6 +293,7 @@ class ReclaimGitRepositorySpaceJob(GitJobDerived):
)
job = cls(git_job)
job.celeryRunOnCommit()
+ IStore(GitJob).flush()
return job
@property
@@ -393,6 +395,7 @@ class GitRepositoryModifiedMailJob(GitJobDerived):
git_job = GitJob(repository, cls.class_job_type, metadata)
job = cls(git_job)
job.celeryRunOnCommit()
+ IStore(GitJob).flush()
return job
@property
diff --git a/lib/lp/code/model/tests/test_branchjob.py b/lib/lp/code/model/tests/test_branchjob.py
index da70a91..a9610b3 100644
--- a/lib/lp/code/model/tests/test_branchjob.py
+++ b/lib/lp/code/model/tests/test_branchjob.py
@@ -1022,7 +1022,7 @@ class TestRosettaUploadJob(TestCaseWithFactory):
# the two out of any accidental sync by advancing the Job.id
# sequence.
dummy = Job()
- dummy.sync()
+ IStore(Job).flush()
dummy.destroySelf()
# Now create the RosettaUploadJob.
diff --git a/lib/lp/code/model/tests/test_branchmergeproposal.py b/lib/lp/code/model/tests/test_branchmergeproposal.py
index 4181b6e..26780a7 100644
--- a/lib/lp/code/model/tests/test_branchmergeproposal.py
+++ b/lib/lp/code/model/tests/test_branchmergeproposal.py
@@ -26,6 +26,7 @@ from zope.security.interfaces import Unauthorized
from zope.security.proxy import removeSecurityProxy
from lp.app.enums import InformationType
+from lp.app.errors import NotFoundError
from lp.app.interfaces.launchpad import IPrivacy
from lp.code.adapters.branch import BranchMergeProposalNoPreviewDiffDelta
from lp.code.enums import (
@@ -77,7 +78,6 @@ from lp.registry.interfaces.person import IPersonSet
from lp.registry.interfaces.product import IProductSet
from lp.services.config import config
from lp.services.database.constants import UTC_NOW
-from lp.services.database.sqlobject import SQLObjectNotFound
from lp.services.job.interfaces.job import JobStatus
from lp.services.webapp import canonical_url
from lp.services.webhooks.testing import LogsScheduledWebhooks
@@ -1862,14 +1862,14 @@ class TestBranchMergeProposalDeletion(TestCaseWithFactory):
def test_deleteProposal_deletes_job(self):
"""Deleting a branch merge proposal deletes all related jobs."""
proposal = self.factory.makeBranchMergeProposal()
+ store = Store.of(proposal)
job = MergeProposalNeedsReviewEmailJob.create(proposal)
- job.context.sync()
job_id = job.context.id
login_person(proposal.registrant)
proposal.deleteProposal()
- self.assertRaises(
- SQLObjectNotFound, BranchMergeProposalJob.get, job_id
- )
+ store.flush()
+ store.invalidate()
+ self.assertRaises(NotFoundError, BranchMergeProposalJob.get, job_id)
class TestBranchMergeProposalBugs(WithVCSScenarios, TestCaseWithFactory):
diff --git a/lib/lp/code/model/tests/test_branchmergeproposaljobs.py b/lib/lp/code/model/tests/test_branchmergeproposaljobs.py
index f8f676a..f1d7889 100644
--- a/lib/lp/code/model/tests/test_branchmergeproposaljobs.py
+++ b/lib/lp/code/model/tests/test_branchmergeproposaljobs.py
@@ -22,6 +22,7 @@ from testtools.matchers import (
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
+from lp.app.errors import NotFoundError
from lp.code.adapters.branch import BranchMergeProposalNoPreviewDiffDelta
from lp.code.enums import BranchMergeProposalStatus
from lp.code.interfaces.branchmergeproposal import (
@@ -53,7 +54,6 @@ from lp.code.model.branchmergeproposaljob import (
from lp.code.model.tests.test_diff import DiffTestCase
from lp.code.subscribers.branchmergeproposal import merge_proposal_modified
from lp.services.config import config
-from lp.services.database.sqlobject import SQLObjectNotFound
from lp.services.features.testing import FeatureFixture
from lp.services.job.interfaces.job import JobStatus
from lp.services.job.model.job import Job
@@ -118,7 +118,7 @@ class TestBranchMergeProposalJobDerived(TestCaseWithFactory):
It's an error to call get on BranchMergeProposalJobDerived-- it must
be called on a subclass. An object is returned only if the job id
and job type match the request. If no suitable object can be found,
- SQLObjectNotFound is raised.
+ NotFoundError is raised.
"""
bmp = self.factory.makeBranchMergeProposal()
job = MergeProposalNeedsReviewEmailJob.create(bmp)
@@ -126,9 +126,9 @@ class TestBranchMergeProposalJobDerived(TestCaseWithFactory):
self.assertRaises(
AttributeError, BranchMergeProposalJobDerived.get, job.id
)
- self.assertRaises(SQLObjectNotFound, UpdatePreviewDiffJob.get, job.id)
+ self.assertRaises(NotFoundError, UpdatePreviewDiffJob.get, job.id)
self.assertRaises(
- SQLObjectNotFound, MergeProposalNeedsReviewEmailJob.get, job.id + 1
+ NotFoundError, MergeProposalNeedsReviewEmailJob.get, job.id + 1
)
self.assertEqual(job, MergeProposalNeedsReviewEmailJob.get(job.id))
diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py
index f8d4f3c..2d0ccac 100644
--- a/lib/lp/oci/model/ocirecipebuildjob.py
+++ b/lib/lp/oci/model/ocirecipebuildjob.py
@@ -213,6 +213,7 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
)
job = cls(oci_build_job)
job.celeryRunOnCommit()
+ IStore(OCIRecipeBuildJob).flush()
del get_property_cache(build).last_registry_upload_job
upload_status = build.registry_upload_status
if upload_status != before_modification.registry_upload_status:
diff --git a/lib/lp/oci/model/ocirecipejob.py b/lib/lp/oci/model/ocirecipejob.py
index 735e516..30e9d28 100644
--- a/lib/lp/oci/model/ocirecipejob.py
+++ b/lib/lp/oci/model/ocirecipejob.py
@@ -174,6 +174,7 @@ class OCIRecipeRequestBuildsJob(OCIRecipeJobDerived):
recipe_job = OCIRecipeJob(recipe, cls.class_job_type, metadata)
job = cls(recipe_job)
job.celeryRunOnCommit()
+ IStore(OCIRecipeJob).flush()
return job
@classmethod
diff --git a/lib/lp/registry/browser/tests/test_teammembership.py b/lib/lp/registry/browser/tests/test_teammembership.py
index 9856bdc..f76f72c 100644
--- a/lib/lp/registry/browser/tests/test_teammembership.py
+++ b/lib/lp/registry/browser/tests/test_teammembership.py
@@ -32,22 +32,21 @@ class TestTeamMenu(TestCaseWithFactory):
# Only these queries should be run, no matter what the
# membership tree looks like, although the number of queries
# could change slightly if a different user is logged in.
- # 1. Check whether the user is the team owner.
- # 2. Deactivate the membership in the TeamMembership table.
- # 3. Delete from TeamParticipation table.
- # (Queries #4, #5, #6, #7, and #10 are run because the storm
- # objects have been invalidated.)
- # 4. Get the TeamMembership entry.
- # 5. Verify that the member exists in the db, but don't load
- # the refresh the rest of its data, since we just need the id.
- # 6. Verify that the user exists in the db.
- # 7. Verify that the team exists in the db.
- # 8. Insert into Job table.
- # 9. Insert into PersonTransferJob table to schedule sending
- # email. (This requires the data from queries #5, #6, and
- # #7.)
- # 10.Query the rest of the team data for the invalidated
- # object in order to generate the canonical url.
+ # 1. Check whether the user is the team owner.
+ # 2. Deactivate the membership in the TeamMembership table.
+ # 3. Delete from TeamParticipation table.
+ # (Queries #4, #5, #8, and #9 are run because the storm
+ # objects have been invalidated.)
+ # 4. Get the TeamMembership entry.
+ # 5. Verify that the member exists in the db.
+ # 6. Insert into Job table.
+ # 7. Insert into SharingJob table to schedule removal of
+ # subscriptions to artifacts shared with the team.
+ # 8. Verify that the user exists in the db.
+ # 9. Verify that the team exists in the db.
+ # 10. Insert into Job table.
+ # 11. Insert into PersonTransferJob table to schedule sending
+ # email. (This requires the data from queries #5, #8, and #9.)
self.team.addMember(
self.member, self.team.teamowner, force_team_add=True
)
@@ -64,4 +63,4 @@ class TestTeamMenu(TestCaseWithFactory):
view.processForm()
self.assertEqual("", view.errormessage)
self.assertEqual(TeamMembershipStatus.DEACTIVATED, membership.status)
- self.assertThat(recorder, HasQueryCount(LessThan(11)))
+ self.assertThat(recorder, HasQueryCount(LessThan(12)))
diff --git a/lib/lp/registry/model/persontransferjob.py b/lib/lp/registry/model/persontransferjob.py
index 292f256..7c842e7 100644
--- a/lib/lp/registry/model/persontransferjob.py
+++ b/lib/lp/registry/model/persontransferjob.py
@@ -148,6 +148,7 @@ class PersonTransferJobDerived(BaseRunnableJob, metaclass=EnumeratedSubclass):
)
derived = cls(job)
derived.celeryRunOnCommit()
+ IStore(PersonTransferJob).flush()
return derived
@classmethod
diff --git a/lib/lp/registry/model/sharingjob.py b/lib/lp/registry/model/sharingjob.py
index 3dc412d..9f61dd1 100644
--- a/lib/lp/registry/model/sharingjob.py
+++ b/lib/lp/registry/model/sharingjob.py
@@ -184,6 +184,7 @@ class SharingJobDerived(BaseRunnableJob, metaclass=EnumeratedSubclass):
base_job = SharingJob(cls.class_job_type, pillar, grantee, metadata)
job = cls(base_job)
job.celeryRunOnCommit()
+ IStore(SharingJob).flush()
return job
@classmethod
diff --git a/lib/lp/registry/tests/test_teammembership.py b/lib/lp/registry/tests/test_teammembership.py
index a76ae2f..ff2792a 100644
--- a/lib/lp/registry/tests/test_teammembership.py
+++ b/lib/lp/registry/tests/test_teammembership.py
@@ -620,7 +620,7 @@ class TestParticipationCleanup(TeamParticipationTestCase):
The number of db queries should be constant not O(depth).
"""
self.assertStatementCount(
- 9,
+ 11,
self.team5.setMembershipData,
self.no_priv,
TeamMembershipStatus.DEACTIVATED,
diff --git a/lib/lp/services/job/model/job.py b/lib/lp/services/job/model/job.py
index b5ef078..896defc 100644
--- a/lib/lp/services/job/model/job.py
+++ b/lib/lp/services/job/model/job.py
@@ -18,16 +18,14 @@ from datetime import datetime, timezone
import transaction
from lazr.jobrunner.jobrunner import LeaseHeld
from storm.expr import And, Or, Select
-from storm.locals import JSON, Int, Reference
+from storm.locals import JSON, DateTime, Int, Reference, Unicode
from zope.interface import implementer
from lp.services.database import bulk
-from lp.services.database.constants import UTC_NOW
-from lp.services.database.datetimecol import UtcDateTimeCol
+from lp.services.database.constants import DEFAULT, UTC_NOW
from lp.services.database.enumcol import DBEnum
from lp.services.database.interfaces import IStore
-from lp.services.database.sqlbase import SQLBase
-from lp.services.database.sqlobject import StringCol
+from lp.services.database.stormbase import StormBase
from lp.services.job.interfaces.job import IJob, JobStatus, JobType
@@ -43,24 +41,28 @@ class InvalidTransition(Exception):
@implementer(IJob)
-class Job(SQLBase):
+class Job(StormBase):
"""See `IJob`."""
+ __storm_table__ = "Job"
+
@property
def job_id(self):
return self.id
- scheduled_start = UtcDateTimeCol()
+ id = Int(primary=True)
+
+ scheduled_start = DateTime(tzinfo=timezone.utc)
- date_created = UtcDateTimeCol()
+ date_created = DateTime(tzinfo=timezone.utc)
- date_started = UtcDateTimeCol()
+ date_started = DateTime(tzinfo=timezone.utc)
- date_finished = UtcDateTimeCol()
+ date_finished = DateTime(tzinfo=timezone.utc)
- lease_expires = UtcDateTimeCol()
+ lease_expires = DateTime(tzinfo=timezone.utc)
- log = StringCol()
+ log = Unicode()
_status = DBEnum(
enum=JobStatus,
@@ -106,6 +108,28 @@ class Job(SQLBase):
status = property(lambda x: x._status)
+ def __init__(
+ self,
+ scheduled_start=None,
+ date_finished=None,
+ lease_expires=None,
+ max_retries=DEFAULT,
+ requester=None,
+ base_json_data=None,
+ base_job_type=None,
+ status=JobStatus.WAITING,
+ ):
+ super().__init__()
+ self.scheduled_start = scheduled_start
+ self.date_finished = date_finished
+ self.lease_expires = lease_expires
+ self.max_retries = max_retries
+ self.requester = requester
+ self.base_json_data = base_json_data
+ self.base_job_type = base_job_type
+ self._status = status
+ IStore(Job).add(self)
+
@property
def is_pending(self):
"""See `IJob`."""
@@ -217,6 +241,9 @@ class Job(SQLBase):
self._set_status(JobStatus.WAITING)
self.lease_expires = None
+ def destroySelf(self):
+ IStore(Job).remove(self)
+
class EnumeratedSubclass(type):
"""Metaclass for when subclasses are assigned enums."""
diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
index 2490962..70bf2b6 100644
--- a/lib/lp/services/job/tests/test_celery.py
+++ b/lib/lp/services/job/tests/test_celery.py
@@ -45,6 +45,7 @@ class TestJob(BaseRunnableJob):
self.job = store.find(Job, id=job_id)[0]
else:
self.job = Job(max_retries=2, scheduled_start=scheduled_start)
+ IStore(Job).flush()
def run(self):
pass
diff --git a/lib/lp/services/job/tests/test_job.py b/lib/lp/services/job/tests/test_job.py
index 799ad98..886e5ad 100644
--- a/lib/lp/services/job/tests/test_job.py
+++ b/lib/lp/services/job/tests/test_job.py
@@ -100,17 +100,17 @@ class TestJob(TestCaseWithFactory):
def test_start_when_completed_is_invalid(self):
"""When a job is completed, attempting to start is invalid."""
- job = Job(_status=JobStatus.COMPLETED)
+ job = Job(status=JobStatus.COMPLETED)
self.assertRaises(InvalidTransition, job.start)
def test_start_when_failed_is_invalid(self):
"""When a job is failed, attempting to start is invalid."""
- job = Job(_status=JobStatus.FAILED)
+ job = Job(status=JobStatus.FAILED)
self.assertRaises(InvalidTransition, job.start)
def test_start_when_running_is_invalid(self):
"""When a job is running, attempting to start is invalid."""
- job = Job(_status=JobStatus.FAILED)
+ job = Job(status=JobStatus.FAILED)
self.assertRaises(InvalidTransition, job.start)
def test_complete(self):
@@ -118,7 +118,7 @@ class TestJob(TestCaseWithFactory):
It should set date_finished and set the job status to COMPLETED.
"""
- job = Job(_status=JobStatus.RUNNING)
+ job = Job(status=JobStatus.RUNNING)
self.assertEqual(None, job.date_finished)
job.complete()
self.assertNotEqual(None, job.date_finished)
@@ -126,17 +126,17 @@ class TestJob(TestCaseWithFactory):
def test_complete_when_waiting_is_invalid(self):
"""When a job is waiting, attempting to complete is invalid."""
- job = Job(_status=JobStatus.WAITING)
+ job = Job(status=JobStatus.WAITING)
self.assertRaises(InvalidTransition, job.complete)
def test_complete_when_completed_is_invalid(self):
"""When a job is completed, attempting to complete is invalid."""
- job = Job(_status=JobStatus.COMPLETED)
+ job = Job(status=JobStatus.COMPLETED)
self.assertRaises(InvalidTransition, job.complete)
def test_complete_when_failed_is_invalid(self):
"""When a job is failed, attempting to complete is invalid."""
- job = Job(_status=JobStatus.FAILED)
+ job = Job(status=JobStatus.FAILED)
self.assertRaises(InvalidTransition, job.complete)
def test_fail(self):
@@ -144,7 +144,7 @@ class TestJob(TestCaseWithFactory):
It should set date_finished and set the job status to FAILED.
"""
- job = Job(_status=JobStatus.RUNNING)
+ job = Job(status=JobStatus.RUNNING)
self.assertEqual(None, job.date_finished)
job.fail()
self.assertNotEqual(None, job.date_finished)
@@ -152,17 +152,17 @@ class TestJob(TestCaseWithFactory):
def test_fail_when_waiting_is_invalid(self):
"""When a job is waiting, attempting to fail is invalid."""
- job = Job(_status=JobStatus.WAITING)
+ job = Job(status=JobStatus.WAITING)
self.assertRaises(InvalidTransition, job.fail)
def test_fail_when_completed_is_invalid(self):
"""When a job is completed, attempting to fail is invalid."""
- job = Job(_status=JobStatus.COMPLETED)
+ job = Job(status=JobStatus.COMPLETED)
self.assertRaises(InvalidTransition, job.fail)
def test_fail_when_failed_is_invalid(self):
"""When a job is failed, attempting to fail is invalid."""
- job = Job(_status=JobStatus.FAILED)
+ job = Job(status=JobStatus.FAILED)
self.assertRaises(InvalidTransition, job.fail)
def test_queue(self):
@@ -170,7 +170,7 @@ class TestJob(TestCaseWithFactory):
It should set date_finished, and set status to WAITING.
"""
- job = Job(_status=JobStatus.RUNNING)
+ job = Job(status=JobStatus.RUNNING)
self.assertEqual(None, job.date_finished)
job.queue()
self.assertNotEqual(None, job.date_finished)
@@ -178,76 +178,76 @@ class TestJob(TestCaseWithFactory):
def test_queue_clears_lease_expires(self):
"""Queueing a job releases its lease."""
- job = Job(_status=JobStatus.RUNNING)
+ job = Job(status=JobStatus.RUNNING)
job.lease_expires = UTC_NOW
job.queue()
self.assertIsNone(job.lease_expires)
def test_suspend(self):
"""A job that is in the WAITING state can be suspended."""
- job = Job(_status=JobStatus.WAITING)
+ job = Job(status=JobStatus.WAITING)
job.suspend()
self.assertEqual(job.status, JobStatus.SUSPENDED)
def test_suspend_when_running(self):
"""When a job is running, attempting to suspend is valid."""
- job = Job(_status=JobStatus.RUNNING)
+ job = Job(status=JobStatus.RUNNING)
job.suspend()
self.assertEqual(JobStatus.SUSPENDED, job.status)
def test_suspend_when_completed(self):
"""When a job is completed, attempting to suspend is invalid."""
- job = Job(_status=JobStatus.COMPLETED)
+ job = Job(status=JobStatus.COMPLETED)
self.assertRaises(InvalidTransition, job.suspend)
def test_suspend_when_failed(self):
"""When a job is failed, attempting to suspend is invalid."""
- job = Job(_status=JobStatus.FAILED)
+ job = Job(status=JobStatus.FAILED)
self.assertRaises(InvalidTransition, job.suspend)
def test_resume(self):
"""A job that is suspended can be resumed."""
- job = Job(_status=JobStatus.SUSPENDED)
+ job = Job(status=JobStatus.SUSPENDED)
job.resume()
self.assertEqual(job.status, JobStatus.WAITING)
def test_resume_clears_lease_expires(self):
"""A job that resumes should null out the lease_expires."""
- job = Job(_status=JobStatus.SUSPENDED)
+ job = Job(status=JobStatus.SUSPENDED)
job.lease_expires = UTC_NOW
job.resume()
self.assertIsNone(job.lease_expires)
def test_resume_when_running(self):
"""When a job is running, attempting to resume is invalid."""
- job = Job(_status=JobStatus.RUNNING)
+ job = Job(status=JobStatus.RUNNING)
self.assertRaises(InvalidTransition, job.resume)
def test_resume_when_completed(self):
"""When a job is completed, attempting to resume is invalid."""
- job = Job(_status=JobStatus.COMPLETED)
+ job = Job(status=JobStatus.COMPLETED)
self.assertRaises(InvalidTransition, job.resume)
def test_resume_when_failed(self):
"""When a job is failed, attempting to resume is invalid."""
- job = Job(_status=JobStatus.FAILED)
+ job = Job(status=JobStatus.FAILED)
self.assertRaises(InvalidTransition, job.resume)
def test_is_pending(self):
"""is_pending is True when the job can possibly complete."""
for status in JobStatus.items:
- job = Job(_status=status)
+ job = Job(status=status)
self.assertEqual(status in Job.PENDING_STATUSES, job.is_pending)
def test_is_runnable_when_failed(self):
"""is_runnable is false when the job is not WAITING."""
- job = Job(_status=JobStatus.FAILED)
+ job = Job(status=JobStatus.FAILED)
self.assertFalse(job.is_runnable)
def test_is_runnable_when_scheduled_in_future(self):
"""is_runnable is false when the job is scheduled in the future."""
job = Job(
- _status=JobStatus.WAITING,
+ status=JobStatus.WAITING,
scheduled_start=datetime.now(timezone.utc) + timedelta(seconds=60),
)
self.assertFalse(job.is_runnable)
@@ -255,14 +255,14 @@ class TestJob(TestCaseWithFactory):
def test_is_runnable_when_scheduled_in_past(self):
"""is_runnable is true when the job is scheduled in the past."""
job = Job(
- _status=JobStatus.WAITING,
+ status=JobStatus.WAITING,
scheduled_start=datetime.now(timezone.utc) - timedelta(seconds=60),
)
self.assertTrue(job.is_runnable)
def test_is_runnable_when_not_scheduled(self):
"""is_runnable is true when no explicit schedule has been requested."""
- job = Job(_status=JobStatus.WAITING)
+ job = Job(status=JobStatus.WAITING)
self.assertTrue(job.is_runnable)
def test_start_manages_transactions(self):
@@ -387,6 +387,7 @@ class TestReadiness(TestCase):
"""Job.ready_jobs should include new jobs."""
preexisting = self._sampleData()
job = Job()
+ Store.of(job).flush()
self.assertEqual(
preexisting + [(job.id,)],
list(Store.of(job).execute(Job.ready_jobs)),
@@ -395,7 +396,7 @@ class TestReadiness(TestCase):
def test_ready_jobs_started(self):
"""Job.ready_jobs should not jobs that have been started."""
preexisting = self._sampleData()
- job = Job(_status=JobStatus.RUNNING)
+ job = Job(status=JobStatus.RUNNING)
self.assertEqual(
preexisting, list(Store.of(job).execute(Job.ready_jobs))
)
@@ -405,6 +406,7 @@ class TestReadiness(TestCase):
preexisting = self._sampleData()
UNIX_EPOCH = datetime.fromtimestamp(0, timezone.utc)
job = Job(lease_expires=UNIX_EPOCH)
+ Store.of(job).flush()
self.assertEqual(
preexisting + [(job.id,)],
list(Store.of(job).execute(Job.ready_jobs)),
diff --git a/lib/lp/services/job/tests/test_runner.py b/lib/lp/services/job/tests/test_runner.py
index f443435..889f941 100644
--- a/lib/lp/services/job/tests/test_runner.py
+++ b/lib/lp/services/job/tests/test_runner.py
@@ -57,6 +57,7 @@ class NullJob(BaseRunnableJob):
):
self.message = completion_message
self.job = Job()
+ IStore(Job).flush()
self.oops_recipients = oops_recipients
if self.oops_recipients is None:
self.oops_recipients = []
@@ -512,6 +513,7 @@ class DerivedJob(BaseRunnableJob, StormBase):
super().__init__()
self.job = Job()
self.should_succeed = should_succeed
+ IStore(Job).flush()
def run(self):
if not self.should_succeed:
@@ -619,6 +621,7 @@ class StuckJob(StaticJobSource):
self.lease_length = lease_length
self.delay = delay
self.job = Job()
+ IStore(Job).flush()
def __repr__(self):
return "<%s(%r, lease_length=%s, delay=%s)>" % (
@@ -655,6 +658,7 @@ class InitialFailureJob(StaticJobSource):
def __init__(self, id, fail):
self.id = id
self.job = Job()
+ IStore(Job).flush()
self.fail = fail
def run(self):
@@ -677,6 +681,7 @@ class ProcessSharingJob(StaticJobSource):
def __init__(self, id, first):
self.id = id
self.job = Job()
+ IStore(Job).flush()
self.first = first
def run(self):
@@ -697,6 +702,7 @@ class MemoryHogJob(StaticJobSource):
def __init__(self, id):
self.job = Job()
+ IStore(Job).flush()
self.id = id
def run(self):
@@ -717,6 +723,7 @@ class LeaseHeldJob(StaticJobSource):
def __init__(self, id):
self.job = Job()
+ IStore(Job).flush()
self.id = id
def acquireLease(self):
diff --git a/lib/lp/services/webhooks/model.py b/lib/lp/services/webhooks/model.py
index 902b46e..5db81e2 100644
--- a/lib/lp/services/webhooks/model.py
+++ b/lib/lp/services/webhooks/model.py
@@ -573,6 +573,7 @@ class WebhookDeliveryJob(WebhookJobDerived):
)
job = cls(webhook_job)
job.celeryRunOnCommit()
+ IStore(WebhookJob).flush()
log.info(
"Scheduled %r (%s): %s"
% (job, event_type, _redact_payload(event_type, payload))
diff --git a/lib/lp/snappy/model/snapbuildjob.py b/lib/lp/snappy/model/snapbuildjob.py
index b16d2c8..539fb1c 100644
--- a/lib/lp/snappy/model/snapbuildjob.py
+++ b/lib/lp/snappy/model/snapbuildjob.py
@@ -188,6 +188,7 @@ class SnapStoreUploadJob(SnapBuildJobDerived):
snap_build_job = SnapBuildJob(snapbuild, cls.class_job_type, {})
job = cls(snap_build_job)
job.celeryRunOnCommit()
+ IStore(SnapBuildJob).flush()
del get_property_cache(snapbuild).last_store_upload_job
notify(SnapBuildStoreUploadStatusChangedEvent(snapbuild))
return job
diff --git a/lib/lp/snappy/model/snapjob.py b/lib/lp/snappy/model/snapjob.py
index 5ec3be9..69f4bb5 100644
--- a/lib/lp/snappy/model/snapjob.py
+++ b/lib/lp/snappy/model/snapjob.py
@@ -191,6 +191,7 @@ class SnapRequestBuildsJob(SnapJobDerived):
snap_job = SnapJob(snap, cls.class_job_type, metadata)
job = cls(snap_job)
job.celeryRunOnCommit()
+ IStore(SnapJob).flush()
return job
@classmethod
diff --git a/lib/lp/soyuz/model/initializedistroseriesjob.py b/lib/lp/soyuz/model/initializedistroseriesjob.py
index 8c7775e..48a30eb 100644
--- a/lib/lp/soyuz/model/initializedistroseriesjob.py
+++ b/lib/lp/soyuz/model/initializedistroseriesjob.py
@@ -113,6 +113,7 @@ class InitializeDistroSeriesJob(DistributionJobDerived):
store.add(distribution_job)
derived_job = cls(distribution_job)
derived_job.celeryRunOnCommit()
+ IStore(DistributionJob).flush()
return derived_job
@classmethod
diff --git a/lib/lp/soyuz/model/processacceptedbugsjob.py b/lib/lp/soyuz/model/processacceptedbugsjob.py
index c4a1515..e368155 100644
--- a/lib/lp/soyuz/model/processacceptedbugsjob.py
+++ b/lib/lp/soyuz/model/processacceptedbugsjob.py
@@ -260,6 +260,7 @@ class ProcessAcceptedBugsJob(StormBase, BaseRunnableJob):
distroseries, sourcepackagerelease, bug_ids
)
IPrimaryStore(ProcessAcceptedBugsJob).add(job)
+ IPrimaryStore(ProcessAcceptedBugsJob).flush()
job.celeryRunOnCommit()
return job
diff --git a/lib/lp/translations/model/translationsharingjob.py b/lib/lp/translations/model/translationsharingjob.py
index 950143b..d317d2a 100644
--- a/lib/lp/translations/model/translationsharingjob.py
+++ b/lib/lp/translations/model/translationsharingjob.py
@@ -173,6 +173,7 @@ class TranslationSharingJobDerived(metaclass=EnumeratedSubclass):
)
derived = cls(context)
derived.celeryRunOnCommit()
+ IStore(TranslationSharingJob).flush()
return derived
@classmethod
diff --git a/lib/lp/translations/tests/test_pofilestatsjob.py b/lib/lp/translations/tests/test_pofilestatsjob.py
index 517949f..c805674 100644
--- a/lib/lp/translations/tests/test_pofilestatsjob.py
+++ b/lib/lp/translations/tests/test_pofilestatsjob.py
@@ -23,7 +23,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
def test_job_interface(self):
# Instances of POFileStatsJob are runnable jobs.
- verifyObject(IRunnableJob, POFileStatsJob(0))
+ verifyObject(IRunnableJob, POFileStatsJob(self.factory.makePOFile()))
def test_source_interface(self):
# The POFileStatsJob class is a source of POFileStatsJobs.
@@ -38,7 +38,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
self.factory.makePOTMsgSet(pofile.potemplate, singular)
# The statistics start at 0.
self.assertEqual(pofile.potemplate.messageCount(), 0)
- job = pofilestatsjob.schedule(pofile.id)
+ job = pofilestatsjob.schedule(pofile)
# Just scheduling the job doesn't update the statistics.
self.assertEqual(pofile.potemplate.messageCount(), 0)
with dbuser("pofilestats"):
@@ -58,7 +58,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
self.factory.makePOTMsgSet(pofile.potemplate, singular)
# The statistics are still at 0, even though there is a message.
self.assertEqual(potemplate.messageCount(), 0)
- job = pofilestatsjob.schedule(pofile.id)
+ job = pofilestatsjob.schedule(pofile)
# Just scheduling the job doesn't update the statistics.
self.assertEqual(pofile.potemplate.messageCount(), 0)
with dbuser("pofilestats"):
@@ -73,7 +73,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
# We need a POFile to update.
pofile = self.factory.makePOFile(side=TranslationSide.UPSTREAM)
# If we schedule a job, then we'll get it back.
- job = pofilestatsjob.schedule(pofile.id)
+ job = pofilestatsjob.schedule(pofile)
self.assertIs(list(POFileStatsJob.iterReady())[0], job)
def test_second_job_is_scheduled(self):
@@ -83,11 +83,11 @@ class TestPOFileStatsJob(TestCaseWithFactory):
# We need a POFile to update.
pofile = self.factory.makePOFile(side=TranslationSide.UPSTREAM)
# If we schedule a job, then there will be one scheduled.
- pofilestatsjob.schedule(pofile.id)
+ pofilestatsjob.schedule(pofile)
self.assertIs(len(list(POFileStatsJob.iterReady())), 1)
# If we attempt to schedule another job for the same POFile, a new job
# is added.
- pofilestatsjob.schedule(pofile.id)
+ pofilestatsjob.schedule(pofile)
self.assertIs(len(list(POFileStatsJob.iterReady())), 2)
def assertJobUpdatesStats(self, pofile1, pofile2):
@@ -97,7 +97,7 @@ class TestPOFileStatsJob(TestCaseWithFactory):
# The statistics start at 0.
self.assertEqual(pofile1.getStatistics(), (0, 0, 0, 0))
self.assertEqual(pofile2.getStatistics(), (0, 0, 0, 0))
- job = pofilestatsjob.schedule(pofile1.id)
+ job = pofilestatsjob.schedule(pofile1)
# Just scheduling the job doesn't update the statistics.
self.assertEqual(pofile1.getStatistics(), (0, 0, 0, 0))
self.assertEqual(pofile2.getStatistics(), (0, 0, 0, 0))
@@ -207,7 +207,7 @@ class TestViaCelery(TestCaseWithFactory):
self.factory.makePOTMsgSet(pofile.potemplate, singular)
# The statistics start at 0.
self.assertEqual(pofile.potemplate.messageCount(), 0)
- pofilestatsjob.schedule(pofile.id)
+ pofilestatsjob.schedule(pofile)
with block_on_job():
transaction.commit()
# Now that the job ran, the statistics have been updated.