← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:replication-terminology into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:replication-terminology into launchpad:master.

Commit message:
Use primary/standby terminology in DB scripts

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/411550
-- 
Your team "Launchpad code reviewers" is requested to review the proposed merge of ~cjwatson/launchpad:replication-terminology into launchpad:master.
diff --git a/database/replication/helpers.py b/database/replication/helpers.py
index 726f1e6..050a75e 100644
--- a/database/replication/helpers.py
+++ b/database/replication/helpers.py
@@ -143,7 +143,7 @@ class TableReplicationInfo:
     """Internal table replication details."""
     table_id = None
     replication_set_id = None
-    master_node_id = None
+    primary_node_id = None
 
     def __init__(self, con, namespace, table_name):
         cur = con.cursor()
@@ -159,7 +159,7 @@ class TableReplicationInfo:
         row = cur.fetchone()
         if row is None:
             raise LookupError(fqn(namespace, table_name))
-        self.table_id, self.replication_set_id, self.master_node_id = row
+        self.table_id, self.replication_set_id, self.primary_node_id = row
 
 
 def sync(timeout, exit_on_fail=True):
@@ -205,10 +205,10 @@ def execute_slonik(script, sync=None, exit_on_fail=True, auto_preamble=True):
 
     if sync is not None:
         sync_script = dedent("""\
-            sync (id = @master_node);
+            sync (id = @primary_node);
             wait for event (
-                origin = @master_node, confirmed = ALL,
-                wait on = @master_node, timeout = %d);
+                origin = @primary_node, confirmed = ALL,
+                wait on = @primary_node, timeout = %d);
             """ % sync)
         script = script + sync_script
 
@@ -234,11 +234,11 @@ def execute_slonik(script, sync=None, exit_on_fail=True, auto_preamble=True):
 
 class Node:
     """Simple data structure for holding information about a Slony node."""
-    def __init__(self, node_id, nickname, connection_string, is_master):
+    def __init__(self, node_id, nickname, connection_string, is_primary):
         self.node_id = node_id
         self.nickname = nickname
         self.connection_string = connection_string
-        self.is_master = is_master
+        self.is_primary = is_primary
 
     def connect(self, isolation=ISOLATION_LEVEL_DEFAULT):
         con = psycopg2.connect(str(self.connection_string))
@@ -253,17 +253,17 @@ def _get_nodes(con, query):
     cur = con.cursor()
     cur.execute(query)
     nodes = []
-    for node_id, nickname, connection_string, is_master in cur.fetchall():
-        nodes.append(Node(node_id, nickname, connection_string, is_master))
+    for node_id, nickname, connection_string, is_primary in cur.fetchall():
+        nodes.append(Node(node_id, nickname, connection_string, is_primary))
     return nodes
 
 
-def get_master_node(con, set_id=1):
-    """Return the master Node, or None if the cluster is still being setup."""
+def get_primary_node(con, set_id=1):
+    """Return the primary Node, or None if the cluster is still being setup."""
     nodes = _get_nodes(con, """
         SELECT DISTINCT
             set_origin AS node_id,
-            'master',
+            'primary',
             pa_conninfo AS connection_string,
             True
         FROM _sl.sl_set
@@ -272,16 +272,16 @@ def get_master_node(con, set_id=1):
         """ % set_id)
     if not nodes:
         return None
-    assert len(nodes) == 1, "More than one master found for set %s" % set_id
+    assert len(nodes) == 1, "More than one primary found for set %s" % set_id
     return nodes[0]
 
 
-def get_slave_nodes(con, set_id=1):
-    """Return the list of slave Nodes."""
+def get_standby_nodes(con, set_id=1):
+    """Return the list of standby Nodes."""
     return _get_nodes(con, """
         SELECT DISTINCT
             pa_server AS node_id,
-            'slave' || pa_server,
+            'standby' || pa_server,
             pa_conninfo AS connection_string,
             False
         FROM _sl.sl_set
@@ -295,17 +295,17 @@ def get_slave_nodes(con, set_id=1):
 
 def get_nodes(con, set_id=1):
     """Return a list of all Nodes."""
-    master_node = get_master_node(con, set_id)
-    if master_node is None:
+    primary_node = get_primary_node(con, set_id)
+    if primary_node is None:
         return []
     else:
-        return [master_node] + get_slave_nodes(con, set_id)
+        return [primary_node] + get_standby_nodes(con, set_id)
 
 
 def get_all_cluster_nodes(con):
     """Return a list of all Nodes in the cluster.
 
-    node.is_master will be None, as this boolean doesn't make sense
+    node.is_primary will be None, as this boolean doesn't make sense
     in the context of a cluster rather than a single replication set.
     """
     if not slony_installed(con):
@@ -321,20 +321,20 @@ def get_all_cluster_nodes(con):
         """)
     if not nodes:
         # There are no subscriptions yet, so no paths. Generate the
-        # master Node.
+        # primary Node.
         cur = con.cursor()
         cur.execute("SELECT no_id from _sl.sl_node")
         node_ids = [row[0] for row in cur.fetchall()]
         if len(node_ids) == 0:
             return []
         assert len(node_ids) == 1, "Multiple nodes but no paths."
-        master_node_id = node_ids[0]
-        master_connection_string = ConnectionString(
+        primary_node_id = node_ids[0]
+        primary_connection_string = ConnectionString(
             config.database.rw_main_primary)
-        master_connection_string.user = 'slony'
+        primary_connection_string.user = 'slony'
         return [Node(
-            master_node_id, 'node%d_node' % master_node_id,
-            master_connection_string, True)]
+            primary_node_id, 'node%d_node' % primary_node_id,
+            primary_connection_string, True)]
     return nodes
 
 
@@ -344,10 +344,10 @@ def preamble(con=None):
     if con is None:
         con = connect(user='slony')
 
-    master_node = get_master_node(con)
+    primary_node = get_primary_node(con)
     nodes = get_all_cluster_nodes(con)
-    if master_node is None and len(nodes) == 1:
-        master_node = nodes[0]
+    if primary_node is None and len(nodes) == 1:
+        primary_node = nodes[0]
 
     preamble = [dedent("""\
         #
@@ -363,12 +363,12 @@ def preamble(con=None):
         define lpmirror_set %d;
         """ % (LPMAIN_SET_ID, HOLDING_SET_ID, SSO_SET_ID, LPMIRROR_SET_ID))]
 
-    if master_node is not None:
+    if primary_node is not None:
         preamble.append(dedent("""\
-        # Symbolic id for the main replication set master node.
-        define master_node %d;
-        define master_node_conninfo '%s';
-        """ % (master_node.node_id, master_node.connection_string)))
+        # Symbolic id for the main replication set primary node.
+        define primary_node %d;
+        define primary_node_conninfo '%s';
+        """ % (primary_node.node_id, primary_node.connection_string)))
 
     for node in nodes:
         preamble.append(dedent("""\
diff --git a/database/schema/dbcontroller.py b/database/schema/dbcontroller.py
index 79edf50..8892413 100644
--- a/database/schema/dbcontroller.py
+++ b/database/schema/dbcontroller.py
@@ -29,7 +29,7 @@ def pg_connect(conn_str):
 
 
 def streaming_sync(con, timeout=None):
-    """Wait for streaming replicas to synchronize with master as of now.
+    """Wait for streaming replicas to synchronize with primary as of now.
 
     :param timeout: seconds to wait, None for no timeout.
 
@@ -48,7 +48,7 @@ def streaming_sync(con, timeout=None):
             WHERE replay_lsn < %s LIMIT 1
             """, (wal_point,))
         if cur.fetchone() is None:
-            # All slaves, possibly 0, are in sync.
+            # All standbys, possibly 0, are in sync.
             return True
         time.sleep(0.2)
     return False
@@ -66,9 +66,9 @@ class DBController:
                 "pgbouncer administrative database not named 'pgbouncer'")
         self.pgbouncer_con = pg_connect(pgbouncer_conn_str)
 
-        self.master_name = None
-        self.master = None
-        self.slaves = {}
+        self.primary_name = None
+        self.primary = None
+        self.standbys = {}
 
         for db in self.pgbouncer_cmd('show databases', results=True):
             if db.database != dbname:
@@ -82,13 +82,13 @@ class DBController:
             cur = con.cursor()
             cur.execute('select pg_is_in_recovery()')
             if cur.fetchone()[0] is True:
-                self.slaves[db.name] = conn_str
+                self.standbys[db.name] = conn_str
             else:
-                self.master_name = db.name
-                self.master = conn_str
+                self.primary_name = db.name
+                self.primary = conn_str
 
-        if self.master_name is None:
-            log.fatal('No master detected.')
+        if self.primary_name is None:
+            log.fatal('No primary detected.')
             raise SystemExit(98)
 
     def pgbouncer_cmd(self, cmd, results):
@@ -98,9 +98,9 @@ class DBController:
             return cur.fetchall()
 
     def pause_replication(self):
-        names = self.slaves.keys()
+        names = self.standbys.keys()
         self.log.info("Pausing replication to %s.", ', '.join(names))
-        for name, conn_str in self.slaves.items():
+        for name, conn_str in self.standbys.items():
             try:
                 con = pg_connect(conn_str)
                 cur = con.cursor()
@@ -113,10 +113,10 @@ class DBController:
         return True
 
     def resume_replication(self):
-        names = self.slaves.keys()
+        names = self.standbys.keys()
         self.log.info("Resuming replication to %s.", ', '.join(names))
         success = True
-        for name, conn_str in self.slaves.items():
+        for name, conn_str in self.standbys.items():
             try:
                 con = pg_connect(conn_str)
                 cur = con.cursor()
@@ -137,7 +137,7 @@ class DBController:
         """
         success = True
         wait_for_sync = False
-        for name, conn_str in self.slaves.items():
+        for name, conn_str in self.standbys.items():
             try:
                 con = pg_connect(conn_str)
                 cur = con.cursor()
@@ -174,39 +174,39 @@ class DBController:
             self.log.error("Unable to enable %s (%s)", name, str(x))
             return False
 
-    def disable_master(self):
-        self.log.info("Disabling access to %s.", self.master_name)
-        return self.disable(self.master_name)
+    def disable_primary(self):
+        self.log.info("Disabling access to %s.", self.primary_name)
+        return self.disable(self.primary_name)
 
-    def enable_master(self):
-        self.log.info("Enabling access to %s.", self.master_name)
-        return self.enable(self.master_name)
+    def enable_primary(self):
+        self.log.info("Enabling access to %s.", self.primary_name)
+        return self.enable(self.primary_name)
 
-    def disable_slaves(self):
-        names = self.slaves.keys()
+    def disable_standbys(self):
+        names = self.standbys.keys()
         self.log.info(
             "Disabling access to %s.", ', '.join(names))
-        for name in self.slaves.keys():
+        for name in self.standbys.keys():
             if not self.disable(name):
                 return False  # Don't do further damage if we failed.
         return True
 
-    def enable_slaves(self):
-        names = self.slaves.keys()
+    def enable_standbys(self):
+        names = self.standbys.keys()
         self.log.info(
             "Enabling access to %s.", ', '.join(names))
         success = True
-        for name in self.slaves.keys():
+        for name in self.standbys.keys():
             if not self.enable(name):
                 success = False
         return success
 
     def sync(self):
-        sync = streaming_sync(pg_connect(self.master), STREAMING_SYNC_TIMEOUT)
+        sync = streaming_sync(pg_connect(self.primary), STREAMING_SYNC_TIMEOUT)
         if sync:
-            self.log.debug('Slaves in sync.')
+            self.log.debug('Standbys in sync.')
         else:
             self.log.error(
-                'Slaves failed to sync after %d seconds.',
+                'Standbys failed to sync after %d seconds.',
                 STREAMING_SYNC_TIMEOUT)
         return sync
diff --git a/database/schema/full-update.py b/database/schema/full-update.py
index 974c4ec..6ed32a2 100755
--- a/database/schema/full-update.py
+++ b/database/schema/full-update.py
@@ -26,7 +26,7 @@ import security  # security.py script
 import upgrade  # upgrade.py script
 
 
-def run_upgrade(options, log, master_con):
+def run_upgrade(options, log, primary_con):
     """Invoke upgrade.py in-process.
 
     It would be easier to just invoke the script, but this way we save
@@ -43,7 +43,7 @@ def run_upgrade(options, log, master_con):
     options.comments = False  # Saves about 1s. Apply comments manually.
     # Invoke the database schema upgrade process.
     try:
-        return upgrade.main(master_con)
+        return upgrade.main(primary_con)
     except Exception:
         log.exception('Unhandled exception')
         return 1
@@ -51,7 +51,7 @@ def run_upgrade(options, log, master_con):
         log.fatal("upgrade.py failed [%s]", x)
 
 
-def run_security(options, log, master_con):
+def run_security(options, log, primary_con):
     """Invoke security.py in-process.
 
     It would be easier to just invoke the script, but this way we save
@@ -65,7 +65,7 @@ def run_security(options, log, master_con):
     security.log = log
     # Invoke the database security reset process.
     try:
-        return security.main(options, master_con)
+        return security.main(options, primary_con)
     except Exception:
         log.exception('Unhandled exception')
         return 1
@@ -102,11 +102,11 @@ def main():
         log, options.pgbouncer, options.dbname, options.dbuser)
 
     try:
-        # Master connection, not running in autocommit to allow us to
+        # Primary connection, not running in autocommit to allow us to
         # rollback changes on failure.
-        master_con = psycopg2.connect(str(controller.master))
+        primary_con = psycopg2.connect(str(controller.primary))
     except Exception as x:
-        log.fatal("Unable to open connection to master db (%s)", str(x))
+        log.fatal("Unable to open connection to primary db (%s)", str(x))
         return 94
 
     # Preflight checks. Confirm as best we can that the upgrade will
@@ -125,8 +125,8 @@ def main():
     upgrade_run = False
     security_run = False
     replication_paused = False
-    master_disabled = False
-    slaves_disabled = False
+    primary_disabled = False
+    standbys_disabled = False
     outage_start = None
 
     try:
@@ -139,9 +139,9 @@ def main():
         log.info("Outage starts.")
         outage_start = datetime.now()
 
-        # Disable access and kill connections to the master database.
-        master_disabled = controller.disable_master()
-        if not master_disabled:
+        # Disable access and kill connections to the primary database.
+        primary_disabled = controller.disable_primary()
+        if not primary_disabled:
             return 95
 
         if not KillConnectionsPreflight(
@@ -150,47 +150,47 @@ def main():
             return 100
 
         log.info("Preflight check succeeded. Starting upgrade.")
-        # Does not commit master_con, even on success.
-        upgrade_rc = run_upgrade(options, log, master_con)
+        # Does not commit primary_con, even on success.
+        upgrade_rc = run_upgrade(options, log, primary_con)
         upgrade_run = (upgrade_rc == 0)
         if not upgrade_run:
             return upgrade_rc
         log.info("Database patches applied.")
 
-        # Commits master_con on success.
-        security_rc = run_security(options, log, master_con)
+        # Commits primary_con on success.
+        security_rc = run_security(options, log, primary_con)
         security_run = (security_rc == 0)
         if not security_run:
             return security_rc
 
-        master_disabled = not controller.enable_master()
-        if master_disabled:
+        primary_disabled = not controller.enable_primary()
+        if primary_disabled:
             log.warning("Outage ongoing until pgbouncer bounced.")
             return 96
         else:
             log.info("Outage complete. %s", datetime.now() - outage_start)
 
-        slaves_disabled = controller.disable_slaves()
+        standbys_disabled = controller.disable_standbys()
 
         # Resume replication.
         replication_paused = not controller.resume_replication()
         if replication_paused:
             log.error(
                 "Failed to resume replication. Run pg_wal_replay_pause() "
-                "on all slaves to manually resume.")
+                "on all standbys to manually resume.")
         else:
             if controller.sync():
-                log.info('Slaves in sync. Updates replicated.')
+                log.info('Standbys in sync. Updates replicated.')
             else:
                 log.error(
-                    'Slaves failed to sync. Updates may not be replicated.')
+                    'Standbys failed to sync. Updates may not be replicated.')
 
-        if slaves_disabled:
-            slaves_disabled = not controller.enable_slaves()
-            if slaves_disabled:
+        if standbys_disabled:
+            standbys_disabled = not controller.enable_standbys()
+            if standbys_disabled:
                 log.warning(
-                    "Failed to enable slave databases in pgbouncer. "
-                    "Now running in master-only mode.")
+                    "Failed to enable standby databases in pgbouncer. "
+                    "Now running in primary-only mode.")
 
         # We will start seeing connections as soon as pgbouncer is
         # reenabled, so ignore them here.
@@ -203,24 +203,24 @@ def main():
     finally:
         if not security_run:
             log.warning("Rolling back all schema and security changes.")
-            master_con.rollback()
+            primary_con.rollback()
 
         # Recovery if necessary.
-        if master_disabled:
-            if controller.enable_master():
+        if primary_disabled:
+            if controller.enable_primary():
                 log.warning(
-                    "Master reenabled despite earlier failures. "
+                    "Primary reenabled despite earlier failures. "
                     "Outage over %s, but we have problems",
                     str(datetime.now() - outage_start))
             else:
                 log.warning(
-                    "Master is still disabled in pgbouncer. Outage ongoing.")
+                    "Primary is still disabled in pgbouncer. Outage ongoing.")
 
         if replication_paused:
             controller.resume_replication()
 
-        if slaves_disabled:
-            controller.enable_slaves()
+        if standbys_disabled:
+            controller.enable_standbys()
 
 
 if __name__ == '__main__':
diff --git a/database/schema/preflight.py b/database/schema/preflight.py
index 9a8d8e5..d8eed83 100755
--- a/database/schema/preflight.py
+++ b/database/schema/preflight.py
@@ -74,20 +74,20 @@ MAX_LAG = timedelta(seconds=60)
 
 class DatabasePreflight:
     def __init__(self, log, controller, replication_paused=False):
-        master_con = psycopg2.connect(str(controller.master))
-        master_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+        primary_con = psycopg2.connect(str(controller.primary))
+        primary_con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
         self.log = log
         self.replication_paused = replication_paused
 
         node = Node(None, None, None, True)
-        node.con = master_con
+        node.con = primary_con
         self.nodes = set([node])
         self.lpmain_nodes = self.nodes
-        self.lpmain_master_node = node
+        self.lpmain_primary_node = node
 
         # Add streaming replication standbys.
-        standbys = set(controller.slaves.values())
+        standbys = set(controller.standbys.values())
         self._num_standbys = len(standbys)
         for standby in standbys:
             standby_node = Node(None, None, standby, False)
@@ -99,7 +99,7 @@ class DatabasePreflight:
     def check_standby_count(self):
         # We sanity check the options as best we can to protect against
         # operator error.
-        cur = self.lpmain_master_node.con.cursor()
+        cur = self.lpmain_primary_node.con.cursor()
         cur.execute("SELECT COUNT(*) FROM pg_stat_replication")
         required_standbys = cur.fetchone()[0]
 
@@ -237,7 +237,7 @@ class DatabasePreflight:
         """Return False if the replication cluster is badly lagged."""
         # Do something harmless to force changes to be streamed in case
         # system is idle.
-        self.lpmain_master_node.con.cursor().execute(
+        self.lpmain_primary_node.con.cursor().execute(
             'ANALYZE LaunchpadDatabaseRevision')
         start_time = time.time()
         # Keep looking for low lag for 30 seconds, in case the system
@@ -279,7 +279,7 @@ class DatabasePreflight:
         cluster to be quiescent.
         """
         # PG 9.1 streaming replication, or no replication.
-        streaming_success = streaming_sync(self.lpmain_master_node.con, 30)
+        streaming_success = streaming_sync(self.lpmain_primary_node.con, 30)
         if streaming_success:
             self.log.info("Streaming replicas syncing.")
         else:
@@ -289,7 +289,7 @@ class DatabasePreflight:
 
     def report_patches(self):
         """Report what patches are due to be applied from this tree."""
-        con = self.lpmain_master_node.con
+        con = self.lpmain_primary_node.con
         upgrade.log = self.log
         for patch_num, patch_file in upgrade.get_patchlist(con):
             self.log.info("%s is pending", os.path.basename(patch_file))
@@ -333,7 +333,7 @@ class KillConnectionsPreflight(DatabasePreflight):
     def check_open_connections(self):
         """Kill all non-system connections to Launchpad databases.
 
-        If replication is paused, only connections on the master database
+        If replication is paused, only connections on the primary database
         are killed.
 
         System users are defined by SYSTEM_USERS.
@@ -343,7 +343,7 @@ class KillConnectionsPreflight(DatabasePreflight):
         num_tries = 100
         seconds_to_pause = 0.1
         if self.replication_paused:
-            nodes = set([self.lpmain_master_node])
+            nodes = set([self.lpmain_primary_node])
         else:
             nodes = self.lpmain_nodes
 
diff --git a/database/schema/security.py b/database/schema/security.py
index 573486a..67eb59f 100755
--- a/database/schema/security.py
+++ b/database/schema/security.py
@@ -249,17 +249,17 @@ CONFIG_DEFAULTS = {
     }
 
 
-def main(options, master_con=None):
+def main(options, primary_con=None):
     # Load the config file
     config = ConfigParser(CONFIG_DEFAULTS)
     configfile_name = os.path.join(os.path.dirname(__file__), 'security.cfg')
     config.read([configfile_name])
 
-    if master_con is None:
-        master_con = connect()
+    if primary_con is None:
+        primary_con = connect()
 
     log.info("Resetting permissions.")
-    reset_permissions(master_con, config, options)
+    reset_permissions(primary_con, config, options)
     return 0