← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/db-deploy into lp:launchpad

 

Stuart Bishop has proposed merging lp:~stub/launchpad/db-deploy into lp:launchpad.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #906222 in Launchpad itself: "More BAD_USERS creating false positives during fast downtime updates"
  https://bugs.launchpad.net/launchpad/+bug/906222
  Bug #911417 in Launchpad itself: "Staging db restore fails on make -C database/replication stagingsetup"
  https://bugs.launchpad.net/launchpad/+bug/911417
  Bug #1025396 in Launchpad itself: "deployment scripts need to cope with streaming replication"
  https://bugs.launchpad.net/launchpad/+bug/1025396
  Bug #1025399 in Launchpad itself: "full-update.py should wait for streaming slaves before completing"
  https://bugs.launchpad.net/launchpad/+bug/1025399

For more details, see:
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/118535

= Summary =

We no longer need to involve Slony in db deploys.

== Proposed fix ==

Amputation.

== Pre-implementation notes ==

== LOC Rationale ==

== Implementation details ==

== Tests ==

== Demo and Q/A ==


= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  database/schema/upgrade.py
  database/replication/Makefile
  database/schema/preflight.py
  database/schema/full-update.py

./database/schema/upgrade.py
      12: '_pythonpath' imported but unused
./database/replication/Makefile
      78: Line exceeds 80 characters.
./database/schema/preflight.py
      14: '_pythonpath' imported but unused
./database/schema/full-update.py
       7: '_pythonpath' imported but unused
-- 
https://code.launchpad.net/~stub/launchpad/db-deploy/+merge/118535
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/db-deploy into lp:launchpad.
=== modified file 'database/replication/Makefile'
--- database/replication/Makefile	2012-07-09 17:01:51 +0000
+++ database/replication/Makefile	2012-08-07 11:14:21 +0000
@@ -70,7 +70,7 @@
 	# Apply database patches.
 	@echo Running upgrade.py `date`.
 	LPCONFIG=${STAGING_CONFIG} ${SHHH} ../schema/upgrade.py \
-	    --ignore-slony --log-file=INFO:${STAGING_LOGDIR}/dbupgrade.log
+	    --log-file=INFO:${STAGING_LOGDIR}/dbupgrade.log
 	@echo Running security.py `date`
 	LPCONFIG=${STAGING_CONFIG} ${SHHH} ../schema/security.py \
 	    --log-file=INFO:${STAGING_LOGDIR}/dbupgrade.log
@@ -90,5 +90,5 @@
 	pg_restore --dbname=${DOGFOOD_DBNAME} --no-acl --no-owner \
 	    --use-list=${DUMPLIST} ${EXIT_ON_ERROR} ${DOGFOOD_DUMP}
 	rm ${DUMPLIST}
-	../schema/upgrade.py --ignore-slony -d ${DOGFOOD_DBNAME}
+	../schema/upgrade.py -d ${DOGFOOD_DBNAME}
 	../schema/security.py -d ${DOGFOOD_DBNAME}

=== removed file 'database/replication/generate_migration.py'
--- database/replication/generate_migration.py	2011-12-30 06:47:54 +0000
+++ database/replication/generate_migration.py	1970-01-01 00:00:00 +0000
@@ -1,232 +0,0 @@
-#!/usr/bin/python -S
-# Copyright 2011 Canonical Ltd.  This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Generate slonik scripts for Slony 1.2 to 2.0 migration.
-
-Remove this script after migration is complete.
-"""
-
-__metaclass__ = type
-__all__ = []
-
-import _pythonpath
-
-from optparse import OptionParser
-import os.path
-from textwrap import dedent
-
-from lp.services import scripts
-from lp.services.database.sqlbase import connect
-import replication.helpers
-from replication.helpers import (
-    LPMAIN_SET_ID,
-    LPMIRROR_SET_ID,
-    SSO_SET_ID,
-    get_all_cluster_nodes,
-    get_master_node,
-    )
-
-
-con = None
-options = None
-
-sets = {
-    LPMAIN_SET_ID: 'lpmain_set',
-    SSO_SET_ID: 'sso_set',
-    LPMIRROR_SET_ID: 'lpmirror_set',
-    }
-
-
-def outpath(filename):
-    return os.path.join(options.outdir, filename)
-
-
-def message(outf, msg):
-    assert "'" not in msg
-    print >> outf, "echo '%s';" % msg
-
-
-def generate_preamble():
-    outf = open(outpath('mig_preamble.sk'), 'w')
-    print >> outf, replication.helpers.preamble(con)
-
-    cur = con.cursor()
-
-    for set_id, set_name in list(sets.items()):
-        cur.execute(
-            "SELECT set_origin FROM _sl.sl_set WHERE set_id=%s", [set_id])
-        result = cur.fetchone()
-        if result:
-            origin = result[0]
-            print >> outf, "define %s_origin %d;" % (set_name, origin)
-        else:
-            del sets[set_id]  # For testing. Production will have 3 sets.
-    outf.close()
-
-
-def generate_uninstall():
-    outf = open(outpath('mig_uninstall.sk'), 'w')
-    print >> outf, "# Uninstall Slony-I 1.2 from all nodes"
-    print >> outf, "include <mig_preamble.sk>;"
-
-    nodes = get_all_cluster_nodes(con)
-
-    # Ensure everything is really, really synced since we will be
-    # resubscribing with 'omit copy'
-    for node in nodes:
-        print >> outf, dedent("""\
-                sync (id=%d);
-                wait for event (origin=%d, confirmed=all, wait on=%d);
-                """).strip() % (node.node_id, node.node_id, node.node_id)
-
-    for node in nodes:
-        message(outf, "Uninstall node %d" % node.node_id)
-        print >> outf, "uninstall node (id=%d);" % node.node_id
-    outf.close()
-
-
-def generate_sync():
-    outf = open(outpath('mig_sync.sk'), 'w')
-    message(outf, "Waiting for sync")
-    print >> outf, "sync (id=@master_node);"
-    print >> outf, dedent("""\
-        wait for event (
-                origin=@master_node, confirmed=all, wait on=@master_node);
-            """).strip()
-    outf.close()
-
-
-def generate_rebuild():
-    outf = open(outpath('mig_rebuild.sk'), 'w')
-    print >> outf, "# Rebuild the replication cluster with Slony-I 2.0"
-    print >> outf, "include <mig_preamble.sk>;"
-
-    nodes = get_all_cluster_nodes(con)
-    first_node = nodes[0]
-    remaining_nodes = nodes[1:]
-
-    # Initialize the cluster
-    message(outf, "Initializing cluster (node %d)" % first_node.node_id)
-    print >> outf, "init cluster (id=%d);" % first_node.node_id
-
-    # Create all the other nodes
-    for node in remaining_nodes:
-        message(outf, "Initializing node %d" % node.node_id)
-        print >> outf, "store node (id=%d, event node=%d);" % (
-            node.node_id, first_node.node_id)
-
-    # Create paths so they can communicate.
-    message(outf, "Storing %d paths" % pow(len(nodes), 2))
-    for client_node in nodes:
-        for server_node in nodes:
-            print >> outf, (
-                "store path (server=%d, client=%d, "
-                "conninfo=@node%d_node_conninfo);" % (
-                    server_node.node_id, client_node.node_id,
-                    server_node.node_id))
-
-    # sync to ensure replication is happening.
-    print >> outf, "include <mig_sync.sk>;"
-
-    # Create replication sets.
-    for set_id, set_name in sets.items():
-        generate_initialize_set(set_id, set_name, outf)
-    print >> outf, "include <mig_sync.sk>;"
-
-    # Subscribe slave nodes to replication sets.
-    for set_id, set_name in sets.items():
-        generate_subscribe_set(set_id, set_name, outf)
-
-    outf.close()
-
-
-def generate_initialize_set(set_id, set_name, outf):
-    origin_node = get_master_node(con, set_id)
-    message(outf, "Creating %s origin %d" % (set_name, origin_node.node_id))
-    print >> outf, "create set (id=%d, origin=@%s_origin, comment='%s');" % (
-        set_id, set_name, set_name)
-    # Need to connect to a node currently replicating the desired set.
-    origin_con = origin_node.connect()
-    cur = origin_con.cursor()
-    cur.execute("""
-        SELECT tab_id, tab_nspname, tab_relname, tab_comment
-        FROM _sl.sl_table WHERE tab_set=%s
-        """, (set_id,))
-    results = cur.fetchall()
-    message(outf, "Adding %d tables to %s" % (len(results), set_name))
-    for tab_id, tab_nspname, tab_relname, tab_comment in results:
-        if not tab_comment:
-            tab_comment = ''
-        print >> outf, dedent("""\
-                set add table (
-                    set id=@%s, origin=@%s_origin, id=%d,
-                    fully qualified name='%s.%s',
-                    comment='%s');
-                """).strip() % (
-                    set_name, set_name, tab_id,
-                    tab_nspname, tab_relname, tab_comment)
-    cur.execute("""
-        SELECT seq_id, seq_nspname, seq_relname, seq_comment
-        FROM _sl.sl_sequence WHERE seq_set=%s
-        """, (set_id,))
-    results = cur.fetchall()
-    message(outf, "Adding %d sequences to %s" % (len(results), set_name))
-    for seq_id, seq_nspname, seq_relname, seq_comment in results:
-        if not seq_comment:
-            seq_comment = ''
-        print >> outf, dedent("""\
-                set add sequence (
-                    set id=@%s, origin=@%s_origin, id=%d,
-                    fully qualified name='%s.%s',
-                    comment='%s');
-                """).strip() % (
-                    set_name, set_name, seq_id,
-                    seq_nspname, seq_relname, seq_comment)
-
-
-def generate_subscribe_set(set_id, set_name, outf):
-    cur = con.cursor()
-    cur.execute("""
-        SELECT sub_receiver FROM _sl.sl_subscribe
-        WHERE sub_set=%s and sub_active is true
-        """, (set_id,))
-    for receiver_id, in cur.fetchall():
-        message(outf, "Subscribing node %d to %s" % (receiver_id, set_name))
-        print >> outf, dedent("""\
-                subscribe set (
-                    id=%d, provider=@%s_origin, receiver=%d,
-                    forward=true, omit copy=true);
-                wait for event (
-                    origin=@%s_origin, confirmed=all, wait on=@%s_origin);
-                """).strip() % (
-                    set_id, set_name, receiver_id,
-                    set_name, set_name)
-        print >> outf, "include <mig_sync.sk>;"
-
-
-def main():
-    parser = OptionParser()
-    scripts.db_options(parser)
-    parser.add_option(
-        "-o", "--output-dir", dest='outdir', default=".",
-        help="Write slonik scripts to DIR", metavar="DIR")
-    global options
-    options, args = parser.parse_args()
-    if args:
-        parser.error("Too many arguments")
-    scripts.execute_zcml_for_scripts(use_web_security=False)
-
-    global con
-    con = connect()
-
-    generate_preamble()
-    generate_sync()
-    generate_uninstall()
-    generate_rebuild()
-
-    return 0
-
-
-if __name__ == '__main__':
-    raise SystemExit(main())

=== removed file 'database/replication/initialize.py'
--- database/replication/initialize.py	2011-12-30 06:47:54 +0000
+++ database/replication/initialize.py	1970-01-01 00:00:00 +0000
@@ -1,216 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Initialize the cluster.
-
-This script is run once to convert a singledb Launchpad instance to
-a replicated setup.
-"""
-
-import _pythonpath
-
-from optparse import OptionParser
-import subprocess
-import sys
-
-import helpers
-
-from lp.services.config import config
-from lp.services.database.postgresql import (
-    all_sequences_in_schema,
-    all_tables_in_schema,
-    ConnectionString,
-    )
-from lp.services.database.sqlbase import (
-    connect,
-    ISOLATION_LEVEL_AUTOCOMMIT,
-    )
-from lp.services.scripts import (
-    db_options,
-    logger,
-    logger_options,
-    )
-
-
-__metaclass__ = type
-__all__ = []
-
-
-# Global logger, initialized in main().
-log = None
-
-# Parsed command line options, initialized in main().
-options = None
-
-# Shared database cursor to the master, initialized in main().
-cur = None
-
-
-def duplicate_schema():
-    """Duplicate the master schema into the slaves."""
-    log.info('Duplicating database schema')
-
-    master_cs = ConnectionString(config.database.rw_main_master)
-    slave1_cs = ConnectionString(config.database.rw_main_slave)
-
-    # We can't use pg_dump to replicate security as not all of the roles
-    # may exist in the slave databases' clusters yet.
-    cmd = "pg_dump -x -s %s | psql -q %s" % (
-        master_cs.asPGCommandLineArgs(), slave1_cs.asPGCommandLineArgs())
-    log.debug('Running %s' % cmd)
-    rv = subprocess.call(cmd, shell=True)
-    if rv != 0:
-        log.fatal("Schema duplication failed, pg_dump returned %d" % rv)
-        sys.exit(rv)
-
-    # Now setup security on the slaves and create any needed roles,
-    log.info('Setting up security on slave')
-    cmd = "../schema/security.py %s" % slave1_cs.asLPCommandLineArgs()
-    log.debug("Running %s" % cmd)
-    rv = subprocess.call(cmd.split())
-    if rv != 0:
-        print >> sys.stderr, "ERR: security setup failed, returning %d" % rv
-        sys.exit(rv)
-
-
-def initialize_cluster():
-    """Initialize the cluster."""
-    log.info('Initializing Slony-I cluster')
-    master_connection_string = ConnectionString(
-        config.database.rw_main_master)
-    master_connection_string.user = 'slony'
-    helpers.execute_slonik("""
-        node 1 admin conninfo = '%s';
-        try {
-            echo 'Initializing cluster and Master node.';
-            init cluster (id=1, comment='Master Node');
-            }
-        on success { echo 'Cluster initialized.'; }
-        on error { echo 'Cluster initialization failed.'; exit 1; }
-        """ % master_connection_string)
-
-
-def ensure_live():
-    log.info('Ensuring slon daemons are live and propagating events.')
-    # This will exit on failure.
-    helpers.sync(120)
-
-
-def create_replication_sets(lpmain_tables, lpmain_sequences):
-    """Create the replication sets."""
-    log.info('Creating Slony-I replication sets.')
-
-    script = ["try {"]
-
-    entry_id = 1
-
-    script.append("""
-        echo 'Creating LPMain replication set (@lpmain_set)';
-        create set (
-            id=@lpmain_set, origin=@master_node,
-            comment='Launchpad tables and sequences');
-        """)
-
-    script.append(
-        "echo 'Adding %d tables to replication set @lpmain_set';"
-        % len(lpmain_tables))
-    for table in sorted(lpmain_tables):
-        script.append("""
-            set add table (
-                set id=@lpmain_set,
-                origin=@master_node,
-                id=%(entry_id)d,
-                fully qualified name='%(table)s');
-            """ % vars())
-        entry_id += 1
-
-    entry_id = 1
-    script.append(
-        "echo 'Adding %d sequences to replication set @lpmain_set';"
-        % len(lpmain_sequences))
-    for sequence in sorted(lpmain_sequences):
-        script.append("""
-            set add sequence (
-                set id=@lpmain_set,
-                origin=@master_node,
-                id=%(entry_id)d,
-                fully qualified name='%(sequence)s');
-            """ % vars())
-        entry_id += 1
-
-    script.append("""
-        }
-        on error { echo 'Failed.'; exit 1; }
-        """)
-    helpers.execute_slonik('\n'.join(script), sync=600)
-
-    # Explode now if we have messed up.
-    helpers.validate_replication(cur)
-
-
-def main():
-    parser = OptionParser()
-    db_options(parser)
-    logger_options(parser)
-
-    parser.set_defaults(dbuser='slony')
-
-    global options
-    options, args = parser.parse_args()
-
-    global log
-    log = logger(options)
-
-    # Generate lists of sequences and tables for our replication sets.
-    log.debug("Connecting as %s" % options.dbuser)
-    con = connect()
-    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-    global cur
-    cur = con.cursor()
-    log.debug("Calculating lpmain replication set.")
-    lpmain_tables, lpmain_sequences = helpers.calculate_replication_set(
-        cur, helpers.LPMAIN_SEED)
-
-    # Sanity check these lists - we want all objects in the public
-    # schema to be in one and only one replication set.
-    log.debug("Performing sanity checks.")
-    fails = 0
-    for table in all_tables_in_schema(cur, 'public'):
-        times_seen = 0
-        for table_set in [lpmain_tables, helpers.IGNORED_TABLES]:
-            if table in table_set:
-                times_seen += 1
-        if times_seen == 0:
-            log.error("%s not in any replication set." % table)
-            fails += 1
-        if times_seen > 1:
-            log.error("%s is in multiple replication sets." % table)
-            fails += 1
-    for sequence in all_sequences_in_schema(cur, 'public'):
-        times_seen = 0
-        for sequence_set in [lpmain_sequences, helpers.IGNORED_SEQUENCES]:
-            if sequence in sequence_set:
-                times_seen += 1
-        if times_seen == 0:
-            log.error("%s not in any replication set." % sequence)
-            fails += 1
-        if times_seen > 1:
-            log.error("%s is in multiple replication sets." % sequence)
-            fails += 1
-    if fails > 0:
-        log.fatal("%d errors in replication set definitions." % fails)
-        sys.exit(1)
-
-    initialize_cluster()
-
-    ensure_live()
-
-    create_replication_sets(lpmain_tables, lpmain_sequences)
-
-    helpers.sync(0)
-
-
-if __name__ == '__main__':
-    sys.exit(main())

=== removed file 'database/replication/new-slave.py'
--- database/replication/new-slave.py	2012-06-29 08:40:05 +0000
+++ database/replication/new-slave.py	1970-01-01 00:00:00 +0000
@@ -1,261 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Bring a new slave online."""
-
-__metaclass__ = type
-__all__ = []
-
-import _pythonpath
-
-from optparse import OptionParser
-import subprocess
-import sys
-from textwrap import dedent
-import time
-
-import psycopg2
-
-from lp.services.database.postgresql import ConnectionString
-from lp.services.database.sqlbase import (
-    connect_string,
-    ISOLATION_LEVEL_AUTOCOMMIT,
-    )
-from lp.services.scripts import (
-    db_options,
-    logger,
-    logger_options,
-    )
-import replication.helpers
-from replication.helpers import LPMAIN_SET_ID
-
-
-def main():
-    parser = OptionParser(
-        "Usage: %prog [options] node_id connection_string")
-
-    db_options(parser)
-    logger_options(parser)
-
-    options, args = parser.parse_args()
-
-    log = logger(options, 'new-slave')
-
-    if len(args) != 2:
-        parser.error("Missing required arguments.")
-
-    node_id, raw_target_connection_string = args
-
-    # Confirm we can connect to the source database.
-    # Keep the connection as we need it later.
-    source_connection_string = ConnectionString(connect_string(user='slony'))
-    try:
-        log.debug(
-            "Opening source connection to '%s'" % source_connection_string)
-        source_connection = psycopg2.connect(str(source_connection_string))
-        source_connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-    except psycopg2.Error as exception:
-        parser.error("Unable to connect as %s (%s)" % (
-            source_connection_string, str(exception).strip()))
-
-    # Confirm we are connected to a Slony-I node.
-    if not replication.helpers.slony_installed(source_connection):
-        parser.error(
-            "Database at %s is not a Slony-I node."
-            % source_connection_string)
-
-    # Sanity check the given node_id.
-    existing_nodes = replication.helpers.get_all_cluster_nodes(
-        source_connection)
-    try:
-        node_id = int(node_id)
-    except ValueError:
-        parser.error("node_id must be a positive integer.")
-    if node_id <= 0:
-        parser.error("node_id must be a positive integer.")
-
-    if node_id in [node.node_id for node in existing_nodes]:
-        parser.error("Node %d already exists in the cluster." % node_id)
-
-    # Get the connection string for masters.
-    lpmain_connection_string = get_master_connection_string(
-        source_connection, parser, LPMAIN_SET_ID) or source_connection_string
-
-    # Sanity check the target connection string.
-    target_connection_string = ConnectionString(raw_target_connection_string)
-    if target_connection_string.user is None:
-        target_connection_string.user = 'slony'
-
-    # Make sure we can connect as the required users to our target.
-    # Keep the connection as we need it.
-    try:
-        target_con = psycopg2.connect(str(target_connection_string))
-    except psycopg2.Error as exception:
-        parser.error("Failed to connect using '%s' (%s)" % (
-            target_connection_string, str(exception).strip()))
-
-    # Confirm the target database is sane. Check for common errors
-    # that people might make when bringing new replicas online at 4am.
-    cur = target_con.cursor()
-    cur.execute("SHOW lc_collate")
-    collation = cur.fetchone()[0]
-    if collation != "C":
-        parser.error(
-            "Database at %s has incorrect collation (%s)" % (
-                target_connection_string, collation))
-    cur.execute("SHOW server_encoding")
-    encoding = cur.fetchone()[0]
-    if encoding != "UTF8":
-        parser.error(
-            "Database at %s has incorrect encoding (%s)" % (
-                target_connection_string, encoding))
-    cur.execute("""
-        SELECT COUNT(*) FROM information_schema.tables
-        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
-        """)
-    num_existing_objects = cur.fetchone()[0]
-    if num_existing_objects != 0:
-        parser.error(
-            "Database at %s is not empty." % target_connection_string)
-    target_con.rollback()
-
-    # Duplicate the full schema.
-    log.info("Duplicating full db schema from '%s' to '%s'" % (
-        lpmain_connection_string, target_connection_string))
-    cmd = "pg_dump --schema-only %s | psql -1 -q %s" % (
-        source_connection_string.asPGCommandLineArgs(),
-        target_connection_string.asPGCommandLineArgs())
-    if subprocess.call(cmd, shell=True) != 0:
-        log.error("Failed to duplicate database schema.")
-        return 1
-
-    # Trash the broken Slony tables we just duplicated.
-    log.debug("Removing slony cruft.")
-    cur = target_con.cursor()
-    cur.execute("DROP SCHEMA _sl CASCADE")
-    target_con.commit()
-    del target_con
-
-    # Get a list of existing set ids that can be subscribed too. This
-    # is all sets where the origin is the master_node. We
-    # don't allow other sets where the master is configured as a
-    # forwarding slave as we have to special case rebuilding the database
-    # schema, and we want to avoid cascading slave configurations anyway
-    # since we are running an antique Slony-I at the moment - keep it
-    # simple!
-    # We order the sets smallest to largest by number of tables.
-    # This should let us subscribe the quickest sets first for more
-    # immediate feedback.
-    source_connection.rollback()
-    master_node = replication.helpers.get_master_node(source_connection)
-    cur = source_connection.cursor()
-    cur.execute("""
-        SELECT set_id
-        FROM _sl.sl_set, (
-            SELECT tab_set, count(*) AS tab_count
-            FROM _sl.sl_table GROUP BY tab_set
-            ) AS TableCounts
-        WHERE
-            set_origin=%d
-            AND tab_set = set_id
-        ORDER BY tab_count
-        """
-        % (master_node.node_id,))
-    set_ids = [set_id for set_id, in cur.fetchall()]
-    log.debug("Discovered set ids %s" % repr(list(set_ids)))
-
-    # Generate and run a slonik(1) script to initialize the new node
-    # and subscribe it to our replication sets.
-    comment = 'New node created %s' % time.ctime()
-    script = dedent("""\
-        define new_node %d;
-        define new_node_conninfo '%s';
-        node @new_node admin conninfo = @new_node_conninfo;
-
-        echo 'Initializing new node.';
-        try {
-            store node (id=@new_node, comment='%s', event node=@master_node);
-            echo 'Creating new node paths.';
-        """ % (node_id, target_connection_string, comment))
-
-    for node in existing_nodes:
-        script += dedent("""\
-            store path (
-                server=@%(nickname)s, client=@new_node,
-                conninfo=@%(nickname)s_conninfo);
-            store path (
-                server=@new_node, client=@%(nickname)s,
-                conninfo=@new_node_conninfo);
-            """ % {'nickname': node.nickname})
-
-    script += dedent("""\
-        } on error { echo 'Failed.'; exit 1; }
-
-        echo 'You may need to restart the Slony daemons now. If the first';
-        echo 'of the following syncs passes then there is no need.';
-        """)
-
-    full_sync = []
-    sync_nicknames = [node.nickname for node in existing_nodes] + ['new_node']
-    for nickname in sync_nicknames:
-        full_sync.append(dedent("""\
-            echo 'Waiting for %(nickname)s sync.';
-            sync (id=@%(nickname)s);
-            wait for event (
-                origin = @%(nickname)s, confirmed=ALL,
-                wait on = @%(nickname)s, timeout=0);
-            echo 'Ok. Replication syncing fine with new node.';
-            """ % {'nickname': nickname}))
-    full_sync = '\n'.join(full_sync)
-    script += full_sync
-
-    for set_id in set_ids:
-        script += dedent("""\
-        echo 'Subscribing new node to set %d.';
-        subscribe set (
-            id=%d, provider=@master_node, receiver=@new_node, forward=yes);
-        echo 'Waiting for subscribe to start processing.';
-        wait for event (
-            origin = @master_node, confirmed = ALL,
-            wait on = @master_node, timeout = 0);
-        """ % (set_id, set_id))
-
-    replication.helpers.execute_slonik(script)
-
-    replication.helpers.validate_replication(source_connection.cursor())
-
-    return 0
-
-
-def get_master_connection_string(con, parser, set_id):
-    """Return the connection string to the origin for the replication set.
-    """
-    cur = con.cursor()
-    cur.execute("""
-        SELECT pa_conninfo FROM _sl.sl_set, _sl.sl_path
-        WHERE set_origin = pa_server AND set_id = %d
-        LIMIT 1
-        """ % set_id)
-    row = cur.fetchone()
-    if row is None:
-        # If we have no paths stored, there is only a single node in the
-        # cluster.
-        return None
-    else:
-        connection_string = ConnectionString(row[0])
-
-    # Confirm we can connect from here.
-    try:
-        # Test connection only.  We're not going to use it.
-        psycopg2.connect(str(connection_string))
-    except psycopg2.Error as exception:
-        parser.error("Failed to connect to using '%s' (%s)" % (
-            connection_string, str(exception).strip()))
-
-    return connection_string
-
-
-if __name__ == '__main__':
-    sys.exit(main())

=== removed file 'database/replication/repair-restored-db.py'
--- database/replication/repair-restored-db.py	2011-12-30 06:47:54 +0000
+++ database/replication/repair-restored-db.py	1970-01-01 00:00:00 +0000
@@ -1,134 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009 Canonical Ltd.  This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Remove the broken Slony-I information from database populated by
-pg_restore(1).
-
-When you dump a database using pg_dump(1), the Slony-I install is dumped
-too. After restoring this dump, you have a non-functional Slony-I
-installation. If you are recovering the database for disaster recovery
-purposes, you can keep the current install by repairing it using the
-slonik(1) command REPAIR CONFIG. In other cases, you need to remove
-Slony-I from the database (eg. building a staging database, we need
-to install replication fresh.). This script does this procedure.
-"""
-
-__metaclass__ = type
-__all__ = []
-
-import _pythonpath
-
-from optparse import OptionParser
-import sys
-
-import psycopg2
-
-from lp.services.config import config
-from lp.services.database.postgresql import ConnectionString
-from lp.services.database.sqlbase import (
-    connect,
-    ISOLATION_LEVEL_AUTOCOMMIT,
-    quote,
-    )
-from lp.services.scripts import (
-    db_options,
-    logger,
-    logger_options,
-    )
-import replication.helpers
-
-
-def main():
-    parser = OptionParser()
-    db_options(parser)
-    logger_options(parser)
-
-    parser.set_defaults(dbuser='slony')
-
-    options, args = parser.parse_args()
-
-    log = logger(options)
-
-    con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
-
-    if not replication.helpers.slony_installed(con):
-        log.info("Slony-I not installed. Nothing to do.")
-        return 0
-
-    if not repair_with_slonik(log, options, con):
-        repair_with_drop_schema(log, con)
-
-    return 0
-
-
-def repair_with_slonik(log, options, con):
-    """Attempt to uninstall Slony-I via 'UNINSTALL NODE' per best practice.
-
-    Returns True on success, False if unable to do so for any reason.
-    """
-    cur = con.cursor()
-
-    # Determine the node id the database thinks it is.
-    try:
-        cmd = "SELECT %s.getlocalnodeid(%s)" % (
-            replication.helpers.CLUSTER_NAMESPACE,
-            quote(replication.helpers.CLUSTER_NAMESPACE))
-        cur.execute(cmd)
-        node_id = cur.fetchone()[0]
-        log.debug("Node Id is %d" % node_id)
-
-        # Get a list of set ids in the database.
-        cur.execute(
-            "SELECT DISTINCT set_id FROM %s.sl_set"
-            % replication.helpers.CLUSTER_NAMESPACE)
-        set_ids = set(row[0] for row in cur.fetchall())
-        log.debug("Set Ids are %s" % repr(set_ids))
-
-    except psycopg2.InternalError:
-        # Not enough information to determine node id. Possibly
-        # this is an empty database.
-        log.debug('Broken or no Slony-I install.')
-        return False
-
-    connection_string = ConnectionString(config.database.rw_main_master)
-    if options.dbname:
-        connection_string.dbname = options.dbname
-    if options.dbuser:
-        connection_string.user = options.dbuser
-    if options.dbhost:
-        connection_string.host = options.dbhost
-
-    script = [
-        "cluster name = %s;" % replication.helpers.CLUSTERNAME,
-        "node %d admin conninfo = '%s';" % (node_id, connection_string),
-        ]
-    for set_id in set_ids:
-        script.append(
-            "repair config (set id=%d, event node=%d, execute only on=%d);"
-            % (set_id, node_id, node_id))
-    script.append("uninstall node (id=%d);" % node_id)
-    for line in script:
-        log.debug(line)
-    script = '\n'.join(script)
-
-    return replication.helpers.execute_slonik(
-        script, auto_preamble=False, exit_on_fail=False)
-
-
-def repair_with_drop_schema(log, con):
-    """
-    Just drop the _sl schema as it is 'good enough' with Slony-I 1.2.
-
-    This mechanism fails with Slony added primary keys, but we don't
-    do that.
-    """
-    log.info('Fallback mode - dropping _sl schema.')
-    cur = con.cursor()
-    cur.execute("DROP SCHEMA _sl CASCADE")
-    return True
-
-
-if __name__ == '__main__':
-    sys.exit(main())

=== removed file 'database/replication/report.py'
--- database/replication/report.py	2012-01-06 11:08:30 +0000
+++ database/replication/report.py	1970-01-01 00:00:00 +0000
@@ -1,288 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Generate a report on the replication setup.
-
-This report spits out whatever we consider useful for checking up on and
-diagnosing replication. This report will grow over time, and maybe some
-bits of this will move to separate monitoring systems or reports.
-
-See the Slony-I documentation for more discussion on the data presented
-by this report.
-"""
-
-__metaclass__ = type
-__all__ = []
-
-import _pythonpath
-
-from cgi import escape as html_escape
-from cStringIO import StringIO
-from optparse import OptionParser
-import sys
-
-from lp.services.database.sqlbase import (
-    connect,
-    quote_identifier,
-    sqlvalues,
-    )
-from lp.services.scripts import db_options
-import replication.helpers
-
-
-class Table:
-    """Representation of a table.
-
-    :ivar labels: List of labels to render as the table's first row.
-    :ivar rows: List of rows, each being a list of strings.
-    """
-    def __init__(self, labels=None):
-        if labels is None:
-            self.labels = []
-        else:
-            self.labels = labels[:]
-        self.rows = []
-
-
-class HtmlReport:
-
-    def alert(self, text):
-        """Return text marked up to be noticed."""
-        return '<span style="alert">%s</span>' % html_escape(text)
-
-    def table(self, table):
-        """Return the rendered table."""
-        out = StringIO()
-        print >> out, "<table>"
-        if table.labels:
-            print >> out, "<tr>"
-            for label in table.labels:
-                print >> out, "<th>%s</th>" % html_escape(unicode(label))
-            print >> out, "</tr>"
-
-        for row in table.rows:
-            print >> out, "<tr>"
-            for cell in row:
-                print >> out, "<td>%s</td>" % html_escape(unicode(cell))
-            print >> out, "</tr>"
-
-        print >> out, "</table>"
-
-        return out.getvalue()
-
-
-class TextReport:
-    def alert(self, text):
-        return text
-
-    def table(self, table):
-        max_col_widths = []
-        for label in table.labels:
-            max_col_widths.append(len(label))
-        for row in table.rows:
-            for col_idx, col in enumerate(row):
-                max_col_widths[col_idx] = max(
-                    len(str(col)), max_col_widths[col_idx])
-
-        out = StringIO()
-        for label_idx in range(0, len(table.labels)):
-            print >> out, table.labels[label_idx].ljust(
-                max_col_widths[label_idx]),
-        print >> out
-        for width in max_col_widths:
-            print >> out, '=' * width,
-        print >> out
-        for row in table.rows:
-            row = list(row)
-            for col_idx in range(0, len(row)):
-                print >> out, str(
-                    row[col_idx]).ljust(max_col_widths[col_idx]),
-            print >> out
-        print >> out
-
-        return out.getvalue()
-
-
-def node_overview_report(cur, options):
-    """Dumps the sl_node table in a human readable format.
-
-    This report tells us which nodes are active and which are inactive.
-    """
-    report = options.mode()
-    table = Table(["Node", "Comment", "Active"])
-
-    cur.execute("""
-        SELECT no_id, no_comment, no_active
-        FROM sl_node
-        ORDER BY no_comment
-        """)
-    for node_id, node_comment, node_active in cur.fetchall():
-        if node_active:
-            node_active_text = 'Active'
-        else:
-            node_active_text = report.alert('Inactive')
-        table.rows.append([
-            'Node %d' % node_id, node_comment, node_active_text])
-
-    return report.table(table)
-
-
-def paths_report(cur, options):
-    """Dumps the sl_paths table in a human readable format.
-
-    This report describes how nodes will attempt to connect to each other
-    if they need to, allowing you to sanity check the settings and pick up
-    obvious misconfigurations that would stop Slony daemons from being able
-    to connect to one or more nodes, blocking replication.
-    """
-    report = options.mode()
-    table = Table(["From client node", "To server node", "Via connection"])
-
-    cur.execute("""
-        SELECT pa_client, pa_server, pa_conninfo
-        FROM sl_path
-        ORDER BY pa_client, pa_server
-        """)
-    for row in cur.fetchall():
-        table.rows.append(row)
-
-    return report.table(table)
-
-
-def listen_report(cur, options):
-    """Dumps the sl_listen table in a human readable format.
-
-    This report shows you the tree of which nodes a node needs to check
-    for events on.
-    """
-    report = options.mode()
-    table = Table(["Node", "Listens To", "Via"])
-
-    cur.execute("""
-        SELECT li_receiver, li_origin, li_provider
-        FROM sl_listen
-        ORDER BY li_receiver, li_origin, li_provider
-        """)
-    for row in cur.fetchall():
-        table.rows.append(['Node %s' % node for node in row])
-    return report.table(table)
-
-
-def subscribe_report(cur, options):
-    """Dumps the sl_subscribe table in a human readable format.
-
-    This report shows the subscription tree - which nodes provide
-    a replication set to which subscriber.
-    """
-
-    report = options.mode()
-    table = Table([
-        "Set", "Is Provided By", "Is Received By",
-        "Is Forwardable", "Is Active"])
-    cur.execute("""
-        SELECT sub_set, sub_provider, sub_receiver, sub_forward, sub_active
-        FROM sl_subscribe ORDER BY sub_set, sub_provider, sub_receiver
-        """)
-    for set_, provider, receiver, forward, active in cur.fetchall():
-        if active:
-            active_text = 'Active'
-        else:
-            active_text = report.alert('Inactive')
-        table.rows.append([
-            "Set %d" % set_, "Node %d" % provider, "Node %d" % receiver,
-            str(forward), active_text])
-    return report.table(table)
-
-
-def tables_report(cur, options):
-    """Dumps the sl_table table in a human readable format.
-
-    This report shows which tables are being replicated and in which
-    replication set. It also importantly shows the internal Slony id used
-    for a table, which is needed for slonik scripts as slonik is incapable
-    of doing the tablename -> Slony id mapping itself.
-    """
-    report = options.mode()
-    table = Table(["Set", "Schema", "Table", "Table Id"])
-    cur.execute("""
-        SELECT tab_set, nspname, relname, tab_id, tab_idxname, tab_comment
-        FROM sl_table, pg_class, pg_namespace
-        WHERE tab_reloid = pg_class.oid AND relnamespace = pg_namespace.oid
-        ORDER BY tab_set, nspname, relname
-        """)
-    for set_, namespace, tablename, table_id, key, comment in cur.fetchall():
-        table.rows.append([
-            "Set %d" % set_, namespace, tablename, str(table_id)])
-    return report.table(table)
-
-
-def sequences_report(cur, options):
-    """Dumps the sl_sequences table in a human readable format.
-
-    This report shows which sequences are being replicated and in which
-    replication set. It also importantly shows the internal Slony id used
-    for a sequence, which is needed for slonik scripts as slonik is incapable
-    of doing the tablename -> Slony id mapping itself.
-    """
-    report = options.mode()
-    table = Table(["Set", "Schema", "Sequence", "Sequence Id"])
-    cur.execute("""
-        SELECT seq_set, nspname, relname, seq_id, seq_comment
-        FROM sl_sequence, pg_class, pg_namespace
-        WHERE seq_reloid = pg_class.oid AND relnamespace = pg_namespace.oid
-        ORDER BY seq_set, nspname, relname
-        """)
-    for set_, namespace, tablename, table_id, comment in cur.fetchall():
-        table.rows.append([
-            "Set %d" % set_, namespace, tablename, str(table_id)])
-    return report.table(table)
-
-
-def main():
-    parser = OptionParser()
-
-    parser.add_option(
-        "-f", "--format", dest="mode", default="text",
-        choices=['text', 'html'],
-        help="Output format MODE", metavar="MODE")
-    db_options(parser)
-
-    options, args = parser.parse_args()
-
-    if options.mode == "text":
-        options.mode = TextReport
-    elif options.mode == "html":
-        options.mode = HtmlReport
-    else:
-        assert False, "Unknown mode %s" % options.mode
-
-    con = connect()
-    cur = con.cursor()
-
-    cur.execute(
-            "SELECT TRUE FROM pg_namespace WHERE nspname=%s"
-            % sqlvalues(replication.helpers.CLUSTER_NAMESPACE))
-
-    if cur.fetchone() is None:
-        parser.error(
-                "No Slony-I cluster called %s in that database"
-                % replication.helpers.CLUSTERNAME)
-        return 1
-
-    # Set our search path to the schema of the cluster we care about.
-    cur.execute(
-            "SET search_path TO %s, public"
-            % quote_identifier(replication.helpers.CLUSTER_NAMESPACE))
-
-    print node_overview_report(cur, options)
-    print paths_report(cur, options)
-    print listen_report(cur, options)
-    print subscribe_report(cur, options)
-    print tables_report(cur, options)
-    print sequences_report(cur, options)
-
-
-if __name__ == '__main__':
-    sys.exit(main())

=== removed file 'database/replication/slon_ctl.py'
--- database/replication/slon_ctl.py	2011-12-30 06:47:54 +0000
+++ database/replication/slon_ctl.py	1970-01-01 00:00:00 +0000
@@ -1,161 +0,0 @@
-#!/usr/bin/python -S
-#
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Startup and shutdown slon processes.
-
-On production and staging we probably want to use the standard
-/etc/init.d/slony1 script instead of this tool.
-"""
-
-import _pythonpath
-
-from optparse import OptionParser
-import os.path
-import subprocess
-import sys
-
-from lp.services.config import config
-from lp.services.database.sqlbase import connect
-from lp.services.scripts import (
-    logger,
-    logger_options,
-    )
-import replication.helpers
-
-
-__metaclass__ = type
-__all__ = []
-
-
-def main():
-    parser = OptionParser(
-        "Usage: %prog [options] "
-        "[start [nickname connection_string] | stop [nickname]]")
-    parser.add_option(
-        '-l', '--lag', default=None, dest="lag", metavar='PGINTERVAL',
-        help="Lag events by PGINTERVAL, such as '10 seconds' or '2 minutes'")
-    logger_options(parser)
-    options, args = parser.parse_args()
-
-    if len(args) == 0:
-        parser.error("No command given.")
-
-    command = args[0]
-
-    if command not in ['start', 'stop']:
-        parser.error("Unknown command %s." % command)
-
-    if len(args) == 1:
-        explicit = None
-
-    elif command == 'start':
-        if len(args) == 2:
-            parser.error(
-                "nickname and connection_string required for %s command"
-                % command)
-        elif len(args) == 3:
-            explicit = replication.helpers.Node(
-                None, args[1], args[2], None)
-        else:
-            parser.error("Too many arguments.")
-
-    elif command == 'stop':
-        if len(args) in (2, 3):
-            explicit = replication.helpers.Node(
-                None, args[1], None, None)
-        else:
-            parser.error("Too many arguments.")
-
-    else:
-        parser.error("Unknown command %s." % command)
-
-    log = logger(options)
-
-    if explicit is not None:
-        nodes = [explicit]
-    else:
-        nodes = replication.helpers.get_all_cluster_nodes(
-            connect(user='slony'))
-
-    if command == 'start':
-        return start(log, nodes, options.lag)
-    else:
-        return stop(log, nodes)
-
-
-def get_pidfile(nickname):
-    return os.path.join(
-        config.canonical.pid_dir, 'lpslon_%s_%s.pid' % (
-        nickname, config.instance_name))
-
-
-def get_logfile(nickname):
-    logdir = config.database.replication_logdir
-    if not os.path.isabs(logdir):
-        logdir = os.path.normpath(os.path.join(config.root, logdir))
-    return os.path.join(
-        logdir, 'lpslon_%s_%s.log' % (nickname, config.instance_name))
-
-
-def start(log, nodes, lag=None):
-    for node in nodes:
-        pidfile = get_pidfile(node.nickname)
-        logfile = get_logfile(node.nickname)
-
-        log.info("Starting %s slon daemon." % node.nickname)
-        log.debug("Logging to %s" % logfile)
-        log.debug("PID file %s" % pidfile)
-        # Hard code suitable command line arguments for development.
-        slon_args = "-d 2 -s 500 -t 2500"
-        if lag is not None:
-            slon_args = "%s -l '%s'" % (slon_args, lag)
-        cmd = [
-            "/sbin/start-stop-daemon",
-            "--start",
-            "--background",
-            "--pidfile", pidfile,
-            "--oknodo",
-            "--exec", "/usr/bin/slon",
-            "--startas", "/bin/sh",
-            "--", "-c",
-            "slon -p %s %s %s '%s' >> %s" % (
-                pidfile, slon_args, replication.helpers.CLUSTERNAME,
-                node.connection_string, logfile)]
-        log.debug("Running %s" % repr(cmd))
-        return_code = subprocess.call(cmd)
-        if return_code != 0:
-            log.fatal("Failed. Return code %s" % return_code)
-            return return_code
-
-    return 0
-
-
-def stop(log, nodes):
-    for node in nodes:
-        pidfile = get_pidfile(node.nickname)
-
-        if not os.path.exists(pidfile):
-            log.info(
-                "%s slon daemon not running. Doing nothing."
-                % node.nickname)
-            continue
-        log.info("Stopping %s slon daemon." % node.nickname)
-        log.debug("PID file %s" % pidfile)
-        cmd = [
-            "/sbin/start-stop-daemon",
-            "--stop",
-            "--pidfile", pidfile,
-            "--oknodo"]
-        log.debug("Running %s" % repr(cmd))
-        return_code = subprocess.call(cmd)
-        if return_code != 0:
-            log.fatal("Failed. Return code %s" % return_code)
-            return return_code
-
-    return 0
-
-
-if __name__ == '__main__':
-    sys.exit(main())

=== modified file 'database/schema/full-update.py'
--- database/schema/full-update.py	2012-07-19 15:05:21 +0000
+++ database/schema/full-update.py	2012-08-07 11:14:21 +0000
@@ -57,7 +57,6 @@
     # Fake expected command line arguments and global log
     options.commit = True
     options.partial = False
-    options.ignore_slony = False
     upgrade.options = options
     upgrade.log = log
     # Invoke the database schema upgrade process.

=== modified file 'database/schema/preflight.py'
--- database/schema/preflight.py	2012-07-23 10:13:24 +0000
+++ database/schema/preflight.py	2012-08-07 11:14:21 +0000
@@ -28,7 +28,6 @@
     logger,
     logger_options,
     )
-import replication.helpers
 from replication.helpers import Node
 import upgrade
 
@@ -73,42 +72,11 @@
         master_con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
 
         self.log = log
-        self.is_slony = replication.helpers.slony_installed(master_con)
-        if self.is_slony:
-            self.nodes = set(
-                replication.helpers.get_all_cluster_nodes(master_con))
-            for node in self.nodes:
-                node.con = node.connect(ISOLATION_LEVEL_AUTOCOMMIT)
-
-            # Create a list of nodes subscribed to the replicated sets we
-            # are modifying.
-            cur = master_con.cursor()
-            cur.execute("""
-                WITH subscriptions AS (
-                    SELECT *
-                    FROM _sl.sl_subscribe
-                    WHERE sub_set = 1 AND sub_active IS TRUE)
-                SELECT sub_provider FROM subscriptions
-                UNION
-                SELECT sub_receiver FROM subscriptions
-                """)
-            lpmain_node_ids = set(row[0] for row in cur.fetchall())
-            self.lpmain_nodes = set(
-                node for node in self.nodes
-                if node.node_id in lpmain_node_ids)
-
-            # Store a reference to the lpmain origin.
-            lpmain_master_node_id = replication.helpers.get_master_node(
-                master_con, 1).node_id
-            self.lpmain_master_node = [
-                node for node in self.lpmain_nodes
-                    if node.node_id == lpmain_master_node_id][0]
-        else:
-            node = Node(None, None, None, True)
-            node.con = master_con
-            self.nodes = set([node])
-            self.lpmain_nodes = self.nodes
-            self.lpmain_master_node = node
+        node = Node(None, None, None, True)
+        node.con = master_con
+        self.nodes = set([node])
+        self.lpmain_nodes = self.nodes
+        self.lpmain_master_node = node
 
         # Add streaming replication standbys, which unfortunately cannot be
         # detected reliably and has to be passed in via the command line.
@@ -258,29 +226,6 @@
 
     def check_replication_lag(self):
         """Return False if the replication cluster is badly lagged."""
-        slony_lagged = False
-        if self.is_slony:
-            # Check replication lag on every node just in case there are
-            # disagreements.
-            max_lag = timedelta(seconds=-1)
-            for node in self.nodes:
-                cur = node.con.cursor()
-                cur.execute("""
-                    SELECT current_database(),
-                    max(st_lag_time) AS lag FROM _sl.sl_status
-                """)
-                dbname, lag = cur.fetchone()
-                if lag > max_lag:
-                    max_lag = lag
-                self.log.debug(
-                    "%s reports database lag of %s.", dbname, lag)
-            if max_lag <= MAX_LAG:
-                self.log.info("Slony cluster lag is ok (%s)", max_lag)
-                slony_lagged = False
-            else:
-                self.log.fatal("Slony cluster lag is high (%s)", max_lag)
-                slony_lagged = True
-
         # Do something harmless to force changes to be streamed in case
         # system is idle.
         self.lpmain_master_node.con.cursor().execute(
@@ -322,7 +267,7 @@
             self.log.debug(
                 "Streaming replication lag is not high (%s)", max_lag)
 
-        return not (slony_lagged or streaming_lagged)
+        return not streaming_lagged
 
     def check_can_sync(self):
         """Return True if a sync event is acknowledged by all nodes.
@@ -330,20 +275,6 @@
         We only wait 30 seconds for the sync, because we require the
         cluster to be quiescent.
         """
-        slony_success = True
-        if self.is_slony:
-            slony_success = replication.helpers.sync(30, exit_on_fail=False)
-            if slony_success:
-                self.log.info(
-                    "Slony replication events are being propagated.")
-            else:
-                self.log.fatal(
-                    "Slony replication events are not being propagated.")
-                self.log.fatal(
-                    "One or more slony replication daemons may be down.")
-                self.log.fatal(
-                    "Bounce the slony replication daemons and check logs.")
-
         # PG 9.1 streaming replication, or no replication.
         streaming_success = streaming_sync(self.lpmain_master_node.con, 30)
         if streaming_success:
@@ -351,7 +282,7 @@
         else:
             self.log.fatal("Streaming replicas not syncing.")
 
-        return slony_success and streaming_success
+        return streaming_success
 
     def report_patches(self):
         """Report what patches are due to be applied from this tree."""

=== modified file 'database/schema/upgrade.py'
--- database/schema/upgrade.py	2012-06-21 07:39:59 +0000
+++ database/schema/upgrade.py	2012-08-07 11:14:21 +0000
@@ -11,21 +11,17 @@
 
 import _pythonpath
 
-from cStringIO import StringIO
 import glob
 from optparse import OptionParser
 import os.path
 import re
-from tempfile import NamedTemporaryFile
 from textwrap import dedent
 
 from bzrlib.branch import Branch
 from bzrlib.errors import NotBranchError
 
-from lp.services.database.postgresql import fqn
 from lp.services.database.sqlbase import (
     connect,
-    ISOLATION_LEVEL_AUTOCOMMIT,
     sqlvalues,
     )
 from lp.services.scripts import (
@@ -34,7 +30,6 @@
     logger_options,
     )
 from lp.services.utils import total_seconds
-import replication.helpers
 
 
 SCHEMA_DIR = os.path.dirname(__file__)
@@ -44,16 +39,8 @@
     con = connect()
     patches = get_patchlist(con)
 
-    if not options.ignore_slony and replication.helpers.slony_installed(con):
-        con.close()
-        if options.commit is False:
-            parser.error("--dry-run does not make sense with replicated db")
-        log.info("Applying patches to Slony-I environment.")
-        apply_patches_replicated()
-        con = connect()
-    else:
-        log.info("Applying patches to unreplicated environment.")
-        apply_patches_normal(con)
+    log.info("Applying patches.")
+    apply_patches_normal(con)
 
     report_patch_times(con, patches)
 
@@ -179,334 +166,6 @@
     apply_comments(con)
 
 
-def apply_patches_replicated():
-    """Update a Slony-I cluster."""
-
-    # Get an autocommit connection. We use autocommit so we don't have to
-    # worry about blocking locks needed by Slony-I.
-    con = connect(isolation=ISOLATION_LEVEL_AUTOCOMMIT)
-
-    # We use three slonik scripts to apply our DB patches.
-    # The first script applies the DB patches to all nodes.
-
-    # First make sure the cluster is synced.
-    log.info("Waiting for cluster to sync, pre-update.")
-    replication.helpers.sync(timeout=600)
-
-    # Slonik script we are generating.
-    outf = StringIO()
-
-    # Start a transaction block.
-    print >> outf, "try {"
-
-    # All the SQL we need to run, combined into one file. This minimizes
-    # Slony-I syncs and downtime.
-    combined_sql = NamedTemporaryFile(prefix='dbupdate', suffix='.sql')
-
-    def add_sql(sql):
-        sql = sql.strip()
-        if sql != '':
-            assert sql.endswith(';'), "SQL not terminated with ';': %s" % sql
-            print >> combined_sql, sql
-            # Flush or we might lose statements from buffering.
-            combined_sql.flush()
-
-    # Apply trusted.sql
-    add_sql(open(os.path.join(SCHEMA_DIR, 'trusted.sql'), 'r').read())
-
-    # Prepare to repair the start timestamps in
-    # LaunchpadDatabaseRevision.
-    add_sql(FIX_PATCH_TIMES_PRE_SQL)
-
-    patches = get_patchlist(con)
-    for (major, minor, patch), patch_file in patches:
-        add_sql(open(patch_file, 'r').read())
-
-        # Trigger a failure if the patch neglected to update
-        # LaunchpadDatabaseRevision.
-        add_sql(
-            "SELECT assert_patch_applied(%d, %d, %d);"
-            % (major, minor, patch))
-
-    # Fix the start timestamps in LaunchpadDatabaseRevision.
-    add_sql(FIX_PATCH_TIMES_POST_SQL % sqlvalues(*get_bzr_details()))
-
-    print >> outf, dedent("""\
-        execute script (
-            set id = @lpmain_set, event node = @master_node,
-            filename='%s'
-            );
-        """ % combined_sql.name)
-
-    # Close transaction block and abort on error.
-    print >> outf, dedent("""\
-        }
-        on error {
-            echo 'Failed! Slonik script aborting. Patches rolled back.';
-            exit 1;
-            }
-        """)
-
-    # Execute the script with slonik.
-    log.info("slonik(1) schema upgrade script generated. Invoking.")
-    if not replication.helpers.execute_slonik(outf.getvalue()):
-        log.fatal("Aborting.")
-        raise SystemExit(4)
-    log.info("slonik(1) schema upgrade script completed.")
-
-    # Wait for replication to sync.
-    log.info("Waiting for patches to apply to slaves and cluster to sync.")
-    replication.helpers.sync(timeout=0)
-
-    # The db patches have now been applied to all nodes, and we are now
-    # committed to completing the upgrade (!). If any of the later stages
-    # fail, it will likely involve manual cleanup.
-
-    # We now scan for new tables and add them to the lpmain
-    # replication set using a second script. Note that upgrade.py only
-    # deals with the lpmain replication set.
-
-    # Detect new tables and sequences.
-    # Everything else that isn't replicated should go in the lpmain
-    # replication set.
-    cur = con.cursor()
-    unrepl_tabs, unrepl_seqs = replication.helpers.discover_unreplicated(cur)
-
-    # But warn if we are going to replicate something not in the calculated
-    # set, as *_SEED in replication.helpers needs to be updated. We don't want
-    # abort unless absolutely necessary to avoid manual cleanup.
-    lpmain_tabs, lpmain_seqs = replication.helpers.calculate_replication_set(
-            cur, replication.helpers.LPMAIN_SEED)
-
-    assumed_tabs = unrepl_tabs.difference(lpmain_tabs)
-    assumed_seqs = unrepl_seqs.difference(lpmain_seqs)
-    for obj in (assumed_tabs.union(assumed_seqs)):
-        log.warn(
-            "%s not in calculated lpmain replication set. "
-            "Update *_SEED in replication/helpers.py" % obj)
-
-    if unrepl_tabs or unrepl_seqs:
-        # TODO: Or if the holding set already exists - catch an aborted run.
-        log.info(
-            "New stuff needs replicating: %s"
-            % ', '.join(sorted(unrepl_tabs.union(unrepl_seqs))))
-        # Create a new replication set to hold new tables and sequences
-        # TODO: Only create set if it doesn't already exist.
-        outf = StringIO()
-        print >> outf, dedent("""\
-            try {
-                create set (
-                    id = @holding_set, origin = @master_node,
-                    comment = 'Temporary set to merge'
-                    );
-            """)
-
-        # Add the new tables and sequences to the holding set.
-        cur.execute("""
-            SELECT max(tab_id) FROM %s
-            """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_table'))
-        next_id = cur.fetchone()[0] + 1
-        for tab in unrepl_tabs:
-            print >> outf, dedent("""\
-                echo 'Adding %s to holding set for lpmain merge.';
-                set add table (
-                    set id = @holding_set, origin = @master_node, id=%d,
-                    fully qualified name = '%s',
-                    comment = '%s'
-                    );
-                """ % (tab, next_id, tab, tab))
-            next_id += 1
-        cur.execute("""
-            SELECT max(seq_id) FROM %s
-            """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_sequence'))
-        next_id = cur.fetchone()[0] + 1
-        for seq in  unrepl_seqs:
-            print >> outf, dedent("""\
-                echo 'Adding %s to holding set for lpmain merge.';
-                set add sequence (
-                    set id = @holding_set, origin = @master_node, id=%d,
-                    fully qualified name = '%s',
-                    comment = '%s'
-                    );
-                """ % (seq, next_id, seq, seq))
-            next_id += 1
-
-        print >> outf, dedent("""\
-            } on error {
-                echo 'Failed to create holding set! Aborting.';
-                exit 1;
-                }
-            """)
-
-        # Subscribe the holding set to all replicas.
-        # TODO: Only subscribe the set if not already subscribed.
-        # Close the transaction and sync. Important, or MERGE SET will fail!
-        # Merge the sets.
-        # Sync.
-        # Drop the holding set.
-        for slave_node in replication.helpers.get_slave_nodes(con):
-            print >> outf, dedent("""\
-                echo 'Subscribing holding set to @node%d_node.';
-                subscribe set (
-                    id=@holding_set, provider=@master_node,
-                    receiver=@node%d_node, forward=yes);
-                wait for event (
-                    origin=@master_node, confirmed=all,
-                    wait on=@master_node, timeout=0);
-                echo 'Waiting for sync';
-                sync (id=@master_node);
-                wait for event (
-                    origin=@master_node, confirmed=ALL,
-                    wait on=@master_node, timeout=0);
-                """ % (slave_node.node_id, slave_node.node_id))
-
-        print >> outf, dedent("""\
-            echo 'Merging holding set to lpmain';
-            merge set (
-                id=@lpmain_set, add id=@holding_set, origin=@master_node);
-            """)
-
-        # Execute the script and sync.
-        log.info(
-            "Generated slonik(1) script to replicate new objects. Invoking.")
-        if not replication.helpers.execute_slonik(outf.getvalue()):
-            log.fatal("Aborting.")
-        log.info(
-            "slonik(1) script to replicate new objects completed.")
-        log.info("Waiting for sync.")
-        replication.helpers.sync(timeout=0)
-    else:
-        log.info("No new tables or sequences to replicate.")
-
-    # We also scan for tables and sequences we want to drop and do so using
-    # a final slonik script. Instead of dropping tables in the DB patch,
-    # we rename them into the ToDrop namespace.
-    #
-    # First, remove all todrop.* sequences from replication.
-    cur.execute("""
-        SELECT seq_nspname, seq_relname, seq_id from %s
-        WHERE seq_nspname='todrop'
-        """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_sequence'))
-    seqs_to_unreplicate = set(
-        (fqn(nspname, relname), tab_id)
-        for nspname, relname, tab_id in cur.fetchall())
-    if seqs_to_unreplicate:
-        log.info("Unreplicating sequences: %s" % ', '.join(
-            name for name, id in seqs_to_unreplicate))
-        # Generate a slonik script to remove sequences from the
-        # replication set.
-        sk = StringIO()
-        print >> sk, "try {"
-        for seq_name, seq_id in seqs_to_unreplicate:
-            if seq_id is not None:
-                print >> sk, dedent("""\
-                    echo 'Removing %s from replication';
-                    set drop sequence (origin=@master_node, id=%d);
-                    """ % (seq_name, seq_id))
-        print >> sk, dedent("""\
-            }
-            on error {
-                echo 'Failed to unreplicate sequences. Aborting.';
-                exit 1;
-                }
-            """)
-        log.info(
-            "Generated slonik(1) script to unreplicate sequences. Invoking.")
-        if not replication.helpers.execute_slonik(sk.getvalue()):
-            log.fatal("Aborting.")
-        log.info("slonik(1) script to drop sequences completed.")
-
-    # Generate a slonik script to remove tables from the replication set,
-    # and a DROP TABLE/DROP SEQUENCE sql script to run after.
-    cur.execute("""
-        SELECT nspname, relname, tab_id
-        FROM pg_class
-        JOIN pg_namespace ON relnamespace = pg_namespace.oid
-        LEFT OUTER JOIN %s ON pg_class.oid = tab_reloid
-        WHERE nspname='todrop' AND relkind='r'
-        """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_table'))
-    tabs_to_drop = set(
-        (fqn(nspname, relname), tab_id)
-        for nspname, relname, tab_id in cur.fetchall())
-    if tabs_to_drop:
-        log.info("Dropping tables: %s" % ', '.join(
-            name for name, id in tabs_to_drop))
-        sk = StringIO()
-        sql = NamedTemporaryFile(prefix="drop", suffix=".sql")
-        print >> sk, "try {"
-        for tab_name, tab_id in tabs_to_drop:
-            if tab_id is not None:
-                print >> sk, dedent("""\
-                    echo 'Removing %s from replication';
-                    set drop table (origin=@master_node, id=%d);
-                    """ % (tab_name, tab_id))
-            print >> sql, "DROP TABLE %s;" % tab_name
-        sql.flush()
-        print >> sk, dedent("""\
-            execute script (
-                set id=@lpmain_set, event node=@master_node,
-                filename='%s'
-                );
-            }
-            on error {
-                echo 'Failed to drop tables. Aborting.';
-                exit 1;
-                }
-            """ % sql.name)
-        log.info("Generated slonik(1) script to drop tables. Invoking.")
-        if not replication.helpers.execute_slonik(sk.getvalue()):
-            log.fatal("Aborting.")
-        log.info("slonik(1) script to drop tables completed.")
-        sql.close()
-
-    # Now drop any remaining sequences. Most sequences will be dropped
-    # implicitly with the table drop.
-    cur.execute("""
-        SELECT nspname, relname, seq_id
-        FROM pg_class
-        JOIN pg_namespace ON relnamespace = pg_namespace.oid
-        LEFT OUTER JOIN %s ON pg_class.oid = seq_reloid
-        WHERE nspname='todrop' AND relkind='S'
-        """ % fqn(replication.helpers.CLUSTER_NAMESPACE, 'sl_sequence'))
-    seqs_to_drop = set(
-        (fqn(nspname, relname), tab_id)
-        for nspname, relname, tab_id in cur.fetchall())
-
-    if seqs_to_drop:
-        log.info("Dropping sequences: %s" % ', '.join(
-            name for name, id in seqs_to_drop))
-        # Generate a slonik script to remove sequences from the
-        # replication set, DROP SEQUENCE sql script to run after.
-        sk = StringIO()
-        sql = NamedTemporaryFile(prefix="drop", suffix=".sql")
-        print >> sk, "try {"
-        for seq_name, seq_id in seqs_to_drop:
-            if seq_id is not None:
-                print >> sk, dedent("""\
-                    echo 'Removing %s from replication';
-                    set drop sequence (origin=@master_node, id=%d);
-                    """ % (seq_name, seq_id))
-            print >> sql, "DROP SEQUENCE %s;" % seq_name
-        sql.flush()
-        print >> sk, dedent("""\
-            execute script (
-                set id=@lpmain_set, event node=@master_node,
-                filename='%s'
-                );
-            }
-            on error {
-                echo 'Failed to drop sequences. Aborting.';
-                exit 1;
-                }
-            """ % sql.name)
-        log.info("Generated slonik(1) script to drop sequences. Invoking.")
-        if not replication.helpers.execute_slonik(sk.getvalue()):
-            log.fatal("Aborting.")
-        log.info("slonik(1) script to drop sequences completed.")
-    log.info("Waiting for final sync.")
-    replication.helpers.sync(timeout=0)
-
-
 def get_patchlist(con):
     """Return a patches that need to be applied to the connected database
     in [((major, minor, patch), patch_file)] format.
@@ -614,9 +273,6 @@
     parser.add_option(
         "--partial", dest="partial", default=False,
         action="store_true", help="Commit after applying each patch")
-    parser.add_option(
-        "--ignore-slony", dest="ignore_slony", default=False,
-        action="store_true", help="Ignore any Slony installations")
     (options, args) = parser.parse_args()
 
     if args:


Follow ups