launchpad-reviewers team mailing list archive
Message #28881
[Merge] ~cjwatson/launchpad:black-database into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:black-database into launchpad:master.
Commit message:
database: Apply black
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/427351
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:black-database into launchpad:master.
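
For reviewers who want to reproduce or work with this change locally, a minimal sketch (assuming black and pre-commit are installed per the project's usual setup) is:

    black database/
    pre-commit run --all-files
    git config blame.ignoreRevsFile .git-blame-ignore-revs

The last command points git blame at the .git-blame-ignore-revs file updated in this branch, so the bulk-reformatting commit does not show up as the last author of every reformatted line.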
diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
index 40d232e..384c68b 100644
--- a/.git-blame-ignore-revs
+++ b/.git-blame-ignore-revs
@@ -100,3 +100,5 @@ b56a741985ca580c281f142bea589b1ef05d3e93
86d834967ddae3eecd13deda5ac9eefea538195d
# apply black to utilities
35ac52c1c63d184b4660abbab4bead59408d3937
+# apply black to database
+a0cc45d527f251438cff74b4134e7a66fba42ac7
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index b925ca3..7421d57 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -42,6 +42,7 @@ repos:
files: |
(?x)^(
cronscripts
+ |database
|lib/lp/(
answers
|app
@@ -91,6 +92,7 @@ repos:
exclude: |
(?x)^(
cronscripts
+ |database
|lib/lp/(
answers
|app
@@ -125,6 +127,7 @@ repos:
files: |
(?x)^(
cronscripts
+ |database
|lib/lp/(
answers
|app
diff --git a/database/replication/helpers.py b/database/replication/helpers.py
index 5f70da5..f1f8349 100644
--- a/database/replication/helpers.py
+++ b/database/replication/helpers.py
@@ -13,28 +13,24 @@ import psycopg2
from lp.services.config import config
from lp.services.database.postgresql import (
+ ConnectionString,
all_sequences_in_schema,
all_tables_in_schema,
- ConnectionString,
fqn,
- )
+)
from lp.services.database.sqlbase import (
- connect,
ISOLATION_LEVEL_DEFAULT,
+ connect,
sqlvalues,
- )
-from lp.services.scripts.logger import (
- DEBUG2,
- log,
- )
-
+)
+from lp.services.scripts.logger import DEBUG2, log
# The Slony-I clustername we use with Launchpad. Hardcoded because there
# is no point changing this, ever.
-CLUSTERNAME = 'sl'
+CLUSTERNAME = "sl"
# The namespace in the database used to contain all the Slony-I tables.
-CLUSTER_NAMESPACE = '_%s' % CLUSTERNAME
+CLUSTER_NAMESPACE = "_%s" % CLUSTERNAME
# Replication set id constants. Don't change these without DBA help.
LPMAIN_SET_ID = 1
@@ -44,36 +40,38 @@ LPMIRROR_SET_ID = 4
# Seed tables for the lpmain replication set to be passed to
# calculate_replication_set().
-LPMAIN_SEED = frozenset([
- ('public', 'account'),
- ('public', 'person'),
- ('public', 'databasereplicationlag'),
- ('public', 'fticache'),
- ('public', 'nameblacklist'),
- ('public', 'openidconsumerassociation'),
- ('public', 'openidconsumernonce'),
- ('public', 'codeimportmachine'),
- ('public', 'scriptactivity'),
- ('public', 'launchpadstatistic'),
- ('public', 'parsedapachelog'),
- ('public', 'databasereplicationlag'),
- ('public', 'featureflag'),
- ('public', 'bugtaskflat'),
- # suggestivepotemplate can be removed when the
- # suggestivepotemplate.potemplate foreign key constraint exists on
- # production.
- ('public', 'suggestivepotemplate'),
- # These are odd. They are updated via slonik & EXECUTE SCRIPT, and
- # the contents of these tables will be different on each node
- # because we store timestamps when the patches were applied.
- # However, we want the tables listed as replicated so that, when
- # building a new replica, the data that documents the schema patch
- # level matches the schema patch level and upgrade.py does the right
- # thing. This is a bad thing to do, but we are safe in this
- # particular case.
- ('public', 'launchpaddatabaserevision'),
- ('public', 'launchpaddatabaseupdatelog'),
- ])
+LPMAIN_SEED = frozenset(
+ [
+ ("public", "account"),
+ ("public", "person"),
+ ("public", "databasereplicationlag"),
+ ("public", "fticache"),
+ ("public", "nameblacklist"),
+ ("public", "openidconsumerassociation"),
+ ("public", "openidconsumernonce"),
+ ("public", "codeimportmachine"),
+ ("public", "scriptactivity"),
+ ("public", "launchpadstatistic"),
+ ("public", "parsedapachelog"),
+ ("public", "databasereplicationlag"),
+ ("public", "featureflag"),
+ ("public", "bugtaskflat"),
+ # suggestivepotemplate can be removed when the
+ # suggestivepotemplate.potemplate foreign key constraint exists on
+ # production.
+ ("public", "suggestivepotemplate"),
+ # These are odd. They are updated via slonik & EXECUTE SCRIPT, and
+ # the contents of these tables will be different on each node
+ # because we store timestamps when the patches were applied.
+ # However, we want the tables listed as replicated so that, when
+ # building a new replica, the data that documents the schema patch
+ # level matches the schema patch level and upgrade.py does the right
+ # thing. This is a bad thing to do, but we are safe in this
+ # particular case.
+ ("public", "launchpaddatabaserevision"),
+ ("public", "launchpaddatabaseupdatelog"),
+ ]
+)
# Explicitly list tables that should not be replicated. This includes the
# session tables, as these might exist in developer databases but will not
@@ -81,47 +79,49 @@ LPMAIN_SEED = frozenset([
IGNORED_TABLES = {
# Session tables that in some situations will exist in the main lp
# database.
- 'public.secret', 'public.sessiondata', 'public.sessionpkgdata',
+ "public.secret",
+ "public.sessiondata",
+ "public.sessionpkgdata",
# Mirror tables, per Bug #489078. These tables have their own private
# replication set that is setup manually.
- 'public.lp_account',
- 'public.lp_openididentifier',
- 'public.lp_person',
- 'public.lp_personlocation',
- 'public.lp_teamparticipation',
+ "public.lp_account",
+ "public.lp_openididentifier",
+ "public.lp_person",
+ "public.lp_personlocation",
+ "public.lp_teamparticipation",
# Database statistics
- 'public.databasetablestats',
- 'public.databasecpustats',
- 'public.databasediskutilization',
+ "public.databasetablestats",
+ "public.databasecpustats",
+ "public.databasediskutilization",
# Don't replicate OAuthNonce - too busy and no real gain.
- 'public.oauthnonce',
+ "public.oauthnonce",
# Ubuntu SSO database. These tables where created manually by ISD
# and the Launchpad scripts should not mess with them. Eventually
# these tables will be in a totally separate database.
- 'public.auth_permission',
- 'public.auth_group',
- 'public.auth_user',
- 'public.auth_message',
- 'public.django_content_type',
- 'public.auth_permission',
- 'public.django_session',
- 'public.django_site',
- 'public.django_admin_log',
- 'public.ssoopenidrpconfig',
- 'public.auth_group_permissions',
- 'public.auth_user_groups',
- 'public.auth_user_user_permissions',
- 'public.oauth_nonce',
- 'public.oauth_consumer',
- 'public.oauth_token',
- 'public.api_user',
- 'public.oauth_consumer_id_seq',
- 'public.api_user_id_seq',
- 'public.oauth_nonce_id_seq',
- }
+ "public.auth_permission",
+ "public.auth_group",
+ "public.auth_user",
+ "public.auth_message",
+ "public.django_content_type",
+ "public.auth_permission",
+ "public.django_session",
+ "public.django_site",
+ "public.django_admin_log",
+ "public.ssoopenidrpconfig",
+ "public.auth_group_permissions",
+ "public.auth_user_groups",
+ "public.auth_user_user_permissions",
+ "public.oauth_nonce",
+ "public.oauth_consumer",
+ "public.oauth_token",
+ "public.api_user",
+ "public.oauth_consumer_id_seq",
+ "public.api_user_id_seq",
+ "public.oauth_nonce_id_seq",
+}
# Calculate IGNORED_SEQUENCES
-IGNORED_SEQUENCES = {'%s_id_seq' % table for table in IGNORED_TABLES}
+IGNORED_SEQUENCES = {"%s_id_seq" % table for table in IGNORED_TABLES}
def slony_installed(con):
@@ -129,33 +129,41 @@ def slony_installed(con):
cluster.
"""
cur = con.cursor()
- cur.execute("""
+ cur.execute(
+ """
SELECT TRUE FROM pg_class,pg_namespace
WHERE
nspname = %s
AND relname = 'sl_table'
AND pg_class.relnamespace = pg_namespace.oid
- """ % sqlvalues(CLUSTER_NAMESPACE))
+ """
+ % sqlvalues(CLUSTER_NAMESPACE)
+ )
return cur.fetchone() is not None
class TableReplicationInfo:
"""Internal table replication details."""
+
table_id = None
replication_set_id = None
primary_node_id = None
def __init__(self, con, namespace, table_name):
cur = con.cursor()
- cur.execute("""
+ cur.execute(
+ """
SELECT tab_id, tab_set, set_origin
FROM %s.sl_table, %s.sl_set
WHERE tab_set = set_id
AND tab_nspname = %s
AND tab_relname = %s
- """ % (
+ """
+ % (
(CLUSTER_NAMESPACE, CLUSTER_NAMESPACE)
- + sqlvalues(namespace, table_name)))
+ + sqlvalues(namespace, table_name)
+ )
+ )
row = cur.fetchone()
if row is None:
raise LookupError(fqn(namespace, table_name))
@@ -204,12 +212,15 @@ def execute_slonik(script, sync=None, exit_on_fail=True, auto_preamble=True):
script = preamble() + script
if sync is not None:
- sync_script = dedent("""\
+ sync_script = dedent(
+ """\
sync (id = @primary_node);
wait for event (
origin = @primary_node, confirmed = ALL,
wait on = @primary_node, timeout = %d);
- """ % sync)
+ """
+ % sync
+ )
script = script + sync_script
# Copy the script to a NamedTemporaryFile rather than just pumping it
@@ -221,8 +232,8 @@ def execute_slonik(script, sync=None, exit_on_fail=True, auto_preamble=True):
# Run slonik
log.debug("Executing slonik script %s" % script_on_disk.name)
- log.log(DEBUG2, 'Running script:\n%s' % script)
- returncode = subprocess.call(['slonik', script_on_disk.name])
+ log.log(DEBUG2, "Running script:\n%s" % script)
+ returncode = subprocess.call(["slonik", script_on_disk.name])
if returncode != 0:
log.error("slonik script failed")
@@ -234,6 +245,7 @@ def execute_slonik(script, sync=None, exit_on_fail=True, auto_preamble=True):
class Node:
"""Simple data structure for holding information about a Slony node."""
+
def __init__(self, node_id, nickname, connection_string, is_primary):
self.node_id = node_id
self.nickname = nickname
@@ -260,7 +272,9 @@ def _get_nodes(con, query):
def get_primary_node(con, set_id=1):
"""Return the primary Node, or None if the cluster is still being setup."""
- nodes = _get_nodes(con, """
+ nodes = _get_nodes(
+ con,
+ """
SELECT DISTINCT
set_origin AS node_id,
'primary',
@@ -269,7 +283,9 @@ def get_primary_node(con, set_id=1):
FROM _sl.sl_set
LEFT OUTER JOIN _sl.sl_path ON set_origin = pa_server
WHERE set_id = %d
- """ % set_id)
+ """
+ % set_id,
+ )
if not nodes:
return None
assert len(nodes) == 1, "More than one primary found for set %s" % set_id
@@ -278,7 +294,9 @@ def get_primary_node(con, set_id=1):
def get_standby_nodes(con, set_id=1):
"""Return the list of standby Nodes."""
- return _get_nodes(con, """
+ return _get_nodes(
+ con,
+ """
SELECT DISTINCT
pa_server AS node_id,
'standby' || pa_server,
@@ -290,7 +308,9 @@ def get_standby_nodes(con, set_id=1):
WHERE
set_id = %d
ORDER BY node_id
- """ % set_id)
+ """
+ % set_id,
+ )
def get_nodes(con, set_id=1):
@@ -310,7 +330,9 @@ def get_all_cluster_nodes(con):
"""
if not slony_installed(con):
return []
- nodes = _get_nodes(con, """
+ nodes = _get_nodes(
+ con,
+ """
SELECT DISTINCT
pa_server AS node_id,
'node' || pa_server || '_node',
@@ -318,7 +340,8 @@ def get_all_cluster_nodes(con):
NULL
FROM _sl.sl_path
ORDER BY node_id
- """)
+ """,
+ )
if not nodes:
# There are no subscriptions yet, so no paths. Generate the
# primary Node.
@@ -330,11 +353,17 @@ def get_all_cluster_nodes(con):
assert len(node_ids) == 1, "Multiple nodes but no paths."
primary_node_id = node_ids[0]
primary_connection_string = ConnectionString(
- config.database.rw_main_primary)
- primary_connection_string.user = 'slony'
- return [Node(
- primary_node_id, 'node%d_node' % primary_node_id,
- primary_connection_string, True)]
+ config.database.rw_main_primary
+ )
+ primary_connection_string.user = "slony"
+ return [
+ Node(
+ primary_node_id,
+ "node%d_node" % primary_node_id,
+ primary_connection_string,
+ True,
+ )
+ ]
return nodes
@@ -342,14 +371,16 @@ def preamble(con=None):
"""Return the preable needed at the start of all slonik scripts."""
if con is None:
- con = connect(user='slony')
+ con = connect(user="slony")
primary_node = get_primary_node(con)
nodes = get_all_cluster_nodes(con)
if primary_node is None and len(nodes) == 1:
primary_node = nodes[0]
- preamble = [dedent("""\
+ preamble = [
+ dedent(
+ """\
#
# Every slonik script must start with a clustername, which cannot
# be changed once the cluster is initialized.
@@ -361,26 +392,43 @@ def preamble(con=None):
define holding_set %d;
define sso_set %d;
define lpmirror_set %d;
- """ % (LPMAIN_SET_ID, HOLDING_SET_ID, SSO_SET_ID, LPMIRROR_SET_ID))]
+ """
+ % (LPMAIN_SET_ID, HOLDING_SET_ID, SSO_SET_ID, LPMIRROR_SET_ID)
+ )
+ ]
if primary_node is not None:
- preamble.append(dedent("""\
+ preamble.append(
+ dedent(
+ """\
# Symbolic id for the main replication set primary node.
define primary_node %d;
define primary_node_conninfo '%s';
- """ % (primary_node.node_id, primary_node.connection_string)))
+ """
+ % (primary_node.node_id, primary_node.connection_string)
+ )
+ )
for node in nodes:
- preamble.append(dedent("""\
+ preamble.append(
+ dedent(
+ """\
define %s %d;
define %s_conninfo '%s';
node @%s admin conninfo = @%s_conninfo;
- """ % (
- node.nickname, node.node_id,
- node.nickname, node.connection_string,
- node.nickname, node.nickname)))
+ """
+ % (
+ node.nickname,
+ node.node_id,
+ node.nickname,
+ node.connection_string,
+ node.nickname,
+ node.nickname,
+ )
+ )
+ )
- return '\n\n'.join(preamble)
+ return "\n\n".join(preamble)
def calculate_replication_set(cur, seeds):
@@ -411,13 +459,16 @@ def calculate_replication_set(cur, seeds):
# Skip if the table doesn't exist - we might have seeds listed that
# have been removed or are yet to be created.
- cur.execute("""
+ cur.execute(
+ """
SELECT TRUE
FROM pg_class, pg_namespace
WHERE pg_class.relnamespace = pg_namespace.oid
AND pg_namespace.nspname = %s
AND pg_class.relname = %s
- """ % sqlvalues(namespace, tablename))
+ """
+ % sqlvalues(namespace, tablename)
+ )
if cur.fetchone() is None:
log.debug("Table %s.%s doesn't exist" % (namespace, tablename))
continue
@@ -426,7 +477,8 @@ def calculate_replication_set(cur, seeds):
# Find all tables that reference the current (seed) table
# and all tables that the seed table references.
- cur.execute("""
+ cur.execute(
+ """
SELECT ref_namespace.nspname, ref_class.relname
FROM
-- One of the seed tables
@@ -456,11 +508,16 @@ def calculate_replication_set(cur, seeds):
OR (pg_constraint.conrelid = seed_class.oid
AND pg_constraint.confrelid = ref_class.oid)
)
- """ % sqlvalues(namespace, tablename))
+ """
+ % sqlvalues(namespace, tablename)
+ )
for namespace, tablename in cur.fetchall():
key = (namespace, tablename)
- if (key not in tables and key not in pending_tables
- and '%s.%s' % (namespace, tablename) not in IGNORED_TABLES):
+ if (
+ key not in tables
+ and key not in pending_tables
+ and "%s.%s" % (namespace, tablename) not in IGNORED_TABLES
+ ):
pending_tables.add(key)
# Generate the set of sequences that are linked to any of our set of
@@ -468,7 +525,8 @@ def calculate_replication_set(cur, seeds):
# serial or bigserial columns, or other sequences OWNED BY a particular
# column.
for namespace, tablename in tables:
- cur.execute("""
+ cur.execute(
+ """
SELECT seq
FROM (
SELECT pg_get_serial_sequence(%s, attname) AS seq
@@ -480,8 +538,10 @@ def calculate_replication_set(cur, seeds):
AND pg_attribute.attisdropped IS FALSE
) AS whatever
WHERE seq IS NOT NULL;
- """ % sqlvalues(fqn(namespace, tablename), namespace, tablename))
- for sequence, in cur.fetchall():
+ """
+ % sqlvalues(fqn(namespace, tablename), namespace, tablename)
+ )
+ for (sequence,) in cur.fetchall():
if sequence not in IGNORED_SEQUENCES:
sequences.add(sequence)
@@ -498,33 +558,42 @@ def discover_unreplicated(cur):
:returns: (unreplicated_tables_set, unreplicated_sequences_set)
"""
- all_tables = all_tables_in_schema(cur, 'public')
- all_sequences = all_sequences_in_schema(cur, 'public')
+ all_tables = all_tables_in_schema(cur, "public")
+ all_sequences = all_sequences_in_schema(cur, "public")
# Ignore any tables and sequences starting with temp_. These are
# transient and not to be replicated per Bug #778338.
all_tables = {
- table for table in all_tables
- if not table.startswith('public.temp_')}
+ table for table in all_tables if not table.startswith("public.temp_")
+ }
all_sequences = {
- sequence for sequence in all_sequences
- if not sequence.startswith('public.temp_')}
+ sequence
+ for sequence in all_sequences
+ if not sequence.startswith("public.temp_")
+ }
- cur.execute("""
+ cur.execute(
+ """
SELECT tab_nspname, tab_relname FROM %s
WHERE tab_nspname = 'public'
- """ % fqn(CLUSTER_NAMESPACE, "sl_table"))
+ """
+ % fqn(CLUSTER_NAMESPACE, "sl_table")
+ )
replicated_tables = {fqn(*row) for row in cur.fetchall()}
- cur.execute("""
+ cur.execute(
+ """
SELECT seq_nspname, seq_relname FROM %s
WHERE seq_nspname = 'public'
- """ % fqn(CLUSTER_NAMESPACE, "sl_sequence"))
+ """
+ % fqn(CLUSTER_NAMESPACE, "sl_sequence")
+ )
replicated_sequences = {fqn(*row) for row in cur.fetchall()}
return (
all_tables - replicated_tables - IGNORED_TABLES,
- all_sequences - replicated_sequences - IGNORED_SEQUENCES)
+ all_sequences - replicated_sequences - IGNORED_SEQUENCES,
+ )
class ReplicationConfigError(Exception):
@@ -547,10 +616,13 @@ def validate_replication(cur):
unrepl_tables, unrepl_sequences = discover_unreplicated(cur)
if unrepl_tables:
raise ReplicationConfigError(
- "Unreplicated tables: %s" % repr(unrepl_tables))
+ "Unreplicated tables: %s" % repr(unrepl_tables)
+ )
if unrepl_sequences:
raise ReplicationConfigError(
- "Unreplicated sequences: %s" % repr(unrepl_sequences))
+ "Unreplicated sequences: %s" % repr(unrepl_sequences)
+ )
lpmain_tables, lpmain_sequences = calculate_replication_set(
- cur, LPMAIN_SEED)
+ cur, LPMAIN_SEED
+ )
diff --git a/database/replication/preamble.py b/database/replication/preamble.py
index b6492d4..694d1bb 100755
--- a/database/replication/preamble.py
+++ b/database/replication/preamble.py
@@ -10,16 +10,15 @@ __all__ = []
import _pythonpath # noqa: F401
-from optparse import OptionParser
import time
+from optparse import OptionParser
+import replication.helpers
from lp.services import scripts
from lp.services.config import config
from lp.services.database.sqlbase import connect
-import replication.helpers
-
-if __name__ == '__main__':
+if __name__ == "__main__":
parser = OptionParser()
scripts.db_options(parser)
(options, args) = parser.parse_args()
@@ -28,7 +27,7 @@ if __name__ == '__main__':
scripts.execute_zcml_for_scripts(use_web_security=False)
con = connect()
- print('# slonik(1) preamble generated %s' % time.ctime())
- print('# LPCONFIG=%s' % config.instance_name)
+ print("# slonik(1) preamble generated %s" % time.ctime())
+ print("# LPCONFIG=%s" % config.instance_name)
print()
print(replication.helpers.preamble(con))
diff --git a/database/replication/sync.py b/database/replication/sync.py
index f74c538..af646c7 100755
--- a/database/replication/sync.py
+++ b/database/replication/sync.py
@@ -11,18 +11,20 @@ import _pythonpath # noqa: F401
from optparse import OptionParser
-from lp.services.scripts import (
- db_options,
- logger_options,
- )
+from lp.services.scripts import db_options, logger_options
from replication.helpers import sync
-
-if __name__ == '__main__':
+if __name__ == "__main__":
parser = OptionParser()
parser.add_option(
- "-t", "--timeout", dest="timeout", metavar="SECS", type="int",
- help="Abort if no sync after SECS seconds.", default=0)
+ "-t",
+ "--timeout",
+ dest="timeout",
+ metavar="SECS",
+ type="int",
+ help="Abort if no sync after SECS seconds.",
+ default=0,
+ )
logger_options(parser)
db_options(parser)
options, args = parser.parse_args()
diff --git a/database/replication/walblock.py b/database/replication/walblock.py
index 49f1b61..3d15a2e 100755
--- a/database/replication/walblock.py
+++ b/database/replication/walblock.py
@@ -7,31 +7,41 @@
__all__ = []
-from glob import glob
-from optparse import OptionParser
import os.path
import sys
import time
+from glob import glob
+from optparse import OptionParser
def main():
parser = OptionParser()
parser.add_option(
- "-n", dest="num_ready", metavar="N", type="int",
- help="Block if there are more than N unshipped WAL files.", default=25)
+ "-n",
+ dest="num_ready",
+ metavar="N",
+ type="int",
+ help="Block if there are more than N unshipped WAL files.",
+ default=25,
+ )
parser.add_option(
- "-d", dest="wal_dir", metavar="DIR", type="string",
+ "-d",
+ dest="wal_dir",
+ metavar="DIR",
+ type="string",
help="Path to pg_wal directory",
- default="/var/lib/postgresql/10/main/pg_wal")
+ default="/var/lib/postgresql/10/main/pg_wal",
+ )
parser.add_option(
- "-v", "--verbose", action="store_true", default=False, help="Verbose")
+ "-v", "--verbose", action="store_true", default=False, help="Verbose"
+ )
options, args = parser.parse_args()
if args:
- parser.error('Too many arguments')
+ parser.error("Too many arguments")
chunk_size = 1024 * 1024
- ready_wal_glob = os.path.join(options.wal_dir, 'archive_status', '*.ready')
+ ready_wal_glob = os.path.join(options.wal_dir, "archive_status", "*.ready")
while True:
notified = False
@@ -39,19 +49,22 @@ def main():
if options.verbose and not notified:
notified = True
print(
- 'Blocking on {} unshipped WAL'.format(
- len(glob(ready_wal_glob))),
- end='', file=sys.stderr)
+ "Blocking on {} unshipped WAL".format(
+ len(glob(ready_wal_glob))
+ ),
+ end="",
+ file=sys.stderr,
+ )
time.sleep(5)
if options.verbose and notified:
- print(' ... Done', file=sys.stderr)
+ print(" ... Done", file=sys.stderr)
chunk = sys.stdin.buffer.read(chunk_size)
- if chunk == b'':
+ if chunk == b"":
sys.stdout.buffer.flush()
return 0
sys.stdout.buffer.write(chunk)
-if __name__ == '__main__':
+if __name__ == "__main__":
raise SystemExit(main())
diff --git a/database/schema/dbcontroller.py b/database/schema/dbcontroller.py
index 8892413..ec2b673 100644
--- a/database/schema/dbcontroller.py
+++ b/database/schema/dbcontroller.py
@@ -3,7 +3,7 @@
"""Connect to and control our pgbouncer instance."""
-__all__ = ['DBController', 'streaming_sync']
+__all__ = ["DBController", "streaming_sync"]
import time
@@ -13,7 +13,6 @@ from psycopg2.extras import NamedTupleConnection
from lp.services.database.postgresql import ConnectionString
-
# Increase this timeout once we are confident in the
# implementation. We don't want to block rollouts
# unnecessarily with slow timeouts and a flaky sync
@@ -23,7 +22,8 @@ STREAMING_SYNC_TIMEOUT = 60
def pg_connect(conn_str):
con = psycopg2.connect(
- str(conn_str), connection_factory=NamedTupleConnection)
+ str(conn_str), connection_factory=NamedTupleConnection
+ )
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
return con
@@ -39,14 +39,17 @@ def streaming_sync(con, timeout=None):
cur = con.cursor()
# Force a WAL switch, returning the current position.
- cur.execute('SELECT pg_switch_wal()')
+ cur.execute("SELECT pg_switch_wal()")
wal_point = cur.fetchone()[0]
start_time = time.time()
while timeout is None or time.time() < start_time + timeout:
- cur.execute("""
+ cur.execute(
+ """
SELECT FALSE FROM pg_stat_replication
WHERE replay_lsn < %s LIMIT 1
- """, (wal_point,))
+ """,
+ (wal_point,),
+ )
if cur.fetchone() is None:
# All standbys, possibly 0, are in sync.
return True
@@ -60,27 +63,29 @@ class DBController:
pgbouncer_conn_str = ConnectionString(pgbouncer_conn_str)
if not pgbouncer_conn_str.dbname:
- pgbouncer_conn_str.dbname = 'pgbouncer'
- if pgbouncer_conn_str.dbname != 'pgbouncer':
+ pgbouncer_conn_str.dbname = "pgbouncer"
+ if pgbouncer_conn_str.dbname != "pgbouncer":
log.warning(
- "pgbouncer administrative database not named 'pgbouncer'")
+ "pgbouncer administrative database not named 'pgbouncer'"
+ )
self.pgbouncer_con = pg_connect(pgbouncer_conn_str)
self.primary_name = None
self.primary = None
self.standbys = {}
- for db in self.pgbouncer_cmd('show databases', results=True):
+ for db in self.pgbouncer_cmd("show databases", results=True):
if db.database != dbname:
continue
conn_str = ConnectionString(
- 'dbname=%s port=%s user=%s' % (dbname, db.port, dbuser))
+ "dbname=%s port=%s user=%s" % (dbname, db.port, dbuser)
+ )
if db.host:
conn_str.host = db.host
con = pg_connect(conn_str)
cur = con.cursor()
- cur.execute('select pg_is_in_recovery()')
+ cur.execute("select pg_is_in_recovery()")
if cur.fetchone()[0] is True:
self.standbys[db.name] = conn_str
else:
@@ -88,7 +93,7 @@ class DBController:
self.primary = conn_str
if self.primary_name is None:
- log.fatal('No primary detected.')
+ log.fatal("No primary detected.")
raise SystemExit(98)
def pgbouncer_cmd(self, cmd, results):
@@ -99,33 +104,33 @@ class DBController:
def pause_replication(self):
names = self.standbys.keys()
- self.log.info("Pausing replication to %s.", ', '.join(names))
+ self.log.info("Pausing replication to %s.", ", ".join(names))
for name, conn_str in self.standbys.items():
try:
con = pg_connect(conn_str)
cur = con.cursor()
- cur.execute('select pg_wal_replay_pause()')
+ cur.execute("select pg_wal_replay_pause()")
except psycopg2.Error as x:
self.log.error(
- 'Unable to pause replication to %s (%s).'
- % (name, str(x)))
+ "Unable to pause replication to %s (%s)." % (name, str(x))
+ )
return False
return True
def resume_replication(self):
names = self.standbys.keys()
- self.log.info("Resuming replication to %s.", ', '.join(names))
+ self.log.info("Resuming replication to %s.", ", ".join(names))
success = True
for name, conn_str in self.standbys.items():
try:
con = pg_connect(conn_str)
cur = con.cursor()
- cur.execute('select pg_wal_replay_resume()')
+ cur.execute("select pg_wal_replay_resume()")
except psycopg2.Error as x:
success = False
self.log.error(
- 'Failed to resume replication to %s (%s).'
- % (name, str(x)))
+ "Failed to resume replication to %s (%s)." % (name, str(x))
+ )
return success
def ensure_replication_enabled(self):
@@ -145,13 +150,15 @@ class DBController:
replication_paused = cur.fetchone()[0]
if replication_paused:
self.log.warning(
- "Replication paused on %s. Resuming.", name)
+ "Replication paused on %s. Resuming.", name
+ )
cur.execute("SELECT pg_wal_replay_resume()")
wait_for_sync = True
except psycopg2.Error as x:
success = False
self.log.error(
- "Failed to resume replication on %s (%s)", name, str(x))
+ "Failed to resume replication on %s (%s)", name, str(x)
+ )
if success and wait_for_sync:
self.sync()
return success
@@ -184,8 +191,7 @@ class DBController:
def disable_standbys(self):
names = self.standbys.keys()
- self.log.info(
- "Disabling access to %s.", ', '.join(names))
+ self.log.info("Disabling access to %s.", ", ".join(names))
for name in self.standbys.keys():
if not self.disable(name):
return False # Don't do further damage if we failed.
@@ -193,8 +199,7 @@ class DBController:
def enable_standbys(self):
names = self.standbys.keys()
- self.log.info(
- "Enabling access to %s.", ', '.join(names))
+ self.log.info("Enabling access to %s.", ", ".join(names))
success = True
for name in self.standbys.keys():
if not self.enable(name):
@@ -204,9 +209,10 @@ class DBController:
def sync(self):
sync = streaming_sync(pg_connect(self.primary), STREAMING_SYNC_TIMEOUT)
if sync:
- self.log.debug('Standbys in sync.')
+ self.log.debug("Standbys in sync.")
else:
self.log.error(
- 'Standbys failed to sync after %d seconds.',
- STREAMING_SYNC_TIMEOUT)
+ "Standbys failed to sync after %d seconds.",
+ STREAMING_SYNC_TIMEOUT,
+ )
return sync
diff --git a/database/schema/emptytables.py b/database/schema/emptytables.py
index 46fd25f..df7e7cc 100755
--- a/database/schema/emptytables.py
+++ b/database/schema/emptytables.py
@@ -17,22 +17,24 @@ from lp.services.scripts import db_options
def main(options):
con = connect()
cur = con.cursor()
- cur.execute("""
+ cur.execute(
+ """
SELECT relname FROM pg_class,pg_namespace
WHERE pg_class.relnamespace = pg_namespace.oid
AND pg_namespace.nspname='public'
AND pg_class.relkind = 'r'
ORDER BY relname
- """)
+ """
+ )
for table in (row[0] for row in cur.fetchall()):
cur.execute(
- "SELECT TRUE FROM public.%s LIMIT 1" % quote_identifier(table)
- )
+ "SELECT TRUE FROM public.%s LIMIT 1" % quote_identifier(table)
+ )
if cur.fetchone() is None:
print(table)
-if __name__ == '__main__':
+if __name__ == "__main__":
parser = OptionParser()
db_options(parser)
(options, args) = parser.parse_args()
diff --git a/database/schema/fti.py b/database/schema/fti.py
index 4c8711c..3bb2342 100755
--- a/database/schema/fti.py
+++ b/database/schema/fti.py
@@ -10,28 +10,23 @@ Add full text indexes to the launchpad database
import _pythonpath # noqa: F401
-from optparse import OptionParser
import sys
+from optparse import OptionParser
from textwrap import dedent
import psycopg2.extensions
from lp.services.database.sqlbase import (
- connect,
ISOLATION_LEVEL_AUTOCOMMIT,
ISOLATION_LEVEL_READ_COMMITTED,
+ connect,
quote,
quote_identifier,
- )
-from lp.services.scripts import (
- db_options,
- logger,
- logger_options,
- )
-
+)
+from lp.services.scripts import db_options, logger, logger_options
# tsearch2 ranking constants:
-A, B, C, D = 'ABCD'
+A, B, C, D = "ABCD"
# This data structure defines all of our full text indexes. Each tuple in the
# top level list creates a 'fti' column in the specified table.
@@ -39,105 +34,139 @@ A, B, C, D = 'ABCD'
# A is most important, and D is least important. This affects result ordering
# when you are ordering by rank.
ALL_FTI = [
- ('archive', [
- ('description', A),
- ('package_description_cache', B),
- ]),
- ('bug', [
- ('name', A),
- ('title', B),
- ('description', D),
- ]),
-
- ('binarypackagerelease', [
- ('summary', B),
- ('description', C),
- ]),
-
- ('cve', [
- ('sequence', A),
- ('description', B),
- ]),
-
- ('distribution', [
- ('name', A),
- ('displayname', A),
- ('title', B),
- ('summary', C),
- ('description', D),
- ]),
-
- ('distributionsourcepackagecache', [
- ('name', A),
- ('binpkgnames', B),
- ('binpkgsummaries', C),
- ('binpkgdescriptions', D),
- ('changelog', D),
- ]),
-
- ('distroseriespackagecache', [
- ('name', A),
- ('summaries', B),
- ('descriptions', C),
- ]),
-
- ('faq', [
- ('title', A),
- ('tags', B),
- ('content', D),
- ]),
-
- ('message', [
- ('subject', B),
- ]),
-
- ('messagechunk', [
- ('content', C),
- ]),
-
- ('person', [
- ('name', A),
- ('displayname', A),
- ]),
-
- ('product', [
- ('name', A),
- ('displayname', A),
- ('title', B),
- ('summary', C),
- ('description', D),
- ]),
-
- ('productreleasefile', [
- ('description', D),
- ]),
-
- ('project', [
- ('name', A),
- ('displayname', A),
- ('title', B),
- ('summary', C),
- ('description', D),
- ]),
-
- ('specification', [
- ('name', A),
- ('title', A),
- ('summary', B),
- ('whiteboard', D),
- ]),
-
- ('question', [
- ('title', A),
- ('description', B),
- ('whiteboard', B),
- ])
- ]
+ (
+ "archive",
+ [
+ ("description", A),
+ ("package_description_cache", B),
+ ],
+ ),
+ (
+ "bug",
+ [
+ ("name", A),
+ ("title", B),
+ ("description", D),
+ ],
+ ),
+ (
+ "binarypackagerelease",
+ [
+ ("summary", B),
+ ("description", C),
+ ],
+ ),
+ (
+ "cve",
+ [
+ ("sequence", A),
+ ("description", B),
+ ],
+ ),
+ (
+ "distribution",
+ [
+ ("name", A),
+ ("displayname", A),
+ ("title", B),
+ ("summary", C),
+ ("description", D),
+ ],
+ ),
+ (
+ "distributionsourcepackagecache",
+ [
+ ("name", A),
+ ("binpkgnames", B),
+ ("binpkgsummaries", C),
+ ("binpkgdescriptions", D),
+ ("changelog", D),
+ ],
+ ),
+ (
+ "distroseriespackagecache",
+ [
+ ("name", A),
+ ("summaries", B),
+ ("descriptions", C),
+ ],
+ ),
+ (
+ "faq",
+ [
+ ("title", A),
+ ("tags", B),
+ ("content", D),
+ ],
+ ),
+ (
+ "message",
+ [
+ ("subject", B),
+ ],
+ ),
+ (
+ "messagechunk",
+ [
+ ("content", C),
+ ],
+ ),
+ (
+ "person",
+ [
+ ("name", A),
+ ("displayname", A),
+ ],
+ ),
+ (
+ "product",
+ [
+ ("name", A),
+ ("displayname", A),
+ ("title", B),
+ ("summary", C),
+ ("description", D),
+ ],
+ ),
+ (
+ "productreleasefile",
+ [
+ ("description", D),
+ ],
+ ),
+ (
+ "project",
+ [
+ ("name", A),
+ ("displayname", A),
+ ("title", B),
+ ("summary", C),
+ ("description", D),
+ ],
+ ),
+ (
+ "specification",
+ [
+ ("name", A),
+ ("title", A),
+ ("summary", B),
+ ("whiteboard", D),
+ ],
+ ),
+ (
+ "question",
+ [
+ ("title", A),
+ ("description", B),
+ ("whiteboard", B),
+ ],
+ ),
+]
def execute(con, sql, results=False, args=None):
sql = sql.strip()
- log.debug('* %s' % sql)
+ log.debug("* %s" % sql)
cur = con.cursor()
if args is None:
cur.execute(sql)
@@ -154,7 +183,7 @@ def sexecute(con, sql):
SQL script. Otherwise execute on the DB.
"""
if slonik_sql is not None:
- print(dedent(sql + ';'), file=slonik_sql)
+ print(dedent(sql + ";"), file=slonik_sql)
else:
execute(con, sql)
@@ -171,8 +200,7 @@ def nullify(con):
def liverebuild(con):
- """Rebuild the data in all the fti columns against possibly live database.
- """
+ """Rebuild the data in all the fti columns against possibly live DB."""
# Update number of rows per transaction.
batch_size = 50
@@ -190,33 +218,43 @@ def liverebuild(con):
try:
query = """
UPDATE %s SET fti=NULL WHERE id BETWEEN %d AND %d
- """ % (table, id + 1, id + batch_size)
+ """ % (
+ table,
+ id + 1,
+ id + batch_size,
+ )
log.debug(query)
cur.execute(query)
except psycopg2.Error:
# No commit - we are in autocommit mode
- log.exception('psycopg error')
+ log.exception("psycopg error")
con = connect()
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
def needs_refresh(con, table, columns):
- '''Return true if the index needs to be rebuilt.
+ """Return true if the index needs to be rebuilt.
We know this by looking in our cache to see what the previous
definitions were, and the --force command line argument
- '''
+ """
current_columns = repr(sorted(columns))
existing = execute(
- con, "SELECT columns FROM FtiCache WHERE tablename=%(table)s",
- results=True, args=vars()
- )
+ con,
+ "SELECT columns FROM FtiCache WHERE tablename=%(table)s",
+ results=True,
+ args=vars(),
+ )
if len(existing) == 0:
log.debug("No fticache for %(table)s" % vars())
- sexecute(con, """
+ sexecute(
+ con,
+ """
INSERT INTO FtiCache (tablename, columns) VALUES (%s, %s)
- """ % (quote(table), quote(current_columns)))
+ """
+ % (quote(table), quote(current_columns)),
+ )
return True
if not options.force:
@@ -224,13 +262,18 @@ def needs_refresh(con, table, columns):
if current_columns == previous_columns:
log.debug("FtiCache for %(table)s still valid" % vars())
return False
- log.debug("Cache out of date - %s != %s" % (
- current_columns, previous_columns
- ))
- sexecute(con, """
+ log.debug(
+ "Cache out of date - %s != %s"
+ % (current_columns, previous_columns)
+ )
+ sexecute(
+ con,
+ """
UPDATE FtiCache SET columns = %s
WHERE tablename = %s
- """ % (quote(current_columns), quote(table)))
+ """
+ % (quote(current_columns), quote(table)),
+ )
return True
@@ -249,15 +292,21 @@ slonik_sql = None
def main():
parser = OptionParser()
parser.add_option(
- "-0", "--null", dest="null",
- action="store_true", default=False,
- help="Set all full text index column values to NULL.",
- )
+ "-0",
+ "--null",
+ dest="null",
+ action="store_true",
+ default=False,
+ help="Set all full text index column values to NULL.",
+ )
parser.add_option(
- "-l", "--live-rebuild", dest="liverebuild",
- action="store_true", default=False,
- help="Rebuild all the indexes against a live database.",
- )
+ "-l",
+ "--live-rebuild",
+ dest="liverebuild",
+ action="store_true",
+ default=False,
+ help="Rebuild all the indexes against a live database.",
+ )
db_options(parser)
logger_options(parser)
@@ -285,5 +334,5 @@ def main():
return 0
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
diff --git a/database/schema/full-update.py b/database/schema/full-update.py
index b2d2258..fe32fb5 100755
--- a/database/schema/full-update.py
+++ b/database/schema/full-update.py
@@ -6,24 +6,21 @@
import _pythonpath # noqa: F401
+import sys
from datetime import datetime
from optparse import OptionParser
-import sys
import psycopg2
+import security # security.py script
+import upgrade # upgrade.py script
from dbcontroller import DBController
-from lp.services.scripts import (
- logger,
- logger_options,
- )
+from lp.services.scripts import logger, logger_options
from preflight import (
+ SYSTEM_USERS,
KillConnectionsPreflight,
NoConnectionCheckPreflight,
- SYSTEM_USERS,
- )
-import security # security.py script
-import upgrade # upgrade.py script
+)
def run_upgrade(options, log, primary_con):
@@ -46,7 +43,7 @@ def run_upgrade(options, log, primary_con):
try:
return upgrade.main(primary_con)
except Exception:
- log.exception('Unhandled exception')
+ log.exception("Unhandled exception")
return 1
except SystemExit as x:
log.fatal("upgrade.py failed [%s]", x)
@@ -61,14 +58,14 @@ def run_security(options, log, primary_con):
# Fake expected command line arguments and global log
options.dryrun = False
options.revoke = True
- options.owner = 'postgres'
+ options.owner = "postgres"
security.options = options
security.log = log
# Invoke the database security reset process.
try:
return security.main(options, primary_con)
except Exception:
- log.exception('Unhandled exception')
+ log.exception("Unhandled exception")
return 1
except SystemExit as x:
log.fatal("security.py failed [%s]", x)
@@ -77,16 +74,26 @@ def run_security(options, log, primary_con):
def main():
parser = OptionParser()
parser.add_option(
- '--pgbouncer', dest='pgbouncer',
- default='host=localhost port=6432 user=pgbouncer',
- metavar='CONN_STR',
- help="libpq connection string to administer pgbouncer")
+ "--pgbouncer",
+ dest="pgbouncer",
+ default="host=localhost port=6432 user=pgbouncer",
+ metavar="CONN_STR",
+ help="libpq connection string to administer pgbouncer",
+ )
parser.add_option(
- '--dbname', dest='dbname', default='launchpad_prod', metavar='DBNAME',
- help='Database name we are updating.')
+ "--dbname",
+ dest="dbname",
+ default="launchpad_prod",
+ metavar="DBNAME",
+ help="Database name we are updating.",
+ )
parser.add_option(
- '--dbuser', dest='dbuser', default='postgres', metavar='USERNAME',
- help='Connect as USERNAME to databases')
+ "--dbuser",
+ dest="dbuser",
+ default="postgres",
+ metavar="USERNAME",
+ help="Connect as USERNAME to databases",
+ )
logger_options(parser, milliseconds=True)
(options, args) = parser.parse_args()
@@ -100,7 +107,8 @@ def main():
log = logger(options)
controller = DBController(
- log, options.pgbouncer, options.dbname, options.dbuser)
+ log, options.pgbouncer, options.dbname, options.dbuser
+ )
try:
# Primary connection, not running in autocommit to allow us to
@@ -146,21 +154,21 @@ def main():
return 95
if not KillConnectionsPreflight(
- log, controller,
- replication_paused=replication_paused).check_all():
+ log, controller, replication_paused=replication_paused
+ ).check_all():
return 100
log.info("Preflight check succeeded. Starting upgrade.")
# Does not commit primary_con, even on success.
upgrade_rc = run_upgrade(options, log, primary_con)
- upgrade_run = (upgrade_rc == 0)
+ upgrade_run = upgrade_rc == 0
if not upgrade_run:
return upgrade_rc
log.info("Database patches applied.")
# Commits primary_con on success.
security_rc = run_security(options, log, primary_con)
- security_run = (security_rc == 0)
+ security_run = security_rc == 0
if not security_run:
return security_rc
@@ -178,20 +186,23 @@ def main():
if replication_paused:
log.error(
"Failed to resume replication. Run pg_wal_replay_pause() "
- "on all standbys to manually resume.")
+ "on all standbys to manually resume."
+ )
else:
if controller.sync():
- log.info('Standbys in sync. Updates replicated.')
+ log.info("Standbys in sync. Updates replicated.")
else:
log.error(
- 'Standbys failed to sync. Updates may not be replicated.')
+ "Standbys failed to sync. Updates may not be replicated."
+ )
if standbys_disabled:
standbys_disabled = not controller.enable_standbys()
if standbys_disabled:
log.warning(
"Failed to enable standby databases in pgbouncer. "
- "Now running in primary-only mode.")
+ "Now running in primary-only mode."
+ )
# We will start seeing connections as soon as pgbouncer is
# reenabled, so ignore them here.
@@ -212,10 +223,12 @@ def main():
log.warning(
"Primary reenabled despite earlier failures. "
"Outage over %s, but we have problems",
- str(datetime.now() - outage_start))
+ str(datetime.now() - outage_start),
+ )
else:
log.warning(
- "Primary is still disabled in pgbouncer. Outage ongoing.")
+ "Primary is still disabled in pgbouncer. Outage ongoing."
+ )
if replication_paused:
controller.resume_replication()
@@ -224,5 +237,5 @@ def main():
controller.enable_standbys()
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
diff --git a/database/schema/newbaseline.py b/database/schema/newbaseline.py
index 11c461d..afbb7ea 100755
--- a/database/schema/newbaseline.py
+++ b/database/schema/newbaseline.py
@@ -18,12 +18,9 @@ Then run it through this filter to create a new baseline::
./newbaseline.py < lpraw.sql > newbaseline.sql
"""
-from datetime import (
- datetime,
- timezone,
- )
import re
import sys
+from datetime import datetime, timezone
def main():
diff --git a/database/schema/online_fti_updater.py b/database/schema/online_fti_updater.py
index 075ceff..7712111 100755
--- a/database/schema/online_fti_updater.py
+++ b/database/schema/online_fti_updater.py
@@ -17,20 +17,21 @@ from fti import ALL_FTI
def main():
con = psycopg.connect("dbname=launchpad_prod user=postgres")
- con.set_isolation_level(0) # autocommit
+ con.set_isolation_level(0) # autocommit
cur = con.cursor()
for table, ignored in ALL_FTI:
- print('Doing %(table)s' % vars(), end='')
+ print("Doing %(table)s" % vars(), end="")
cur.execute("SELECT id FROM %(table)s" % vars())
ids = [row[0] for row in cur.fetchall()]
for id in ids:
cur.execute(
- "UPDATE %(table)s SET fti=NULL WHERE id=%(id)s" % vars()
- )
+ "UPDATE %(table)s SET fti=NULL WHERE id=%(id)s" % vars()
+ )
if id % 100 == 0:
- print('.', end='')
+ print(".", end="")
print()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()
diff --git a/database/schema/preflight.py b/database/schema/preflight.py
index 9a34aec..e05e1c8 100755
--- a/database/schema/preflight.py
+++ b/database/schema/preflight.py
@@ -5,40 +5,30 @@
"""Confirm the database systems are ready to be patched as best we can."""
__all__ = [
- 'DatabasePreflight',
- 'KillConnectionsPreflight',
- 'NoConnectionCheckPreflight',
- 'streaming_sync',
- ]
+ "DatabasePreflight",
+ "KillConnectionsPreflight",
+ "NoConnectionCheckPreflight",
+ "streaming_sync",
+]
import _pythonpath # noqa: F401
-from datetime import timedelta
-from optparse import OptionParser
import os.path
import time
+from datetime import timedelta
+from optparse import OptionParser
import psycopg2
-from dbcontroller import (
- DBController,
- streaming_sync,
- )
+import upgrade
+from dbcontroller import DBController, streaming_sync
from lp.services.database import activity_cols
-from lp.services.database.sqlbase import (
- ISOLATION_LEVEL_AUTOCOMMIT,
- sqlvalues,
- )
-from lp.services.scripts import (
- logger,
- logger_options,
- )
+from lp.services.database.sqlbase import ISOLATION_LEVEL_AUTOCOMMIT, sqlvalues
+from lp.services.scripts import logger, logger_options
from replication.helpers import Node
-import upgrade
-
# Ignore connections by these users.
-SYSTEM_USERS = {'postgres', 'slony', 'nagios', 'lagmon'}
+SYSTEM_USERS = {"postgres", "slony", "nagios", "lagmon"}
# Fail checks if these users are connected. If a process should not be
# interrupted by a rollout, the database user it connects as should be
@@ -48,22 +38,22 @@ SYSTEM_USERS = {'postgres', 'slony', 'nagios', 'lagmon'}
FRAGILE_USERS = {
# process_accepted is fragile, but also fast so we likely shouldn't
# need to ever manually shut it down.
- 'process_accepted',
- 'process_upload',
- 'publish_distro',
- 'publish_ftpmaster',
- }
+ "process_accepted",
+ "process_upload",
+ "publish_distro",
+ "publish_ftpmaster",
+}
# If these users have long running transactions, just kill 'em. Entries
# added here must come with a bug number, a if part of Launchpad holds
# open a long running transaction it is a bug we need to fix.
BAD_USERS = {
- 'karma', # Bug #863109
- 'rosettaadmin', # Bug #863122
- 'update-pkg-cache', # Bug #912144
- 'process_death_row', # Bug #912146
- 'langpack', # Bug #912147
- }
+ "karma", # Bug #863109
+ "rosettaadmin", # Bug #863122
+ "update-pkg-cache", # Bug #912144
+ "process_death_row", # Bug #912146
+ "langpack", # Bug #912147
+}
# How lagged the cluster can be before failing the preflight check.
# If this is set too low, perfectly normal state will abort rollouts. If
@@ -91,8 +81,7 @@ class DatabasePreflight:
self._num_standbys = len(standbys)
for standby in standbys:
standby_node = Node(None, None, standby, False)
- standby_node.con = standby_node.connect(
- ISOLATION_LEVEL_AUTOCOMMIT)
+ standby_node.con = standby_node.connect(ISOLATION_LEVEL_AUTOCOMMIT)
self.nodes.add(standby_node)
self.lpmain_nodes.add(standby_node)
@@ -106,11 +95,13 @@ class DatabasePreflight:
if required_standbys != self._num_standbys:
self.log.fatal(
"%d streaming standbys connected, but %d provided on cli"
- % (required_standbys, self._num_standbys))
+ % (required_standbys, self._num_standbys)
+ )
return False
else:
self.log.debug(
- "%d streaming standby servers streaming", required_standbys)
+ "%d streaming standby servers streaming", required_standbys
+ )
return True
def check_is_superuser(self):
@@ -118,11 +109,13 @@ class DatabasePreflight:
success = True
for node in self.nodes:
cur = node.con.cursor()
- cur.execute("""
+ cur.execute(
+ """
SELECT current_database(), pg_user.usesuper
FROM pg_user
WHERE usename = current_user
- """)
+ """
+ )
dbname, is_super = cur.fetchone()
if is_super:
self.log.debug("Connected to %s as a superuser.", dbname)
@@ -143,23 +136,32 @@ class DatabasePreflight:
success = True
for node in self.lpmain_nodes:
cur = node.con.cursor()
- cur.execute("""
+ cur.execute(
+ """
SELECT datname, usename, COUNT(*) AS num_connections
FROM pg_stat_activity
WHERE
datname=current_database()
AND %(pid)s <> pg_backend_pid()
GROUP BY datname, usename
- """ % activity_cols(cur))
+ """
+ % activity_cols(cur)
+ )
for datname, usename, num_connections in cur.fetchall():
if usename in SYSTEM_USERS:
self.log.debug(
"%s has %d connections by %s",
- datname, num_connections, usename)
+ datname,
+ num_connections,
+ usename,
+ )
else:
self.log.fatal(
"%s has %d connections by %s",
- datname, num_connections, usename)
+ datname,
+ num_connections,
+ usename,
+ )
success = False
if success:
self.log.info("Only system users connected to the cluster")
@@ -174,7 +176,9 @@ class DatabasePreflight:
success = True
for node in self.lpmain_nodes:
cur = node.con.cursor()
- cur.execute(("""
+ cur.execute(
+ (
+ """
SELECT datname, usename, COUNT(*) AS num_connections
FROM pg_stat_activity
WHERE
@@ -182,17 +186,24 @@ class DatabasePreflight:
AND %(pid)s <> pg_backend_pid()
AND usename IN %%s
GROUP BY datname, usename
- """ % activity_cols(cur))
- % sqlvalues(FRAGILE_USERS))
+ """
+ % activity_cols(cur)
+ )
+ % sqlvalues(FRAGILE_USERS)
+ )
for datname, usename, num_connections in cur.fetchall():
self.log.fatal(
"Fragile system %s running. %s has %d connections.",
- usename, datname, num_connections)
+ usename,
+ datname,
+ num_connections,
+ )
success = False
if success:
self.log.debug(
"No fragile systems connected to the cluster (%s)"
- % ', '.join(FRAGILE_USERS))
+ % ", ".join(FRAGILE_USERS)
+ )
return success
def check_long_running_transactions(self, max_secs=60):
@@ -210,7 +221,8 @@ class DatabasePreflight:
success = True
for node in self.nodes:
cur = node.con.cursor()
- cur.execute("""
+ cur.execute(
+ """
SELECT
datname, usename,
age(current_timestamp, xact_start) AS age
@@ -218,16 +230,24 @@ class DatabasePreflight:
WHERE
age(current_timestamp, xact_start) > interval '%d secs'
AND datname=current_database()
- """ % max_secs)
+ """
+ % max_secs
+ )
for datname, usename, age in cur.fetchall():
if usename in BAD_USERS:
self.log.info(
"%s has transactions by %s open %s (ignoring)",
- datname, usename, age)
+ datname,
+ usename,
+ age,
+ )
else:
self.log.fatal(
"%s has transaction by %s open %s",
- datname, usename, age)
+ datname,
+ usename,
+ age,
+ )
success = False
if success:
self.log.debug("No long running transactions detected.")
@@ -238,7 +258,8 @@ class DatabasePreflight:
# Do something harmless to force changes to be streamed in case
# system is idle.
self.lpmain_primary_node.con.cursor().execute(
- 'ANALYZE LaunchpadDatabaseRevision')
+ "ANALYZE LaunchpadDatabaseRevision"
+ )
start_time = time.time()
# Keep looking for low lag for 30 seconds, in case the system
# was idle and streaming needs time to kick in.
@@ -246,14 +267,16 @@ class DatabasePreflight:
max_lag = timedelta(seconds=-1)
for node in self.nodes:
cur = node.con.cursor()
- cur.execute("""
+ cur.execute(
+ """
SELECT
pg_is_in_recovery(),
now() - pg_last_xact_replay_timestamp()
- """)
+ """
+ )
is_standby, lag = cur.fetchone()
if is_standby:
- self.log.debug2('streaming lag %s', lag)
+ self.log.debug2("streaming lag %s", lag)
max_lag = max(max_lag, lag)
if max_lag < MAX_LAG:
break
@@ -268,7 +291,8 @@ class DatabasePreflight:
else:
streaming_lagged = False
self.log.debug(
- "Streaming replication lag is not high (%s)", max_lag)
+ "Streaming replication lag is not high (%s)", max_lag
+ )
return not streaming_lagged
@@ -351,7 +375,9 @@ class KillConnectionsPreflight(DatabasePreflight):
all_clear = True
for node in nodes:
cur = node.con.cursor()
- cur.execute(("""
+ cur.execute(
+ (
+ """
SELECT
%(pid)s, datname, usename,
pg_terminate_backend(%(pid)s)
@@ -360,22 +386,28 @@ class KillConnectionsPreflight(DatabasePreflight):
datname=current_database()
AND %(pid)s <> pg_backend_pid()
AND usename NOT IN %%s
- """ % activity_cols(cur))
- % sqlvalues(SYSTEM_USERS))
+ """
+ % activity_cols(cur)
+ )
+ % sqlvalues(SYSTEM_USERS)
+ )
for pid, datname, usename, ignored in cur.fetchall():
all_clear = False
if loop_count == num_tries - 1:
self.log.fatal(
"Unable to kill %s [%s] on %s.",
- usename, pid, datname)
+ usename,
+ pid,
+ datname,
+ )
elif usename in BAD_USERS:
self.log.info(
- "Killed %s [%s] on %s.",
- usename, pid, datname)
+ "Killed %s [%s] on %s.", usename, pid, datname
+ )
else:
self.log.warning(
- "Killed %s [%s] on %s.",
- usename, pid, datname)
+ "Killed %s [%s] on %s.", usename, pid, datname
+ )
if all_clear:
break
@@ -389,24 +421,40 @@ def main():
parser = OptionParser()
logger_options(parser)
parser.add_option(
- "--skip-connection-check", dest='skip_connection_check',
- default=False, action="store_true",
- help="Don't check open connections.")
+ "--skip-connection-check",
+ dest="skip_connection_check",
+ default=False,
+ action="store_true",
+ help="Don't check open connections.",
+ )
parser.add_option(
- "--kill-connections", dest='kill_connections',
- default=False, action="store_true",
- help="Kill non-system connections instead of reporting an error.")
+ "--kill-connections",
+ dest="kill_connections",
+ default=False,
+ action="store_true",
+ help="Kill non-system connections instead of reporting an error.",
+ )
parser.add_option(
- '--pgbouncer', dest='pgbouncer',
- default='host=localhost port=6432 user=pgbouncer',
- metavar='CONN_STR',
- help="libpq connection string to administer pgbouncer")
+ "--pgbouncer",
+ dest="pgbouncer",
+ default="host=localhost port=6432 user=pgbouncer",
+ metavar="CONN_STR",
+ help="libpq connection string to administer pgbouncer",
+ )
parser.add_option(
- '--dbname', dest='dbname', default='launchpad_prod', metavar='DBNAME',
- help='Database name we are updating.')
+ "--dbname",
+ dest="dbname",
+ default="launchpad_prod",
+ metavar="DBNAME",
+ help="Database name we are updating.",
+ )
parser.add_option(
- '--dbuser', dest='dbuser', default='postgres', metavar='USERNAME',
- help='Connect as USERNAME to databases')
+ "--dbuser",
+ dest="dbuser",
+ default="postgres",
+ metavar="USERNAME",
+ help="Connect as USERNAME to databases",
+ )
(options, args) = parser.parse_args()
if args:
@@ -414,12 +462,14 @@ def main():
if options.kill_connections and options.skip_connection_check:
parser.error(
- "--skip-connection-check conflicts with --kill-connections")
+ "--skip-connection-check conflicts with --kill-connections"
+ )
log = logger(options)
controller = DBController(
- log, options.pgbouncer, options.dbname, options.dbuser)
+ log, options.pgbouncer, options.dbname, options.dbuser
+ )
if options.kill_connections:
preflight_check = KillConnectionsPreflight(log, controller)
@@ -429,12 +479,12 @@ def main():
preflight_check = DatabasePreflight(log, controller)
if preflight_check.check_all():
- log.info('Preflight check succeeded. Good to go.')
+ log.info("Preflight check succeeded. Good to go.")
return 0
else:
- log.error('Preflight check failed.')
+ log.error("Preflight check failed.")
return 1
-if __name__ == '__main__':
+if __name__ == "__main__":
raise SystemExit(main())
diff --git a/database/schema/reset_sequences.py b/database/schema/reset_sequences.py
index 52a0f3e..85feeb3 100755
--- a/database/schema/reset_sequences.py
+++ b/database/schema/reset_sequences.py
@@ -20,8 +20,7 @@ from lp.services.database.postgresql import resetSequences
from lp.services.database.sqlbase import connect
from lp.services.scripts import db_options
-
-if __name__ == '__main__':
+if __name__ == "__main__":
parser = OptionParser()
db_options(parser)
(options, args) = parser.parse_args()
diff --git a/database/schema/security.py b/database/schema/security.py
index 7281678..bf13886 100755
--- a/database/schema/security.py
+++ b/database/schema/security.py
@@ -5,54 +5,49 @@
import _pythonpath # noqa: F401
-from collections import defaultdict
-from configparser import ConfigParser
-from optparse import OptionParser
import os
import re
import sys
+from collections import defaultdict
+from configparser import ConfigParser
+from optparse import OptionParser
from fti import quote_identifier
from lp.services.database.sqlbase import connect
-from lp.services.scripts import (
- db_options,
- logger,
- logger_options,
- )
-
+from lp.services.scripts import db_options, logger, logger_options
# The 'read' group does not get given select permission on the following
# tables. This is to stop the ro user being given access to security
# sensitive information that interactive sessions don't need.
SECURE_TABLES = {
- 'public.oauthnonce',
- 'public.oauthnonce_id_seq',
- 'public.openidnonce',
- 'public.openidnonce_id_seq',
- 'public.openidconsumernonce',
- 'public.openidconsumernonce_id_seq',
- }
+ "public.oauthnonce",
+ "public.oauthnonce_id_seq",
+ "public.openidnonce",
+ "public.openidnonce_id_seq",
+ "public.openidconsumernonce",
+ "public.openidconsumernonce_id_seq",
+}
POSTGRES_ACL_MAP = {
- 'r': 'SELECT',
- 'w': 'UPDATE',
- 'a': 'INSERT',
- 'd': 'DELETE',
- 'D': 'TRUNCATE',
- 'x': 'REFERENCES',
- 't': 'TRIGGER',
- 'X': 'EXECUTE',
- 'U': 'USAGE',
- 'C': 'CREATE',
- 'c': 'CONNECT',
- 'T': 'TEMPORARY',
- }
+ "r": "SELECT",
+ "w": "UPDATE",
+ "a": "INSERT",
+ "d": "DELETE",
+ "D": "TRUNCATE",
+ "x": "REFERENCES",
+ "t": "TRIGGER",
+ "X": "EXECUTE",
+ "U": "USAGE",
+ "C": "CREATE",
+ "c": "CONNECT",
+ "T": "TEMPORARY",
+}
# PostgreSQL's putid emits an unquoted string if every character in the role
# name isalnum or is _. Otherwise the name is enclosed in double quotes, and
# any embedded double quotes are doubled.
QUOTED_STRING_RE = r'(?:([A-Za-z0-9_]+)|"([^"]*(?:""[^"]*)*)")?'
-ACLITEM_RE = re.compile(r'^%(qs)s=([\w*]*)/%(qs)s$' % {'qs': QUOTED_STRING_RE})
+ACLITEM_RE = re.compile(r"^%(qs)s=([\w*]*)/%(qs)s$" % {"qs": QUOTED_STRING_RE})
def _split_postgres_aclitem(aclitem):
@@ -60,10 +55,11 @@ def _split_postgres_aclitem(aclitem):
Returns the (grantee, privs, grantor), unquoted and separated.
"""
- grantee_1, grantee_2, privs, grantor_1, grantor_2 = (
- ACLITEM_RE.match(aclitem).groups())
- grantee = (grantee_1 or grantee_2 or '').replace('""', '"')
- grantor = (grantor_1 or grantor_2 or '').replace('""', '"')
+ grantee_1, grantee_2, privs, grantor_1, grantor_2 = ACLITEM_RE.match(
+ aclitem
+ ).groups()
+ grantee = (grantee_1 or grantee_2 or "").replace('""', '"')
+ grantor = (grantor_1 or grantor_2 or "").replace('""', '"')
return grantee, privs, grantor
@@ -85,11 +81,11 @@ def parse_postgres_acl(acl):
grantee, dict_privs = parsed_acl_cache[entry]
else:
grantee, privs, grantor = _split_postgres_aclitem(entry)
- if grantee == '':
- grantee = 'public'
+ if grantee == "":
+ grantee = "public"
parsed_privs = []
for priv in privs:
- if priv == '*':
+ if priv == "*":
parsed_privs[-1] = (parsed_privs[-1][0], True)
continue
parsed_privs.append((POSTGRES_ACL_MAP[priv], False))
@@ -101,14 +97,17 @@ def parse_postgres_acl(acl):
def list_role_members(cur, roles):
"""Return a dict of roles that are members of the given roles."""
- cur.execute("""
+ cur.execute(
+ """
SELECT grp.rolname, member.rolname
FROM
pg_authid member
JOIN pg_auth_members ON pg_auth_members.member = member.oid
JOIN pg_authid grp ON pg_auth_members.roleid = grp.oid
- WHERE grp.rolname IN (%s)""" % ', '.join(['%s'] * len(roles)),
- params=roles)
+ WHERE grp.rolname IN (%s)"""
+ % ", ".join(["%s"] * len(roles)),
+ params=roles,
+ )
members = defaultdict(set)
for group, member in cur.fetchall():
members[group].add(member)
@@ -116,9 +115,9 @@ def list_role_members(cur, roles):
class DbObject:
-
def __init__(
- self, schema, name, type_, owner, acl, arguments=None, language=None):
+ self, schema, name, type_, owner, acl, arguments=None, language=None
+ ):
self.schema = schema
self.name = name
self.type = type_
@@ -136,15 +135,15 @@ class DbObject:
@property
def fullname(self):
fn = "%s.%s" % (self.schema, self.name)
- if self.type == 'function':
+ if self.type == "function":
fn = "%s(%s)" % (fn, self.arguments)
return fn
@property
def seqname(self):
- if self.type != 'table':
- return ''
- return "%s.%s" % (self.schema, self.name + '_id_seq')
+ if self.type != "table":
+ return ""
+ return "%s.%s" % (self.schema, self.name + "_id_seq")
class DbSchema(dict):
@@ -152,7 +151,8 @@ class DbSchema(dict):
super().__init__()
cur = con.cursor()
log.debug("Getting relation metadata")
- cur.execute('''
+ cur.execute(
+ """
SELECT
n.nspname as "Schema",
c.relname as "Name",
@@ -174,14 +174,17 @@ class DbSchema(dict):
'pgdbr', 'pgdbrdata', 'todrop', '_sl')
AND c.relpersistence <> 't'
ORDER BY 1,2
- ''')
+ """
+ )
for schema, name, type_, owner, acl in cur.fetchall():
- key = '%s.%s' % (schema, name)
+ key = "%s.%s" % (schema, name)
self[key] = DbObject(
- schema, name, type_, owner, parse_postgres_acl(acl))
+ schema, name, type_, owner, parse_postgres_acl(acl)
+ )
log.debug("Getting function metadata")
- cur.execute(r"""
+ cur.execute(
+ r"""
SELECT
n.nspname as "schema",
p.proname as "name",
@@ -199,58 +202,72 @@ class DbSchema(dict):
AND n.nspname NOT IN (
'pg_catalog', 'pg_toast', 'trgm', 'information_schema',
'pgdbr', 'pgdbrdata', 'todrop', '_sl')
- """)
+ """
+ )
for schema, name, arguments, owner, acl, language in cur.fetchall():
- self['%s.%s(%s)' % (schema, name, arguments)] = DbObject(
- schema, name, 'function', owner, parse_postgres_acl(acl),
- arguments, language)
+ self["%s.%s(%s)" % (schema, name, arguments)] = DbObject(
+ schema,
+ name,
+ "function",
+ owner,
+ parse_postgres_acl(acl),
+ arguments,
+ language,
+ )
# Pull a list of roles
log.debug("Getting role metadata")
- cur.execute("""
+ cur.execute(
+ """
SELECT
rolname, rolsuper, rolinherit, rolcreaterole, rolcreatedb,
rolcanlogin, rolreplication
FROM pg_roles
- """)
+ """
+ )
options = (
- 'SUPERUSER', 'INHERIT', 'CREATEROLE', 'CREATEDB', 'LOGIN',
- 'REPLICATION')
+ "SUPERUSER",
+ "INHERIT",
+ "CREATEROLE",
+ "CREATEDB",
+ "LOGIN",
+ "REPLICATION",
+ )
self.roles = {
r[0]: {opt for (opt, val) in zip(options, r[1:]) if val}
- for r in cur.fetchall()}
+ for r in cur.fetchall()
+ }
class CursorWrapper:
-
def __init__(self, cursor):
- self.__dict__['_cursor'] = cursor
+ self.__dict__["_cursor"] = cursor
def execute(self, cmd, params=None):
- cmd = cmd.encode('utf8')
+ cmd = cmd.encode("utf8")
if params is None:
- log.debug3('%s' % (cmd, ))
- return self.__dict__['_cursor'].execute(cmd)
+ log.debug3("%s" % (cmd,))
+ return self.__dict__["_cursor"].execute(cmd)
else:
- log.debug3('%s [%r]' % (cmd, params))
- return self.__dict__['_cursor'].execute(cmd, params)
+ log.debug3("%s [%r]" % (cmd, params))
+ return self.__dict__["_cursor"].execute(cmd, params)
def __getattr__(self, key):
- return getattr(self.__dict__['_cursor'], key)
+ return getattr(self.__dict__["_cursor"], key)
def __setattr__(self, key, value):
- return setattr(self.__dict__['_cursor'], key, value)
+ return setattr(self.__dict__["_cursor"], key, value)
CONFIG_DEFAULTS = {
- 'groups': '',
- }
+ "groups": "",
+}
def main(options, primary_con=None):
# Load the config file
config = ConfigParser(CONFIG_DEFAULTS)
- configfile_name = os.path.join(os.path.dirname(__file__), 'security.cfg')
+ configfile_name = os.path.join(os.path.dirname(__file__), "security.cfg")
config.read([configfile_name])
if primary_con is None:
@@ -306,8 +323,7 @@ class PermissionGatherer:
result = []
for permission, parties in self.permissions.items():
for principal, entities in parties.items():
- result.append(
- (permission, ", ".join(entities), principal))
+ result.append((permission, ", ".join(entities), principal))
return result
def countPermissions(self):
@@ -324,9 +340,17 @@ class PermissionGatherer:
def countPrincipals(self):
"""Count the number of different principals."""
- return len(set(sum((
- list(principals)
- for principals in self.permissions.values()), [])))
+ return len(
+ set(
+ sum(
+ (
+ list(principals)
+ for principals in self.permissions.values()
+ ),
+ [],
+ )
+ )
+ )
def grant(self, cur):
"""Grant all gathered permissions.
@@ -338,11 +362,16 @@ class PermissionGatherer:
self.countPermissions(),
self.countEntities(),
self.entity_keyword,
- self.countPrincipals())
+ self.countPrincipals(),
+ )
grant_count = 0
for permissions, entities, principals in self.tabulate():
grant = "GRANT %s ON %s %s TO %s" % (
- permissions, self.entity_keyword, entities, principals)
+ permissions,
+ self.entity_keyword,
+ entities,
+ principals,
+ )
log.debug2(grant)
cur.execute(grant)
grant_count += 1
@@ -358,11 +387,16 @@ class PermissionGatherer:
self.countPermissions(),
self.countEntities(),
self.entity_keyword,
- self.countPrincipals())
+ self.countPrincipals(),
+ )
revoke_count = 0
for permissions, entities, principals in self.tabulate():
revoke = "REVOKE %s ON %s %s FROM %s" % (
- permissions, self.entity_keyword, entities, principals)
+ permissions,
+ self.entity_keyword,
+ entities,
+ principals,
+ )
log.debug2(revoke)
cur.execute(revoke)
revoke_count += 1
@@ -377,14 +411,15 @@ def alter_permissions(cur, which, revoke=False):
:param revoke: whether to revoke or grant permissions
"""
gatherers = {
- 'table': PermissionGatherer("TABLE"),
- 'function': PermissionGatherer("FUNCTION"),
- 'sequence': PermissionGatherer("SEQUENCE"),
- }
+ "table": PermissionGatherer("TABLE"),
+ "function": PermissionGatherer("FUNCTION"),
+ "sequence": PermissionGatherer("SEQUENCE"),
+ }
for obj, role, perms in which:
- gatherers.get(obj.type, gatherers['table']).add(
- ', '.join(perms), obj.fullname, quote_identifier(role))
+ gatherers.get(obj.type, gatherers["table"]).add(
+ ", ".join(perms), obj.fullname, quote_identifier(role)
+ )
for gatherer in gatherers.values():
if revoke:
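A minimal sketch of the grouping that alter_permissions() relies on above: one GRANT statement is emitted per (privilege, principal) pair, covering all gathered entities at once. The MiniGatherer class and its internal storage layout are assumptions inferred from tabulate() and the add() calls in the diff; the role and table names are invented for illustration.

from collections import defaultdict

class MiniGatherer:
    """Assumed, simplified stand-in for PermissionGatherer."""

    def __init__(self, entity_keyword):
        self.entity_keyword = entity_keyword
        # permission -> principal -> [entities]; layout inferred from tabulate().
        self.permissions = defaultdict(lambda: defaultdict(list))

    def add(self, permission, entity, principal):
        self.permissions[permission][principal].append(entity)

    def tabulate(self):
        return [
            (permission, ", ".join(entities), principal)
            for permission, parties in self.permissions.items()
            for principal, entities in parties.items()
        ]

gatherer = MiniGatherer("TABLE")
gatherer.add("SELECT", "public.person", "ro_role")   # invented names
gatherer.add("SELECT", "public.account", "ro_role")
for permissions, entities, principals in gatherer.tabulate():
    print(
        "GRANT %s ON %s %s TO %s"
        % (permissions, gatherer.entity_keyword, entities, principals)
    )
# -> GRANT SELECT ON TABLE public.person, public.account TO ro_role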
@@ -401,7 +436,7 @@ def reset_permissions(con, config, options):
groups = set()
# Add our two automatically maintained groups
- for group in ['read', 'admin']:
+ for group in ["read", "admin"]:
groups.add(group)
if group not in schema.roles:
log.debug("Creating %s role" % group)
@@ -411,21 +446,22 @@ def reset_permissions(con, config, options):
# Create all required groups and users.
log.debug("Configuring roles")
for section_name in config.sections():
- if section_name.lower() == 'public':
+ if section_name.lower() == "public":
continue
- assert not section_name.endswith('_ro'), (
- '_ro namespace is reserved (%s)' % repr(section_name))
+ assert not section_name.endswith(
+ "_ro"
+ ), "_ro namespace is reserved (%s)" % repr(section_name)
- type_ = config.get(section_name, 'type')
- assert type_ in ['user', 'group'], 'Unknown type %s' % type_
+ type_ = config.get(section_name, "type")
+ assert type_ in ["user", "group"], "Unknown type %s" % type_
- desired_opts = {'INHERIT'}
- if type_ == 'user':
- desired_opts.add('LOGIN')
+ desired_opts = {"INHERIT"}
+ if type_ == "user":
+ desired_opts.add("LOGIN")
- for username in [section_name, '%s_ro' % section_name]:
- if type_ == 'group':
+ for username in [section_name, "%s_ro" % section_name]:
+ if type_ == "group":
groups.add(username)
if username in schema.roles:
existing_opts = schema.roles[username]
@@ -434,69 +470,84 @@ def reset_permissions(con, config, options):
# objects in other databases. We need to ensure they are
# not superusers though!
log.debug2("Resetting role options of %s role.", username)
- changes = ' '.join(
+ changes = " ".join(
list(desired_opts - existing_opts)
- + ['NO' + o for o in (existing_opts - desired_opts)])
+ + ["NO" + o for o in (existing_opts - desired_opts)]
+ )
cur.execute(
- "ALTER ROLE %s WITH %s" % (
- quote_identifier(username), changes))
+ "ALTER ROLE %s WITH %s"
+ % (quote_identifier(username), changes)
+ )
else:
log.debug("Creating %s role.", username)
cur.execute(
"CREATE ROLE %s WITH %s"
- % (quote_identifier(username), ' '.join(desired_opts)))
+ % (quote_identifier(username), " ".join(desired_opts))
+ )
schema.roles[username] = set()
# Set default read-only mode for our roles.
cur.execute(
- 'ALTER ROLE %s SET default_transaction_read_only TO FALSE'
- % quote_identifier(section_name))
+ "ALTER ROLE %s SET default_transaction_read_only TO FALSE"
+ % quote_identifier(section_name)
+ )
cur.execute(
- 'ALTER ROLE %s SET default_transaction_read_only TO TRUE'
- % quote_identifier('%s_ro' % section_name))
+ "ALTER ROLE %s SET default_transaction_read_only TO TRUE"
+ % quote_identifier("%s_ro" % section_name)
+ )
# Add users to groups
- log.debug('Collecting group memberships')
+ log.debug("Collecting group memberships")
memberships = defaultdict(set)
for user in config.sections():
- if config.get(user, 'type') != 'user':
+ if config.get(user, "type") != "user":
continue
groups = [
- g.strip() for g in config.get(user, 'groups').split(',')
- if g.strip()]
+ g.strip()
+ for g in config.get(user, "groups").split(",")
+ if g.strip()
+ ]
# Read-Only users get added to Read-Only groups.
- if user.endswith('_ro'):
- groups = ['%s_ro' % group for group in groups]
+ if user.endswith("_ro"):
+ groups = ["%s_ro" % group for group in groups]
if groups:
- log.debug2("Adding %s to %s roles", user, ', '.join(groups))
+ log.debug2("Adding %s to %s roles", user, ", ".join(groups))
for group in groups:
memberships[group].add(user)
else:
log.debug2("%s not in any roles", user)
- managed_roles = {'read', 'admin'}
+ managed_roles = {"read", "admin"}
for section_name in config.sections():
managed_roles.add(section_name)
- if section_name != 'public':
+ if section_name != "public":
managed_roles.add(section_name + "_ro")
- log.debug('Updating group memberships')
+ log.debug("Updating group memberships")
existing_memberships = list_role_members(cur, list(memberships))
for group, users in memberships.items():
cur_users = managed_roles.intersection(existing_memberships[group])
to_grant = users - cur_users
if to_grant:
- cur.execute("GRANT %s TO %s" % (
- quote_identifier(group),
- ', '.join(quote_identifier(user) for user in to_grant)))
+ cur.execute(
+ "GRANT %s TO %s"
+ % (
+ quote_identifier(group),
+ ", ".join(quote_identifier(user) for user in to_grant),
+ )
+ )
to_revoke = cur_users - users
if options.revoke and to_revoke:
- cur.execute("REVOKE %s FROM %s" % (
- quote_identifier(group),
- ', '.join(quote_identifier(user) for user in to_revoke)))
+ cur.execute(
+ "REVOKE %s FROM %s"
+ % (
+ quote_identifier(group),
+ ", ".join(quote_identifier(user) for user in to_revoke),
+ )
+ )
if options.revoke:
- log.debug('Resetting object owners')
+ log.debug("Resetting object owners")
# Change ownership of all objects to OWNER.
# We skip this in --no-revoke mode as ownership changes may
# block on a live system.
@@ -506,8 +557,10 @@ def reset_permissions(con, config, options):
else:
if obj.owner != options.owner:
log.info("Resetting ownership of %s", obj.fullname)
- cur.execute("ALTER TABLE %s OWNER TO %s" % (
- obj.fullname, quote_identifier(options.owner)))
+ cur.execute(
+ "ALTER TABLE %s OWNER TO %s"
+ % (obj.fullname, quote_identifier(options.owner))
+ )
else:
log.info("Not resetting ownership of database objects")
@@ -526,19 +579,19 @@ def reset_permissions(con, config, options):
# functions) aren't readable.
granted_objs = set()
- log.debug('Collecting permissions')
+ log.debug("Collecting permissions")
for username in config.sections():
who = username
- if username == 'public':
+ if username == "public":
who_ro = who
else:
- who_ro = '%s_ro' % username
+ who_ro = "%s_ro" % username
for obj_name, perm in config.items(username):
- if '.' not in obj_name:
+ if "." not in obj_name:
continue
if obj_name not in valid_objs:
- log.warning('Bad object name %r', obj_name)
+ log.warning("Bad object name %r", obj_name)
continue
obj = schema[obj_name]
@@ -551,21 +604,21 @@ def reset_permissions(con, config, options):
granted_objs.add(obj)
- if obj.type == 'function':
- desired_permissions[obj][who].update(perm.split(', '))
+ if obj.type == "function":
+ desired_permissions[obj][who].update(perm.split(", "))
if who_ro:
desired_permissions[obj][who_ro].add("EXECUTE")
else:
- desired_permissions[obj][who].update(perm.split(', '))
+ desired_permissions[obj][who].update(perm.split(", "))
if who_ro:
desired_permissions[obj][who_ro].add("SELECT")
if obj.seqname in valid_objs:
seq = schema[obj.seqname]
granted_objs.add(seq)
- if 'INSERT' in perm:
- seqperm = 'USAGE'
- elif 'SELECT' in perm:
- seqperm = 'SELECT'
+ if "INSERT" in perm:
+ seqperm = "USAGE"
+ elif "SELECT" in perm:
+ seqperm = "SELECT"
else:
seqperm = None
if seqperm:
@@ -575,29 +628,32 @@ def reset_permissions(con, config, options):
# read gets read access to all non-secure objects that we've granted
# anybody access to.
for obj in granted_objs:
- if obj.type == 'function':
- desired_permissions[obj]['read'].add("EXECUTE")
+ if obj.type == "function":
+ desired_permissions[obj]["read"].add("EXECUTE")
else:
if obj.fullname not in SECURE_TABLES:
- desired_permissions[obj]['read'].add("SELECT")
+ desired_permissions[obj]["read"].add("SELECT")
# Set permissions on public schemas
public_schemas = [
- s.strip() for s in config.get('DEFAULT', 'public_schemas').split(',')
- if s.strip()]
+ s.strip()
+ for s in config.get("DEFAULT", "public_schemas").split(",")
+ if s.strip()
+ ]
log.debug("Granting access to %d public schemas", len(public_schemas))
for schema_name in public_schemas:
- cur.execute("GRANT USAGE ON SCHEMA %s TO PUBLIC" % (
- quote_identifier(schema_name),
- ))
+ cur.execute(
+ "GRANT USAGE ON SCHEMA %s TO PUBLIC"
+ % (quote_identifier(schema_name),)
+ )
for obj in schema.values():
if obj.schema not in public_schemas:
continue
found.add(obj)
- if obj.type == 'function':
- desired_permissions[obj]['public'].add('EXECUTE')
+ if obj.type == "function":
+ desired_permissions[obj]["public"].add("EXECUTE")
else:
- desired_permissions[obj]['public'].add('SELECT')
+ desired_permissions[obj]["public"].add("SELECT")
# For every object in the DB, ensure that the privileges held by our
# managed roles match our expectations. If not, store the delta
@@ -607,7 +663,7 @@ def reset_permissions(con, config, options):
unmanaged_roles = set()
required_grants = []
required_revokes = []
- log.debug('Calculating permission delta')
+ log.debug("Calculating permission delta")
for obj in schema.values():
# We only care about roles that are in either the desired or
# existing ACL, and are also our managed roles. But skip admin,
@@ -615,7 +671,7 @@ def reset_permissions(con, config, options):
interesting_roles = set(desired_permissions[obj]).union(obj.acl)
unmanaged_roles.update(interesting_roles.difference(managed_roles))
for role in managed_roles.intersection(interesting_roles):
- if role == 'admin':
+ if role == "admin":
continue
new = desired_permissions[obj][role]
old_privs = obj.acl.get(role, {})
@@ -657,10 +713,13 @@ def reset_permissions(con, config, options):
continue
if obj not in found:
forgotten.add(obj)
- forgotten = [obj.fullname for obj in forgotten
- if obj.type in ['table', 'function', 'view']]
+ forgotten = [
+ obj.fullname
+ for obj in forgotten
+ if obj.type in ["table", "function", "view"]
+ ]
if forgotten:
- log.warning('No permissions specified for %r', forgotten)
+ log.warning("No permissions specified for %r", forgotten)
if options.dryrun:
log.info("Dry run - rolling back changes")
@@ -670,20 +729,37 @@ def reset_permissions(con, config, options):
con.commit()
-if __name__ == '__main__':
+if __name__ == "__main__":
parser = OptionParser()
parser.add_option(
- "-n", "--dry-run", dest="dryrun", default=False,
- action="store_true", help="Don't commit any changes")
+ "-n",
+ "--dry-run",
+ dest="dryrun",
+ default=False,
+ action="store_true",
+ help="Don't commit any changes",
+ )
parser.add_option(
- "--revoke", dest="revoke", default=True, action="store_true",
- help="Revoke privileges as well as add them")
+ "--revoke",
+ dest="revoke",
+ default=True,
+ action="store_true",
+ help="Revoke privileges as well as add them",
+ )
parser.add_option(
- "--no-revoke", dest="revoke", default=True, action="store_false",
- help="Do not revoke any privileges. Just add.")
+ "--no-revoke",
+ dest="revoke",
+ default=True,
+ action="store_false",
+ help="Do not revoke any privileges. Just add.",
+ )
parser.add_option(
- "-o", "--owner", dest="owner", default="postgres",
- help="Owner of PostgreSQL objects")
+ "-o",
+ "--owner",
+ dest="owner",
+ default="postgres",
+ help="Owner of PostgreSQL objects",
+ )
db_options(parser)
logger_options(parser)
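For context, a short self-contained sketch of what the ACLITEM_RE / POSTGRES_ACL_MAP machinery reformatted near the top of security.py parses. The sample aclitem strings are invented; the regex and parsing steps are copied from the code above.

import re

QUOTED_STRING_RE = r'(?:([A-Za-z0-9_]+)|"([^"]*(?:""[^"]*)*)")?'
ACLITEM_RE = re.compile(r"^%(qs)s=([\w*]*)/%(qs)s$" % {"qs": QUOTED_STRING_RE})
# Subset of POSTGRES_ACL_MAP, sufficient for the demo.
ACL_MAP = {"r": "SELECT", "w": "UPDATE", "a": "INSERT", "d": "DELETE"}

def parse_aclitem(aclitem):
    grantee_1, grantee_2, privs, grantor_1, grantor_2 = ACLITEM_RE.match(
        aclitem
    ).groups()
    grantee = (grantee_1 or grantee_2 or "").replace('""', '"') or "public"
    parsed = []
    for priv in privs:
        if priv == "*":
            # '*' marks the preceding privilege as granted WITH GRANT OPTION.
            parsed[-1] = (parsed[-1][0], True)
        else:
            parsed.append((ACL_MAP[priv], False))
    return grantee, parsed

# Unquoted role name (every character alphanumeric or '_').
print(parse_aclitem("launchpad_main=arwd*/postgres"))
# ('launchpad_main', [('INSERT', False), ('SELECT', False),
#                     ('UPDATE', False), ('DELETE', True)])
# An empty grantee means PUBLIC; quoted names may contain doubled quotes.
print(parse_aclitem("=r/postgres"))             # ('public', [('SELECT', False)])
print(parse_aclitem('"odd""name"=r/postgres'))  # ('odd"name', [('SELECT', False)])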
diff --git a/database/schema/sort_sql.py b/database/schema/sort_sql.py
index de33fc4..f6210e3 100755
--- a/database/schema/sort_sql.py
+++ b/database/schema/sort_sql.py
@@ -28,10 +28,7 @@ import _pythonpath # noqa: F401
import sys
-from lp.services.database.sort_sql import (
- Parser,
- print_lines_sorted,
- )
+from lp.services.database.sort_sql import Parser, print_lines_sorted
def main(argv):
@@ -50,5 +47,5 @@ def main(argv):
return 0
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main(sys.argv))
diff --git a/database/schema/unautovacuumable.py b/database/schema/unautovacuumable.py
index e92593e..69ab428 100755
--- a/database/schema/unautovacuumable.py
+++ b/database/schema/unautovacuumable.py
@@ -17,20 +17,13 @@ __all__ = []
import _pythonpath # noqa: F401
-from optparse import OptionParser
import sys
import time
+from optparse import OptionParser
from lp.services.database import activity_cols
-from lp.services.database.sqlbase import (
- connect,
- ISOLATION_LEVEL_AUTOCOMMIT,
- )
-from lp.services.scripts import (
- db_options,
- logger,
- logger_options,
- )
+from lp.services.database.sqlbase import ISOLATION_LEVEL_AUTOCOMMIT, connect
+from lp.services.scripts import db_options, logger, logger_options
def main():
@@ -51,30 +44,38 @@ def main():
cur = con.cursor()
log.debug("Disabling autovacuum on all tables in the database.")
- cur.execute("""
+ cur.execute(
+ """
SELECT nspname,relname
FROM pg_namespace, pg_class
WHERE relnamespace = pg_namespace.oid
AND relkind = 'r' AND nspname <> 'pg_catalog'
- """)
+ """
+ )
for namespace, table in list(cur.fetchall()):
- cur.execute("""
+ cur.execute(
+ """
ALTER TABLE ONLY "%s"."%s" SET (
autovacuum_enabled=false,
toast.autovacuum_enabled=false)
- """ % (namespace, table))
+ """
+ % (namespace, table)
+ )
log.debug("Killing existing autovacuum processes")
num_autovacuums = -1
while num_autovacuums != 0:
# Sleep long enough for pg_stat_activity to be updated.
time.sleep(0.6)
- cur.execute("""
+ cur.execute(
+ """
SELECT %(pid)s FROM pg_stat_activity
WHERE
datname=current_database()
AND %(query)s LIKE 'autovacuum: %%'
- """ % activity_cols(cur))
+ """
+ % activity_cols(cur)
+ )
autovacuums = [row[0] for row in cur.fetchall()]
num_autovacuums = len(autovacuums)
for pid in autovacuums:
@@ -82,5 +83,5 @@ def main():
cur.execute("SELECT pg_cancel_backend(%d)" % pid)
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
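A small sketch of the query interpolation in main() above. The {"pid": "pid", "query": "query"} mapping is an assumption standing in for what lp.services.database.activity_cols returns on a recent PostgreSQL; the point is only that the doubled %% survives the dict interpolation as a literal % in the LIKE pattern.

# Assumed stand-in for activity_cols(cur) on a recent PostgreSQL server.
activity = {"pid": "pid", "query": "query"}
query = (
    """
    SELECT %(pid)s FROM pg_stat_activity
    WHERE
        datname=current_database()
        AND %(query)s LIKE 'autovacuum: %%'
    """
    % activity
)
print(query)
# SELECT pid FROM pg_stat_activity
# WHERE
#     datname=current_database()
#     AND query LIKE 'autovacuum: %'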
diff --git a/database/schema/upgrade.py b/database/schema/upgrade.py
index 16fcf12..7c139cc 100755
--- a/database/schema/upgrade.py
+++ b/database/schema/upgrade.py
@@ -10,22 +10,14 @@ Apply all outstanding schema patches to an existing launchpad database
import _pythonpath # noqa: F401
import glob
-from optparse import OptionParser
import os.path
import re
import subprocess
+from optparse import OptionParser
from textwrap import dedent
-from lp.services.database.sqlbase import (
- connect,
- sqlvalues,
- )
-from lp.services.scripts import (
- db_options,
- logger,
- logger_options,
- )
-
+from lp.services.database.sqlbase import connect, sqlvalues
+from lp.services.scripts import db_options, logger, logger_options
SCHEMA_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -62,11 +54,14 @@ def main(con=None):
# have to apply trusted.sql before applying patches (in addition to
# other preamble time such as Slony-I grabbing locks).
# FIX_PATCH_TIMES_POST_SQL does the repair work.
-FIX_PATCH_TIMES_PRE_SQL = dedent("""\
+FIX_PATCH_TIMES_PRE_SQL = dedent(
+ """\
CREATE TEMPORARY TABLE _start_time AS (
SELECT statement_timestamp() AT TIME ZONE 'UTC' AS start_time);
- """)
-FIX_PATCH_TIMES_POST_SQL = dedent("""\
+ """
+)
+FIX_PATCH_TIMES_POST_SQL = dedent(
+ """\
UPDATE LaunchpadDatabaseRevision
SET start_time = prev_end_time
FROM (
@@ -98,57 +93,69 @@ FIX_PATCH_TIMES_POST_SQL = dedent("""\
WHERE
LaunchpadDatabaseRevision.start_time
= transaction_timestamp() AT TIME ZONE 'UTC';
- """)
+ """
+)
def report_patch_times(con, todays_patches):
"""Report how long it took to apply the given patches."""
cur = con.cursor()
- todays_patches = [patch_tuple for patch_tuple, patch_file
- in todays_patches]
+ todays_patches = [
+ patch_tuple for patch_tuple, patch_file in todays_patches
+ ]
- cur.execute("""
+ cur.execute(
+ """
SELECT
major, minor, patch, start_time, end_time - start_time AS db_time
FROM LaunchpadDatabaseRevision
WHERE start_time > CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
- CAST('1 month' AS interval)
ORDER BY major, minor, patch
- """)
+ """
+ )
for major, minor, patch, start_time, db_time in cur.fetchall():
if (major, minor, patch) in todays_patches:
continue
db_time = db_time.total_seconds()
- start_time = start_time.strftime('%Y-%m-%d')
+ start_time = start_time.strftime("%Y-%m-%d")
log.info(
"%d-%02d-%d previously applied %s in %0.1f seconds"
- % (major, minor, patch, start_time, db_time))
+ % (major, minor, patch, start_time, db_time)
+ )
for major, minor, patch in todays_patches:
- cur.execute("""
+ cur.execute(
+ """
SELECT end_time - start_time AS db_time
FROM LaunchpadDatabaseRevision
WHERE major = %s AND minor = %s AND patch = %s
- """, (major, minor, patch))
+ """,
+ (major, minor, patch),
+ )
db_time = cur.fetchone()[0]
# Patches before 2208-01-1 don't have timing information.
# Ignore this. We can remove this code the next time we
# create a new database baseline, as all patches will have
# timing information.
if db_time is None:
- log.debug('%d-%d-%d no application time', major, minor, patch)
+ log.debug("%d-%d-%d no application time", major, minor, patch)
continue
log.info(
"%d-%02d-%d applied just now in %0.1f seconds",
- major, minor, patch, db_time.total_seconds())
+ major,
+ minor,
+ patch,
+ db_time.total_seconds(),
+ )
def apply_patches_normal(con):
"""Update a non replicated database."""
# trusted.sql contains all our stored procedures, which may
# be required for patches to apply correctly so must be run first.
- apply_other(con, 'trusted.sql')
+ apply_other(con, "trusted.sql")
# Prepare to repair patch timestamps if necessary.
cur = con.cursor()
@@ -160,8 +167,7 @@ def apply_patches_normal(con):
apply_patch(con, major, minor, patch, patch_file)
# Repair patch timestamps if necessary.
- cur.execute(
- FIX_PATCH_TIMES_POST_SQL % sqlvalues(*get_vcs_details()))
+ cur.execute(FIX_PATCH_TIMES_POST_SQL % sqlvalues(*get_vcs_details()))
# Update comments.
apply_comments(con)
@@ -196,27 +202,27 @@ def get_patchlist(con):
# Generate a list of all patches we might want to apply
patches = []
all_patch_files = glob.glob(
- os.path.join(SCHEMA_DIR, 'patch-????-??-?.sql'))
+ os.path.join(SCHEMA_DIR, "patch-????-??-?.sql")
+ )
all_patch_files.sort()
for patch_file in all_patch_files:
- m = re.search(r'patch-(\d+)-(\d+)-(\d).sql$', patch_file)
+ m = re.search(r"patch-(\d+)-(\d+)-(\d).sql$", patch_file)
if m is None:
- log.fatal('Invalid patch filename %s' % repr(patch_file))
+ log.fatal("Invalid patch filename %s" % repr(patch_file))
raise SystemExit(1)
major, minor, patch = (int(i) for i in m.groups())
if (major, minor, patch) in dbpatches:
continue # This patch has already been applied
- log.debug("Found patch %d.%d.%d -- %s" % (
- major, minor, patch, patch_file
- ))
+ log.debug(
+ "Found patch %d.%d.%d -- %s" % (major, minor, patch, patch_file)
+ )
patches.append(((major, minor, patch), patch_file))
return patches
def applied_patches(con):
- """Return a list of all patches that have been applied to the database.
- """
+ """Return a list of all patches that have been applied to the database."""
cur = con.cursor()
cur.execute("SELECT major, minor, patch FROM LaunchpadDatabaseRevision")
return [tuple(row) for row in cur.fetchall()]
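A quick illustration of the patch-filename matching in get_patchlist() above; the path is hypothetical, the regex is the one from the diff.

import re

patch_file = "/srv/launchpad/database/schema/patch-2210-01-5.sql"  # hypothetical
m = re.search(r"patch-(\d+)-(\d+)-(\d).sql$", patch_file)
if m is None:
    raise SystemExit("Invalid patch filename %r" % patch_file)
major, minor, patch = (int(i) for i in m.groups())
print(major, minor, patch)  # 2210 1 5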
@@ -229,8 +235,10 @@ def apply_patch(con, major, minor, patch, patch_file):
# automatically and avoid the boilerplate, but then we would lose the
# ability to easily apply the patches manually.
if (major, minor, patch) not in applied_patches(con):
- log.fatal("%s failed to update LaunchpadDatabaseRevision correctly"
- % patch_file)
+ log.fatal(
+ "%s failed to update LaunchpadDatabaseRevision correctly"
+ % patch_file
+ )
raise SystemExit(2)
# Commit changes if we allow partial updates.
@@ -245,12 +253,13 @@ def apply_other(con, script, no_commit=False):
path = os.path.join(os.path.dirname(__file__), script)
with open(path) as f:
sql = f.read()
- if not sql.rstrip().endswith(';'):
+ if not sql.rstrip().endswith(";"):
# This is important because patches are concatenated together
# into a single script when we apply them to a replicated
# environment.
log.fatal(
- "Last non-whitespace character of %s must be a semicolon", script)
+ "Last non-whitespace character of %s must be a semicolon", script
+ )
raise SystemExit(3)
cur.execute(sql)
@@ -261,7 +270,7 @@ def apply_other(con, script, no_commit=False):
def apply_comments(con):
if options.comments:
- apply_other(con, 'comments.sql')
+ apply_other(con, "comments.sql")
else:
log.debug("Skipping comments.sql per command line settings")
@@ -279,30 +288,51 @@ def get_vcs_details():
if _vcs_details_cache is None:
branch_nick = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
- cwd=SCHEMA_DIR, universal_newlines=True).rstrip("\n")
+ cwd=SCHEMA_DIR,
+ universal_newlines=True,
+ ).rstrip("\n")
revision_id = subprocess.check_output(
["git", "rev-parse", "HEAD"],
- cwd=SCHEMA_DIR, universal_newlines=True).rstrip("\n")
+ cwd=SCHEMA_DIR,
+ universal_newlines=True,
+ ).rstrip("\n")
_vcs_details_cache = (branch_nick, revision_id)
return _vcs_details_cache
-if __name__ == '__main__':
+if __name__ == "__main__":
parser = OptionParser()
db_options(parser)
logger_options(parser)
parser.add_option(
- "-n", "--dry-run", dest="commit", default=True,
- action="store_false", help="Don't actually commit changes")
+ "-n",
+ "--dry-run",
+ dest="commit",
+ default=True,
+ action="store_false",
+ help="Don't actually commit changes",
+ )
parser.add_option(
- "--partial", dest="partial", default=False,
- action="store_true", help="Commit after applying each patch")
+ "--partial",
+ dest="partial",
+ default=False,
+ action="store_true",
+ help="Commit after applying each patch",
+ )
parser.add_option(
- "--skip-comments", dest="comments", default=True,
- action="store_false", help="Skip applying comments.sql")
+ "--skip-comments",
+ dest="comments",
+ default=True,
+ action="store_false",
+ help="Skip applying comments.sql",
+ )
parser.add_option(
- "--separate-sessions", dest="separate_sessions", default=False,
- action="store_true", help="Apply each patch in a separate session")
+ "--separate-sessions",
+ dest="separate_sessions",
+ default=False,
+ action="store_true",
+ help="Apply each patch in a separate session",
+ )
(options, args) = parser.parse_args()
if args: