launchpad-reviewers team mailing list archive
Message #10668
[Merge] lp:~stub/launchpad/db-deploy into lp:launchpad
Stuart Bishop has proposed merging lp:~stub/launchpad/db-deploy into lp:launchpad.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #906222 in Launchpad itself: "More BAD_USERS creating false positives during fast downtime updates"
https://bugs.launchpad.net/launchpad/+bug/906222
Bug #911417 in Launchpad itself: "Staging db restore fails on make -C database/replication stagingsetup"
https://bugs.launchpad.net/launchpad/+bug/911417
Bug #1025396 in Launchpad itself: "deployment scripts need to cope with streaming replication"
https://bugs.launchpad.net/launchpad/+bug/1025396
Bug #1025399 in Launchpad itself: "full-update.py should wait for streaming slaves before completing"
https://bugs.launchpad.net/launchpad/+bug/1025399
For more details, see:
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/118535
= Summary =
We no longer need to involve Slony in db deploys.
== Proposed fix ==
Amputation.
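
With Slony-I gone, the deployment and preflight scripts rely solely on
PostgreSQL 9.1 streaming replication. As a rough sketch of the kind of
lag check that leaves us with (illustrative only -- the standby DSN and
threshold below are hypothetical, and preflight.py receives its standby
list via the command line):

    # Sketch: measure replay lag on a streaming standby (PostgreSQL 9.1).
    from datetime import timedelta

    import psycopg2

    MAX_LAG = timedelta(seconds=60)  # hypothetical threshold

    def standby_lag(dsn):
        """Return approximate replay lag on one streaming standby."""
        con = psycopg2.connect(dsn)
        try:
            cur = con.cursor()
            # now() - last replayed commit approximates lag. It keeps
            # growing on an idle master, so a real check first forces
            # some WAL to be generated and streamed.
            cur.execute("SELECT now() - pg_last_xact_replay_timestamp()")
            return cur.fetchone()[0]
        finally:
            con.close()

    lag = standby_lag('host=standby1 dbname=launchpad_prod')  # hypothetical
    if lag > MAX_LAG:
        raise SystemExit('Streaming standby badly lagged: %s' % lag)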
== Pre-implementation notes ==
== LOC Rationale ==
== Implementation details ==
== Tests ==
== Demo and Q/A ==
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
database/schema/upgrade.py
database/replication/Makefile
database/schema/preflight.py
database/schema/full-update.py
./database/schema/upgrade.py
12: '_pythonpath' imported but unused
./database/replication/Makefile
78: Line exceeds 80 characters.
./database/schema/preflight.py
14: '_pythonpath' imported but unused
./database/schema/full-update.py
7: '_pythonpath' imported but unused
--
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/118535
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/db-deploy into lp:launchpad.
=== modified file 'database/replication/Makefile'
--- database/replication/Makefile 2012-07-09 17:01:51 +0000
+++ database/replication/Makefile 2012-08-07 11:14:21 +0000
@@ -70,7 +70,7 @@
# Apply database patches.
@echo Running upgrade.py `date`.
LPCONFIG=${STAGING_CONFIG} ${SHHH} ../schema/upgrade.py \
- --ignore-slony --log-file=INFO:${STAGING_LOGDIR}/dbupgrade.log
+ --log-file=INFO:${STAGING_LOGDIR}/dbupgrade.log
@echo Running security.py `date`
LPCONFIG=${STAGING_CONFIG} ${SHHH} ../schema/security.py \
--log-file=INFO:${STAGING_LOGDIR}/dbupgrade.log
@@ -90,5 +90,5 @@
pg_restore --dbname=${DOGFOOD_DBNAME} --no-acl --no-owner \
--use-list=${DUMPLIST} ${EXIT_ON_ERROR} ${DOGFOOD_DUMP}
rm ${DUMPLIST}
- ../schema/upgrade.py --ignore-slony -d ${DOGFOOD_DBNAME}
+ ../schema/upgrade.py -d ${DOGFOOD_DBNAME}
../schema/security.py -d ${DOGFOOD_DBNAME}
=== removed file 'database/replication/generate_migration.py'
--- database/replication/generate_migration.py 2011-12-30 06:47:54 +0000
+++ database/replication/generate_migration.py 1970-01-01 00:00:00 +0000
@@ -1,232 +0,0 @@
-#!/usr/bin/python -S
-# Copyright 2011 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Generate slonik scripts for Slony 1.2 to 2.0 migration.
-
-Remove this script after migration is complete.
-"""
-
-__metaclass__ = type
-__all__ = []
-
-import _pythonpath
-
-from optparse import OptionParser
-import os.path
-from textwrap import dedent
-
-from lp.services import scripts
-from lp.services.database.sqlbase import connect
-import replication.helpers
-from replication.helpers import (
- LPMAIN_SET_ID,
- LPMIRROR_SET_ID,
- SSO_SET_ID,
- get_all_cluster_nodes,
- get_master_node,
- )
-
-
-con = None
-options = None
-
-sets = {
- LPMAIN_SET_ID: 'lpmain_set',
- SSO_SET_ID: 'sso_set',
- LPMIRROR_SET_ID: 'lpmirror_set',
- }
-
-
-def outpath(filename):
- return os.path.join(options.outdir, filename)
-
-
-def message(outf, msg):
- assert "'" not in msg
- print >> outf, "echo '%s';" % msg
-
-
-def generate_preamble():
- outf = open(outpath('mig_preamble.sk'), 'w')
- print >> outf, replication.helpers.preamble(con)
-
- cur = con.cursor()
-
- for set_id, set_name in list(sets.items()):
- cur.execute(
- "SELECT set_origin FROM _sl.sl_set WHERE set_id=%s", [set_id])
- result = cur.fetchone()
- if result:
- origin = result[0]
- print >> outf, "define %s_origin %d;" % (set_name, origin)
- else:
- del sets[set_id] # For testing. Production will have 3 sets.
- outf.close()
-
-
-def generate_uninstall():
- outf = open(outpath('mig_uninstall.sk'), 'w')
- print >> outf, "# Uninstall Slony-I 1.2 from all nodes"
- print >> outf, "include <mig_preamble.sk>;"
-
- nodes = get_all_cluster_nodes(con)
-
- # Ensure everything is really, really synced since we will be
- # resubscribing with 'omit copy'
- for node in nodes:
- print >> outf, dedent("""\
- sync (id=%d);
- wait for event (origin=%d, confirmed=all, wait on=%d);
- """).strip() % (node.node_id, node.node_id, node.node_id)
-
- for node in nodes:
- message(outf, "Uninstall node %d" % node.node_id)
- print >> outf, "uninstall node (id=%d);" % node.node_id
- outf.close()
-
-
-def generate_sync():
- outf = open(outpath('mig_sync.sk'), 'w')
- message(outf, "Waiting for sync")
- print >> outf, "sync (id=@master_node);"
- print >> outf, dedent("""\
- wait for event (
- origin=@master_node, confirmed=all, wait on=@master_node);
- """).strip()
- outf.close()
-
-
-def generate_rebuild():
- outf = open(outpath('mig_rebuild.sk'), 'w')
- print >> outf, "# Rebuild the replication cluster with Slony-I 2.0"
- print >> outf, "include <mig_preamble.sk>;"
-
- nodes = get_all_cluster_nodes(con)
- first_node = nodes[0]
- remaining_nodes = nodes[1:]
-
- # Initialize the cluster
- message(outf, "Initializing cluster (node %d)" % first_node.node_id)
- print >> outf, "init cluster (id=%d);" % first_node.node_id
-
- # Create all the other nodes
- for node in remaining_nodes:
- message(outf, "Initializing node %d" % node.node_id)
- print >> outf, "store node (id=%d, event node=%d);" % (
- node.node_id, first_node.node_id)
-
- # Create paths so they can communicate.
- message(outf, "Storing %d paths" % pow(len(nodes), 2))
- for client_node in nodes:
- for server_node in nodes:
- print >> outf, (
- "store path (server=%d, client=%d, "
- "conninfo=@node%d_node_conninfo);" % (
- server_node.node_id, client_node.node_id,
- server_node.node_id))
-
- # sync to ensure replication is happening.
- print >> outf, "include <mig_sync.sk>;"
-
- # Create replication sets.
- for set_id, set_name in sets.items():
- generate_initialize_set(set_id, set_name, outf)
- print >> outf, "include <mig_sync.sk>;"
-
- # Subscribe slave nodes to replication sets.
- for set_id, set_name in sets.items():
- generate_subscribe_set(set_id, set_name, outf)
-
- outf.close()
-
-
-def generate_initialize_set(set_id, set_name, outf):
- origin_node = get_master_node(con, set_id)
- message(outf, "Creating %s origin %d" % (set_name, origin_node.node_id))
- print >> outf, "create set (id=%d, origin=@%s_origin, comment='%s');" % (
- set_id, set_name, set_name)
- # Need to connect to a node currently replicating the desired set.
- origin_con = origin_node.connect()
- cur = origin_con.cursor()
- cur.execute("""
- SELECT tab_id, tab_nspname, tab_relname, tab_comment
- FROM _sl.sl_table WHERE tab_set=%s
- """, (set_id,))
- results = cur.fetchall()
- message(outf, "Adding %d tables to %s" % (len(results), set_name))
- for tab_id, tab_nspname, tab_relname, tab_comment in results:
- if not tab_comment:
- tab_comment = ''
- print >> outf, dedent("""\
- set add table (
- set id=@%s, origin=@%s_origin, id=%d,
- fully qualified name='%s.%s',
- comment='%s');
- """).strip() % (
- set_name, set_name, tab_id,
- tab_nspname, tab_relname, tab_comment)
- cur.execute("""
- SELECT seq_id, seq_nspname, seq_relname, seq_comment
- FROM _sl.sl_sequence WHERE seq_set=%s
- """, (set_id,))
- results = cur.fetchall()
- message(outf, "Adding %d sequences to %s" % (len(results), set_name))
- for seq_id, seq_nspname, seq_relname, seq_comment in results:
- if not seq_comment:
- seq_comment = ''
- print >> outf, dedent("""\
- set add sequence (
- set id=@%s, origin=@%s_origin, id=%d,
- fully qualified name='%s.%s',
- comment='%s');
- """).strip() % (
- set_name, set_name, seq_id,
- seq_nspname, seq_relname, seq_comment)
-
-
-def generate_subscribe_set(set_id, set_name, outf):
- cur = con.cursor()
- cur.execute("""
- SELECT sub_receiver FROM _sl.sl_subscribe
- WHERE sub_set=%s and sub_active is true
- """, (set_id,))
- for receiver_id, in cur.fetchall():
- message(outf, "Subscribing node %d to %s" % (receiver_id, set_name))
- print >> outf, dedent("""\
- subscribe set (
- id=%d, provider=@%s_origin, receiver=%d,
- forward=true, omit copy=true);
- wait for event (
- origin=@%s_origin, confirmed=all, wait on=@%s_origin);
- """).strip() % (
- set_id, set_name, receiver_id,
- set_name, set_name)
- print >> outf, "include <mig_sync.sk>;"
-
-
-def main():
- parser = OptionParser()
- scripts.db_options(parser)
- parser.add_option(
- "-o", "--output-dir", dest='outdir', default=".",
- help="Write slonik scripts to DIR", metavar="DIR")
- global options
- options, args = parser.parse_args()
- if args:
- parser.error("Too many arguments")
- scripts.execute_zcml_for_scripts(use_web_security=False)
-
- global con
- con = connect()
-
- generate_preamble()
- generate_sync()
- generate_uninstall()
- generate_rebuild()
-
- return 0
-
-
-if __name__ == '__main__':
- raise SystemExit(main())
=== removed file 'database/replication/initialize.py'
--- database/replication/initialize.py 2011-12-30 06:47:54 +0000
+++ database/replication/initialize.py 1970-01-01 00:00:00 +0000
@@ -1,216 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Initialize the cluster.
-
-This script is run once to convert a singledb Launchpad instance to
-a replicated setup.
-"""
-
-import _pythonpath
-
-from optparse import OptionParser
-import subprocess
-import sys
-
-import helpers
-
-from lp.services.config import config
-from lp.services.database.postgresql import (
- all_sequences_in_schema,
- all_tables_in_schema,
- ConnectionString,
- )
-from lp.services.database.sqlbase import (
- connect,
- ISOLATION_LEVEL_AUTOCOMMIT,
- )
-from lp.services.scripts import (
- db_options,
- logger,
- logger_options,
- )
-
-
-__metaclass__ = type
-__all__ = []
-
-
-# Global logger, initialized in main().
-log = None
-
-# Parsed command line options, initialized in main().
-options = None
-
-# Shared database cursor to the master, initialized in main().
-cur = None
-
-
-def duplicate_schema():
- """Duplicate the master schema into the slaves."""
- log.info('Duplicating database schema')
-
- master_cs = ConnectionString(config.database.rw_main_master)
- slave1_cs = ConnectionString(config.database.rw_main_slave)
-
- # We can't use pg_dump to replicate security as not all of the roles
- # may exist in the slave databases' clusters yet.
- cmd = "pg_dump -x -s %s | psql -q %s" % (
- master_cs.asPGCommandLineArgs(), slave1_cs.asPGCommandLineArgs())
- log.debug('Running %s' % cmd)
- rv = subprocess.call(cmd, shell=True)
- if rv != 0:
- log.fatal("Schema duplication failed, pg_dump returned %d" % rv)
- sys.exit(rv)
-
- # Now setup security on the slaves and create any needed roles,
- log.info('Setting up security on slave')
- cmd = "../schema/security.py %s" % slave1_cs.asLPCommandLineArgs()
- log.debug("Running %s" % cmd)
- rv = subprocess.call(cmd.split())
- if rv != 0:
- print >> sys.stderr, "ERR: security setup failed, returning %d" % rv
- sys.exit(rv)
-
-
-def initialize_cluster():
- """Initialize the cluster."""
- log.info('Initializing Slony-I cluster')
- master_connection_string = ConnectionString(
- config.database.rw_main_master)
- master_connection_string.user = 'slony'
- helpers.execute_slonik("""
- node 1 admin conninfo = '%s';
- try {
- echo 'Initializing cluster and Master node.';
- init cluster (id=1, comment='Master Node');
- }
- on success { echo 'Cluster initialized.'; }
- on error { echo 'Cluster initialization failed.'; exit 1; }
- """ % master_connection_string)
-
-
-def ensure_live():
- log.info('Ensuring slon daemons are live and propagating events.')
- # This will exit on failure.
- helpers.sync(120)
-
-
-def create_replication_sets(lpmain_tables, lpmain_sequences):
- """Create the replication sets."""
- log.info('Creating Slony-I replication sets.')
-
- script = ["try {"]
-
- entry_id = 1
-
- script.append("""
- echo 'Creating LPMain replication set (@lpmain_set)';
- create set (
- id=@lpmain_set, origin=@master_node,
- comment='Launchpad tables and sequences');
- """)
-
- script.append(
- "echo 'Adding %d tables to replication set @lpmain_set';"
- % len(lpmain_tables))
- for table in sorted(lpmain_tables):
- script.append("""
- set add table (
- set id=@lpmain_set,
- origin=@master_node,
- id=%(entry_id)d,
- fully qualified name='%(table)s');
- """ % vars())
- entry_id += 1
-
- entry_id = 1
- script.append(
- "echo 'Adding %d sequences to replication set @lpmain_set';"
- % len(lpmain_sequences))
- for sequence in sorted(lpmain_sequences):
- script.append("""
- set add sequence (
- set id=@lpmain_set,
- origin=@master_node,
- id=%(entry_id)d,
- fully qualified name='%(sequence)s');
- """ % vars())
- entry_id += 1
-
- script.append("""
- }
- on error { echo 'Failed.'; exit 1; }
- """)
- helpers.execute_slonik('\n'.join(script), sync=600)
-
- # Explode now if we have messed up.
- helpers.validate_replication(cur)
-
-
-def main():
- parser = OptionParser()
- db_options(parser)
- logger_options(parser)
-
- parser.set_defaults(dbuser='slony')
-
- global options
- options, args = parser.parse_args()
-
- global log
- log = logger(options)
-
- # Generate lists of sequences and tables for our replication sets.
- log.debug("Connecting as %s" % options.dbuser)
- con = connect()
- con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
- global cur
- cur = con.cursor()
- log.debug("Calculating lpmain replication set.")
- lpmain_tables, lpmain_sequences = helpers.calculate_replication_set(
- cur, helpers.LPMAIN_SEED)
-
- # Sanity check these lists - we want all objects in the public
- # schema to be in one and only one replication set.
- log.debug("Performing sanity checks.")
- fails = 0
- for table in all_tables_in_schema(cur, 'public'):
- times_seen = 0
- for table_set in [lpmain_tables, helpers.IGNORED_TABLES]:
- if table in table_set:
- times_seen += 1
- if times_seen == 0:
- log.error("%s not in any replication set." % table)
- fails += 1
- if times_seen > 1:
- log.error("%s is in multiple replication sets." % table)
- fails += 1
- for sequence in all_sequences_in_schema(cur, 'public'):
- times_seen = 0
- for sequence_set in [lpmain_sequences, helpers.IGNORED_SEQUENCES]:
- if sequence in sequence_set:
- times_seen += 1
- if times_seen == 0:
- log.error("%s not in any replication set." % sequence)
- fails += 1
- if times_seen > 1:
- log.error("%s is in multiple replication sets." % sequence)
- fails += 1
- if fails > 0:
- log.fatal("%d errors in replication set definitions." % fails)
- sys.exit(1)
-
- initialize_cluster()
-
- ensure_live()
-
- create_replication_sets(lpmain_tables, lpmain_sequences)
-
- helpers.sync(0)
-
-
-if __name__ == '__main__':
- sys.exit(main())
=== removed file 'database/replication/new-slave.py'
--- database/replication/new-slave.py 2012-06-29 08:40:05 +0000
+++ database/replication/new-slave.py 1970-01-01 00:00:00 +0000
@@ -1,261 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Bring a new slave online."""
-
-__metaclass__ = type
-__all__ = []
-
-import _pythonpath
-
-from optparse import OptionParser
-import subprocess
-import sys
-from textwrap import dedent
-import time
-
-import psycopg2
-
-from lp.services.database.postgresql import ConnectionString
-from lp.services.database.sqlbase import (
- connect_string,
- ISOLATION_LEVEL_AUTOCOMMIT,
- )
-from lp.services.scripts import (
- db_options,
- logger,
- logger_options,
- )
-import replication.helpers
-from replication.helpers import LPMAIN_SET_ID
-
-
-def main():
- parser = OptionParser(
- "Usage: %prog [options] node_id connection_string")
-
- db_options(parser)
- logger_options(parser)
-
- options, args = parser.parse_args()
-
- log = logger(options, 'new-slave')
-
- if len(args) != 2:
- parser.error("Missing required arguments.")
-
- node_id, raw_target_connection_string = args
-
- # Confirm we can connect to the source database.
- # Keep the connection as we need it later.
- source_connection_string = ConnectionString(connect_string(user='slony'))
- try:
- log.debug(
- "Opening source connection to '%s'" % source_connection_string)
- source_connection = psycopg2.connect(str(source_connection_string))
- source_connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
- except psycopg2.Error as exception:
- parser.error("Unable to connect as %s (%s)" % (
- source_connection_string, str(exception).strip()))
-
- # Confirm we are connected to a Slony-I node.
- if not replication.helpers.slony_installed(source_connection):
- parser.error(
- "Database at %s is not a Slony-I node."
- % source_connection_string)
-
- # Sanity check the given node_id.
- existing_nodes = replication.helpers.get_all_cluster_nodes(
- source_connection)
- try:
- node_id = int(node_id)
- except ValueError:
- parser.error("node_id must be a positive integer.")
- if node_id <= 0:
- parser.error("node_id must be a positive integer.")
-
- if node_id in [node.node_id for node in existing_nodes]:
- parser.error("Node %d already exists in the cluster." % node_id)
-
- # Get the connection string for masters.
- lpmain_connection_string = get_master_connection_string(
- source_connection, parser, LPMAIN_SET_ID) or source_connection_string
-
- # Sanity check the target connection string.
- target_connection_string = ConnectionString(raw_target_connection_string)
- if target_connection_string.user is None:
- target_connection_string.user = 'slony'
-
- # Make sure we can connect as the required users to our target.
- # Keep the connection as we need it.
- try:
- target_con = psycopg2.connect(str(target_connection_string))
- except psycopg2.Error as exception:
- parser.error("Failed to connect using '%s' (%s)" % (
- target_connection_string, str(exception).strip()))
-
- # Confirm the target database is sane. Check for common errors
- # that people might make when bringing new replicas online at 4am.
- cur = target_con.cursor()
- cur.execute("SHOW lc_collate")
- collation = cur.fetchone()[0]
- if collation != "C":
- parser.error(
- "Database at %s has incorrect collation (%s)" % (
- target_connection_string, collation))
- cur.execute("SHOW server_encoding")
- encoding = cur.fetchone()[0]
- if encoding != "UTF8":
- parser.error(
- "Database at %s has incorrect encoding (%s)" % (
- target_connection_string, encoding))
- cur.execute("""
- SELECT COUNT(*) FROM information_schema.tables
- WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
- """)
- num_existing_objects = cur.fetchone()[0]
- if num_existing_objects != 0:
- parser.error(
- "Database at %s is not empty." % target_connection_string)
- target_con.rollback()
-
- # Duplicate the full schema.
- log.info("Duplicating full db schema from '%s' to '%s'" % (
- lpmain_connection_string, target_connection_string))
- cmd = "pg_dump --schema-only %s | psql -1 -q %s" % (
- source_connection_string.asPGCommandLineArgs(),
- target_connection_string.asPGCommandLineArgs())
- if subprocess.call(cmd, shell=True) != 0:
- log.error("Failed to duplicate database schema.")
- return 1
-
- # Trash the broken Slony tables we just duplicated.
- log.debug("Removing slony cruft.")
- cur = target_con.cursor()
- cur.execute("DROP SCHEMA _sl CASCADE")
- target_con.commit()
- del target_con
-
- # Get a list of existing set ids that can be subscribed too. This
- # is all sets where the origin is the master_node. We
- # don't allow other sets where the master is configured as a
- # forwarding slave as we have to special case rebuilding the database
- # schema, and we want to avoid cascading slave configurations anyway
- # since we are running an antique Slony-I at the moment - keep it
- # simple!
- # We order the sets smallest to largest by number of tables.
- # This should let us subscribe the quickest sets first for more
- # immediate feedback.
- source_connection.rollback()
- master_node = replication.helpers.get_master_node(source_connection)
- cur = source_connection.cursor()
- cur.execute("""
- SELECT set_id
- FROM _sl.sl_set, (
- SELECT tab_set, count(*) AS tab_count
- FROM _sl.sl_table GROUP BY tab_set
- ) AS TableCounts
- WHERE
- set_origin=%d
- AND tab_set = set_id
- ORDER BY tab_count
- """
- % (master_node.node_id,))
- set_ids = [set_id for set_id, in cur.fetchall()]
- log.debug("Discovered set ids %s" % repr(list(set_ids)))
-
- # Generate and run a slonik(1) script to initialize the new node
- # and subscribe it to our replication sets.
- comment = 'New node created %s' % time.ctime()
- script = dedent("""\
- define new_node %d;
- define new_node_conninfo '%s';
- node @new_node admin conninfo = @new_node_conninfo;
-
- echo 'Initializing new node.';
- try {
- store node (id=@new_node, comment='%s', event node=@master_node);
- echo 'Creating new node paths.';
- """ % (node_id, target_connection_string, comment))
-
- for node in existing_nodes:
- script += dedent("""\
- store path (
- server=@%(nickname)s, client=@new_node,
- conninfo=@%(nickname)s_conninfo);
- store path (
- server=@new_node, client=@%(nickname)s,
- conninfo=@new_node_conninfo);
- """ % {'nickname': node.nickname})
-
- script += dedent("""\
- } on error { echo 'Failed.'; exit 1; }
-
- echo 'You may need to restart the Slony daemons now. If the first';
- echo 'of the following syncs passes then there is no need.';
- """)
-
- full_sync = []
- sync_nicknames = [node.nickname for node in existing_nodes] + ['new_node']
- for nickname in sync_nicknames:
- full_sync.append(dedent("""\
- echo 'Waiting for %(nickname)s sync.';
- sync (id=@%(nickname)s);
- wait for event (
- origin = @%(nickname)s, confirmed=ALL,
- wait on = @%(nickname)s, timeout=0);
- echo 'Ok. Replication syncing fine with new node.';
- """ % {'nickname': nickname}))
- full_sync = '\n'.join(full_sync)
- script += full_sync
-
- for set_id in set_ids:
- script += dedent("""\
- echo 'Subscribing new node to set %d.';
- subscribe set (
- id=%d, provider=@master_node, receiver=@new_node, forward=yes);
- echo 'Waiting for subscribe to start processing.';
- wait for event (
- origin = @master_node, confirmed = ALL,
- wait on = @master_node, timeout = 0);
- """ % (set_id, set_id))
-
- replication.helpers.execute_slonik(script)
-
- replication.helpers.validate_replication(source_connection.cursor())
-
- return 0
-
-
-def get_master_connection_string(con, parser, set_id):
- """Return the connection string to the origin for the replication set.
- """
- cur = con.cursor()
- cur.execute("""
- SELECT pa_conninfo FROM _sl.sl_set, _sl.sl_path
- WHERE set_origin = pa_server AND set_id = %d
- LIMIT 1
- """ % set_id)
- row = cur.fetchone()
- if row is None:
- # If we have no paths stored, there is only a single node in the
- # cluster.
- return None
- else:
- connection_string = ConnectionString(row[0])
-
- # Confirm we can connect from here.
- try:
- # Test connection only. We're not going to use it.
- psycopg2.connect(str(connection_string))
- except psycopg2.Error as exception:
- parser.error("Failed to connect to using '%s' (%s)" % (
- connection_string, str(exception).strip()))
-
- return connection_string
-
-
-if __name__ == '__main__':
- sys.exit(main())
=== removed file 'database/replication/repair-restored-db.py'
--- database/replication/repair-restored-db.py 2011-12-30 06:47:54 +0000
+++ database/replication/repair-restored-db.py 1970-01-01 00:00:00 +0000
@@ -1,134 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Remove the broken Slony-I information from database populated by
-pg_restore(1).
-
-When you dump a database using pg_dump(1), the Slony-I install is dumped
-too. After restoring this dump, you have a non-functional Slony-I
-installation. If you are recovering the database for disaster recovery
-purposes, you can keep the current install by repairing it using the
-slonik(1) command REPAIR CONFIG. In other cases, you need to remove
-Slony-I from the database (eg. building a staging database, we need
-to install replication fresh.). This script does this procedure.
-"""
-
-__metaclass__ = type
-__all__ = []
-
-import _pythonpath
-
-from optparse import OptionParser
-import sys
-
-import psycopg2
-
-from lp.services.config import config
-from lp.services.database.postgresql import ConnectionString
-from lp.services.database.sqlbase import (
- connect,
- ISOLATION_LEVEL_AUTOCOMMIT,
- quote,
- )
-from lp.services.scripts import (
- db_options,
- logger,
- logger_options,
- )
-import replication.helpers
-
-
-def main():
- parser = OptionParser()
- db_options(parser)
- logger_options(parser)
-
- parser.set_defaults(dbuser='slony')
-
- options, args = parser.parse_args()
-
- log = logger(options)
-
- con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
-
- if not replication.helpers.slony_installed(con):
- log.info("Slony-I not installed. Nothing to do.")
- return 0
-
- if not repair_with_slonik(log, options, con):
- repair_with_drop_schema(log, con)
-
- return 0
-
-
-def repair_with_slonik(log, options, con):
- """Attempt to uninstall Slony-I via 'UNINSTALL NODE' per best practice.
-
- Returns True on success, False if unable to do so for any reason.
- """
- cur = con.cursor()
-
- # Determine the node id the database thinks it is.
- try:
- cmd = "SELECT %s.getlocalnodeid(%s)" % (
- replication.helpers.CLUSTER_NAMESPACE,
- quote(replication.helpers.CLUSTER_NAMESPACE))
- cur.execute(cmd)
- node_id = cur.fetchone()[0]
- log.debug("Node Id is %d" % node_id)
-
- # Get a list of set ids in the database.
- cur.execute(
- "SELECT DISTINCT set_id FROM %s.sl_set"
- % replication.helpers.CLUSTER_NAMESPACE)
- set_ids = set(row[0] for row in cur.fetchall())
- log.debug("Set Ids are %s" % repr(set_ids))
-
- except psycopg2.InternalError:
- # Not enough information to determine node id. Possibly
- # this is an empty database.
- log.debug('Broken or no Slony-I install.')
- return False
-
- connection_string = ConnectionString(config.database.rw_main_master)
- if options.dbname:
- connection_string.dbname = options.dbname
- if options.dbuser:
- connection_string.user = options.dbuser
- if options.dbhost:
- connection_string.host = options.dbhost
-
- script = [
- "cluster name = %s;" % replication.helpers.CLUSTERNAME,
- "node %d admin conninfo = '%s';" % (node_id, connection_string),
- ]
- for set_id in set_ids:
- script.append(
- "repair config (set id=%d, event node=%d, execute only on=%d);"
- % (set_id, node_id, node_id))
- script.append("uninstall node (id=%d);" % node_id)
- for line in script:
- log.debug(line)
- script = '\n'.join(script)
-
- return replication.helpers.execute_slonik(
- script, auto_preamble=False, exit_on_fail=False)
-
-
-def repair_with_drop_schema(log, con):
- """
- Just drop the _sl schema as it is 'good enough' with Slony-I 1.2.
-
- This mechanism fails with Slony added primary keys, but we don't
- do that.
- """
- log.info('Fallback mode - dropping _sl schema.')
- cur = con.cursor()
- cur.execute("DROP SCHEMA _sl CASCADE")
- return True
-
-
-if __name__ == '__main__':
- sys.exit(main())
=== removed file 'database/replication/report.py'
--- database/replication/report.py 2012-01-06 11:08:30 +0000
+++ database/replication/report.py 1970-01-01 00:00:00 +0000
@@ -1,288 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Generate a report on the replication setup.
-
-This report spits out whatever we consider useful for checking up on and
-diagnosing replication. This report will grow over time, and maybe some
-bits of this will move to separate monitoring systems or reports.
-
-See the Slony-I documentation for more discussion on the data presented
-by this report.
-"""
-
-__metaclass__ = type
-__all__ = []
-
-import _pythonpath
-
-from cgi import escape as html_escape
-from cStringIO import StringIO
-from optparse import OptionParser
-import sys
-
-from lp.services.database.sqlbase import (
- connect,
- quote_identifier,
- sqlvalues,
- )
-from lp.services.scripts import db_options
-import replication.helpers
-
-
-class Table:
- """Representation of a table.
-
- :ivar labels: List of labels to render as the table's first row.
- :ivar rows: List of rows, each being a list of strings.
- """
- def __init__(self, labels=None):
- if labels is None:
- self.labels = []
- else:
- self.labels = labels[:]
- self.rows = []
-
-
-class HtmlReport:
-
- def alert(self, text):
- """Return text marked up to be noticed."""
- return '<span style="alert">%s</span>' % html_escape(text)
-
- def table(self, table):
- """Return the rendered table."""
- out = StringIO()
- print >> out, "<table>"
- if table.labels:
- print >> out, "<tr>"
- for label in table.labels:
- print >> out, "<th>%s</th>" % html_escape(unicode(label))
- print >> out, "</tr>"
-
- for row in table.rows:
- print >> out, "<tr>"
- for cell in row:
- print >> out, "<td>%s</td>" % html_escape(unicode(cell))
- print >> out, "</tr>"
-
- print >> out, "</table>"
-
- return out.getvalue()
-
-
-class TextReport:
- def alert(self, text):
- return text
-
- def table(self, table):
- max_col_widths = []
- for label in table.labels:
- max_col_widths.append(len(label))
- for row in table.rows:
- for col_idx, col in enumerate(row):
- max_col_widths[col_idx] = max(
- len(str(col)), max_col_widths[col_idx])
-
- out = StringIO()
- for label_idx in range(0, len(table.labels)):
- print >> out, table.labels[label_idx].ljust(
- max_col_widths[label_idx]),
- print >> out
- for width in max_col_widths:
- print >> out, '=' * width,
- print >> out
- for row in table.rows:
- row = list(row)
- for col_idx in range(0, len(row)):
- print >> out, str(
- row[col_idx]).ljust(max_col_widths[col_idx]),
- print >> out
- print >> out
-
- return out.getvalue()
-
-
-def node_overview_report(cur, options):
- """Dumps the sl_node table in a human readable format.
-
- This report tells us which nodes are active and which are inactive.
- """
- report = options.mode()
- table = Table(["Node", "Comment", "Active"])
-
- cur.execute("""
- SELECT no_id, no_comment, no_active
- FROM sl_node
- ORDER BY no_comment
- """)
- for node_id, node_comment, node_active in cur.fetchall():
- if node_active:
- node_active_text = 'Active'
- else:
- node_active_text = report.alert('Inactive')
- table.rows.append([
- 'Node %d' % node_id, node_comment, node_active_text])
-
- return report.table(table)
-
-
-def paths_report(cur, options):
- """Dumps the sl_paths table in a human readable format.
-
- This report describes how nodes will attempt to connect to each other
- if they need to, allowing you to sanity check the settings and pick up
- obvious misconfigurations that would stop Slony daemons from being able
- to connect to one or more nodes, blocking replication.
- """
- report = options.mode()
- table = Table(["From client node", "To server node", "Via connection"])
-
- cur.execute("""
- SELECT pa_client, pa_server, pa_conninfo
- FROM sl_path
- ORDER BY pa_client, pa_server
- """)
- for row in cur.fetchall():
- table.rows.append(row)
-
- return report.table(table)
-
-
-def listen_report(cur, options):
- """Dumps the sl_listen table in a human readable format.
-
- This report shows you the tree of which nodes a node needs to check
- for events on.
- """
- report = options.mode()
- table = Table(["Node", "Listens To", "Via"])
-
- cur.execute("""
- SELECT li_receiver, li_origin, li_provider
- FROM sl_listen
- ORDER BY li_receiver, li_origin, li_provider
- """)
- for row in cur.fetchall():
- table.rows.append(['Node %s' % node for node in row])
- return report.table(table)
-
-
-def subscribe_report(cur, options):
- """Dumps the sl_subscribe table in a human readable format.
-
- This report shows the subscription tree - which nodes provide
- a replication set to which subscriber.
- """
-
- report = options.mode()
- table = Table([
- "Set", "Is Provided By", "Is Received By",
- "Is Forwardable", "Is Active"])
- cur.execute("""
- SELECT sub_set, sub_provider, sub_receiver, sub_forward, sub_active
- FROM sl_subscribe ORDER BY sub_set, sub_provider, sub_receiver
- """)
- for set_, provider, receiver, forward, active in cur.fetchall():
- if active:
- active_text = 'Active'
- else:
- active_text = report.alert('Inactive')
- table.rows.append([
- "Set %d" % set_, "Node %d" % provider, "Node %d" % receiver,
- str(forward), active_text])
- return report.table(table)
-
-
-def tables_report(cur, options):
- """Dumps the sl_table table in a human readable format.
-
- This report shows which tables are being replicated and in which
- replication set. It also importantly shows the internal Slony id used
- for a table, which is needed for slonik scripts as slonik is incapable
- of doing the tablename -> Slony id mapping itself.
- """
- report = options.mode()
- table = Table(["Set", "Schema", "Table", "Table Id"])
- cur.execute("""
- SELECT tab_set, nspname, relname, tab_id, tab_idxname, tab_comment
- FROM sl_table, pg_class, pg_namespace
- WHERE tab_reloid = pg_class.oid AND relnamespace = pg_namespace.oid
- ORDER BY tab_set, nspname, relname
- """)
- for set_, namespace, tablename, table_id, key, comment in cur.fetchall():
- table.rows.append([
- "Set %d" % set_, namespace, tablename, str(table_id)])
- return report.table(table)
-
-
-def sequences_report(cur, options):
- """Dumps the sl_sequences table in a human readable format.
-
- This report shows which sequences are being replicated and in which
- replication set. It also importantly shows the internal Slony id used
- for a sequence, which is needed for slonik scripts as slonik is incapable
- of doing the tablename -> Slony id mapping itself.
- """
- report = options.mode()
- table = Table(["Set", "Schema", "Sequence", "Sequence Id"])
- cur.execute("""
- SELECT seq_set, nspname, relname, seq_id, seq_comment
- FROM sl_sequence, pg_class, pg_namespace
- WHERE seq_reloid = pg_class.oid AND relnamespace = pg_namespace.oid
- ORDER BY seq_set, nspname, relname
- """)
- for set_, namespace, tablename, table_id, comment in cur.fetchall():
- table.rows.append([
- "Set %d" % set_, namespace, tablename, str(table_id)])
- return report.table(table)
-
-
-def main():
- parser = OptionParser()
-
- parser.add_option(
- "-f", "--format", dest="mode", default="text",
- choices=['text', 'html'],
- help="Output format MODE", metavar="MODE")
- db_options(parser)
-
- options, args = parser.parse_args()
-
- if options.mode == "text":
- options.mode = TextReport
- elif options.mode == "html":
- options.mode = HtmlReport
- else:
- assert False, "Unknown mode %s" % options.mode
-
- con = connect()
- cur = con.cursor()
-
- cur.execute(
- "SELECT TRUE FROM pg_namespace WHERE nspname=%s"
- % sqlvalues(replication.helpers.CLUSTER_NAMESPACE))
-
- if cur.fetchone() is None:
- parser.error(
- "No Slony-I cluster called %s in that database"
- % replication.helpers.CLUSTERNAME)
- return 1
-
- # Set our search path to the schema of the cluster we care about.
- cur.execute(
- "SET search_path TO %s, public"
- % quote_identifier(replication.helpers.CLUSTER_NAMESPACE))
-
- print node_overview_report(cur, options)
- print paths_report(cur, options)
- print listen_report(cur, options)
- print subscribe_report(cur, options)
- print tables_report(cur, options)
- print sequences_report(cur, options)
-
-
-if __name__ == '__main__':
- sys.exit(main())
=== removed file 'database/replication/slon_ctl.py'
--- database/replication/slon_ctl.py 2011-12-30 06:47:54 +0000
+++ database/replication/slon_ctl.py 1970-01-01 00:00:00 +0000
@@ -1,161 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Startup and shutdown slon processes.
-
-On production and staging we probably want to use the standard
-/etc/init.d/slony1 script instead of this tool.
-"""
-
-import _pythonpath
-
-from optparse import OptionParser
-import os.path
-import subprocess
-import sys
-
-from lp.services.config import config
-from lp.services.database.sqlbase import connect
-from lp.services.scripts import (
- logger,
- logger_options,
- )
-import replication.helpers
-
-
-__metaclass__ = type
-__all__ = []
-
-
-def main():
- parser = OptionParser(
- "Usage: %prog [options] "
- "[start [nickname connection_string] | stop [nickname]]")
- parser.add_option(
- '-l', '--lag', default=None, dest="lag", metavar='PGINTERVAL',
- help="Lag events by PGINTERVAL, such as '10 seconds' or '2 minutes'")
- logger_options(parser)
- options, args = parser.parse_args()
-
- if len(args) == 0:
- parser.error("No command given.")
-
- command = args[0]
-
- if command not in ['start', 'stop']:
- parser.error("Unknown command %s." % command)
-
- if len(args) == 1:
- explicit = None
-
- elif command == 'start':
- if len(args) == 2:
- parser.error(
- "nickname and connection_string required for %s command"
- % command)
- elif len(args) == 3:
- explicit = replication.helpers.Node(
- None, args[1], args[2], None)
- else:
- parser.error("Too many arguments.")
-
- elif command == 'stop':
- if len(args) in (2, 3):
- explicit = replication.helpers.Node(
- None, args[1], None, None)
- else:
- parser.error("Too many arguments.")
-
- else:
- parser.error("Unknown command %s." % command)
-
- log = logger(options)
-
- if explicit is not None:
- nodes = [explicit]
- else:
- nodes = replication.helpers.get_all_cluster_nodes(
- connect(user='slony'))
-
- if command == 'start':
- return start(log, nodes, options.lag)
- else:
- return stop(log, nodes)
-
-
-def get_pidfile(nickname):
- return os.path.join(
- config.canonical.pid_dir, 'lpslon_%s_%s.pid' % (
- nickname, config.instance_name))
-
-
-def get_logfile(nickname):
- logdir = config.database.replication_logdir
- if not os.path.isabs(logdir):
- logdir = os.path.normpath(os.path.join(config.root, logdir))
- return os.path.join(
- logdir, 'lpslon_%s_%s.log' % (nickname, config.instance_name))
-
-
-def start(log, nodes, lag=None):
- for node in nodes:
- pidfile = get_pidfile(node.nickname)
- logfile = get_logfile(node.nickname)
-
- log.info("Starting %s slon daemon." % node.nickname)
- log.debug("Logging to %s" % logfile)
- log.debug("PID file %s" % pidfile)
- # Hard code suitable command line arguments for development.
- slon_args = "-d 2 -s 500 -t 2500"
- if lag is not None:
- slon_args = "%s -l '%s'" % (slon_args, lag)
- cmd = [
- "/sbin/start-stop-daemon",
- "--start",
- "--background",
- "--pidfile", pidfile,
- "--oknodo",
- "--exec", "/usr/bin/slon",
- "--startas", "/bin/sh",
- "--", "-c",
- "slon -p %s %s %s '%s' >> %s" % (
- pidfile, slon_args, replication.helpers.CLUSTERNAME,
- node.connection_string, logfile)]
- log.debug("Running %s" % repr(cmd))
- return_code = subprocess.call(cmd)
- if return_code != 0:
- log.fatal("Failed. Return code %s" % return_code)
- return return_code
-
- return 0
-
-
-def stop(log, nodes):
- for node in nodes:
- pidfile = get_pidfile(node.nickname)
-
- if not os.path.exists(pidfile):
- log.info(
- "%s slon daemon not running. Doing nothing."
- % node.nickname)
- continue
- log.info("Stopping %s slon daemon." % node.nickname)
- log.debug("PID file %s" % pidfile)
- cmd = [
- "/sbin/start-stop-daemon",
- "--stop",
- "--pidfile", pidfile,
- "--oknodo"]
- log.debug("Running %s" % repr(cmd))
- return_code = subprocess.call(cmd)
- if return_code != 0:
- log.fatal("Failed. Return code %s" % return_code)
- return return_code
-
- return 0
-
-
-if __name__ == '__main__':
- sys.exit(main())
=== modified file 'database/schema/full-update.py'
--- database/schema/full-update.py 2012-07-19 15:05:21 +0000
+++ database/schema/full-update.py 2012-08-07 11:14:21 +0000
@@ -57,7 +57,6 @@
# Fake expected command line arguments and global log
options.commit = True
options.partial = False
- options.ignore_slony = False
upgrade.options = options
upgrade.log = log
# Invoke the database schema upgrade process.
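
For context, full-update.py drives upgrade.py in-process by faking the
options object upgrade.py would normally parse itself, which is why the
dropped flag also vanishes from the fake option set above. A minimal
sketch of that pattern (the entry-point call and logger wiring here are
illustrative, not exact):

    # Sketch of full-update.py's in-process invocation of upgrade.py.
    import upgrade

    class FakeOptions:
        """Stand-in for upgrade.py's OptionParser results."""

    options = FakeOptions()
    options.commit = True       # a real deploy always commits
    options.partial = False     # apply all pending patches together
    upgrade.options = options   # upgrade.py reads module-level globals
    upgrade.log = log           # reuse full-update.py's logger
    upgrade.main()              # hypothetical entry-point signature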
=== modified file 'database/schema/preflight.py'
--- database/schema/preflight.py 2012-07-23 10:13:24 +0000
+++ database/schema/preflight.py 2012-08-07 11:14:21 +0000
@@ -28,7 +28,6 @@
logger,
logger_options,
)
-import replication.helpers
from replication.helpers import Node
import upgrade
@@ -73,42 +72,11 @@
master_con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
self.log = log
- self.is_slony = replication.helpers.slony_installed(master_con)
- if self.is_slony:
- self.nodes = set(
- replication.helpers.get_all_cluster_nodes(master_con))
- for node in self.nodes:
- node.con = node.connect(ISOLATION_LEVEL_AUTOCOMMIT)
-
- # Create a list of nodes subscribed to the replicated sets we
- # are modifying.
- cur = master_con.cursor()
- cur.execute("""
- WITH subscriptions AS (
- SELECT *
- FROM _sl.sl_subscribe
- WHERE sub_set = 1 AND sub_active IS TRUE)
- SELECT sub_provider FROM subscriptions
- UNION
- SELECT sub_receiver FROM subscriptions
- """)
- lpmain_node_ids = set(row[0] for row in cur.fetchall())
- self.lpmain_nodes = set(
- node for node in self.nodes
- if node.node_id in lpmain_node_ids)
-
- # Store a reference to the lpmain origin.
- lpmain_master_node_id = replication.helpers.get_master_node(
- master_con, 1).node_id
- self.lpmain_master_node = [
- node for node in self.lpmain_nodes
- if node.node_id == lpmain_master_node_id][0]
- else:
- node = Node(None, None, None, True)
- node.con = master_con
- self.nodes = set([node])
- self.lpmain_nodes = self.nodes
- self.lpmain_master_node = node
+ node = Node(None, None, None, True)
+ node.con = master_con
+ self.nodes = set([node])
+ self.lpmain_nodes = self.nodes
+ self.lpmain_master_node = node
# Add streaming replication standbys, which unfortunately cannot be
# detected reliably and has to be passed in via the command line.
@@ -258,29 +226,6 @@
def check_replication_lag(self):
"""Return False if the replication cluster is badly lagged."""
- slony_lagged = False
- if self.is_slony:
- # Check replication lag on every node just in case there are
- # disagreements.
- max_lag = timedelta(seconds=-1)
- for node in self.nodes:
- cur = node.con.cursor()
- cur.execute("""
- SELECT current_database(),
- max(st_lag_time) AS lag FROM _sl.sl_status
- """)
- dbname, lag = cur.fetchone()
- if lag > max_lag:
- max_lag = lag
- self.log.debug(
- "%s reports database lag of %s.", dbname, lag)
- if max_lag <= MAX_LAG:
- self.log.info("Slony cluster lag is ok (%s)", max_lag)
- slony_lagged = False
- else:
- self.log.fatal("Slony cluster lag is high (%s)", max_lag)
- slony_lagged = True
-
# Do something harmless to force changes to be streamed in case
# system is idle.
self.lpmain_master_node.con.cursor().execute(
@@ -322,7 +267,7 @@
self.log.debug(
"Streaming replication lag is not high (%s)", max_lag)
- return not (slony_lagged or streaming_lagged)
+ return not streaming_lagged
def check_can_sync(self):
"""Return True if a sync event is acknowledged by all nodes.
@@ -330,20 +275,6 @@
We only wait 30 seconds for the sync, because we require the
cluster to be quiescent.
"""
- slony_success = True
- if self.is_slony:
- slony_success = replication.helpers.sync(30, exit_on_fail=False)
- if slony_success:
- self.log.info(
- "Slony replication events are being propagated.")
- else:
- self.log.fatal(
- "Slony replication events are not being propagated.")
- self.log.fatal(
- "One or more slony replication daemons may be down.")
- self.log.fatal(
- "Bounce the slony replication daemons and check logs.")
-
# PG 9.1 streaming replication, or no replication.
streaming_success = streaming_sync(self.lpmain_master_node.con, 30)
if streaming_success:
@@ -351,7 +282,7 @@
else:
self.log.fatal("Streaming replicas not syncing.")
- return slony_success and streaming_success
+ return streaming_success
def report_patches(self):
"""Report what patches are due to be applied from this tree."""
=== modified file 'database/schema/upgrade.py'
--- database/schema/upgrade.py 2012-06-21 07:39:59 +0000
+++ database/schema/upgrade.py 2012-08-07 11:14:21 +0000
@@ -11,21 +11,17 @@
import _pythonpath
-from cStringIO import StringIO
import glob
from optparse import OptionParser
import os.path
import re
-from tempfile import NamedTemporaryFile
from textwrap import dedent
from bzrlib.branch import Branch
from bzrlib.errors import NotBranchError
-from lp.services.database.postgresql import fqn
from lp.services.database.sqlbase import (
connect,
- ISOLATION_LEVEL_AUTOCOMMIT,
sqlvalues,
)
from lp.services.scripts import (
@@ -34,7 +30,6 @@
logger_options,
)
from lp.services.utils import total_seconds
-import replication.helpers
SCHEMA_DIR = os.path.dirname(__file__)
@@ -44,16 +39,8 @@
con = connect()
patches = get_patchlist(con)
- if not options.ignore_slony and replication.helpers.slony_installed(con):
- con.close()
- if options.commit is False:
- parser.error("--dry-run does not make sense with replicated db")
- log.info("Applying patches to Slony-I environment.")
- apply_patches_replicated()
- con = connect()
- else:
- log.info("Applying patches to unreplicated environment.")
- apply_patches_normal(con)
+ log.info("Applying patches.")
+ apply_patches_normal(con)
report_patch_times(con, patches)
@@ -179,334 +166,6 @@
apply_comments(con)
-def apply_patches_replicated():
- """Update a Slony-I cluster."""
-
- # Get an autocommit connection. We use autocommit so we don't have to
- # worry about blocking locks needed by Slony-I.
- con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
-
- # We use three slonik scripts to apply our DB patches.
- # The first script applies the DB patches to all nodes.
-
- # First make sure the cluster is synced.
- log.info("Waiting for cluster to sync, pre-update.")
- replication.helpers.sync(timeout=600)
-
- # Slonik script we are generating.
- outf = StringIO()
-
- # Start a transaction block.
- print >> outf, "try {"
-
- # All the SQL we need to run, combined into one file. This minimizes
- # Slony-I syncs and downtime.
- combined_sql = NamedTemporaryFile(prefix='dbupdate', suffix='.sql')
-
- def add_sql(sql):
- sql = sql.strip()
- if sql != '':
- assert sql.endswith(';'), "SQL not terminated with ';': %s" % sql
- print >> combined_sql, sql
- # Flush or we might lose statements from buffering.
- combined_sql.flush()
-
- # Apply trusted.sql
- add_sql(open(os.path.join(SCHEMA_DIR, 'trusted.sql'), 'r').read())
-
- # Prepare to repair the start timestamps in
- # LaunchpadDatabaseRevision.
- add_sql(FIX_PATCH_TIMES_PRE_SQL)
-
- patches = get_patchlist(con)
- for (major, minor, patch), patch_file in patches:
- add_sql(open(patch_file, 'r').read())
-
- # Trigger a failure if the patch neglected to update
- # LaunchpadDatabaseRevision.
- add_sql(
- "SELECT assert_patch_applied(%d, %d, %d);"
- % (major, minor, patch))
-
- # Fix the start timestamps in LaunchpadDatabaseRevision.
- add_sql(FIX_PATCH_TIMES_POST_SQL % sqlvalues(*get_bzr_details()))
-
- print >> outf, dedent("""\
- execute script (
- set id = @lpmain_set, event node = @master_node,
- filename='%s'
- );
- """ % combined_sql.name)
-
- # Close transaction block and abort on error.
- print >> outf, dedent("""\
- }
- on error {
- echo 'Failed! Slonik script aborting. Patches rolled back.';
- exit 1;
- }
- """)
-
- # Execute the script with slonik.
- log.info("slonik(1) schema upgrade script generated. Invoking.")
- if not replication.helpers.execute_slonik(outf.getvalue()):
- log.fatal("Aborting.")
- raise SystemExit(4)
- log.info("slonik(1) schema upgrade script completed.")
-
- # Wait for replication to sync.
- log.info("Waiting for patches to apply to slaves and cluster to sync.")
- replication.helpers.sync(timeout=0)
-
- # The db patches have now been applied to all nodes, and we are now
- # committed to completing the upgrade (!). If any of the later stages
- # fail, it will likely involve manual cleanup.
-
- # We now scan for new tables and add them to the lpmain
- # replication set using a second script. Note that upgrade.py only
- # deals with the lpmain replication set.
-
- # Detect new tables and sequences.
- # Everything else that isn't replicated should go in the lpmain
- # replication set.
- cur = con.cursor()
- unrepl_tabs, unrepl_seqs = replication.helpers.discover_unreplicated(cur)
-
- # But warn if we are going to replicate something not in the calculated
- # set, as *_SEED in replication.helpers needs to be updated. We don't want
- # abort unless absolutely necessary to avoid manual cleanup.
- lpmain_tabs, lpmain_seqs = replication.helpers.calculate_replication_set(
- cur, replication.helpers.LPMAIN_SEED)
-
- assumed_tabs = unrepl_tabs.difference(lpmain_tabs)
- assumed_seqs = unrepl_seqs.difference(lpmain_seqs)
- for obj in (assumed_tabs.union(assumed_seqs)):
- log.warn(
- "%s not in calculated lpmain replication set. "
- "Update *_SEED in replication/helpers.py" % obj)
-
- if unrepl_tabs or unrepl_seqs:
- # TODO: Or if the holding set already exists - catch an aborted run.
- log.info(
- "New stuff needs replicating: %s"
- % ', '.join(sorted(unrepl_tabs.union(unrepl_seqs))))
- # Create a new replication set to hold new tables and sequences
- # TODO: Only create set if it doesn't already exist.
- outf = StringIO()
- print >> outf, dedent("""\
- try {
- create set (
- id = @holding_set, origin = @master_node,
- comment = 'Temporary set to merge'
- );
- """)
-
- # Add the new tables and sequences to the holding set.
- cur.execute("""
- SELECT max(tab_id) FROM %s
- """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_table'))
- next_id = cur.fetchone()[0] + 1
- for tab in unrepl_tabs:
- print >> outf, dedent("""\
- echo 'Adding %s to holding set for lpmain merge.';
- set add table (
- set id = @holding_set, origin = @master_node, id=%d,
- fully qualified name = '%s',
- comment = '%s'
- );
- """ % (tab, next_id, tab, tab))
- next_id += 1
- cur.execute("""
- SELECT max(seq_id) FROM %s
- """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_sequence'))
- next_id = cur.fetchone()[0] + 1
- for seq in unrepl_seqs:
- print >> outf, dedent("""\
- echo 'Adding %s to holding set for lpmain merge.';
- set add sequence (
- set id = @holding_set, origin = @master_node, id=%d,
- fully qualified name = '%s',
- comment = '%s'
- );
- """ % (seq, next_id, seq, seq))
- next_id += 1
-
- print >> outf, dedent("""\
- } on error {
- echo 'Failed to create holding set! Aborting.';
- exit 1;
- }
- """)
-
- # Subscribe the holding set to all replicas.
- # TODO: Only subscribe the set if not already subscribed.
- # Close the transaction and sync. Important, or MERGE SET will fail!
- # Merge the sets.
- # Sync.
- # Drop the holding set.
- for slave_node in replication.helpers.get_slave_nodes(con):
- print >> outf, dedent("""\
- echo 'Subscribing holding set to @node%d_node.';
- subscribe set (
- id=@holding_set, provider=@master_node,
- receiver=@node%d_node, forward=yes);
- wait for event (
- origin=@master_node, confirmed=all,
- wait on=@master_node, timeout=0);
- echo 'Waiting for sync';
- sync (id=@master_node);
- wait for event (
- origin=@master_node, confirmed=ALL,
- wait on=@master_node, timeout=0);
- """ % (slave_node.node_id, slave_node.node_id))
-
- print >> outf, dedent("""\
- echo 'Merging holding set to lpmain';
- merge set (
- id=@lpmain_set, add id=@holding_set, origin=@master_node);
- """)
-
- # Execute the script and sync.
- log.info(
- "Generated slonik(1) script to replicate new objects. Invoking.")
- if not replication.helpers.execute_slonik(outf.getvalue()):
- log.fatal("Aborting.")
- log.info(
- "slonik(1) script to replicate new objects completed.")
- log.info("Waiting for sync.")
- replication.helpers.sync(timeout=0)
- else:
- log.info("No new tables or sequences to replicate.")
-
- # We also scan for tables and sequences we want to drop and do so using
- # a final slonik script. Instead of dropping tables in the DB patch,
- # we rename them into the ToDrop namespace.
- #
- # First, remove all todrop.* sequences from replication.
- cur.execute("""
- SELECT seq_nspname, seq_relname, seq_id from %s
- WHERE seq_nspname='todrop'
- """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_sequence'))
- seqs_to_unreplicate = set(
- (fqn(nspname, relname), tab_id)
- for nspname, relname, tab_id in cur.fetchall())
- if seqs_to_unreplicate:
- log.info("Unreplicating sequences: %s" % ', '.join(
- name for name, id in seqs_to_unreplicate))
- # Generate a slonik script to remove sequences from the
- # replication set.
- sk = StringIO()
- print >> sk, "try {"
- for seq_name, seq_id in seqs_to_unreplicate:
- if seq_id is not None:
- print >> sk, dedent("""\
- echo 'Removing %s from replication';
- set drop sequence (origin=@master_node, id=%d);
- """ % (seq_name, seq_id))
- print >> sk, dedent("""\
- }
- on error {
- echo 'Failed to unreplicate sequences. Aborting.';
- exit 1;
- }
- """)
- log.info(
- "Generated slonik(1) script to unreplicate sequences. Invoking.")
- if not replication.helpers.execute_slonik(sk.getvalue()):
- log.fatal("Aborting.")
- log.info("slonik(1) script to drop sequences completed.")
-
- # Generate a slonik script to remove tables from the replication set,
- # and a DROP TABLE/DROP SEQUENCE sql script to run after.
- cur.execute("""
- SELECT nspname, relname, tab_id
- FROM pg_class
- JOIN pg_namespace ON relnamespace = pg_namespace.oid
- LEFT OUTER JOIN %s ON pg_class.oid = tab_reloid
- WHERE nspname='todrop' AND relkind='r'
- """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_table'))
- tabs_to_drop = set(
- (fqn(nspname, relname), tab_id)
- for nspname, relname, tab_id in cur.fetchall())
- if tabs_to_drop:
- log.info("Dropping tables: %s" % ', '.join(
- name for name, id in tabs_to_drop))
- sk = StringIO()
- sql = NamedTemporaryFile(prefix="drop", suffix=".sql")
- print >> sk, "try {"
- for tab_name, tab_id in tabs_to_drop:
- if tab_id is not None:
- print >> sk, dedent("""\
- echo 'Removing %s from replication';
- set drop table (origin=@master_node, id=%d);
- """ % (tab_name, tab_id))
- print >> sql, "DROP TABLE %s;" % tab_name
- sql.flush()
- print >> sk, dedent("""\
- execute script (
- set id=@lpmain_set, event node=@master_node,
- filename='%s'
- );
- }
- on error {
- echo 'Failed to drop tables. Aborting.';
- exit 1;
- }
- """ % sql.name)
- log.info("Generated slonik(1) script to drop tables. Invoking.")
- if not replication.helpers.execute_slonik(sk.getvalue()):
- log.fatal("Aborting.")
- log.info("slonik(1) script to drop tables completed.")
- sql.close()
-
- # Now drop any remaining sequences. Most sequences will be dropped
- # implicitly with the table drop.
- cur.execute("""
- SELECT nspname, relname, seq_id
- FROM pg_class
- JOIN pg_namespace ON relnamespace = pg_namespace.oid
- LEFT OUTER JOIN %s ON pg_class.oid = seq_reloid
- WHERE nspname='todrop' AND relkind='S'
- """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_sequence'))
- seqs_to_drop = set(
- (fqn(nspname, relname), tab_id)
- for nspname, relname, tab_id in cur.fetchall())
-
- if seqs_to_drop:
- log.info("Dropping sequences: %s" % ', '.join(
- name for name, id in seqs_to_drop))
- # Generate a slonik script to remove sequences from the
- # replication set, DROP SEQUENCE sql script to run after.
- sk = StringIO()
- sql = NamedTemporaryFile(prefix="drop", suffix=".sql")
- print >> sk, "try {"
- for seq_name, seq_id in seqs_to_drop:
- if seq_id is not None:
- print >> sk, dedent("""\
- echo 'Removing %s from replication';
- set drop sequence (origin=@master_node, id=%d);
- """ % (seq_name, seq_id))
- print >> sql, "DROP SEQUENCE %s;" % seq_name
- sql.flush()
- print >> sk, dedent("""\
- execute script (
- set id=@lpmain_set, event node=@master_node,
- filename='%s'
- );
- }
- on error {
- echo 'Failed to drop sequences. Aborting.';
- exit 1;
- }
- """ % sql.name)
- log.info("Generated slonik(1) script to drop sequences. Invoking.")
- if not replication.helpers.execute_slonik(sk.getvalue()):
- log.fatal("Aborting.")
- log.info("slonik(1) script to drop sequences completed.")
- log.info("Waiting for final sync.")
- replication.helpers.sync(timeout=0)
-
-
def get_patchlist(con):
"""Return a patches that need to be applied to the connected database
in [((major, minor, patch), patch_file)] format.
@@ -614,9 +273,6 @@
parser.add_option(
"--partial", dest="partial", default=False,
action="store_true", help="Commit after applying each patch")
- parser.add_option(
- "--ignore-slony", dest="ignore_slony", default=False,
- action="store_true", help="Ignore any Slony installations")
(options, args) = parser.parse_args()
if args: