← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/db-deploy into lp:launchpad/db-devel

 

Stuart Bishop has proposed merging lp:~stub/launchpad/db-deploy into lp:launchpad/db-devel with lp:~stub/launchpad/pending-db-changes as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/59065

There are a number of checks we need to make before proceeding with production database updates. preflight.py automates these checks.

full-update.py runs the database update steps in sequence, including the new preflight.py check. This is to make production rollouts smoother and minimize downtime windows.

Unfortunately, we can't enable preflight.py on staging yet. One of the checks it makes is to confirm there are no non-system connections open to the active databases, but the staging update scripts neglect to disable cronjobs when doing a code-only update, so preflight.py (and full-update.py) will normally fail there. This failure is correct - these rogue connections occasionally cause the staging update to fail and are exactly the sort of thing we are trying to catch during production rollouts.
-- 
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/59065
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/db-deploy into lp:launchpad/db-devel.
=== added file 'database/schema/full-update.py'
--- database/schema/full-update.py	1970-01-01 00:00:00 +0000
+++ database/schema/full-update.py	2011-04-26 14:26:48 +0000
@@ -0,0 +1,56 @@
+#!/usr/bin/python2.6 -S
+# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Full update process."""
+
+import _pythonpath
+
+import os.path
+from optparse import OptionParser
+import subprocess
+import sys
+
+from canonical.launchpad.scripts import (
+    db_options,
+    logger_options,
+    )
+
+
+def run_script(script, *extra_args):
+    """Run a sibling script from this directory.
+
+    Our own command line arguments (sys.argv[1:]) are forwarded to the
+    child, followed by extra_args. Returns the child's exit code.
+    """
+    script_path = os.path.join(os.path.dirname(__file__), script)
+    return subprocess.call([script_path] + sys.argv[1:] + list(extra_args))
+
+
+def main():
+    """Run the full database update sequence, stopping at first failure.
+
+    Each step's exit code is propagated as our own exit code when it is
+    non-zero. All steps receive this script's command line options via
+    run_script().
+    """
+    parser = OptionParser()
+
+    # Add all the command line arguments.
+    db_options(parser)
+    logger_options(parser)
+    (options, args) = parser.parse_args()
+    if args:
+        parser.error("Too many arguments")
+
+    # NOTE(review): `options` is unused here; parsing serves only to
+    # validate the arguments before they are forwarded to the children.
+
+    # Confirm the cluster is in a fit state before touching anything.
+    preflight_rc = run_script('preflight.py')
+    if preflight_rc != 0:
+        return preflight_rc
+
+    upgrade_rc = run_script('upgrade.py')
+    if upgrade_rc != 0:
+        return upgrade_rc
+
+    fti_rc = run_script('fti.py')
+    if fti_rc != 0:
+        return fti_rc
+
+    security_rc = run_script('security.py', '--cluster')
+    if security_rc != 0:
+        return security_rc
+
+    # Re-run the preflight checks to confirm the cluster is still healthy
+    # after the update.
+    preflight_rc = run_script('preflight.py')
+    return preflight_rc
+
+
+if __name__ == '__main__':
+    sys.exit(main())

=== added file 'database/schema/preflight.py'
--- database/schema/preflight.py	1970-01-01 00:00:00 +0000
+++ database/schema/preflight.py	2011-04-26 14:26:48 +0000
@@ -0,0 +1,223 @@
+#!/usr/bin/python2.6 -S
+# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Confirm the database systems are ready to be patched as best we can."""
+
+import _pythonpath
+
+from datetime import timedelta
+from optparse import OptionParser
+import sys
+
+import psycopg2
+
+from canonical.database.sqlbase import (
+    connect,
+    ISOLATION_LEVEL_AUTOCOMMIT,
+    )
+from canonical.launchpad.scripts import (
+    db_options,
+    logger,
+    logger_options,
+    )
+from canonical import lp
+import replication.helpers
+
+
+# Ignore connections by these users.
+SYSTEM_USERS = frozenset(['postgres', 'slony', 'nagios'])
+
+# How lagged the cluster can be before failing the preflight check.
+# (A timedelta, compared against the lag reported by each node.)
+MAX_LAG = timedelta(seconds=45)
+
+
+class DatabasePreflight:
+    """Run a series of checks against the database cluster.
+
+    If Slony-I replication is installed, a connection is opened to every
+    node in the cluster. Otherwise the single master connection is
+    wrapped in a stub Node, so every check can iterate self.nodes
+    uniformly in both cases.
+    """
+    def __init__(self, log, master_con):
+        self.log = log
+        self.is_replicated = replication.helpers.slony_installed(master_con)
+        if self.is_replicated:
+            self.nodes = replication.helpers.get_all_cluster_nodes(master_con)
+            for node in self.nodes:
+                node.con = psycopg2.connect(node.connection_string)
+                node.con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+        else:
+            # Not replicated - fake up a single node wrapping the master
+            # connection so the checks below need no special casing.
+            node = replication.helpers.Node(None, None, None, True)
+            node.con = master_con
+            self.nodes = [node]
+
+    def check_is_superuser(self):
+        """Return True if all the node connections are as superusers."""
+        success = True
+        for node in self.nodes:
+            cur = node.con.cursor()
+            cur.execute("""
+                SELECT current_database(), pg_user.usesuper
+                FROM pg_user
+                WHERE usename = current_user
+                """)
+            dbname, is_super = cur.fetchone()
+            if is_super:
+                self.log.debug("Connected to %s as a superuser.", dbname)
+            else:
+                self.log.fatal("Not connected to %s as a superuser.", dbname)
+                success = False
+        return success
+
+    def check_open_connections(self):
+        """Return False if any nodes have connections from non-system users.
+
+        System users are defined by SYSTEM_USERS.
+        """
+        success = True
+        for node in self.nodes:
+            cur = node.con.cursor()
+            # Exclude our own backend; any other connection to this
+            # database is counted. NOTE(review): procpid is the
+            # pre-PostgreSQL-9.2 column name in pg_stat_activity.
+            cur.execute("""
+                SELECT datname, usename, COUNT(*) AS num_connections
+                FROM pg_stat_activity
+                WHERE
+                    datname=current_database()
+                    AND procpid <> pg_backend_pid()
+                GROUP BY datname, usename
+                """)
+            for datname, usename, num_connections in cur.fetchall():
+                if usename in SYSTEM_USERS:
+                    self.log.debug(
+                        "%s has %d connections by %s",
+                        datname, num_connections, usename)
+                else:
+                    self.log.fatal(
+                        "%s has %d connections by %s",
+                        datname, num_connections, usename)
+                    success = False
+        if success:
+            self.log.info("Only system users connected to the cluster")
+        return success
+
+    def check_long_running_transactions(self, max_secs=10):
+        """Return False if any nodes have long running transactions open.
+
+        max_secs defines what is long running. For database rollouts,
+        this will be short. Even if the transaction is benign like a
+        autovacuum task, we should wait until things have settled down.
+        """
+        success = True
+        for node in self.nodes:
+            cur = node.con.cursor()
+            cur.execute("""
+                SELECT
+                    datname, usename,
+                    age(current_timestamp, xact_start) AS age, current_query
+                FROM pg_stat_activity
+                WHERE
+                    age(current_timestamp, xact_start) > interval '%d secs'
+                    AND datname=current_database()
+                """ % max_secs)
+            # NOTE(review): current_query is selected and unpacked below
+            # but never included in the log message (only three %s
+            # placeholders) - possibly intended as a fourth argument.
+            for datname, usename, age, current_query in cur.fetchall():
+                self.log.fatal(
+                    "%s has transaction by %s open %s",
+                    datname, usename, age)
+                success = False
+        if success:
+            self.log.info("No long running transactions detected.")
+        return success
+
+    def check_replication_lag(self):
+        """Return False if the replication cluster is badly lagged."""
+        if not self.is_replicated:
+            self.log.debug("Not replicated - no replication lag.")
+            return True
+
+        # Check replication lag on every node just in case there are
+        # disagreements.
+        max_lag = timedelta(seconds=-1)
+        max_lag_node = None
+        for node in self.nodes:
+            cur = node.con.cursor()
+            # _sl.sl_status is Slony-I's status view; st_lag_time is an
+            # interval, which presumably arrives as a timedelta and so
+            # compares directly with MAX_LAG - confirm.
+            cur.execute("""
+                SELECT current_database(),
+                max(st_lag_time) AS lag FROM _sl.sl_status
+            """)
+            dbname, lag = cur.fetchone()
+            if lag > max_lag:
+                max_lag = lag
+                max_lag_node = node
+            self.log.debug(
+                "%s reports database lag of %s.", dbname, lag)
+        if max_lag <= MAX_LAG:
+            self.log.info("Database cluster lag is ok (%s)", max_lag)
+            return True
+        else:
+            self.log.fatal("Database cluster lag is high (%s)", max_lag)
+            return False
+
+    def check_can_sync(self):
+        """Return True if a sync event is acknowledged by all nodes.
+
+        We only wait 30 seconds for the sync, because we require the
+        cluster to be quiescent.
+        """
+        if self.is_replicated:
+            success = replication.helpers.sync(30)
+            if success:
+                self.log.info(
+                    "Replication events are being propagated.")
+            else:
+                self.log.fatal(
+                    "Replication events are not being propagated.")
+                self.log.fatal(
+                    "One or more replication daemons may be down.")
+                self.log.fatal(
+                    "Bounce the replication daemons and check the logs.")
+            return success
+        else:
+            return True
+
+    def check_all(self):
+        """Run all checks.
+
+        If any failed, return False. Otherwise return True.
+        """
+        if not self.check_is_superuser():
+            # No point continuing - results will be bogus without access
+            # to pg_stat_activity
+            return False
+
+        # Run every remaining check even after a failure, so the log
+        # reports all problems at once.
+        success = True
+        if not self.check_open_connections():
+            success = False
+        if not self.check_long_running_transactions():
+            success = False
+        if not self.check_replication_lag():
+            success = False
+        if not self.check_can_sync():
+            success = False
+        return success
+
+
+def main():
+    """Parse command line options and run all preflight checks.
+
+    Returns 0 if every check passed, 1 otherwise, suitable for use as
+    the process exit code.
+    """
+    parser = OptionParser()
+    db_options(parser)
+    logger_options(parser)
+    (options, args) = parser.parse_args()
+    if args:
+        parser.error("Too many arguments")
+
+    log = logger(options)
+
+    # NOTE(review): autocommit presumably keeps these read-only checks
+    # from themselves appearing as open transactions - confirm.
+    master_con = connect(lp.dbuser)
+    master_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+
+    preflight_check = DatabasePreflight(log, master_con)
+
+    if preflight_check.check_all():
+        log.info('Preflight check succeeded. Good to go.')
+        return 0
+    else:
+        log.error('Preflight check failed.')
+        return 1
+
+
+if __name__ == '__main__':
+    sys.exit(main())

=== modified file 'database/schema/security.py'
--- database/schema/security.py	2011-03-21 03:50:40 +0000
+++ database/schema/security.py	2011-04-26 14:26:48 +0000
@@ -172,12 +172,10 @@
                     node.nickname, node.connection_string))
                 reset_permissions(
                     psycopg2.connect(node.connection_string), config, options)
-        else:
-            log.error("--cluster requested, but not a Slony-I cluster.")
-            return 1
-    else:
-        log.info("Resetting permissions on single database")
-        reset_permissions(con, config, options)
+            return
+        log.warning("--cluster requested, but not a Slony-I cluster.")
+    log.info("Resetting permissions on single database")
+    reset_permissions(con, config, options)
 
 
 def list_identifiers(identifiers):