divmod-dev team mailing list archive
-
divmod-dev team
-
Mailing list archive
-
Message #00202
lp:~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2 into lp:divmod.org
Jean-Paul Calderone has proposed merging lp:~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2 into lp:divmod.org.
Requested reviews:
Divmod-dev (divmod-dev)
Related bugs:
Bug #910923 in Divmod Axiom: "Scheduled events aren't automatically executed"
https://bugs.launchpad.net/divmod-axiom/+bug/910923
For more details, see:
https://code.launchpad.net/~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2/+merge/87280
This fixes the scheduler startup issue (bug #910923: scheduled events aren't automatically executed) by ensuring the scheduler powerup is created and associated with the store service as soon as the store service is created.
--
https://code.launchpad.net/~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2/+merge/87280
Your team Divmod-dev is requested to review the proposed merge of lp:~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2 into lp:divmod.org.
=== modified file 'Axiom/axiom/scheduler.py'
--- Axiom/axiom/scheduler.py 2009-07-07 20:35:43 +0000
+++ Axiom/axiom/scheduler.py 2012-01-02 18:04:35 +0000
@@ -1,5 +1,28 @@
# -*- test-case-name: axiom.test.test_scheduler -*-
+"""
+Timed event scheduling for Axiom databases.
+
+With this module, applications can schedule an L{Item} to have its C{run} method
+called at a particular point in the future. This call will happen even if the
+process which initially schedules it exits and the database is later re-opened
+by another process (of course, if the scheduled time comes and goes while no
+process is using the database, then the call will be delayed until some process
+opens the database and starts its services).
+
+This module contains two implementations of the L{axiom.iaxiom.IScheduler}
+interface, one for site stores and one for sub-stores. Items can only be
+scheduled using an L{IScheduler} implementation from the store containing the
+item. This means a typical way to schedule an item to be run is::
+
+ IScheduler(item.store).schedule(item, when)
+
+The scheduler service can also be retrieved from the site store's service
+collection by name::
+
+ IServiceCollection(siteStore).getServiceNamed(SITE_SCHEDULER)
+"""
+
import warnings
from zope.interface import implements
@@ -20,6 +43,9 @@
VERBOSE = False
+SITE_SCHEDULER = u"Site Scheduler"
+
+
class TimedEventFailureLog(Item):
typeName = 'timed_event_failure_log'
schemaVersion = 1
@@ -213,6 +239,7 @@
def __init__(self, store):
self.store = store
+ self.setName(SITE_SCHEDULER)
def startService(self):
=== modified file 'Axiom/axiom/store.py'
--- Axiom/axiom/store.py 2009-07-07 20:35:43 +0000
+++ Axiom/axiom/store.py 2012-01-02 18:04:35 +0000
@@ -140,6 +140,12 @@
batcher = batch.BatchProcessingControllerService(st)
batcher.setServiceParent(collection)
+ scheduler = iaxiom.IScheduler(st)
+ # If it's an old database, we might get a SubScheduler instance. It has no
+ # setServiceParent method.
+ if getattr(scheduler, 'setServiceParent', None) is not None:
+ scheduler.setServiceParent(collection)
+
return collection
@@ -989,7 +995,6 @@
sched = _SiteScheduler(empowered)
else:
sched = _UserScheduler(empowered)
- sched.setServiceParent(IService(empowered))
empowered._schedulerService = sched
return empowered._schedulerService
return None
@@ -1522,8 +1527,8 @@
if (key[0], key[1] + 1) in onDiskSchema:
raise RuntimeError(
- "Greater versions of database %r objects in the DB than in memory" %
- (actualType.typeName,))
+ "Memory version of %r is %d; database has newer" % (
+ actualType.typeName, key[1]))
# finally find old versions of the data and prepare to upgrade it.
=== modified file 'Axiom/axiom/test/historic/stub_subscheduler1to2.py'
--- Axiom/axiom/test/historic/stub_subscheduler1to2.py 2009-07-07 20:35:43 +0000
+++ Axiom/axiom/test/historic/stub_subscheduler1to2.py 2012-01-02 18:04:35 +0000
@@ -7,12 +7,14 @@
from axiom.test.historic.stubloader import saveStub
+from axiom.substore import SubStore
from axiom.scheduler import SubScheduler
from axiom.dependency import installOn
def createDatabase(store):
- installOn(SubScheduler(store=store), store)
+ sub = SubStore.createNew(store, ["substore"]).open()
+ installOn(SubScheduler(store=sub), sub)
if __name__ == '__main__':
=== modified file 'Axiom/axiom/test/historic/subscheduler1to2.axiom.tbz2'
Binary files Axiom/axiom/test/historic/subscheduler1to2.axiom.tbz2 2009-07-07 20:35:43 +0000 and Axiom/axiom/test/historic/subscheduler1to2.axiom.tbz2 2012-01-02 18:04:35 +0000 differ
=== modified file 'Axiom/axiom/test/historic/test_subscheduler1to2.py'
--- Axiom/axiom/test/historic/test_subscheduler1to2.py 2009-07-07 20:35:43 +0000
+++ Axiom/axiom/test/historic/test_subscheduler1to2.py 2012-01-02 18:04:35 +0000
@@ -5,6 +5,7 @@
"""
from axiom.iaxiom import IScheduler
+from axiom.substore import SubStore
from axiom.scheduler import SubScheduler, _UserScheduler
from axiom.test.historic.stubloader import StubbedTest
@@ -16,11 +17,12 @@
and adapting the L{Store} to L{IScheduler} succeeds with a
L{_UserScheduler}.
"""
- scheduler = self.store.findUnique(SubScheduler)
- self.assertEquals(list(self.store.interfacesFor(scheduler)), [])
-
- # Slothfully grant this test store the appearance of being a user
- # store.
- self.store.parent = self.store
-
- self.assertIsInstance(IScheduler(self.store), _UserScheduler)
+ sub = self.store.findFirst(SubStore).open()
+ upgraded = sub.whenFullyUpgraded()
+ def subUpgraded(ignored):
+ scheduler = sub.findUnique(SubScheduler)
+ self.assertEquals(list(sub.interfacesFor(scheduler)), [])
+
+ self.assertIsInstance(IScheduler(sub), _UserScheduler)
+ upgraded.addCallback(subUpgraded)
+ return upgraded
=== modified file 'Axiom/axiom/test/test_scheduler.py'
--- Axiom/axiom/test/test_scheduler.py 2009-07-07 20:35:43 +0000
+++ Axiom/axiom/test/test_scheduler.py 2012-01-02 18:04:35 +0000
@@ -6,21 +6,20 @@
from twisted.trial import unittest
from twisted.trial.unittest import TestCase
from twisted.application.service import IService
-from twisted.internet.defer import Deferred
from twisted.internet.task import Clock
-from twisted.python import filepath, versions
+from twisted.python import filepath
from epsilon.extime import Time
from axiom.scheduler import TimedEvent, _SubSchedulerParentHook, TimedEventFailureLog
from axiom.scheduler import Scheduler, SubScheduler
+from axiom.scheduler import SITE_SCHEDULER
from axiom.store import Store
from axiom.item import Item
from axiom.substore import SubStore
from axiom.attributes import integer, text, inmemory, boolean, timestamp
from axiom.iaxiom import IScheduler
-from axiom.dependency import installOn
class TestEvent(Item):
@@ -169,7 +168,6 @@
"""
Test the unscheduleFirst method of the scheduler.
"""
- d = Deferred()
sch = IScheduler(self.store)
t1 = TestEvent(testCase=self, name=u't1', store=self.store)
t2 = TestEvent(testCase=self, name=u't2', store=self.store, runAgain=None)
@@ -252,6 +250,16 @@
super(TopStoreSchedTest, self).setUp()
+ def test_namedService(self):
+ """
+ The site store IScheduler implementation can be retrieved as a named
+ service from the store's IServiceCollection powerup.
+ """
+ self.assertIdentical(
+ IService(self.store).getServiceNamed(SITE_SCHEDULER),
+ IScheduler(self.store))
+
+
def testBasicScheduledError(self):
S = IScheduler(self.store)
S.schedule(NotActuallyRunnable(store=self.store), self.now())
@@ -408,6 +416,17 @@
return service.stopService()
+ def test_schedulerStartsWhenServiceStarts(self):
+ """
+ Test that IScheduler(store).startService() gets called whenever
+ IService(store).startService() is called.
+ """
+ service = IService(self.store)
+ service.startService()
+ scheduler = service.getServiceNamed(SITE_SCHEDULER)
+ self.assertTrue(scheduler.running)
+
+
def test_scheduleWhileStopped(self):
"""
Test that a schedule call on a L{Scheduler} which has not been started
@@ -789,7 +808,8 @@
storeID = self.oldScheduler.storeID
del self.oldScheduler
gc.collect()
- scheduler = self.store.getItemByID(storeID)
+ # Just load the scheduler, we don't need to use it for anything.
+ self.store.getItemByID(storeID)
warnings = self.flushWarnings([self.test_deprecated])
self.assertEquals(len(warnings), 1)
self.assertEquals(warnings[0]['category'], PendingDeprecationWarning)
=== modified file 'Axiom/axiom/test/test_upgrading.py'
--- Axiom/axiom/test/test_upgrading.py 2009-01-02 14:21:43 +0000
+++ Axiom/axiom/test/test_upgrading.py 2012-01-02 18:04:35 +0000
@@ -12,7 +12,7 @@
from twisted.trial import unittest
from twisted.python import filepath
from twisted.application.service import IService
-from twisted.internet.defer import maybeDeferred
+from twisted.internet.defer import maybeDeferred, succeed
from twisted.python.reflect import namedModule
from twisted.python import log
@@ -116,15 +116,31 @@
self.currentStore = store.Store(self.dbdir, debug=dbg)
return self.currentStore
+
def closeStore(self):
- self.currentStore.close()
- self.currentStore = None
+ """
+ Close C{self.currentStore} and discard the reference. If there is a
+ store service running, stop it first.
+ """
+ service = IService(self.currentStore)
+ if service.running:
+ result = service.stopService()
+ else:
+ result = succeed(None)
+ def close(ignored):
+ self.currentStore.close()
+ self.currentStore = None
+ result.addCallback(close)
+ return result
+
def startStoreService(self):
svc = IService(self.currentStore)
svc.getServiceNamed("Batch Processing Controller").disownServiceParent()
svc.startService()
+
+
def _logMessagesFrom(f):
L = []
log.addObserver(L.append)
@@ -308,6 +324,8 @@
player = s.getItemByID(playerID, autoUpgrade=False)
sword = s.getItemByID(swordID, autoUpgrade=False)
self._testPlayerAndSwordState(player, sword)
+ # Stop that service we started.
+ return IService(s).stopService()
return s.whenFullyUpgraded().addCallback(afterUpgrade)
@@ -367,8 +385,6 @@
s = self.openStore()
self.startStoreService()
def afterFirstUpgrade(result):
- self.closeStore()
-
choose(morenewapp)
s = self.openStore()
self.startStoreService()
@@ -379,7 +395,10 @@
sword = store.getItemByID(swordID, autoUpgrade=False)
self._testPlayerAndSwordState(player, sword)
- return s.whenFullyUpgraded().addCallback(afterFirstUpgrade)
+ d = s.whenFullyUpgraded()
+ d.addCallback(lambda ignored: self.closeStore())
+ d.addCallback(afterFirstUpgrade)
+ return d
@@ -399,11 +418,25 @@
self.currentSubStore = ss.open()
return self.currentSubStore
+
def closeStore(self):
- self.currentSubStore.close()
- self.currentTopStore.close()
- self.currentSubStore = None
- self.currentTopStore = None
+ """
+ Close C{self.currentTopStore} and C{self.currentSubStore}. If there is
+ a store service running in C{self.currentTopStore}, stop it first.
+ """
+ service = IService(self.currentTopStore)
+ if service.running:
+ result = service.stopService()
+ else:
+ result = succeed(None)
+ def stopped(ignored):
+ self.currentSubStore.close()
+ self.currentTopStore.close()
+ self.currentSubStore = None
+ self.currentTopStore = None
+ result.addCallback(stopped)
+ return result
+
def startStoreService(self):
svc = IService(self.currentTopStore)
Follow ups