← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/db-deploy into lp:launchpad

 

Stuart Bishop has proposed merging lp:~stub/launchpad/db-deploy into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #906222 in Launchpad itself: "More BAD_USERS creating false positives during fast downtime updates"
  https://bugs.launchpad.net/launchpad/+bug/906222
  Bug #911417 in Launchpad itself: "Staging db restore fails on make -C database/replication stagingsetup"
  https://bugs.launchpad.net/launchpad/+bug/911417
  Bug #1025396 in Launchpad itself: "deployment scripts need to cope with streaming replication"
  https://bugs.launchpad.net/launchpad/+bug/1025396
  Bug #1025399 in Launchpad itself: "full-update.py should wait for streaming slaves before completing"
  https://bugs.launchpad.net/launchpad/+bug/1025399
  Bug #1036694 in Launchpad itself: "security.py still references slony"
  https://bugs.launchpad.net/launchpad/+bug/1036694

For more details, see:
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/125472


-- 
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/125472
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/db-deploy into lp:launchpad.
=== modified file 'database/replication/helpers.py'
--- database/replication/helpers.py	2012-04-02 10:36:55 +0000
+++ database/replication/helpers.py	2012-09-20 12:58:35 +0000
@@ -241,7 +241,7 @@
         self.is_master = is_master
 
     def connect(self, isolation=ISOLATION_LEVEL_DEFAULT):
-        con = psycopg2.connect(self.connection_string)
+        con = psycopg2.connect(str(self.connection_string))
         con.set_isolation_level(isolation)
         return con
 

=== added file 'database/schema/dbcontroller.py'
--- database/schema/dbcontroller.py	1970-01-01 00:00:00 +0000
+++ database/schema/dbcontroller.py	2012-09-20 12:58:35 +0000
@@ -0,0 +1,211 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Connect to and control our pgbouncer instance."""
+
+__metaclass__ = type
+__all__ = ['DBController', 'streaming_sync']
+
+import time
+
+import psycopg2
+from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
+from psycopg2.extras import NamedTupleConnection
+
+from lp.services.database.postgresql import ConnectionString
+
+
+# Increase this timeout once we are confident in the
+# implementation. We don't want to block rollouts
+# unnecessarily with slow timeouts and a flaky sync
+# detection implementation.
+STREAMING_SYNC_TIMEOUT = 60
+
+
+def pg_connect(conn_str):
+    con = psycopg2.connect(
+        str(conn_str), connection_factory=NamedTupleConnection)
+    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+    return con
+
+
+def streaming_sync(con, timeout=None):
+    """Wait for streaming replicas to synchronize with master as of now.
+
+    :param timeout: seconds to wait, None for no timeout.
+
+    :returns: True if sync happened or no streaming replicas
+              False if the timeout was passed.
+    """
+    cur = con.cursor()
+
+    # Force a WAL switch, returning the current position.
+    cur.execute('SELECT pg_switch_xlog()')
+    wal_point = cur.fetchone()[0]
+    start_time = time.time()
+    while timeout is None or time.time() < start_time + timeout:
+        cur.execute("""
+            SELECT FALSE FROM pg_stat_replication
+            WHERE replay_location < %s LIMIT 1
+            """, (wal_point,))
+        if cur.fetchone() is None:
+            # All slaves, possibly 0, are in sync.
+            return True
+        time.sleep(0.2)
+    return False
+
+
+class DBController:
+    def __init__(self, log, pgbouncer_conn_str, dbname, dbuser):
+        self.log = log
+
+        pgbouncer_conn_str = ConnectionString(pgbouncer_conn_str)
+        if not pgbouncer_conn_str.dbname:
+            pgbouncer_conn_str.dbname = 'pgbouncer'
+        if pgbouncer_conn_str.dbname != 'pgbouncer':
+            log.warn("pgbouncer administrative database not named 'pgbouncer'")
+        self.pgbouncer_con = pg_connect(pgbouncer_conn_str)
+
+        self.master_name = None
+        self.master = None
+        self.slaves = {}
+
+        for db in self.pgbouncer_cmd('show databases', results=True):
+            if db.database != dbname:
+                continue
+
+            conn_str = ConnectionString(
+                'dbname=%s port=%s user=%s' % (dbname, db.port, dbuser))
+            if db.host:
+                conn_str.host = db.host
+            con = pg_connect(conn_str)
+            cur = con.cursor()
+            cur.execute('select pg_is_in_recovery()')
+            if cur.fetchone()[0] is True:
+                self.slaves[db.name] = conn_str
+            else:
+                self.master_name = db.name
+                self.master = conn_str
+
+        if self.master_name is None:
+            log.fatal('No master detected.')
+            raise SystemExit(98)
+
+    def pgbouncer_cmd(self, cmd, results):
+        cur = self.pgbouncer_con.cursor()
+        cur.execute(cmd)
+        if results:
+            return cur.fetchall()
+
+    def pause_replication(self):
+        names = self.slaves.keys()
+        self.log.info("Pausing replication to %s.", ', '.join(names))
+        for name, conn_str in self.slaves.items():
+            try:
+                con = pg_connect(conn_str)
+                cur = con.cursor()
+                cur.execute('select pg_xlog_replay_pause()')
+            except psycopg2.Error, x:
+                self.log.error(
+                    'Unable to pause replication to %s (%s).'
+                    % (name, str(x)))
+                return False
+        return True
+
+    def resume_replication(self):
+        names = self.slaves.keys()
+        self.log.info("Resuming replication to %s.", ', '.join(names))
+        success = True
+        for name, conn_str in self.slaves.items():
+            try:
+                con = pg_connect(conn_str)
+                cur = con.cursor()
+                cur.execute('select pg_xlog_replay_resume()')
+            except psycopg2.Error, x:
+                success = False
+                self.log.error(
+                    'Failed to resume replication to %s (%s).'
+                    % (name, str(x)))
+        return success
+
+    def ensure_replication_enabled(self):
+        """Force replication back on.
+
+        It may have been disabled if a previous run failed horribly,
+        or just admin error. Either way, we are trying to make the
+        scheduled downtime window so automate this.
+        """
+        success = True
+        wait_for_sync = False
+        for name, conn_str in self.slaves.items():
+            try:
+                con = pg_connect(conn_str)
+                cur = con.cursor()
+                cur.execute("SELECT pg_is_xlog_replay_paused()")
+                replication_paused = cur.fetchone()[0]
+                if replication_paused:
+                    self.log.warn("Replication paused on %s. Resuming.", name)
+                    cur.execute("SELECT pg_xlog_replay_resume()")
+                    wait_for_sync = True
+            except psycopg2.Error, x:
+                success = False
+                self.log.error(
+                    "Failed to resume replication on %s (%s)", name, str(x))
+        if success and wait_for_sync:
+            self.sync()
+        return success
+
+    def disable(self, name):
+        try:
+            self.pgbouncer_cmd("DISABLE %s" % name, results=False)
+            self.pgbouncer_cmd("KILL %s" % name, results=False)
+            return True
+        except psycopg2.Error, x:
+            self.log.error("Unable to disable %s (%s)", name, str(x))
+            return False
+
+    def enable(self, name):
+        try:
+            self.pgbouncer_cmd("RESUME %s" % name, results=False)
+            self.pgbouncer_cmd("ENABLE %s" % name, results=False)
+            return True
+        except psycopg2.Error, x:
+            self.log.error("Unable to enable %s (%s)", name, str(x))
+            return False
+
+    def disable_master(self):
+        self.log.info("Disabling access to %s.", self.master_name)
+        return self.disable(self.master_name)
+
+    def enable_master(self):
+        self.log.info("Enabling access to %s.", self.master_name)
+        return self.enable(self.master_name)
+
+    def disable_slaves(self):
+        names = self.slaves.keys()
+        self.log.info(
+            "Disabling access to %s.", ', '.join(names))
+        for name in self.slaves.keys():
+            if not self.disable(name):
+                return False  # Don't do further damage if we failed.
+        return True
+
+    def enable_slaves(self):
+        names = self.slaves.keys()
+        self.log.info(
+            "Enabling access to %s.", ', '.join(names))
+        success = True
+        for name in self.slaves.keys():
+            if not self.enable(name):
+                success = False
+        return success
+
+    def sync(self):
+        sync = streaming_sync(pg_connect(self.master), STREAMING_SYNC_TIMEOUT)
+        if sync:
+            self.log.debug('Slaves in sync.')
+        else:
+            self.log.error(
+                'Slaves failed to sync after %d seconds.',
+                STREAMING_SYNC_TIMEOUT)
+        return sync

=== modified file 'database/schema/full-update.py'
--- database/schema/full-update.py	2012-08-30 07:55:24 +0000
+++ database/schema/full-update.py	2012-09-20 12:58:35 +0000
@@ -8,60 +8,41 @@
 
 from datetime import datetime
 from optparse import OptionParser
-import subprocess
+import psycopg2
 import sys
 
-from lp.services.database.sqlbase import (
-    connect,
-    ISOLATION_LEVEL_AUTOCOMMIT,
-    )
+from lp.services.database.postgresql import ConnectionString
 from lp.services.scripts import (
-    db_options,
     logger,
     logger_options,
     )
+from dbcontroller import DBController
 from preflight import (
     KillConnectionsPreflight,
     NoConnectionCheckPreflight,
-    streaming_sync
+    SYSTEM_USERS
     )
 import security  # security.py script
 import upgrade  # upgrade.py script
 
 
-PGBOUNCER_INITD = ['sudo', '/etc/init.d/pgbouncer']
-
-
-def run_pgbouncer(log, cmd):
-    """Invoke the pgbouncer initscript.
-
-    :param cmd: One of 'start', 'stop' or 'status'.
-    """
-    assert cmd in ('start', 'stop', 'status'), '''
-        Unrecognized command; remember any new commands need to be
-        granted sudo on staging and prod.
-        '''
-    pgbouncer_rc = subprocess.call(PGBOUNCER_INITD + [cmd])
-    sys.stdout.flush()
-    if pgbouncer_rc != 0:
-        log.error("pgbouncer '%s' failed [%s]", cmd, pgbouncer_rc)
-    return pgbouncer_rc
-
-
-def run_upgrade(options, log):
+def run_upgrade(options, log, master_con):
     """Invoke upgrade.py in-process.
 
     It would be easier to just invoke the script, but this way we save
     several seconds of overhead as the component architecture loads up.
     """
     # Fake expected command line arguments and global log
-    options.commit = True
-    options.partial = False
     upgrade.options = options
     upgrade.log = log
+    # upgrade.py doesn't commit, because we are sharing the transaction
+    # with security.py. We want schema updates and security changes
+    # applied in the same transaction.
+    options.commit = False
+    options.partial = False
     # Invoke the database schema upgrade process.
     try:
-        return upgrade.main()
+        return upgrade.main(master_con)
     except Exception:
         log.exception('Unhandled exception')
         return 1
@@ -69,7 +50,7 @@
         log.fatal("upgrade.py failed [%s]", x)
 
 
-def run_security(options, log):
+def run_security(options, log, master_con):
     """Invoke security.py in-process.
 
     It would be easier to just invoke the script, but this way we save
@@ -83,7 +64,7 @@
     security.log = log
     # Invoke the database security reset process.
     try:
-        return security.main(options)
+        return security.main(options, master_con)
     except Exception:
         log.exception('Unhandled exception')
         return 1
@@ -93,37 +74,45 @@
 
 def main():
     parser = OptionParser()
-
-    # Unfortunatly, we can't reliably detect streaming replicas so
-    # we pass them in on the command line.
     parser.add_option(
-        '--standby', dest='standbys', default=[], action="append",
+        '--pgbouncer', dest='pgbouncer',
+        default='host=localhost port=6432 user=pgbouncer',
         metavar='CONN_STR',
-        help="libpq connection string to a hot standby database")
+        help="libpq connection string to administer pgbouncer")
+    parser.add_option(
+        '--dbname', dest='dbname', default='launchpad_prod', metavar='DBNAME',
+        help='Database name we are updating.')
+    parser.add_option(
+        '--dbuser', dest='dbuser', default='postgres', metavar='USERNAME',
+        help='Connect as USERNAME to databases')
 
-    # Add all the command command line arguments.
-    db_options(parser)
     logger_options(parser)
     (options, args) = parser.parse_args()
     if args:
         parser.error("Too many arguments")
 
+    # In case we are connected as a non-standard superuser, ensure we
+    # don't kill our own connections.
+    SYSTEM_USERS.add(options.dbuser)
+
     log = logger(options)
 
-    #
+    controller = DBController(
+        log, options.pgbouncer, options.dbname, options.dbuser)
+
+    try:
+        # Master connection, not running in autocommit to allow us to
+        # rollback changes on failure.
+        master_con = psycopg2.connect(str(controller.master))
+    except Exception, x:
+        log.fatal("Unable to open connection to master db (%s)", str(x))
+        return 94
+
     # Preflight checks. Confirm as best we can that the upgrade will
-    # work unattended.
-    #
-
-    # Confirm we can invoke PGBOUNCER_INITD
-    log.debug("Confirming sudo access to pgbouncer startup script")
-    pgbouncer_rc = run_pgbouncer(log, 'status')
-    if pgbouncer_rc != 0:
-        return pgbouncer_rc
-
-    # We initially ignore open connections, as they will shortly be
-    # killed.
-    if not NoConnectionCheckPreflight(log, options.standbys).check_all():
+    # work unattended. Here we ignore open connections, as they
+    # will shortly be killed.
+    controller.ensure_replication_enabled()
+    if not NoConnectionCheckPreflight(log, controller).check_all():
         return 99
 
     #
@@ -132,94 +121,105 @@
     #
 
     # status flags
-    pgbouncer_down = False
     upgrade_run = False
     security_run = False
-
-    outage_start = datetime.now()
+    replication_paused = False
+    master_disabled = False
+    slaves_disabled = False
+    outage_start = None
 
     try:
-        # Shutdown pgbouncer
-        log.info("Outage starts. Shutting down pgbouncer.")
-        pgbouncer_rc = run_pgbouncer(log, 'stop')
-        if pgbouncer_rc != 0:
-            log.fatal("pgbouncer not shut down [%s]", pgbouncer_rc)
-            return pgbouncer_rc
-        pgbouncer_down = True
-
-        if not KillConnectionsPreflight(log, options.standbys).check_all():
+        # Pause replication.
+        replication_paused = controller.pause_replication()
+        if not replication_paused:
+            return 93
+
+        # Start the outage clock.
+        log.info("Outage starts.")
+        outage_start = datetime.now()
+
+        # Disable access and kill connections to the master database.
+        master_disabled = controller.disable_master()
+        if not master_disabled:
+            return 95
+
+        if not KillConnectionsPreflight(
+            log, controller,
+            replication_paused=replication_paused).check_all():
             return 100
 
         log.info("Preflight check succeeded. Starting upgrade.")
-        upgrade_rc = run_upgrade(options, log)
-        if upgrade_rc != 0:
+        # Does not commit master_con, even on success.
+        upgrade_rc = run_upgrade(options, log, master_con)
+        upgrade_run = (upgrade_rc == 0)
+        if not upgrade_run:
             return upgrade_rc
-        upgrade_run = True
-        log.info("Database patches applied. Stored procedures updated.")
+        log.info("Database patches applied.")
 
-        security_rc = run_security(options, log)
-        if security_rc != 0:
+        # Commits master_con on success.
+        security_rc = run_security(options, log, master_con)
+        security_run = (security_rc == 0)
+        if not security_run:
             return security_rc
-        security_run = True
-
-        log.info("All database upgrade steps completed. Waiting for sync.")
-
-        # Increase this timeout once we are confident in the implementation.
-        # We don't want to block rollouts unnecessarily with slow
-        # timeouts and a flaky sync detection implementation.
-        streaming_sync_timeout = 60
-
-        sync = streaming_sync(
-            connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT),
-            streaming_sync_timeout)
-        if sync:
-            log.debug('Streaming replicas in sync.')
+
+        master_disabled = not controller.enable_master()
+        if master_disabled:
+            log.warn("Outage ongoing until pgbouncer bounced.")
+            return 96
         else:
+            log.info("Outage complete. %s", datetime.now() - outage_start)
+
+        slaves_disabled = controller.disable_slaves()
+
+        # Resume replication.
+        replication_paused = not controller.resume_replication()
+        if replication_paused:
             log.error(
-                'Streaming replicas failed to sync after %d seconds.',
-                streaming_sync_timeout)
+                "Failed to resume replication. Run pg_xlog_replay_pause() "
+                "on all slaves to manually resume.")
+        else:
+            if controller.sync():
+                log.info('Slaves in sync. Updates replicated.')
+            else:
+                log.error(
+                    'Slaves failed to sync. Updates may not be replicated.')
 
-        log.info("Restarting pgbouncer")
-        pgbouncer_rc = run_pgbouncer(log, 'start')
-        if pgbouncer_rc != 0:
-            log.fatal("pgbouncer not restarted [%s]", pgbouncer_rc)
-            return pgbouncer_rc
-        pgbouncer_down = False
-        log.info("Outage complete. %s", datetime.now() - outage_start)
+        if slaves_disabled:
+            slaves_disabled = not controller.enable_slaves()
+            if slaves_disabled:
+                log.warn(
+                    "Failed to enable slave databases in pgbouncer. "
+                    "Now running in master-only mode.")
 
         # We will start seeing connections as soon as pgbouncer is
         # reenabled, so ignore them here.
-        if not NoConnectionCheckPreflight(log, options.standbys).check_all():
+        if not NoConnectionCheckPreflight(log, controller).check_all():
             return 101
 
         log.info("All good. All done.")
         return 0
 
     finally:
-        if pgbouncer_down:
-            # Even if upgrade.py or security.py failed, we should be in
-            # a good enough state to continue operation so restart
-            # pgbouncer and allow connections.
-            #  - upgrade.py may have failed to update the master, and
-            #    changes should have rolled back.
-            #  - upgrade.py may have failed to update a slave, breaking
-            #    replication. The master is still operational, but
-            #    slaves may be lagging and have the old schema.
-            #  - security.py may have died, rolling back its changes on
-            #    one or more nodes.
-            # In all cases except the first, we have recovery to do but
-            # systems are probably ok, or at least providing some
-            # services.
-            pgbouncer_rc = run_pgbouncer(log, 'start')
-            if pgbouncer_rc == 0:
-                log.info("Despite failures, pgbouncer restarted.")
-                log.info("Outage complete. %s", datetime.now() - outage_start)
+        if not security_run:
+            log.warning("Rolling back all schema and security changes.")
+            master_con.rollback()
+
+        # Recovery if necessary.
+        if master_disabled:
+            if controller.enable_master():
+                log.warning(
+                    "Master reenabled despite earlier failures. "
+                    "Outage over %s, but we have problems",
+                    str(datetime.now() - outage_start))
             else:
-                log.fatal("pgbouncer is down and refuses to restart")
-        if not upgrade_run:
-            log.warning("upgrade.py still needs to be run")
-        if not security_run:
-            log.warning("security.py still needs to be run")
+                log.warning(
+                    "Master is still disabled in pgbouncer. Outage ongoing.")
+
+        if replication_paused:
+            controller.resume_replication()
+
+        if slaves_disabled:
+            controller.enable_slaves()
 
 
 if __name__ == '__main__':

=== modified file 'database/schema/preflight.py'
--- database/schema/preflight.py	2012-08-30 07:55:24 +0000
+++ database/schema/preflight.py	2012-09-20 12:58:35 +0000
@@ -18,8 +18,9 @@
 import os.path
 import time
 
+import psycopg2
+
 from lp.services.database.sqlbase import (
-    connect,
     ISOLATION_LEVEL_AUTOCOMMIT,
     sqlvalues,
     )
@@ -28,18 +29,19 @@
     logger,
     logger_options,
     )
+from dbcontroller import DBController, streaming_sync
 from replication.helpers import Node
 import upgrade
 
 # Ignore connections by these users.
-SYSTEM_USERS = frozenset(['postgres', 'slony', 'nagios', 'lagmon'])
+SYSTEM_USERS = set(['postgres', 'slony', 'nagios', 'lagmon'])
 
 # Fail checks if these users are connected. If a process should not be
 # interrupted by a rollout, the database user it connects as should be
 # added here. The preflight check will fail if any of these users are
 # connected, so these systems will need to be shut down manually before
 # a database update.
-FRAGILE_USERS = frozenset([
+FRAGILE_USERS = set([
     'buildd_manager',
     # process_accepted is fragile, but also fast so we likely shouldn't
     # need to ever manually shut it down.
@@ -52,7 +54,7 @@
 # If these users have long running transactions, just kill 'em. Entries
 # added here must come with a bug number, a if part of Launchpad holds
 # open a long running transaction it is a bug we need to fix.
-BAD_USERS = frozenset([
+BAD_USERS = set([
     'karma',  # Bug #863109
     'rosettaadmin',  # Bug #863122
     'update-pkg-cache',  # Bug #912144
@@ -68,18 +70,21 @@
 
 
 class DatabasePreflight:
-    def __init__(self, log, standbys):
-        master_con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
+    def __init__(self, log, controller, replication_paused=False):
+        master_con = psycopg2.connect(str(controller.master))
+        master_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
         self.log = log
+        self.replication_paused = replication_paused
+
         node = Node(None, None, None, True)
         node.con = master_con
         self.nodes = set([node])
         self.lpmain_nodes = self.nodes
         self.lpmain_master_node = node
 
-        # Add streaming replication standbys, which unfortunately cannot be
-        # detected reliably and has to be passed in via the command line.
+        # Add streaming replication standbys.
+        standbys = controller.slaves.values()
         self._num_standbys = len(standbys)
         for standby in standbys:
             standby_node = Node(None, None, standby, False)
@@ -101,7 +106,7 @@
                 % (required_standbys, self._num_standbys))
             return False
         else:
-            self.log.info(
+            self.log.debug(
                 "%d streaming standby servers streaming", required_standbys)
             return True
 
@@ -181,7 +186,7 @@
                     usename, datname, num_connections)
                 success = False
         if success:
-            self.log.info(
+            self.log.debug(
                 "No fragile systems connected to the cluster (%s)"
                 % ', '.join(FRAGILE_USERS))
         return success
@@ -221,7 +226,7 @@
                         datname, usename, age)
                     success = False
         if success:
-            self.log.info("No long running transactions detected.")
+            self.log.debug("No long running transactions detected.")
         return success
 
     def check_replication_lag(self):
@@ -300,9 +305,9 @@
         success = True
         if not self.check_standby_count():
             success = False
-        if not self.check_replication_lag():
+        if not self.replication_paused and not self.check_replication_lag():
             success = False
-        if not self.check_can_sync():
+        if not self.replication_paused and not self.check_can_sync():
             success = False
         # Do checks on open transactions last to minimize race
         # conditions.
@@ -352,14 +357,16 @@
                     all_clear = False
                     if loop_count == num_tries - 1:
                         self.log.fatal(
-                            "Unable to kill %s [%s] on %s",
+                            "Unable to kill %s [%s] on %s.",
                             usename, procpid, datname)
                     elif usename in BAD_USERS:
                         self.log.info(
-                            "Killed %s [%s] on %s", usename, procpid, datname)
+                            "Killed %s [%s] on %s.",
+                            usename, procpid, datname)
                     else:
                         self.log.warning(
-                            "Killed %s [%s] on %s", usename, procpid, datname)
+                            "Killed %s [%s] on %s.",
+                            usename, procpid, datname)
             if all_clear:
                 break
 
@@ -369,35 +376,8 @@
         return all_clear
 
 
-def streaming_sync(con, timeout=None):
-    """Wait for streaming replicas to synchronize with master as of now.
-
-    :param timeout: seconds to wait, None for no timeout.
-
-    :returns: True if sync happened or no streaming replicas
-              False if the timeout was passed.
-    """
-    cur = con.cursor()
-
-    # Force a WAL switch, returning the current position.
-    cur.execute('SELECT pg_switch_xlog()')
-    wal_point = cur.fetchone()[0]
-    start_time = time.time()
-    while timeout is None or time.time() < start_time + timeout:
-        cur.execute("""
-            SELECT FALSE FROM pg_stat_replication
-            WHERE replay_location < %s LIMIT 1
-            """, (wal_point,))
-        if cur.fetchone() is None:
-            # All slaves, possibly 0, are in sync.
-            return True
-        time.sleep(0.2)
-    return False
-
-
 def main():
     parser = OptionParser()
-    db_options(parser)
     logger_options(parser)
     parser.add_option(
         "--skip-connection-check", dest='skip_connection_check',
@@ -408,9 +388,17 @@
         default=False, action="store_true",
         help="Kill non-system connections instead of reporting an error.")
     parser.add_option(
-        '--standby', dest='standbys', default=[], action="append",
+        '--pgbouncer', dest='pgbouncer',
+        default='host=localhost port=6432 user=pgbouncer',
         metavar='CONN_STR',
-        help="libpq connection string to a hot standby database")
+        help="libpq connection string to administer pgbouncer")
+    parser.add_option(
+        '--dbname', dest='dbname', default='launchpad_prod', metavar='DBNAME',
+        help='Database name we are updating.')
+    parser.add_option(
+        '--dbuser', dest='dbuser', default='postgres', metavar='USERNAME',
+        help='Connect as USERNAME to databases')
+
     (options, args) = parser.parse_args()
     if args:
         parser.error("Too many arguments")
@@ -421,12 +409,15 @@
 
     log = logger(options)
 
+    controller = DBController(
+        log, options.pgbouncer, options.dbname, options.dbuser)
+
     if options.kill_connections:
-        preflight_check = KillConnectionsPreflight(log, options.standbys)
+        preflight_check = KillConnectionsPreflight(log, controller)
     elif options.skip_connection_check:
-        preflight_check = NoConnectionCheckPreflight(log, options.standbys)
+        preflight_check = NoConnectionCheckPreflight(log, controller)
     else:
-        preflight_check = DatabasePreflight(log, options.standbys)
+        preflight_check = DatabasePreflight(log, controller)
 
     if preflight_check.check_all():
         log.info('Preflight check succeeded. Good to go.')

=== modified file 'database/schema/security.py'
--- database/schema/security.py	2012-08-15 13:44:17 +0000
+++ database/schema/security.py	2012-09-20 12:58:35 +0000
@@ -164,6 +164,7 @@
             WHERE c.relkind IN ('r','v','S','')
                 AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                 AND pg_catalog.pg_table_is_visible(c.oid)
+                AND c.relpersistence <> 't'
             ORDER BY 1,2
             ''')
         for schema, name, type_, owner, acl in cur.fetchall():
@@ -232,16 +233,17 @@
     }
 
 
-def main(options):
+def main(options, master_con=None):
     # Load the config file
     config = SafeConfigParser(CONFIG_DEFAULTS)
     configfile_name = os.path.join(os.path.dirname(__file__), 'security.cfg')
     config.read([configfile_name])
 
-    con = connect()
+    if master_con is None:
+        master_con = connect()
 
     log.info("Resetting permissions.")
-    reset_permissions(con, config, options)
+    reset_permissions(master_con, config, options)
     return 0
 
 

=== modified file 'database/schema/upgrade.py'
--- database/schema/upgrade.py	2012-08-07 10:38:36 +0000
+++ database/schema/upgrade.py	2012-09-20 12:58:35 +0000
@@ -35,8 +35,10 @@
 SCHEMA_DIR = os.path.dirname(__file__)
 
 
-def main():
-    con = connect()
+def main(con=None):
+    if con is None:
+        con = connect()
+
     patches = get_patchlist(con)
 
     log.info("Applying patches.")


Follow ups