[Merge] lp:~exarkun/divmod.org/database-classifier-persistence into lp:divmod.org

Jean-Paul Calderone has proposed merging lp:~exarkun/divmod.org/database-classifier-persistence into lp:divmod.org.

Requested reviews:
  Divmod-dev (divmod-dev)
Related bugs:
  Bug #1035758 in Quotient: "Switch spambayes training persistence away from pickle"
  https://bugs.launchpad.net/quotient/+bug/1035758

For more details, see:
https://code.launchpad.net/~exarkun/divmod.org/database-classifier-persistence/+merge/120231

Change the persistence of spambayes classifier training data from pickle to SQLite3, to improve performance with large training sets (and probably to provide other benefits as well: reliability, introspectability, maintainability, etc.).
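
For reviewers who want the shape of the change at a glance, below is a condensed usage sketch distilled from the new unit tests in this branch. The database path is hypothetical; _SQLite3Classifier, Hammie, and their calls are taken from the patch:

    from StringIO import StringIO

    from spambayes.hammie import Hammie

    from xquotient.spam import _SQLite3Classifier

    # Training data lives in a dedicated SQLite3 file rather than in the
    # user store, so the batch process can share it with less contention.
    bayes = _SQLite3Classifier('/tmp/classifier.sqlite')  # hypothetical path
    guesser = Hammie(bayes, mode='r')

    # Training one message commits a single transaction covering all of the
    # message's tokens (see _post_training below).
    guesser.train(StringIO("spam words of spamfulness"), True)

    # Scores near 1.0 are spammy; scores near 0.0 are hammy.
    print guesser.score(StringIO("spamfulness words of spam"))

    # Re-opening the same path restores nspam/nham and per-token counts.
    assert _SQLite3Classifier('/tmp/classifier.sqlite').nspam == 1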

-- 
https://code.launchpad.net/~exarkun/divmod.org/database-classifier-persistence/+merge/120231
Your team Divmod-dev is requested to review the proposed merge of lp:~exarkun/divmod.org/database-classifier-persistence into lp:divmod.org.
=== modified file 'Quotient/xquotient/spam.py'
--- Quotient/xquotient/spam.py	2012-07-26 15:59:07 +0000
+++ Quotient/xquotient/spam.py	2012-08-17 20:25:22 +0000
@@ -2,6 +2,7 @@
 
 import cPickle, errno
 from decimal import Decimal
+import sqlite3
 
 from spambayes import hammie, classifier
 
@@ -362,38 +363,248 @@
     return df
 registerUpgrader(_dspamFilter1to2, DSPAMFilter.typeName, 1, 2)
 
+class _SQLite3Classifier(object, classifier.Classifier):
+    """
+    A Spambayes classifier which implements training dataset persistence in a
+    SQLite3 database.
+
+    The dataset is persisted in a dedicated SQLite3 database rather than in the
+    user store because the data is accessed from the batch process.  A dedicated
+    database will lead to fewer contention problems than re-using the user
+    store.
+
+    Axiom is not used for the training dataset database because of the
+    complicated, inflexible hooks supplied by the base
+    L{spambayes.classifier.Classifier}, which in particular make getting
+    transaction management difficult with Axiom.
+    """
+
+    SCHEMA = [
+        "CREATE TABLE bayes ("
+        "  word TEXT NOT NULL DEFAULT '',"
+        "  nspam INTEGER NOT NULL DEFAULT 0,"
+        "  nham INTEGER NOT NULL DEFAULT 0,"
+        "  PRIMARY KEY(word)"
+        ")",
+        "CREATE INDEX bayes_word ON bayes(word)",
+        "CREATE TABLE state ("
+        "  nspam INTEGER NOT NULL,"
+        "  nham INTEGER NOT NULL"
+        ")",
+        "INSERT INTO state (nspam, nham) VALUES (0, 0)",
+        ]
+
+    def nspam():
+        """
+        Define an C{nspam} property which reflects the number of messages
+        trained as spam, while also automatically persisting any changes to this
+        value (which the base class will make) to the database.
+        """
+        def get(self):
+            return self._nspam
+        def set(self, value):
+            self._nspam = value
+            self._recordState()
+        return get, set
+    nspam = property(*nspam())
+
+    def nham():
+        """
+        Define a C{nham} property which reflects the number of messages trained
+        as ham, while also automatically persisting any changes to this value
+        (which the base class will make) to the database.
+        """
+        def get(self):
+            return self._nham
+        def set(self, value):
+            self._nham = value
+            self._recordState()
+        return get, set
+    nham = property(*nham())
+
+    db = cursor = databaseName = None
+
+    def __init__(self, databaseName):
+        """
+        Initialize this classifier.
+
+        @param databaseName: The path to the SQLite3 database from which to load
+            and to which to store training data.
+        """
+        classifier.Classifier.__init__(self)
+        self.databaseName = databaseName
+        # Open the database, possibly initializing it if it has not yet been
+        # initialized, and then load the necessary global state from it (nspam,
+        # nham).
+        self.load()
+
+
+    def load(self):
+        """
+        Open the training dataset database, initializing it if necessary, and
+        loading necessary initial state from it.
+        """
+        self.db = sqlite3.connect(self.databaseName, isolation_level='IMMEDIATE')
+        self.cursor = self.db.cursor()
+        try:
+            for statement in self.SCHEMA:
+                self.cursor.execute(statement)
+        except sqlite3.OperationalError:
+            # The schema has already been created; keep the existing data.
+            self.db.rollback()
+        else:
+            self.db.commit()
+
+        self.cursor.execute('SELECT nspam, nham FROM state')
+        rows = self.cursor.fetchall()
+        self._nspam, self._nham = rows[0]
+
+
+    def close(self):
+        """
+        Close the training dataset database.
+        """
+        self.cursor.close()
+        self.db.close()
+        self.db = self.cursor = None
+
+
+    def _recordState(self):
+        """
+        Save nspam and nham, if the database has been opened (it will not have
+        been opened when this gets run from the nspam/nham property setters
+        invoked from Classifier.__init__, and that's just fine with us).
+        """
+        if self.cursor is not None:
+            self.cursor.execute('UPDATE state SET nspam=?, nham=?', (self._nspam, self._nham))
+
+
+    def _get(self, word):
+        """
+        Load the training data for the given word.
+
+        @param word: A word (or any other kind of token) to load information about.
+        @type word: C{str} or C{unicode} (but really, C{unicode} please)
+
+        @return: C{None} if there is no training data for the given word,
+            otherwise a two-sequence of the number of times the token has been
+            trained as spam and the number of times it has been trained as ham.
+        """
+        if isinstance(word, str):
+            word = word.decode('utf-8', 'replace')
+
+        self.cursor.execute(
+            "SELECT word, nspam, nham FROM bayes WHERE word=?", (word,))
+        rows = self.cursor.fetchall()
+        if rows:
+            return rows[0]
+        return None
+
+
+    def _set(self, word, nspam, nham):
+        """
+        Update the training data for a particular word.
+
+        @param word: A word (or any other kind of token) to store training
+            information about.
+        @type word: C{str} or C{unicode} (but really, C{unicode} please)
+
+        @param nspam: The number of times the token has been trained as spam.
+        @param nham: The number of times the token has been trained as ham.
+        """
+        if isinstance(word, str):
+            word = word.decode('utf-8', 'replace')
+        self.cursor.execute(
+            "INSERT OR REPLACE INTO bayes (word, nspam, nham) "
+            "VALUES (?, ?, ?)",
+            (word, nspam, nham))
+
+
+    def _delete(self, word):
+        """
+        Forget the training data for a particular word.
+
+        @param word: A word (or any other kind of token) to lose training
+            information about.
+        @type word: C{str} or C{unicode} (but really, C{unicode} please)
+        """
+        if isinstance(word, str):
+            word = word.decode('utf-8', 'replace')
+        self.cursor.execute("DELETE FROM bayes WHERE word=?", (word,))
+
+
+    def _post_training(self):
+        """
+        L{Classifier} hook invoked after all other steps of training a message
+        have been completed.  This is used to commit the currently active
+        transaction, which contains all of the database modifications for each
+        token in that message.
+        """
+        self.db.commit()
+
+
+    def _bulkwordinfoset(self, words):
+        """
+        Upgrade helper for recording spam and ham information for many words at
+        once.
+        """
+        self.cursor.executemany(
+            "INSERT OR REPLACE INTO bayes (word, nspam, nham) "
+            "VALUES (?, ?, ?)",
+            ((word.decode('utf-8', 'replace'), record.spamcount, record.hamcount)
+             for (word, record)
+             in words))
+
+
+    def _wordinfoset(self, word, record):
+        """
+        L{Classifier} hook invoked to record changed spam and ham data for a
+        single word.
+        """
+        self._set(word, record.spamcount, record.hamcount)
+
+
+    def _wordinfoget(self, word):
+        """
+        L{Classifier} hook invoked to retrieve persisted spam and ham data for a
+        single word.
+        """
+        row = self._get(word)
+        if row:
+            item = self.WordInfoClass()
+            item.__setstate__((row[1], row[2]))
+            return item
+        return None
+
+
+    def _wordinfodel(self, word):
+        """
+        L{Classifier} hook invoked to discard the persisted spam and ham data
+        for a single word.
+        """
+        self._delete(word)
+
+
+
 class SpambayesFilter(item.Item):
     """
     Spambayes-based L{iquotient.IHamFilter} powerup.
     """
     implements(iquotient.IHamFilter)
-    schemaVersion = 2
+    schemaVersion = 3
     classifier = attributes.inmemory()
     guesser = attributes.inmemory()
     filter = dependsOn(Filter)
 
     powerupInterfaces = (iquotient.IHamFilter,)
 
-
-
     def _classifierPath(self):
-        return self.store.newFilePath('spambayes-%d-classifier.pickle' % (self.storeID,))
+        return self.store.newFilePath('spambayes-%d-classifier.sqlite' % (self.storeID,))
 
 
     def activate(self):
-        try:
-            try:
-                c = cPickle.load(self._classifierPath().open())
-            except IOError, e:
-                if e.errno != errno.ENOENT:
-                    raise
-                c = classifier.Classifier()
-        except:
-            log.msg("Loading Spambayes trained state failed:")
-            log.err()
-            c = classifier.Classifier()
-        self.classifier = c
-        self.guesser = hammie.Hammie(c, mode='r')
+        self.classifier = _SQLite3Classifier(self._classifierPath().path)
+        self.guesser = hammie.Hammie(self.classifier, mode='r')
 
 
     # IHamFilter
@@ -418,21 +629,13 @@
             else:
                 if self.classify(item) > SPAM_THRESHHOLD:
                     break
-        self.classify(item)
-        p = self._classifierPath()
-        if not p.parent().exists():
-            p.parent().makedirs()
-        t = p.temporarySibling()
-        cPickle.dump(self.classifier, t.open())
-        t.moveTo(p)
 
 
     def forgetTraining(self):
         p = self._classifierPath()
         if p.exists():
             p.remove()
-            self.classifier = classifier.Classifier()
-            self.guesser = hammie.Hammie(self.classifier, mode='r')
+            self.activate()
 
 
 item.declareLegacyItem(SpambayesFilter.typeName, 1, dict(
@@ -444,6 +647,38 @@
     return sbf
 registerUpgrader(_sbFilter1to2, SpambayesFilter.typeName, 1, 2)
 
+item.declareLegacyItem(SpambayesFilter.typeName, 2, dict(
+    filter=attributes.reference()))
+
+
+def _sbFilter2to3(old):
+    """
+    Convert the pickled spambayes data to a SQLite3 database of the same data.
+    """
+    sbf = old.upgradeVersion(
+        SpambayesFilter.typeName, 2, 3, filter=old.filter)
+    path = sbf.store.newFilePath('spambayes-%d-classifier.pickle' % (sbf.storeID,))
+    try:
+        fObj = path.open()
+    except IOError as e:
+        if e.errno != errno.ENOENT:
+            raise
+        else:
+            return sbf
+    try:
+        dataset = cPickle.load(fObj)
+    finally:
+        fObj.close()
+
+    words = dataset._wordinfokeys()
+    sbf.classifier._bulkwordinfoset(
+        ((word, dataset._wordinfoget(word)) for word in words))
+    sbf.classifier.nspam = dataset.nspam
+    sbf.classifier.nham = dataset.nham
+    sbf.classifier._post_training()
+    return sbf
+registerUpgrader(_sbFilter2to3, SpambayesFilter.typeName, 2, 3)
+
 
 class SpambayesBenefactor(item.Item):
     endowed = attributes.integer(default=0)

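The nspam()/nham() definitions in _SQLite3Classifier above rely on a small namespace trick: each function runs exactly once at class-definition time, returns a getter/setter pair, and is immediately replaced by the resulting property, so the attribute assignments the base Classifier makes are transparently persisted. A standalone sketch of the same pattern, with made-up names (Counter, log, _record):

    class Counter(object):
        # Stand-in for the database handle; None until "opened", mirroring
        # the cursor guard in _SQLite3Classifier._recordState.
        log = None

        def value():
            def get(self):
                return self._value
            def set(self, value):
                self._value = value
                self._record()  # persistence side effect on every assignment
            return get, set
        value = property(*value())

        def __init__(self):
            self.value = 0  # goes through the setter, like Classifier.__init__

        def _record(self):
            if self.log is not None:
                self.log.append(self._value)

    counter = Counter()
    counter.log = []
    counter.value = 5
    assert counter.log == [5]
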
=== added file 'Quotient/xquotient/test/historic/spambayesfilter2to3.axiom.tbz2'
Binary files Quotient/xquotient/test/historic/spambayesfilter2to3.axiom.tbz2	1970-01-01 00:00:00 +0000 and Quotient/xquotient/test/historic/spambayesfilter2to3.axiom.tbz2	2012-08-17 20:25:22 +0000 differ
=== added file 'Quotient/xquotient/test/historic/stub_spambayesfilter2to3.py'
--- Quotient/xquotient/test/historic/stub_spambayesfilter2to3.py	1970-01-01 00:00:00 +0000
+++ Quotient/xquotient/test/historic/stub_spambayesfilter2to3.py	2012-08-17 20:25:22 +0000
@@ -0,0 +1,36 @@
+from StringIO import StringIO
+
+from axiom.test.historic.stubloader import saveStub
+from axiom.userbase import LoginMethod
+from axiom.dependency import installOn
+
+from xquotient.spam import SpambayesFilter
+
+SPAM_A = ["spam", "bad"]
+SPAM_B = ["junk", "evil"]
+HAM = ["ham", "good", "wonderful", "justice"]
+AMBIGUOUS = "ambiguous"
+
+
+class Document(object):
+    def __init__(self, text):
+        self.impl = self
+        self.source = self
+        self.text = text
+
+
+    def open(self):
+        return StringIO(self.text)
+
+
+
+def createDatabase(s):
+    bayes = SpambayesFilter(store=s)
+    installOn(bayes, s)
+    bayes.train(True, Document(" ".join(SPAM_A)))
+    bayes.train(True, Document(" ".join(SPAM_B + [AMBIGUOUS])))
+    bayes.train(False, Document(" ".join(HAM + [AMBIGUOUS])))
+
+
+if __name__ == '__main__':
+    saveStub(createDatabase, 'exarkun@xxxxxxxxxxxxxxxxx-20120811233233-fxrt1q924l6wmgxp')

=== modified file 'Quotient/xquotient/test/historic/test_spambayesfilter1to2.py'
--- Quotient/xquotient/test/historic/test_spambayesfilter1to2.py	2007-01-23 04:03:52 +0000
+++ Quotient/xquotient/test/historic/test_spambayesfilter1to2.py	2012-08-17 20:25:22 +0000
@@ -3,7 +3,7 @@
 
 
 class SpambayesFilterTestCase(stubloader.StubbedTest):
-    def testUpgrade(self):
+    def test_upgradedFields(self):
         """
         Ensure upgraded fields refer to correct items.
         """

=== added file 'Quotient/xquotient/test/historic/test_spambayesfilter2to3.py'
--- Quotient/xquotient/test/historic/test_spambayesfilter2to3.py	1970-01-01 00:00:00 +0000
+++ Quotient/xquotient/test/historic/test_spambayesfilter2to3.py	2012-08-17 20:25:22 +0000
@@ -0,0 +1,37 @@
+import sqlite3
+from axiom.test.historic import stubloader
+
+from xquotient.spam import SpambayesFilter
+from xquotient.test.historic.stub_spambayesfilter2to3 import (
+        SPAM_A, SPAM_B, HAM, AMBIGUOUS)
+from xquotient.test.historic.test_spambayesfilter1to2 import SpambayesFilterTestCase
+
+class SpambayesFilterTestCase(SpambayesFilterTestCase):
+    def test_upgradedTrainingDataset(self):
+        """
+        The upgrade converts the pickled training set into a SQLite3 training
+        set.  All of the training data is retained.
+        """
+        bayes = self.store.findUnique(SpambayesFilter)
+        db = sqlite3.connect(bayes._classifierPath().path)
+        cursor = db.cursor()
+        self.addCleanup(db.close)
+        self.addCleanup(cursor.close)
+
+        cursor.execute('SELECT word, nham, nspam FROM bayes')
+        expected = set(
+            [(word, 0, 10) for word in SPAM_A] +
+            [(word, 0, 10) for word in SPAM_B] +
+            [(word, 1, 0) for word in HAM] +
+            [(AMBIGUOUS, 1, 10)])
+        found = set(cursor.fetchall())
+
+        # There may be extra tokens due to funny extra spambayes logic.  That's
+        # fine.  As long as we see all the tokens we put there, the upgrade is a
+        # success.
+        self.assertEqual(expected, found & expected)
+
+        cursor.execute('SELECT nham, nspam FROM state')
+        nham, nspam = cursor.fetchone()
+        self.assertEqual(nham, 1)
+        self.assertEqual(nspam, 20)

=== modified file 'Quotient/xquotient/test/test_spambayes.py'
--- Quotient/xquotient/test/test_spambayes.py	2009-07-16 00:52:55 +0000
+++ Quotient/xquotient/test/test_spambayes.py	2012-08-17 20:25:22 +0000
@@ -1,4 +1,8 @@
 
+from StringIO import StringIO
+
+from spambayes.hammie import Hammie
+
 from twisted.trial import unittest
 
 from axiom import store, userbase
@@ -8,31 +12,121 @@
 from xquotient.test.test_dspam import MessageCreationMixin
 
 
+class SQLite3ClassifierTests(unittest.TestCase):
+    """
+    Tests for L{xquotient.spam._SQLite3Classifier}, a spambayes classifier which
+    persists training data in a SQLite3 database.
+    """
+    def setUp(self):
+        self.path = self.mktemp()
+        self.bayes = spam._SQLite3Classifier(self.path)
+        self.classifier = Hammie(self.bayes, mode='r')
+
+
+    def test_nspam(self):
+        """
+        L{_SQLite3Classifier} tracks, in memory, the number of spam messages it
+        has been trained with.
+        """
+        self.classifier.train(StringIO("spam words of spamfulness"), True)
+        self.assertEqual(self.bayes.nspam, 1)
+
+
+    def test_nspamPersisted(self):
+        """
+        L{_SQLite3Classifier} tracks, in a database, the number of spam messages
+        it has been trained with.
+        """
+        self.classifier.train(StringIO("spam words of spamfulness"), True)
+        bayes = spam._SQLite3Classifier(self.path)
+        self.assertEqual(bayes.nspam, 1)
+
+
+    def test_nham(self):
+        """
+        L{_SQLite3Classifier} tracks, in memory, the number of ham messages it
+        has been trained with.
+        """
+        self.classifier.train(StringIO("very nice words"), False)
+        self.assertEqual(self.bayes.nham, 1)
+
+
+    def test_nhamPersisted(self):
+        """
+        L{_SQLite3Classifier} tracks, in a database, the number of ham messages
+        it has been trained with.
+        """
+        self.classifier.train(StringIO("very nice words"), False)
+        bayes = spam._SQLite3Classifier(self.path)
+        self.assertEqual(bayes.nham, 1)
+
+
+    def test_spamClassification(self):
+        """
+        L{_SQLite3Classifier} can be trained with a spam message so as to later
+        classify messages like that one as spam.
+        """
+        self.classifier.train(StringIO("spam words of spamfulness"), True)
+        self.assertTrue(
+            self.classifier.score(StringIO("spamfulness words of spam")) > 0.99)
+
+
+    def test_hamClassification(self):
+        """
+        L{_SQLite3Classifier} can be trained with a ham message so as to later
+        classify messages like that one as ham.
+        """
+        self.classifier.train(StringIO("very nice words"), False)
+        self.assertTrue(
+            self.classifier.score(StringIO("words, very nice")) < 0.01)
+
+
+
 class SpambayesFilterTestCase(unittest.TestCase, MessageCreationMixin):
-
+    """
+    Tests for L{xquotient.spam.SpambayesFilter}.
+    """
     def setUp(self):
         dbdir = self.mktemp()
         self.store = s = store.Store(dbdir)
-        ls = userbase.LoginSystem(store=s)
-        installOn(ls, s)
-        acc = ls.addAccount('username', 'dom.ain', 'password')
-        ss = acc.avatars.open()
-        self.df = spam.SpambayesFilter(store=ss)
-        installOn(self.df, ss)
-        self.f = self.df.filter
-
-    def testMessageClassification(self):
-        self.f.processItem(self._message())
-
-    def testMessageTraining(self):
+        def account():
+            ls = userbase.LoginSystem(store=s)
+            installOn(ls, s)
+            acc = ls.addAccount('username', 'dom.ain', 'password')
+            return acc.avatars.open()
+        ss = s.transact(account)
+        def spambayes():
+            self.df = spam.SpambayesFilter(store=ss)
+            installOn(self.df, ss)
+            self.f = self.df.filter
+        ss.transact(spambayes)
+
+
+    def test_messageClassification(self):
+        """
+        When L{SpambayesFilter} is installed, L{Filter} uses it to classify
+        items it processes.
+        """
+        self.store.transact(self.f.processItem, self._message())
+        # XXX And the actual test here would be ... ?
+
+
+    def test_messageTraining(self):
+        """
+        L{SpambayesFilter.train} adds tokens from the given message to the
+        training dataset for either ham or spam.
+        """
         m = self._message()
         self.df.classify(m)
         self.df.train(True, m)
+        # XXX Really, unit tests should make _some_ assertions.
+
 
     def test_messageRetraining(self):
         """
         Test that removing the training data and rebuilding it succeeds.
         """
-        self.testMessageTraining()
+        self.test_messageTraining()
         self.df.forgetTraining()
-
+        # XXX The docstring is a total lie, there is no actual testing going on
+        # here.

