← Back to team overview

divmod-dev team mailing list archive

lp:~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2 into lp:divmod.org

 

Jean-Paul Calderone has proposed merging lp:~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2 into lp:divmod.org.

Requested reviews:
  Divmod-dev (divmod-dev)
Related bugs:
  Bug #910923 in Divmod Axiom: "Scheduled events aren't automatically executed"
  https://bugs.launchpad.net/divmod-axiom/+bug/910923

For more details, see:
https://code.launchpad.net/~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2/+merge/87280

This fixes the scheduling startup issue by ensuring the scheduler powerup is created and associated with the store service as soon as the store service is created.
-- 
https://code.launchpad.net/~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2/+merge/87280
Your team Divmod-dev is requested to review the proposed merge of lp:~divmod-dev/divmod.org/start-scheduler-when-parent-service-starts-3000-2 into lp:divmod.org.
=== modified file 'Axiom/axiom/scheduler.py'
--- Axiom/axiom/scheduler.py	2009-07-07 20:35:43 +0000
+++ Axiom/axiom/scheduler.py	2012-01-02 18:04:35 +0000
@@ -1,5 +1,28 @@
 # -*- test-case-name: axiom.test.test_scheduler -*-
 
+"""
+Timed event scheduling for Axiom databases.
+
+With this module, applications can schedule an L{Item} to have its C{run} method
+called at a particular point in the future.  This call will happen even if the
+process which initially schedules it exits and the database is later re-opened
+by another process (of course, if the scheduled time comes and goes while no
+process is using the database, then the call will be delayed until some process
+opens the database and starts its services).
+
+This module contains two implementations of the L{axiom.iaxiom.IScheduler}
+interface, one for site stores and one for sub-stores.  Items can only be
+scheduled using an L{IScheduler} implementation from the store containing the
+item.  This means a typical way to schedule an item to be run is::
+
+    IScheduler(item.store).schedule(item, when)
+
+The scheduler service can also be retrieved from the site store's service
+collection by name::
+
+    IServiceCollection(siteStore).getServiceNamed(SITE_SCHEDULER)
+"""
+
 import warnings
 
 from zope.interface import implements
@@ -20,6 +43,9 @@
 
 VERBOSE = False
 
+SITE_SCHEDULER = u"Site Scheduler"
+
+
 class TimedEventFailureLog(Item):
     typeName = 'timed_event_failure_log'
     schemaVersion = 1
@@ -213,6 +239,7 @@
 
     def __init__(self, store):
         self.store = store
+        self.setName(SITE_SCHEDULER)
 
 
     def startService(self):

=== modified file 'Axiom/axiom/store.py'
--- Axiom/axiom/store.py	2009-07-07 20:35:43 +0000
+++ Axiom/axiom/store.py	2012-01-02 18:04:35 +0000
@@ -140,6 +140,12 @@
         batcher = batch.BatchProcessingControllerService(st)
         batcher.setServiceParent(collection)
 
+    scheduler = iaxiom.IScheduler(st)
+    # If it's an old database, we might get a SubScheduler instance.  It has no
+    # setServiceParent method.
+    if getattr(scheduler, 'setServiceParent', None) is not None:
+        scheduler.setServiceParent(collection)
+
     return collection
 
 
@@ -989,7 +995,6 @@
                 sched = _SiteScheduler(empowered)
             else:
                 sched = _UserScheduler(empowered)
-            sched.setServiceParent(IService(empowered))
             empowered._schedulerService = sched
         return empowered._schedulerService
     return None
@@ -1522,8 +1527,8 @@
 
         if (key[0], key[1] + 1) in onDiskSchema:
             raise RuntimeError(
-                "Greater versions of database %r objects in the DB than in memory" %
-                (actualType.typeName,))
+                "Memory version of %r is %d; database has newer" % (
+                    actualType.typeName, key[1]))
 
 
     # finally find old versions of the data and prepare to upgrade it.

=== modified file 'Axiom/axiom/test/historic/stub_subscheduler1to2.py'
--- Axiom/axiom/test/historic/stub_subscheduler1to2.py	2009-07-07 20:35:43 +0000
+++ Axiom/axiom/test/historic/stub_subscheduler1to2.py	2012-01-02 18:04:35 +0000
@@ -7,12 +7,14 @@
 
 from axiom.test.historic.stubloader import saveStub
 
+from axiom.substore import SubStore
 from axiom.scheduler import SubScheduler
 from axiom.dependency import installOn
 
 
 def createDatabase(store):
-    installOn(SubScheduler(store=store), store)
+    sub = SubStore.createNew(store, ["substore"]).open()
+    installOn(SubScheduler(store=sub), sub)
 
 
 if __name__ == '__main__':

=== modified file 'Axiom/axiom/test/historic/subscheduler1to2.axiom.tbz2'
Binary files Axiom/axiom/test/historic/subscheduler1to2.axiom.tbz2	2009-07-07 20:35:43 +0000 and Axiom/axiom/test/historic/subscheduler1to2.axiom.tbz2	2012-01-02 18:04:35 +0000 differ
=== modified file 'Axiom/axiom/test/historic/test_subscheduler1to2.py'
--- Axiom/axiom/test/historic/test_subscheduler1to2.py	2009-07-07 20:35:43 +0000
+++ Axiom/axiom/test/historic/test_subscheduler1to2.py	2012-01-02 18:04:35 +0000
@@ -5,6 +5,7 @@
 """
 
 from axiom.iaxiom import IScheduler
+from axiom.substore import SubStore
 from axiom.scheduler import SubScheduler, _UserScheduler
 from axiom.test.historic.stubloader import StubbedTest
 
@@ -16,11 +17,12 @@
         and adapting the L{Store} to L{IScheduler} succeeds with a
         L{_UserScheduler}.
         """
-        scheduler = self.store.findUnique(SubScheduler)
-        self.assertEquals(list(self.store.interfacesFor(scheduler)), [])
-
-        # Slothfully grant this test store the appearance of being a user
-        # store.
-        self.store.parent = self.store
-
-        self.assertIsInstance(IScheduler(self.store), _UserScheduler)
+        sub = self.store.findFirst(SubStore).open()
+        upgraded = sub.whenFullyUpgraded()
+        def subUpgraded(ignored):
+            scheduler = sub.findUnique(SubScheduler)
+            self.assertEquals(list(sub.interfacesFor(scheduler)), [])
+
+            self.assertIsInstance(IScheduler(sub), _UserScheduler)
+        upgraded.addCallback(subUpgraded)
+        return upgraded

=== modified file 'Axiom/axiom/test/test_scheduler.py'
--- Axiom/axiom/test/test_scheduler.py	2009-07-07 20:35:43 +0000
+++ Axiom/axiom/test/test_scheduler.py	2012-01-02 18:04:35 +0000
@@ -6,21 +6,20 @@
 from twisted.trial import unittest
 from twisted.trial.unittest import TestCase
 from twisted.application.service import IService
-from twisted.internet.defer import Deferred
 from twisted.internet.task import Clock
-from twisted.python import filepath, versions
+from twisted.python import filepath
 
 from epsilon.extime import Time
 
 from axiom.scheduler import TimedEvent, _SubSchedulerParentHook, TimedEventFailureLog
 from axiom.scheduler import Scheduler, SubScheduler
+from axiom.scheduler import SITE_SCHEDULER
 from axiom.store import Store
 from axiom.item import Item
 from axiom.substore import SubStore
 
 from axiom.attributes import integer, text, inmemory, boolean, timestamp
 from axiom.iaxiom import IScheduler
-from axiom.dependency import installOn
 
 class TestEvent(Item):
 
@@ -169,7 +168,6 @@
         """
         Test the unscheduleFirst method of the scheduler.
         """
-        d = Deferred()
         sch = IScheduler(self.store)
         t1 = TestEvent(testCase=self, name=u't1', store=self.store)
         t2 = TestEvent(testCase=self, name=u't2', store=self.store, runAgain=None)
@@ -252,6 +250,16 @@
         super(TopStoreSchedTest, self).setUp()
 
 
+    def test_namedService(self):
+        """
+        The site store IScheduler implementation can be retrieved as a named
+        service from the store's IServiceCollection powerup.
+        """
+        self.assertIdentical(
+            IService(self.store).getServiceNamed(SITE_SCHEDULER),
+            IScheduler(self.store))
+
+
     def testBasicScheduledError(self):
         S = IScheduler(self.store)
         S.schedule(NotActuallyRunnable(store=self.store), self.now())
@@ -408,6 +416,17 @@
             return service.stopService()
 
 
+    def test_schedulerStartsWhenServiceStarts(self):
+        """
+        Test that IScheduler(store).startService() gets called whenever
+        IService(store).startService() is called.
+        """
+        service = IService(self.store)
+        service.startService()
+        scheduler = service.getServiceNamed(SITE_SCHEDULER)
+        self.assertTrue(scheduler.running)
+
+
     def test_scheduleWhileStopped(self):
         """
         Test that a schedule call on a L{Scheduler} which has not been started
@@ -789,7 +808,8 @@
         storeID = self.oldScheduler.storeID
         del self.oldScheduler
         gc.collect()
-        scheduler = self.store.getItemByID(storeID)
+        # Just load the scheduler, we don't need to use it for anything.
+        self.store.getItemByID(storeID)
         warnings = self.flushWarnings([self.test_deprecated])
         self.assertEquals(len(warnings), 1)
         self.assertEquals(warnings[0]['category'], PendingDeprecationWarning)

=== modified file 'Axiom/axiom/test/test_upgrading.py'
--- Axiom/axiom/test/test_upgrading.py	2009-01-02 14:21:43 +0000
+++ Axiom/axiom/test/test_upgrading.py	2012-01-02 18:04:35 +0000
@@ -12,7 +12,7 @@
 from twisted.trial import unittest
 from twisted.python import filepath
 from twisted.application.service import IService
-from twisted.internet.defer import maybeDeferred
+from twisted.internet.defer import maybeDeferred, succeed
 from twisted.python.reflect import namedModule
 from twisted.python import log
 
@@ -116,15 +116,31 @@
         self.currentStore = store.Store(self.dbdir, debug=dbg)
         return self.currentStore
 
+
     def closeStore(self):
-        self.currentStore.close()
-        self.currentStore = None
+        """
+        Close C{self.currentStore} and discard the reference.  If there is a
+        store service running, stop it first.
+        """
+        service = IService(self.currentStore)
+        if service.running:
+            result = service.stopService()
+        else:
+            result = succeed(None)
+        def close(ignored):
+            self.currentStore.close()
+            self.currentStore = None
+        result.addCallback(close)
+        return result
+
 
     def startStoreService(self):
         svc = IService(self.currentStore)
         svc.getServiceNamed("Batch Processing Controller").disownServiceParent()
         svc.startService()
 
+
+
 def _logMessagesFrom(f):
     L = []
     log.addObserver(L.append)
@@ -308,6 +324,8 @@
             player = s.getItemByID(playerID, autoUpgrade=False)
             sword = s.getItemByID(swordID, autoUpgrade=False)
             self._testPlayerAndSwordState(player, sword)
+            # Stop that service we started.
+            return IService(s).stopService()
 
         return s.whenFullyUpgraded().addCallback(afterUpgrade)
 
@@ -367,8 +385,6 @@
         s = self.openStore()
         self.startStoreService()
         def afterFirstUpgrade(result):
-            self.closeStore()
-
             choose(morenewapp)
             s = self.openStore()
             self.startStoreService()
@@ -379,7 +395,10 @@
             sword = store.getItemByID(swordID, autoUpgrade=False)
             self._testPlayerAndSwordState(player, sword)
 
-        return s.whenFullyUpgraded().addCallback(afterFirstUpgrade)
+        d = s.whenFullyUpgraded()
+        d.addCallback(lambda ignored: self.closeStore())
+        d.addCallback(afterFirstUpgrade)
+        return d
 
 
 
@@ -399,11 +418,25 @@
             self.currentSubStore = ss.open()
         return self.currentSubStore
 
+
     def closeStore(self):
-        self.currentSubStore.close()
-        self.currentTopStore.close()
-        self.currentSubStore = None
-        self.currentTopStore = None
+        """
+        Close C{self.currentTopStore} and C{self.currentSubStore}.  If there is
+        a store service running in C{self.currentTopStore}, stop it first.
+        """
+        service = IService(self.currentTopStore)
+        if service.running:
+            result = service.stopService()
+        else:
+            result = succeed(None)
+        def stopped(ignored):
+            self.currentSubStore.close()
+            self.currentTopStore.close()
+            self.currentSubStore = None
+            self.currentTopStore = None
+        result.addCallback(stopped)
+        return result
+
 
     def startStoreService(self):
         svc = IService(self.currentTopStore)


Follow ups