← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/db-deploy into lp:launchpad

 

Stuart Bishop has proposed merging lp:~stub/launchpad/db-deploy into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #906222 in Launchpad itself: "More BAD_USERS creating false positives during fast downtime updates"
  https://bugs.launchpad.net/launchpad/+bug/906222
  Bug #911417 in Launchpad itself: "Staging db restore fails on make -C database/replication stagingsetup"
  https://bugs.launchpad.net/launchpad/+bug/911417

For more details, see:
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/115194

= Summary =

The preflight.py and full-update.py deployment scripts need to cope with PostgreSQL 9.1 streaming replication *in addition* to slony replication, so that we can transition from slony to streaming replication.

== Proposed fix ==

== Pre-implementation notes ==

== LOC Rationale ==

== Implementation details ==

We can't detect connection strings for streaming replicas, so they need to be provided. I've elected to do this via the command line, at least for now.

== Tests ==

== Demo and Q/A ==


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  database/schema/preflight.py
  database/schema/full-update.py

./database/schema/preflight.py
      13: '_pythonpath' imported but unused
./database/schema/full-update.py
       7: '_pythonpath' imported but unused
-- 
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/115194
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/db-deploy into lp:launchpad.
=== modified file 'database/schema/full-update.py'
--- database/schema/full-update.py	2012-07-03 08:04:35 +0000
+++ database/schema/full-update.py	2012-07-16 17:53:49 +0000
@@ -91,6 +91,13 @@
 def main():
     parser = OptionParser()
 
+    # Unfortunately, we can't reliably detect streaming replicas so
+    # we pass them in on the command line.
+    parser.add_option(
+        '--standby', dest='standbys', default=[], action="append",
+        metavar='CONN_STR',
+        help="libpq connection string to a hot standby database")
+
     # Add all the command command line arguments.
     db_options(parser)
     logger_options(parser)
@@ -113,7 +120,7 @@
 
     # We initially ignore open connections, as they will shortly be
     # killed.
-    if not NoConnectionCheckPreflight(log).check_all():
+    if not NoConnectionCheckPreflight(log, options.standbys).check_all():
         return 99
 
     #
@@ -137,7 +144,7 @@
             return pgbouncer_rc
         pgbouncer_down = True
 
-        if not KillConnectionsPreflight(log).check_all():
+        if not KillConnectionsPreflight(log, options.standbys).check_all():
             return 100
 
         log.info("Preflight check succeeded. Starting upgrade.")
@@ -164,7 +171,7 @@
 
         # We will start seeing connections as soon as pgbouncer is
         # reenabled, so ignore them here.
-        if not NoConnectionCheckPreflight(log).check_all():
+        if not NoConnectionCheckPreflight(log, options.standbys).check_all():
             return 101
 
         log.info("All good. All done.")

=== modified file 'database/schema/preflight.py'
--- database/schema/preflight.py	2012-01-05 07:59:38 +0000
+++ database/schema/preflight.py	2012-07-16 17:53:49 +0000
@@ -17,8 +17,6 @@
 import os.path
 import time
 
-import psycopg2
-
 from lp.services.database.sqlbase import (
     connect,
     ISOLATION_LEVEL_AUTOCOMMIT,
@@ -30,6 +28,7 @@
     logger_options,
     )
 import replication.helpers
+from replication.helpers import Node
 import upgrade
 
 # Ignore connections by these users.
@@ -69,17 +68,16 @@
 
 
 class DatabasePreflight:
-    def __init__(self, log):
+    def __init__(self, log, standbys):
         master_con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
 
         self.log = log
-        self.is_replicated = replication.helpers.slony_installed(master_con)
-        if self.is_replicated:
+        self.is_slony = replication.helpers.slony_installed(master_con)
+        if self.is_slony:
             self.nodes = set(
                 replication.helpers.get_all_cluster_nodes(master_con))
             for node in self.nodes:
-                node.con = psycopg2.connect(node.connection_string)
-                node.con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+                node.con = node.connect(ISOLATION_LEVEL_AUTOCOMMIT)
 
             # Create a list of nodes subscribed to the replicated sets we
             # are modifying.
@@ -105,12 +103,39 @@
                 node for node in self.lpmain_nodes
                     if node.node_id == lpmain_master_node_id][0]
         else:
-            node = replication.helpers.Node(None, None, None, True)
+            node = Node(None, None, None, True)
             node.con = master_con
             self.nodes = set([node])
             self.lpmain_nodes = self.nodes
             self.lpmain_master_node = node
 
+        # Add streaming replication standbys, which unfortunately cannot be
+        # detected reliably and have to be passed in via the command line.
+        self._num_standbys = len(standbys)
+        for standby in standbys:
+            standby_node = Node(None, None, standby, False)
+            standby_node.con = standby_node.connect(
+                ISOLATION_LEVEL_AUTOCOMMIT)
+            self.nodes.add(standby_node)
+            self.lpmain_nodes.add(standby_node)
+
+    def check_standby_count(self):
+        # We sanity check the options as best we can to protect against
+        # operator error.
+        cur = self.lpmain_master_node.con.cursor()
+        cur.execute("SELECT COUNT(*) FROM pg_stat_replication")
+        required_standbys = cur.fetchone()[0]
+
+        if required_standbys != self._num_standbys:
+            self.log.fatal(
+                "%d streaming standbys connected, but %d provided on cli"
+                % (required_standbys, self._num_standbys))
+            return False
+        else:
+            self.log.info(
+                "%d streaming standby servers streaming", required_standbys)
+            return True
+
     def check_is_superuser(self):
         """Return True if all the node connections are as superusers."""
         success = True
@@ -232,30 +257,64 @@
 
     def check_replication_lag(self):
         """Return False if the replication cluster is badly lagged."""
-        if not self.is_replicated:
-            self.log.debug("Not replicated - no replication lag.")
-            return True
-
-        # Check replication lag on every node just in case there are
-        # disagreements.
-        max_lag = timedelta(seconds=-1)
-        for node in self.nodes:
-            cur = node.con.cursor()
-            cur.execute("""
-                SELECT current_database(),
-                max(st_lag_time) AS lag FROM _sl.sl_status
-            """)
-            dbname, lag = cur.fetchone()
-            if lag > max_lag:
-                max_lag = lag
+        slony_lagged = False
+        if self.is_slony:
+            # Check replication lag on every node just in case there are
+            # disagreements.
+            max_lag = timedelta(seconds=-1)
+            for node in self.nodes:
+                cur = node.con.cursor()
+                cur.execute("""
+                    SELECT current_database(),
+                    max(st_lag_time) AS lag FROM _sl.sl_status
+                """)
+                dbname, lag = cur.fetchone()
+                if lag > max_lag:
+                    max_lag = lag
+                self.log.debug(
+                    "%s reports database lag of %s.", dbname, lag)
+            if max_lag <= MAX_LAG:
+                self.log.info("Slony cluster lag is ok (%s)", max_lag)
+                slony_lagged = False
+            else:
+                self.log.fatal("Slony cluster lag is high (%s)", max_lag)
+                slony_lagged = True
+
+        # Do something harmless to force changes to be streamed in case
+        # system is idle.
+        self.lpmain_master_node.con.cursor().execute(
+            'ANALYZE LaunchpadDatabaseRevision')
+        start_time = time.time()
+        # Keep looking for low lag for 30 seconds, in case the system
+        # was idle and streaming needs time to kick in.
+        while time.time() < start_time + 30:
+            max_lag = timedelta(seconds=-1)
+            for node in self.nodes:
+                cur = node.con.cursor()
+                cur.execute("""
+                    SELECT current_setting('hot_standby') = 'on',
+                    now() - pg_last_xact_replay_timestamp()
+                    """)
+                is_standby, lag = cur.fetchone()
+                if is_standby:
+                    self.log.debug2('streaming lag %s', lag)
+                    max_lag = max(max_lag, lag)
+            if max_lag < MAX_LAG:
+                break
+            time.sleep(0.2)
+
+        if max_lag < timedelta(0):
+            streaming_lagged = False
+            self.log.debug("No streaming replication")
+        elif max_lag > MAX_LAG:
+            streaming_lagged = True
+            self.log.fatal("Streaming replication lag is high (%s)", max_lag)
+        else:
+            streaming_lagged = False
             self.log.debug(
-                "%s reports database lag of %s.", dbname, lag)
-        if max_lag <= MAX_LAG:
-            self.log.info("Database cluster lag is ok (%s)", max_lag)
-            return True
-        else:
-            self.log.fatal("Database cluster lag is high (%s)", max_lag)
-            return False
+                "Streaming replication lag is not high (%s)", max_lag)
+
+        return not (slony_lagged or streaming_lagged)
 
     def check_can_sync(self):
         """Return True if a sync event is acknowledged by all nodes.
@@ -263,9 +322,10 @@
         We only wait 30 seconds for the sync, because we require the
         cluster to be quiescent.
         """
-        if self.is_replicated:
-            success = replication.helpers.sync(30, exit_on_fail=False)
-            if success:
+        slony_success = True
+        if self.is_slony:
+            slony_success = replication.helpers.sync(30, exit_on_fail=False)
+            if slony_success:
                 self.log.info(
                     "Replication events are being propagated.")
             else:
@@ -275,9 +335,27 @@
                     "One or more replication daemons may be down.")
                 self.log.fatal(
                     "Bounce the replication daemons and check the logs.")
-            return success
-        else:
-            return True
+
+        streaming_success = False
+        # PG 9.1 streaming replication, or no replication.
+        #
+        cur = self.lpmain_master_node.con.cursor()
+        # Force a WAL switch, returning the current position.
+        cur.execute('SELECT pg_switch_xlog()')
+        wal_point = cur.fetchone()[0]
+        self.log.debug('WAL at %s', wal_point)
+        start_time = time.time()
+        while time.time() < start_time + 30:
+            cur.execute("""
+                SELECT FALSE FROM pg_stat_replication
+                WHERE replay_location < %s LIMIT 1
+                """, (wal_point,))
+            if cur.fetchone() is None:
+                # All slaves, possibly 0, are in sync.
+                streaming_success = True
+                break
+            time.sleep(0.2)
+        return slony_success and streaming_success
 
     def report_patches(self):
         """Report what patches are due to be applied from this tree."""
@@ -299,6 +377,8 @@
         self.report_patches()
 
         success = True
+        if not self.check_standby_count():
+            success = False
         if not self.check_replication_lag():
             success = False
         if not self.check_can_sync():
@@ -380,6 +460,10 @@
         "--kill-connections", dest='kill_connections',
         default=False, action="store_true",
         help="Kill non-system connections instead of reporting an error.")
+    parser.add_option(
+        '--standby', dest='standbys', default=[], action="append",
+        metavar='CONN_STR',
+        help="libpq connection string to a hot standby database")
     (options, args) = parser.parse_args()
     if args:
         parser.error("Too many arguments")
@@ -391,11 +475,11 @@
     log = logger(options)
 
     if options.kill_connections:
-        preflight_check = KillConnectionsPreflight(log)
+        preflight_check = KillConnectionsPreflight(log, options.standbys)
     elif options.skip_connection_check:
-        preflight_check = NoConnectionCheckPreflight(log)
+        preflight_check = NoConnectionCheckPreflight(log, options.standbys)
     else:
-        preflight_check = DatabasePreflight(log)
+        preflight_check = DatabasePreflight(log, options.standbys)
 
     if preflight_check.check_all():
         log.info('Preflight check succeeded. Good to go.')


Follow ups