← Back to team overview

divmod-dev team mailing list archive

[Merge] lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963 into lp:divmod.org

 

Jean-Paul Calderone has proposed merging lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963 into lp:divmod.org.

Requested reviews:
  Divmod-dev (divmod-dev)
Related bugs:
  Bug #910927 in Divmod Axiom: "The batch process is not started up automatically"
  https://bugs.launchpad.net/divmod-axiom/+bug/910927

For more details, see:
https://code.launchpad.net/~divmod-dev/divmod.org/batch-process-start-heuristic-2963/+merge/87279

This changes the batch processing system to start the batch process any time there are remote listeners for an item that is added.  This doesn't make the behaviour strictly correct, but it improves it enough to make Quotient usable.

-- 
https://code.launchpad.net/~divmod-dev/divmod.org/batch-process-start-heuristic-2963/+merge/87279
Your team Divmod-dev is requested to review the proposed merge of lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963 into lp:divmod.org.
=== modified file 'Axiom/axiom/batch.py'
--- Axiom/axiom/batch.py	2011-08-20 01:48:56 +0000
+++ Axiom/axiom/batch.py	2012-01-02 18:03:59 +0000
@@ -400,7 +400,8 @@
         processor is being added to the database.
 
         If this processor is not already scheduled to run, this will schedule
-        it.
+        it.  It will also start the batch process if it is not yet running and
+        there are any registered remote listeners.
         """
         localCount = self.store.query(
             _ReliableListener,
@@ -408,9 +409,19 @@
                            _ReliableListener.style == iaxiom.LOCAL),
             limit=1).count()
 
+        remoteCount = self.store.query(
+            _ReliableListener,
+            attributes.AND(_ReliableListener.processor == self,
+                           _ReliableListener.style == iaxiom.REMOTE),
+            limit=1).count()
+
         if localCount and self.scheduled is None:
             self.scheduled = extime.Time()
             iaxiom.IScheduler(self.store).schedule(self, self.scheduled)
+        if remoteCount:
+            batchService = iaxiom.IBatchService(self.store, None)
+            if batchService is not None:
+                batchService.start()
 
 
 
@@ -894,7 +905,13 @@
     """
     Controls starting, stopping, and passing messages to the system process in
     charge of remote batch processing.
+
+    @ivar batchController: A reference to the L{ProcessController} for
+        interacting with the batch process, if one exists.  Otherwise C{None}.
     """
+    implements(iaxiom.IBatchService)
+
+    batchController = None
 
     def __init__(self, store):
         self.store = store
@@ -948,6 +965,11 @@
                            method=method).do)
 
 
+    def start(self):
+        if self.batchController is not None:
+            self.batchController.getProcess()
+
+
     def suspend(self, storepath, storeID):
         return self.batchController.getProcess().addCallback(
             SuspendProcessor(storepath=storepath, storeid=storeID).do)
@@ -977,6 +999,10 @@
         return self.service.call(itemMethod)
 
 
+    def start(self):
+        self.service.start()
+
+
     def suspend(self, storeID):
         return self.service.suspend(self.storepath, storeID)
 
@@ -987,9 +1013,23 @@
 
 
 def storeBatchServiceSpecialCase(st, pups):
+    """
+    Adapt a L{Store} to L{IBatchService}.
+
+    If C{st} is a substore, return a simple wrapper that delegates to the site
+    store's L{IBatchService} powerup.  Return C{None} if C{st} has no
+    L{BatchProcessingControllerService}.
+    """
     if st.parent is not None:
-        return _SubStoreBatchChannel(st)
-    return service.IService(st).getServiceNamed("Batch Processing Controller")
+        try:
+            return _SubStoreBatchChannel(st)
+        except TypeError:
+            return None
+    storeService = service.IService(st)
+    try:
+        return storeService.getServiceNamed("Batch Processing Controller")
+    except KeyError:
+        return None
 
 
 

=== modified file 'Axiom/axiom/iaxiom.py'
--- Axiom/axiom/iaxiom.py	2009-07-07 20:35:43 +0000
+++ Axiom/axiom/iaxiom.py	2012-01-02 18:03:59 +0000
@@ -309,11 +309,18 @@
         """
 
 
+
 class IBatchService(Interface):
     """
     Object which allows minimal communication with L{IReliableListener}
     providers which are running remotely (that is, with the L{REMOTE} style).
     """
+    def start():
+        """
+        Start the remote batch process if it has not yet been started, otherwise
+        do nothing.
+        """
+
 
     def suspend(storeID):
         """
@@ -324,6 +331,7 @@
         @return: A Deferred which fires when the listener has been suspended.
         """
 
+
     def resume(storeID):
         """
         @type storeID: C{int}
@@ -333,6 +341,8 @@
         @return: A Deferred which fires when the listener has been resumed.
         """
 
+
+
 class IVersion(Interface):
     """
     Object with version information for a package that creates Axiom

=== modified file 'Axiom/axiom/test/test_batch.py'
--- Axiom/axiom/test/test_batch.py	2011-08-19 23:30:29 +0000
+++ Axiom/axiom/test/test_batch.py	2012-01-02 18:03:59 +0000
@@ -531,6 +531,29 @@
 
 
 class RemoteTestCase(unittest.TestCase):
+    def test_noBatchService(self):
+        """
+        A L{Store} with no database directory cannot be adapted to
+        L{iaxiom.IBatchService}.
+        """
+        st = store.Store()
+        self.assertRaises(TypeError, iaxiom.IBatchService, st)
+        self.assertIdentical(
+            iaxiom.IBatchService(st, None), None)
+
+
+    def test_subStoreNoBatchService(self):
+        """
+        A user L{Store} attached to a site L{Store} with no database directory
+        cannot be adapted to L{iaxiom.IBatchService}.
+        """
+        st = store.Store(filesdir=self.mktemp())
+        ss = substore.SubStore.createNew(st, 'substore').open()
+        self.assertRaises(TypeError, iaxiom.IBatchService, ss)
+        self.assertIdentical(
+            iaxiom.IBatchService(ss, None), None)
+
+
     def testBatchService(self):
         """
         Make sure SubStores can be adapted to L{iaxiom.IBatchService}.
@@ -606,3 +629,101 @@
         self.assertEquals(
             st.query(BatchWorkItem, BatchWorkItem.value == u"processed").count(),
             BATCH_WORK_UNITS)
+
+
+    def test_itemAddedStartsBatchProcess(self):
+        """
+        If there are remote-style listeners for an item source, C{itemAdded}
+        starts the batch process.
+
+        This is not completely correct.  There may be items to process remotely
+        when the main process starts up, before any new items are added.  This
+        is simpler to implement, but it shouldn't be taken as a reason not to
+        implement the actually correct solution.
+        """
+        st = store.Store(self.mktemp())
+        svc = service.IService(st)
+        svc.startService()
+        self.addCleanup(svc.stopService)
+
+        batchService = iaxiom.IBatchService(st)
+
+        procType = batch.processor(TestWorkUnit)
+        proc = procType(store=st)
+        listener = WorkListener(store=st)
+        proc.addReliableListener(listener, style=iaxiom.REMOTE)
+
+        # Sanity check: addReliableListener should eventually also trigger a
+        # batch process start if necessary.  But we don't want to test that case
+        # here, so make sure it's not happening.
+        self.assertEquals(batchService.batchController.mode, 'stopped')
+
+        # Now trigger it to start.
+        proc.itemAdded()
+
+        # It probably won't be ready by now, but who knows.
+        self.assertIn(batchService.batchController.mode, ('starting', 'ready'))
+
+
+    def test_itemAddedBeforeStarted(self):
+        """
+        If C{itemAdded} is called before the batch service is started, the batch
+        process is not started.
+        """
+        st = store.Store(self.mktemp())
+
+        procType = batch.processor(TestWorkUnit)
+        proc = procType(store=st)
+        listener = WorkListener(store=st)
+        proc.addReliableListener(listener, style=iaxiom.REMOTE)
+
+        proc.itemAdded()
+
+        # When the service later starts, the batch service needn't start its
+        # process.  Not that this would be bad.  Feel free to reverse this
+        # behavior if you really want.
+        svc = service.IService(st)
+        svc.startService()
+        self.addCleanup(svc.stopService)
+
+        batchService = iaxiom.IBatchService(st)
+        self.assertEquals(batchService.batchController.mode, 'stopped')
+
+
+    def test_itemAddedWithoutBatchService(self):
+        """
+        If the store has no batch service, C{itemAdded} doesn't start the batch
+        process and also doesn't raise an exception.
+        """
+        # An in-memory store can't have a batch service.
+        st = store.Store()
+        svc = service.IService(st)
+        svc.startService()
+        self.addCleanup(svc.stopService)
+
+        procType = batch.processor(TestWorkUnit)
+        proc = procType(store=st)
+        listener = WorkListener(store=st)
+        proc.addReliableListener(listener, style=iaxiom.REMOTE)
+
+        proc.itemAdded()
+
+        # And still there should be no batch service at all.
+        self.assertIdentical(iaxiom.IBatchService(st, None), None)
+
+
+    def test_subStoreBatchServiceStart(self):
+        """
+        The substore implementation of L{IBatchService.start} starts the batch
+        process.
+        """
+        st = store.Store(self.mktemp())
+        svc = service.IService(st)
+        svc.startService()
+        self.addCleanup(svc.stopService)
+
+        ss = substore.SubStore.createNew(st, 'substore').open()
+        iaxiom.IBatchService(ss).start()
+
+        batchService = iaxiom.IBatchService(st)
+        self.assertIn(batchService.batchController.mode, ('starting', 'ready'))


Follow ups