[Merge] lp:~stub/launchpad/garbo into lp:launchpad/db-devel
Stuart Bishop has proposed merging lp:~stub/launchpad/garbo into lp:launchpad/db-devel.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #768139 in Launchpad itself: "gabo script timeout behavior needs tweaking"
https://bugs.launchpad.net/launchpad/+bug/768139
Bug #795305 in Launchpad itself: "bugsummaryjournal is not automatically rolled up"
https://bugs.launchpad.net/launchpad/+bug/795305
For more details, see:
https://code.launchpad.net/~stub/launchpad/garbo/+merge/69792
Implement a garbo script that runs every 5 minutes, and use it to fix Bug #795305.
Also a bit of delinting of touched code, and some minor garbo tuning (reducing the default transaction goal time to 2 seconds, and moving some existing jobs onto the frequent garbo runner to spread the load more evenly).
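For reference, the new script would slot in beside garbo-hourly and garbo-daily in the cron configuration, roughly as follows (a sketch only: the production crontab is managed outside this tree, and the path and flags shown are illustrative):

    */5 * * * * launchpad /srv/launchpad/cronscripts/garbo-frequently.py -q

The 280 second abort time in the diff (60 * 5 - 20) is what keeps one invocation from overlapping the next under such a schedule.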
--
https://code.launchpad.net/~stub/launchpad/garbo/+merge/69792
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/garbo into lp:launchpad/db-devel.
=== added file 'cronscripts/garbo-frequently.py'
--- cronscripts/garbo-frequently.py 1970-01-01 00:00:00 +0000
+++ cronscripts/garbo-frequently.py 2011-07-29 13:44:41 +0000
@@ -0,0 +1,23 @@
+#!/usr/bin/python -S
+#
+# Copyright 2011 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Database garbage collector, every 5 minutes.
+
+Remove or archive unwanted data. Detect, warn and possibly repair data
+corruption.
+"""
+
+__metaclass__ = type
+__all__ = []
+
+import _pythonpath
+
+from lp.scripts.garbo import FrequentDatabaseGarbageCollector
+
+
+if __name__ == '__main__':
+    script = FrequentDatabaseGarbageCollector()
+    script.continue_on_failure = True
+    script.lock_and_run()
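For anyone reviewing without the LaunchpadCronScript internals to hand: lock_and_run() follows the standard cron-script pattern of taking a lock named after script_name, so an overrunning invocation causes the next one to exit early rather than work concurrently, and then calling the script's main(). A simplified sketch of that pattern (illustrative only, not Launchpad's actual implementation; the lock path and helper name are hypothetical):

    import fcntl
    import sys


    def lock_and_run(script_name, main):
        # Hold an exclusive, non-blocking lock named after the script
        # for the duration of the run.
        lock_file = open('/var/lock/%s.lock' % script_name, 'w')
        try:
            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Another invocation still holds the lock; bail out quietly.
            sys.exit(1)
        try:
            main()
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)

Setting continue_on_failure = True matters for a script run this often: one misbehaving loop should not abort the other, unrelated jobs in the same run.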
=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg 2011-07-28 12:58:14 +0000
+++ database/schema/security.cfg 2011-07-29 13:44:41 +0000
@@ -2187,6 +2187,10 @@
groups=garbo
type=user
+[garbo_frequently]
+groups=garbo
+type=user
+
[generateppahtaccess]
groups=script
public.archive = SELECT
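The new section is needed because BaseDatabaseGarbageCollector uses script_name for both locking and the database user (see its docstring later in this diff), so garbo-frequently.py connects as garbo_frequently; joining the shared garbo group gives the new user the same grants as the existing garbo users. The name mapping appears to be the obvious one (illustrative):

    # Plausible script-name to database-user mapping, consistent with
    # the existing garbo_daily and garbo_hourly sections:
    dbuser = 'garbo-frequently'.replace('-', '_')  # 'garbo_frequently'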
=== modified file 'lib/canonical/launchpad/utilities/looptuner.py'
--- lib/canonical/launchpad/utilities/looptuner.py 2011-04-12 09:57:20 +0000
+++ lib/canonical/launchpad/utilities/looptuner.py 2011-07-29 13:44:41 +0000
@@ -311,7 +311,7 @@
"""A base implementation of `ITunableLoop`."""
implements(ITunableLoop)
- goal_seconds = 4
+ goal_seconds = 2
minimum_chunk_size = 1
maximum_chunk_size = None # Override
cooldown_time = 0
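Halving goal_seconds halves how long each iteration of a tuned loop is allowed to run, trading more frequent commits for shorter lock hold times. For context, the tuner adjusts the chunk size after every iteration so the measured time converges on goal_seconds; the heart of the feedback loop is roughly this (an illustrative sketch, not the actual looptuner.py logic, which also smooths the adjustment and honours cooldown_time):

    import time


    def drive(loop, goal_seconds=2.0, minimum=1, maximum=None):
        # Feed the loop progressively retuned chunk sizes until done.
        chunk_size = float(minimum)
        while not loop.isDone():
            start = time.time()
            loop(chunk_size)
            elapsed = time.time() - start
            # Scale the chunk so the next iteration takes ~goal_seconds.
            if elapsed > 0:
                chunk_size *= goal_seconds / elapsed
            chunk_size = max(chunk_size, minimum)
            if maximum is not None:
                chunk_size = min(chunk_size, maximum)

This is also why BugSummaryJournalRollup below accepts a float chunk_size and rounds it with int(chunk_size + 0.5) before passing it to SQL.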
=== modified file 'lib/lp/scripts/garbo.py'
--- lib/lp/scripts/garbo.py 2011-07-05 05:46:02 +0000
+++ lib/lp/scripts/garbo.py 2011-07-29 13:44:41 +0000
@@ -56,7 +56,6 @@
from lp.bugs.interfaces.bug import IBugSet
from lp.bugs.model.bug import Bug
from lp.bugs.model.bugattachment import BugAttachment
-from lp.bugs.model.bugmessage import BugMessage
from lp.bugs.model.bugnotification import BugNotification
from lp.bugs.model.bugwatch import BugWatchActivity
from lp.bugs.scripts.checkwatches.scheduler import (
@@ -84,7 +83,7 @@
from lp.translations.model.potranslation import POTranslation
-ONE_DAY_IN_SECONDS = 24*60*60
+ONE_DAY_IN_SECONDS = 24 * 60 * 60
class BulkPruner(TunableLoop):
@@ -290,12 +289,34 @@
"""
+class BugSummaryJournalRollup(TunableLoop):
+ """Rollup BugSummaryJournal rows into BugSummary."""
+ maximum_chunk_size = 5000
+
+ def __init__(self, log, abort_time=None):
+ super(BugSummaryJournalRollup, self).__init__(log, abort_time)
+ self.store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
+
+ def isDone(self):
+ has_more = self.store.execute(
+ "SELECT EXISTS (SELECT TRUE FROM BugSummaryJournal LIMIT 1)"
+ ).get_one()[0]
+ return not has_more
+
+ def __call__(self, chunk_size):
+ chunk_size = int(chunk_size + 0.5)
+ self.store.execute(
+ "SELECT bugsummary_rollup_journal(%s)", (chunk_size,),
+ noresult=True)
+ self.store.commit()
+
+
class OpenIDConsumerNoncePruner(TunableLoop):
"""An ITunableLoop to prune old OpenIDConsumerNonce records.
We remove all OpenIDConsumerNonce records older than 1 day.
"""
- maximum_chunk_size = 6*60*60 # 6 hours in seconds.
+ maximum_chunk_size = 6 * 60 * 60 # 6 hours in seconds.
def __init__(self, log, abort_time=None):
super(OpenIDConsumerNoncePruner, self).__init__(log, abort_time)
@@ -601,7 +622,7 @@
        self.max_offset = self.store.execute(
            "SELECT MAX(id) FROM UnlinkedPeople").get_one()[0]
        if self.max_offset is None:
-            self.max_offset = -1 # Trigger isDone() now.
+            self.max_offset = -1  # Trigger isDone() now.
            self.log.debug("No Person records to remove.")
        else:
            self.log.info("%d Person records to remove." % self.max_offset)
@@ -684,36 +705,6 @@
"""
-class MirrorBugMessageOwner(TunableLoop):
- """Mirror BugMessage.owner from Message.
-
- Only needed until they are all set, after that triggers will maintain it.
- """
-
- # Test migration did 3M in 2 hours, so 5000 is ~ 10 seconds - and thats the
- # max we want to hold a DB lock open for.
- minimum_chunk_size = 1000
- maximum_chunk_size = 5000
-
- def __init__(self, log, abort_time=None):
- super(MirrorBugMessageOwner, self).__init__(log, abort_time)
- self.store = IMasterStore(BugMessage)
- self.isDone = IMasterStore(BugMessage).find(
- BugMessage, BugMessage.ownerID==None).is_empty
-
- def __call__(self, chunk_size):
- """See `ITunableLoop`."""
- transaction.begin()
- updated = self.store.execute("""update bugmessage set
- owner=message.owner from message where
- bugmessage.message=message.id and bugmessage.id in
- (select id from bugmessage where owner is NULL limit %s);"""
- % int(chunk_size)
- ).rowcount
- self.log.debug("Updated %s bugmessages." % updated)
- transaction.commit()
-
-
class BugHeatUpdater(TunableLoop):
"""A `TunableLoop` for bug heat calculations."""
@@ -802,7 +793,7 @@
class OldTimeLimitedTokenDeleter(TunableLoop):
"""Delete expired url access tokens from the session DB."""
- maximum_chunk_size = 24*60*60 # 24 hours in seconds.
+ maximum_chunk_size = 24 * 60 * 60 # 24 hours in seconds.
def __init__(self, log, abort_time=None):
super(OldTimeLimitedTokenDeleter, self).__init__(log, abort_time)
@@ -861,10 +852,10 @@
class BaseDatabaseGarbageCollector(LaunchpadCronScript):
"""Abstract base class to run a collection of TunableLoops."""
- script_name = None # Script name for locking and database user. Override.
- tunable_loops = None # Collection of TunableLoops. Override.
- continue_on_failure = False # If True, an exception in a tunable loop
- # does not cause the script to abort.
+ script_name = None # Script name for locking and database user. Override.
+ tunable_loops = None # Collection of TunableLoops. Override.
+ continue_on_failure = False # If True, an exception in a tunable loop
+ # does not cause the script to abort.
# Default run time of the script in seconds. Override.
default_abort_script_time = None
@@ -915,7 +906,7 @@
        for count in range(0, self.options.threads):
            thread = threading.Thread(
                target=self.run_tasks_in_thread,
-                name='Worker-%d' % (count+1,),
+                name='Worker-%d' % (count + 1,),
                args=(tunable_loops,))
            thread.start()
            threads.add(thread)
@@ -949,7 +940,7 @@
    @property
    def script_timeout(self):
-        a_very_long_time = 31536000 # 1 year
+        a_very_long_time = 31536000  # 1 year
        return self.options.abort_script or a_very_long_time

    def get_loop_logger(self, loop_name):
@@ -962,7 +953,7 @@
        loop_logger = logging.getLogger('garbo.' + loop_name)
        for filter in loop_logger.filters:
            if isinstance(filter, PrefixFilter):
-                return loop_logger # Already have a PrefixFilter attached.
+                return loop_logger  # Already have a PrefixFilter attached.
        loop_logger.addFilter(PrefixFilter(loop_name))
        return loop_logger
@@ -1034,7 +1025,7 @@
                loop_logger.debug3(
                    "Unable to acquire lock %s. Running elsewhere?",
                    loop_lock_path)
-                time.sleep(0.3) # Avoid spinning.
+                time.sleep(0.3)  # Avoid spinning.
                tunable_loops.append(tunable_loop_class)
            # Otherwise, emit a warning and skip the task.
            else:
@@ -1073,16 +1064,38 @@
            transaction.abort()
-class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
-    script_name = 'garbo-hourly'
+class FrequentDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
+    """Run every 5 minutes.
+
+    This may become even more frequent in the future.
+
+    Jobs with low overhead can go here to distribute work more evenly.
+    """
+    script_name = 'garbo-frequently'
    tunable_loops = [
-        MirrorBugMessageOwner,
+        BugSummaryJournalRollup,
        OAuthNoncePruner,
        OpenIDConsumerNoncePruner,
        OpenIDConsumerAssociationPruner,
+        AntiqueSessionPruner,
+        ]
+    experimental_tunable_loops = []
+
+    # 5 minutes minus 20 seconds for cleanup. This helps ensure the
+    # script is fully terminated before the next scheduled run kicks in.
+    default_abort_script_time = 60 * 5 - 20
+
+
+class HourlyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
+    """Run every hour.
+
+    Jobs we want to run fairly often but have noticeable overhead go here.
+    """
+    script_name = 'garbo-hourly'
+    tunable_loops = [
        RevisionCachePruner,
        BugWatchScheduler,
-        AntiqueSessionPruner,
        UnusedSessionPruner,
        DuplicateSessionPruner,
        BugHeatUpdater,
@@ -1095,6 +1108,13 @@
class DailyDatabaseGarbageCollector(BaseDatabaseGarbageCollector):
+ """Run every day.
+
+ Jobs that don't need to be run frequently.
+
+ If there is low overhead, consider putting these tasks in more
+ frequently invoked lists to distribute the work more evenly.
+ """
script_name = 'garbo-daily'
tunable_loops = [
BranchJobPruner,
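For reviewers who have not followed the BugSummary work: triggers on the bug tables append rows to BugSummaryJournal as bugs change, and the bugsummary_rollup_journal() stored procedure (already in the tree; its body is not part of this diff) folds a batch of journal rows into BugSummary and deletes them. The new loop above simply repeats that fold until the journal drains, committing between batches to keep lock hold times short. Condensed to its essentials (a sketch; the helper name is hypothetical):

    def rollup_until_empty(store, chunk_size=5000):
        # Drain BugSummaryJournal in batches, mirroring what the
        # BugSummaryJournalRollup loop does one chunk per iteration.
        while True:
            has_more = store.execute(
                "SELECT EXISTS (SELECT TRUE FROM BugSummaryJournal LIMIT 1)"
                ).get_one()[0]
            if not has_more:
                return
            store.execute(
                "SELECT bugsummary_rollup_journal(%s)", (chunk_size,),
                noresult=True)
            store.commit()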
=== modified file 'lib/lp/scripts/tests/test_garbo.py'
--- lib/lp/scripts/tests/test_garbo.py 2011-07-05 05:46:02 +0000
+++ lib/lp/scripts/tests/test_garbo.py 2011-07-29 13:44:41 +0000
@@ -24,6 +24,10 @@
    Storm,
    )
from storm.store import Store
+from testtools.matchers import (
+    Equals,
+    GreaterThan,
+    )
import transaction
from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
@@ -55,7 +59,6 @@
    LaunchpadZopelessLayer,
    ZopelessDatabaseLayer,
    )
-from lp.bugs.model.bugmessage import BugMessage
from lp.bugs.model.bugnotification import (
    BugNotification,
    BugNotificationRecipient,
@@ -81,6 +84,7 @@
    BulkPruner,
    DailyDatabaseGarbageCollector,
    DuplicateSessionPruner,
+    FrequentDatabaseGarbageCollector,
    HourlyDatabaseGarbageCollector,
    OpenIDConsumerAssociationPruner,
    UnusedSessionPruner,
@@ -359,12 +363,23 @@
        # starting us in a known state.
        self.runDaily()
        self.runHourly()
+        self.runFrequently()

        # Capture garbo log output so tests can examine it.
        self.log_buffer = StringIO()
        handler = logging.StreamHandler(self.log_buffer)
        self.log.addHandler(handler)
+    def runFrequently(self, maximum_chunk_size=2, test_args=()):
+        transaction.commit()
+        LaunchpadZopelessLayer.switchDbUser('garbo_frequently')
+        collector = FrequentDatabaseGarbageCollector(
+            test_args=list(test_args))
+        collector._maximum_chunk_size = maximum_chunk_size
+        collector.logger = self.log
+        collector.main()
+        return collector
+
    def runDaily(self, maximum_chunk_size=2, test_args=()):
        transaction.commit()
        LaunchpadZopelessLayer.switchDbUser('garbo_daily')
@@ -385,10 +400,10 @@
    def test_OAuthNoncePruner(self):
        now = datetime.now(UTC)
        timestamps = [
-            now - timedelta(days=2), # Garbage
-            now - timedelta(days=1) - timedelta(seconds=60), # Garbage
-            now - timedelta(days=1) + timedelta(seconds=60), # Not garbage
-            now, # Not garbage
+            now - timedelta(days=2),  # Garbage
+            now - timedelta(days=1) - timedelta(seconds=60),  # Garbage
+            now - timedelta(days=1) + timedelta(seconds=60),  # Not garbage
+            now,  # Not garbage
            ]
        LaunchpadZopelessLayer.switchDbUser('testadmin')
        store = IMasterStore(OAuthNonce)
@@ -399,14 +414,15 @@
        for timestamp in timestamps:
            store.add(OAuthNonce(
                access_token=OAuthAccessToken.get(1),
-                request_timestamp = timestamp,
-                nonce = str(timestamp)))
+                request_timestamp=timestamp,
+                nonce=str(timestamp)))
        transaction.commit()

        # Make sure we have 4 nonces now.
        self.failUnlessEqual(store.find(OAuthNonce).count(), 4)

-        self.runHourly(maximum_chunk_size=60) # 1 minute maximum chunk size
+        self.runFrequently(
+            maximum_chunk_size=60)  # 1 minute maximum chunk size

        store = IMasterStore(OAuthNonce)
@@ -428,10 +444,10 @@
        HOURS = 60 * 60
        DAYS = 24 * HOURS
        timestamps = [
-            now - 2 * DAYS, # Garbage
-            now - 1 * DAYS - 1 * MINUTES, # Garbage
-            now - 1 * DAYS + 1 * MINUTES, # Not garbage
-            now, # Not garbage
+            now - 2 * DAYS,  # Garbage
+            now - 1 * DAYS - 1 * MINUTES,  # Garbage
+            now - 1 * DAYS + 1 * MINUTES,  # Not garbage
+            now,  # Not garbage
            ]
        LaunchpadZopelessLayer.switchDbUser('testadmin')
@@ -449,7 +465,7 @@
        self.failUnlessEqual(store.find(OpenIDConsumerNonce).count(), 4)

        # Run the garbage collector.
-        self.runHourly(maximum_chunk_size=60) # 1 minute maximum chunks.
+        self.runFrequently(maximum_chunk_size=60)  # 1 minute maximum chunks.

        store = IMasterStore(OpenIDConsumerNonce)
@@ -458,7 +474,8 @@
        # And none of them are older than 1 day
        earliest = store.find(Min(OpenIDConsumerNonce.timestamp)).one()
-        self.failUnless(earliest >= now - 24*60*60, 'Still have old nonces')
+        self.failUnless(
+            earliest >= now - 24 * 60 * 60, 'Still have old nonces')

    def test_CodeImportResultPruner(self):
        now = datetime.now(UTC)
@@ -485,7 +502,7 @@
        new_code_import_result(now - timedelta(days=60))
        for i in range(results_to_keep_count - 1):
-            new_code_import_result(now - timedelta(days=19+i))
+            new_code_import_result(now - timedelta(days=19 + i))

        # Run the garbage collector
        self.runDaily()
@@ -558,7 +575,7 @@
store.execute("""
INSERT INTO %s (server_url, handle, issued, lifetime)
VALUES (%s, %s, %d, %d)
- """ % (table_name, str(delta), str(delta), now-10, delta))
+ """ % (table_name, str(delta), str(delta), now - 10, delta))
transaction.commit()
# Ensure that we created at least one expirable row (using the
@@ -571,7 +588,7 @@
        # Expire all those expirable rows, and possibly a few more if this
        # test is running slow.
-        self.runHourly()
+        self.runFrequently()

        LaunchpadZopelessLayer.switchDbUser('testadmin')
        store = store_selector.get(MAIN_STORE, MASTER_FLAVOR)
@@ -879,21 +896,22 @@
        self.assertEqual(1, count)
-    def test_mirror_bugmessages(self):
-        # Nuke the owner in sampledata.
-        con = DatabaseLayer._db_fixture.superuser_connection()
-        try:
-            cur = con.cursor()
-            cur.execute("ALTER TABLE bugmessage "
-                "DISABLE TRIGGER bugmessage__owner__mirror")
-            cur.execute("UPDATE bugmessage set owner=NULL")
-            cur.execute("ALTER TABLE bugmessage "
-                "ENABLE TRIGGER bugmessage__owner__mirror")
-            con.commit()
-        finally:
-            con.close()
-        store = IMasterStore(BugMessage)
-        unmigrated = store.find(BugMessage, BugMessage.ownerID==None).count
-        self.assertNotEqual(0, unmigrated())
-        self.runHourly()
-        self.assertEqual(0, unmigrated())
+    def test_BugSummaryJournalRollup(self):
+        LaunchpadZopelessLayer.switchDbUser('testadmin')
+        store = getUtility(IStoreSelector).get(MAIN_STORE, MASTER_FLAVOR)
+
+        # Generate a load of entries in BugSummaryJournal.
+        store.execute("UPDATE BugTask SET status=42")
+
+        # We only need a few to test.
+        num_rows = store.execute(
+            "SELECT COUNT(*) FROM BugSummaryJournal").get_one()[0]
+        self.assertThat(num_rows, GreaterThan(10))
+
+        self.runFrequently()
+
+        # We just care that the rows have been removed. The bugsummary
+        # tests confirm that the rollup stored procedure is working
+        # correctly.
+        num_rows = store.execute(
+            "SELECT COUNT(*) FROM BugSummaryJournal").get_one()[0]
+        self.assertThat(num_rows, Equals(0))
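To exercise the new rollup test in isolation while reviewing, the usual zope.testrunner invocation should work (exact flags from memory, so treat as approximate):

    ./bin/test -vvt test_BugSummaryJournalRollup lp.scripts.tests.test_garbo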