
[Merge] lp:~stub/launchpad/garbo into lp:launchpad/db-devel

Stuart Bishop has proposed merging lp:~stub/launchpad/garbo into lp:launchpad/db-devel.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #768139 in Launchpad itself: "garbo script timeout behavior needs tweaking"
  https://bugs.launchpad.net/launchpad/+bug/768139
  Bug #795305 in Launchpad itself: "bugsummaryjournal is not automatically rolled up"
  https://bugs.launchpad.net/launchpad/+bug/795305

For more details, see:
https://code.launchpad.net/~stub/launchpad/garbo/+merge/69792

Implement a garbo job that runs every 5 minutes, and use it to fix Bug #795305.

Also includes a bit of delinting of touched code and some minor garbo tuning (reducing the default transaction goal time to 2 seconds, and moving some other jobs to the frequent garbo runner to spread the load more evenly).
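
For context, the goal time is what LoopTuner uses to size each chunk: it compares an iteration's runtime against the goal and rescales the next chunk, so halving the goal roughly halves how long any single transaction runs. A simplified, self-contained sketch of that feedback loop (not the exact LoopTuner algorithm, which also smooths adjustments and honours a cooldown time):

    import time

    def run_tuned(loop, goal_seconds=2.0, start_chunk=1.0,
                  minimum_chunk_size=1, maximum_chunk_size=None):
        # Drive an ITunableLoop-style object, rescaling the chunk size so
        # each iteration lands near goal_seconds of wall-clock time.
        chunk_size = start_chunk
        while not loop.isDone():
            start = time.time()
            loop(chunk_size)
            elapsed = max(time.time() - start, 1e-6)
            chunk_size *= goal_seconds / elapsed
            chunk_size = max(chunk_size, minimum_chunk_size)
            if maximum_chunk_size is not None:
                chunk_size = min(chunk_size, maximum_chunk_size)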
-- 
https://code.launchpad.net/~stub/launchpad/garbo/+merge/69792
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/garbo into lp:launchpad/db-devel.
=== added file 'cronscripts/garbo-frequently.py'
--- cronscripts/garbo-frequently.py	1970-01-01 00:00:00 +0000
+++ cronscripts/garbo-frequently.py	2011-07-29 13:44:41 +0000
@@ -0,0 +1,23 @@
+#!/usr/bin/python -S
+#
+# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Database garbage collector, every 5 minutes.
+
+Remove or archive unwanted data. Detect, warn and possibly repair data
+corruption.
+"""
+
+__metaclass__ = type
+__all__ = []
+
+import _pythonpath
+
+from lp.scripts.garbo import FrequentDatabaseGarbageCollector
+
+
+if __name__ == '__main__':
+    script = FrequentDatabaseGarbageCollector()
+    script.continue_on_failure = True
+    script.lock_and_run()

=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg	2011-07-28 12:58:14 +0000
+++ database/schema/security.cfg	2011-07-29 13:44:41 +0000
@@ -2187,6 +2187,10 @@
 groups=garbo
 type=user
 
+[garbo_frequently]
+groups=garbo
+type=user
+
 [generateppahtaccess]
 groups=script
 public.archive                          = SELECT

=== modified file 'lib/canonical/launchpad/utilities/looptuner.py'
--- lib/canonical/launchpad/utilities/looptuner.py	2011-04-12 09:57:20 +0000
+++ lib/canonical/launchpad/utilities/looptuner.py	2011-07-29 13:44:41 +0000
@@ -311,7 +311,7 @@
     """A base implementation of `ITunableLoop`."""
     implements(ITunableLoop)
 
-    goal_seconds = 4
+    goal_seconds = 2
     minimum_chunk_size = 1
     maximum_chunk_size = None # Override
     cooldown_time = 0

=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py	2011-07-05 05:46:02 +0000
+++ lib/lp/scripts/garbo.py	2011-07-29 13:44:41 +0000
@@ -56,7 +56,6 @@
 from lp.bugs.interfaces.bug import IBugSet
 from lp.bugs.model.bug import Bug
 from lp.bugs.model.bugattachment import BugAttachment
-from lp.bugs.model.bugmessage import BugMessage
 from lp.bugs.model.bugnotification import BugNotification
 from lp.bugs.model.bugwatch import BugWatchActivity
 from lp.bugs.scripts.checkwatches.scheduler import (
@@ -84,7 +83,7 @@
 from lp.translations.model.potranslation import POTranslation
 
 
-ONE_DAY_IN_SECONDS = 24*60*60
+ONE_DAY_IN_SECONDS = 24 * 60 * 60
 
 
 class BulkPruner(TunableLoop):
@@ -290,12 +289,34 @@
         """
 
 
+class BugSummaryJournalRollup(TunableLoop):
+    """Rollup BugSummaryJournal rows into BugSummary."""
+    maximum_chunk_size = 5000
+
+    def __init__(self, log, abort_time=None):
+        super(BugSummaryJournalRollup, self).__init__(log, abort_time)
+        self.store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
+
+    def isDone(self):
+        has_more = self.store.execute(
+            "SELECT EXISTS (SELECT TRUE FROM BugSummaryJournal LIMIT 1)"
+            ).get_one()[0]
+        return not has_more
+
+    def __call__(self, chunk_size):
+        chunk_size = int(chunk_size + 0.5)
+        self.store.execute(
+            "SELECT bugsummary_rollup_journal(%s)", (chunk_size,),
+            noresult=True)
+        self.store.commit()
+
+
 class OpenIDConsumerNoncePruner(TunableLoop):
     """An ITunableLoop to prune old OpenIDConsumerNonce records.
 
     We remove all OpenIDConsumerNonce records older than 1 day.
     """
-    maximum_chunk_size = 6*60*60 # 6 hours in seconds.
+    maximum_chunk_size = 6 * 60 * 60  # 6 hours in seconds.
 
     def __init__(self, log, abort_time=None):
         super(OpenIDConsumerNoncePruner, self).__init__(log, abort_time)
@@ -601,7 +622,7 @@
         self.max_offset = self.store.execute(
             "SELECT MAX(id) FROM UnlinkedPeople").get_one()[0]
         if self.max_offset is None:
-            self.max_offset = -1 # Trigger isDone() now.
+            self.max_offset = -1  # Trigger isDone() now.
             self.log.debug("No Person records to remove.")
         else:
             self.log.info("%d Person records to remove." % self.max_offset)
@@ -684,36 +705,6 @@
         """
 
 
-class MirrorBugMessageOwner(TunableLoop):
-    """Mirror BugMessage.owner from Message.
-
-    Only needed until they are all set, after that triggers will maintain it.
-    """
-
-    # Test migration did 3M in 2 hours, so 5000 is ~ 10 seconds - and thats the
-    # max we want to hold a DB lock open for.
-    minimum_chunk_size = 1000
-    maximum_chunk_size = 5000
-
-    def __init__(self, log, abort_time=None):
-        super(MirrorBugMessageOwner, self).__init__(log, abort_time)
-        self.store = IMasterStore(BugMessage)
-        self.isDone = IMasterStore(BugMessage).find(
-            BugMessage, BugMessage.ownerID==None).is_empty
-
-    def __call__(self, chunk_size):
-        """See `ITunableLoop`."""
-        transaction.begin()
-        updated = self.store.execute("""update bugmessage set
-            owner=message.owner from message where
-            bugmessage.message=message.id and bugmessage.id in
-                (select id from bugmessage where owner is NULL limit %s);"""
-            % int(chunk_size)
-            ).rowcount
-        self.log.debug("Updated %s bugmessages." % updated)
-        transaction.commit()
-
-
 class BugHeatUpdater(TunableLoop):
     """A `TunableLoop` for bug heat calculations."""
 
@@ -802,7 +793,7 @@
 class OldTimeLimitedTokenDeleter(TunableLoop):
     """Delete expired url access tokens from the session DB."""
 
-    maximum_chunk_size = 24*60*60 # 24 hours in seconds.
+    maximum_chunk_size = 24 * 60 * 60  # 24 hours in seconds.
 
     def __init__(self, log, abort_time=None):
         super(OldTimeLimitedTokenDeleter, self).__init__(log, abort_time)
@@ -861,10 +852,10 @@
 
 class BaseDatabaseGarbageCollector(LaunchpadCronScript):
     """Abstract base class to run a collection of TunableLoops."""
-    script_name = None # Script name for locking and database user. Override.
-    tunable_loops = None # Collection of TunableLoops. Override.
-    continue_on_failure = False # If True, an exception in a tunable loop
-                                # does not cause the script to abort.
+    script_name = None  # Script name for locking and database user. Override.
+    tunable_loops = None  # Collection of TunableLoops. Override.
+    continue_on_failure = False  # If True, an exception in a tunable loop
+                                 # does not cause the script to abort.
 
     # Default run time of the script in seconds. Override.
     default_abort_script_time = None
@@ -915,7 +906,7 @@
         for count in range(0, self.options.threads):
             thread = threading.Thread(
                 target=self.run_tasks_in_thread,
-                name='Worker-%d' % (count+1,),
+                name='Worker-%d' % (count + 1,),
                 args=(tunable_loops,))
             thread.start()
             threads.add(thread)
@@ -949,7 +940,7 @@
 
     @property
     def script_timeout(self):
-        a_very_long_time = 31536000 # 1 year
+        a_very_long_time = 31536000  # 1 year
         return self.options.abort_script or a_very_long_time
 
     def get_loop_logger(self, loop_name):
@@ -962,7 +953,7 @@
         loop_logger = logging.getLogger('garbo.' + loop_name)
         for filter in loop_logger.filters:
             if isinstance(filter, PrefixFilter):
-                return loop_logger # Already have a PrefixFilter attached.
+                return loop_logger  # Already have a PrefixFilter attached.
         loop_logger.addFilter(PrefixFilter(loop_name))
         return loop_logger
 
@@ -1034,7 +1025,7 @@
                     loop_logger.debug3(
                         "Unable to acquire lock %s. Running elsewhere?",
                         loop_lock_path)
-                    time.sleep(0.3) # Avoid spinning.
+                    time.sleep(0.3)  # Avoid spinning.
                     tunable_loops.append(tunable_loop_class)
                 # Otherwise, emit a warning and skip the task.
                 else:
@@ -1073,16 +1064,38 @@
                 transaction.abort()
 
 
-class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
-    script_name = 'garbo-hourly'
+class FrequentDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
+    """Run every 5 minutes.
+
+    This may become even more frequent in the future.
+
+    Jobs with low overhead can go here to distribute work more evenly.
+    """
+    script_name = 'garbo-frequently'
     tunable_loops = [
-        MirrorBugMessageOwner,
+        BugSummaryJournalRollup,
         OAuthNoncePruner,
         OpenIDConsumerNoncePruner,
         OpenIDConsumerAssociationPruner,
+        AntiqueSessionPruner,
+        ]
+    experimental_tunable_loops = []
+
+    # 5 minutes minus 20 seconds for cleanup. This helps ensure the
+    # script is fully terminated before the next scheduled run
+    # kicks in.
+    default_abort_script_time = 60 * 5 - 20
+
+
+class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
+    """Run every hour.
+
+    Jobs we want to run fairly often but that have noticeable overhead go here.
+    """
+    script_name = 'garbo-hourly'
+    tunable_loops = [
         RevisionCachePruner,
         BugWatchScheduler,
-        AntiqueSessionPruner,
         UnusedSessionPruner,
         DuplicateSessionPruner,
         BugHeatUpdater,
@@ -1095,6 +1108,13 @@
 
 
 class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
+    """Run every day.
+
+    Jobs that don't need to be run frequently.
+
+    If there is low overhead, consider putting these tasks in more
+    frequently invoked lists to distribute the work more evenly.
+    """
     script_name = 'garbo-daily'
     tunable_loops = [
         BranchJobPruner,

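BugSummaryJournalRollup above is the template for anything aimed at the frequent runner: isDone() reports whether journal rows remain, and __call__(chunk_size) has the bugsummary_rollup_journal database function fold up to chunk_size journal rows into BugSummary, committing after each chunk so no transaction stays open long. A toy illustration of that two-method contract (ToyRollup is illustrative only, not Launchpad code):

    class ToyRollup:
        # Stand-in for a TunableLoop subclass; drains a fake journal in chunks.

        def __init__(self, rows):
            self.rows = list(rows)  # pretend journal rows awaiting rollup

        def isDone(self):
            return not self.rows

        def __call__(self, chunk_size):
            chunk_size = int(chunk_size + 0.5)
            # The real loop issues a single SQL call here:
            #   SELECT bugsummary_rollup_journal(%s)
            # and commits, so each transaction stays short.
            del self.rows[:chunk_size]

Driving ToyRollup(range(100)) with the run_tuned() sketch above drains it in goal-sized chunks, which is essentially what BaseDatabaseGarbageCollector does for each class in tunable_loops.
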
=== modified file 'lib/lp/scripts/tests/test_garbo.py'
--- lib/lp/scripts/tests/test_garbo.py	2011-07-05 05:46:02 +0000
+++ lib/lp/scripts/tests/test_garbo.py	2011-07-29 13:44:41 +0000
@@ -24,6 +24,10 @@
     Storm,
     )
 from storm.store import Store
+from testtools.matchers import (
+    Equals,
+    GreaterThan,
+    )
 import transaction
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
@@ -55,7 +59,6 @@
     LaunchpadZopelessLayer,
     ZopelessDatabaseLayer,
     )
-from lp.bugs.model.bugmessage import BugMessage
 from lp.bugs.model.bugnotification import (
     BugNotification,
     BugNotificationRecipient,
@@ -81,6 +84,7 @@
     BulkPruner,
     DailyDatabaseGarbageCollector,
     DuplicateSessionPruner,
+    FrequentDatabaseGarbageCollector,
     HourlyDatabaseGarbageCollector,
     OpenIDConsumerAssociationPruner,
     UnusedSessionPruner,
@@ -359,12 +363,23 @@
         # starting us in a known state.
         self.runDaily()
         self.runHourly()
+        self.runFrequently()
 
         # Capture garbo log output so tests can examine it.
         self.log_buffer = StringIO()
         handler = logging.StreamHandler(self.log_buffer)
         self.log.addHandler(handler)
 
+    def runFrequently(self, maximum_chunk_size=2, test_args=()):
+        transaction.commit()
+        LaunchpadZopelessLayer.switchDbUser('garbo_frequently')
+        collector = FrequentDatabaseGarbageCollector(
+            test_args=list(test_args))
+        collector._maximum_chunk_size = maximum_chunk_size
+        collector.logger = self.log
+        collector.main()
+        return collector
+
     def runDaily(self, maximum_chunk_size=2, test_args=()):
         transaction.commit()
         LaunchpadZopelessLayer.switchDbUser('garbo_daily')
@@ -385,10 +400,10 @@
     def test_OAuthNoncePruner(self):
         now = datetime.now(UTC)
         timestamps = [
-            now - timedelta(days=2), # Garbage
-            now - timedelta(days=1) - timedelta(seconds=60), # Garbage
-            now - timedelta(days=1) + timedelta(seconds=60), # Not garbage
-            now, # Not garbage
+            now - timedelta(days=2),  # Garbage
+            now - timedelta(days=1) - timedelta(seconds=60),  # Garbage
+            now - timedelta(days=1) + timedelta(seconds=60),  # Not garbage
+            now,  # Not garbage
             ]
         LaunchpadZopelessLayer.switchDbUser('testadmin')
         store = IMasterStore(OAuthNonce)
@@ -399,14 +414,15 @@
         for timestamp in timestamps:
             store.add(OAuthNonce(
                 access_token=OAuthAccessToken.get(1),
-                request_timestamp = timestamp,
-                nonce = str(timestamp)))
+                request_timestamp=timestamp,
+                nonce=str(timestamp)))
         transaction.commit()
 
         # Make sure we have 4 nonces now.
         self.failUnlessEqual(store.find(OAuthNonce).count(), 4)
 
-        self.runHourly(maximum_chunk_size=60) # 1 minute maximum chunk size
+        self.runFrequently(
+            maximum_chunk_size=60)  # 1 minute maximum chunk size
 
         store = IMasterStore(OAuthNonce)
 
@@ -428,10 +444,10 @@
         HOURS = 60 * 60
         DAYS = 24 * HOURS
         timestamps = [
-            now - 2 * DAYS, # Garbage
-            now - 1 * DAYS - 1 * MINUTES, # Garbage
-            now - 1 * DAYS + 1 * MINUTES, # Not garbage
-            now, # Not garbage
+            now - 2 * DAYS,  # Garbage
+            now - 1 * DAYS - 1 * MINUTES,  # Garbage
+            now - 1 * DAYS + 1 * MINUTES,  # Not garbage
+            now,  # Not garbage
             ]
         LaunchpadZopelessLayer.switchDbUser('testadmin')
 
@@ -449,7 +465,7 @@
         self.failUnlessEqual(store.find(OpenIDConsumerNonce).count(), 4)
 
         # Run the garbage collector.
-        self.runHourly(maximum_chunk_size=60) # 1 minute maximum chunks.
+        self.runFrequently(maximum_chunk_size=60)  # 1 minute maximum chunks.
 
         store = IMasterStore(OpenIDConsumerNonce)
 
@@ -458,7 +474,8 @@
 
         # And none of them are older than 1 day
         earliest = store.find(Min(OpenIDConsumerNonce.timestamp)).one()
-        self.failUnless(earliest >= now - 24*60*60, 'Still have old nonces')
+        self.failUnless(
+            earliest >= now - 24 * 60 * 60, 'Still have old nonces')
 
     def test_CodeImportResultPruner(self):
         now = datetime.now(UTC)
@@ -485,7 +502,7 @@
 
         new_code_import_result(now - timedelta(days=60))
         for i in range(results_to_keep_count - 1):
-            new_code_import_result(now - timedelta(days=19+i))
+            new_code_import_result(now - timedelta(days=19 + i))
 
         # Run the garbage collector
         self.runDaily()
@@ -558,7 +575,7 @@
             store.execute("""
                 INSERT INTO %s (server_url, handle, issued, lifetime)
                 VALUES (%s, %s, %d, %d)
-                """ % (table_name, str(delta), str(delta), now-10, delta))
+                """ % (table_name, str(delta), str(delta), now - 10, delta))
         transaction.commit()
 
         # Ensure that we created at least one expirable row (using the
@@ -571,7 +588,7 @@
 
         # Expire all those expirable rows, and possibly a few more if this
         # test is running slow.
-        self.runHourly()
+        self.runFrequently()
 
         LaunchpadZopelessLayer.switchDbUser('testadmin')
         store = store_selector.get(MAIN_STORE, MASTER_FLAVOR)
@@ -879,21 +896,22 @@
 
         self.assertEqual(1, count)
 
-    def test_mirror_bugmessages(self):
-        # Nuke the owner in sampledata.
-        con = DatabaseLayer._db_fixture.superuser_connection()
-        try:
-            cur = con.cursor()
-            cur.execute("ALTER TABLE bugmessage "
-                "DISABLE TRIGGER bugmessage__owner__mirror")
-            cur.execute("UPDATE bugmessage set owner=NULL")
-            cur.execute("ALTER TABLE bugmessage "
-                "ENABLE TRIGGER bugmessage__owner__mirror")
-            con.commit()
-        finally:
-            con.close()
-        store = IMasterStore(BugMessage)
-        unmigrated = store.find(BugMessage, BugMessage.ownerID==None).count
-        self.assertNotEqual(0, unmigrated())
-        self.runHourly()
-        self.assertEqual(0, unmigrated())
+    def test_BugSummaryJournalRollup(self):
+        LaunchpadZopelessLayer.switchDbUser('testadmin')
+        store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
+
+        # Generate a load of entries in BugSummaryJournal.
+        store.execute("UPDATE BugTask SET status=42")
+
+        # We only need a few to test.
+        num_rows = store.execute(
+            "SELECT COUNT(*) FROM BugSummaryJournal").get_one()[0]
+        self.assertThat(num_rows, GreaterThan(10))
+
+        self.runFrequently()
+
+        # We just care that the rows have been removed. The bugsummary
+        # tests confirm that the rollup stored procedure is working correctly.
+        num_rows = store.execute(
+            "SELECT COUNT(*) FROM BugSummaryJournal").get_one()[0]
+        self.assertThat(num_rows, Equals(0))

