← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/staging into lp:launchpad

 

Stuart Bishop has proposed merging lp:~stub/launchpad/staging into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #798120 in Launchpad itself: "security.py --no-revoke probably shouldn't change object ownership"
  https://bugs.launchpad.net/launchpad/+bug/798120
  Bug #809123 in Launchpad itself: "we cannot deploy DB schema changes live"
  https://bugs.launchpad.net/launchpad/+bug/809123

For more details, see:
https://code.launchpad.net/~stub/launchpad/staging/+merge/69093

= Summary =

Updates and improvements to the fastdowntime database update process.

== Proposed fix ==

 * There are some database users we don't want to interrupt, as interrupting their processes may require cleanup. Block the update until these systems have been shut down.

 * Bring pgbouncer back online even if updates failed.

 * Our scripts have a high overhead due to loading the component architecture. Run these in-process instead of as subprocesses.

 * Comments

 * Fix Bug #798120 as a drive-by.


== Pre-implementation notes ==

See Bug #809123 for background.

== Implementation details ==

== Tests ==

== Demo and Q/A ==


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  database/schema/security.py
  database/schema/upgrade.py
  database/replication/helpers.py
  database/schema/preflight.py
  database/schema/full-update.py

./database/schema/security.py
       9: '_pythonpath' imported but unused
./database/schema/upgrade.py
      13: '_pythonpath' imported but unused
./database/schema/preflight.py
       7: '_pythonpath' imported but unused
./database/schema/full-update.py
       7: '_pythonpath' imported but unused
-- 
https://code.launchpad.net/~stub/launchpad/staging/+merge/69093
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/staging into lp:launchpad.
=== modified file 'database/replication/helpers.py'
--- database/replication/helpers.py	2011-05-12 07:56:18 +0000
+++ database/replication/helpers.py	2011-07-25 14:17:26 +0000
@@ -7,7 +7,6 @@
 __all__ = []
 
 import subprocess
-import sys
 from tempfile import NamedTemporaryFile
 from textwrap import dedent
 
@@ -168,7 +167,7 @@
                  block indefinitely.
 
     :param exit_on_fail: If True, on failure of the slonik script
-                         sys.exit is invoked using the slonik return code.
+                         SystemExit is raised using the slonik return code.
 
     :param auto_preamble: If True, the generated preamble will be
                           automatically included.
@@ -205,7 +204,7 @@
     if returncode != 0:
         log.error("slonik script failed")
         if exit_on_fail:
-            sys.exit(1)
+            raise SystemExit(1)
 
     return returncode == 0
 
@@ -526,4 +525,3 @@
 
     lpmain_tables, lpmain_sequences = calculate_replication_set(
         cur, LPMAIN_SEED)
-

=== modified file 'database/schema/full-update.py'
--- database/schema/full-update.py	2011-07-23 05:30:23 +0000
+++ database/schema/full-update.py	2011-07-25 14:17:26 +0000
@@ -17,6 +17,13 @@
     logger_options,
     )
 
+from preflight import (
+    KillConnectionsPreflight,
+    NoConnectionCheckPreflight,
+    )
+import security  # security.py script
+import upgrade  # upgrade.py script
+
 
 PGBOUNCER_INITD = ['sudo', '/etc/init.d/pgbouncer']
 
@@ -26,6 +33,65 @@
     return subprocess.call([script_path] + sys.argv[1:] + list(extra_args))
 
 
+def run_pgbouncer(log, cmd):
+    """Invoke the pgbouncer initscript.
+
+    :param cmd: One of 'start', 'stop' or 'status'.
+    """
+    assert cmd in ('start', 'stop', 'status'), (
+        'sudo access needs to be granted to new commands on staging and prod'
+        )
+    pgbouncer_rc = subprocess.call(PGBOUNCER_INITD + [cmd])
+    sys.stdout.flush()
+    if pgbouncer_rc != 0:
+        log.error("pgbouncer '%s' failed [%s]", cmd, pgbouncer_rc)
+    return pgbouncer_rc
+
+
+def run_upgrade(options, log):
+    """Invoke upgrade.py in-process.
+
+    It would be easier to just invoke the script, but this way we save
+    several seconds of overhead as the component architecture loads up.
+    """
+    # Fake expected command line arguments and global log
+    options.commit = True
+    options.partial = False
+    upgrade.options = options
+    upgrade.log = log
+    # Invoke the database schema upgrade process.
+    try:
+        return upgrade.main()
+    except Exception:
+        log.exception('Unhandled exception')
+        return 1
+    except SystemExit, x:
+        log.fatal("upgrade.py failed [%s]", x)
+
+
+def run_security(options, log):
+    """Invoke security.py in-process.
+
+    It would be easier to just invoke the script, but this way we save
+    several seconds of overhead as the component architecture loads up.
+    """
+    # Fake expected command line arguments and global log
+    options.dryrun = False
+    options.revoke = True
+    options.owner = 'postgres'
+    options.cluster = True
+    security.options = options
+    security.log = log
+    # Invoke the database security reset process.
+    try:
+        return security.main(options)
+    except Exception:
+        log.exception('Unhandled exception')
+        return 1
+    except SystemExit, x:
+        log.fatal("security.py failed [%s]", x)
+
+
 def main():
     parser = OptionParser()
 
@@ -45,16 +111,12 @@
 
     # We initially ignore open connections, as they will shortly be
     # killed.
-    preflight_rc = run_script('preflight.py', '--skip-connection-check')
-    if preflight_rc != 0:
-        return preflight_rc
+    if not NoConnectionCheckPreflight(log).check_all():
+        return 99
 
     # Confirm we can invoke PGBOUNCER_INITD
-    pgbouncer_status_cmd = PGBOUNCER_INITD + ['status']
-    pgbouncer_rc = subprocess.call(pgbouncer_status_cmd)
-    sys.stdout.flush()
+    pgbouncer_rc = run_pgbouncer(log, 'status')
     if pgbouncer_rc != 0:
-        log.fatal("Unable to invoke %s", ' '.join(pgbouncer_status_cmd))
         return pgbouncer_rc
 
     #
@@ -65,42 +127,42 @@
     # status flags
     pgbouncer_down = False
     upgrade_run = False
-    fti_run = False
+    # Bug #815717
+    # fti_run = False
     security_run = False
 
     try:
         # Shutdown pgbouncer
-        pgbouncer_rc = subprocess.call(PGBOUNCER_INITD + ['stop'])
-        sys.stdout.flush()
+        pgbouncer_rc = run_pgbouncer(log, 'stop')
         if pgbouncer_rc != 0:
             log.fatal("pgbouncer not shut down [%s]", pgbouncer_rc)
             return pgbouncer_rc
         pgbouncer_down = True
 
-        preflight_rc = run_script('preflight.py', '--kill-connections')
-        if preflight_rc != 0:
-            return preflight_rc
+        if not KillConnectionsPreflight(log).check_all():
+            return 100
 
-        upgrade_rc = run_script('upgrade.py')
+        upgrade_rc = run_upgrade(options, log)
         if upgrade_rc != 0:
-            log.warning("upgrade.py run may have been partial")
             return upgrade_rc
         upgrade_run = True
 
-        fti_rc = run_script('fti.py')
-        if fti_rc != 0:
-            return fti_rc
-        fti_run = True
+        # fti.py is no longer being run on production. Updates
+        # to full text indexes need to be handled manually in db
+        # patches. Bug #815717.
+        # fti_rc = run_script('fti.py')
+        # if fti_rc != 0:
+        #     return fti_rc
+        # fti_run = True
 
-        security_rc = run_script('security.py', '--cluster')
+        security_rc = run_security(options, log)
         if security_rc != 0:
             return security_rc
         security_run = True
 
         log.info("All database upgrade steps completed")
 
-        pgbouncer_rc = subprocess.call(PGBOUNCER_INITD + ['start'])
-        sys.stdout.flush()
+        pgbouncer_rc = run_pgbouncer(log, 'start')
         if pgbouncer_rc != 0:
             log.fatal("pgbouncer not restarted [%s]", pgbouncer_rc)
             return pgbouncer_rc
@@ -108,20 +170,37 @@
 
         # We will start seeing connections as soon as pgbouncer is
         # reenabled, so ignore them here.
-        preflight_rc = run_script('preflight.py', '--skip-connection-check')
-        if preflight_rc != 0:
-            return preflight_rc
+        if not NoConnectionCheckPreflight(log).check_all():
+            return 101
 
         log.info("All good. All done.")
         return 0
 
     finally:
         if pgbouncer_down:
-            log.warning("pgbouncer is down and will need to be restarted")
+            # Even if upgrade.py or security.py failed, we should be in
+            # a good enough state to continue operation so restart
+            # pgbouncer and allow connections.
+            #  - upgrade.py may have failed to update the master, and
+            #    changes should have rolled back.
+            #  - upgrade.py may have failed to update a slave, breaking
+            #    replication. The master is still operational, but
+            #    slaves may be lagging and have the old schema.
+            #  - security.py may have died, rolling back its changes on
+            #    one or more nodes.
+            # In all cases except the first, we have recovery to do but
+            # systems are probably ok, or at least providing some
+            # services.
+            pgbouncer_rc = run_pgbouncer(log, 'start')
+            if pgbouncer_rc == 0:
+                log.info("Despite failures, pgbouncer restarted.")
+            else:
+                log.fatal("pgbouncer is down and refuses to restart")
         if not upgrade_run:
             log.warning("upgrade.py still needs to be run")
-        if not fti_run:
-            log.warning("fti.py still needs to be run")
+        # Bug #815717
+        # if not fti_run:
+        #     log.warning("fti.py still needs to be run")
         if not security_run:
             log.warning("security.py still needs to be run")
 

=== modified file 'database/schema/preflight.py'
--- database/schema/preflight.py	2011-07-20 10:10:45 +0000
+++ database/schema/preflight.py	2011-07-25 14:17:26 +0000
@@ -6,6 +6,13 @@
 
 import _pythonpath
 
+__all__ = [
+    'DatabasePreflight',
+    'KillConnectionsPreflight',
+    'NoConnectionCheckPreflight',
+    ]
+
+
 from datetime import timedelta
 from optparse import OptionParser
 import sys
@@ -29,12 +36,22 @@
 # Ignore connections by these users.
 SYSTEM_USERS = frozenset(['postgres', 'slony', 'nagios', 'lagmon'])
 
+# Fail checks if these users are connected. If a process should not be
+# interrupted by a rollout, the database user it connects as should be
+# added here. The preflight check will fail if any of these users are
+# connected, so these systems will need to be shut down manually before
+# a database update.
+FRAGILE_USERS = frozenset(['archivepublisher'])
+
 # How lagged the cluster can be before failing the preflight check.
-MAX_LAG = timedelta(seconds=45)
+MAX_LAG = timedelta(seconds=60)
 
 
 class DatabasePreflight:
-    def __init__(self, log, master_con):
+    def __init__(self, log):
+        master_con = connect(lp.dbuser)
+        master_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+
         self.log = log
         self.is_replicated = replication.helpers.slony_installed(master_con)
         if self.is_replicated:
@@ -118,6 +135,35 @@
             self.log.info("Only system users connected to the cluster")
         return success
 
+    def check_fragile_connections(self):
+        """Fail if any FRAGILE_USERS are connected to the cluster.
+
+        If we interrupt these processes, we may have a mess to clean
+        up. If they are connected, the preflight check should fail.
+        """
+        success = True
+        for node in self.lpmain_nodes:
+            cur = node.con.cursor()
+            cur.execute("""
+                SELECT datname, usename, COUNT(*) AS num_connections
+                FROM pg_stat_activity
+                WHERE
+                    datname=current_database()
+                    AND procpid <> pg_backend_pid()
+                    AND usename IN %s
+                GROUP BY datname, usename
+                """ % sqlvalues(FRAGILE_USERS))
+            for datname, usename, num_connections in cur.fetchall():
+                self.log.fatal(
+                    "Fragile system %s running. %s has %d connections.",
+                    usename, datname, num_connections)
+                success = False
+        if success:
+            self.log.info(
+                "No fragile systems connected to the cluster (%s)"
+                % ', '.join(FRAGILE_USERS))
+        return success
+
     def check_long_running_transactions(self, max_secs=10):
         """Return False if any nodes have long running transactions open.
 
@@ -155,7 +201,6 @@
         # Check replication lag on every node just in case there are
         # disagreements.
         max_lag = timedelta(seconds=-1)
-        max_lag_node = None
         for node in self.nodes:
             cur = node.con.cursor()
             cur.execute("""
@@ -165,7 +210,6 @@
             dbname, lag = cur.fetchone()
             if lag > max_lag:
                 max_lag = lag
-                max_lag_node = node
             self.log.debug(
                 "%s reports database lag of %s.", dbname, lag)
         if max_lag <= MAX_LAG:
@@ -208,13 +252,17 @@
             return False
 
         success = True
+        if not self.check_replication_lag():
+            success = False
+        if not self.check_can_sync():
+            success = False
+        # Do checks on open transactions last to minimize race
+        # conditions.
         if not self.check_open_connections():
             success = False
         if not self.check_long_running_transactions():
             success = False
-        if not self.check_replication_lag():
-            success = False
-        if not self.check_can_sync():
+        if not self.check_fragile_connections():
             success = False
         return success
 
@@ -237,7 +285,8 @@
         for node in self.lpmain_nodes:
             cur = node.con.cursor()
             cur.execute("""
-                SELECT procpid, datname, usename, pg_terminate_backend(procpid)
+                SELECT
+                    procpid, datname, usename, pg_terminate_backend(procpid)
                 FROM pg_stat_activity
                 WHERE
                     datname=current_database()
@@ -272,15 +321,12 @@
 
     log = logger(options)
 
-    master_con = connect(lp.dbuser)
-    master_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-
     if options.kill_connections:
-        preflight_check = KillConnectionsPreflight(log, master_con)
+        preflight_check = KillConnectionsPreflight(log)
     elif options.skip_connection_check:
-        preflight_check = NoConnectionCheckPreflight(log, master_con)
+        preflight_check = NoConnectionCheckPreflight(log)
     else:
-        preflight_check = DatabasePreflight(log, master_con)
+        preflight_check = DatabasePreflight(log)
 
     if preflight_check.check_all():
         log.info('Preflight check succeeded. Good to go.')

=== modified file 'database/schema/security.py'
--- database/schema/security.py	2011-05-13 09:47:57 +0000
+++ database/schema/security.py	2011-07-25 14:17:26 +0000
@@ -63,8 +63,8 @@
 
 
 class DbSchema(dict):
-    groups = None # List of groups defined in the db
-    users = None # List of users defined in the db
+    groups = None  # List of groups defined in the db
+    users = None  # List of users defined in the db
 
     def __init__(self, con):
         super(DbSchema, self).__init__()
@@ -159,7 +159,6 @@
     config.read([configfile_name])
 
     con = connect(options.dbuser)
-    cur = CursorWrapper(con.cursor())
 
     if options.cluster:
         nodes = replication.helpers.get_nodes(con, 1)
@@ -172,10 +171,11 @@
                     node.nickname, node.connection_string))
                 reset_permissions(
                     psycopg2.connect(node.connection_string), config, options)
-            return
+            return 0
         log.warning("--cluster requested, but not a Slony-I cluster.")
     log.info("Resetting permissions on single database")
     reset_permissions(con, config, options)
+    return 0
 
 
 def list_identifiers(identifiers):
@@ -387,17 +387,19 @@
         else:
             log.debug("%s not in any roles", user)
 
-    # Change ownership of all objects to OWNER
-    for obj in schema.values():
-        if obj.type in ("function", "sequence"):
-            pass # Can't change ownership of functions or sequences
-        else:
-            if obj.owner != options.owner:
-                log.info("Resetting ownership of %s", obj.fullname)
-                cur.execute("ALTER TABLE %s OWNER TO %s" % (
-                    obj.fullname, quote_identifier(options.owner)))
-
     if options.revoke:
+        # Change ownership of all objects to OWNER.
+        # We skip this in --no-revoke mode as ownership changes may
+        # block on a live system.
+        for obj in schema.values():
+            if obj.type in ("function", "sequence"):
+                pass  # Can't change ownership of functions or sequences
+            else:
+                if obj.owner != options.owner:
+                    log.info("Resetting ownership of %s", obj.fullname)
+                    cur.execute("ALTER TABLE %s OWNER TO %s" % (
+                        obj.fullname, quote_identifier(options.owner)))
+
         # Revoke all privs from known groups. Don't revoke anything for
         # users or groups not defined in our security.cfg.
         table_revocations = PermissionGatherer("TABLE")
@@ -429,6 +431,7 @@
         function_revocations.revoke(cur)
         sequence_revocations.revoke(cur)
     else:
+        log.info("Not resetting ownership of database objects")
         log.info("Not revoking permissions on database objects")
 
     # Set of all tables we have granted permissions on. After we have assigned

=== modified file 'database/schema/upgrade.py'
--- database/schema/upgrade.py	2010-12-15 13:46:36 +0000
+++ database/schema/upgrade.py	2011-07-25 14:17:26 +0000
@@ -10,14 +10,13 @@
 __metaclass__ = type
 
 # pylint: disable-msg=W0403
-import _pythonpath # Sort PYTHONPATH
+import _pythonpath  # Sort PYTHONPATH
 
 from cStringIO import StringIO
 import glob
 import os.path
 from optparse import OptionParser
 import re
-import sys
 from tempfile import NamedTemporaryFile
 from textwrap import dedent
 
@@ -101,7 +100,7 @@
 
 def to_seconds(td):
     """Convert a timedelta to seconds."""
-    return td.days * (24*60*60) + td.seconds + td.microseconds/1000000.0
+    return td.days * (24 * 60 * 60) + td.seconds + td.microseconds / 1000000.0
 
 
 def report_patch_times(con, todays_patches):
@@ -249,6 +248,7 @@
     # Execute the script with slonik.
     if not replication.helpers.execute_slonik(outf.getvalue()):
         log.fatal("Aborting.")
+        raise SystemExit(4)
 
     # Cleanup our temporary files - they applied successfully.
     for temporary_file in temporary_files:
@@ -348,8 +348,8 @@
             print >> outf, dedent("""\
                 echo 'Subscribing holding set to @node%d_node.';
                 subscribe set (
-                    id=@holding_set,
-                    provider=@master_node, receiver=@node%d_node, forward=yes);
+                    id=@holding_set, provider=@master_node,
+                    receiver=@node%d_node, forward=yes);
                 echo 'Waiting for sync';
                 sync (id=@master_node);
                 wait for event (
@@ -475,11 +475,11 @@
         m = re.search('patch-(\d+)-(\d+)-(\d).sql$', patch_file)
         if m is None:
             log.fatal('Invalid patch filename %s' % repr(patch_file))
-            sys.exit(1)
+            raise SystemExit(1)
 
         major, minor, patch = [int(i) for i in m.groups()]
         if (major, minor, patch) in dbpatches:
-            continue # This patch has already been applied
+            continue  # This patch has already been applied
         log.debug("Found patch %d.%d.%d -- %s" % (
             major, minor, patch, patch_file
             ))
@@ -504,7 +504,7 @@
     if (major, minor, patch) not in applied_patches(con):
         log.fatal("%s failed to update LaunchpadDatabaseRevision correctly"
                 % patch_file)
-        sys.exit(2)
+        raise SystemExit(2)
 
     # Commit changes if we allow partial updates.
     if options.commit and options.partial:
@@ -523,7 +523,7 @@
         # environment.
         log.fatal(
             "Last non-whitespace character of %s must be a semicolon", script)
-        sys.exit(3)
+        raise SystemExit(3)
     cur.execute(sql)
 
     if not no_commit and options.commit and options.partial: