[Merge] ~cjwatson/launchpad:replication-terminology into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:replication-terminology into launchpad:master.
Commit message:
Use primary/standby terminology in DB scripts
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/411550
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:replication-terminology into launchpad:master.
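For reviewers skimming the diff: the rename is mechanical, and the renamed helpers keep the same shape as before. A minimal sketch of what a caller looks like after this branch (the describe_cluster wrapper, the import path, and the con argument are illustrative assumptions; the helper names and Node attributes come from the diff below):

# Illustrative only: exercises the renamed replication helpers from
# database/replication/helpers.py as they appear after this branch.
from database.replication.helpers import get_primary_node, get_standby_nodes

def describe_cluster(con, set_id=1):
    # get_primary_node() returns None while the cluster is still being set up.
    primary = get_primary_node(con, set_id)
    if primary is None:
        return "cluster not yet configured"
    lines = ["primary: node %d (%s)" % (primary.node_id, primary.connection_string)]
    for node in get_standby_nodes(con, set_id):
        # Node.is_primary replaces the old Node.is_master attribute.
        assert not node.is_primary
        lines.append("standby: node %d (%s)" % (node.node_id, node.connection_string))
    return "\n".join(lines)
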
diff --git a/database/replication/helpers.py b/database/replication/helpers.py
index 726f1e6..050a75e 100644
--- a/database/replication/helpers.py
+++ b/database/replication/helpers.py
@@ -143,7 +143,7 @@ class TableReplicationInfo:
"""Internal table replication details."""
table_id = None
replication_set_id = None
- master_node_id = None
+ primary_node_id = None
def __init__(self, con, namespace, table_name):
cur = con.cursor()
@@ -159,7 +159,7 @@ class TableReplicationInfo:
row = cur.fetchone()
if row is None:
raise LookupError(fqn(namespace, table_name))
- self.table_id, self.replication_set_id, self.master_node_id = row
+ self.table_id, self.replication_set_id, self.primary_node_id = row
def sync(timeout, exit_on_fail=True):
@@ -205,10 +205,10 @@ def execute_slonik(script, sync=None, exit_on_fail=True, auto_preamble=True):
if sync is not None:
sync_script = dedent("""\
- sync (id = @master_node);
+ sync (id = @primary_node);
wait for event (
- origin = @master_node, confirmed = ALL,
- wait on = @master_node, timeout = %d);
+ origin = @primary_node, confirmed = ALL,
+ wait on = @primary_node, timeout = %d);
""" % sync)
script = script + sync_script
@@ -234,11 +234,11 @@ def execute_slonik(script, sync=None, exit_on_fail=True, auto_preamble=True):
class Node:
"""Simple data structure for holding information about a Slony node."""
- def __init__(self, node_id, nickname, connection_string, is_master):
+ def __init__(self, node_id, nickname, connection_string, is_primary):
self.node_id = node_id
self.nickname = nickname
self.connection_string = connection_string
- self.is_master = is_master
+ self.is_primary = is_primary
def connect(self, isolation=ISOLATION_LEVEL_DEFAULT):
con = psycopg2.connect(str(self.connection_string))
@@ -253,17 +253,17 @@ def _get_nodes(con, query):
cur = con.cursor()
cur.execute(query)
nodes = []
- for node_id, nickname, connection_string, is_master in cur.fetchall():
- nodes.append(Node(node_id, nickname, connection_string, is_master))
+ for node_id, nickname, connection_string, is_primary in cur.fetchall():
+ nodes.append(Node(node_id, nickname, connection_string, is_primary))
return nodes
-def get_master_node(con, set_id=1):
- """Return the master Node, or None if the cluster is still being setup."""
+def get_primary_node(con, set_id=1):
+ """Return the primary Node, or None if the cluster is still being setup."""
nodes = _get_nodes(con, """
SELECT DISTINCT
set_origin AS node_id,
- 'master',
+ 'primary',
pa_conninfo AS connection_string,
True
FROM _sl.sl_set
@@ -272,16 +272,16 @@ def get_master_node(con, set_id=1):
""" % set_id)
if not nodes:
return None
- assert len(nodes) == 1, "More than one master found for set %s" % set_id
+ assert len(nodes) == 1, "More than one primary found for set %s" % set_id
return nodes[0]
-def get_slave_nodes(con, set_id=1):
- """Return the list of slave Nodes."""
+def get_standby_nodes(con, set_id=1):
+ """Return the list of standby Nodes."""
return _get_nodes(con, """
SELECT DISTINCT
pa_server AS node_id,
- 'slave' || pa_server,
+ 'standby' || pa_server,
pa_conninfo AS connection_string,
False
FROM _sl.sl_set
@@ -295,17 +295,17 @@ def get_slave_nodes(con, set_id=1):
def get_nodes(con, set_id=1):
"""Return a list of all Nodes."""
- master_node = get_master_node(con, set_id)
- if master_node is None:
+ primary_node = get_primary_node(con, set_id)
+ if primary_node is None:
return []
else:
- return [master_node] + get_slave_nodes(con, set_id)
+ return [primary_node] + get_standby_nodes(con, set_id)
def get_all_cluster_nodes(con):
"""Return a list of all Nodes in the cluster.
- node.is_master will be None, as this boolean doesn't make sense
+ node.is_primary will be None, as this boolean doesn't make sense
in the context of a cluster rather than a single replication set.
"""
if not slony_installed(con):
@@ -321,20 +321,20 @@ def get_all_cluster_nodes(con):
""")
if not nodes:
# There are no subscriptions yet, so no paths. Generate the
- # master Node.
+ # primary Node.
cur = con.cursor()
cur.execute("SELECT no_id from _sl.sl_node")
node_ids = [row[0] for row in cur.fetchall()]
if len(node_ids) == 0:
return []
assert len(node_ids) == 1, "Multiple nodes but no paths."
- master_node_id = node_ids[0]
- master_connection_string = ConnectionString(
+ primary_node_id = node_ids[0]
+ primary_connection_string = ConnectionString(
config.database.rw_main_primary)
- master_connection_string.user = 'slony'
+ primary_connection_string.user = 'slony'
return [Node(
- master_node_id, 'node%d_node' % master_node_id,
- master_connection_string, True)]
+ primary_node_id, 'node%d_node' % primary_node_id,
+ primary_connection_string, True)]
return nodes
@@ -344,10 +344,10 @@ def preamble(con=None):
if con is None:
con = connect(user='slony')
- master_node = get_master_node(con)
+ primary_node = get_primary_node(con)
nodes = get_all_cluster_nodes(con)
- if master_node is None and len(nodes) == 1:
- master_node = nodes[0]
+ if primary_node is None and len(nodes) == 1:
+ primary_node = nodes[0]
preamble = [dedent("""\
#
@@ -363,12 +363,12 @@ def preamble(con=None):
define lpmirror_set %d;
""" % (LPMAIN_SET_ID, HOLDING_SET_ID, SSO_SET_ID, LPMIRROR_SET_ID))]
- if master_node is not None:
+ if primary_node is not None:
preamble.append(dedent("""\
- # Symbolic id for the main replication set master node.
- define master_node %d;
- define master_node_conninfo '%s';
- """ % (master_node.node_id, master_node.connection_string)))
+ # Symbolic id for the main replication set primary node.
+ define primary_node %d;
+ define primary_node_conninfo '%s';
+ """ % (primary_node.node_id, primary_node.connection_string)))
for node in nodes:
preamble.append(dedent("""\
diff --git a/database/schema/dbcontroller.py b/database/schema/dbcontroller.py
index 79edf50..8892413 100644
--- a/database/schema/dbcontroller.py
+++ b/database/schema/dbcontroller.py
@@ -29,7 +29,7 @@ def pg_connect(conn_str):
def streaming_sync(con, timeout=None):
- """Wait for streaming replicas to synchronize with master as of now.
+ """Wait for streaming replicas to synchronize with primary as of now.
:param timeout: seconds to wait, None for no timeout.
@@ -48,7 +48,7 @@ def streaming_sync(con, timeout=None):
WHERE replay_lsn < %s LIMIT 1
""", (wal_point,))
if cur.fetchone() is None:
- # All slaves, possibly 0, are in sync.
+ # All standbys, possibly 0, are in sync.
return True
time.sleep(0.2)
return False
@@ -66,9 +66,9 @@ class DBController:
"pgbouncer administrative database not named 'pgbouncer'")
self.pgbouncer_con = pg_connect(pgbouncer_conn_str)
- self.master_name = None
- self.master = None
- self.slaves = {}
+ self.primary_name = None
+ self.primary = None
+ self.standbys = {}
for db in self.pgbouncer_cmd('show databases', results=True):
if db.database != dbname:
@@ -82,13 +82,13 @@ class DBController:
cur = con.cursor()
cur.execute('select pg_is_in_recovery()')
if cur.fetchone()[0] is True:
- self.slaves[db.name] = conn_str
+ self.standbys[db.name] = conn_str
else:
- self.master_name = db.name
- self.master = conn_str
+ self.primary_name = db.name
+ self.primary = conn_str
- if self.master_name is None:
- log.fatal('No master detected.')
+ if self.primary_name is None:
+ log.fatal('No primary detected.')
raise SystemExit(98)
def pgbouncer_cmd(self, cmd, results):
@@ -98,9 +98,9 @@ class DBController:
return cur.fetchall()
def pause_replication(self):
- names = self.slaves.keys()
+ names = self.standbys.keys()
self.log.info("Pausing replication to %s.", ', '.join(names))
- for name, conn_str in self.slaves.items():
+ for name, conn_str in self.standbys.items():
try:
con = pg_connect(conn_str)
cur = con.cursor()
@@ -113,10 +113,10 @@ class DBController:
return True
def resume_replication(self):
- names = self.slaves.keys()
+ names = self.standbys.keys()
self.log.info("Resuming replication to %s.", ', '.join(names))
success = True
- for name, conn_str in self.slaves.items():
+ for name, conn_str in self.standbys.items():
try:
con = pg_connect(conn_str)
cur = con.cursor()
@@ -137,7 +137,7 @@ class DBController:
"""
success = True
wait_for_sync = False
- for name, conn_str in self.slaves.items():
+ for name, conn_str in self.standbys.items():
try:
con = pg_connect(conn_str)
cur = con.cursor()
@@ -174,39 +174,39 @@ class DBController:
self.log.error("Unable to enable %s (%s)", name, str(x))
return False
- def disable_master(self):
- self.log.info("Disabling access to %s.", self.master_name)
- return self.disable(self.master_name)
+ def disable_primary(self):
+ self.log.info("Disabling access to %s.", self.primary_name)
+ return self.disable(self.primary_name)
- def enable_master(self):
- self.log.info("Enabling access to %s.", self.master_name)
- return self.enable(self.master_name)
+ def enable_primary(self):
+ self.log.info("Enabling access to %s.", self.primary_name)
+ return self.enable(self.primary_name)
- def disable_slaves(self):
- names = self.slaves.keys()
+ def disable_standbys(self):
+ names = self.standbys.keys()
self.log.info(
"Disabling access to %s.", ', '.join(names))
- for name in self.slaves.keys():
+ for name in self.standbys.keys():
if not self.disable(name):
return False # Don't do further damage if we failed.
return True
- def enable_slaves(self):
- names = self.slaves.keys()
+ def enable_standbys(self):
+ names = self.standbys.keys()
self.log.info(
"Enabling access to %s.", ', '.join(names))
success = True
- for name in self.slaves.keys():
+ for name in self.standbys.keys():
if not self.enable(name):
success = False
return success
def sync(self):
- sync = streaming_sync(pg_connect(self.master), STREAMING_SYNC_TIMEOUT)
+ sync = streaming_sync(pg_connect(self.primary), STREAMING_SYNC_TIMEOUT)
if sync:
- self.log.debug('Slaves in sync.')
+ self.log.debug('Standbys in sync.')
else:
self.log.error(
- 'Slaves failed to sync after %d seconds.',
+ 'Standbys failed to sync after %d seconds.',
STREAMING_SYNC_TIMEOUT)
return sync
diff --git a/database/schema/full-update.py b/database/schema/full-update.py
index 974c4ec..6ed32a2 100755
--- a/database/schema/full-update.py
+++ b/database/schema/full-update.py
@@ -26,7 +26,7 @@ import security # security.py script
import upgrade # upgrade.py script
-def run_upgrade(options, log, master_con):
+def run_upgrade(options, log, primary_con):
"""Invoke upgrade.py in-process.
It would be easier to just invoke the script, but this way we save
@@ -43,7 +43,7 @@ def run_upgrade(options, log, master_con):
options.comments = False # Saves about 1s. Apply comments manually.
# Invoke the database schema upgrade process.
try:
- return upgrade.main(master_con)
+ return upgrade.main(primary_con)
except Exception:
log.exception('Unhandled exception')
return 1
@@ -51,7 +51,7 @@ def run_upgrade(options, log, master_con):
log.fatal("upgrade.py failed [%s]", x)
-def run_security(options, log, master_con):
+def run_security(options, log, primary_con):
"""Invoke security.py in-process.
It would be easier to just invoke the script, but this way we save
@@ -65,7 +65,7 @@ def run_security(options, log, master_con):
security.log = log
# Invoke the database security reset process.
try:
- return security.main(options, master_con)
+ return security.main(options, primary_con)
except Exception:
log.exception('Unhandled exception')
return 1
@@ -102,11 +102,11 @@ def main():
log, options.pgbouncer, options.dbname, options.dbuser)
try:
- # Master connection, not running in autocommit to allow us to
+ # Primary connection, not running in autocommit to allow us to
# rollback changes on failure.
- master_con = psycopg2.connect(str(controller.master))
+ primary_con = psycopg2.connect(str(controller.primary))
except Exception as x:
- log.fatal("Unable to open connection to master db (%s)", str(x))
+ log.fatal("Unable to open connection to primary db (%s)", str(x))
return 94
# Preflight checks. Confirm as best we can that the upgrade will
@@ -125,8 +125,8 @@ def main():
upgrade_run = False
security_run = False
replication_paused = False
- master_disabled = False
- slaves_disabled = False
+ primary_disabled = False
+ standbys_disabled = False
outage_start = None
try:
@@ -139,9 +139,9 @@ def main():
log.info("Outage starts.")
outage_start = datetime.now()
- # Disable access and kill connections to the master database.
- master_disabled = controller.disable_master()
- if not master_disabled:
+ # Disable access and kill connections to the primary database.
+ primary_disabled = controller.disable_primary()
+ if not primary_disabled:
return 95
if not KillConnectionsPreflight(
@@ -150,47 +150,47 @@ def main():
return 100
log.info("Preflight check succeeded. Starting upgrade.")
- # Does not commit master_con, even on success.
- upgrade_rc = run_upgrade(options, log, master_con)
+ # Does not commit primary_con, even on success.
+ upgrade_rc = run_upgrade(options, log, primary_con)
upgrade_run = (upgrade_rc == 0)
if not upgrade_run:
return upgrade_rc
log.info("Database patches applied.")
- # Commits master_con on success.
- security_rc = run_security(options, log, master_con)
+ # Commits primary_con on success.
+ security_rc = run_security(options, log, primary_con)
security_run = (security_rc == 0)
if not security_run:
return security_rc
- master_disabled = not controller.enable_master()
- if master_disabled:
+ primary_disabled = not controller.enable_primary()
+ if primary_disabled:
log.warning("Outage ongoing until pgbouncer bounced.")
return 96
else:
log.info("Outage complete. %s", datetime.now() - outage_start)
- slaves_disabled = controller.disable_slaves()
+ standbys_disabled = controller.disable_standbys()
# Resume replication.
replication_paused = not controller.resume_replication()
if replication_paused:
log.error(
"Failed to resume replication. Run pg_wal_replay_pause() "
- "on all slaves to manually resume.")
+ "on all standbys to manually resume.")
else:
if controller.sync():
- log.info('Slaves in sync. Updates replicated.')
+ log.info('Standbys in sync. Updates replicated.')
else:
log.error(
- 'Slaves failed to sync. Updates may not be replicated.')
+ 'Standbys failed to sync. Updates may not be replicated.')
- if slaves_disabled:
- slaves_disabled = not controller.enable_slaves()
- if slaves_disabled:
+ if standbys_disabled:
+ standbys_disabled = not controller.enable_standbys()
+ if standbys_disabled:
log.warning(
- "Failed to enable slave databases in pgbouncer. "
- "Now running in master-only mode.")
+ "Failed to enable standby databases in pgbouncer. "
+ "Now running in primary-only mode.")
# We will start seeing connections as soon as pgbouncer is
# reenabled, so ignore them here.
@@ -203,24 +203,24 @@ def main():
finally:
if not security_run:
log.warning("Rolling back all schema and security changes.")
- master_con.rollback()
+ primary_con.rollback()
# Recovery if necessary.
- if master_disabled:
- if controller.enable_master():
+ if primary_disabled:
+ if controller.enable_primary():
log.warning(
- "Master reenabled despite earlier failures. "
+ "Primary reenabled despite earlier failures. "
"Outage over %s, but we have problems",
str(datetime.now() - outage_start))
else:
log.warning(
- "Master is still disabled in pgbouncer. Outage ongoing.")
+ "Primary is still disabled in pgbouncer. Outage ongoing.")
if replication_paused:
controller.resume_replication()
- if slaves_disabled:
- controller.enable_slaves()
+ if standbys_disabled:
+ controller.enable_standbys()
if __name__ == '__main__':
diff --git a/database/schema/preflight.py b/database/schema/preflight.py
index 9a8d8e5..d8eed83 100755
--- a/database/schema/preflight.py
+++ b/database/schema/preflight.py
@@ -74,20 +74,20 @@ MAX_LAG = timedelta(seconds=60)
class DatabasePreflight:
def __init__(self, log, controller, replication_paused=False):
- master_con = psycopg2.connect(str(controller.master))
- master_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+ primary_con = psycopg2.connect(str(controller.primary))
+ primary_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.log = log
self.replication_paused = replication_paused
node = Node(None, None, None, True)
- node.con = master_con
+ node.con = primary_con
self.nodes = set([node])
self.lpmain_nodes = self.nodes
- self.lpmain_master_node = node
+ self.lpmain_primary_node = node
# Add streaming replication standbys.
- standbys = set(controller.slaves.values())
+ standbys = set(controller.standbys.values())
self._num_standbys = len(standbys)
for standby in standbys:
standby_node = Node(None, None, standby, False)
@@ -99,7 +99,7 @@ class DatabasePreflight:
def check_standby_count(self):
# We sanity check the options as best we can to protect against
# operator error.
- cur = self.lpmain_master_node.con.cursor()
+ cur = self.lpmain_primary_node.con.cursor()
cur.execute("SELECT COUNT(*) FROM pg_stat_replication")
required_standbys = cur.fetchone()[0]
@@ -237,7 +237,7 @@ class DatabasePreflight:
"""Return False if the replication cluster is badly lagged."""
# Do something harmless to force changes to be streamed in case
# system is idle.
- self.lpmain_master_node.con.cursor().execute(
+ self.lpmain_primary_node.con.cursor().execute(
'ANALYZE LaunchpadDatabaseRevision')
start_time = time.time()
# Keep looking for low lag for 30 seconds, in case the system
@@ -279,7 +279,7 @@ class DatabasePreflight:
cluster to be quiescent.
"""
# PG 9.1 streaming replication, or no replication.
- streaming_success = streaming_sync(self.lpmain_master_node.con, 30)
+ streaming_success = streaming_sync(self.lpmain_primary_node.con, 30)
if streaming_success:
self.log.info("Streaming replicas syncing.")
else:
@@ -289,7 +289,7 @@ class DatabasePreflight:
def report_patches(self):
"""Report what patches are due to be applied from this tree."""
- con = self.lpmain_master_node.con
+ con = self.lpmain_primary_node.con
upgrade.log = self.log
for patch_num, patch_file in upgrade.get_patchlist(con):
self.log.info("%s is pending", os.path.basename(patch_file))
@@ -333,7 +333,7 @@ class KillConnectionsPreflight(DatabasePreflight):
def check_open_connections(self):
"""Kill all non-system connections to Launchpad databases.
- If replication is paused, only connections on the master database
+ If replication is paused, only connections on the primary database
are killed.
System users are defined by SYSTEM_USERS.
@@ -343,7 +343,7 @@ class KillConnectionsPreflight(DatabasePreflight):
num_tries = 100
seconds_to_pause = 0.1
if self.replication_paused:
- nodes = set([self.lpmain_master_node])
+ nodes = set([self.lpmain_primary_node])
else:
nodes = self.lpmain_nodes
diff --git a/database/schema/security.py b/database/schema/security.py
index 573486a..67eb59f 100755
--- a/database/schema/security.py
+++ b/database/schema/security.py
@@ -249,17 +249,17 @@ CONFIG_DEFAULTS = {
}
-def main(options, master_con=None):
+def main(options, primary_con=None):
# Load the config file
config = ConfigParser(CONFIG_DEFAULTS)
configfile_name = os.path.join(os.path.dirname(__file__), 'security.cfg')
config.read([configfile_name])
- if master_con is None:
- master_con = connect()
+ if primary_con is None:
+ primary_con = connect()
log.info("Resetting permissions.")
- reset_permissions(master_con, config, options)
+ reset_permissions(primary_con, config, options)
return 0