← Back to team overview

divmod-dev team mailing list archive

[Merge] lp:~exarkun/divmod.org/pop3-grabber-deletes into lp:divmod.org

 

Jean-Paul Calderone has proposed merging lp:~exarkun/divmod.org/pop3-grabber-deletes into lp:divmod.org.

Requested reviews:
  Divmod-dev (divmod-dev)
Related bugs:
  Bug #1073368 in Quotient: "pop3 grabber leaves old emails on server"
  https://bugs.launchpad.net/quotient/+bug/1073368

For more details, see:
https://code.launchpad.net/~exarkun/divmod.org/pop3-grabber-deletes/+merge/132756

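This implements deletion of messages from POP3 servers once at least one week has passed since Quotient successfully grabbed them (messages whose retrieval failed are never deleted). To support this, POP3UID gains a `retrieved` timestamp (schema version 2). Existing POP3UIDs are stamped with the time of the upgrade, and grabbing is suspended until every POP3UID has been upgraded.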

-- 
https://code.launchpad.net/~exarkun/divmod.org/pop3-grabber-deletes/+merge/132756
Your team Divmod-dev is requested to review the proposed merge of lp:~exarkun/divmod.org/pop3-grabber-deletes into lp:divmod.org.
=== modified file 'Quotient/xquotient/grabber.py'
--- Quotient/xquotient/grabber.py	2012-05-11 14:05:29 +0000
+++ Quotient/xquotient/grabber.py	2012-11-02 21:33:23 +0000
@@ -3,7 +3,7 @@
 from epsilon import hotfix
 hotfix.require('twisted', 'deferredgenerator_tfailure')
 
-import time, datetime
+import time, datetime, functools
 
 from twisted.mail import pop3, pop3client
 from twisted.internet import protocol, defer, ssl, error
@@ -161,6 +161,12 @@
 
 
 class POP3UID(item.Item):
+    schemaVersion = 2
+
+    retrieved = attributes.timestamp(doc="""
+    When this POP3 UID was retrieved (or when retrieval failed).
+    """, allowNone=False)
+
     grabberID = attributes.text(doc="""
     A string identifying the email-address/port parts of a
     configured grabber
@@ -173,14 +179,33 @@
     failed = attributes.boolean(doc="""
     When set, indicates that an attempt was made to retrieve this UID,
     but for some reason was unsuccessful.
-    """, indexed=True, default=False)
-
+    """, default=False)
+
+    attributes.compoundIndex(grabberID, retrieved)
+
+
+def _pop3uid1to2(old):
+    return old.upgradeVersion(
+        POP3UID.typeName, 1, 2,
+        value=old.value, failed=old.failed, grabberID=old.grabberID,
+        retrieved=extime.Time())
+registerUpgrader(_pop3uid1to2, POP3UID.typeName, 1, 2)
+
+POP3UIDv1 = item.declareLegacyItem(POP3UID.typeName, 1, dict(
+        grabberID=attributes.text(indexed=True),
+        value=attributes.bytes(indexed=True),
+        failed=attributes.boolean(indexed=True, default=False)))
 
 
 class POP3Grabber(item.Item):
     """
     Item for retrieving email messages from a remote POP server.
     """
+    DELETE_DELAY = datetime.timedelta(days=7)
+
+    now = attributes.inmemory(doc="""
+    A callable returning a Time instance representing the current time.
+    """)
 
     config = attributes.reference(doc="""
     The L{GrabberConfiguration} which created this grabber.
@@ -271,6 +296,7 @@
         self._pop3uids = None
         self.running = False
         self.protocol = None
+        self.now = extime.Time
         if self.status is None:
             self.status = Status(store=self.store, message=u'idle')
 
@@ -288,6 +314,14 @@
         # Don't run concurrently, ever.
         if self.running:
             return
+
+        # Don't run while POP3UIDs are being upgraded.  Any that have not yet
+        # been upgraded won't be returned from query(POP3UID) calls, which will
+        # confuse the logic about which messages to download.  Eventually
+        # they'll all be upgraded and we'll resume grabbing.
+        if self.store.query(POP3UIDv1).count():
+            return
+
         self.running = True
 
         from twisted.internet import reactor
@@ -340,10 +374,30 @@
     grabberID = property(_grabberID)
 
 
-    def shouldRetrieve(self, uidList):
-        """
-        Return a list of (index, uid) pairs from C{uidList} which have not
-        already been grabbed.
+    def shouldDelete(self, uidList):
+        """
+        Return a list of (index, uid) pairs from C{uidList} which were
+        downloaded long enough ago that they can be deleted now.
+        """
+        # Limit to POP3UIDs which were retrieved at least DELETE_DELAY ago.
+        # Failed attempts do not count.
+
+        where = attributes.AND(
+            POP3UID.grabberID == self.grabberID,
+            POP3UID.retrieved < self.now() - self.DELETE_DELAY,
+            POP3UID.failed == False)
+
+        # Here are the server-side POP3 UIDs which we have downloaded and which
+        # are old enough, so we should delete them.
+        pop3uids = set(self.store.query(POP3UID, where).getColumn("value"))
+
+        return [pair for pair in uidList if pair[1] in pop3uids]
+
+
+    def _getPOP3UIDs(self):
+        """
+        Return all the L{POP3UID} instances created by this grabber which still
+        exist, perhaps from an in-memory cache.
         """
         if self._pop3uids is None:
             before = time.time()
@@ -352,8 +406,17 @@
             self._pop3uids = set(self.store.query(POP3UID, POP3UID.grabberID == self.grabberID).getColumn("value"))
             after = time.time()
             log.msg(interface=iaxiom.IStatEvent, stat_pop3uid_load_time=after - before)
+        return self._pop3uids
+
+
+    def shouldRetrieve(self, uidList):
+        """
+        Return a list of (index, uid) pairs from C{uidList} which have not
+        already been grabbed.
+        """
+        pop3uids = self._getPOP3UIDs()
         log.msg(interface=iaxiom.IStatEvent, stat_pop3uid_check=len(uidList))
-        return [pair for pair in uidList if pair[1] not in self._pop3uids]
+        return [pair for pair in uidList if pair[1] not in pop3uids]
 
 
     def markSuccess(self, uid, msg):
@@ -378,17 +441,33 @@
             msg.archive()
         log.msg(interface=iaxiom.IStatEvent, stat_messages_grabbed=1,
                 userstore=self.store)
-        POP3UID(store=self.store, grabberID=self.grabberID, value=uid)
+        POP3UID(
+            store=self.store,
+            grabberID=self.grabberID,
+            value=uid,
+            retrieved=self.now())
         if self._pop3uids is not None:
             self._pop3uids.add(uid)
 
 
     def markFailure(self, uid, err):
-        POP3UID(store=self.store, grabberID=self.grabberID, value=uid, failed=True)
+        POP3UID(
+            store=self.store,
+            grabberID=self.grabberID,
+            value=uid,
+            retrieved=self.now(),
+            failed=True)
         if self._pop3uids is not None:
             self._pop3uids.add(uid)
 
 
+    def markDeleted(self, uid):
+        where = attributes.AND(
+            POP3UID.value == uid, POP3UID.grabberID == self.grabberID)
+        query = self.store.query(POP3UID, where)
+        query.deleteFromStore()
+
+
 
 class POP3GrabberProtocol(pop3.AdvancedPOP3Client):
     _rate = 50
@@ -479,6 +558,9 @@
         # All the (index, uid) pairs which should be retrieved
         uidList = []
 
+        # All of the (index, uid) pairs which should be deleted
+        uidDeleteList = []
+
         # Consumer for listUID - adds to the working set and processes
         # a batch if appropriate.
         def consumeUIDLine(ent):
@@ -487,9 +569,14 @@
                 processBatch()
 
         def processBatch():
-            L = self.shouldRetrieve(uidWorkingSet)
-            L.sort()
-            uidList.extend(L)
+            toRetrieve = self.shouldRetrieve(uidWorkingSet)
+            toRetrieve.sort()
+            uidList.extend(toRetrieve)
+
+            toDelete = self.shouldDelete(uidWorkingSet)
+            toDelete.sort()
+            uidDeleteList.extend(toDelete)
+
             del uidWorkingSet[:]
 
 
@@ -555,6 +642,17 @@
                 else:
                     self.markSuccess(uid, rece.message)
 
+        # Delete any old messages that should now be deleted
+        for (index, uid) in uidDeleteList:
+            d = defer.waitForDeferred(self.delete(index))
+            yield d
+            try:
+                d.getResult()
+            except (error.ConnectionDone, error.ConnectionLost):
+                return
+
+            self.markDeleted(uid)
+
         self.setStatus(u"Logging out...")
         d = defer.waitForDeferred(self.quit())
         yield d
@@ -584,56 +682,68 @@
 
 
 
+def _requiresGrabberItem(f):
+    """
+    Decorator for a method on ControlledPOP3GrabberProtocol which makes it safe
+    to call even after the connection has been lost.
+    """
+    @functools.wraps(f)
+    def safe(self, *args, **kwargs):
+        if self.grabber is not None:
+            return self.grabber.store.transact(f, self, *args, **kwargs)
+    return safe
+
+
+
 class ControlledPOP3GrabberProtocol(POP3GrabberProtocol):
-    def _transact(self, *a, **kw):
-        return self.grabber.store.transact(*a, **kw)
-
-
+    _transient = False
+    def transientFailure(self, f):
+        self._transient = True
+
+
+    @_requiresGrabberItem
     def getSource(self):
         return u'pop3://' + self.grabber.grabberID
 
 
+    @_requiresGrabberItem
     def setStatus(self, msg, success=True):
-        if self.grabber is not None:
-            self._transact(self.grabber.status.setStatus, msg, success)
-
-
+        return self.grabber.status.setStatus(msg, success)
+
+
+    @_requiresGrabberItem
     def shouldRetrieve(self, uidList):
-        if self.grabber is not None:
-            return self._transact(self.grabber.shouldRetrieve, uidList)
-
-
+        return self.grabber.shouldRetrieve(uidList)
+
+
+    @_requiresGrabberItem
+    def shouldDelete(self, uidList):
+        return self.grabber.shouldDelete(uidList)
+
+
+    @_requiresGrabberItem
     def createMIMEReceiver(self, source):
-        if self.grabber is not None:
-            def createIt():
-                agent = self.grabber.config.deliveryAgent
-                return agent.createMIMEReceiver(source)
-            return self._transact(createIt)
-
-
+        agent = self.grabber.config.deliveryAgent
+        return agent.createMIMEReceiver(source)
+
+
+    @_requiresGrabberItem
     def markSuccess(self, uid, msg):
-        if self.grabber is not None:
-            return self._transact(self.grabber.markSuccess, uid, msg)
-
-
+        return self.grabber.markSuccess(uid, msg)
+
+
+    @_requiresGrabberItem
     def markFailure(self, uid, reason):
-        if self.grabber is not None:
-            return self._transact(self.grabber.markFailure, uid, reason)
-
-
+        return self.grabber.markFailure(uid, reason)
+
+
+    @_requiresGrabberItem
     def paused(self):
-        if self.grabber is not None:
-            return self.grabber.paused
-
-
-    _transient = False
-    def transientFailure(self, f):
-        self._transient = True
-
-
+        return self.grabber.paused
+
+
+    @_requiresGrabberItem
     def stoppedRunning(self):
-        if self.grabber is None:
-            return
         self.grabber.running = False
         if self._transient:
             iaxiom.IScheduler(self.grabber.store).reschedule(

=== added file 'Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2'
Binary files Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2	1970-01-01 00:00:00 +0000 and Quotient/xquotient/test/historic/pop3uid1to2.axiom.tbz2	2012-11-02 21:33:23 +0000 differ
=== added file 'Quotient/xquotient/test/historic/stub_pop3uid1to2.py'
--- Quotient/xquotient/test/historic/stub_pop3uid1to2.py	1970-01-01 00:00:00 +0000
+++ Quotient/xquotient/test/historic/stub_pop3uid1to2.py	2012-11-02 21:33:23 +0000
@@ -0,0 +1,37 @@
+# -*- test-case-name: xquotient.test.historic.test_pop3uid1to2 -*-
+
+"""
+Create stub database for upgrade of L{xquotient.grabber.POP3UID} from version 1
+to version 2.
+"""
+
+from axiom.test.historic.stubloader import saveStub
+
+from axiom.userbase import LoginSystem
+from axiom.dependency import installOn
+
+from xquotient.grabber import POP3UID
+
+VALUE = b"12345678abcdefgh"
+FAILED = False
+GRABBER_ID = u"alice@xxxxxxxxxxx:1234"
+
+def createDatabase(s):
+    """
+    Create an account in the given store and create a POP3UID item in it.
+    """
+    loginSystem = LoginSystem(store=s)
+    installOn(loginSystem, s)
+
+    account = loginSystem.addAccount(u'testuser', u'localhost', None)
+    subStore = account.avatars.open()
+
+    POP3UID(
+        store=subStore,
+        value=VALUE,
+        failed=FAILED,
+        grabberID=GRABBER_ID)
+
+
+if __name__ == '__main__':
+    saveStub(createDatabase, 'exarkun@xxxxxxxxxxxxxxxxx-20120913121256-tg7d6l1w3rkpfehr')

=== added file 'Quotient/xquotient/test/historic/test_pop3uid1to2.py'
--- Quotient/xquotient/test/historic/test_pop3uid1to2.py	1970-01-01 00:00:00 +0000
+++ Quotient/xquotient/test/historic/test_pop3uid1to2.py	2012-11-02 21:33:23 +0000
@@ -0,0 +1,32 @@
+
+"""
+Test that a version 1 POP3UID is unchanged by the upgrade except that it gains a
+value for the new C{retrieved} attribute set to something near the current time.
+"""
+
+from epsilon.extime import Time
+
+from axiom.userbase import LoginSystem
+from axiom.test.historic.stubloader import StubbedTest
+
+from xquotient.test.historic.stub_pop3uid1to2 import VALUE, FAILED, GRABBER_ID
+from xquotient.grabber import POP3UID
+
+class POP3UIDUpgradeTestCase(StubbedTest):
+    def test_attributes(self):
+        loginSystem = self.store.findUnique(LoginSystem)
+        account = loginSystem.accountByAddress(u'testuser', u'localhost')
+        subStore = account.avatars.open()
+
+        d = subStore.whenFullyUpgraded()
+        def upgraded(ignored):
+            [pop3uid] = list(subStore.query(POP3UID))
+            self.assertEqual(VALUE, pop3uid.value)
+            self.assertEqual(FAILED, pop3uid.failed)
+            self.assertEqual(GRABBER_ID, pop3uid.grabberID)
+
+            # This will be close enough.
+            elapsed = (Time() - pop3uid.retrieved).total_seconds()
+            self.assertTrue(abs(elapsed) < 60)
+        d.addCallback(upgraded)
+        return d

=== modified file 'Quotient/xquotient/test/test_grabber.py'
--- Quotient/xquotient/test/test_grabber.py	2012-05-11 14:05:29 +0000
+++ Quotient/xquotient/test/test_grabber.py	2012-11-02 21:33:23 +0000
@@ -3,8 +3,11 @@
 
 from datetime import timedelta
 
+from zope.interface import directlyProvides
+
 from twisted.trial import unittest
 from twisted.internet import defer, error
+from twisted.internet.interfaces import ISSLTransport
 from twisted.mail import pop3
 from twisted.cred import error as ecred
 from twisted.test.proto_helpers import StringTransport
@@ -15,6 +18,7 @@
 from epsilon.test import iosim
 
 from axiom import iaxiom, store, substore, scheduler
+from axiom.test.util import QueryCounter
 
 from xquotient import grabber, mimepart
 
@@ -50,6 +54,8 @@
     def connectionMade(self):
         grabber.POP3GrabberProtocol.connectionMade(self)
         self.events = []
+        self.uidsForDeletion = set()
+        self.uidsNotForRetrieval = set()
 
 
     def getSource(self):
@@ -62,8 +68,13 @@
 
 
     def shouldRetrieve(self, uidList):
-        self.events.append(('retrieve', uidList))
-        return list(uidList)
+        self.events.append(('retrieve', list(uidList)))
+        return [pair for pair in uidList if pair[1] not in self.uidsNotForRetrieval]
+
+
+    def shouldDelete(self, uidList):
+        self.events.append(('delete', list(uidList)))
+        return [pair for pair in uidList if pair[1] in self.uidsForDeletion]
 
 
     def createMIMEReceiver(self, source):
@@ -80,6 +91,10 @@
         self.events.append(('failure', uid, reason))
 
 
+    def markDeleted(self, uid):
+        self.events.append(('markDeleted', uid))
+
+
     def paused(self):
         self.events.append(('paused',))
         return False
@@ -229,6 +244,52 @@
             'stopped')
 
 
+    def test_deletion(self):
+        """
+        Messages indicated by C{shouldDelete} to be ready for deletion are
+        deleted using the I{DELE} POP3 protocol action.
+        """
+        transport = StringTransport()
+        # Convince the client to log in
+        directlyProvides(transport, ISSLTransport)
+
+        self.client.makeConnection(transport)
+        self.addCleanup(self.client.connectionLost, error.ConnectionLost("Simulated"))
+
+        self.client.uidsForDeletion.add(b'xyz')
+        self.client.uidsNotForRetrieval.add(b'abc')
+        self.client.uidsNotForRetrieval.add(b'xyz')
+
+        # Server greeting
+        self.client.dataReceived("+OK Hello\r\n")
+        # CAPA response
+        self.client.dataReceived("+OK\r\nUSER\r\nUIDL\r\n.\r\n")
+        # USER response
+        self.client.dataReceived("+OK\r\n")
+        # PASS response
+        self.client.dataReceived("+OK\r\n")
+
+        del self.client.events[:]
+        transport.clear()
+
+        # UIDL response
+        self.client.dataReceived('+OK \r\n1 abc\r\n3 xyz\r\n.\r\n')
+
+        # Protocol should consult shouldDelete with the UIDs and start issuing
+        # delete commands.
+        self.assertEquals(
+            [('delete', [(0, 'abc'), (2, 'xyz')])],
+            [event for event in self.client.events if event[0] == 'delete'])
+        self.assertEqual("DELE 3\r\n", transport.value())
+
+        del self.client.events[:]
+
+        # DELE response
+        self.client.dataReceived("+OK\r\n")
+
+        self.assertEquals(('markDeleted', 'xyz'), self.client.events[0])
+
+
     def testLineTooLong(self):
         """
         Make sure a message illegally served with a line longer than we will
@@ -400,21 +461,38 @@
         self.assertTrue(scheduled[0] <= extime.Time())
 
 
+    def _timeoutTest(self, exchange):
+        """
+        Exercise handling of a connection timeout at some phase of the
+        interaction.
+        """
+        transport = StringTransport()
+        factory = grabber.POP3GrabberFactory(self.grabberItem, False)
+        protocol = factory.buildProtocol(None)
+        protocol.allowInsecureLogin = True
+        protocol.makeConnection(transport)
+
+        for (serverMessage, clientMessage) in exchange:
+            protocol.dataReceived(serverMessage)
+            self.assertEqual(clientMessage, transport.value())
+            transport.clear()
+
+        protocol.timeoutConnection()
+        self.assertTrue(transport.disconnecting)
+        protocol.connectionLost(Failure(error.ConnectionLost("Simulated")))
+
+        self.assertEqual(
+            self.grabberItem.status.message,
+            u"Timed out waiting for server response.")
+
+
     def test_stoppedRunningAfterTimeout(self):
         """
         When L{ControlledPOP3GrabberProtocol} times out the connection
         due to inactivity, the controlling grabber's status is set to
         reflect this.
         """
-        factory = grabber.POP3GrabberFactory(self.grabberItem, False)
-        protocol = factory.buildProtocol(None)
-        protocol.makeConnection(StringTransport())
-        protocol.timeoutConnection()
-        protocol.connectionLost(Failure(error.ConnectionLost("Simulated")))
-
-        self.assertEqual(
-            self.grabberItem.status.message,
-            u"Timed out waiting for server response.")
+        self._timeoutTest([])
 
 
     def test_stoppedRunningAfterListTimeout(self):
@@ -424,28 +502,53 @@
         (list UIDs) command, the controlling grabber's status is set
         to reflect this.
         """
-        factory = grabber.POP3GrabberFactory(self.grabberItem, False)
-        protocol = factory.buildProtocol(None)
-        protocol.allowInsecureLogin = True
-        protocol.makeConnection(StringTransport())
-        # Server greeting
-        protocol.dataReceived("+OK Hello\r\n") 
-        # CAPA response
-        protocol.dataReceived("+OK\r\nUSER\r\nUIDL\r\n.\r\n")
-        # USER response
-        protocol.dataReceived("+OK\r\n")
-        # PASS response
-        protocol.dataReceived("+OK\r\n")
-        # Sanity check, we should have gotten to sending the UIDL
-        self.assertTrue(
-            protocol.transport.value().endswith("\r\nUIDL\r\n"),
-            "Failed to get to UIDL: %r" % (protocol.transport.value(),))
-
-        protocol.timeoutConnection()
-        protocol.connectionLost(Failure(error.ConnectionLost("Simulated")))
-        self.assertEqual(
-            self.grabberItem.status.message,
-            u"Timed out waiting for server response.")
+        self._timeoutTest([
+                # Server greeting
+                (b"+OK Hello\r\n", b"CAPA\r\n"),
+                # CAPA response
+                (b"+OK\r\nUSER\r\nUIDL\r\n.\r\n", b"USER alice\r\n"),
+                # USER response
+                (b"+OK\r\n", b"PASS secret\r\n"),
+                # PASS response
+                (b"+OK\r\n", b"UIDL\r\n")])
+
+
+    def test_stoppedRunningAfterDeleteTimeout(self):
+        # Set up some good state to want to delete
+        uid = b'abc'
+        delay = self.grabberItem.DELETE_DELAY
+        future = extime.Time()
+        now = future - delay - timedelta(seconds=1)
+        self.grabberItem.now = lambda: now
+        self.grabberItem.markSuccess(uid, StubMessage())
+        now = future
+
+        self._timeoutTest([
+                # Server greeting
+                (b"+OK Hello\r\n", b"CAPA\r\n"),
+                # CAPA response
+                (b"+OK\r\nUSER\r\nUIDL\r\n.\r\n", b"USER alice\r\n"),
+                # USER response
+                (b"+OK\r\n", b"PASS secret\r\n"),
+                # PASS response
+                (b"+OK\r\n", b"UIDL\r\n"),
+                # UIDL response
+                (b"+OK\r\n1 abc\r\n.\r\n", b"DELE 1\r\n")])
+
+
+    def test_notGrabWhileUpgrading(self):
+        """
+        As long as any old (schemaVersion less than most recent) L{POP3UID}
+        items remain in the database, L{POP3Grabber.grab} does not try to grab
+        any messages.
+        """
+        grabber.POP3UIDv1(
+            store=self.userStore,
+            grabberID=self.grabberItem.grabberID,
+            failed=False,
+            value=b'xyz')
+        self.grabberItem.grab()
+        self.assertFalse(self.grabberItem.running)
 
 
 
@@ -490,7 +593,8 @@
         for i in xrange(100, 200):
             grabber.POP3UID(store=self.store,
                             grabberID=self.grabber.grabberID,
-                            value=str(i))
+                            value=str(i),
+                            retrieved=extime.Time())
 
 
     def testShouldRetrieve(self):
@@ -541,6 +645,34 @@
             [(49, '49'), (51, '51')])
 
 
+    def test_successTimestamp(self):
+        """
+        The L{POP3UID} instance created by L{POP3Grabber.markSuccess} has its
+        C{retrieved} attribute set to the current time as reported by
+        L{POP3Grabber.now}.
+        """
+        now = extime.Time()
+        self.grabber.now = lambda: now
+        self.grabber.markSuccess(b'123abc', StubMessage())
+        [pop3uid] = list(self.store.query(
+                grabber.POP3UID, grabber.POP3UID.value == b'123abc'))
+        self.assertEqual(now, pop3uid.retrieved)
+
+
+    def test_failureTimestamp(self):
+        """
+        The L{POP3UID} instance created by L{POP3Grabber.markFailure} has its
+        C{retrieved} attribute set to the current time as reported by
+        L{POP3Grabber.now}.
+        """
+        now = extime.Time()
+        self.grabber.now = lambda: now
+        self.grabber.markFailure(b'123abc', object())
+        [pop3uid] = list(self.store.query(
+                grabber.POP3UID, grabber.POP3UID.value == b'123abc'))
+        self.assertEqual(now, pop3uid.retrieved)
+
+
     def test_delete(self):
         """
         L{POP3Grabber.delete} unschedules the grabber.
@@ -553,3 +685,192 @@
         # was scheduled either.
         self.assertEqual(
             [], list(store.query(scheduler.TimedEvent)))
+
+
+    def test_shouldDeleteOldMessage(self):
+        """
+        C{shouldDelete} accepts a list of (index, uid) pairs and returns a list
+        of (index, uid) pairs corresponding to messages which may now be deleted
+        from the POP3 server (due to having been downloaded more than a fixed
+        number of days in the past).
+        """
+        epoch = extime.Time()
+        now = epoch - (self.grabber.DELETE_DELAY + timedelta(days=1))
+
+        self.grabber.now = lambda: now
+
+        # Generate some state representing a past success
+        oldEnough = b'123abc'
+        self.grabber.markSuccess(oldEnough, StubMessage())
+
+        # Wind the clock forward far enough so that oldEnough should be
+        # considered old enough for deletion.
+        now = epoch
+
+        self.assertEqual(
+            [(3, oldEnough)], self.grabber.shouldDelete([(3, oldEnough)]))
+
+
+    def test_shouldDeleteOtherGrabberState(self):
+        """
+        Messages downloaded by an unrelated grabber are not considered by
+        C{shouldDelete}.
+        """
+        uid = b'abcdef'
+        then = extime.Time() - self.grabber.DELETE_DELAY - timedelta(days=1)
+        grabber.POP3UID(
+            store=self.store, grabberID=u'bob@xxxxxxxxxxx:default', value=uid,
+            retrieved=then)
+
+        self.assertEqual([], self.grabber.shouldDelete([(5, uid)]))
+
+
+
+    def test_shouldDeleteNewMessage(self):
+        """
+        Messages downloaded less than a fixed number of days in the past are not
+        indicated as deletable by C{shouldDelete}.
+        """
+        epoch = extime.Time()
+        now = epoch - (self.grabber.DELETE_DELAY - timedelta(days=1))
+
+        self.grabber.now = lambda: now
+
+        # Generate some state representing a *recently* past success
+        newEnough = b'xyz123'
+        self.grabber.markSuccess(newEnough, StubMessage())
+
+        # Wind the clock forward, but not so far forward that newEnough is
+        # considered old enough for deletion.
+        now = epoch
+
+        self.assertEqual(
+            [], self.grabber.shouldDelete([(5, newEnough)]))
+
+
+    def test_shouldDeleteFailedMessage(self):
+        """
+        Messages for which the download failed are not indicated as deletable by
+        C{shouldDelete}.
+        """
+        epoch = extime.Time()
+        now = epoch - (self.grabber.DELETE_DELAY + timedelta(days=1))
+
+        self.grabber.now = lambda: now
+
+        # Generate some state representing a past failure
+        failed = b'xyz123'
+        self.grabber.markFailure(failed, object())
+
+        # Wind the clock forward enough so that the failed message would be old
+        # enough - if it had been a success.
+        now = epoch
+
+        self.assertEqual(
+            [], self.grabber.shouldDelete([(7, failed)]))
+
+
+    def test_shouldDeleteUnknownMessage(self):
+        """
+        Messages which have not been downloaded are not indicated as deletable
+        by C{shouldDelete}.
+        """
+        self.assertEqual(
+            [], self.grabber.shouldDelete([(7, b'9876wxyz')]))
+
+
+    def test_now(self):
+        """
+        L{POP3Grabber.now} returns the current time.
+        """
+        self.assertTrue(extime.Time() <= self.grabber.now())
+        self.assertTrue(self.grabber.now() <= extime.Time())
+
+
+    def test_markDeleted(self):
+        """
+        L{POP3Grabber.markDeleted} deletes the L{POP3UID} corresponding to the
+        message UID passed in.
+        """
+        uid = b'abcdef'
+        self.grabber.markSuccess(uid, StubMessage())
+        self.grabber.markDeleted(uid)
+        persistentUIDs = list(self.store.query(
+                grabber.POP3UID, grabber.POP3UID.value == uid))
+        self.assertEqual([], persistentUIDs)
+
+
+    def test_markDeletedOtherGrabber(self):
+        """
+        L{POP3Grabber.markDeleted} does not delete a L{POP3UID} with a matching
+        message UID but which belongs to a different grabber.
+        """
+        uid = b'abcdef'
+        pop3uid = grabber.POP3UID(
+            store=self.store,
+            grabberID=u'bob@xxxxxxxxxxx:default',
+            value=uid,
+            retrieved=extime.Time())
+        self.grabber.markDeleted(uid)
+        persistentUIDs = list(self.store.query(
+                grabber.POP3UID, grabber.POP3UID.value == uid))
+        self.assertEqual([pop3uid], persistentUIDs)
+
+
+
+class ShouldDeleteComplexityTests(unittest.TestCase):
+    """
+    Tests for the query complexity of L{POP3Grabber.shouldDelete}.
+    """
+    def test_otherGrabber(self):
+        """
+        The database complexity of L{POP3Grabber.shouldDelete} is independent of
+        the number of L{POP3UID} items which belong to another L{POP3Grabber}.
+        """
+        self._complexityTest(
+            lambda grabberItem: grabber.POP3UID(
+                store=grabberItem.store, retrieved=extime.Time(), failed=False,
+                grabberID=grabberItem.grabberID + b'unrelated', value=b'123'))
+
+
+    def test_shouldNotDelete(self):
+        """
+        The database complexity of L{POP3Grabber.shouldDelete} is independent of
+        the number of L{POP3UID} items which exist in the database but do not
+        yet merit deletion.
+        """
+        self._complexityTest(
+            lambda grabberItem: grabber.POP3UID(
+                store=grabberItem.store, retrieved=extime.Time(), failed=False,
+                grabberID=grabberItem.grabberID, value=b'def'))
+
+
+    def _complexityTest(self, makePOP3UID):
+        s = store.Store()
+        counter = QueryCounter(s)
+
+        config = grabber.GrabberConfiguration(store=s)
+        grabberItem = grabber.POP3Grabber(
+            store=s,
+            config=config,
+            username=u"testuser",
+            domain=u"example.com",
+            password=u"password")
+
+        # Create at least one POP3UID, since zero-items-in-table is always
+        # different from any-items-in-table.
+        for i in range(5):
+            grabber.POP3UID(
+                store=s, retrieved=extime.Time(), failed=False,
+                grabberID=grabberItem.grabberID, value=b'abc' + str(i))
+
+        fewer = counter.measure(
+            lambda: grabberItem.shouldDelete([b"123"]))
+
+        # Create another non-matching POP3UID
+        makePOP3UID(grabberItem)
+
+        more = counter.measure(
+            lambda: grabberItem.shouldDelete([b"123"]))
+
+        self.assertEqual(fewer, more)


Follow ups