launchpad-reviewers team mailing list archive
Message #09965
[Merge] lp:~stub/launchpad/db-deploy into lp:launchpad
Stuart Bishop has proposed merging lp:~stub/launchpad/db-deploy into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #906222 in Launchpad itself: "More BAD_USERS creating false positives during fast downtime updates"
https://bugs.launchpad.net/launchpad/+bug/906222
Bug #911417 in Launchpad itself: "Staging db restore fails on make -C database/replication stagingsetup"
https://bugs.launchpad.net/launchpad/+bug/911417
For more details, see:
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/115194
= Summary =
The preflight.py and full-update.py deployment scripts need to cope with PostgreSQL 9.1 streaming replication *in addition* to slony replication, so we can transition.
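Slony continues to be detected as before via replication.helpers.slony_installed(); streaming standbys are counted from pg_stat_replication on the master and compared against the standbys supplied to the scripts (see Implementation details below). Roughly, the streaming side of the check boils down to something like the following sketch (the DSN and helper name are illustrative only, not part of this branch):

import psycopg2


def count_streaming_standbys(master_dsn):
    # pg_stat_replication has one row per WAL sender process,
    # i.e. per connected streaming standby, on PostgreSQL 9.1.
    con = psycopg2.connect(master_dsn)
    try:
        cur = con.cursor()
        cur.execute("SELECT COUNT(*) FROM pg_stat_replication")
        return cur.fetchone()[0]
    finally:
        con.close()

# The preflight check then compares this count with the number of
# --standby connection strings given on the command line.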
== Proposed fix ==
== Pre-implementation notes ==
== LOC Rationale ==
== Implementation details ==
We can't detect connection strings for streaming replicas, so they need to be provided. I've elected to do this via the command line, at least for now.
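The option can be repeated, one libpq connection string per standby, and optparse's append action collects them into a list that the preflight classes receive. A quick sketch of how the collected values look (host names are placeholders):

# Invocation would look something like:
#   full-update.py --standby 'host=standby1 dbname=launchpad_prod' \
#                  --standby 'host=standby2 dbname=launchpad_prod'
from optparse import OptionParser

parser = OptionParser()
parser.add_option(
    '--standby', dest='standbys', default=[], action="append",
    metavar='CONN_STR',
    help="libpq connection string to a hot standby database")
options, args = parser.parse_args([
    '--standby', 'host=standby1 dbname=launchpad_prod',
    '--standby', 'host=standby2 dbname=launchpad_prod',
    ])
assert options.standbys == [
    'host=standby1 dbname=launchpad_prod',
    'host=standby2 dbname=launchpad_prod',
    ]

full-update.py and preflight.py simply forward options.standbys to the DatabasePreflight constructor, as in the diff below.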
== Tests ==
== Demo and Q/A ==
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
database/schema/preflight.py
database/schema/full-update.py
./database/schema/preflight.py
13: '_pythonpath' imported but unused
./database/schema/full-update.py
7: '_pythonpath' imported but unused
--
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/115194
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/db-deploy into lp:launchpad.
=== modified file 'database/schema/full-update.py'
--- database/schema/full-update.py 2012-07-03 08:04:35 +0000
+++ database/schema/full-update.py 2012-07-16 17:53:49 +0000
@@ -91,6 +91,13 @@
def main():
parser = OptionParser()
+ # Unfortunately, we can't reliably detect streaming replicas so
+ # we pass them in on the command line.
+ parser.add_option(
+ '--standby', dest='standbys', default=[], action="append",
+ metavar='CONN_STR',
+ help="libpq connection string to a hot standby database")
+
# Add all the command command line arguments.
db_options(parser)
logger_options(parser)
@@ -113,7 +120,7 @@
# We initially ignore open connections, as they will shortly be
# killed.
- if not NoConnectionCheckPreflight(log).check_all():
+ if not NoConnectionCheckPreflight(log, options.standbys).check_all():
return 99
#
@@ -137,7 +144,7 @@
return pgbouncer_rc
pgbouncer_down = True
- if not KillConnectionsPreflight(log).check_all():
+ if not KillConnectionsPreflight(log, options.standbys).check_all():
return 100
log.info("Preflight check succeeded. Starting upgrade.")
@@ -164,7 +171,7 @@
# We will start seeing connections as soon as pgbouncer is
# reenabled, so ignore them here.
- if not NoConnectionCheckPreflight(log).check_all():
+ if not NoConnectionCheckPreflight(log, options.standbys).check_all():
return 101
log.info("All good. All done.")
=== modified file 'database/schema/preflight.py'
--- database/schema/preflight.py 2012-01-05 07:59:38 +0000
+++ database/schema/preflight.py 2012-07-16 17:53:49 +0000
@@ -17,8 +17,6 @@
import os.path
import time
-import psycopg2
-
from lp.services.database.sqlbase import (
connect,
ISOLATION_LEVEL_AUTOCOMMIT,
@@ -30,6 +28,7 @@
logger_options,
)
import replication.helpers
+from replication.helpers import Node
import upgrade
# Ignore connections by these users.
@@ -69,17 +68,16 @@
class DatabasePreflight:
- def __init__(self, log):
+ def __init__(self, log, standbys):
master_con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
self.log = log
- self.is_replicated = replication.helpers.slony_installed(master_con)
- if self.is_replicated:
+ self.is_slony = replication.helpers.slony_installed(master_con)
+ if self.is_slony:
self.nodes = set(
replication.helpers.get_all_cluster_nodes(master_con))
for node in self.nodes:
- node.con = psycopg2.connect(node.connection_string)
- node.con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+ node.con = node.connect(ISOLATION_LEVEL_AUTOCOMMIT)
# Create a list of nodes subscribed to the replicated sets we
# are modifying.
@@ -105,12 +103,39 @@
node for node in self.lpmain_nodes
if node.node_id == lpmain_master_node_id][0]
else:
- node = replication.helpers.Node(None, None, None, True)
+ node = Node(None, None, None, True)
node.con = master_con
self.nodes = set([node])
self.lpmain_nodes = self.nodes
self.lpmain_master_node = node
+ # Add streaming replication standbys, which unfortunately cannot be
+ # detected reliably and have to be passed in via the command line.
+ self._num_standbys = len(standbys)
+ for standby in standbys:
+ standby_node = Node(None, None, standby, False)
+ standby_node.con = standby_node.connect(
+ ISOLATION_LEVEL_AUTOCOMMIT)
+ self.nodes.add(standby_node)
+ self.lpmain_nodes.add(standby_node)
+
+ def check_standby_count(self):
+ # We sanity check the options as best we can to protect against
+ # operator error.
+ cur = self.lpmain_master_node.con.cursor()
+ cur.execute("SELECT COUNT(*) FROM pg_stat_replication")
+ required_standbys = cur.fetchone()[0]
+
+ if required_standbys != self._num_standbys:
+ self.log.fatal(
+ "%d streaming standbys connected, but %d provided on cli"
+ % (required_standbys, self._num_standbys))
+ return False
+ else:
+ self.log.info(
+ "%d streaming standby servers streaming", required_standbys)
+ return True
+
def check_is_superuser(self):
"""Return True if all the node connections are as superusers."""
success = True
@@ -232,30 +257,64 @@
def check_replication_lag(self):
"""Return False if the replication cluster is badly lagged."""
- if not self.is_replicated:
- self.log.debug("Not replicated - no replication lag.")
- return True
-
- # Check replication lag on every node just in case there are
- # disagreements.
- max_lag = timedelta(seconds=-1)
- for node in self.nodes:
- cur = node.con.cursor()
- cur.execute("""
- SELECT current_database(),
- max(st_lag_time) AS lag FROM _sl.sl_status
- """)
- dbname, lag = cur.fetchone()
- if lag > max_lag:
- max_lag = lag
+ slony_lagged = False
+ if self.is_slony:
+ # Check replication lag on every node just in case there are
+ # disagreements.
+ max_lag = timedelta(seconds=-1)
+ for node in self.nodes:
+ cur = node.con.cursor()
+ cur.execute("""
+ SELECT current_database(),
+ max(st_lag_time) AS lag FROM _sl.sl_status
+ """)
+ dbname, lag = cur.fetchone()
+ if lag > max_lag:
+ max_lag = lag
+ self.log.debug(
+ "%s reports database lag of %s.", dbname, lag)
+ if max_lag <= MAX_LAG:
+ self.log.info("Slony cluster lag is ok (%s)", max_lag)
+ slony_lagged = False
+ else:
+ self.log.fatal("Slony cluster lag is high (%s)", max_lag)
+ slony_lagged = True
+
+ # Do something harmless to force changes to be streamed in case
+ # system is idle.
+ self.lpmain_master_node.con.cursor().execute(
+ 'ANALYZE LaunchpadDatabaseRevision')
+ start_time = time.time()
+ # Keep looking for low lag for 30 seconds, in case the system
+ # was idle and streaming needs time to kick in.
+ while time.time() < start_time + 30:
+ max_lag = timedelta(seconds=-1)
+ for node in self.nodes:
+ cur = node.con.cursor()
+ cur.execute("""
+ SELECT current_setting('hot_standby') = 'on',
+ now() - pg_last_xact_replay_timestamp()
+ """)
+ is_standby, lag = cur.fetchone()
+ if is_standby:
+ self.log.debug2('streaming lag %s', lag)
+ max_lag = max(max_lag, lag)
+ if max_lag < MAX_LAG:
+ break
+ time.sleep(0.2)
+
+ if max_lag < timedelta(0):
+ streaming_lagged = False
+ self.log.debug("No streaming replication")
+ elif max_lag > MAX_LAG:
+ streaming_lagged = True
+ self.log.fatal("Streaming replication lag is high (%s)", max_lag)
+ else:
+ streaming_lagged = False
self.log.debug(
- "%s reports database lag of %s.", dbname, lag)
- if max_lag <= MAX_LAG:
- self.log.info("Database cluster lag is ok (%s)", max_lag)
- return True
- else:
- self.log.fatal("Database cluster lag is high (%s)", max_lag)
- return False
+ "Streaming replication lag is not high (%s)", max_lag)
+
+ return not (slony_lagged or streaming_lagged)
def check_can_sync(self):
"""Return True if a sync event is acknowledged by all nodes.
@@ -263,9 +322,10 @@
We only wait 30 seconds for the sync, because we require the
cluster to be quiescent.
"""
- if self.is_replicated:
- success = replication.helpers.sync(30, exit_on_fail=False)
- if success:
+ slony_success = True
+ if self.is_slony:
+ slony_success = replication.helpers.sync(30, exit_on_fail=False)
+ if slony_success:
self.log.info(
"Replication events are being propagated.")
else:
@@ -275,9 +335,27 @@
"One or more replication daemons may be down.")
self.log.fatal(
"Bounce the replication daemons and check the logs.")
- return success
- else:
- return True
+
+ streaming_success = False
+ # PG 9.1 streaming replication, or no replication.
+ #
+ cur = self.lpmain_master_node.con.cursor()
+ # Force a WAL switch, returning the current position.
+ cur.execute('SELECT pg_switch_xlog()')
+ wal_point = cur.fetchone()[0]
+ self.log.debug('WAL at %s', wal_point)
+ start_time = time.time()
+ while time.time() < start_time + 30:
+ cur.execute("""
+ SELECT FALSE FROM pg_stat_replication
+ WHERE replay_location < %s LIMIT 1
+ """, (wal_point,))
+ if cur.fetchone() is None:
+ # All slaves, possibly 0, are in sync.
+ streaming_success = True
+ break
+ time.sleep(0.2)
+ return slony_success and streaming_success
def report_patches(self):
"""Report what patches are due to be applied from this tree."""
@@ -299,6 +377,8 @@
self.report_patches()
success = True
+ if not self.check_standby_count():
+ success = False
if not self.check_replication_lag():
success = False
if not self.check_can_sync():
@@ -380,6 +460,10 @@
"--kill-connections", dest='kill_connections',
default=False, action="store_true",
help="Kill non-system connections instead of reporting an error.")
+ parser.add_option(
+ '--standby', dest='standbys', default=[], action="append",
+ metavar='CONN_STR',
+ help="libpq connection string to a hot standby database")
(options, args) = parser.parse_args()
if args:
parser.error("Too many arguments")
@@ -391,11 +475,11 @@
log = logger(options)
if options.kill_connections:
- preflight_check = KillConnectionsPreflight(log)
+ preflight_check = KillConnectionsPreflight(log, options.standbys)
elif options.skip_connection_check:
- preflight_check = NoConnectionCheckPreflight(log)
+ preflight_check = NoConnectionCheckPreflight(log, options.standbys)
else:
- preflight_check = DatabasePreflight(log)
+ preflight_check = DatabasePreflight(log, options.standbys)
if preflight_check.check_all():
log.info('Preflight check succeeded. Good to go.')